1use std::{
2 cell::RefCell,
3 collections::{BTreeMap, HashMap, hash_map::Entry},
4 io::ErrorKind,
5 net::{Shutdown, SocketAddr},
6 os::unix::io::AsRawFd,
7 rc::{Rc, Weak},
8 str::from_utf8_unchecked,
9 time::{Duration, Instant},
10};
11
12use mio::{
13 Interest, Registry, Token,
14 net::{TcpListener as MioTcpListener, TcpStream},
15 unix::SourceFd,
16};
17use rusty_ulid::Ulid;
18use sozu_command::{
19 logging::CachedTags,
20 proto::command::{
21 Cluster, HttpListenerConfig, ListenerType, RemoveListener, RequestHttpFrontend,
22 UpdateHttpListenerConfig, WorkerRequest, WorkerResponse, request::RequestType,
23 },
24 ready::Ready,
25 response::HttpFrontend,
26 state::{ClusterId, validate_h2_flood_knobs_http, validate_sozu_id_header},
27};
28
29use crate::metrics::names;
30use crate::{
31 AcceptError, FrontendFromRequestError, L7ListenerHandler, L7Proxy, ListenerError,
32 ListenerHandler, Protocol, ProxyConfiguration, ProxyError, ProxySession, SessionIsToBeClosed,
33 SessionMetrics, SessionResult, StateMachineBuilder, StateResult,
34 backends::BackendMap,
35 pool::Pool,
36 protocol::{
37 Pipe, SessionState,
38 http::{
39 answers::HttpAnswers,
40 parser::{Method, hostname_and_port},
41 },
42 mux::{self, Mux, MuxClear},
43 proxy_protocol::expect::ExpectProxyProtocol,
44 },
45 router::{RouteResult, Router},
46 server::{ListenToken, SessionManager},
47 socket::server_bind,
48 timer::TimeoutContainer,
49};
50
51#[derive(PartialEq, Eq)]
52pub enum SessionStatus {
53 Normal,
54 DefaultAnswer,
55}
56
57StateMachineBuilder! {
58 enum HttpStateMachine impl SessionState {
64 Expect(ExpectProxyProtocol<TcpStream>),
65 Mux(MuxClear),
66 WebSocket(Pipe<crate::socket::SessionTcpStream, HttpListener>),
67 }
68}
69
70macro_rules! log_module_context {
76 () => {{
77 let (open, reset, _, _, _) = sozu_command::logging::ansi_palette();
78 format!("{open}HTTP{reset}\t >>>", open = open, reset = reset)
79 }};
80}
81
82macro_rules! log_context {
88 ($self:expr) => {{
89 let (open, reset, grey, gray, white) = sozu_command::logging::ansi_palette();
90 format!(
91 "{open}HTTP{reset}\t{grey}Session{reset}({gray}frontend{reset}={white}{frontend}{reset})\t >>>",
92 open = open,
93 reset = reset,
94 grey = grey,
95 gray = gray,
96 white = white,
97 frontend = $self.frontend_token.0,
98 )
99 }};
100}
101
102pub struct HttpSession {
106 configured_backend_timeout: Duration,
107 configured_connect_timeout: Duration,
108 configured_frontend_timeout: Duration,
109 frontend_token: Token,
110 last_event: Instant,
111 listener: Rc<RefCell<HttpListener>>,
112 metrics: SessionMetrics,
113 pool: Weak<RefCell<Pool>>,
114 proxy: Rc<RefCell<HttpProxy>>,
115 state: HttpStateMachine,
116 has_been_closed: bool,
117}
118
119impl HttpSession {
120 #[allow(clippy::too_many_arguments)]
121 pub fn new(
122 configured_backend_timeout: Duration,
123 configured_connect_timeout: Duration,
124 configured_frontend_timeout: Duration,
125 configured_request_timeout: Duration,
126 expect_proxy: bool,
127 listener: Rc<RefCell<HttpListener>>,
128 pool: Weak<RefCell<Pool>>,
129 proxy: Rc<RefCell<HttpProxy>>,
130 public_address: SocketAddr,
131 sock: TcpStream,
132 token: Token,
133 wait_time: Duration,
134 ) -> Result<Self, AcceptError> {
135 let request_id = Ulid::generate();
136 let container_frontend_timeout = TimeoutContainer::new(configured_request_timeout, token);
137
138 let state = if expect_proxy {
139 trace!("{} starting in expect proxy state", log_module_context!());
140 gauge_add!(names::protocol::PROXY_EXPECT, 1);
141
142 HttpStateMachine::Expect(ExpectProxyProtocol::new(
143 container_frontend_timeout,
144 sock,
145 token,
146 request_id,
147 ))
148 } else {
149 gauge_add!(names::protocol::HTTP, 1);
150 let session_address = sock.peer_addr().ok();
151 let session_ulid = rusty_ulid::Ulid::generate();
152 let sock = crate::socket::SessionTcpStream::new(sock, session_ulid, session_address);
153
154 let frontend =
155 mux::Connection::new_h1_server(session_ulid, sock, container_frontend_timeout);
156 let router = mux::Router::new(configured_backend_timeout, configured_connect_timeout);
157 let mut context = mux::Context::new(
158 session_ulid,
159 pool.clone(),
160 listener.clone(),
161 session_address,
162 public_address,
163 );
164 context
165 .create_stream(request_id, 1 << 16)
166 .ok_or(AcceptError::BufferCapacityReached)?;
167 HttpStateMachine::Mux(Mux {
168 configured_frontend_timeout,
169 frontend_token: token,
170 frontend,
171 router,
172 context,
173 session_ulid,
174 })
175 };
176
177 debug_assert_eq!(
184 state.marker() as u8,
185 if expect_proxy {
186 StateMarker::Expect as u8
187 } else {
188 StateMarker::Mux as u8
189 },
190 "constructed state must match the expect_proxy branch"
191 );
192 debug_assert!(
196 !state.failed(),
197 "a newly created session must not start in FailedUpgrade"
198 );
199
200 let metrics = SessionMetrics::new(Some(wait_time));
201 let session = HttpSession {
202 configured_backend_timeout,
203 configured_connect_timeout,
204 configured_frontend_timeout,
205 frontend_token: token,
206 has_been_closed: false,
207 last_event: Instant::now(),
208 listener,
209 metrics,
210 pool,
211 proxy,
212 state,
213 };
214 debug_assert_eq!(
215 session.frontend_token, token,
216 "frontend token must be the slab token used for registration"
217 );
218 #[cfg(debug_assertions)]
219 session.check_invariants();
220 Ok(session)
221 }
222
223 #[cfg(debug_assertions)]
236 fn check_invariants(&self) {
237 let marker = self.state.marker();
238 debug_assert!(
239 matches!(
240 marker,
241 StateMarker::Expect | StateMarker::Mux | StateMarker::WebSocket
242 ),
243 "session marker must be a legal H1 stage (Expect/Mux/WebSocket), got {marker:?}"
244 );
245 debug_assert!(
249 !self.state.failed() || self.has_been_closed,
250 "FailedUpgrade state must be reaped by close(), never left live"
251 );
252 }
253
254 pub fn upgrade(&mut self) -> SessionIsToBeClosed {
255 debug!("{} upgrade", log_context!(self));
256 let from_marker = self.state.marker();
262 let new_state = match self.state.take() {
263 HttpStateMachine::Mux(mux) => self.upgrade_mux(mux),
264 HttpStateMachine::Expect(expect) => self.upgrade_expect(expect),
265 HttpStateMachine::WebSocket(ws) => self.upgrade_websocket(ws),
266 HttpStateMachine::FailedUpgrade(_) => {
267 error!(
271 "{} upgrade called on FailedUpgrade state; closing session",
272 log_context!(self)
273 );
274 None
275 }
276 };
277
278 match new_state {
279 Some(state) => {
280 debug_assert!(
284 matches!(
285 (from_marker, state.marker()),
286 (StateMarker::Expect, StateMarker::Mux)
287 | (StateMarker::Mux, StateMarker::WebSocket)
288 | (StateMarker::WebSocket, StateMarker::WebSocket)
289 ),
290 "illegal protocol-upgrade transition {from_marker:?} -> {:?}",
291 state.marker()
292 );
293 debug_assert!(
294 !state.failed(),
295 "a successful upgrade must not install a FailedUpgrade state"
296 );
297 self.state = state;
298 #[cfg(debug_assertions)]
299 self.check_invariants();
300 false
301 }
302 None => {
304 debug_assert!(
308 self.state.failed(),
309 "a failed upgrade must leave the session in FailedUpgrade"
310 );
311 true
312 }
313 }
314 }
315
316 fn upgrade_expect(
317 &mut self,
318 expect: ExpectProxyProtocol<TcpStream>,
319 ) -> Option<HttpStateMachine> {
320 debug!("{} switching to HTTP", log_context!(self));
321 match expect
322 .addresses
323 .as_ref()
324 .map(|add| (add.destination(), add.source()))
325 {
326 Some((Some(public_address), Some(session_address))) => {
327 let session_ulid = rusty_ulid::Ulid::generate();
328 let frontend = mux::Connection::new_h1_server(
329 session_ulid,
330 crate::socket::SessionTcpStream::new(
331 expect.frontend,
332 session_ulid,
333 Some(session_address),
334 ),
335 expect.container_frontend_timeout,
336 );
337 let router = mux::Router::new(
338 self.configured_backend_timeout,
339 self.configured_connect_timeout,
340 );
341 let mut context = mux::Context::new(
342 session_ulid,
343 self.pool.clone(),
344 self.listener.clone(),
345 Some(session_address),
346 public_address,
347 );
348 if context.create_stream(expect.request_id, 1 << 16).is_none() {
349 error!(
350 "{} expect upgrade failed: could not create stream",
351 log_context!(self)
352 );
353 return None;
354 }
355 let mut mux = Mux {
356 configured_frontend_timeout: self.configured_frontend_timeout,
357 frontend_token: self.frontend_token,
358 frontend,
359 router,
360 context,
361 session_ulid,
362 };
363 mux.frontend.readiness_mut().event = expect.frontend_readiness.event;
364
365 debug_assert_eq!(
369 mux.frontend_token, self.frontend_token,
370 "expect upgrade must preserve the frontend token"
371 );
372 debug_assert_eq!(
375 mux.context.streams.len(),
376 1,
377 "a freshly upgraded Mux owns exactly the request stream"
378 );
379
380 gauge_add!(names::protocol::PROXY_EXPECT, -1);
384 gauge_add!(names::protocol::HTTP, 1);
385 Some(HttpStateMachine::Mux(mux))
386 }
387 _ => {
388 debug!(
389 "{} expect upgrade failed: bad header {:?}",
390 log_context!(self),
391 expect.addresses
392 );
393 None
394 }
395 }
396 }
397
398 fn upgrade_mux(&mut self, mut mux: MuxClear) -> Option<HttpStateMachine> {
399 debug!("{} mux switching to ws", log_context!(self));
400 let Some(stream) = mux.context.streams.pop() else {
401 error!(
402 "{} upgrade_mux: no stream attached to the mux session, closing",
403 log_context!(self)
404 );
405 return None;
406 };
407 let (frontend_readiness, frontend_socket, mut container_frontend_timeout) =
411 match mux.frontend {
412 mux::Connection::H1(mux::ConnectionH1 {
413 readiness,
414 socket,
415 timeout_container,
416 ..
417 }) => (readiness, socket, timeout_container),
418 mux::Connection::H2(_) => {
419 error!(
420 "{} only h1<->h1 connections can upgrade to websocket",
421 log_context!(self)
422 );
423 return None;
424 }
425 };
426
427 let mux::StreamState::Linked(back_token) = stream.state else {
428 error!(
429 "{} upgrading stream should be linked to a backend",
430 log_context!(self)
431 );
432 return None;
433 };
434 debug_assert!(
440 mux.router.backends.contains_key(&back_token),
441 "a Linked stream's back token must index a connected backend"
442 );
443 let backends_before = mux.router.backends.len();
444 let Some(backend) = mux.router.backends.remove(&back_token) else {
445 error!(
446 "{} upgrade_mux: backend for token {:?} is missing (already disconnected?), closing",
447 log_context!(self),
448 back_token
449 );
450 return None;
451 };
452 let (cluster_id, backend, backend_readiness, backend_socket, mut container_backend_timeout) =
453 match backend {
454 mux::Connection::H1(mux::ConnectionH1 {
455 position:
456 mux::Position::Client(cluster_id, backend, mux::BackendStatus::Connected),
457 readiness,
458 socket,
459 timeout_container,
460 ..
461 }) => (cluster_id, backend, readiness, socket, timeout_container),
462 mux::Connection::H1(_) => {
463 error!(
464 "{} the backend disconnected just after upgrade, abort",
465 log_context!(self)
466 );
467 return None;
468 }
469 mux::Connection::H2(_) => {
470 error!(
471 "{} only h1<->h1 connections can upgrade to websocket",
472 log_context!(self)
473 );
474 return None;
475 }
476 };
477
478 debug_assert!(
481 !mux.router.backends.contains_key(&back_token),
482 "the upgraded backend must be evicted from the router map"
483 );
484 debug_assert_eq!(
485 mux.router.backends.len(),
486 backends_before - 1,
487 "removing the backend must drop the backend count by exactly one"
488 );
489
490 let ws_context = stream.context.websocket_context();
491
492 container_frontend_timeout.reset();
493 container_backend_timeout.reset();
494
495 let backend_id = backend.borrow().backend_id.clone();
496 let backend_socket = backend_socket.stream;
501 let mut pipe = Pipe::new(
502 stream.back.storage.buffer,
503 Some(backend_id),
504 Some(backend_socket),
505 Some(backend),
506 Some(container_backend_timeout),
507 Some(container_frontend_timeout),
508 Some(cluster_id),
509 stream.front.storage.buffer,
510 self.frontend_token,
511 frontend_socket,
512 self.listener.clone(),
513 Protocol::HTTP,
514 stream.context.session_id,
515 stream.context.id,
516 stream.context.session_address,
517 ws_context,
518 );
519
520 pipe.frontend_readiness.event = frontend_readiness.event;
521 pipe.backend_readiness.event = backend_readiness.event;
522 pipe.set_back_token(back_token);
525 debug_assert_eq!(
526 pipe.back_token(),
527 vec![back_token],
528 "websocket pipe must carry exactly the upgraded backend token"
529 );
530
531 gauge_add!(names::protocol::HTTP, -1);
538 gauge_add!(names::protocol::WS, 1);
539 gauge_add!(names::websocket::ACTIVE_REQUESTS, 1);
540 Some(HttpStateMachine::WebSocket(pipe))
541 }
542
543 fn upgrade_websocket(
544 &self,
545 ws: Pipe<crate::socket::SessionTcpStream, HttpListener>,
546 ) -> Option<HttpStateMachine> {
547 error!(
549 "{} upgrade called on WS, this should not happen",
550 log_context!(self)
551 );
552 Some(HttpStateMachine::WebSocket(ws))
553 }
554}
555
556impl ProxySession for HttpSession {
557 fn close(&mut self) {
558 if self.has_been_closed {
559 return;
560 }
561 debug_assert!(
563 !self.has_been_closed,
564 "close past the guard must run on a not-yet-closed session"
565 );
566
567 trace!("{} closing HTTP session", log_context!(self));
568 self.metrics.service_stop();
569
570 match self.state.marker() {
572 StateMarker::Expect => gauge_add!(names::protocol::PROXY_EXPECT, -1),
573 StateMarker::Mux => gauge_add!(names::protocol::HTTP, -1),
574 StateMarker::WebSocket => {
575 gauge_add!(names::protocol::WS, -1);
576 gauge_add!(names::websocket::ACTIVE_REQUESTS, -1);
577 }
578 }
579
580 if self.state.failed() {
581 match self.state.marker() {
582 StateMarker::Expect => incr!(names::http::UPGRADE_EXPECT_FAILED),
583 StateMarker::Mux => incr!(names::http::UPGRADE_MUX_FAILED),
584 StateMarker::WebSocket => incr!(names::http::UPGRADE_WS_FAILED),
585 }
586 self.state.close(self.proxy.clone(), &mut self.metrics);
590 self.proxy.borrow().remove_session(self.frontend_token);
591 self.has_been_closed = true;
592 debug_assert!(
593 self.has_been_closed,
594 "failed-upgrade close path must mark the session closed"
595 );
596 return;
597 }
598
599 self.state.cancel_timeouts();
600 self.state.close(self.proxy.clone(), &mut self.metrics);
602
603 let front_socket = self.state.front_socket();
604 if let Err(e) = front_socket.shutdown(Shutdown::Write) {
609 if e.kind() != ErrorKind::NotConnected {
611 error!(
612 "{} error shutting down front socket({:?}): {:?}",
613 log_context!(self),
614 front_socket,
615 e
616 )
617 }
618 }
619
620 let proxy = self.proxy.borrow();
622 let fd = front_socket.as_raw_fd();
623 if let Err(e) = proxy.registry.deregister(&mut SourceFd(&fd)) {
624 error!(
625 "{} error deregistering front socket({:?}) while closing HTTP session: {:?}",
626 log_context!(self),
627 fd,
628 e
629 );
630 }
631 proxy.remove_session(self.frontend_token);
632
633 self.has_been_closed = true;
634 debug_assert!(
635 self.has_been_closed,
636 "close must leave the session marked closed (idempotency latch)"
637 );
638 }
639
640 fn timeout(&mut self, token: Token) -> SessionIsToBeClosed {
641 let state_result = self.state.timeout(token, &mut self.metrics);
642 state_result == StateResult::CloseSession
643 }
644
645 fn protocol(&self) -> Protocol {
646 Protocol::HTTP
647 }
648
649 fn update_readiness(&mut self, token: Token, events: Ready) {
650 trace!(
651 "{} token {:?} got event {}",
652 log_context!(self),
653 token,
654 super::ready_to_string(events)
655 );
656 self.last_event = Instant::now();
657 self.metrics.wait_start();
658 self.state.update_readiness(token, events);
659 }
660
661 fn ready(&mut self, session: Rc<RefCell<dyn ProxySession>>) -> SessionIsToBeClosed {
662 self.metrics.service_start();
663
664 let session_result =
665 self.state
666 .ready(session.clone(), self.proxy.clone(), &mut self.metrics);
667
668 let to_be_closed = match session_result {
669 SessionResult::Close => true,
670 SessionResult::Continue => false,
671 SessionResult::Upgrade => match self.upgrade() {
672 false => self.ready(session),
673 true => true,
674 },
675 };
676
677 self.metrics.service_stop();
678 #[cfg(debug_assertions)]
683 if !to_be_closed {
684 self.check_invariants();
685 }
686 to_be_closed
687 }
688
689 fn shutting_down(&mut self) -> SessionIsToBeClosed {
690 self.state.shutting_down()
691 }
692
693 fn last_event(&self) -> Instant {
694 self.last_event
695 }
696
697 fn print_session(&self) {
698 self.state.print_state("HTTP");
699 error!("{} Metrics: {:?}", log_context!(self), self.metrics);
700 }
701
702 fn frontend_token(&self) -> Token {
703 self.frontend_token
704 }
705}
706
707pub type Hostname = String;
708
709pub struct HttpListener {
731 active: bool,
732 address: SocketAddr,
733 answers: Rc<RefCell<HttpAnswers>>,
734 config: HttpListenerConfig,
735 fronts: Router,
736 listener: Option<MioTcpListener>,
737 tags: BTreeMap<String, CachedTags>,
738 token: Token,
739}
740
741impl ListenerHandler for HttpListener {
742 fn get_addr(&self) -> &SocketAddr {
743 &self.address
744 }
745
746 fn get_tags(&self, key: &str) -> Option<&CachedTags> {
747 self.tags.get(key)
748 }
749
750 fn set_tags(&mut self, key: String, tags: Option<BTreeMap<String, String>>) {
751 match tags {
752 Some(tags) => self.tags.insert(key, CachedTags::new(tags)),
753 None => self.tags.remove(&key),
754 };
755 }
756
757 fn protocol(&self) -> Protocol {
758 Protocol::HTTP
759 }
760
761 fn public_address(&self) -> SocketAddr {
762 self.config
763 .public_address
764 .map(|addr| addr.into())
765 .unwrap_or(self.address)
766 }
767}
768
769impl L7ListenerHandler for HttpListener {
770 fn get_sticky_name(&self) -> &str {
771 &self.config.sticky_name
772 }
773
774 fn get_sozu_id_header(&self) -> &str {
775 self.config
776 .sozu_id_header
777 .as_deref()
778 .filter(|s| !s.is_empty())
779 .unwrap_or("Sozu-Id")
780 }
781
782 fn get_connect_timeout(&self) -> u32 {
783 self.config.connect_timeout
784 }
785
786 fn frontend_from_request(
788 &self,
789 host: &str,
790 uri: &str,
791 method: &Method,
792 ) -> Result<RouteResult, FrontendFromRequestError> {
793 let start = Instant::now();
794 let (remaining_input, (hostname, _)) = match hostname_and_port(host.as_bytes()) {
795 Ok(tuple) => tuple,
796 Err(parse_error) => {
797 return Err(FrontendFromRequestError::HostParse {
799 host: host.to_owned(),
800 error: parse_error.to_string(),
801 });
802 }
803 };
804 if remaining_input != &b""[..] {
805 return Err(FrontendFromRequestError::InvalidCharsAfterHost(
806 host.to_owned(),
807 ));
808 }
809
810 let host = unsafe { from_utf8_unchecked(hostname) };
825
826 let route = self.fronts.lookup(host, uri, method).map_err(|e| {
827 incr!(names::http::FAILED_BACKEND_MATCHING);
828 FrontendFromRequestError::NoClusterFound(e)
829 })?;
830
831 let now = Instant::now();
832
833 if let Some(cluster) = route.cluster_id.as_deref() {
834 time!(
835 names::event_loop::FRONTEND_MATCHING_TIME,
836 cluster,
837 (now - start).as_millis()
838 );
839 }
840
841 Ok(route)
842 }
843
844 fn get_answers(&self) -> &Rc<RefCell<HttpAnswers>> {
845 &self.answers
846 }
847
848 fn get_h2_flood_config(&self) -> crate::protocol::mux::H2FloodConfig {
849 let defaults = crate::protocol::mux::H2FloodConfig::default();
850 crate::protocol::mux::H2FloodConfig {
851 max_rst_stream_per_window: self
852 .config
853 .h2_max_rst_stream_per_window
854 .unwrap_or(defaults.max_rst_stream_per_window),
855 max_ping_per_window: self
856 .config
857 .h2_max_ping_per_window
858 .unwrap_or(defaults.max_ping_per_window),
859 max_settings_per_window: self
860 .config
861 .h2_max_settings_per_window
862 .unwrap_or(defaults.max_settings_per_window),
863 max_empty_data_per_window: self
864 .config
865 .h2_max_empty_data_per_window
866 .unwrap_or(defaults.max_empty_data_per_window),
867 max_window_update_stream0_per_window: self
868 .config
869 .h2_max_window_update_stream0_per_window
870 .unwrap_or(defaults.max_window_update_stream0_per_window),
871 max_continuation_frames: self
872 .config
873 .h2_max_continuation_frames
874 .unwrap_or(defaults.max_continuation_frames),
875 max_glitch_count: self
876 .config
877 .h2_max_glitch_count
878 .unwrap_or(defaults.max_glitch_count),
879 max_rst_stream_lifetime: self
880 .config
881 .h2_max_rst_stream_lifetime
882 .unwrap_or(defaults.max_rst_stream_lifetime),
883 max_rst_stream_abusive_lifetime: self
884 .config
885 .h2_max_rst_stream_abusive_lifetime
886 .unwrap_or(defaults.max_rst_stream_abusive_lifetime),
887 max_rst_stream_emitted_lifetime: self
888 .config
889 .h2_max_rst_stream_emitted_lifetime
890 .unwrap_or(defaults.max_rst_stream_emitted_lifetime),
891 max_header_list_size: self
892 .config
893 .h2_max_header_list_size
894 .unwrap_or(defaults.max_header_list_size),
895 max_header_table_size: self
896 .config
897 .h2_max_header_table_size
898 .unwrap_or(defaults.max_header_table_size),
899 max_header_fields: self
900 .config
901 .h2_max_header_fields
902 .unwrap_or(defaults.max_header_fields),
903 }
904 }
905
906 fn get_h2_connection_config(&self) -> crate::protocol::mux::H2ConnectionConfig {
907 crate::protocol::mux::H2ConnectionConfig::from_optional(
908 self.config.h2_initial_connection_window,
909 self.config.h2_max_concurrent_streams,
910 self.config.h2_stream_shrink_ratio,
911 )
912 }
913
914 fn get_h2_stream_idle_timeout(&self) -> std::time::Duration {
915 let seconds = self
922 .config
923 .h2_stream_idle_timeout_seconds
924 .map(|s| u64::from(s.max(1)))
925 .unwrap_or_else(|| u64::from(self.config.back_timeout).max(30));
926 std::time::Duration::from_secs(seconds)
927 }
928
929 fn get_h2_graceful_shutdown_deadline(&self) -> Option<std::time::Duration> {
930 match self.config.h2_graceful_shutdown_deadline_seconds {
931 None => Some(std::time::Duration::from_secs(5)),
932 Some(0) => None,
933 Some(s) => Some(std::time::Duration::from_secs(u64::from(s))),
934 }
935 }
936
937 fn get_elide_x_real_ip(&self) -> bool {
938 self.config.elide_x_real_ip.unwrap_or(false)
939 }
940
941 fn get_send_x_real_ip(&self) -> bool {
942 self.config.send_x_real_ip.unwrap_or(false)
943 }
944}
945
946pub struct HttpProxy {
947 backends: Rc<RefCell<BackendMap>>,
948 clusters: HashMap<ClusterId, Cluster>,
949 listeners: HashMap<Token, Rc<RefCell<HttpListener>>>,
950 pool: Rc<RefCell<Pool>>,
951 registry: Registry,
952 sessions: Rc<RefCell<SessionManager>>,
953}
954
955impl HttpProxy {
956 pub fn new(
957 registry: Registry,
958 sessions: Rc<RefCell<SessionManager>>,
959 pool: Rc<RefCell<Pool>>,
960 backends: Rc<RefCell<BackendMap>>,
961 ) -> HttpProxy {
962 HttpProxy {
963 backends,
964 clusters: HashMap::new(),
965 listeners: HashMap::new(),
966 pool,
967 registry,
968 sessions,
969 }
970 }
971
972 pub fn add_listener(
973 &mut self,
974 config: HttpListenerConfig,
975 token: Token,
976 ) -> Result<Token, ProxyError> {
977 match self.listeners.entry(token) {
978 Entry::Vacant(entry) => {
979 let http_listener =
980 HttpListener::new(config, token).map_err(ProxyError::AddListener)?;
981 entry.insert(Rc::new(RefCell::new(http_listener)));
982 Ok(token)
983 }
984 _ => Err(ProxyError::ListenerAlreadyPresent),
985 }
986 }
987
988 pub fn get_listener(&self, token: &Token) -> Option<Rc<RefCell<HttpListener>>> {
989 self.listeners.get(token).cloned()
990 }
991
992 pub fn remove_listener(&mut self, remove: RemoveListener) -> Result<(), ProxyError> {
993 let len = self.listeners.len();
994 let remove_address = remove.address.into();
995 self.listeners
996 .retain(|_, l| l.borrow().address != remove_address);
997
998 if !self.listeners.len() < len {
999 info!(
1000 "{} no HTTP listener to remove at address {:?}",
1001 log_module_context!(),
1002 remove_address
1003 );
1004 }
1005 Ok(())
1006 }
1007
1008 pub fn activate_listener(
1009 &self,
1010 addr: &SocketAddr,
1011 tcp_listener: Option<MioTcpListener>,
1012 ) -> Result<Token, ProxyError> {
1013 let listener = self
1014 .listeners
1015 .values()
1016 .find(|listener| listener.borrow().address == *addr)
1017 .ok_or(ProxyError::NoListenerFound(addr.to_owned()))?;
1018
1019 listener
1020 .borrow_mut()
1021 .activate(&self.registry, tcp_listener)
1022 .map_err(|listener_error| ProxyError::ListenerActivation {
1023 address: *addr,
1024 listener_error,
1025 })
1026 }
1027
1028 pub fn give_back_listeners(&mut self) -> Vec<(SocketAddr, MioTcpListener)> {
1029 self.listeners
1030 .iter()
1031 .filter_map(|(_, listener)| {
1032 let mut owned = listener.borrow_mut();
1033 if let Some(listener) = owned.listener.take() {
1034 owned.active = false;
1037 return Some((owned.address, listener));
1038 }
1039
1040 None
1041 })
1042 .collect()
1043 }
1044
1045 pub fn give_back_listener(
1046 &mut self,
1047 address: SocketAddr,
1048 ) -> Result<(Token, MioTcpListener), ProxyError> {
1049 let listener = self
1050 .listeners
1051 .values()
1052 .find(|listener| listener.borrow().address == address)
1053 .ok_or(ProxyError::NoListenerFound(address))?;
1054
1055 let mut owned = listener.borrow_mut();
1056
1057 let taken_listener = owned
1058 .listener
1059 .take()
1060 .ok_or(ProxyError::UnactivatedListener)?;
1061
1062 owned.active = false;
1065
1066 Ok((owned.token, taken_listener))
1067 }
1068
1069 pub fn update_listener(&mut self, patch: UpdateHttpListenerConfig) -> Result<(), ProxyError> {
1071 let address: std::net::SocketAddr = patch.address.into();
1072 let listener = self
1073 .listeners
1074 .values()
1075 .find(|l| l.borrow().address == address)
1076 .ok_or(ProxyError::NoListenerFound(address))?;
1077 listener
1078 .borrow_mut()
1079 .update_config(&patch)
1080 .map_err(|listener_error| ProxyError::ListenerActivation {
1081 address,
1082 listener_error,
1083 })
1084 }
1085
1086 pub fn add_cluster(&mut self, mut cluster: Cluster) -> Result<(), ProxyError> {
1087 let mut overrides = cluster.answers.clone();
1090 if let Some(answer_503) = cluster.answer_503.take() {
1091 overrides.entry("503".to_owned()).or_insert(answer_503);
1092 }
1093 if !overrides.is_empty() {
1094 for listener in self.listeners.values() {
1095 listener
1096 .borrow()
1097 .answers
1098 .borrow_mut()
1099 .add_cluster_answers(&cluster.cluster_id, &overrides)
1100 .map_err(|(name, error)| {
1101 ProxyError::AddCluster(ListenerError::TemplateParse(name, error))
1102 })?;
1103 }
1104 }
1105 self.clusters.insert(cluster.cluster_id.clone(), cluster);
1106 Ok(())
1107 }
1108
1109 pub fn remove_cluster(&mut self, cluster_id: &str) -> Result<(), ProxyError> {
1110 self.clusters.remove(cluster_id);
1111
1112 for listener in self.listeners.values() {
1113 listener
1114 .borrow()
1115 .answers
1116 .borrow_mut()
1117 .remove_cluster_answers(cluster_id);
1118 }
1119 Ok(())
1120 }
1121
1122 pub fn add_http_frontend(&mut self, front: RequestHttpFrontend) -> Result<(), ProxyError> {
1123 if front.hsts.is_some() {
1138 incr!(names::http::HSTS_SUPPRESSED_PLAINTEXT);
1139 return Err(ProxyError::HstsOnPlainHttp(front.address.into()));
1140 }
1141
1142 let front = front.clone().to_frontend().map_err(|request_error| {
1143 ProxyError::WrongInputFrontend {
1144 front: Box::new(front),
1145 error: request_error.to_string(),
1146 }
1147 })?;
1148
1149 let mut listener = self
1150 .listeners
1151 .values()
1152 .find(|l| l.borrow().address == front.address)
1153 .ok_or(ProxyError::NoListenerFound(front.address))?
1154 .borrow_mut();
1155
1156 let hostname = front.hostname.to_owned();
1157 let tags = front.tags.to_owned();
1158
1159 listener
1160 .add_http_front(front)
1161 .map_err(ProxyError::AddFrontend)?;
1162 listener.set_tags(hostname, tags);
1163 Ok(())
1164 }
1165
1166 pub fn remove_http_frontend(&mut self, front: RequestHttpFrontend) -> Result<(), ProxyError> {
1167 let front = front.clone().to_frontend().map_err(|request_error| {
1168 ProxyError::WrongInputFrontend {
1169 front: Box::new(front),
1170 error: request_error.to_string(),
1171 }
1172 })?;
1173
1174 let mut listener = self
1175 .listeners
1176 .values()
1177 .find(|l| l.borrow().address == front.address)
1178 .ok_or(ProxyError::NoListenerFound(front.address))?
1179 .borrow_mut();
1180
1181 let hostname = front.hostname.to_owned();
1182
1183 listener
1184 .remove_http_front(front)
1185 .map_err(ProxyError::RemoveFrontend)?;
1186
1187 if !listener.fronts.has_hostname(&hostname) {
1188 listener.set_tags(hostname, None);
1189 }
1190 Ok(())
1191 }
1192
1193 pub fn soft_stop(&mut self) -> Result<(), ProxyError> {
1194 let listeners: HashMap<_, _> = self.listeners.drain().collect();
1195 let mut socket_errors = vec![];
1196 for (_, l) in listeners.iter() {
1197 if let Some(mut sock) = l.borrow_mut().listener.take() {
1198 debug!("{} deregistering socket {:?}", log_module_context!(), sock);
1199 if let Err(e) = self.registry.deregister(&mut sock) {
1200 let error = format!("socket {sock:?}: {e:?}");
1201 socket_errors.push(error);
1202 }
1203 }
1204 }
1205
1206 if !socket_errors.is_empty() {
1207 return Err(ProxyError::SoftStop {
1208 proxy_protocol: "HTTP".to_string(),
1209 error: format!("Error deregistering listen sockets: {socket_errors:?}"),
1210 });
1211 }
1212
1213 Ok(())
1214 }
1215
1216 pub fn hard_stop(&mut self) -> Result<(), ProxyError> {
1217 let mut listeners: HashMap<_, _> = self.listeners.drain().collect();
1218 let mut socket_errors = vec![];
1219 for (_, l) in listeners.drain() {
1220 if let Some(mut sock) = l.borrow_mut().listener.take() {
1221 debug!("{} deregistering socket {:?}", log_module_context!(), sock);
1222 if let Err(e) = self.registry.deregister(&mut sock) {
1223 let error = format!("socket {sock:?}: {e:?}");
1224 socket_errors.push(error);
1225 }
1226 }
1227 }
1228
1229 if !socket_errors.is_empty() {
1230 return Err(ProxyError::HardStop {
1231 proxy_protocol: "HTTP".to_string(),
1232 error: format!("Error deregistering listen sockets: {socket_errors:?}"),
1233 });
1234 }
1235
1236 Ok(())
1237 }
1238}
1239
1240impl HttpListener {
1241 pub fn new(config: HttpListenerConfig, token: Token) -> Result<HttpListener, ListenerError> {
1242 Ok(HttpListener {
1243 active: false,
1244 address: config.address.into(),
1245 answers: Rc::new(RefCell::new({
1246 let mut answers_map = config.answers.clone();
1251 if let Some(ref legacy) = config.http_answers {
1252 crate::protocol::http::answers::merge_legacy_into_map(&mut answers_map, legacy);
1253 }
1254 HttpAnswers::new(&answers_map)
1255 .map_err(|(name, error)| ListenerError::TemplateParse(name, error))?
1256 })),
1257 config,
1258 fronts: Router::new(),
1259 listener: None,
1260 tags: BTreeMap::new(),
1261 token,
1262 })
1263 }
1264
1265 pub fn activate(
1266 &mut self,
1267 registry: &Registry,
1268 tcp_listener: Option<MioTcpListener>,
1269 ) -> Result<Token, ListenerError> {
1270 if self.active {
1271 return Ok(self.token);
1272 }
1273 let address: SocketAddr = self.config.address.into();
1274
1275 let mut listener = match tcp_listener {
1276 Some(tcp_listener) => tcp_listener,
1277 None => {
1278 server_bind(address).map_err(|server_bind_error| ListenerError::Activation {
1279 address,
1280 error: server_bind_error.to_string(),
1281 })?
1282 }
1283 };
1284
1285 registry
1286 .register(&mut listener, self.token, Interest::READABLE)
1287 .map_err(ListenerError::SocketRegistration)?;
1288
1289 self.listener = Some(listener);
1290 self.active = true;
1291 Ok(self.token)
1292 }
1293
1294 pub fn update_config(&mut self, patch: &UpdateHttpListenerConfig) -> Result<(), ListenerError> {
1300 validate_h2_flood_knobs_http(patch)?;
1305 if let Some(ref hdr) = patch.sozu_id_header {
1306 validate_sozu_id_header(hdr)?;
1307 }
1308
1309 if let Some(v) = patch.public_address {
1310 self.config.public_address = Some(v);
1311 }
1312 if let Some(v) = patch.expect_proxy {
1313 self.config.expect_proxy = v;
1314 }
1315 if let Some(ref v) = patch.sticky_name {
1316 self.config.sticky_name = v.to_owned();
1317 }
1318 if let Some(v) = patch.front_timeout {
1319 self.config.front_timeout = v;
1320 }
1321 if let Some(v) = patch.back_timeout {
1322 self.config.back_timeout = v;
1323 }
1324 if let Some(v) = patch.connect_timeout {
1325 self.config.connect_timeout = v;
1326 }
1327 if let Some(v) = patch.request_timeout {
1328 self.config.request_timeout = v;
1329 }
1330 if let Some(ref v) = patch.sozu_id_header {
1331 self.config.sozu_id_header = Some(v.to_owned());
1332 }
1333 if let Some(v) = patch.elide_x_real_ip {
1334 self.config.elide_x_real_ip = Some(v);
1335 }
1336 if let Some(v) = patch.send_x_real_ip {
1337 self.config.send_x_real_ip = Some(v);
1338 }
1339
1340 if let Some(v) = patch.h2_max_rst_stream_per_window {
1342 self.config.h2_max_rst_stream_per_window = Some(v);
1343 }
1344 if let Some(v) = patch.h2_max_ping_per_window {
1345 self.config.h2_max_ping_per_window = Some(v);
1346 }
1347 if let Some(v) = patch.h2_max_settings_per_window {
1348 self.config.h2_max_settings_per_window = Some(v);
1349 }
1350 if let Some(v) = patch.h2_max_empty_data_per_window {
1351 self.config.h2_max_empty_data_per_window = Some(v);
1352 }
1353 if let Some(v) = patch.h2_max_continuation_frames {
1354 self.config.h2_max_continuation_frames = Some(v);
1355 }
1356 if let Some(v) = patch.h2_max_glitch_count {
1357 self.config.h2_max_glitch_count = Some(v);
1358 }
1359 if let Some(v) = patch.h2_initial_connection_window {
1360 self.config.h2_initial_connection_window = Some(v);
1361 }
1362 if let Some(v) = patch.h2_max_concurrent_streams {
1363 self.config.h2_max_concurrent_streams = Some(v);
1364 }
1365 if let Some(v) = patch.h2_stream_shrink_ratio {
1366 self.config.h2_stream_shrink_ratio = Some(v);
1367 }
1368 if let Some(v) = patch.h2_max_rst_stream_lifetime {
1369 self.config.h2_max_rst_stream_lifetime = Some(v);
1370 }
1371 if let Some(v) = patch.h2_max_rst_stream_abusive_lifetime {
1372 self.config.h2_max_rst_stream_abusive_lifetime = Some(v);
1373 }
1374 if let Some(v) = patch.h2_max_rst_stream_emitted_lifetime {
1375 self.config.h2_max_rst_stream_emitted_lifetime = Some(v);
1376 }
1377 if let Some(v) = patch.h2_max_header_list_size {
1378 self.config.h2_max_header_list_size = Some(v);
1379 }
1380 if let Some(v) = patch.h2_max_header_table_size {
1381 self.config.h2_max_header_table_size = Some(v);
1382 }
1383 if let Some(v) = patch.h2_max_header_fields {
1384 self.config.h2_max_header_fields = Some(v);
1385 }
1386 if let Some(v) = patch.h2_stream_idle_timeout_seconds {
1387 self.config.h2_stream_idle_timeout_seconds = Some(v);
1388 }
1389 if let Some(v) = patch.h2_graceful_shutdown_deadline_seconds {
1390 self.config.h2_graceful_shutdown_deadline_seconds = Some(v);
1391 }
1392 if let Some(v) = patch.h2_max_window_update_stream0_per_window {
1393 self.config.h2_max_window_update_stream0_per_window = Some(v);
1394 }
1395
1396 let answers_changed = patch.http_answers.is_some() || !patch.answers.is_empty();
1401 if answers_changed {
1402 if let Some(ref new_answers) = patch.http_answers {
1403 crate::sozu_command::state::merge_custom_http_answers(
1404 &mut self.config.http_answers,
1405 new_answers,
1406 );
1407 }
1408 for (code, body) in &patch.answers {
1409 if !body.is_empty() {
1410 self.config.answers.insert(code.clone(), body.clone());
1411 }
1412 }
1413
1414 let mut answers_map = self.config.answers.clone();
1415 if let Some(ref legacy) = self.config.http_answers {
1416 crate::protocol::http::answers::merge_legacy_into_map(&mut answers_map, legacy);
1417 }
1418 let mut new_answers = HttpAnswers::new(&answers_map)
1421 .map_err(|(name, error)| ListenerError::TemplateParse(name, error))?;
1422 let preserved = std::mem::take(&mut self.answers.borrow_mut().cluster_answers);
1423 new_answers.cluster_answers = preserved;
1424 *self.answers.borrow_mut() = new_answers;
1425 }
1426
1427 Ok(())
1428 }
1429
1430 pub fn add_http_front(&mut self, http_front: HttpFrontend) -> Result<(), ListenerError> {
1431 self.fronts
1432 .add_http_front(&http_front)
1433 .map_err(ListenerError::AddFrontend)
1434 }
1435
1436 pub fn remove_http_front(&mut self, http_front: HttpFrontend) -> Result<(), ListenerError> {
1437 debug!(
1438 "{} removing http_front {:?}",
1439 log_module_context!(),
1440 http_front
1441 );
1442 self.fronts
1443 .remove_http_front(&http_front)
1444 .map_err(ListenerError::RemoveFrontend)
1445 }
1446
1447 fn accept(&mut self) -> Result<TcpStream, AcceptError> {
1448 if let Some(ref sock) = self.listener {
1449 sock.accept()
1450 .map_err(|e| match e.kind() {
1451 ErrorKind::WouldBlock => AcceptError::WouldBlock,
1452 _ => {
1453 error!("{} accept() IO error: {:?}", log_module_context!(), e);
1454 AcceptError::IoError
1455 }
1456 })
1457 .map(|(sock, _)| sock)
1458 } else {
1459 error!(
1460 "{} cannot accept connections, no listening socket available",
1461 log_module_context!()
1462 );
1463 Err(AcceptError::IoError)
1464 }
1465 }
1466}
1467
1468impl ProxyConfiguration for HttpProxy {
1469 fn notify(&mut self, request: WorkerRequest) -> WorkerResponse {
1470 let request_id = request.id.clone();
1471
1472 let result = match request.content.request_type {
1473 Some(RequestType::AddCluster(cluster)) => {
1474 debug!(
1475 "{} {} add cluster {:?}",
1476 log_module_context!(),
1477 request.id,
1478 cluster
1479 );
1480 self.add_cluster(cluster)
1481 }
1482 Some(RequestType::RemoveCluster(cluster_id)) => {
1483 debug!(
1484 "{} {} remove cluster {:?}",
1485 log_module_context!(),
1486 request_id,
1487 cluster_id
1488 );
1489 self.remove_cluster(&cluster_id)
1490 }
1491 Some(RequestType::AddHttpFrontend(front)) => {
1492 debug!(
1493 "{} {} add front {:?}",
1494 log_module_context!(),
1495 request_id,
1496 front
1497 );
1498 self.add_http_frontend(front)
1499 }
1500 Some(RequestType::RemoveHttpFrontend(front)) => {
1501 debug!(
1502 "{} {} remove front {:?}",
1503 log_module_context!(),
1504 request_id,
1505 front
1506 );
1507 self.remove_http_frontend(front)
1508 }
1509 Some(RequestType::RemoveListener(remove)) => {
1510 debug!(
1511 "{} removing HTTP listener at address {:?}",
1512 log_module_context!(),
1513 remove.address
1514 );
1515 self.remove_listener(remove)
1516 }
1517 Some(RequestType::SoftStop(_)) => {
1518 debug!(
1519 "{} {} processing soft shutdown",
1520 log_module_context!(),
1521 request_id
1522 );
1523 match self.soft_stop() {
1524 Ok(()) => {
1525 info!(
1526 "{} {} soft stop successful",
1527 log_module_context!(),
1528 request_id
1529 );
1530 return WorkerResponse::processing(request.id);
1531 }
1532 Err(e) => Err(e),
1533 }
1534 }
1535 Some(RequestType::HardStop(_)) => {
1536 debug!(
1537 "{} {} processing hard shutdown",
1538 log_module_context!(),
1539 request_id
1540 );
1541 match self.hard_stop() {
1542 Ok(()) => {
1543 info!(
1544 "{} {} hard stop successful",
1545 log_module_context!(),
1546 request_id
1547 );
1548 return WorkerResponse::processing(request.id);
1549 }
1550 Err(e) => Err(e),
1551 }
1552 }
1553 Some(RequestType::Status(_)) => {
1554 debug!("{} {} status", log_module_context!(), request_id);
1555 Ok(())
1556 }
1557 other_command => {
1558 debug!(
1559 "{} {} unsupported message for HTTP proxy, ignoring: {:?}",
1560 log_module_context!(),
1561 request.id,
1562 other_command
1563 );
1564 Err(ProxyError::UnsupportedMessage)
1565 }
1566 };
1567
1568 match result {
1569 Ok(()) => {
1570 debug!("{} {} successful", log_module_context!(), request_id);
1571 WorkerResponse::ok(request_id)
1572 }
1573 Err(proxy_error) => {
1574 debug!(
1575 "{} {} unsuccessful: {}",
1576 log_module_context!(),
1577 request_id,
1578 proxy_error
1579 );
1580 WorkerResponse::error(request_id, proxy_error)
1581 }
1582 }
1583 }
1584
1585 fn accept(&mut self, token: ListenToken) -> Result<TcpStream, AcceptError> {
1586 if let Some(listener) = self.listeners.get(&Token(token.0)) {
1587 listener.borrow_mut().accept()
1588 } else {
1589 Err(AcceptError::IoError)
1590 }
1591 }
1592
1593 fn create_session(
1594 &mut self,
1595 mut frontend_sock: TcpStream,
1596 listener_token: ListenToken,
1597 wait_time: Duration,
1598 proxy: Rc<RefCell<Self>>,
1599 ) -> Result<(), AcceptError> {
1600 let listener = self
1601 .listeners
1602 .get(&Token(listener_token.0))
1603 .cloned()
1604 .ok_or(AcceptError::IoError)?;
1605
1606 if let Err(e) = frontend_sock.set_nodelay(true) {
1607 error!(
1608 "{} error setting nodelay on front socket({:?}): {:?}",
1609 log_module_context!(),
1610 frontend_sock,
1611 e
1612 );
1613 }
1614 let mut session_manager = self.sessions.borrow_mut();
1615 let slab_len_before = session_manager.slab.len();
1616 let session_entry = session_manager.slab.vacant_entry();
1617 let session_token = Token(session_entry.key());
1618 debug_assert_eq!(
1623 session_token.0,
1624 session_entry.key(),
1625 "session token must equal the slab vacant-entry key"
1626 );
1627 let owned = listener.borrow();
1628
1629 if let Err(register_error) = self.registry.register(
1630 &mut frontend_sock,
1631 session_token,
1632 Interest::READABLE | Interest::WRITABLE,
1633 ) {
1634 error!(
1635 "{} error registering listen socket({:?}): {:?}",
1636 log_module_context!(),
1637 frontend_sock,
1638 register_error
1639 );
1640 return Err(AcceptError::RegisterError);
1641 }
1642
1643 let public_address: SocketAddr = match owned.config.public_address {
1644 Some(pub_addr) => pub_addr.into(),
1645 None => owned.config.address.into(),
1646 };
1647
1648 let session = HttpSession::new(
1649 Duration::from_secs(owned.config.back_timeout as u64),
1650 Duration::from_secs(owned.config.connect_timeout as u64),
1651 Duration::from_secs(owned.config.front_timeout as u64),
1652 Duration::from_secs(owned.config.request_timeout as u64),
1653 owned.config.expect_proxy,
1654 listener.clone(),
1655 Rc::downgrade(&self.pool),
1656 proxy,
1657 public_address,
1658 frontend_sock,
1659 session_token,
1660 wait_time,
1661 )?;
1662
1663 debug_assert_eq!(
1667 session.frontend_token, session_token,
1668 "session must own the frontend token it was created with"
1669 );
1670
1671 let session = Rc::new(RefCell::new(session));
1672 session_entry.insert(session);
1673 debug_assert_eq!(
1677 session_manager.slab.len(),
1678 slab_len_before + 1,
1679 "creating a session must occupy exactly one new slab slot"
1680 );
1681
1682 Ok(())
1683 }
1684}
1685
1686impl L7Proxy for HttpProxy {
1687 fn kind(&self) -> ListenerType {
1688 ListenerType::Http
1689 }
1690
1691 fn register_socket(
1692 &self,
1693 source: &mut TcpStream,
1694 token: Token,
1695 interest: Interest,
1696 ) -> Result<(), std::io::Error> {
1697 self.registry.register(source, token, interest)
1698 }
1699
1700 fn deregister_socket(&self, tcp_stream: &mut TcpStream) -> Result<(), std::io::Error> {
1701 self.registry.deregister(tcp_stream)
1702 }
1703
1704 fn add_session(&self, session: Rc<RefCell<dyn ProxySession>>) -> Token {
1705 let mut session_manager = self.sessions.borrow_mut();
1706 let len_before = session_manager.slab.len();
1707 let entry = session_manager.slab.vacant_entry();
1708 let token = Token(entry.key());
1709 let _entry = entry.insert(session);
1710 debug_assert_eq!(
1713 session_manager.slab.len(),
1714 len_before + 1,
1715 "add_session must occupy exactly one new slab slot"
1716 );
1717 debug_assert!(
1718 session_manager.slab.contains(token.0),
1719 "the returned token must index the freshly inserted session"
1720 );
1721 token
1722 }
1723
1724 fn remove_session(&self, token: Token) -> bool {
1725 let mut sessions = self.sessions.borrow_mut();
1726 let was_present = sessions.slab.contains(token.0);
1727 let len_before = sessions.slab.len();
1728 sessions.untrack_all_cluster_ip(token);
1734 let removed = sessions.slab.try_remove(token.0).is_some();
1735 debug_assert_eq!(
1739 removed, was_present,
1740 "try_remove reports presence iff the slot was occupied"
1741 );
1742 debug_assert_eq!(
1743 sessions.slab.len(),
1744 len_before - removed as usize,
1745 "slab len drops by exactly one iff a session was removed"
1746 );
1747 debug_assert!(
1748 !sessions.slab.contains(token.0),
1749 "the slot must be free after remove_session"
1750 );
1751 removed
1752 }
1753
1754 fn backends(&self) -> Rc<RefCell<BackendMap>> {
1755 self.backends.clone()
1756 }
1757
1758 fn clusters(&self) -> &HashMap<ClusterId, Cluster> {
1759 &self.clusters
1760 }
1761
1762 fn sessions(&self) -> Rc<RefCell<SessionManager>> {
1763 self.sessions.clone()
1764 }
1765}
1766
1767pub mod testing {
1768 use crate::testing::*;
1769
1770 pub fn start_http_worker(
1772 config: HttpListenerConfig,
1773 channel: ProxyChannel,
1774 max_buffers: usize,
1775 buffer_size: usize,
1776 ) -> anyhow::Result<()> {
1777 let address = config.address.into();
1778
1779 let ServerParts {
1780 event_loop,
1781 registry,
1782 sessions,
1783 pool,
1784 backends,
1785 client_scm_socket: _,
1786 server_scm_socket,
1787 server_config,
1788 } = prebuild_server(max_buffers, buffer_size, true)?;
1789
1790 let token = {
1791 let mut sessions = sessions.borrow_mut();
1792 let entry = sessions.slab.vacant_entry();
1793 let key = entry.key();
1794 let _ = entry.insert(Rc::new(RefCell::new(ListenSession {
1795 protocol: Protocol::HTTPListen,
1796 })));
1797 Token(key)
1798 };
1799
1800 let mut proxy = HttpProxy::new(registry, sessions.clone(), pool.clone(), backends.clone());
1801 proxy
1802 .add_listener(config, token)
1803 .with_context(|| "Failed at creating adding the listener")?;
1804 proxy
1805 .activate_listener(&address, None)
1806 .with_context(|| "Failed at creating activating the listener")?;
1807
1808 let mut server = Server::new(
1809 event_loop,
1810 channel,
1811 server_scm_socket,
1812 sessions,
1813 pool,
1814 backends,
1815 Some(proxy),
1816 None,
1817 None,
1818 server_config,
1819 None,
1820 false,
1821 )
1822 .with_context(|| "Failed at creating server")?;
1823
1824 debug!("{} starting event loop", log_module_context!());
1825 server.run();
1826 debug!("{} ending event loop", log_module_context!());
1827 Ok(())
1828 }
1829}
1830
1831#[cfg(test)]
1832mod tests {
1833 extern crate tiny_http;
1834
1835 use std::{
1836 io::{Read, Write},
1837 net::TcpStream,
1838 str,
1839 sync::{Arc, Barrier},
1840 thread,
1841 time::Duration,
1842 };
1843
1844 use sozu_command::proto::command::SocketAddress;
1845
1846 use super::{testing::start_http_worker, *};
1847 use crate::sozu_command::{
1848 channel::Channel,
1849 config::ListenerBuilder,
1850 proto::command::{
1851 LoadBalancingParams, PathRule, RulePosition, SoftStop, WorkerRequest,
1852 request::RequestType,
1853 },
1854 response::{Backend, HttpFrontend},
1855 };
1856
1857 #[test]
1871 fn round_trip() {
1872 setup_test_logger!();
1873 let front_port = crate::testing::provide_port();
1874 let backend_server = Arc::new(
1875 tiny_http::Server::http("127.0.0.1:0").expect("could not create tiny_http server"),
1876 );
1877 let backend_port = backend_server
1878 .server_addr()
1879 .to_ip()
1880 .expect("tiny_http server should bind to IP address")
1881 .port();
1882
1883 let barrier = Arc::new(Barrier::new(2));
1884
1885 let config = ListenerBuilder::new_http(SocketAddress::new_v4(127, 0, 0, 1, front_port))
1886 .to_http(None)
1887 .expect("could not create listener config");
1888
1889 let (mut command, channel) =
1890 Channel::generate(1000, 10000).expect("should create a channel");
1891
1892 thread::scope(|s| {
1893 let backend_handle = backend_server.clone();
1894 let barrier_clone = barrier.to_owned();
1895 s.spawn(move || {
1896 setup_test_logger!();
1897 start_server(&backend_handle, barrier_clone);
1898 });
1899 barrier.wait();
1900
1901 s.spawn(move || {
1902 setup_test_logger!();
1903 start_http_worker(config, channel, 10, 16384)
1904 .expect("could not start the http server");
1905 });
1906
1907 let front = RequestHttpFrontend {
1908 cluster_id: Some("cluster_1".to_owned()),
1909 address: SocketAddress::new_v4(127, 0, 0, 1, front_port),
1910 hostname: "localhost".to_owned(),
1911 path: PathRule::prefix("/".to_owned()),
1912 ..Default::default()
1913 };
1914 command
1915 .write_message(&WorkerRequest {
1916 id: "ID_ABCD".to_owned(),
1917 content: RequestType::AddHttpFrontend(front).into(),
1918 })
1919 .expect("could not send AddHttpFrontend");
1920 let backend = Backend {
1921 cluster_id: "cluster_1".to_owned(),
1922 backend_id: "cluster_1-0".to_owned(),
1923 address: SocketAddress::new_v4(127, 0, 0, 1, backend_port).into(),
1924 load_balancing_parameters: Some(LoadBalancingParams::default()),
1925 sticky_id: None,
1926 backup: None,
1927 };
1928 command
1929 .write_message(&WorkerRequest {
1930 id: "ID_EFGH".to_owned(),
1931 content: RequestType::AddBackend(backend.to_add_backend()).into(),
1932 })
1933 .expect("could not send AddBackend");
1934
1935 println!("test received: {:?}", command.read_message());
1936 println!("test received: {:?}", command.read_message());
1937
1938 let mut client =
1939 TcpStream::connect(("127.0.0.1", front_port)).expect("could not connect to sozu");
1940
1941 client
1942 .set_read_timeout(Some(Duration::new(1, 0)))
1943 .expect("could not set read timeout");
1944 let request = format!(
1945 "GET / HTTP/1.1\r\nHost: localhost:{front_port}\r\nConnection: Close\r\n\r\n"
1946 );
1947 let w = client.write(request.as_bytes());
1948 println!("http client write: {w:?}");
1949
1950 barrier.wait();
1951 let mut buffer = [0; 4096];
1952 let mut index = 0;
1953
1954 let expected_len = 191;
1957
1958 loop {
1959 assert!(index <= expected_len);
1960 if index == expected_len {
1961 break;
1962 }
1963
1964 let r = client.read(&mut buffer[index..]);
1965 println!("http client read: {r:?}");
1966 match r {
1967 Err(e) => panic!("client request should not fail. Error: {e:?}"),
1968 Ok(sz) => {
1969 index += sz;
1970 }
1971 }
1972 }
1973 println!(
1974 "Response: {}",
1975 str::from_utf8(&buffer[..index]).expect("could not make string from buffer")
1976 );
1977
1978 command
1980 .write_message(&WorkerRequest {
1981 id: "ID_STOP".to_owned(),
1982 content: RequestType::SoftStop(SoftStop {}).into(),
1983 })
1984 .expect("could not send SoftStop");
1985 backend_server.unblock();
1987 });
1988 }
1989
1990 #[test]
1991 fn keep_alive() {
1992 setup_test_logger!();
1993 let front_port = crate::testing::provide_port();
1994 let backend_server = Arc::new(
1995 tiny_http::Server::http("127.0.0.1:0").expect("could not create tiny_http server"),
1996 );
1997 let backend_port = backend_server
1998 .server_addr()
1999 .to_ip()
2000 .expect("tiny_http server should bind to IP address")
2001 .port();
2002
2003 let barrier = Arc::new(Barrier::new(2));
2004
2005 let config = ListenerBuilder::new_http(SocketAddress::new_v4(127, 0, 0, 1, front_port))
2006 .to_http(None)
2007 .expect("could not create listener config");
2008
2009 let (mut command, channel) =
2010 Channel::generate(1000, 10000).expect("should create a channel");
2011
2012 thread::scope(|s| {
2013 let backend_handle = backend_server.clone();
2014 let barrier_clone = barrier.to_owned();
2015 s.spawn(move || {
2016 setup_test_logger!();
2017 start_server(&backend_handle, barrier_clone);
2018 });
2019 barrier.wait();
2020
2021 s.spawn(move || {
2022 setup_test_logger!();
2023 start_http_worker(config, channel, 10, 16384)
2024 .expect("could not start the http server");
2025 });
2026
2027 let front = RequestHttpFrontend {
2028 address: SocketAddress::new_v4(127, 0, 0, 1, front_port),
2029 hostname: "localhost".to_owned(),
2030 path: PathRule::prefix("/".to_owned()),
2031 cluster_id: Some("cluster_1".to_owned()),
2032 ..Default::default()
2033 };
2034 command
2035 .write_message(&WorkerRequest {
2036 id: "ID_ABCD".to_owned(),
2037 content: RequestType::AddHttpFrontend(front).into(),
2038 })
2039 .expect("could not send AddHttpFrontend");
2040 let backend = Backend {
2041 address: SocketAddress::new_v4(127, 0, 0, 1, backend_port).into(),
2042 backend_id: "cluster_1-0".to_owned(),
2043 backup: None,
2044 cluster_id: "cluster_1".to_owned(),
2045 load_balancing_parameters: Some(LoadBalancingParams::default()),
2046 sticky_id: None,
2047 };
2048 command
2049 .write_message(&WorkerRequest {
2050 id: "ID_EFGH".to_owned(),
2051 content: RequestType::AddBackend(backend.to_add_backend()).into(),
2052 })
2053 .expect("could not send AddBackend");
2054
2055 println!("test received: {:?}", command.read_message());
2056 println!("test received: {:?}", command.read_message());
2057
2058 let mut client =
2059 TcpStream::connect(("127.0.0.1", front_port)).expect("could not connect to sozu");
2060 client
2061 .set_read_timeout(Some(Duration::new(5, 0)))
2062 .expect("could not set read timeout");
2063
2064 let expected_len = 191;
2067
2068 let request = format!("GET / HTTP/1.1\r\nHost: localhost:{front_port}\r\n\r\n");
2069 let w = client
2070 .write(request.as_bytes())
2071 .expect("could not write first request");
2072 println!("http client write: {w:?}");
2073 barrier.wait();
2074
2075 let mut buffer = [0; 4096];
2076 let mut index = 0;
2077
2078 loop {
2079 assert!(index <= expected_len);
2080 if index == expected_len {
2081 break;
2082 }
2083
2084 let r = client.read(&mut buffer[index..]);
2085 println!("http client read: {r:?}");
2086 match r {
2087 Err(e) => panic!("client request should not fail. Error: {e:?}"),
2088 Ok(sz) => {
2089 index += sz;
2090 }
2091 }
2092 }
2093
2094 println!(
2095 "Response: {}",
2096 str::from_utf8(&buffer[..index]).expect("could not make string from buffer")
2097 );
2098
2099 println!("first request ended, will send second one");
2100 let request2 = format!("GET / HTTP/1.1\r\nHost: localhost:{front_port}\r\n\r\n");
2101 let w2 = client.write(request2.as_bytes());
2102 println!("http client write: {w2:?}");
2103 barrier.wait();
2104
2105 let mut buffer2 = [0; 4096];
2106 let mut index = 0;
2107
2108 loop {
2109 assert!(index <= expected_len);
2110 if index == expected_len {
2111 break;
2112 }
2113
2114 let r2 = client.read(&mut buffer2[index..]);
2115 println!("http client read: {r2:?}");
2116 match r2 {
2117 Err(e) => panic!("client request should not fail. Error: {e:?}"),
2118 Ok(sz) => {
2119 index += sz;
2120 }
2121 }
2122 }
2123 println!(
2124 "Response: {}",
2125 str::from_utf8(&buffer2[..index]).expect("could not make string from buffer")
2126 );
2127
2128 command
2130 .write_message(&WorkerRequest {
2131 id: "ID_STOP".to_owned(),
2132 content: RequestType::SoftStop(SoftStop {}).into(),
2133 })
2134 .expect("could not send SoftStop");
2135 backend_server.unblock();
2137 });
2138 }
2139
2140 use self::tiny_http::Response;
2141
2142 fn start_server(server: &tiny_http::Server, barrier: Arc<Barrier>) {
2143 let addr = server.server_addr();
2144 info!("starting web server on {:?}", addr);
2145 barrier.wait();
2146
2147 for request in server.incoming_requests() {
2148 info!(
2149 "backend web server got request -> method: {:?}, url: {:?}, headers: {:?}",
2150 request.method(),
2151 request.url(),
2152 request.headers()
2153 );
2154
2155 let response = Response::from_string("hello world");
2156 request
2157 .respond(response)
2158 .expect("could not respond to request");
2159 info!("backend web server sent response");
2160 barrier.wait();
2161 info!("server session stopped");
2162 }
2163
2164 println!("server on {addr:?} closed");
2165 }
2166
2167 #[test]
2168 fn frontend_from_request_test() {
2169 let cluster_id1 = "cluster_1".to_owned();
2170 let cluster_id2 = "cluster_2".to_owned();
2171 let cluster_id3 = "cluster_3".to_owned();
2172 let uri1 = "/".to_owned();
2173 let uri2 = "/yolo".to_owned();
2174 let uri3 = "/yolo/swag".to_owned();
2175
2176 let mut fronts = Router::new();
2177 fronts
2178 .add_http_front(&HttpFrontend {
2179 address: "0.0.0.0:80".parse().unwrap(),
2180 hostname: "lolcatho.st".to_owned(),
2181 method: None,
2182 path: PathRule::prefix(uri1),
2183 position: RulePosition::Tree,
2184 cluster_id: Some(cluster_id1),
2185 tags: None,
2186 redirect: None,
2187 redirect_scheme: None,
2188 redirect_template: None,
2189 rewrite_host: None,
2190 rewrite_path: None,
2191 rewrite_port: None,
2192 required_auth: None,
2193 headers: Vec::new(),
2194 hsts: None,
2195 })
2196 .expect("Could not add http frontend");
2197 fronts
2198 .add_http_front(&HttpFrontend {
2199 address: "0.0.0.0:80".parse().unwrap(),
2200 hostname: "lolcatho.st".to_owned(),
2201 method: None,
2202 path: PathRule::prefix(uri2),
2203 position: RulePosition::Tree,
2204 cluster_id: Some(cluster_id2),
2205 tags: None,
2206 redirect: None,
2207 redirect_scheme: None,
2208 redirect_template: None,
2209 rewrite_host: None,
2210 rewrite_path: None,
2211 rewrite_port: None,
2212 required_auth: None,
2213 headers: Vec::new(),
2214 hsts: None,
2215 })
2216 .expect("Could not add http frontend");
2217 fronts
2218 .add_http_front(&HttpFrontend {
2219 address: "0.0.0.0:80".parse().unwrap(),
2220 hostname: "lolcatho.st".to_owned(),
2221 method: None,
2222 path: PathRule::prefix(uri3),
2223 position: RulePosition::Tree,
2224 cluster_id: Some(cluster_id3),
2225 tags: None,
2226 redirect: None,
2227 redirect_scheme: None,
2228 redirect_template: None,
2229 rewrite_host: None,
2230 rewrite_path: None,
2231 rewrite_port: None,
2232 required_auth: None,
2233 headers: Vec::new(),
2234 hsts: None,
2235 })
2236 .expect("Could not add http frontend");
2237 fronts
2238 .add_http_front(&HttpFrontend {
2239 address: "0.0.0.0:80".parse().unwrap(),
2240 hostname: "other.domain".to_owned(),
2241 method: None,
2242 path: PathRule::prefix("/test".to_owned()),
2243 position: RulePosition::Tree,
2244 cluster_id: Some("cluster_1".to_owned()),
2245 tags: None,
2246 redirect: None,
2247 redirect_scheme: None,
2248 redirect_template: None,
2249 rewrite_host: None,
2250 rewrite_path: None,
2251 rewrite_port: None,
2252 required_auth: None,
2253 headers: Vec::new(),
2254 hsts: None,
2255 })
2256 .expect("Could not add http frontend");
2257
2258 let address = SocketAddress::new_v4(127, 0, 0, 1, 1030);
2259
2260 let default_config = ListenerBuilder::new_http(address)
2261 .to_http(None)
2262 .expect("Could not create default HTTP listener config");
2263
2264 let listener = HttpListener {
2265 listener: None,
2266 address: address.into(),
2267 fronts,
2268 answers: Rc::new(RefCell::new(HttpAnswers::new(&BTreeMap::new()).unwrap())),
2269 config: default_config,
2270 token: Token(0),
2271 active: true,
2272 tags: BTreeMap::new(),
2273 };
2274
2275 let frontend1 = listener.frontend_from_request("lolcatho.st", "/", &Method::Get);
2276 let frontend2 = listener.frontend_from_request("lolcatho.st", "/test", &Method::Get);
2277 let frontend3 = listener.frontend_from_request("lolcatho.st", "/yolo/test", &Method::Get);
2278 let frontend4 = listener.frontend_from_request("lolcatho.st", "/yolo/swag", &Method::Get);
2279 let frontend5 = listener.frontend_from_request("domain", "/", &Method::Get);
2280 assert_eq!(
2281 frontend1
2282 .expect("should find frontend")
2283 .cluster_id
2284 .as_deref(),
2285 Some("cluster_1")
2286 );
2287 assert_eq!(
2288 frontend2
2289 .expect("should find frontend")
2290 .cluster_id
2291 .as_deref(),
2292 Some("cluster_1")
2293 );
2294 assert_eq!(
2295 frontend3
2296 .expect("should find frontend")
2297 .cluster_id
2298 .as_deref(),
2299 Some("cluster_2")
2300 );
2301 assert_eq!(
2302 frontend4
2303 .expect("should find frontend")
2304 .cluster_id
2305 .as_deref(),
2306 Some("cluster_3")
2307 );
2308 assert!(frontend5.is_err());
2309 }
2310
2311 #[test]
2312 fn h2_stream_idle_timeout_inherits_back_timeout() {
2313 let address = SocketAddress::new_v4(127, 0, 0, 1, 1040);
2314 let build = |back_timeout: u32, explicit: Option<u32>| -> HttpListener {
2315 let mut cfg = ListenerBuilder::new_http(address)
2316 .to_http(None)
2317 .expect("default HTTP listener config");
2318 cfg.back_timeout = back_timeout;
2319 cfg.h2_stream_idle_timeout_seconds = explicit;
2320 HttpListener::new(cfg, Token(0)).expect("build listener")
2321 };
2322
2323 assert_eq!(
2325 build(180, None).get_h2_stream_idle_timeout(),
2326 Duration::from_secs(180)
2327 );
2328
2329 assert_eq!(
2332 build(5, None).get_h2_stream_idle_timeout(),
2333 Duration::from_secs(30)
2334 );
2335
2336 assert_eq!(
2339 build(180, Some(10)).get_h2_stream_idle_timeout(),
2340 Duration::from_secs(10)
2341 );
2342 assert_eq!(
2343 build(5, Some(600)).get_h2_stream_idle_timeout(),
2344 Duration::from_secs(600)
2345 );
2346
2347 assert_eq!(
2349 build(180, Some(0)).get_h2_stream_idle_timeout(),
2350 Duration::from_secs(1)
2351 );
2352 }
2353}