1use std::{
2 cell::RefCell,
3 collections::{BTreeMap, HashMap, hash_map::Entry},
4 io::ErrorKind,
5 net::{Shutdown, SocketAddr},
6 os::unix::io::AsRawFd,
7 rc::{Rc, Weak},
8 str::from_utf8_unchecked,
9 time::{Duration, Instant},
10};
11
12use mio::{
13 Interest, Registry, Token,
14 net::{TcpListener as MioTcpListener, TcpStream},
15 unix::SourceFd,
16};
17use rusty_ulid::Ulid;
18use sozu_command::{
19 logging::CachedTags,
20 proto::command::{
21 Cluster, HttpListenerConfig, ListenerType, RemoveListener, RequestHttpFrontend,
22 UpdateHttpListenerConfig, WorkerRequest, WorkerResponse, request::RequestType,
23 },
24 ready::Ready,
25 response::HttpFrontend,
26 state::{ClusterId, validate_h2_flood_knobs_http, validate_sozu_id_header},
27};
28
29use crate::metrics::names;
30use crate::{
31 AcceptError, FrontendFromRequestError, L7ListenerHandler, L7Proxy, ListenerError,
32 ListenerHandler, Protocol, ProxyConfiguration, ProxyError, ProxySession, SessionIsToBeClosed,
33 SessionMetrics, SessionResult, StateMachineBuilder, StateResult,
34 backends::BackendMap,
35 pool::Pool,
36 protocol::{
37 Pipe, SessionState,
38 http::{
39 answers::HttpAnswers,
40 parser::{Method, hostname_and_port},
41 },
42 mux::{self, Mux, MuxClear},
43 proxy_protocol::expect::ExpectProxyProtocol,
44 },
45 router::{RouteResult, Router},
46 server::{ListenToken, SessionManager},
47 socket::server_bind,
48 timer::TimeoutContainer,
49};
50
/// Coarse status tag for an HTTP session.
///
/// The enum is a plain, field-less marker, so it is cheap to copy and can be
/// printed in logs or compared in assertions.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SessionStatus {
    /// NOTE(review): presumably the regular proxying path — confirm against users of this enum.
    Normal,
    /// NOTE(review): presumably the session is answering with a default answer — confirm.
    DefaultAnswer,
}
56
// Declares `HttpStateMachine`, the per-session state enum driven by `HttpSession`.
// Each variant wraps the protocol object handling that stage of the connection.
// NOTE(review): the `FailedUpgrade` variant and the `StateMarker` type used further
// down in this file are not declared here; they are presumably generated by
// `StateMachineBuilder!` — confirm against the macro definition.
StateMachineBuilder! {
    enum HttpStateMachine impl SessionState {
        // Waiting for a PROXY protocol header on the raw frontend socket.
        Expect(ExpectProxyProtocol<TcpStream>),
        // Regular HTTP traffic handled by the mux layer.
        Mux(MuxClear),
        // Upgraded connection: bytes are piped between frontend and backend.
        WebSocket(Pipe<crate::socket::SessionTcpStream, HttpListener>),
    }
}
69
/// Expands to the `String` prefix used by HTTP log lines that are not tied to
/// a session: the `HTTP` tag wrapped in the ANSI palette's open/reset codes.
macro_rules! log_module_context {
    () => {{
        // Only the first two palette entries are needed here.
        let (open, reset, ..) = sozu_command::logging::ansi_palette();
        format!("{open}HTTP{reset}\t >>>")
    }};
}
81
/// Expands to the `String` prefix used by session-scoped HTTP log lines.
///
/// `$self` must expose a `frontend_token` field whose `.0` is printed as the
/// frontend identifier.
macro_rules! log_context {
    ($self:expr) => {{
        let (open, reset, grey, gray, white) = sozu_command::logging::ansi_palette();
        let frontend = $self.frontend_token.0;
        format!(
            "{open}HTTP{reset}\t{grey}Session{reset}({gray}frontend{reset}={white}{frontend}{reset})\t >>>"
        )
    }};
}
101
/// State attached to one accepted frontend connection on an HTTP listener.
pub struct HttpSession {
    /// Backend timeout handed to `mux::Router::new` when a mux state is built.
    configured_backend_timeout: Duration,
    /// Connect timeout handed to `mux::Router::new` when a mux state is built.
    configured_connect_timeout: Duration,
    /// Frontend timeout stored in the `Mux` state.
    configured_frontend_timeout: Duration,
    /// Token of the frontend socket; also used to remove the session from the proxy.
    frontend_token: Token,
    /// Instant of the last readiness update received by this session.
    last_event: Instant,
    /// Listener that accepted this session; cloned into the mux context and the pipe.
    listener: Rc<RefCell<HttpListener>>,
    /// Timing metrics for this session.
    metrics: SessionMetrics,
    /// Weak handle on the buffer pool, cloned into the mux context.
    pool: Weak<RefCell<Pool>>,
    /// Owning proxy; used on close to deregister the socket and drop the session.
    proxy: Rc<RefCell<HttpProxy>>,
    /// Current protocol state (expect / mux / websocket).
    state: HttpStateMachine,
    /// Set once `close` has run so that closing twice is a no-op.
    has_been_closed: bool,
}
118
119impl HttpSession {
120 #[allow(clippy::too_many_arguments)]
121 pub fn new(
122 configured_backend_timeout: Duration,
123 configured_connect_timeout: Duration,
124 configured_frontend_timeout: Duration,
125 configured_request_timeout: Duration,
126 expect_proxy: bool,
127 listener: Rc<RefCell<HttpListener>>,
128 pool: Weak<RefCell<Pool>>,
129 proxy: Rc<RefCell<HttpProxy>>,
130 public_address: SocketAddr,
131 sock: TcpStream,
132 token: Token,
133 wait_time: Duration,
134 ) -> Result<Self, AcceptError> {
135 let request_id = Ulid::generate();
136 let container_frontend_timeout = TimeoutContainer::new(configured_request_timeout, token);
137
138 let state = if expect_proxy {
139 trace!("{} starting in expect proxy state", log_module_context!());
140 gauge_add!(names::protocol::PROXY_EXPECT, 1);
141
142 HttpStateMachine::Expect(ExpectProxyProtocol::new(
143 container_frontend_timeout,
144 sock,
145 token,
146 request_id,
147 ))
148 } else {
149 gauge_add!(names::protocol::HTTP, 1);
150 let session_address = sock.peer_addr().ok();
151 let session_ulid = rusty_ulid::Ulid::generate();
152 let sock = crate::socket::SessionTcpStream::new(sock, session_ulid, session_address);
153
154 let frontend =
155 mux::Connection::new_h1_server(session_ulid, sock, container_frontend_timeout);
156 let router = mux::Router::new(configured_backend_timeout, configured_connect_timeout);
157 let mut context = mux::Context::new(
158 session_ulid,
159 pool.clone(),
160 listener.clone(),
161 session_address,
162 public_address,
163 );
164 context
165 .create_stream(request_id, 1 << 16)
166 .ok_or(AcceptError::BufferCapacityReached)?;
167 HttpStateMachine::Mux(Mux {
168 configured_frontend_timeout,
169 frontend_token: token,
170 frontend,
171 router,
172 context,
173 session_ulid,
174 })
175 };
176
177 let metrics = SessionMetrics::new(Some(wait_time));
178 Ok(HttpSession {
179 configured_backend_timeout,
180 configured_connect_timeout,
181 configured_frontend_timeout,
182 frontend_token: token,
183 has_been_closed: false,
184 last_event: Instant::now(),
185 listener,
186 metrics,
187 pool,
188 proxy,
189 state,
190 })
191 }
192
193 pub fn upgrade(&mut self) -> SessionIsToBeClosed {
194 debug!("{} upgrade", log_context!(self));
195 let new_state = match self.state.take() {
196 HttpStateMachine::Mux(mux) => self.upgrade_mux(mux),
197 HttpStateMachine::Expect(expect) => self.upgrade_expect(expect),
198 HttpStateMachine::WebSocket(ws) => self.upgrade_websocket(ws),
199 HttpStateMachine::FailedUpgrade(_) => {
200 error!(
204 "{} upgrade called on FailedUpgrade state; closing session",
205 log_context!(self)
206 );
207 None
208 }
209 };
210
211 match new_state {
212 Some(state) => {
213 self.state = state;
214 false
215 }
216 None => true,
218 }
219 }
220
221 fn upgrade_expect(
222 &mut self,
223 expect: ExpectProxyProtocol<TcpStream>,
224 ) -> Option<HttpStateMachine> {
225 debug!("{} switching to HTTP", log_context!(self));
226 match expect
227 .addresses
228 .as_ref()
229 .map(|add| (add.destination(), add.source()))
230 {
231 Some((Some(public_address), Some(session_address))) => {
232 let session_ulid = rusty_ulid::Ulid::generate();
233 let frontend = mux::Connection::new_h1_server(
234 session_ulid,
235 crate::socket::SessionTcpStream::new(
236 expect.frontend,
237 session_ulid,
238 Some(session_address),
239 ),
240 expect.container_frontend_timeout,
241 );
242 let router = mux::Router::new(
243 self.configured_backend_timeout,
244 self.configured_connect_timeout,
245 );
246 let mut context = mux::Context::new(
247 session_ulid,
248 self.pool.clone(),
249 self.listener.clone(),
250 Some(session_address),
251 public_address,
252 );
253 if context.create_stream(expect.request_id, 1 << 16).is_none() {
254 error!(
255 "{} expect upgrade failed: could not create stream",
256 log_context!(self)
257 );
258 return None;
259 }
260 let mut mux = Mux {
261 configured_frontend_timeout: self.configured_frontend_timeout,
262 frontend_token: self.frontend_token,
263 frontend,
264 router,
265 context,
266 session_ulid,
267 };
268 mux.frontend.readiness_mut().event = expect.frontend_readiness.event;
269
270 gauge_add!(names::protocol::PROXY_EXPECT, -1);
271 gauge_add!(names::protocol::HTTP, 1);
272 Some(HttpStateMachine::Mux(mux))
273 }
274 _ => {
275 debug!(
276 "{} expect upgrade failed: bad header {:?}",
277 log_context!(self),
278 expect.addresses
279 );
280 None
281 }
282 }
283 }
284
285 fn upgrade_mux(&mut self, mut mux: MuxClear) -> Option<HttpStateMachine> {
286 debug!("{} mux switching to ws", log_context!(self));
287 let Some(stream) = mux.context.streams.pop() else {
288 error!(
289 "{} upgrade_mux: no stream attached to the mux session, closing",
290 log_context!(self)
291 );
292 return None;
293 };
294 let (frontend_readiness, frontend_socket, mut container_frontend_timeout) =
298 match mux.frontend {
299 mux::Connection::H1(mux::ConnectionH1 {
300 readiness,
301 socket,
302 timeout_container,
303 ..
304 }) => (readiness, socket, timeout_container),
305 mux::Connection::H2(_) => {
306 error!(
307 "{} only h1<->h1 connections can upgrade to websocket",
308 log_context!(self)
309 );
310 return None;
311 }
312 };
313
314 let mux::StreamState::Linked(back_token) = stream.state else {
315 error!(
316 "{} upgrading stream should be linked to a backend",
317 log_context!(self)
318 );
319 return None;
320 };
321 let Some(backend) = mux.router.backends.remove(&back_token) else {
322 error!(
323 "{} upgrade_mux: backend for token {:?} is missing (already disconnected?), closing",
324 log_context!(self),
325 back_token
326 );
327 return None;
328 };
329 let (cluster_id, backend, backend_readiness, backend_socket, mut container_backend_timeout) =
330 match backend {
331 mux::Connection::H1(mux::ConnectionH1 {
332 position:
333 mux::Position::Client(cluster_id, backend, mux::BackendStatus::Connected),
334 readiness,
335 socket,
336 timeout_container,
337 ..
338 }) => (cluster_id, backend, readiness, socket, timeout_container),
339 mux::Connection::H1(_) => {
340 error!(
341 "{} the backend disconnected just after upgrade, abort",
342 log_context!(self)
343 );
344 return None;
345 }
346 mux::Connection::H2(_) => {
347 error!(
348 "{} only h1<->h1 connections can upgrade to websocket",
349 log_context!(self)
350 );
351 return None;
352 }
353 };
354
355 let ws_context = stream.context.websocket_context();
356
357 container_frontend_timeout.reset();
358 container_backend_timeout.reset();
359
360 let backend_id = backend.borrow().backend_id.clone();
361 let backend_socket = backend_socket.stream;
366 let mut pipe = Pipe::new(
367 stream.back.storage.buffer,
368 Some(backend_id),
369 Some(backend_socket),
370 Some(backend),
371 Some(container_backend_timeout),
372 Some(container_frontend_timeout),
373 Some(cluster_id),
374 stream.front.storage.buffer,
375 self.frontend_token,
376 frontend_socket,
377 self.listener.clone(),
378 Protocol::HTTP,
379 stream.context.session_id,
380 stream.context.id,
381 stream.context.session_address,
382 ws_context,
383 );
384
385 pipe.frontend_readiness.event = frontend_readiness.event;
386 pipe.backend_readiness.event = backend_readiness.event;
387 pipe.set_back_token(back_token);
388
389 gauge_add!(names::protocol::HTTP, -1);
392 gauge_add!(names::protocol::WS, 1);
393 gauge_add!(names::websocket::ACTIVE_REQUESTS, 1);
394 Some(HttpStateMachine::WebSocket(pipe))
395 }
396
397 fn upgrade_websocket(
398 &self,
399 ws: Pipe<crate::socket::SessionTcpStream, HttpListener>,
400 ) -> Option<HttpStateMachine> {
401 error!(
403 "{} upgrade called on WS, this should not happen",
404 log_context!(self)
405 );
406 Some(HttpStateMachine::WebSocket(ws))
407 }
408}
409
410impl ProxySession for HttpSession {
411 fn close(&mut self) {
412 if self.has_been_closed {
413 return;
414 }
415
416 trace!("{} closing HTTP session", log_context!(self));
417 self.metrics.service_stop();
418
419 match self.state.marker() {
421 StateMarker::Expect => gauge_add!(names::protocol::PROXY_EXPECT, -1),
422 StateMarker::Mux => gauge_add!(names::protocol::HTTP, -1),
423 StateMarker::WebSocket => {
424 gauge_add!(names::protocol::WS, -1);
425 gauge_add!(names::websocket::ACTIVE_REQUESTS, -1);
426 }
427 }
428
429 if self.state.failed() {
430 match self.state.marker() {
431 StateMarker::Expect => incr!(names::http::UPGRADE_EXPECT_FAILED),
432 StateMarker::Mux => incr!(names::http::UPGRADE_MUX_FAILED),
433 StateMarker::WebSocket => incr!(names::http::UPGRADE_WS_FAILED),
434 }
435 self.state.close(self.proxy.clone(), &mut self.metrics);
439 self.proxy.borrow().remove_session(self.frontend_token);
440 self.has_been_closed = true;
441 return;
442 }
443
444 self.state.cancel_timeouts();
445 self.state.close(self.proxy.clone(), &mut self.metrics);
447
448 let front_socket = self.state.front_socket();
449 if let Err(e) = front_socket.shutdown(Shutdown::Write) {
454 if e.kind() != ErrorKind::NotConnected {
456 error!(
457 "{} error shutting down front socket({:?}): {:?}",
458 log_context!(self),
459 front_socket,
460 e
461 )
462 }
463 }
464
465 let proxy = self.proxy.borrow();
467 let fd = front_socket.as_raw_fd();
468 if let Err(e) = proxy.registry.deregister(&mut SourceFd(&fd)) {
469 error!(
470 "{} error deregistering front socket({:?}) while closing HTTP session: {:?}",
471 log_context!(self),
472 fd,
473 e
474 );
475 }
476 proxy.remove_session(self.frontend_token);
477
478 self.has_been_closed = true;
479 }
480
481 fn timeout(&mut self, token: Token) -> SessionIsToBeClosed {
482 let state_result = self.state.timeout(token, &mut self.metrics);
483 state_result == StateResult::CloseSession
484 }
485
486 fn protocol(&self) -> Protocol {
487 Protocol::HTTP
488 }
489
490 fn update_readiness(&mut self, token: Token, events: Ready) {
491 trace!(
492 "{} token {:?} got event {}",
493 log_context!(self),
494 token,
495 super::ready_to_string(events)
496 );
497 self.last_event = Instant::now();
498 self.metrics.wait_start();
499 self.state.update_readiness(token, events);
500 }
501
502 fn ready(&mut self, session: Rc<RefCell<dyn ProxySession>>) -> SessionIsToBeClosed {
503 self.metrics.service_start();
504
505 let session_result =
506 self.state
507 .ready(session.clone(), self.proxy.clone(), &mut self.metrics);
508
509 let to_be_closed = match session_result {
510 SessionResult::Close => true,
511 SessionResult::Continue => false,
512 SessionResult::Upgrade => match self.upgrade() {
513 false => self.ready(session),
514 true => true,
515 },
516 };
517
518 self.metrics.service_stop();
519 to_be_closed
520 }
521
522 fn shutting_down(&mut self) -> SessionIsToBeClosed {
523 self.state.shutting_down()
524 }
525
526 fn last_event(&self) -> Instant {
527 self.last_event
528 }
529
530 fn print_session(&self) {
531 self.state.print_state("HTTP");
532 error!("{} Metrics: {:?}", log_context!(self), self.metrics);
533 }
534
535 fn frontend_token(&self) -> Token {
536 self.frontend_token
537 }
538}
539
/// Alias for a host name held as an owned `String`.
pub type Hostname = String;
541
/// One HTTP listening endpoint: its configuration, routing table, answer
/// templates and (once activated) its listening socket.
pub struct HttpListener {
    /// `true` once `activate` registered a socket; reset when the socket is given back.
    active: bool,
    /// Listening address, taken from `config.address` at construction.
    address: SocketAddr,
    /// Answer templates, shared through `get_answers`.
    answers: Rc<RefCell<HttpAnswers>>,
    /// Listener configuration, patched in place by `update_config`.
    config: HttpListenerConfig,
    /// Frontend routing table queried by `frontend_from_request`.
    fronts: Router,
    /// Listening socket, present only while the listener holds one.
    listener: Option<MioTcpListener>,
    /// Cached tags; the proxy stores them under the frontend hostname.
    tags: BTreeMap<String, CachedTags>,
    /// Token under which the listening socket is registered.
    token: Token,
}
573
574impl ListenerHandler for HttpListener {
575 fn get_addr(&self) -> &SocketAddr {
576 &self.address
577 }
578
579 fn get_tags(&self, key: &str) -> Option<&CachedTags> {
580 self.tags.get(key)
581 }
582
583 fn set_tags(&mut self, key: String, tags: Option<BTreeMap<String, String>>) {
584 match tags {
585 Some(tags) => self.tags.insert(key, CachedTags::new(tags)),
586 None => self.tags.remove(&key),
587 };
588 }
589
590 fn protocol(&self) -> Protocol {
591 Protocol::HTTP
592 }
593
594 fn public_address(&self) -> SocketAddr {
595 self.config
596 .public_address
597 .map(|addr| addr.into())
598 .unwrap_or(self.address)
599 }
600}
601
602impl L7ListenerHandler for HttpListener {
603 fn get_sticky_name(&self) -> &str {
604 &self.config.sticky_name
605 }
606
607 fn get_sozu_id_header(&self) -> &str {
608 self.config
609 .sozu_id_header
610 .as_deref()
611 .filter(|s| !s.is_empty())
612 .unwrap_or("Sozu-Id")
613 }
614
615 fn get_connect_timeout(&self) -> u32 {
616 self.config.connect_timeout
617 }
618
619 fn frontend_from_request(
621 &self,
622 host: &str,
623 uri: &str,
624 method: &Method,
625 ) -> Result<RouteResult, FrontendFromRequestError> {
626 let start = Instant::now();
627 let (remaining_input, (hostname, _)) = match hostname_and_port(host.as_bytes()) {
628 Ok(tuple) => tuple,
629 Err(parse_error) => {
630 return Err(FrontendFromRequestError::HostParse {
632 host: host.to_owned(),
633 error: parse_error.to_string(),
634 });
635 }
636 };
637 if remaining_input != &b""[..] {
638 return Err(FrontendFromRequestError::InvalidCharsAfterHost(
639 host.to_owned(),
640 ));
641 }
642
643 let host = unsafe { from_utf8_unchecked(hostname) };
658
659 let route = self.fronts.lookup(host, uri, method).map_err(|e| {
660 incr!(names::http::FAILED_BACKEND_MATCHING);
661 FrontendFromRequestError::NoClusterFound(e)
662 })?;
663
664 let now = Instant::now();
665
666 if let Some(cluster) = route.cluster_id.as_deref() {
667 time!(
668 names::event_loop::FRONTEND_MATCHING_TIME,
669 cluster,
670 (now - start).as_millis()
671 );
672 }
673
674 Ok(route)
675 }
676
677 fn get_answers(&self) -> &Rc<RefCell<HttpAnswers>> {
678 &self.answers
679 }
680
681 fn get_h2_flood_config(&self) -> crate::protocol::mux::H2FloodConfig {
682 let defaults = crate::protocol::mux::H2FloodConfig::default();
683 crate::protocol::mux::H2FloodConfig {
684 max_rst_stream_per_window: self
685 .config
686 .h2_max_rst_stream_per_window
687 .unwrap_or(defaults.max_rst_stream_per_window),
688 max_ping_per_window: self
689 .config
690 .h2_max_ping_per_window
691 .unwrap_or(defaults.max_ping_per_window),
692 max_settings_per_window: self
693 .config
694 .h2_max_settings_per_window
695 .unwrap_or(defaults.max_settings_per_window),
696 max_empty_data_per_window: self
697 .config
698 .h2_max_empty_data_per_window
699 .unwrap_or(defaults.max_empty_data_per_window),
700 max_window_update_stream0_per_window: self
701 .config
702 .h2_max_window_update_stream0_per_window
703 .unwrap_or(defaults.max_window_update_stream0_per_window),
704 max_continuation_frames: self
705 .config
706 .h2_max_continuation_frames
707 .unwrap_or(defaults.max_continuation_frames),
708 max_glitch_count: self
709 .config
710 .h2_max_glitch_count
711 .unwrap_or(defaults.max_glitch_count),
712 max_rst_stream_lifetime: self
713 .config
714 .h2_max_rst_stream_lifetime
715 .unwrap_or(defaults.max_rst_stream_lifetime),
716 max_rst_stream_abusive_lifetime: self
717 .config
718 .h2_max_rst_stream_abusive_lifetime
719 .unwrap_or(defaults.max_rst_stream_abusive_lifetime),
720 max_rst_stream_emitted_lifetime: self
721 .config
722 .h2_max_rst_stream_emitted_lifetime
723 .unwrap_or(defaults.max_rst_stream_emitted_lifetime),
724 max_header_list_size: self
725 .config
726 .h2_max_header_list_size
727 .unwrap_or(defaults.max_header_list_size),
728 max_header_table_size: self
729 .config
730 .h2_max_header_table_size
731 .unwrap_or(defaults.max_header_table_size),
732 }
733 }
734
735 fn get_h2_connection_config(&self) -> crate::protocol::mux::H2ConnectionConfig {
736 crate::protocol::mux::H2ConnectionConfig::from_optional(
737 self.config.h2_initial_connection_window,
738 self.config.h2_max_concurrent_streams,
739 self.config.h2_stream_shrink_ratio,
740 )
741 }
742
743 fn get_h2_stream_idle_timeout(&self) -> std::time::Duration {
744 let seconds = self
751 .config
752 .h2_stream_idle_timeout_seconds
753 .map(|s| u64::from(s.max(1)))
754 .unwrap_or_else(|| u64::from(self.config.back_timeout).max(30));
755 std::time::Duration::from_secs(seconds)
756 }
757
758 fn get_h2_graceful_shutdown_deadline(&self) -> Option<std::time::Duration> {
759 match self.config.h2_graceful_shutdown_deadline_seconds {
760 None => Some(std::time::Duration::from_secs(5)),
761 Some(0) => None,
762 Some(s) => Some(std::time::Duration::from_secs(u64::from(s))),
763 }
764 }
765
766 fn get_elide_x_real_ip(&self) -> bool {
767 self.config.elide_x_real_ip.unwrap_or(false)
768 }
769
770 fn get_send_x_real_ip(&self) -> bool {
771 self.config.send_x_real_ip.unwrap_or(false)
772 }
773}
774
/// HTTP proxy: owns the HTTP listeners and the known clusters.
pub struct HttpProxy {
    /// Shared backend map. NOTE(review): not read in the visible part of this file.
    backends: Rc<RefCell<BackendMap>>,
    /// Known clusters, keyed by cluster id.
    clusters: HashMap<ClusterId, Cluster>,
    /// Listeners, keyed by their token.
    listeners: HashMap<Token, Rc<RefCell<HttpListener>>>,
    /// Shared buffer pool. NOTE(review): not read in the visible part of this file.
    pool: Rc<RefCell<Pool>>,
    /// mio registry used to register and deregister sockets.
    registry: Registry,
    /// Shared session manager. NOTE(review): not read in the visible part of this file.
    sessions: Rc<RefCell<SessionManager>>,
}
783
784impl HttpProxy {
785 pub fn new(
786 registry: Registry,
787 sessions: Rc<RefCell<SessionManager>>,
788 pool: Rc<RefCell<Pool>>,
789 backends: Rc<RefCell<BackendMap>>,
790 ) -> HttpProxy {
791 HttpProxy {
792 backends,
793 clusters: HashMap::new(),
794 listeners: HashMap::new(),
795 pool,
796 registry,
797 sessions,
798 }
799 }
800
801 pub fn add_listener(
802 &mut self,
803 config: HttpListenerConfig,
804 token: Token,
805 ) -> Result<Token, ProxyError> {
806 match self.listeners.entry(token) {
807 Entry::Vacant(entry) => {
808 let http_listener =
809 HttpListener::new(config, token).map_err(ProxyError::AddListener)?;
810 entry.insert(Rc::new(RefCell::new(http_listener)));
811 Ok(token)
812 }
813 _ => Err(ProxyError::ListenerAlreadyPresent),
814 }
815 }
816
817 pub fn get_listener(&self, token: &Token) -> Option<Rc<RefCell<HttpListener>>> {
818 self.listeners.get(token).cloned()
819 }
820
821 pub fn remove_listener(&mut self, remove: RemoveListener) -> Result<(), ProxyError> {
822 let len = self.listeners.len();
823 let remove_address = remove.address.into();
824 self.listeners
825 .retain(|_, l| l.borrow().address != remove_address);
826
827 if !self.listeners.len() < len {
828 info!(
829 "{} no HTTP listener to remove at address {:?}",
830 log_module_context!(),
831 remove_address
832 );
833 }
834 Ok(())
835 }
836
837 pub fn activate_listener(
838 &self,
839 addr: &SocketAddr,
840 tcp_listener: Option<MioTcpListener>,
841 ) -> Result<Token, ProxyError> {
842 let listener = self
843 .listeners
844 .values()
845 .find(|listener| listener.borrow().address == *addr)
846 .ok_or(ProxyError::NoListenerFound(addr.to_owned()))?;
847
848 listener
849 .borrow_mut()
850 .activate(&self.registry, tcp_listener)
851 .map_err(|listener_error| ProxyError::ListenerActivation {
852 address: *addr,
853 listener_error,
854 })
855 }
856
857 pub fn give_back_listeners(&mut self) -> Vec<(SocketAddr, MioTcpListener)> {
858 self.listeners
859 .iter()
860 .filter_map(|(_, listener)| {
861 let mut owned = listener.borrow_mut();
862 if let Some(listener) = owned.listener.take() {
863 owned.active = false;
866 return Some((owned.address, listener));
867 }
868
869 None
870 })
871 .collect()
872 }
873
874 pub fn give_back_listener(
875 &mut self,
876 address: SocketAddr,
877 ) -> Result<(Token, MioTcpListener), ProxyError> {
878 let listener = self
879 .listeners
880 .values()
881 .find(|listener| listener.borrow().address == address)
882 .ok_or(ProxyError::NoListenerFound(address))?;
883
884 let mut owned = listener.borrow_mut();
885
886 let taken_listener = owned
887 .listener
888 .take()
889 .ok_or(ProxyError::UnactivatedListener)?;
890
891 owned.active = false;
894
895 Ok((owned.token, taken_listener))
896 }
897
898 pub fn update_listener(&mut self, patch: UpdateHttpListenerConfig) -> Result<(), ProxyError> {
900 let address: std::net::SocketAddr = patch.address.into();
901 let listener = self
902 .listeners
903 .values()
904 .find(|l| l.borrow().address == address)
905 .ok_or(ProxyError::NoListenerFound(address))?;
906 listener
907 .borrow_mut()
908 .update_config(&patch)
909 .map_err(|listener_error| ProxyError::ListenerActivation {
910 address,
911 listener_error,
912 })
913 }
914
915 pub fn add_cluster(&mut self, mut cluster: Cluster) -> Result<(), ProxyError> {
916 let mut overrides = cluster.answers.clone();
919 if let Some(answer_503) = cluster.answer_503.take() {
920 overrides.entry("503".to_owned()).or_insert(answer_503);
921 }
922 if !overrides.is_empty() {
923 for listener in self.listeners.values() {
924 listener
925 .borrow()
926 .answers
927 .borrow_mut()
928 .add_cluster_answers(&cluster.cluster_id, &overrides)
929 .map_err(|(name, error)| {
930 ProxyError::AddCluster(ListenerError::TemplateParse(name, error))
931 })?;
932 }
933 }
934 self.clusters.insert(cluster.cluster_id.clone(), cluster);
935 Ok(())
936 }
937
938 pub fn remove_cluster(&mut self, cluster_id: &str) -> Result<(), ProxyError> {
939 self.clusters.remove(cluster_id);
940
941 for listener in self.listeners.values() {
942 listener
943 .borrow()
944 .answers
945 .borrow_mut()
946 .remove_cluster_answers(cluster_id);
947 }
948 Ok(())
949 }
950
951 pub fn add_http_frontend(&mut self, front: RequestHttpFrontend) -> Result<(), ProxyError> {
952 if front.hsts.is_some() {
967 incr!(names::http::HSTS_SUPPRESSED_PLAINTEXT);
968 return Err(ProxyError::HstsOnPlainHttp(front.address.into()));
969 }
970
971 let front = front.clone().to_frontend().map_err(|request_error| {
972 ProxyError::WrongInputFrontend {
973 front: Box::new(front),
974 error: request_error.to_string(),
975 }
976 })?;
977
978 let mut listener = self
979 .listeners
980 .values()
981 .find(|l| l.borrow().address == front.address)
982 .ok_or(ProxyError::NoListenerFound(front.address))?
983 .borrow_mut();
984
985 let hostname = front.hostname.to_owned();
986 let tags = front.tags.to_owned();
987
988 listener
989 .add_http_front(front)
990 .map_err(ProxyError::AddFrontend)?;
991 listener.set_tags(hostname, tags);
992 Ok(())
993 }
994
995 pub fn remove_http_frontend(&mut self, front: RequestHttpFrontend) -> Result<(), ProxyError> {
996 let front = front.clone().to_frontend().map_err(|request_error| {
997 ProxyError::WrongInputFrontend {
998 front: Box::new(front),
999 error: request_error.to_string(),
1000 }
1001 })?;
1002
1003 let mut listener = self
1004 .listeners
1005 .values()
1006 .find(|l| l.borrow().address == front.address)
1007 .ok_or(ProxyError::NoListenerFound(front.address))?
1008 .borrow_mut();
1009
1010 let hostname = front.hostname.to_owned();
1011
1012 listener
1013 .remove_http_front(front)
1014 .map_err(ProxyError::RemoveFrontend)?;
1015
1016 if !listener.fronts.has_hostname(&hostname) {
1017 listener.set_tags(hostname, None);
1018 }
1019 Ok(())
1020 }
1021
1022 pub fn soft_stop(&mut self) -> Result<(), ProxyError> {
1023 let listeners: HashMap<_, _> = self.listeners.drain().collect();
1024 let mut socket_errors = vec![];
1025 for (_, l) in listeners.iter() {
1026 if let Some(mut sock) = l.borrow_mut().listener.take() {
1027 debug!("{} deregistering socket {:?}", log_module_context!(), sock);
1028 if let Err(e) = self.registry.deregister(&mut sock) {
1029 let error = format!("socket {sock:?}: {e:?}");
1030 socket_errors.push(error);
1031 }
1032 }
1033 }
1034
1035 if !socket_errors.is_empty() {
1036 return Err(ProxyError::SoftStop {
1037 proxy_protocol: "HTTP".to_string(),
1038 error: format!("Error deregistering listen sockets: {socket_errors:?}"),
1039 });
1040 }
1041
1042 Ok(())
1043 }
1044
1045 pub fn hard_stop(&mut self) -> Result<(), ProxyError> {
1046 let mut listeners: HashMap<_, _> = self.listeners.drain().collect();
1047 let mut socket_errors = vec![];
1048 for (_, l) in listeners.drain() {
1049 if let Some(mut sock) = l.borrow_mut().listener.take() {
1050 debug!("{} deregistering socket {:?}", log_module_context!(), sock);
1051 if let Err(e) = self.registry.deregister(&mut sock) {
1052 let error = format!("socket {sock:?}: {e:?}");
1053 socket_errors.push(error);
1054 }
1055 }
1056 }
1057
1058 if !socket_errors.is_empty() {
1059 return Err(ProxyError::HardStop {
1060 proxy_protocol: "HTTP".to_string(),
1061 error: format!("Error deregistering listen sockets: {socket_errors:?}"),
1062 });
1063 }
1064
1065 Ok(())
1066 }
1067}
1068
1069impl HttpListener {
1070 pub fn new(config: HttpListenerConfig, token: Token) -> Result<HttpListener, ListenerError> {
1071 Ok(HttpListener {
1072 active: false,
1073 address: config.address.into(),
1074 answers: Rc::new(RefCell::new({
1075 let mut answers_map = config.answers.clone();
1080 if let Some(ref legacy) = config.http_answers {
1081 crate::protocol::http::answers::merge_legacy_into_map(&mut answers_map, legacy);
1082 }
1083 HttpAnswers::new(&answers_map)
1084 .map_err(|(name, error)| ListenerError::TemplateParse(name, error))?
1085 })),
1086 config,
1087 fronts: Router::new(),
1088 listener: None,
1089 tags: BTreeMap::new(),
1090 token,
1091 })
1092 }
1093
1094 pub fn activate(
1095 &mut self,
1096 registry: &Registry,
1097 tcp_listener: Option<MioTcpListener>,
1098 ) -> Result<Token, ListenerError> {
1099 if self.active {
1100 return Ok(self.token);
1101 }
1102 let address: SocketAddr = self.config.address.into();
1103
1104 let mut listener = match tcp_listener {
1105 Some(tcp_listener) => tcp_listener,
1106 None => {
1107 server_bind(address).map_err(|server_bind_error| ListenerError::Activation {
1108 address,
1109 error: server_bind_error.to_string(),
1110 })?
1111 }
1112 };
1113
1114 registry
1115 .register(&mut listener, self.token, Interest::READABLE)
1116 .map_err(ListenerError::SocketRegistration)?;
1117
1118 self.listener = Some(listener);
1119 self.active = true;
1120 Ok(self.token)
1121 }
1122
1123 pub fn update_config(&mut self, patch: &UpdateHttpListenerConfig) -> Result<(), ListenerError> {
1129 validate_h2_flood_knobs_http(patch)?;
1134 if let Some(ref hdr) = patch.sozu_id_header {
1135 validate_sozu_id_header(hdr)?;
1136 }
1137
1138 if let Some(v) = patch.public_address {
1139 self.config.public_address = Some(v);
1140 }
1141 if let Some(v) = patch.expect_proxy {
1142 self.config.expect_proxy = v;
1143 }
1144 if let Some(ref v) = patch.sticky_name {
1145 self.config.sticky_name = v.to_owned();
1146 }
1147 if let Some(v) = patch.front_timeout {
1148 self.config.front_timeout = v;
1149 }
1150 if let Some(v) = patch.back_timeout {
1151 self.config.back_timeout = v;
1152 }
1153 if let Some(v) = patch.connect_timeout {
1154 self.config.connect_timeout = v;
1155 }
1156 if let Some(v) = patch.request_timeout {
1157 self.config.request_timeout = v;
1158 }
1159 if let Some(ref v) = patch.sozu_id_header {
1160 self.config.sozu_id_header = Some(v.to_owned());
1161 }
1162 if let Some(v) = patch.elide_x_real_ip {
1163 self.config.elide_x_real_ip = Some(v);
1164 }
1165 if let Some(v) = patch.send_x_real_ip {
1166 self.config.send_x_real_ip = Some(v);
1167 }
1168
1169 if let Some(v) = patch.h2_max_rst_stream_per_window {
1171 self.config.h2_max_rst_stream_per_window = Some(v);
1172 }
1173 if let Some(v) = patch.h2_max_ping_per_window {
1174 self.config.h2_max_ping_per_window = Some(v);
1175 }
1176 if let Some(v) = patch.h2_max_settings_per_window {
1177 self.config.h2_max_settings_per_window = Some(v);
1178 }
1179 if let Some(v) = patch.h2_max_empty_data_per_window {
1180 self.config.h2_max_empty_data_per_window = Some(v);
1181 }
1182 if let Some(v) = patch.h2_max_continuation_frames {
1183 self.config.h2_max_continuation_frames = Some(v);
1184 }
1185 if let Some(v) = patch.h2_max_glitch_count {
1186 self.config.h2_max_glitch_count = Some(v);
1187 }
1188 if let Some(v) = patch.h2_initial_connection_window {
1189 self.config.h2_initial_connection_window = Some(v);
1190 }
1191 if let Some(v) = patch.h2_max_concurrent_streams {
1192 self.config.h2_max_concurrent_streams = Some(v);
1193 }
1194 if let Some(v) = patch.h2_stream_shrink_ratio {
1195 self.config.h2_stream_shrink_ratio = Some(v);
1196 }
1197 if let Some(v) = patch.h2_max_rst_stream_lifetime {
1198 self.config.h2_max_rst_stream_lifetime = Some(v);
1199 }
1200 if let Some(v) = patch.h2_max_rst_stream_abusive_lifetime {
1201 self.config.h2_max_rst_stream_abusive_lifetime = Some(v);
1202 }
1203 if let Some(v) = patch.h2_max_rst_stream_emitted_lifetime {
1204 self.config.h2_max_rst_stream_emitted_lifetime = Some(v);
1205 }
1206 if let Some(v) = patch.h2_max_header_list_size {
1207 self.config.h2_max_header_list_size = Some(v);
1208 }
1209 if let Some(v) = patch.h2_max_header_table_size {
1210 self.config.h2_max_header_table_size = Some(v);
1211 }
1212 if let Some(v) = patch.h2_stream_idle_timeout_seconds {
1213 self.config.h2_stream_idle_timeout_seconds = Some(v);
1214 }
1215 if let Some(v) = patch.h2_graceful_shutdown_deadline_seconds {
1216 self.config.h2_graceful_shutdown_deadline_seconds = Some(v);
1217 }
1218 if let Some(v) = patch.h2_max_window_update_stream0_per_window {
1219 self.config.h2_max_window_update_stream0_per_window = Some(v);
1220 }
1221
1222 let answers_changed = patch.http_answers.is_some() || !patch.answers.is_empty();
1227 if answers_changed {
1228 if let Some(ref new_answers) = patch.http_answers {
1229 crate::sozu_command::state::merge_custom_http_answers(
1230 &mut self.config.http_answers,
1231 new_answers,
1232 );
1233 }
1234 for (code, body) in &patch.answers {
1235 if !body.is_empty() {
1236 self.config.answers.insert(code.clone(), body.clone());
1237 }
1238 }
1239
1240 let mut answers_map = self.config.answers.clone();
1241 if let Some(ref legacy) = self.config.http_answers {
1242 crate::protocol::http::answers::merge_legacy_into_map(&mut answers_map, legacy);
1243 }
1244 let mut new_answers = HttpAnswers::new(&answers_map)
1247 .map_err(|(name, error)| ListenerError::TemplateParse(name, error))?;
1248 let preserved = std::mem::take(&mut self.answers.borrow_mut().cluster_answers);
1249 new_answers.cluster_answers = preserved;
1250 *self.answers.borrow_mut() = new_answers;
1251 }
1252
1253 Ok(())
1254 }
1255
1256 pub fn add_http_front(&mut self, http_front: HttpFrontend) -> Result<(), ListenerError> {
1257 self.fronts
1258 .add_http_front(&http_front)
1259 .map_err(ListenerError::AddFrontend)
1260 }
1261
1262 pub fn remove_http_front(&mut self, http_front: HttpFrontend) -> Result<(), ListenerError> {
1263 debug!(
1264 "{} removing http_front {:?}",
1265 log_module_context!(),
1266 http_front
1267 );
1268 self.fronts
1269 .remove_http_front(&http_front)
1270 .map_err(ListenerError::RemoveFrontend)
1271 }
1272
1273 fn accept(&mut self) -> Result<TcpStream, AcceptError> {
1274 if let Some(ref sock) = self.listener {
1275 sock.accept()
1276 .map_err(|e| match e.kind() {
1277 ErrorKind::WouldBlock => AcceptError::WouldBlock,
1278 _ => {
1279 error!("{} accept() IO error: {:?}", log_module_context!(), e);
1280 AcceptError::IoError
1281 }
1282 })
1283 .map(|(sock, _)| sock)
1284 } else {
1285 error!(
1286 "{} cannot accept connections, no listening socket available",
1287 log_module_context!()
1288 );
1289 Err(AcceptError::IoError)
1290 }
1291 }
1292}
1293
1294impl ProxyConfiguration for HttpProxy {
1295 fn notify(&mut self, request: WorkerRequest) -> WorkerResponse {
1296 let request_id = request.id.clone();
1297
1298 let result = match request.content.request_type {
1299 Some(RequestType::AddCluster(cluster)) => {
1300 debug!(
1301 "{} {} add cluster {:?}",
1302 log_module_context!(),
1303 request.id,
1304 cluster
1305 );
1306 self.add_cluster(cluster)
1307 }
1308 Some(RequestType::RemoveCluster(cluster_id)) => {
1309 debug!(
1310 "{} {} remove cluster {:?}",
1311 log_module_context!(),
1312 request_id,
1313 cluster_id
1314 );
1315 self.remove_cluster(&cluster_id)
1316 }
1317 Some(RequestType::AddHttpFrontend(front)) => {
1318 debug!(
1319 "{} {} add front {:?}",
1320 log_module_context!(),
1321 request_id,
1322 front
1323 );
1324 self.add_http_frontend(front)
1325 }
1326 Some(RequestType::RemoveHttpFrontend(front)) => {
1327 debug!(
1328 "{} {} remove front {:?}",
1329 log_module_context!(),
1330 request_id,
1331 front
1332 );
1333 self.remove_http_frontend(front)
1334 }
1335 Some(RequestType::RemoveListener(remove)) => {
1336 debug!(
1337 "{} removing HTTP listener at address {:?}",
1338 log_module_context!(),
1339 remove.address
1340 );
1341 self.remove_listener(remove)
1342 }
1343 Some(RequestType::SoftStop(_)) => {
1344 debug!(
1345 "{} {} processing soft shutdown",
1346 log_module_context!(),
1347 request_id
1348 );
1349 match self.soft_stop() {
1350 Ok(()) => {
1351 info!(
1352 "{} {} soft stop successful",
1353 log_module_context!(),
1354 request_id
1355 );
1356 return WorkerResponse::processing(request.id);
1357 }
1358 Err(e) => Err(e),
1359 }
1360 }
1361 Some(RequestType::HardStop(_)) => {
1362 debug!(
1363 "{} {} processing hard shutdown",
1364 log_module_context!(),
1365 request_id
1366 );
1367 match self.hard_stop() {
1368 Ok(()) => {
1369 info!(
1370 "{} {} hard stop successful",
1371 log_module_context!(),
1372 request_id
1373 );
1374 return WorkerResponse::processing(request.id);
1375 }
1376 Err(e) => Err(e),
1377 }
1378 }
1379 Some(RequestType::Status(_)) => {
1380 debug!("{} {} status", log_module_context!(), request_id);
1381 Ok(())
1382 }
1383 other_command => {
1384 debug!(
1385 "{} {} unsupported message for HTTP proxy, ignoring: {:?}",
1386 log_module_context!(),
1387 request.id,
1388 other_command
1389 );
1390 Err(ProxyError::UnsupportedMessage)
1391 }
1392 };
1393
1394 match result {
1395 Ok(()) => {
1396 debug!("{} {} successful", log_module_context!(), request_id);
1397 WorkerResponse::ok(request_id)
1398 }
1399 Err(proxy_error) => {
1400 debug!(
1401 "{} {} unsuccessful: {}",
1402 log_module_context!(),
1403 request_id,
1404 proxy_error
1405 );
1406 WorkerResponse::error(request_id, proxy_error)
1407 }
1408 }
1409 }
1410
1411 fn accept(&mut self, token: ListenToken) -> Result<TcpStream, AcceptError> {
1412 if let Some(listener) = self.listeners.get(&Token(token.0)) {
1413 listener.borrow_mut().accept()
1414 } else {
1415 Err(AcceptError::IoError)
1416 }
1417 }
1418
1419 fn create_session(
1420 &mut self,
1421 mut frontend_sock: TcpStream,
1422 listener_token: ListenToken,
1423 wait_time: Duration,
1424 proxy: Rc<RefCell<Self>>,
1425 ) -> Result<(), AcceptError> {
1426 let listener = self
1427 .listeners
1428 .get(&Token(listener_token.0))
1429 .cloned()
1430 .ok_or(AcceptError::IoError)?;
1431
1432 if let Err(e) = frontend_sock.set_nodelay(true) {
1433 error!(
1434 "{} error setting nodelay on front socket({:?}): {:?}",
1435 log_module_context!(),
1436 frontend_sock,
1437 e
1438 );
1439 }
1440 let mut session_manager = self.sessions.borrow_mut();
1441 let session_entry = session_manager.slab.vacant_entry();
1442 let session_token = Token(session_entry.key());
1443 let owned = listener.borrow();
1444
1445 if let Err(register_error) = self.registry.register(
1446 &mut frontend_sock,
1447 session_token,
1448 Interest::READABLE | Interest::WRITABLE,
1449 ) {
1450 error!(
1451 "{} error registering listen socket({:?}): {:?}",
1452 log_module_context!(),
1453 frontend_sock,
1454 register_error
1455 );
1456 return Err(AcceptError::RegisterError);
1457 }
1458
1459 let public_address: SocketAddr = match owned.config.public_address {
1460 Some(pub_addr) => pub_addr.into(),
1461 None => owned.config.address.into(),
1462 };
1463
1464 let session = HttpSession::new(
1465 Duration::from_secs(owned.config.back_timeout as u64),
1466 Duration::from_secs(owned.config.connect_timeout as u64),
1467 Duration::from_secs(owned.config.front_timeout as u64),
1468 Duration::from_secs(owned.config.request_timeout as u64),
1469 owned.config.expect_proxy,
1470 listener.clone(),
1471 Rc::downgrade(&self.pool),
1472 proxy,
1473 public_address,
1474 frontend_sock,
1475 session_token,
1476 wait_time,
1477 )?;
1478
1479 let session = Rc::new(RefCell::new(session));
1480 session_entry.insert(session);
1481
1482 Ok(())
1483 }
1484}
1485
1486impl L7Proxy for HttpProxy {
1487 fn kind(&self) -> ListenerType {
1488 ListenerType::Http
1489 }
1490
1491 fn register_socket(
1492 &self,
1493 source: &mut TcpStream,
1494 token: Token,
1495 interest: Interest,
1496 ) -> Result<(), std::io::Error> {
1497 self.registry.register(source, token, interest)
1498 }
1499
1500 fn deregister_socket(&self, tcp_stream: &mut TcpStream) -> Result<(), std::io::Error> {
1501 self.registry.deregister(tcp_stream)
1502 }
1503
1504 fn add_session(&self, session: Rc<RefCell<dyn ProxySession>>) -> Token {
1505 let mut session_manager = self.sessions.borrow_mut();
1506 let entry = session_manager.slab.vacant_entry();
1507 let token = Token(entry.key());
1508 let _entry = entry.insert(session);
1509 token
1510 }
1511
1512 fn remove_session(&self, token: Token) -> bool {
1513 let mut sessions = self.sessions.borrow_mut();
1514 sessions.untrack_all_cluster_ip(token);
1520 sessions.slab.try_remove(token.0).is_some()
1521 }
1522
1523 fn backends(&self) -> Rc<RefCell<BackendMap>> {
1524 self.backends.clone()
1525 }
1526
1527 fn clusters(&self) -> &HashMap<ClusterId, Cluster> {
1528 &self.clusters
1529 }
1530
1531 fn sessions(&self) -> Rc<RefCell<SessionManager>> {
1532 self.sessions.clone()
1533 }
1534}
1535
1536pub mod testing {
1537 use crate::testing::*;
1538
1539 pub fn start_http_worker(
1541 config: HttpListenerConfig,
1542 channel: ProxyChannel,
1543 max_buffers: usize,
1544 buffer_size: usize,
1545 ) -> anyhow::Result<()> {
1546 let address = config.address.into();
1547
1548 let ServerParts {
1549 event_loop,
1550 registry,
1551 sessions,
1552 pool,
1553 backends,
1554 client_scm_socket: _,
1555 server_scm_socket,
1556 server_config,
1557 } = prebuild_server(max_buffers, buffer_size, true)?;
1558
1559 let token = {
1560 let mut sessions = sessions.borrow_mut();
1561 let entry = sessions.slab.vacant_entry();
1562 let key = entry.key();
1563 let _ = entry.insert(Rc::new(RefCell::new(ListenSession {
1564 protocol: Protocol::HTTPListen,
1565 })));
1566 Token(key)
1567 };
1568
1569 let mut proxy = HttpProxy::new(registry, sessions.clone(), pool.clone(), backends.clone());
1570 proxy
1571 .add_listener(config, token)
1572 .with_context(|| "Failed at creating adding the listener")?;
1573 proxy
1574 .activate_listener(&address, None)
1575 .with_context(|| "Failed at creating activating the listener")?;
1576
1577 let mut server = Server::new(
1578 event_loop,
1579 channel,
1580 server_scm_socket,
1581 sessions,
1582 pool,
1583 backends,
1584 Some(proxy),
1585 None,
1586 None,
1587 server_config,
1588 None,
1589 false,
1590 )
1591 .with_context(|| "Failed at creating server")?;
1592
1593 debug!("{} starting event loop", log_module_context!());
1594 server.run();
1595 debug!("{} ending event loop", log_module_context!());
1596 Ok(())
1597 }
1598}
1599
1600#[cfg(test)]
1601mod tests {
1602 extern crate tiny_http;
1603
1604 use std::{
1605 io::{Read, Write},
1606 net::TcpStream,
1607 str,
1608 sync::{Arc, Barrier},
1609 thread,
1610 time::Duration,
1611 };
1612
1613 use sozu_command::proto::command::SocketAddress;
1614
1615 use super::{testing::start_http_worker, *};
1616 use crate::sozu_command::{
1617 channel::Channel,
1618 config::ListenerBuilder,
1619 proto::command::{
1620 LoadBalancingParams, PathRule, RulePosition, SoftStop, WorkerRequest,
1621 request::RequestType,
1622 },
1623 response::{Backend, HttpFrontend},
1624 };
1625
1626 #[test]
1640 fn round_trip() {
1641 setup_test_logger!();
1642 let front_port = crate::testing::provide_port();
1643 let backend_server = Arc::new(
1644 tiny_http::Server::http("127.0.0.1:0").expect("could not create tiny_http server"),
1645 );
1646 let backend_port = backend_server
1647 .server_addr()
1648 .to_ip()
1649 .expect("tiny_http server should bind to IP address")
1650 .port();
1651
1652 let barrier = Arc::new(Barrier::new(2));
1653
1654 let config = ListenerBuilder::new_http(SocketAddress::new_v4(127, 0, 0, 1, front_port))
1655 .to_http(None)
1656 .expect("could not create listener config");
1657
1658 let (mut command, channel) =
1659 Channel::generate(1000, 10000).expect("should create a channel");
1660
1661 thread::scope(|s| {
1662 let backend_handle = backend_server.clone();
1663 let barrier_clone = barrier.to_owned();
1664 s.spawn(move || {
1665 setup_test_logger!();
1666 start_server(&backend_handle, barrier_clone);
1667 });
1668 barrier.wait();
1669
1670 s.spawn(move || {
1671 setup_test_logger!();
1672 start_http_worker(config, channel, 10, 16384)
1673 .expect("could not start the http server");
1674 });
1675
1676 let front = RequestHttpFrontend {
1677 cluster_id: Some("cluster_1".to_owned()),
1678 address: SocketAddress::new_v4(127, 0, 0, 1, front_port),
1679 hostname: "localhost".to_owned(),
1680 path: PathRule::prefix("/".to_owned()),
1681 ..Default::default()
1682 };
1683 command
1684 .write_message(&WorkerRequest {
1685 id: "ID_ABCD".to_owned(),
1686 content: RequestType::AddHttpFrontend(front).into(),
1687 })
1688 .expect("could not send AddHttpFrontend");
1689 let backend = Backend {
1690 cluster_id: "cluster_1".to_owned(),
1691 backend_id: "cluster_1-0".to_owned(),
1692 address: SocketAddress::new_v4(127, 0, 0, 1, backend_port).into(),
1693 load_balancing_parameters: Some(LoadBalancingParams::default()),
1694 sticky_id: None,
1695 backup: None,
1696 };
1697 command
1698 .write_message(&WorkerRequest {
1699 id: "ID_EFGH".to_owned(),
1700 content: RequestType::AddBackend(backend.to_add_backend()).into(),
1701 })
1702 .expect("could not send AddBackend");
1703
1704 println!("test received: {:?}", command.read_message());
1705 println!("test received: {:?}", command.read_message());
1706
1707 let mut client =
1708 TcpStream::connect(("127.0.0.1", front_port)).expect("could not connect to sozu");
1709
1710 client
1711 .set_read_timeout(Some(Duration::new(1, 0)))
1712 .expect("could not set read timeout");
1713 let request = format!(
1714 "GET / HTTP/1.1\r\nHost: localhost:{front_port}\r\nConnection: Close\r\n\r\n"
1715 );
1716 let w = client.write(request.as_bytes());
1717 println!("http client write: {w:?}");
1718
1719 barrier.wait();
1720 let mut buffer = [0; 4096];
1721 let mut index = 0;
1722
1723 let expected_len = 191;
1726
1727 loop {
1728 assert!(index <= expected_len);
1729 if index == expected_len {
1730 break;
1731 }
1732
1733 let r = client.read(&mut buffer[index..]);
1734 println!("http client read: {r:?}");
1735 match r {
1736 Err(e) => panic!("client request should not fail. Error: {e:?}"),
1737 Ok(sz) => {
1738 index += sz;
1739 }
1740 }
1741 }
1742 println!(
1743 "Response: {}",
1744 str::from_utf8(&buffer[..index]).expect("could not make string from buffer")
1745 );
1746
1747 command
1749 .write_message(&WorkerRequest {
1750 id: "ID_STOP".to_owned(),
1751 content: RequestType::SoftStop(SoftStop {}).into(),
1752 })
1753 .expect("could not send SoftStop");
1754 backend_server.unblock();
1756 });
1757 }
1758
1759 #[test]
1760 fn keep_alive() {
1761 setup_test_logger!();
1762 let front_port = crate::testing::provide_port();
1763 let backend_server = Arc::new(
1764 tiny_http::Server::http("127.0.0.1:0").expect("could not create tiny_http server"),
1765 );
1766 let backend_port = backend_server
1767 .server_addr()
1768 .to_ip()
1769 .expect("tiny_http server should bind to IP address")
1770 .port();
1771
1772 let barrier = Arc::new(Barrier::new(2));
1773
1774 let config = ListenerBuilder::new_http(SocketAddress::new_v4(127, 0, 0, 1, front_port))
1775 .to_http(None)
1776 .expect("could not create listener config");
1777
1778 let (mut command, channel) =
1779 Channel::generate(1000, 10000).expect("should create a channel");
1780
1781 thread::scope(|s| {
1782 let backend_handle = backend_server.clone();
1783 let barrier_clone = barrier.to_owned();
1784 s.spawn(move || {
1785 setup_test_logger!();
1786 start_server(&backend_handle, barrier_clone);
1787 });
1788 barrier.wait();
1789
1790 s.spawn(move || {
1791 setup_test_logger!();
1792 start_http_worker(config, channel, 10, 16384)
1793 .expect("could not start the http server");
1794 });
1795
1796 let front = RequestHttpFrontend {
1797 address: SocketAddress::new_v4(127, 0, 0, 1, front_port),
1798 hostname: "localhost".to_owned(),
1799 path: PathRule::prefix("/".to_owned()),
1800 cluster_id: Some("cluster_1".to_owned()),
1801 ..Default::default()
1802 };
1803 command
1804 .write_message(&WorkerRequest {
1805 id: "ID_ABCD".to_owned(),
1806 content: RequestType::AddHttpFrontend(front).into(),
1807 })
1808 .expect("could not send AddHttpFrontend");
1809 let backend = Backend {
1810 address: SocketAddress::new_v4(127, 0, 0, 1, backend_port).into(),
1811 backend_id: "cluster_1-0".to_owned(),
1812 backup: None,
1813 cluster_id: "cluster_1".to_owned(),
1814 load_balancing_parameters: Some(LoadBalancingParams::default()),
1815 sticky_id: None,
1816 };
1817 command
1818 .write_message(&WorkerRequest {
1819 id: "ID_EFGH".to_owned(),
1820 content: RequestType::AddBackend(backend.to_add_backend()).into(),
1821 })
1822 .expect("could not send AddBackend");
1823
1824 println!("test received: {:?}", command.read_message());
1825 println!("test received: {:?}", command.read_message());
1826
1827 let mut client =
1828 TcpStream::connect(("127.0.0.1", front_port)).expect("could not connect to sozu");
1829 client
1830 .set_read_timeout(Some(Duration::new(5, 0)))
1831 .expect("could not set read timeout");
1832
1833 let expected_len = 191;
1836
1837 let request = format!("GET / HTTP/1.1\r\nHost: localhost:{front_port}\r\n\r\n");
1838 let w = client
1839 .write(request.as_bytes())
1840 .expect("could not write first request");
1841 println!("http client write: {w:?}");
1842 barrier.wait();
1843
1844 let mut buffer = [0; 4096];
1845 let mut index = 0;
1846
1847 loop {
1848 assert!(index <= expected_len);
1849 if index == expected_len {
1850 break;
1851 }
1852
1853 let r = client.read(&mut buffer[index..]);
1854 println!("http client read: {r:?}");
1855 match r {
1856 Err(e) => panic!("client request should not fail. Error: {e:?}"),
1857 Ok(sz) => {
1858 index += sz;
1859 }
1860 }
1861 }
1862
1863 println!(
1864 "Response: {}",
1865 str::from_utf8(&buffer[..index]).expect("could not make string from buffer")
1866 );
1867
1868 println!("first request ended, will send second one");
1869 let request2 = format!("GET / HTTP/1.1\r\nHost: localhost:{front_port}\r\n\r\n");
1870 let w2 = client.write(request2.as_bytes());
1871 println!("http client write: {w2:?}");
1872 barrier.wait();
1873
1874 let mut buffer2 = [0; 4096];
1875 let mut index = 0;
1876
1877 loop {
1878 assert!(index <= expected_len);
1879 if index == expected_len {
1880 break;
1881 }
1882
1883 let r2 = client.read(&mut buffer2[index..]);
1884 println!("http client read: {r2:?}");
1885 match r2 {
1886 Err(e) => panic!("client request should not fail. Error: {e:?}"),
1887 Ok(sz) => {
1888 index += sz;
1889 }
1890 }
1891 }
1892 println!(
1893 "Response: {}",
1894 str::from_utf8(&buffer2[..index]).expect("could not make string from buffer")
1895 );
1896
1897 command
1899 .write_message(&WorkerRequest {
1900 id: "ID_STOP".to_owned(),
1901 content: RequestType::SoftStop(SoftStop {}).into(),
1902 })
1903 .expect("could not send SoftStop");
1904 backend_server.unblock();
1906 });
1907 }
1908
1909 use self::tiny_http::Response;
1910
1911 fn start_server(server: &tiny_http::Server, barrier: Arc<Barrier>) {
1912 let addr = server.server_addr();
1913 info!("starting web server on {:?}", addr);
1914 barrier.wait();
1915
1916 for request in server.incoming_requests() {
1917 info!(
1918 "backend web server got request -> method: {:?}, url: {:?}, headers: {:?}",
1919 request.method(),
1920 request.url(),
1921 request.headers()
1922 );
1923
1924 let response = Response::from_string("hello world");
1925 request
1926 .respond(response)
1927 .expect("could not respond to request");
1928 info!("backend web server sent response");
1929 barrier.wait();
1930 info!("server session stopped");
1931 }
1932
1933 println!("server on {addr:?} closed");
1934 }
1935
1936 #[test]
1937 fn frontend_from_request_test() {
1938 let cluster_id1 = "cluster_1".to_owned();
1939 let cluster_id2 = "cluster_2".to_owned();
1940 let cluster_id3 = "cluster_3".to_owned();
1941 let uri1 = "/".to_owned();
1942 let uri2 = "/yolo".to_owned();
1943 let uri3 = "/yolo/swag".to_owned();
1944
1945 let mut fronts = Router::new();
1946 fronts
1947 .add_http_front(&HttpFrontend {
1948 address: "0.0.0.0:80".parse().unwrap(),
1949 hostname: "lolcatho.st".to_owned(),
1950 method: None,
1951 path: PathRule::prefix(uri1),
1952 position: RulePosition::Tree,
1953 cluster_id: Some(cluster_id1),
1954 tags: None,
1955 redirect: None,
1956 redirect_scheme: None,
1957 redirect_template: None,
1958 rewrite_host: None,
1959 rewrite_path: None,
1960 rewrite_port: None,
1961 required_auth: None,
1962 headers: Vec::new(),
1963 hsts: None,
1964 })
1965 .expect("Could not add http frontend");
1966 fronts
1967 .add_http_front(&HttpFrontend {
1968 address: "0.0.0.0:80".parse().unwrap(),
1969 hostname: "lolcatho.st".to_owned(),
1970 method: None,
1971 path: PathRule::prefix(uri2),
1972 position: RulePosition::Tree,
1973 cluster_id: Some(cluster_id2),
1974 tags: None,
1975 redirect: None,
1976 redirect_scheme: None,
1977 redirect_template: None,
1978 rewrite_host: None,
1979 rewrite_path: None,
1980 rewrite_port: None,
1981 required_auth: None,
1982 headers: Vec::new(),
1983 hsts: None,
1984 })
1985 .expect("Could not add http frontend");
1986 fronts
1987 .add_http_front(&HttpFrontend {
1988 address: "0.0.0.0:80".parse().unwrap(),
1989 hostname: "lolcatho.st".to_owned(),
1990 method: None,
1991 path: PathRule::prefix(uri3),
1992 position: RulePosition::Tree,
1993 cluster_id: Some(cluster_id3),
1994 tags: None,
1995 redirect: None,
1996 redirect_scheme: None,
1997 redirect_template: None,
1998 rewrite_host: None,
1999 rewrite_path: None,
2000 rewrite_port: None,
2001 required_auth: None,
2002 headers: Vec::new(),
2003 hsts: None,
2004 })
2005 .expect("Could not add http frontend");
2006 fronts
2007 .add_http_front(&HttpFrontend {
2008 address: "0.0.0.0:80".parse().unwrap(),
2009 hostname: "other.domain".to_owned(),
2010 method: None,
2011 path: PathRule::prefix("/test".to_owned()),
2012 position: RulePosition::Tree,
2013 cluster_id: Some("cluster_1".to_owned()),
2014 tags: None,
2015 redirect: None,
2016 redirect_scheme: None,
2017 redirect_template: None,
2018 rewrite_host: None,
2019 rewrite_path: None,
2020 rewrite_port: None,
2021 required_auth: None,
2022 headers: Vec::new(),
2023 hsts: None,
2024 })
2025 .expect("Could not add http frontend");
2026
2027 let address = SocketAddress::new_v4(127, 0, 0, 1, 1030);
2028
2029 let default_config = ListenerBuilder::new_http(address)
2030 .to_http(None)
2031 .expect("Could not create default HTTP listener config");
2032
2033 let listener = HttpListener {
2034 listener: None,
2035 address: address.into(),
2036 fronts,
2037 answers: Rc::new(RefCell::new(HttpAnswers::new(&BTreeMap::new()).unwrap())),
2038 config: default_config,
2039 token: Token(0),
2040 active: true,
2041 tags: BTreeMap::new(),
2042 };
2043
2044 let frontend1 = listener.frontend_from_request("lolcatho.st", "/", &Method::Get);
2045 let frontend2 = listener.frontend_from_request("lolcatho.st", "/test", &Method::Get);
2046 let frontend3 = listener.frontend_from_request("lolcatho.st", "/yolo/test", &Method::Get);
2047 let frontend4 = listener.frontend_from_request("lolcatho.st", "/yolo/swag", &Method::Get);
2048 let frontend5 = listener.frontend_from_request("domain", "/", &Method::Get);
2049 assert_eq!(
2050 frontend1
2051 .expect("should find frontend")
2052 .cluster_id
2053 .as_deref(),
2054 Some("cluster_1")
2055 );
2056 assert_eq!(
2057 frontend2
2058 .expect("should find frontend")
2059 .cluster_id
2060 .as_deref(),
2061 Some("cluster_1")
2062 );
2063 assert_eq!(
2064 frontend3
2065 .expect("should find frontend")
2066 .cluster_id
2067 .as_deref(),
2068 Some("cluster_2")
2069 );
2070 assert_eq!(
2071 frontend4
2072 .expect("should find frontend")
2073 .cluster_id
2074 .as_deref(),
2075 Some("cluster_3")
2076 );
2077 assert!(frontend5.is_err());
2078 }
2079
2080 #[test]
2081 fn h2_stream_idle_timeout_inherits_back_timeout() {
2082 let address = SocketAddress::new_v4(127, 0, 0, 1, 1040);
2083 let build = |back_timeout: u32, explicit: Option<u32>| -> HttpListener {
2084 let mut cfg = ListenerBuilder::new_http(address)
2085 .to_http(None)
2086 .expect("default HTTP listener config");
2087 cfg.back_timeout = back_timeout;
2088 cfg.h2_stream_idle_timeout_seconds = explicit;
2089 HttpListener::new(cfg, Token(0)).expect("build listener")
2090 };
2091
2092 assert_eq!(
2094 build(180, None).get_h2_stream_idle_timeout(),
2095 Duration::from_secs(180)
2096 );
2097
2098 assert_eq!(
2101 build(5, None).get_h2_stream_idle_timeout(),
2102 Duration::from_secs(30)
2103 );
2104
2105 assert_eq!(
2108 build(180, Some(10)).get_h2_stream_idle_timeout(),
2109 Duration::from_secs(10)
2110 );
2111 assert_eq!(
2112 build(5, Some(600)).get_h2_stream_idle_timeout(),
2113 Duration::from_secs(600)
2114 );
2115
2116 assert_eq!(
2118 build(180, Some(0)).get_h2_stream_idle_timeout(),
2119 Duration::from_secs(1)
2120 );
2121 }
2122}