1use std::{
3 cell::RefCell,
4 collections::{HashMap, HashSet, VecDeque, hash_map::Entry},
5 hash::{DefaultHasher, Hash, Hasher},
6 io::Error as IoError,
7 net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr},
8 os::unix::io::{AsRawFd, FromRawFd},
9 rc::Rc,
10 str::FromStr,
11 sync::LazyLock,
12 time::{Duration, Instant},
13};
14
15use mio::{
16 Events, Interest, Poll, Token,
17 net::{TcpListener as MioTcpListener, TcpStream, UdpSocket as MioUdpSocket},
18};
19use slab::Slab;
20use sozu_command::{
21 channel::Channel,
22 config::MetricDetailLevel,
23 logging,
24 proto::command::{
25 ActivateListener, AddBackend, CertificatesWithFingerprints, Cluster, ClusterHashes,
26 ClusterInformations, DeactivateListener, Event, EventKind, HttpListenerConfig,
27 HttpsListenerConfig, InitialState, ListenerType, LoadBalancingAlgorithms, LoadMetric,
28 MetricDetail, MetricsConfiguration, RemoveBackend, Request, ResponseContent,
29 ResponseStatus, ServerConfig, TcpListenerConfig as CommandTcpListener,
30 UdpListenerConfig as CommandUdpListener, UpdateHttpListenerConfig,
31 UpdateHttpsListenerConfig, UpdateTcpListenerConfig, UpdateUdpListenerConfig, WorkerRequest,
32 WorkerResponse, request::RequestType, response_content::ContentType,
33 },
34 ready::Ready,
35 scm_socket::{Listeners, ScmSocket, ScmSocketError},
36 state::ConfigState,
37};
38
39use crate::metrics::names;
40use crate::{
41 AcceptError, Protocol, ProxyConfiguration, ProxySession, SessionIsToBeClosed,
42 backends::{Backend, BackendMap},
43 features::FEATURES,
44 health_check::HealthChecker,
45 http, https,
46 metrics::METRICS,
47 pool::Pool,
48 tcp,
49 timer::Timer,
50 udp,
51};
52
/// Number of connection retries.
/// NOTE(review): presumably the retry budget for backend connections — confirm at call sites.
pub const CONN_RETRIES: u8 = 3;

/// Number of hashed source buckets used for the `client.connect.per_source.*` metrics.
pub const PER_SOURCE_BUCKETS: usize = 256;

/// Interned metric keys, one per bucket: `client.connect.per_source.bucket_000`
/// through `client.connect.per_source.bucket_255`.
///
/// The strings are built and leaked exactly once (on first access) so that
/// `per_source_bucket` can hand out `&'static str` keys without allocating.
static PER_SOURCE_BUCKET_KEYS: LazyLock<[&'static str; PER_SOURCE_BUCKETS]> = LazyLock::new(|| {
    let mut keys: [&'static str; PER_SOURCE_BUCKETS] = [""; PER_SOURCE_BUCKETS];
    for (i, slot) in keys.iter_mut().enumerate() {
        let owned = format!("client.connect.per_source.bucket_{i:03}");
        *slot = Box::leak(owned.into_boxed_str());
    }
    keys
});

/// Maps a client address to the metric key of its source bucket.
///
/// Addresses are grouped by network prefix before hashing, so neighbouring
/// clients share a bucket: IPv4 by /24, IPv6 by /48. The port is ignored.
///
/// IPv4-mapped IPv6 addresses (`::ffff:a.b.c.d`, as reported for IPv4 clients
/// on a dual-stack socket) are first converted back to IPv4. Without that step
/// their /48 prefix is all zeros, and every such client would collapse into a
/// single bucket.
fn per_source_bucket(peer: &SocketAddr) -> &'static str {
    let mut hasher = DefaultHasher::new();
    match peer.ip().to_canonical() {
        IpAddr::V4(v4) => {
            let [a, b, c, _] = v4.octets();
            Ipv4Addr::new(a, b, c, 0).hash(&mut hasher);
        }
        IpAddr::V6(v6) => {
            // Keep the first 48 bits (6 octets), zero the rest.
            let mut masked_octets = [0u8; 16];
            masked_octets[..6].copy_from_slice(&v6.octets()[..6]);
            Ipv6Addr::from(masked_octets).hash(&mut hasher);
        }
    }
    // The modulo keeps the index inside the key table whatever the pointer width.
    let idx = (hasher.finish() as usize) % PER_SOURCE_BUCKETS;
    PER_SOURCE_BUCKET_KEYS[idx]
}
109
110const ACCEPT_SATURATION_TICK: Duration = Duration::from_secs(1);
115
116pub type ProxyChannel = Channel<WorkerResponse, WorkerRequest>;
117
118thread_local! {
119 pub static QUEUE: RefCell<VecDeque<WorkerResponse>> = const { RefCell::new(VecDeque::new()) };
120}
121
122thread_local! {
123 pub static TIMER: RefCell<Timer<Token>> = RefCell::new(Timer::default());
124}
125
126pub fn push_queue(message: WorkerResponse) {
127 QUEUE.with(|queue| {
128 (*queue.borrow_mut()).push_back(message);
129 });
130}
131
132pub fn push_event(event: Event) {
133 QUEUE.with(|queue| {
134 (*queue.borrow_mut()).push_back(WorkerResponse {
135 id: "EVENT".to_string(),
136 message: String::new(),
137 status: ResponseStatus::Processing.into(),
138 content: Some(ContentType::Event(event).into()),
139 });
140 });
141}
142
143fn worker_metric_detail_status_content(
149 configured: MetricDetailLevel,
150 effective: MetricDetailLevel,
151 previous_effective: MetricDetailLevel,
152 active_lease_count: u32,
153) -> ResponseContent {
154 use sozu_command::proto::command::WorkerMetricDetailStatus;
155 ContentType::WorkerMetricDetailStatus(WorkerMetricDetailStatus {
156 configured: MetricDetail::from(configured) as i32,
157 effective: MetricDetail::from(effective) as i32,
158 previous_effective: MetricDetail::from(previous_effective) as i32,
159 active_lease_count,
160 })
161 .into()
162}
163
164fn push_metric_detail_transition(
171 previous: MetricDetailLevel,
172 effective: MetricDetailLevel,
173 transition_kind: &'static str,
174 client_id: Option<String>,
175) {
176 use sozu_command::proto::command::MetricDetailTransition;
177 if previous == effective {
182 return;
183 }
184 push_event(Event {
185 kind: EventKind::MetricDetailChanged as i32,
186 cluster_id: None,
187 backend_id: None,
188 address: None,
189 metric_detail: Some(MetricDetailTransition {
190 previous_effective: MetricDetail::from(previous) as i32,
191 effective: MetricDetail::from(effective) as i32,
192 transition_kind: transition_kind.to_owned(),
193 client_id,
194 }),
195 });
196}
197
/// Typed wrapper around a listener's raw token value.
#[derive(Copy, Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct ListenToken(pub usize);

/// Typed wrapper around a session's raw token value (its slab key).
#[derive(Copy, Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct SessionToken(pub usize);

impl From<usize> for ListenToken {
    fn from(raw: usize) -> Self {
        Self(raw)
    }
}

impl From<ListenToken> for usize {
    fn from(token: ListenToken) -> Self {
        let ListenToken(raw) = token;
        raw
    }
}

impl From<usize> for SessionToken {
    fn from(raw: usize) -> Self {
        Self(raw)
    }
}

impl From<SessionToken> for usize {
    fn from(token: SessionToken) -> Self {
        let SessionToken(raw) = token;
        raw
    }
}
226
/// Bookkeeping for every session owned by this worker: the session slab, the
/// connection counter that drives accept backpressure, and per-cluster /
/// per-client-IP connection counts.
pub struct SessionManager {
    /// Upper bound for `nb_connections`; `incr` asserts it is never exceeded.
    pub max_connections: usize,
    /// Connections currently counted through `incr` / `decr`.
    pub nb_connections: usize,
    /// Cleared by `check_limits` when a limit is hit; set again by `decr` once
    /// the connection count has dropped back far enough.
    pub can_accept: bool,
    /// All sessions, keyed by token. The channel, timer and metrics slots are
    /// reserved here too (see `Server::try_new_from_config`).
    pub slab: Slab<Rc<RefCell<dyn ProxySession>>>,
    /// Default per-(cluster, client IP) connection cap; 0 means unlimited.
    pub max_connections_per_ip: u64,
    /// Default retry-after value, used when no override is supplied.
    pub retry_after: u32,
    /// Forward index: cluster id -> client IP -> number of distinct tokens
    /// tracked for that pair. Holds no empty inner maps and no zero counts.
    connections_per_cluster_ip: HashMap<String, HashMap<IpAddr, usize>>,
    /// Reverse index: token -> cluster id -> client IPs it was counted for, so
    /// `untrack_all_cluster_ip` can undo a token's contributions in one call.
    cluster_ip_tracks: HashMap<Token, HashMap<String, HashSet<IpAddr>>>,
}
272
273impl SessionManager {
274 pub fn new(
275 slab: Slab<Rc<RefCell<dyn ProxySession>>>,
276 max_connections: usize,
277 max_connections_per_ip: u64,
278 retry_after: u32,
279 ) -> Rc<RefCell<Self>> {
280 Rc::new(RefCell::new(SessionManager {
281 max_connections,
282 nb_connections: 0,
283 can_accept: true,
284 slab,
285 max_connections_per_ip,
286 retry_after,
287 connections_per_cluster_ip: HashMap::new(),
288 cluster_ip_tracks: HashMap::new(),
289 }))
290 }
291
292 pub fn effective_max_connections_per_ip(&self, override_value: Option<u64>) -> u64 {
297 override_value.unwrap_or(self.max_connections_per_ip)
298 }
299
300 pub fn effective_retry_after(&self, override_value: Option<u32>) -> u32 {
304 override_value.unwrap_or(self.retry_after)
305 }
306
307 pub fn cluster_ip_at_limit(
319 &self,
320 token: Token,
321 cluster_id: &str,
322 ip: &IpAddr,
323 override_value: Option<u64>,
324 ) -> bool {
325 let limit = self.effective_max_connections_per_ip(override_value);
326 if limit == 0 {
327 return false;
328 }
329 debug_assert!(
332 limit > 0,
333 "limit==0 (unlimited) must have returned before reaching the bounded check"
334 );
335 let already_tracked = self
336 .cluster_ip_tracks
337 .get(&token)
338 .and_then(|by_cluster| by_cluster.get(cluster_id))
339 .is_some_and(|ips| ips.contains(ip));
340 if already_tracked {
341 debug_assert!(
345 self.connections_per_cluster_ip
346 .get(cluster_id)
347 .and_then(|by_ip| by_ip.get(ip))
348 .is_some_and(|c| *c > 0),
349 "a tracked (token, cluster, ip) slot must have a positive forward count"
350 );
351 return false;
352 }
353 self.connections_per_cluster_ip
354 .get(cluster_id)
355 .and_then(|by_ip| by_ip.get(ip))
356 .is_some_and(|c| (*c as u64) >= limit)
357 }
358
359 pub fn track_cluster_ip(&mut self, token: Token, cluster_id: String, ip: IpAddr) {
369 let count_before = self
373 .connections_per_cluster_ip
374 .get(&cluster_id)
375 .and_then(|by_ip| by_ip.get(&ip))
376 .copied()
377 .unwrap_or(0);
378 let inserted = self
379 .cluster_ip_tracks
380 .entry(token)
381 .or_default()
382 .entry(cluster_id.clone())
383 .or_default()
384 .insert(ip);
385 if inserted {
386 *self
387 .connections_per_cluster_ip
388 .entry(cluster_id.clone())
389 .or_default()
390 .entry(ip)
391 .or_insert(0) += 1;
392 }
393 debug_assert!(
397 self.cluster_ip_tracks
398 .get(&token)
399 .and_then(|by_cluster| by_cluster.get(&cluster_id))
400 .is_some_and(|ips| ips.contains(&ip)),
401 "track must leave the (token, cluster, ip) recorded in the reverse index"
402 );
403 debug_assert_eq!(
404 self.connections_per_cluster_ip
405 .get(&cluster_id)
406 .and_then(|by_ip| by_ip.get(&ip))
407 .copied()
408 .unwrap_or(0),
409 count_before + inserted as usize,
410 "forward count must advance by exactly 1 on first track, 0 on a repeat"
411 );
412 #[cfg(debug_assertions)]
413 self.check_invariants();
414 }
415
416 pub fn untrack_all_cluster_ip(&mut self, token: Token) {
423 let Some(by_cluster) = self.cluster_ip_tracks.remove(&token) else {
424 return;
425 };
426 debug_assert!(
429 !self.cluster_ip_tracks.contains_key(&token),
430 "untrack_all must evict the token from the reverse index"
431 );
432 for (cluster_id, ips) in by_cluster {
433 let Entry::Occupied(mut outer) = self.connections_per_cluster_ip.entry(cluster_id)
434 else {
435 continue;
436 };
437 for ip in ips {
438 if let Entry::Occupied(mut inner) = outer.get_mut().entry(ip) {
439 let count = inner.get_mut();
440 *count = count.saturating_sub(1);
441 if *count == 0 {
442 inner.remove();
443 }
444 }
445 }
446 if outer.get().is_empty() {
447 outer.remove();
448 }
449 }
450 debug_assert!(
453 self.connections_per_cluster_ip
454 .values()
455 .all(|by_ip| !by_ip.is_empty() && by_ip.values().all(|&c| c > 0)),
456 "untrack_all must not leave empty inner maps or zero-count ips behind"
457 );
458 #[cfg(debug_assertions)]
459 self.check_invariants();
460 }
461
462 pub fn clear_cluster_ip_tracking(&mut self) {
467 self.cluster_ip_tracks.clear();
468 self.connections_per_cluster_ip.clear();
469 debug_assert!(
472 self.cluster_ip_tracks.is_empty() && self.connections_per_cluster_ip.is_empty(),
473 "clear must wipe both the reverse index and the forward count map"
474 );
475 #[cfg(debug_assertions)]
476 self.check_invariants();
477 }
478
479 pub fn at_capacity(&self) -> bool {
481 self.slab.len() >= self.accept_slab_threshold()
482 }
483
484 pub fn accept_slab_threshold(&self) -> usize {
496 let threshold = 10 + 2 * self.max_connections;
497 debug_assert!(
501 threshold > self.max_connections,
502 "accept gate must sit strictly above max_connections to reserve system slots"
503 );
504 threshold
505 }
506
507 pub fn check_limits(&mut self) -> bool {
510 debug_assert!(
513 self.nb_connections <= self.max_connections,
514 "nb_connections must never exceed max_connections"
515 );
516 if self.nb_connections >= self.max_connections {
517 error!("max number of session connection reached, flushing the accept queue");
518 gauge!(names::accept_queue::BACKPRESSURE, 1);
519 self.can_accept = false;
520 debug_assert!(
522 !self.can_accept,
523 "refusing at the cap must clear can_accept"
524 );
525 return false;
526 }
527
528 if self.at_capacity() {
529 error!("not enough memory to accept another session, flushing the accept queue");
530 error!(
531 "nb_connections: {}, max_connections: {}",
532 self.nb_connections, self.max_connections
533 );
534 gauge!(names::accept_queue::BACKPRESSURE, 1);
535 self.can_accept = false;
536
537 debug_assert!(
538 !self.can_accept,
539 "refusing at slab capacity must clear can_accept"
540 );
541 return false;
542 }
543
544 debug_assert!(
546 self.nb_connections < self.max_connections && !self.at_capacity(),
547 "check_limits returned room while a gate was actually saturated"
548 );
549 true
550 }
551
552 pub fn to_session(token: Token) -> SessionToken {
553 SessionToken(token.0)
554 }
555
556 pub fn incr(&mut self) {
557 let before = self.nb_connections;
558 self.nb_connections += 1;
559 assert!(self.nb_connections <= self.max_connections);
560 debug_assert_eq!(
562 self.nb_connections,
563 before + 1,
564 "incr must raise nb_connections by exactly one"
565 );
566 gauge!(names::client::CONNECTIONS, self.nb_connections);
572 }
573
574 pub fn decr(&mut self) {
577 assert!(self.nb_connections != 0);
578 let before = self.nb_connections;
579 self.nb_connections -= 1;
580 debug_assert_eq!(
582 self.nb_connections,
583 before - 1,
584 "decr must lower nb_connections by exactly one"
585 );
586 gauge!(names::client::CONNECTIONS, self.nb_connections);
587
588 if !self.can_accept && self.nb_connections < self.max_connections * 90 / 100 {
590 debug!(
591 "nb_connections = {}, max_connections = {}, starting to accept again",
592 self.nb_connections, self.max_connections
593 );
594 gauge!(names::accept_queue::BACKPRESSURE, 0);
595 self.can_accept = true;
596 }
597 }
598
599 #[cfg(debug_assertions)]
604 fn check_invariants(&self) {
605 debug_assert!(
607 self.nb_connections <= self.max_connections,
608 "nb_connections {} exceeds max_connections {}",
609 self.nb_connections,
610 self.max_connections
611 );
612 debug_assert!(
615 self.connections_per_cluster_ip
616 .values()
617 .all(|by_ip| !by_ip.is_empty() && by_ip.values().all(|&c| c > 0)),
618 "connections_per_cluster_ip holds an empty inner map or a zero count"
619 );
620 debug_assert!(
625 self.cluster_ip_tracks.values().all(|by_cluster| {
626 by_cluster.iter().all(|(cluster_id, ips)| {
627 ips.iter().all(|ip| {
628 self.connections_per_cluster_ip
629 .get(cluster_id)
630 .and_then(|by_ip| by_ip.get(ip))
631 .is_some_and(|&c| c > 0)
632 })
633 })
634 }),
635 "a tracked (token, cluster, ip) slot has no positive forward count"
636 );
637 debug_assert!(
639 self.cluster_ip_tracks.values().all(|by_cluster| {
640 !by_cluster.is_empty() && by_cluster.values().all(|ips| !ips.is_empty())
641 }),
642 "cluster_ip_tracks retains an empty per-token or per-cluster entry"
643 );
644 }
645}
646
/// Errors returned by `Server::try_new_from_config` and `Server::new`.
#[derive(thiserror::Error, Debug)]
pub enum ServerError {
    /// `mio::Poll::new` failed.
    #[error("could not create event loop with MIO poll: {0}")]
    CreatePoll(IoError),
    /// Cloning the poll registry for one of the proxies failed.
    #[error("could not clone the MIO registry: {0}")]
    CloneRegistry(IoError),
    /// Registering the command channel with the poll registry failed.
    #[error("could not register the channel: {0}")]
    RegisterChannel(IoError),
    /// An SCM socket operation failed (switching blocking mode or receiving
    /// listeners); `msg` says which one.
    #[error("{msg}:{scm_err}")]
    ScmSocket {
        msg: String,
        scm_err: ScmSocketError,
    },
}
661
/// A worker's event loop and everything it drives: the command channel to the
/// main process, the HTTP/HTTPS/TCP/UDP proxies, the session manager, health
/// checks and metrics.
///
/// Field order is also drop order; keep that in mind before reordering.
pub struct Server {
    /// Built from `ServerConfig::accept_queue_timeout`, in seconds.
    accept_queue_timeout: Duration,
    /// Accepted sockets not yet turned into sessions, each with its listener
    /// token, protocol, an `Instant` and an optional peer address.
    /// NOTE(review): the `Instant` is presumably the enqueue time compared with
    /// `accept_queue_timeout` — confirm in `create_sessions`.
    accept_queue: VecDeque<(
        TcpStream,
        ListenToken,
        Protocol,
        Instant,
        Option<SocketAddr>,
    )>,
    /// From `ServerConfig::evict_on_queue_full`, `false` when unset.
    evict_on_queue_full: bool,
    /// Listener tokens tracked for pending accepts (populated outside this view).
    accept_ready: HashSet<ListenToken>,
    /// Backends shared with the proxies and the health checker.
    backends: Rc<RefCell<BackendMap>>,
    /// Slab length when `Server::new` ran (the reserved system slots).
    base_sessions_count: usize,
    /// Command channel to the main process, registered on `Token(0)`.
    channel: ProxyChannel,
    config_state: ConfigState,
    /// Consecutive failed `poll()` calls; reset to 0 on success.
    current_poll_errors: i32,
    health_checker: HealthChecker,
    http: Rc<RefCell<http::HttpProxy>>,
    https: Rc<RefCell<https::HttpsProxy>>,
    /// Slab length snapshot taken at startup of `run` and on `SoftStop`.
    last_sessions_len: usize,
    /// When the last shutdown progress was observed; throttles the progress log.
    last_shutting_down_message: Option<Instant>,
    last_zombie_check: Instant,
    /// Start of the current event-loop phase, used for loop / epoll timings.
    loop_start: Instant,
    /// Construction time; source of `process.uptime_seconds`.
    started_at: Instant,
    /// Last time the accept-saturation counter was sampled.
    last_saturation_tick: Instant,
    /// `check_for_poll_errors` panics once `current_poll_errors` reaches this.
    max_poll_errors: i32,
    pool: Rc<RefCell<Pool>>,
    pub poll: Poll,
    /// Upper bound on a single `poll()` wait.
    poll_timeout: Option<Duration>,
    /// Listener sockets received from the main process over `scm`.
    scm_listeners: Option<Listeners>,
    scm: ScmSocket,
    sessions: Rc<RefCell<SessionManager>>,
    /// Next timer deadline, refreshed every loop iteration from `TIMER`.
    should_poll_at: Option<Instant>,
    /// Request id of the pending `SoftStop`, if a soft stop is in progress.
    shutting_down: Option<String>,
    tcp: Rc<RefCell<tcp::TcpProxy>>,
    udp: Rc<RefCell<udp::UdpProxy>>,
    /// Minimum delay between zombie checks, and the idle time after which a
    /// session counts as a zombie.
    zombie_check_interval: Duration,
}
737
738impl Server {
739 pub fn try_new_from_config(
740 worker_to_main_channel: ProxyChannel,
741 worker_to_main_scm: ScmSocket,
742 config: ServerConfig,
743 initial_state: InitialState,
744 expects_initial_status: bool,
745 ) -> Result<Self, ServerError> {
746 let event_loop = Poll::new().map_err(ServerError::CreatePoll)?;
747 if let Some(cap) = config.basic_auth_max_credential_bytes {
754 crate::protocol::mux::auth::set_max_decoded_credential_bytes(cap as usize);
755 }
756 #[cfg(all(target_os = "linux", feature = "splice"))]
763 if let Some(cap) = config.splice_pipe_capacity_bytes {
764 crate::splice::set_pipe_capacity(cap as usize);
765 }
766 let pool = Rc::new(RefCell::new(Pool::with_capacity(
767 config.min_buffers as usize,
768 config.max_buffers as usize,
769 config.buffer_size as usize,
770 )));
771 let backends = Rc::new(RefCell::new(BackendMap::new()));
772
773 let sessions: Rc<RefCell<SessionManager>> = SessionManager::new(
778 Slab::with_capacity(config.slab_capacity() as usize),
779 config.max_connections as usize,
780 config
781 .max_connections_per_ip
782 .unwrap_or(sozu_command::config::DEFAULT_MAX_CONNECTIONS_PER_IP),
783 config
784 .retry_after
785 .unwrap_or(sozu_command::config::DEFAULT_RETRY_AFTER),
786 );
787 {
788 let mut s = sessions.borrow_mut();
789 let entry = s.slab.vacant_entry();
790 trace!("taking token {:?} for channel", SessionToken(entry.key()));
791 entry.insert(Rc::new(RefCell::new(ListenSession {
792 protocol: Protocol::Channel,
793 })));
794 }
795 {
796 let mut s = sessions.borrow_mut();
797 let entry = s.slab.vacant_entry();
798 trace!("taking token {:?} for metrics", SessionToken(entry.key()));
799 entry.insert(Rc::new(RefCell::new(ListenSession {
800 protocol: Protocol::Timer,
801 })));
802 }
803 {
804 let mut s = sessions.borrow_mut();
805 let entry = s.slab.vacant_entry();
806 trace!("taking token {:?} for metrics", SessionToken(entry.key()));
807 entry.insert(Rc::new(RefCell::new(ListenSession {
808 protocol: Protocol::Metrics,
809 })));
810 }
811
812 Server::new(
813 event_loop,
814 worker_to_main_channel,
815 worker_to_main_scm,
816 sessions,
817 pool,
818 backends,
819 None,
820 None,
821 None,
822 config,
823 Some(initial_state),
824 expects_initial_status,
825 )
826 }
827
    /// Assembles a `Server` around an existing poll instance and session manager.
    ///
    /// Steps, in order:
    /// 1. registers the command `channel` on `Token(0)` (readable + writable);
    /// 2. registers the metrics socket, when there is one, on `Token(2)` (writable);
    /// 3. records the current slab length as `base_sessions_count`;
    /// 4. uses the given HTTP/HTTPS/TCP proxies or builds fresh ones, each with
    ///    its own clone of the poll registry; the UDP proxy is always built fresh;
    /// 5. replays `initial_state` requests through `notify_proxys`, then clears
    ///    `QUEUE` so none of their responses reach the main process;
    /// 6. if `expects_initial_status`, reads one message in blocking mode, which
    ///    must be a `Status` request, and answers it with `ok`;
    /// 7. receives the listener sockets over the SCM socket, switching it to
    ///    blocking mode for the duration of the call.
    ///
    /// # Errors
    /// `RegisterChannel`, `CloneRegistry` or `ScmSocket` when the matching step fails.
    ///
    /// # Panics
    /// - if the metrics socket cannot be registered (`expect`);
    /// - if `expects_initial_status` is set and the first message is not a
    ///   `Status` request.
    #[allow(clippy::too_many_arguments)]
    pub fn new(
        poll: Poll,
        mut channel: ProxyChannel,
        scm: ScmSocket,
        sessions: Rc<RefCell<SessionManager>>,
        pool: Rc<RefCell<Pool>>,
        backends: Rc<RefCell<BackendMap>>,
        http: Option<http::HttpProxy>,
        https: Option<https::HttpsProxy>,
        tcp: Option<tcp::TcpProxy>,
        server_config: ServerConfig,
        initial_state: Option<InitialState>,
        expects_initial_status: bool,
    ) -> Result<Self, ServerError> {
        // NOTE(review): the closure is empty, so this only touches the FEATURES
        // thread-local (presumably to force its initialization) — confirm intent.
        FEATURES.with(|_features| {
        });

        poll.registry()
            .register(
                &mut channel,
                Token(0),
                Interest::READABLE | Interest::WRITABLE,
            )
            .map_err(ServerError::RegisterChannel)?;

        METRICS.with(|metrics| {
            if let Some(sock) = (*metrics.borrow_mut()).socket_mut() {
                poll.registry()
                    .register(sock, Token(2), Interest::WRITABLE)
                    .expect("should register the metrics socket");
            }
        });

        // Everything already in the slab at this point (the reserved system
        // slots) is the floor that a soft stop waits to get back down to.
        let base_sessions_count = sessions.borrow().slab.len();

        let http = Rc::new(RefCell::new(match http {
            Some(http) => http,
            None => {
                let registry = poll
                    .registry()
                    .try_clone()
                    .map_err(ServerError::CloneRegistry)?;

                http::HttpProxy::new(registry, sessions.clone(), pool.clone(), backends.clone())
            }
        }));

        let https = Rc::new(RefCell::new(match https {
            Some(https) => https,
            None => {
                let registry = poll
                    .registry()
                    .try_clone()
                    .map_err(ServerError::CloneRegistry)?;

                https::HttpsProxy::new(registry, sessions.clone(), pool.clone(), backends.clone())
            }
        }));

        let tcp = Rc::new(RefCell::new(match tcp {
            Some(tcp) => tcp,
            None => {
                let registry = poll
                    .registry()
                    .try_clone()
                    .map_err(ServerError::CloneRegistry)?;

                tcp::TcpProxy::new(registry, sessions.clone(), pool.clone(), backends.clone())
            }
        }));

        // Unlike the other proxies, the UDP proxy cannot be injected by the caller.
        let udp = Rc::new(RefCell::new({
            let registry = poll
                .registry()
                .try_clone()
                .map_err(ServerError::CloneRegistry)?;

            udp::UdpProxy::new(
                registry,
                sessions.clone(),
                pool.clone(),
                backends.clone(),
                server_config.max_connections as usize,
                server_config.buffer_size as usize,
            )
        }));

        let mut server = Server {
            accept_queue_timeout: Duration::from_secs(u64::from(
                server_config.accept_queue_timeout,
            )),
            accept_queue: VecDeque::new(),
            evict_on_queue_full: server_config.evict_on_queue_full.unwrap_or(false),
            accept_ready: HashSet::new(),
            backends,
            base_sessions_count,
            channel,
            config_state: ConfigState::new(),
            current_poll_errors: 0,
            health_checker: HealthChecker::new(),
            http,
            https,
            last_sessions_len: 0,
            last_shutting_down_message: None,
            last_zombie_check: Instant::now(),
            loop_start: Instant::now(),
            started_at: Instant::now(),
            last_saturation_tick: Instant::now(),
            max_poll_errors: 10000,
            pool,
            poll_timeout: Some(Duration::from_millis(1000)),
            poll,
            scm_listeners: None,
            scm,
            sessions,
            should_poll_at: None,
            shutting_down: None,
            tcp,
            udp,
            zombie_check_interval: Duration::from_secs(u64::from(
                server_config.zombie_check_interval,
            )),
        };

        if let Some(state) = initial_state {
            for request in state.requests {
                trace!("generating initial config request: {:#?}", request);
                server.notify_proxys(request);
            }

            // Drop the responses generated while replaying the initial state so
            // they are never sent to the main process.
            QUEUE.with(|queue| {
                (*queue.borrow_mut()).clear();
            });
        }

        if expects_initial_status {
            // Handshake: the first message from the main process must be a
            // Status request, read in blocking mode.
            server.block_channel();
            let msg = server.channel.read_message();
            debug!("got message: {:?}", msg);

            if let Ok(WorkerRequest {
                id,
                content:
                    Request {
                        request_type: Some(RequestType::Status(_)),
                    },
            }) = msg
            {
                if let Err(e) = server.channel.write_message(&WorkerResponse::ok(id)) {
                    error!("Could not send an ok to the main process: {}", e);
                }
            } else {
                panic!(
                    "plz give me a status request first when I start, you sent me this instead: {msg:?}"
                );
            }
            server.unblock_channel();
        }

        // Receive the listener sockets synchronously, then restore non-blocking mode.
        info!("will try to receive listeners");
        server
            .scm
            .set_blocking(true)
            .map_err(|scm_err| ServerError::ScmSocket {
                msg: "Could not set the scm socket to blocking".to_string(),
                scm_err,
            })?;
        let listeners =
            server
                .scm
                .receive_listeners()
                .map_err(|scm_err| ServerError::ScmSocket {
                    msg: "could not receive listeners from the scm socket".to_string(),
                    scm_err,
                })?;
        server
            .scm
            .set_blocking(false)
            .map_err(|scm_err| ServerError::ScmSocket {
                msg: "Could not set the scm socket to unblocking".to_string(),
                scm_err,
            })?;
        info!("received listeners: {:?}", listeners);
        server.scm_listeners = Some(listeners);

        Ok(server)
    }
1024
    /// Runs the worker's event loop.
    ///
    /// Each iteration:
    /// - panics after too many consecutive `poll()` failures
    ///   (`check_for_poll_errors`), then polls with a timeout bounded by the next
    ///   timer deadline (`reset_loop_time_and_get_timeout`);
    /// - flushes `QUEUE` to the main process;
    /// - dispatches events: `Token(0)` is the command channel, `Token(1)` drains
    ///   `TIMER`, `Token(2)` is the metrics socket; backend and UDP health-check
    ///   tokens go to their checkers; anything else goes to `ready`;
    /// - fires overdue timers, then calls `handle_remaining_readiness`,
    ///   `create_sessions`, `zombie_check` and the health-check polls;
    /// - publishes connection, slab and buffer-pool gauges and sends metrics.
    ///
    /// Returns when the command channel is closed, after a `HardStop`, or once a
    /// soft stop is complete (`shut_down_sessions` returns `true`).
    pub fn run(&mut self) {
        let mut events = Events::with_capacity(1024);
        self.last_sessions_len = self.sessions.borrow().slab.len();

        self.last_zombie_check = Instant::now();
        self.loop_start = Instant::now();

        loop {
            self.check_for_poll_errors();

            let timeout = self.reset_loop_time_and_get_timeout();

            match self.poll.poll(&mut events, timeout) {
                Ok(_) => self.current_poll_errors = 0,
                Err(error) => {
                    error!("Error while polling events: {:?}", error);
                    self.current_poll_errors += 1;
                    // Skip the rest of the iteration, including the shutdown check.
                    continue;
                }
            }

            let after_epoll = Instant::now();
            time!(
                names::event_loop::EPOLL_TIME,
                (after_epoll - self.loop_start).as_millis()
            );
            self.loop_start = after_epoll;

            self.send_queue();

            for event in events.iter() {
                match event.token() {
                    Token(0) => {
                        // Command channel to the main process.
                        if event.is_error() {
                            error!("error reading from command channel");
                            continue;
                        }
                        if event.is_read_closed() || event.is_write_closed() {
                            error!("command channel was closed");
                            return;
                        }
                        let ready = Ready::from(event);
                        self.channel.handle_events(ready);

                        // Service the channel until it reports no readiness: arm
                        // WRITABLE whenever responses are queued, handle incoming
                        // requests, then flush the queue.
                        loop {
                            QUEUE.with(|queue| {
                                if !(*queue.borrow()).is_empty() {
                                    self.channel.interest.insert(Ready::WRITABLE);
                                }
                            });

                            if self.channel.readiness() == Ready::EMPTY {
                                break;
                            }

                            if self.read_channel_messages_and_notify() {
                                // HardStop received.
                                return;
                            }

                            QUEUE.with(|queue| {
                                if !(*queue.borrow()).is_empty() {
                                    self.channel.interest.insert(Ready::WRITABLE);
                                }
                            });

                            self.send_queue();
                        }
                    }
                    Token(1) => {
                        // Timer slot: handle every due timeout.
                        while let Some(t) = TIMER.with(|timer| timer.borrow_mut().poll()) {
                            self.timeout(t);
                        }
                    }
                    Token(2) => METRICS.with(|metrics| {
                        (*metrics.borrow_mut()).writable();
                    }),
                    token if self.health_checker.owns_token(token) => {
                        self.health_checker.ready(token);
                    }
                    token if self.udp.borrow().health_owns_token(token) => {
                        self.udp.borrow_mut().health_ready(token);
                    }
                    token => self.ready(token, Ready::from(event)),
                }
            }

            // The timer deadline may have passed without a Token(1) event.
            if let Some(t) = self.should_poll_at.as_ref() {
                if *t <= Instant::now() {
                    while let Some(t) = TIMER.with(|timer| timer.borrow_mut().poll()) {
                        self.timeout(t);
                    }
                }
            }
            self.handle_remaining_readiness();
            self.create_sessions();

            self.should_poll_at = TIMER.with(|timer| timer.borrow().next_poll_date());

            self.zombie_check();
            self.health_checker
                .poll(&self.backends, self.poll.registry());
            self.udp.borrow_mut().health_poll();

            // Connection and slab gauges, read under a single shared borrow.
            {
                let sessions = self.sessions.borrow();
                let nb_connections = sessions.nb_connections;
                let max_connections = sessions.max_connections;
                let slab_len = sessions.slab.len();
                let slab_capacity = sessions.slab.capacity();
                let accept_threshold = sessions.accept_slab_threshold();

                gauge!(names::client::CONNECTIONS, nb_connections);
                gauge!(names::client::CONNECTIONS_MAX, max_connections);
                // Percentages are only published for non-zero denominators.
                if max_connections > 0 {
                    gauge!(
                        "client.connections_percent",
                        nb_connections * 100 / max_connections
                    );
                }

                gauge!(names::slab::ENTRIES, slab_len);
                gauge!(names::slab::CAPACITY, slab_capacity);
                if slab_capacity > 0 {
                    gauge!(names::slab::USAGE_PERCENT, slab_len * 100 / slab_capacity);
                }
                if accept_threshold > 0 {
                    gauge!(
                        "slab.accept_threshold_percent",
                        slab_len * 100 / accept_threshold
                    );
                }
            }
            // Buffer pool gauges.
            {
                let pool = self.pool.borrow();
                let used = pool.inner.used();
                let capacity = pool.inner.capacity();
                gauge!(names::buffer::IN_USE, used);
                gauge!(names::buffer::CAPACITY, capacity);
                if capacity > 0 {
                    gauge!(names::buffer::USAGE_PERCENT, used * 100 / capacity);
                }
            }
            // At most once per ACCEPT_SATURATION_TICK, count one saturated tick
            // if accepting is currently disabled.
            let now = Instant::now();
            if now.duration_since(self.last_saturation_tick) >= ACCEPT_SATURATION_TICK {
                if !self.sessions.borrow().can_accept {
                    incr!(names::accept_queue::SATURATED_SECONDS);
                }
                self.last_saturation_tick = now;
            }
            gauge!(
                "process.uptime_seconds",
                self.started_at.elapsed().as_secs() as usize
            );
            // 0 while a soft stop is in progress, 1 otherwise.
            gauge!(
                "server.live",
                if self.shutting_down.is_some() { 0 } else { 1 }
            );
            METRICS.with(|metrics| {
                (*metrics.borrow_mut()).send_data();
            });

            if self.shutting_down.is_some() && self.shut_down_sessions() {
                return;
            }
        }
    }
1236
1237 fn check_for_poll_errors(&mut self) {
1238 if self.current_poll_errors >= self.max_poll_errors {
1239 error!(
1240 "Something is going very wrong. Last {} poll() calls failed, crashing..",
1241 self.current_poll_errors
1242 );
1243 panic!(
1244 "poll() calls failed {} times in a row",
1245 self.current_poll_errors
1246 );
1247 }
1248 }
1249
1250 fn reset_loop_time_and_get_timeout(&mut self) -> Option<Duration> {
1251 let now = Instant::now();
1252 time!(
1253 names::event_loop::EVENT_LOOP_TIME,
1254 (now - self.loop_start).as_millis()
1255 );
1256
1257 let mut timeout = match self.should_poll_at.as_ref() {
1258 None => self.poll_timeout,
1259 Some(i) => {
1260 if *i <= now {
1261 self.poll_timeout
1262 } else {
1263 let dur = *i - now;
1264 match self.poll_timeout {
1265 None => Some(dur),
1266 Some(t) => {
1267 if t < dur {
1268 Some(t)
1269 } else {
1270 Some(dur)
1271 }
1272 }
1273 }
1274 }
1275 }
1276 };
1277
1278 if self.shutting_down.is_some() {
1279 let shutdown_tick = Duration::from_millis(100);
1280 timeout = match timeout {
1281 None => Some(shutdown_tick),
1282 Some(current) => Some(current.min(shutdown_tick)),
1283 };
1284 }
1285
1286 self.loop_start = now;
1287 timeout
1288 }
1289
    /// Reads and dispatches every request currently available on the command
    /// channel. Does nothing unless the channel is readable.
    ///
    /// - `HardStop`: forwarded via `notify`, acknowledged with `ok`, then
    ///   `channel.run()` is called; returns `true` so the caller exits.
    /// - `SoftStop`: stores the request id in `shutting_down`, snapshots the slab
    ///   length, then forwards via `notify`.
    /// - `ReturnListenSockets`: handled locally; the ok / error answer is pushed
    ///   to `QUEUE`.
    /// - anything else: forwarded via `notify`.
    ///
    /// Returns `false` in every case except `HardStop`.
    fn read_channel_messages_and_notify(&mut self) -> bool {
        if !self.channel.readiness().is_readable() {
            return false;
        }

        if let Err(e) = self.channel.readable() {
            error!("error reading from channel: {:?}", e);
        }

        loop {
            let request = self.channel.read_message();
            debug!("Received request {:?}", request);
            match request {
                Ok(request) => match request.content.request_type {
                    Some(RequestType::HardStop(_)) => {
                        let req_id = request.id.clone();
                        self.notify(request);
                        if let Err(e) = self.channel.write_message(&WorkerResponse::ok(req_id)) {
                            error!("Could not send ok response to the main process: {}", e);
                        }
                        // NOTE(review): presumably flushes the ack before the
                        // worker exits — confirm against Channel::run.
                        if let Err(e) = self.channel.run() {
                            error!("Error while running the server channel: {}", e);
                        }
                        return true;
                    }
                    Some(RequestType::SoftStop(_)) => {
                        self.shutting_down = Some(request.id.clone());
                        self.last_sessions_len = self.sessions.borrow().slab.len();
                        self.notify(request);
                    }
                    Some(RequestType::ReturnListenSockets(_)) => {
                        info!("received ReturnListenSockets order");
                        match self.return_listen_sockets() {
                            Ok(_) => push_queue(WorkerResponse::ok(request.id)),
                            Err(error) => push_queue(worker_response_error(
                                request.id,
                                format!("Could not send listeners on scm socket: {error:?}"),
                            )),
                        }
                    }
                    _ => self.notify(request),
                },
                Err(_) => {
                    // No message could be read (presumably no complete message is
                    // buffered yet — confirm in Channel::read_message). If the channel
                    // is still interested in and ready for reads, pull more bytes and
                    // retry; otherwise wait for the next readiness event.
                    if (self.channel.interest & self.channel.readiness).is_readable() {
                        if let Err(e) = self.channel.readable() {
                            error!("error reading from channel: {:?}", e);
                        }
                        continue;
                    }
                    break;
                }
            }
        }
        false
    }
1348
1349 fn zombie_check(&mut self) {
1351 let now = Instant::now();
1352 if now - self.last_zombie_check < self.zombie_check_interval {
1353 return;
1354 }
1355 info!("zombie check");
1356 debug_assert!(
1359 now >= self.last_zombie_check,
1360 "zombie-check timestamp must never move backwards"
1361 );
1362 self.last_zombie_check = now;
1363
1364 let mut zombie_tokens = HashSet::new();
1365
1366 for (_index, session) in self
1368 .sessions
1369 .borrow_mut()
1370 .slab
1371 .iter_mut()
1372 .filter(|(_, c)| now - c.borrow().last_event() > self.zombie_check_interval)
1373 {
1374 let session_token = session.borrow().frontend_token();
1375 if !zombie_tokens.contains(&session_token) {
1376 session.borrow().print_session();
1377 zombie_tokens.insert(session_token);
1378 }
1379 }
1380
1381 debug_assert!(
1386 !self.sessions.borrow().slab.iter().any(|(_, session)| {
1387 let s = session.borrow();
1388 zombie_tokens.contains(&s.frontend_token())
1389 && matches!(
1390 s.protocol(),
1391 Protocol::HTTPListen | Protocol::HTTPSListen | Protocol::TCPListen
1392 )
1393 }),
1394 "zombie reaping must never target a listener session"
1395 );
1396
1397 let zombie_count = zombie_tokens.len() as i64;
1398 count!(names::misc::ZOMBIES, zombie_count);
1399
1400 let remaining_count = self.shut_down_sessions_by_frontend_tokens(zombie_tokens);
1401 info!(
1402 "removing {} zombies ({} remaining entries after close)",
1403 zombie_count, remaining_count
1404 );
1405 }
1406
    /// Tears down every session whose frontend token is in `tokens`.
    ///
    /// Pass 1: for each token still present in the slab, the entry is removed,
    /// `close()` is called on it and `SessionManager::decr` runs once.
    /// Pass 2: any remaining slab entries whose `frontend_token()` is in `tokens`
    /// are removed as well, without `close()` and without `decr`.
    ///
    /// Returns the number of entries removed by pass 2 only; 0 when `tokens` is empty.
    fn shut_down_sessions_by_frontend_tokens(&self, tokens: HashSet<Token>) -> usize {
        if tokens.is_empty() {
            return 0;
        }

        for token in &tokens {
            if self.sessions.borrow().slab.contains(token.0) {
                let slab_before = self.sessions.borrow().slab.len();
                // The mutable borrow is confined to this block, so the session
                // manager is not borrowed while `close()` runs (presumably because
                // close() may need `sessions` itself — confirm in the proxies).
                let session = { self.sessions.borrow_mut().slab.remove(token.0) };
                session.borrow_mut().close();
                self.sessions.borrow_mut().decr();
                debug_assert!(
                    !self.sessions.borrow().slab.contains(token.0),
                    "removed token must be absent from the slab"
                );
                debug_assert!(
                    self.sessions.borrow().slab.len() < slab_before,
                    "removing a present session must free at least its own slab slot"
                );
            }
        }

        // Collect leftover entries still pointing at a closed frontend token
        // under a shared borrow, then remove them one by one.
        let mut dangling_entries = HashSet::new();
        for (entry_key, session) in &self.sessions.borrow().slab {
            if tokens.contains(&session.borrow().frontend_token()) {
                dangling_entries.insert(entry_key);
            }
        }

        let mut dangling_entries_count = 0;
        for entry_key in dangling_entries {
            let mut sessions = self.sessions.borrow_mut();
            if sessions.slab.contains(entry_key) {
                sessions.slab.remove(entry_key);
                dangling_entries_count += 1;
            }
        }
        debug_assert!(
            !self
                .sessions
                .borrow()
                .slab
                .iter()
                .any(|(_, session)| tokens.contains(&session.borrow().frontend_token())),
            "no slab entry may reference a closed frontend token after teardown"
        );
        dangling_entries_count
    }
1467
1468 fn shut_down_sessions(&mut self) -> bool {
1470 let sessions_count = self.sessions.borrow().slab.len();
1471 let mut sessions_to_shut_down = HashSet::new();
1472
1473 for (_key, session) in &self.sessions.borrow().slab {
1474 let mut session = session.borrow_mut();
1475 if session.shutting_down() {
1476 debug!(
1477 "Server killing session from shutting_down: token={:?}, protocol={:?}",
1478 session.frontend_token(),
1479 session.protocol()
1480 );
1481 sessions_to_shut_down.insert(Token(session.frontend_token().0));
1482 }
1483 }
1484 let _ = self.shut_down_sessions_by_frontend_tokens(sessions_to_shut_down);
1485
1486 let new_sessions_count = self.sessions.borrow().slab.len();
1487
1488 if new_sessions_count < sessions_count {
1489 let now = Instant::now();
1490 if let Some(last) = self.last_shutting_down_message {
1491 if (now - last) > Duration::from_secs(5) {
1492 info!(
1493 "closed {} sessions, {} sessions left, base_sessions_count = {}",
1494 sessions_count - new_sessions_count,
1495 new_sessions_count,
1496 self.base_sessions_count
1497 );
1498 }
1499 }
1500 self.last_shutting_down_message = Some(now);
1501 }
1502
1503 if new_sessions_count <= self.base_sessions_count {
1504 info!("last session stopped, shutting down!");
1505 if let Err(e) = self.channel.run() {
1506 error!("Error while running the server channel: {}", e);
1507 }
1508 let id = self
1510 .shutting_down
1511 .take()
1512 .expect("should have shut down correctly"); debug!("Responding OK to main process for request {}", id);
1515
1516 let proxy_response = WorkerResponse::ok(id);
1517 if let Err(e) = self.channel.write_message(&proxy_response) {
1518 error!("Could not write response to the main process: {}", e);
1519 }
1520 if let Err(e) = self.channel.run() {
1521 error!("Error while running the server channel: {}", e);
1522 }
1523 return true;
1524 }
1525
1526 if new_sessions_count < self.last_sessions_len {
1527 info!(
1528 "shutting down, {} slab elements remaining (base: {})",
1529 new_sessions_count - self.base_sessions_count,
1530 self.base_sessions_count
1531 );
1532 self.last_sessions_len = new_sessions_count;
1533 }
1534
1535 false
1536 }
1537
1538 fn kill_session(&self, session: Rc<RefCell<dyn ProxySession>>) {
1539 let token = session.borrow().frontend_token();
1540 let _ = self.shut_down_sessions_by_frontend_tokens(HashSet::from([token]));
1541 }
1542
    /// Drain the thread-local `QUEUE` of pending worker responses onto the command channel.
    ///
    /// Does nothing unless the channel is currently writable. Each loop iteration:
    /// - pops at most one response and serializes it with `write_message`; on error it
    ///   is logged and pushed back to the front of the queue;
    /// - flushes buffered bytes with `writable()` when `back_buf` holds data.
    ///
    /// The loop stops when the channel is no longer writable, or when both `back_buf`
    /// and the queue are empty.
    ///
    /// NOTE(review): if `write_message` keeps failing while `back_buf` stays empty and the
    /// channel stays writable, neither exit condition is reached and the same response is
    /// retried indefinitely. Confirm `write_message` cannot fail persistently for one message.
    fn send_queue(&mut self) {
        if self.channel.readiness.is_writable() {
            QUEUE.with(|q| {
                let mut queue = q.borrow_mut();
                loop {
                    if let Some(resp) = queue.pop_front() {
                        debug!("Sending response {:?}", resp);
                        if let Err(e) = self.channel.write_message(&resp) {
                            error!("Could not write message {} on the channel: {}", resp, e);
                            // Keep ordering: the failed response is retried first.
                            queue.push_front(resp);
                        }
                    }

                    // Push serialized bytes out to the socket.
                    if self.channel.back_buf.available_data() > 0 {
                        if let Err(e) = self.channel.writable() {
                            error!("error writing to channel: {:?}", e);
                        }
                    }

                    // The flush above may have cleared writability; resume on the next event.
                    if !self.channel.readiness.is_writable() {
                        break;
                    }

                    // Everything queued has been written and flushed.
                    if self.channel.back_buf.available_data() == 0 && queue.is_empty() {
                        break;
                    }
                }
            });
        }
    }
1573
1574 fn notify(&mut self, message: WorkerRequest) {
1575 let now = std::time::Instant::now();
1582 let lease_tick_transition = METRICS.with(|metrics| {
1588 let mut m = metrics.borrow_mut();
1589 if !m.lease_tick_due(now) {
1590 return None;
1591 }
1592 let previous = m.lease_tick(now)?;
1593 let effective = m.detail_effective();
1594 Some((previous, effective))
1595 });
1596 if let Some((previous, effective)) = lease_tick_transition {
1597 push_metric_detail_transition(previous, effective, "lease_tick_expired", None);
1604 }
1605 match &message.content.request_type {
1606 Some(RequestType::ConfigureMetrics(configuration)) => {
1607 match MetricsConfiguration::try_from(*configuration) {
1608 Ok(metrics_config) => {
1609 METRICS.with(|metrics| {
1610 (*metrics.borrow_mut()).configure(&metrics_config);
1611 push_queue(WorkerResponse::ok(message.id));
1612 });
1613 }
1614 Err(e) => {
1615 error!("Error configuring metrics: {}", e);
1616 push_queue(WorkerResponse::error(message.id, e));
1617 }
1618 }
1619 return;
1620 }
1621 Some(RequestType::QueryMetrics(query_metrics_options)) => {
1622 METRICS.with(|metrics| {
1623 match (*metrics.borrow_mut()).query(query_metrics_options) {
1624 Ok(c) => push_queue(WorkerResponse::ok_with_content(message.id, c)),
1625 Err(e) => {
1626 error!("Error querying metrics: {}", e);
1627 push_queue(WorkerResponse::error(message.id, e))
1628 }
1629 }
1630 });
1631 return;
1632 }
1633 Some(RequestType::SetMetricDetail(req)) => {
1640 let presented_binding = crate::metrics::PeerBinding {
1647 pid: req.peer_pid,
1648 session_ulid: req.peer_session_ulid.as_deref().and_then(|s| {
1654 rusty_ulid::Ulid::from_str(s)
1655 .map(u128::from)
1656 .ok()
1657 .or_else(|| u128::from_str_radix(s.trim_start_matches("0x"), 16).ok())
1658 }),
1659 };
1660 if req.clear.unwrap_or(false) {
1661 if req.client_id.len() > crate::metrics::LEASE_CLIENT_ID_MAX_BYTES {
1672 let msg = format!(
1673 "SetMetricDetail: clear client_id length {} exceeds {} bytes",
1674 req.client_id.len(),
1675 crate::metrics::LEASE_CLIENT_ID_MAX_BYTES,
1676 );
1677 error!("{}", msg);
1678 push_queue(WorkerResponse::error(message.id.clone(), msg));
1679 return;
1680 }
1681 let (outcome, effective_after, configured_after, lease_count_after) = METRICS
1689 .with(|metrics| {
1690 let mut m = metrics.borrow_mut();
1691 let outcome = m.lease_clear(&req.client_id, presented_binding);
1692 (
1693 outcome,
1694 m.detail_effective(),
1695 m.detail_configured(),
1696 m.lease_count(),
1697 )
1698 });
1699 match outcome {
1700 crate::metrics::LeaseClearOutcome::Cleared { previous_effective } => {
1701 push_metric_detail_transition(
1702 previous_effective,
1703 effective_after,
1704 "lease_clear",
1705 Some(req.client_id.clone()),
1706 );
1707 push_queue(WorkerResponse::ok_with_content(
1708 message.id.clone(),
1709 worker_metric_detail_status_content(
1710 configured_after,
1711 effective_after,
1712 previous_effective,
1713 lease_count_after,
1714 ),
1715 ));
1716 }
1717 crate::metrics::LeaseClearOutcome::NotFound => {
1718 push_queue(WorkerResponse::ok_with_content(
1722 message.id.clone(),
1723 worker_metric_detail_status_content(
1724 configured_after,
1725 effective_after,
1726 effective_after,
1727 lease_count_after,
1728 ),
1729 ));
1730 }
1731 crate::metrics::LeaseClearOutcome::Unauthorized => {
1732 let msg = "SetMetricDetail: clear refused (peer \
1743 binding does not match the apply-time owner)"
1744 .to_owned();
1745 error!("{}", msg);
1746 push_queue(WorkerResponse::error(message.id.clone(), msg));
1747 }
1748 }
1749 return;
1750 }
1751 let detail_proto = match req.detail {
1752 Some(d) => d,
1753 None => {
1754 let msg = "SetMetricDetail without `detail` and without `clear`".to_owned();
1761 error!("{}", msg);
1762 push_queue(WorkerResponse::error(message.id.clone(), msg));
1763 return;
1764 }
1765 };
1766 let detail_enum = match MetricDetail::try_from(detail_proto) {
1767 Ok(d) => d,
1768 Err(e) => {
1769 let msg =
1770 format!("SetMetricDetail: invalid MetricDetail variant {detail_proto}");
1771 error!("{}: {}", msg, e);
1772 push_queue(WorkerResponse::error(message.id.clone(), msg));
1773 return;
1774 }
1775 };
1776 let level = MetricDetailLevel::from(detail_enum);
1777 if let Some(t) = req.ttl_seconds
1786 && u64::from(t) > crate::metrics::LEASE_TTL_MAX.as_secs()
1787 {
1788 let msg = format!(
1789 "SetMetricDetail: ttl_seconds={t} exceeds LEASE_TTL_MAX={}",
1790 crate::metrics::LEASE_TTL_MAX.as_secs()
1791 );
1792 error!("{}", msg);
1793 push_queue(WorkerResponse::error(message.id.clone(), msg));
1794 return;
1795 }
1796 let ttl_seconds = req.ttl_seconds.filter(|&t| t > 0).unwrap_or_else(|| {
1797 u32::try_from(crate::metrics::LEASE_TTL_DEFAULT.as_secs()).unwrap_or(60)
1804 });
1805 let ttl = std::time::Duration::from_secs(ttl_seconds.into());
1806 let (outcome, configured_after, lease_count_after) = METRICS.with(|metrics| {
1807 let mut m = metrics.borrow_mut();
1808 let outcome =
1809 m.lease_apply(req.client_id.clone(), level, ttl, presented_binding);
1810 (outcome, m.detail_configured(), m.lease_count())
1811 });
1812 match outcome {
1813 crate::metrics::LeaseApplyOutcome::Applied {
1814 previous_effective,
1815 new_effective,
1816 } => {
1817 push_metric_detail_transition(
1818 previous_effective,
1819 new_effective,
1820 "lease_apply",
1821 Some(req.client_id.clone()),
1822 );
1823 push_queue(WorkerResponse::ok_with_content(
1824 message.id.clone(),
1825 worker_metric_detail_status_content(
1826 configured_after,
1827 new_effective,
1828 previous_effective,
1829 lease_count_after,
1830 ),
1831 ));
1832 }
1833 crate::metrics::LeaseApplyOutcome::ClientIdTooLong => {
1834 let msg = format!(
1835 "SetMetricDetail: client_id length {} exceeds {} bytes",
1836 req.client_id.len(),
1837 crate::metrics::LEASE_CLIENT_ID_MAX_BYTES,
1838 );
1839 error!("{}", msg);
1840 push_queue(WorkerResponse::error(message.id.clone(), msg));
1841 }
1842 crate::metrics::LeaseApplyOutcome::TableFull => {
1843 let msg = format!(
1850 "SetMetricDetail: lease table at capacity ({} entries); reject new \
1851 apply — operators must retry after an active lease expires or is \
1852 cleared",
1853 crate::metrics::LEASE_TABLE_CAP,
1854 );
1855 error!("{}", msg);
1856 push_queue(WorkerResponse::error(message.id.clone(), msg));
1857 }
1858 crate::metrics::LeaseApplyOutcome::TtlOutOfRange => {
1859 let msg = format!(
1865 "SetMetricDetail: ttl exceeds LEASE_TTL_MAX={} (internal contract \
1866 violation: dispatch gate should have rejected)",
1867 crate::metrics::LEASE_TTL_MAX.as_secs(),
1868 );
1869 error!("{}", msg);
1870 push_queue(WorkerResponse::error(message.id.clone(), msg));
1871 }
1872 crate::metrics::LeaseApplyOutcome::Unauthorized => {
1873 let msg = "SetMetricDetail: renewal refused (peer binding does not \
1882 match the apply-time owner)"
1883 .to_owned();
1884 error!("{}", msg);
1885 push_queue(WorkerResponse::error(message.id.clone(), msg));
1886 }
1887 }
1888 return;
1889 }
1890 Some(RequestType::Logging(logging_filter)) => {
1891 info!(
1892 "{} changing logging filter to {}",
1893 message.id, logging_filter
1894 );
1895 let (directives, _errors) = logging::parse_logging_spec(logging_filter);
1897 logging::LOGGER.with(|logger| {
1898 logger.borrow_mut().set_directives(directives);
1899 });
1900 push_queue(WorkerResponse::ok(message.id));
1901 return;
1902 }
1903 Some(RequestType::QueryClustersHashes(_)) => {
1904 push_queue(WorkerResponse::ok_with_content(
1905 message.id.clone(),
1906 ContentType::ClusterHashes(ClusterHashes {
1907 map: self.config_state.hash_state(),
1908 })
1909 .into(),
1910 ));
1911 return;
1912 }
1913 Some(RequestType::QueryClusterById(cluster_id)) => {
1914 push_queue(WorkerResponse::ok_with_content(
1915 message.id.clone(),
1916 ContentType::Clusters(ClusterInformations {
1917 vec: self
1918 .config_state
1919 .cluster_state(cluster_id)
1920 .map_or(vec![], |ci| vec![ci]),
1921 })
1922 .into(),
1923 ));
1924 }
1925 Some(RequestType::SetMaxConnectionsPerIp(limit)) => {
1926 let mut sessions = self.sessions.borrow_mut();
1927 let previous = sessions.max_connections_per_ip;
1928 sessions.max_connections_per_ip = *limit;
1929 if *limit == 0 {
1934 sessions.clear_cluster_ip_tracking();
1935 }
1936 info!(
1937 "{} updated global max_connections_per_ip from {} to {}",
1938 message.id, previous, limit
1939 );
1940 push_queue(WorkerResponse::ok(message.id));
1941 return;
1942 }
1943 Some(RequestType::QueryMaxConnectionsPerIp(_)) => {
1944 let limit = self.sessions.borrow().max_connections_per_ip;
1945 push_queue(WorkerResponse::ok_with_content(
1946 message.id,
1947 ContentType::MaxConnectionsPerIpLimit(
1948 sozu_command::proto::command::MaxConnectionsPerIpLimit { limit },
1949 )
1950 .into(),
1951 ));
1952 return;
1953 }
1954 Some(RequestType::QueryClustersByDomain(domain)) => {
1955 let cluster_ids = self
1956 .config_state
1957 .get_cluster_ids_by_domain(domain.hostname.clone(), domain.path.clone());
1958 let vec = cluster_ids
1959 .iter()
1960 .filter_map(|cluster_id| self.config_state.cluster_state(cluster_id))
1961 .collect();
1962
1963 push_queue(WorkerResponse::ok_with_content(
1964 message.id.clone(),
1965 ContentType::Clusters(ClusterInformations { vec }).into(),
1966 ));
1967 return;
1968 }
1969 Some(RequestType::QueryCertificatesFromWorkers(filters)) => {
1970 if filters.fingerprint.is_some() {
1971 let certs = self.config_state.get_certificates(filters.clone());
1972 let response = if !certs.is_empty() {
1973 WorkerResponse::ok_with_content(
1974 message.id.clone(),
1975 ContentType::CertificatesWithFingerprints(
1976 CertificatesWithFingerprints { certs },
1977 )
1978 .into(),
1979 )
1980 } else {
1981 worker_response_error(
1982 message.id.clone(),
1983 "Could not find certificate for this fingerprint",
1984 )
1985 };
1986 push_queue(response);
1987 return;
1988 }
1989 }
1992 _other_request => {}
1993 }
1994 self.notify_proxys(message);
1995 }
1996
1997 pub fn notify_proxys(&mut self, request: WorkerRequest) {
1998 if let Err(e) = self.config_state.dispatch(&request.content) {
1999 error!("Could not execute order on config state: {}", e);
2000 }
2001
2002 let req_id = request.id.clone();
2003
2004 match request.content.request_type {
2005 Some(RequestType::AddCluster(ref cluster)) => {
2006 if let Some(hc) = cluster.health_check.as_ref() {
2014 if let Err(reason) = sozu_command::config::validate_health_check_config(hc) {
2015 push_queue(worker_response_error(req_id, reason));
2016 return;
2017 }
2018 }
2019 self.add_cluster(cluster);
2020 METRICS.with(|metrics| {
2025 (*metrics.borrow_mut()).add_cluster(&cluster.cluster_id);
2026 });
2027 }
2029 Some(RequestType::RemoveCluster(ref cluster_id)) => {
2030 self.remove_health_check_state(cluster_id);
2031 METRICS.with(|metrics| {
2032 (*metrics.borrow_mut()).remove_cluster(cluster_id);
2033 });
2034 }
2036 Some(RequestType::SetHealthCheck(ref set)) => {
2037 if let Err(reason) = sozu_command::config::validate_health_check_config(&set.config)
2038 {
2039 push_queue(worker_response_error(req_id, reason));
2040 return;
2041 }
2042 self.backends
2043 .borrow_mut()
2044 .set_health_check_config(&set.cluster_id, Some(set.config.to_owned()));
2045 push_queue(WorkerResponse::ok(req_id));
2046 return;
2047 }
2048 Some(RequestType::RemoveHealthCheck(ref cluster_id)) => {
2049 self.remove_health_check_state(cluster_id);
2050 push_queue(WorkerResponse::ok(req_id));
2051 return;
2052 }
2053 Some(RequestType::AddBackend(ref backend)) => {
2054 push_queue(self.add_backend(&req_id, backend));
2055 return;
2056 }
2057 Some(RequestType::RemoveBackend(ref remove_backend)) => {
2058 push_queue(self.remove_backend(&req_id, remove_backend));
2059 return;
2060 }
2061 _ => {}
2062 };
2063
2064 let proxy_destinations = request.content.get_destinations();
2065 let mut notify_response = None;
2066 if proxy_destinations.to_http_proxy {
2067 notify_response = Some(self.http.borrow_mut().notify(request.clone()));
2068 }
2069 if proxy_destinations.to_https_proxy {
2070 let http_proxy_response = self.https.borrow_mut().notify(request.clone());
2071 if http_proxy_response.is_failure() || notify_response.is_none() {
2072 notify_response = Some(http_proxy_response);
2073 }
2074 }
2075 if proxy_destinations.to_tcp_proxy {
2076 let tcp_proxy_response = self.tcp.borrow_mut().notify(request.clone());
2077 if tcp_proxy_response.is_failure() || notify_response.is_none() {
2078 notify_response = Some(tcp_proxy_response);
2079 }
2080 }
2081 if proxy_destinations.to_udp_proxy {
2082 let udp_proxy_response = self.udp.borrow_mut().notify(request.clone());
2083 if udp_proxy_response.is_failure() || notify_response.is_none() {
2084 notify_response = Some(udp_proxy_response);
2085 }
2086 }
2087 if let Some(response) = notify_response {
2088 push_queue(response);
2089 }
2090
2091 match request.content.request_type {
2092 Some(RequestType::AddHttpListener(listener)) => {
2094 push_queue(self.notify_add_http_listener(&req_id, listener));
2095 }
2096 Some(RequestType::AddHttpsListener(listener)) => {
2097 push_queue(self.notify_add_https_listener(&req_id, listener));
2098 }
2099 Some(RequestType::AddTcpListener(listener)) => {
2100 push_queue(self.notify_add_tcp_listener(&req_id, listener));
2101 }
2102 Some(RequestType::AddUdpListener(listener)) => {
2103 push_queue(self.notify_add_udp_listener(&req_id, listener));
2104 }
2105 Some(RequestType::UpdateHttpListener(patch)) => {
2106 push_queue(self.notify_update_http_listener(&req_id, patch));
2107 }
2108 Some(RequestType::UpdateHttpsListener(patch)) => {
2109 push_queue(self.notify_update_https_listener(&req_id, patch));
2110 }
2111 Some(RequestType::UpdateTcpListener(patch)) => {
2112 push_queue(self.notify_update_tcp_listener(&req_id, patch));
2113 }
2114 Some(RequestType::UpdateUdpListener(patch)) => {
2115 push_queue(self.notify_update_udp_listener(&req_id, patch));
2116 }
2117 Some(RequestType::RemoveListener(ref remove)) => {
2118 debug!("{} remove {:?} listener {:?}", req_id, remove.proxy, remove);
2119 debug_assert!(
2122 self.base_sessions_count > 0,
2123 "removing a listener with base_sessions_count == 0 would underflow"
2124 );
2125 self.base_sessions_count -= 1;
2126 let response = match ListenerType::try_from(remove.proxy) {
2127 Ok(ListenerType::Http) => self.http.borrow_mut().notify(request),
2128 Ok(ListenerType::Https) => self.https.borrow_mut().notify(request),
2129 Ok(ListenerType::Tcp) => self.tcp.borrow_mut().notify(request),
2130 Ok(ListenerType::Udp) => self.udp.borrow_mut().notify(request),
2131 Err(_) => WorkerResponse::error(req_id, "Wrong variant ListenerType"),
2132 };
2133 push_queue(response);
2134 }
2135 Some(RequestType::ActivateListener(ref activate)) => {
2136 push_queue(self.notify_activate_listener(&req_id, activate));
2137 }
2138 Some(RequestType::DeactivateListener(ref deactivate)) => {
2139 push_queue(self.notify_deactivate_listener(&req_id, deactivate));
2140 }
2141 _other_request => {}
2142 };
2143 }
2144
2145 fn add_cluster(&mut self, cluster: &Cluster) {
2146 let mut backends = self.backends.borrow_mut();
2147 backends.set_load_balancing_policy_for_cluster(
2148 &cluster.cluster_id,
2149 LoadBalancingAlgorithms::try_from(cluster.load_balancing).unwrap_or_default(),
2150 cluster
2151 .load_metric
2152 .and_then(|n| LoadMetric::try_from(n).ok()),
2153 );
2154 backends.set_health_check_config(&cluster.cluster_id, cluster.health_check.to_owned());
2155 backends.set_cluster_http2(&cluster.cluster_id, cluster.http2.unwrap_or(false));
2156 }
2157
2158 fn add_backend(&mut self, req_id: &str, add_backend: &AddBackend) -> WorkerResponse {
2159 let new_backend = Backend::new(
2160 &add_backend.backend_id,
2161 add_backend.address.into(),
2162 add_backend.sticky_id.clone(),
2163 add_backend.load_balancing_parameters,
2164 add_backend.backup,
2165 );
2166 self.backends
2167 .borrow_mut()
2168 .add_backend(&add_backend.cluster_id, new_backend);
2169
2170 WorkerResponse::ok(req_id)
2171 }
2172
2173 fn remove_health_check_state(&mut self, cluster_id: &str) {
2174 self.health_checker.remove_cluster(cluster_id);
2175 self.backends
2176 .borrow_mut()
2177 .health_check_configs
2178 .remove(cluster_id);
2179 }
2180
2181 fn remove_backend(&mut self, req_id: &str, backend: &RemoveBackend) -> WorkerResponse {
2182 let address = backend.address.into();
2183 let removed_ids = self
2191 .backends
2192 .borrow_mut()
2193 .remove_backend(&backend.cluster_id, &address);
2194 if removed_ids.is_empty() {
2195 METRICS.with(|metrics| {
2200 (*metrics.borrow_mut()).remove_backend(&backend.cluster_id, &backend.backend_id);
2201 });
2202 } else {
2203 METRICS.with(|metrics| {
2204 let mut metrics = metrics.borrow_mut();
2205 for id in &removed_ids {
2206 metrics.remove_backend(&backend.cluster_id, id);
2207 }
2208 });
2209 }
2210
2211 WorkerResponse::ok(req_id)
2212 }
2213
2214 fn notify_add_http_listener(
2215 &mut self,
2216 req_id: &str,
2217 listener: HttpListenerConfig,
2218 ) -> WorkerResponse {
2219 debug!("{} add http listener {:?}", req_id, listener);
2220
2221 if self.sessions.borrow().at_capacity() {
2222 return worker_response_error(req_id, "session list is full, cannot add a listener");
2223 }
2224
2225 let mut session_manager = self.sessions.borrow_mut();
2226 let slab_before = session_manager.slab.len();
2228 debug_assert!(
2229 !session_manager
2230 .slab
2231 .contains(session_manager.slab.vacant_key()),
2232 "the next vacant slab key must be free before insertion"
2233 );
2234 let entry = session_manager.slab.vacant_entry();
2235 let token = Token(entry.key());
2236
2237 match self.http.borrow_mut().add_listener(listener, token) {
2238 Ok(_token) => {
2239 entry.insert(Rc::new(RefCell::new(ListenSession {
2240 protocol: Protocol::HTTPListen,
2241 })));
2242 debug_assert!(
2245 session_manager.slab.contains(token.0),
2246 "listener insert must occupy the token's slab key"
2247 );
2248 debug_assert_eq!(
2249 session_manager.slab.len(),
2250 slab_before + 1,
2251 "adding a listener must occupy exactly one slab slot"
2252 );
2253 self.base_sessions_count += 1;
2254 WorkerResponse::ok(req_id)
2255 }
2256 Err(e) => worker_response_error(req_id, format!("Could not add HTTP listener: {e}")),
2257 }
2258 }
2259
2260 fn notify_add_https_listener(
2261 &mut self,
2262 req_id: &str,
2263 listener: HttpsListenerConfig,
2264 ) -> WorkerResponse {
2265 debug!("{} add https listener {:?}", req_id, listener);
2266
2267 if self.sessions.borrow().at_capacity() {
2268 return worker_response_error(req_id, "session list is full, cannot add a listener");
2269 }
2270
2271 let mut session_manager = self.sessions.borrow_mut();
2272 let slab_before = session_manager.slab.len();
2273 debug_assert!(
2274 !session_manager
2275 .slab
2276 .contains(session_manager.slab.vacant_key()),
2277 "the next vacant slab key must be free before insertion"
2278 );
2279 let entry = session_manager.slab.vacant_entry();
2280 let token = Token(entry.key());
2281
2282 match self
2283 .https
2284 .borrow_mut()
2285 .add_listener(listener.clone(), token)
2286 {
2287 Ok(_token) => {
2288 entry.insert(Rc::new(RefCell::new(ListenSession {
2289 protocol: Protocol::HTTPSListen,
2290 })));
2291 debug_assert!(
2292 session_manager.slab.contains(token.0),
2293 "listener insert must occupy the token's slab key"
2294 );
2295 debug_assert_eq!(
2296 session_manager.slab.len(),
2297 slab_before + 1,
2298 "adding a listener must occupy exactly one slab slot"
2299 );
2300 self.base_sessions_count += 1;
2301 WorkerResponse::ok(req_id)
2302 }
2303 Err(e) => worker_response_error(req_id, format!("Could not add HTTPS listener: {e}")),
2304 }
2305 }
2306
2307 fn notify_add_tcp_listener(
2308 &mut self,
2309 req_id: &str,
2310 listener: CommandTcpListener,
2311 ) -> WorkerResponse {
2312 debug!("{} add tcp listener {:?}", req_id, listener);
2313
2314 if self.sessions.borrow().at_capacity() {
2315 return worker_response_error(req_id, "session list is full, cannot add a listener");
2316 }
2317
2318 let mut session_manager = self.sessions.borrow_mut();
2319 let slab_before = session_manager.slab.len();
2320 debug_assert!(
2321 !session_manager
2322 .slab
2323 .contains(session_manager.slab.vacant_key()),
2324 "the next vacant slab key must be free before insertion"
2325 );
2326 let entry = session_manager.slab.vacant_entry();
2327 let token = Token(entry.key());
2328
2329 match self.tcp.borrow_mut().add_listener(listener, token) {
2330 Ok(_token) => {
2331 entry.insert(Rc::new(RefCell::new(ListenSession {
2332 protocol: Protocol::TCPListen,
2333 })));
2334 debug_assert!(
2335 session_manager.slab.contains(token.0),
2336 "listener insert must occupy the token's slab key"
2337 );
2338 debug_assert_eq!(
2339 session_manager.slab.len(),
2340 slab_before + 1,
2341 "adding a listener must occupy exactly one slab slot"
2342 );
2343 self.base_sessions_count += 1;
2344 WorkerResponse::ok(req_id)
2345 }
2346 Err(e) => worker_response_error(req_id, format!("Could not add TCP listener: {e}")),
2347 }
2348 }
2349
2350 fn notify_add_udp_listener(
2351 &mut self,
2352 req_id: &str,
2353 listener: CommandUdpListener,
2354 ) -> WorkerResponse {
2355 debug!("{} add udp listener {:?}", req_id, listener);
2356
2357 if self.sessions.borrow().at_capacity() {
2358 return worker_response_error(req_id, "session list is full, cannot add a listener");
2359 }
2360
2361 let mut session_manager = self.sessions.borrow_mut();
2362 let entry = session_manager.slab.vacant_entry();
2363 let token = Token(entry.key());
2364
2365 match self.udp.borrow_mut().add_listener(listener, token) {
2366 Ok(_token) => {
2367 entry.insert(Rc::new(RefCell::new(ListenSession {
2368 protocol: Protocol::UDPListen,
2369 })));
2370 self.base_sessions_count += 1;
2371 WorkerResponse::ok(req_id)
2372 }
2373 Err(e) => worker_response_error(req_id, format!("Could not add UDP listener: {e}")),
2374 }
2375 }
2376
2377 fn notify_update_udp_listener(
2378 &mut self,
2379 req_id: &str,
2380 patch: UpdateUdpListenerConfig,
2381 ) -> WorkerResponse {
2382 debug!("{} update udp listener {:?}", req_id, patch.address);
2383 match self.udp.borrow_mut().update_listener(patch) {
2384 Ok(()) => WorkerResponse::ok(req_id),
2385 Err(e) => worker_response_error(req_id, format!("Could not update UDP listener: {e}")),
2386 }
2387 }
2388
2389 fn notify_update_http_listener(
2390 &mut self,
2391 req_id: &str,
2392 patch: UpdateHttpListenerConfig,
2393 ) -> WorkerResponse {
2394 debug!("{} update http listener {:?}", req_id, patch.address);
2395 match self.http.borrow_mut().update_listener(patch) {
2396 Ok(()) => WorkerResponse::ok(req_id),
2397 Err(e) => worker_response_error(req_id, format!("Could not update HTTP listener: {e}")),
2398 }
2399 }
2400
2401 fn notify_update_https_listener(
2402 &mut self,
2403 req_id: &str,
2404 patch: UpdateHttpsListenerConfig,
2405 ) -> WorkerResponse {
2406 debug!("{} update https listener {:?}", req_id, patch.address);
2407 match self.https.borrow_mut().update_listener(patch) {
2408 Ok(()) => WorkerResponse::ok(req_id),
2409 Err(e) => {
2410 worker_response_error(req_id, format!("Could not update HTTPS listener: {e}"))
2411 }
2412 }
2413 }
2414
2415 fn notify_update_tcp_listener(
2416 &mut self,
2417 req_id: &str,
2418 patch: UpdateTcpListenerConfig,
2419 ) -> WorkerResponse {
2420 debug!("{} update tcp listener {:?}", req_id, patch.address);
2421 match self.tcp.borrow_mut().update_listener(patch) {
2422 Ok(()) => WorkerResponse::ok(req_id),
2423 Err(e) => worker_response_error(req_id, format!("Could not update TCP listener: {e}")),
2424 }
2425 }
2426
2427 fn notify_activate_listener(
2428 &mut self,
2429 req_id: &str,
2430 activate: &ActivateListener,
2431 ) -> WorkerResponse {
2432 debug!(
2433 "{} activate {:?} listener {:?}",
2434 req_id, activate.proxy, activate
2435 );
2436
2437 let address: std::net::SocketAddr = activate.address.into();
2438
2439 match ListenerType::try_from(activate.proxy) {
2440 Ok(ListenerType::Http) => {
2441 let listener = self
2442 .scm_listeners
2443 .as_mut()
2444 .and_then(|s| s.get_http(&address))
2445 .map(|fd| unsafe { MioTcpListener::from_raw_fd(fd) });
2450
2451 let activated_token = self.http.borrow_mut().activate_listener(&address, listener);
2452 match activated_token {
2453 Ok(token) => {
2454 self.accept(ListenToken(token.0), Protocol::HTTPListen);
2455 WorkerResponse::ok(req_id)
2456 }
2457 Err(activate_error) => worker_response_error(
2458 req_id,
2459 format!("Could not activate HTTP listener: {activate_error}"),
2460 ),
2461 }
2462 }
2463 Ok(ListenerType::Https) => {
2464 let listener = self
2465 .scm_listeners
2466 .as_mut()
2467 .and_then(|s| s.get_https(&address))
2468 .map(|fd| unsafe { MioTcpListener::from_raw_fd(fd) });
2473
2474 let activated_token = self
2475 .https
2476 .borrow_mut()
2477 .activate_listener(&address, listener);
2478 match activated_token {
2479 Ok(token) => {
2480 self.accept(ListenToken(token.0), Protocol::HTTPSListen);
2481 WorkerResponse::ok(req_id)
2482 }
2483 Err(activate_error) => worker_response_error(
2484 req_id,
2485 format!("Could not activate HTTPS listener: {activate_error}"),
2486 ),
2487 }
2488 }
2489 Ok(ListenerType::Tcp) => {
2490 let listener = self
2491 .scm_listeners
2492 .as_mut()
2493 .and_then(|s| s.get_tcp(&address))
2494 .map(|fd| unsafe { MioTcpListener::from_raw_fd(fd) });
2499
2500 let listener_token = self.tcp.borrow_mut().activate_listener(&address, listener);
2501 match listener_token {
2502 Ok(token) => {
2503 self.accept(ListenToken(token.0), Protocol::TCPListen);
2504 WorkerResponse::ok(req_id)
2505 }
2506 Err(activate_error) => worker_response_error(
2507 req_id,
2508 format!("Could not activate TCP listener: {activate_error}"),
2509 ),
2510 }
2511 }
2512 Ok(ListenerType::Udp) => {
2513 let socket = self
2514 .scm_listeners
2515 .as_mut()
2516 .and_then(|s| s.get_udp(&address))
2517 .map(|fd| unsafe { MioUdpSocket::from_raw_fd(fd) });
2525
2526 let activated_token = self.udp.borrow_mut().activate_listener(&address, socket);
2527 match activated_token {
2528 Ok(token) => {
2529 if let Some(session) = self.udp.borrow_mut().build_session(token) {
2535 let mut sessions = self.sessions.borrow_mut();
2536 if sessions.slab.contains(token.0) {
2537 sessions.slab[token.0] = session;
2538 }
2539 }
2540 WorkerResponse::ok(req_id)
2541 }
2542 Err(activate_error) => worker_response_error(
2543 req_id,
2544 format!("Could not activate UDP listener: {activate_error}"),
2545 ),
2546 }
2547 }
2548 Err(_) => worker_response_error(req_id, "Wrong variant for ListenerType on request"),
2549 }
2550 }
2551
2552 fn notify_deactivate_listener(
2553 &mut self,
2554 req_id: &str,
2555 deactivate: &DeactivateListener,
2556 ) -> WorkerResponse {
2557 debug!(
2558 "{} deactivate {:?} listener {:?}",
2559 req_id, deactivate.proxy, deactivate
2560 );
2561
2562 let address: std::net::SocketAddr = deactivate.address.into();
2563
2564 match ListenerType::try_from(deactivate.proxy) {
2565 Ok(ListenerType::Http) => {
2566 let (token, mut listener) = match self.http.borrow_mut().give_back_listener(address)
2567 {
2568 Ok((token, listener)) => (token, listener),
2569 Err(e) => {
2570 return worker_response_error(
2571 req_id,
2572 format!(
2573 "Couldn't deactivate HTTP listener at address {address:?}: {e}"
2574 ),
2575 );
2576 }
2577 };
2578
2579 if let Err(e) = self.poll.registry().deregister(&mut listener) {
2580 error!(
2581 "error deregistering HTTP listen socket({:?}): {:?}",
2582 deactivate, e
2583 );
2584 }
2585
2586 {
2587 let mut sessions = self.sessions.borrow_mut();
2588 if sessions.slab.contains(token.0) {
2589 sessions.slab.remove(token.0);
2590 info!("removed listen token {:?}", token);
2591 }
2592 }
2593
2594 if deactivate.to_scm {
2595 self.unblock_scm_socket();
2596 let listeners = Listeners {
2597 http: vec![(address, listener.as_raw_fd())],
2598 tls: vec![],
2599 tcp: vec![],
2600 udp: vec![],
2601 };
2602 info!("sending HTTP listener: {:?}", listeners);
2603 let res = self.scm.send_listeners(&listeners);
2604
2605 self.block_scm_socket();
2606
2607 info!("sent HTTP listener: {:?}", res);
2608 }
2609 WorkerResponse::ok(req_id)
2610 }
2611 Ok(ListenerType::Https) => {
2612 let (token, mut listener) = match self
2613 .https
2614 .borrow_mut()
2615 .give_back_listener(address)
2616 {
2617 Ok((token, listener)) => (token, listener),
2618 Err(e) => {
2619 return worker_response_error(
2620 req_id,
2621 format!(
2622 "Couldn't deactivate HTTPS listener at address {address:?}: {e}",
2623 ),
2624 );
2625 }
2626 };
2627 if let Err(e) = self.poll.registry().deregister(&mut listener) {
2628 error!(
2629 "error deregistering HTTPS listen socket({:?}): {:?}",
2630 deactivate, e
2631 );
2632 }
2633 if self.sessions.borrow().slab.contains(token.0) {
2634 self.sessions.borrow_mut().slab.remove(token.0);
2635 info!("removed listen token {:?}", token);
2636 }
2637
2638 if deactivate.to_scm {
2639 self.unblock_scm_socket();
2640 let listeners = Listeners {
2641 http: vec![],
2642 tls: vec![(address, listener.as_raw_fd())],
2643 tcp: vec![],
2644 udp: vec![],
2645 };
2646 info!("sending HTTPS listener: {:?}", listeners);
2647 let res = self.scm.send_listeners(&listeners);
2648
2649 self.block_scm_socket();
2650
2651 info!("sent HTTPS listener: {:?}", res);
2652 }
2653 WorkerResponse::ok(req_id)
2654 }
2655 Ok(ListenerType::Tcp) => {
2656 let (token, mut listener) = match self.tcp.borrow_mut().give_back_listener(address)
2657 {
2658 Ok((token, listener)) => (token, listener),
2659 Err(e) => {
2660 return worker_response_error(
2661 req_id,
2662 format!(
2663 "Could not deactivate TCP listener at address {address:?}: {e}"
2664 ),
2665 );
2666 }
2667 };
2668
2669 if let Err(e) = self.poll.registry().deregister(&mut listener) {
2670 error!(
2671 "error deregistering TCP listen socket({:?}): {:?}",
2672 deactivate, e
2673 );
2674 }
2675 if self.sessions.borrow().slab.contains(token.0) {
2676 self.sessions.borrow_mut().slab.remove(token.0);
2677 info!("removed listen token {:?}", token);
2678 }
2679
2680 if deactivate.to_scm {
2681 self.unblock_scm_socket();
2682 let listeners = Listeners {
2683 http: vec![],
2684 tls: vec![],
2685 tcp: vec![(address, listener.as_raw_fd())],
2686 udp: vec![],
2687 };
2688 info!("sending TCP listener: {:?}", listeners);
2689 let res = self.scm.send_listeners(&listeners);
2690
2691 self.block_scm_socket();
2692
2693 info!("sent TCP listener: {:?}", res);
2694 }
2695 WorkerResponse::ok(req_id)
2696 }
2697 Ok(ListenerType::Udp) => {
2698 let (token, mut listener) = match self.udp.borrow_mut().give_back_listener(address)
2699 {
2700 Ok((token, listener)) => (token, listener),
2701 Err(e) => {
2702 return worker_response_error(
2703 req_id,
2704 format!(
2705 "Could not deactivate UDP listener at address {address:?}: {e}"
2706 ),
2707 );
2708 }
2709 };
2710
2711 if let Err(e) = self.poll.registry().deregister(&mut listener) {
2712 error!(
2713 "error deregistering UDP listen socket({:?}): {:?}",
2714 deactivate, e
2715 );
2716 }
2717 if self.sessions.borrow().slab.contains(token.0) {
2718 self.sessions.borrow_mut().slab.remove(token.0);
2719 info!("removed listen token {:?}", token);
2720 }
2721
2722 if deactivate.to_scm {
2723 self.unblock_scm_socket();
2724 let listeners = Listeners {
2725 http: vec![],
2726 tls: vec![],
2727 tcp: vec![],
2728 udp: vec![(address, listener.as_raw_fd())],
2729 };
2730 info!("sending UDP listener: {:?}", listeners);
2731 let res = self.scm.send_listeners(&listeners);
2732
2733 self.block_scm_socket();
2734
2735 info!("sent UDP listener: {:?}", res);
2736 }
2737 WorkerResponse::ok(req_id)
2738 }
2739 Err(_) => worker_response_error(req_id, "Wrong variant for ListenerType on request"),
2740 }
2741 }
2742
2743 pub fn return_listen_sockets(&mut self) -> Result<(), ScmSocketError> {
2745 self.unblock_scm_socket();
2746
2747 let mut http_listeners = self.http.borrow_mut().give_back_listeners();
2748 for &mut (_, ref mut sock) in http_listeners.iter_mut() {
2749 if let Err(e) = self.poll.registry().deregister(sock) {
2750 error!(
2751 "error deregistering HTTP listen socket({:?}): {:?}",
2752 sock, e
2753 );
2754 }
2755 }
2756
2757 let mut https_listeners = self.https.borrow_mut().give_back_listeners();
2758 for &mut (_, ref mut sock) in https_listeners.iter_mut() {
2759 if let Err(e) = self.poll.registry().deregister(sock) {
2760 error!(
2761 "error deregistering HTTPS listen socket({:?}): {:?}",
2762 sock, e
2763 );
2764 }
2765 }
2766
2767 let mut tcp_listeners = self.tcp.borrow_mut().give_back_listeners();
2768 for &mut (_, ref mut sock) in tcp_listeners.iter_mut() {
2769 if let Err(e) = self.poll.registry().deregister(sock) {
2770 error!("error deregistering TCP listen socket({:?}): {:?}", sock, e);
2771 }
2772 }
2773
2774 let mut udp_listeners = self.udp.borrow_mut().give_back_listeners();
2775 for &mut (_, ref mut sock) in udp_listeners.iter_mut() {
2776 if let Err(e) = self.poll.registry().deregister(sock) {
2777 error!("error deregistering UDP listen socket({:?}): {:?}", sock, e);
2778 }
2779 }
2780
2781 let listeners = Listeners {
2783 http: http_listeners
2784 .iter()
2785 .map(|(addr, listener)| (*addr, listener.as_raw_fd()))
2786 .collect(),
2787 tls: https_listeners
2788 .iter()
2789 .map(|(addr, listener)| (*addr, listener.as_raw_fd()))
2790 .collect(),
2791 tcp: tcp_listeners
2792 .iter()
2793 .map(|(addr, listener)| (*addr, listener.as_raw_fd()))
2794 .collect(),
2795 udp: udp_listeners
2796 .iter()
2797 .map(|(addr, listener)| (*addr, listener.as_raw_fd()))
2798 .collect(),
2799 };
2800 debug_assert_eq!(
2804 listeners.http.len(),
2805 http_listeners.len(),
2806 "every HTTP listener must be collected exactly once"
2807 );
2808 debug_assert_eq!(
2809 listeners.tls.len(),
2810 https_listeners.len(),
2811 "every HTTPS listener must be collected exactly once"
2812 );
2813 debug_assert_eq!(
2814 listeners.tcp.len(),
2815 tcp_listeners.len(),
2816 "every TCP listener must be collected exactly once"
2817 );
2818 info!("sending default listeners: {:?}", listeners);
2819 let res = self.scm.send_listeners(&listeners);
2820
2821 self.block_scm_socket();
2822
2823 info!("sent default listeners: {:?}", res);
2824 res
2825 }
2826
2827 fn block_scm_socket(&mut self) {
2828 if let Err(e) = self.scm.set_blocking(true) {
2829 error!("Could not block scm socket: {}", e);
2830 }
2831 }
2832
2833 fn unblock_scm_socket(&mut self) {
2834 if let Err(e) = self.scm.set_blocking(false) {
2835 error!("Could not unblock scm socket: {}", e);
2836 }
2837 }
2838
2839 pub fn to_session(&self, token: Token) -> SessionToken {
2840 SessionToken(token.0)
2841 }
2842
2843 pub fn from_session(&self, token: SessionToken) -> Token {
2844 Token(token.0)
2845 }
2846
2847 pub fn accept(&mut self, token: ListenToken, protocol: Protocol) {
2848 let (proto_key, accepted_protocol) = match protocol {
2861 Protocol::TCPListen => ("listener.accepted.tcp", Protocol::TCPListen),
2862 Protocol::HTTPListen => ("listener.accepted.http", Protocol::HTTPListen),
2863 Protocol::HTTPSListen => ("listener.accepted.https", Protocol::HTTPSListen),
2864 other => {
2865 warn!(
2866 "accept() called with non-listen protocol {:?} on token {:?}; skipping",
2867 other, token
2868 );
2869 return;
2870 }
2871 };
2872
2873 debug_assert!(
2876 matches!(
2877 accepted_protocol,
2878 Protocol::TCPListen | Protocol::HTTPListen | Protocol::HTTPSListen
2879 ),
2880 "accept dispatch must run with a listen protocol only"
2881 );
2882
2883 loop {
2884 let result = match accepted_protocol {
2885 Protocol::TCPListen => self.tcp.borrow_mut().accept(token),
2886 Protocol::HTTPListen => self.http.borrow_mut().accept(token),
2887 Protocol::HTTPSListen => self.https.borrow_mut().accept(token),
2888 other => unreachable!(
2892 "accept dispatch reached non-listen protocol {:?} after outer guard",
2893 other
2894 ),
2895 };
2896 match result {
2897 Ok(sock) => {
2898 let peer = sock.peer_addr().ok();
2903 incr!(names::listener::ACCEPTED_TOTAL);
2904 incr!(proto_key);
2905 if let Some(peer_addr) = peer.as_ref() {
2906 incr!(per_source_bucket(peer_addr));
2907 }
2908 let queue_before = self.accept_queue.len();
2909 self.accept_queue.push_back((
2910 sock,
2911 token,
2912 accepted_protocol,
2913 Instant::now(),
2914 peer,
2915 ));
2916 debug_assert_eq!(
2918 self.accept_queue.len(),
2919 queue_before + 1,
2920 "each accepted socket must enqueue exactly one entry"
2921 );
2922 }
2923 Err(AcceptError::WouldBlock) => {
2924 self.accept_ready.remove(&token);
2925 break;
2926 }
2927 Err(other) => {
2928 error!(
2929 "error accepting {:?} sockets: {:?}",
2930 accepted_protocol, other
2931 );
2932 self.accept_ready.remove(&token);
2933 break;
2934 }
2935 }
2936 }
2937
2938 gauge!(names::accept_queue::CONNECTIONS, self.accept_queue.len());
2939 }
2940
2941 pub fn create_sessions(&mut self) {
2942 while let Some((sock, token, protocol, timestamp, _peer)) = self.accept_queue.pop_back() {
2943 let wait_time = Instant::now() - timestamp;
2944 time!(names::accept_queue::WAIT_TIME, wait_time.as_millis());
2945 if wait_time > self.accept_queue_timeout {
2946 incr!(names::accept_queue::TIMEOUT);
2947 continue;
2948 }
2949
2950 if !self.sessions.borrow_mut().check_limits() {
2951 incr!(names::listener::CONNECTION_CAPPED);
2957
2958 if !self.evict_on_queue_full {
2959 break;
2960 }
2961
2962 if self.shutting_down.is_some() {
2966 break;
2967 }
2968
2969 let to_evict = (self.sessions.borrow().max_connections / 100).max(1);
2978 let evicted = self.evict_least_active_sessions(to_evict);
2979 if evicted == 0 {
2980 warn!("evict_on_queue_full enabled but no candidate sessions to evict");
2986 break;
2987 }
2988
2989 count!(names::sessions::EVICTED, evicted as i64);
2990 warn!(
2991 "evicted {} least recently active sessions to make room",
2992 evicted
2993 );
2994
2995 if !self.sessions.borrow_mut().check_limits() {
2996 break;
2997 }
2998 }
2999
3000 debug_assert!(
3007 matches!(
3008 protocol,
3009 Protocol::TCPListen | Protocol::HTTPListen | Protocol::HTTPSListen
3010 ),
3011 "accept queue must only hold listen protocols, got {protocol:?}"
3012 );
3013 match protocol {
3014 Protocol::TCPListen => {
3015 let proxy = self.tcp.clone();
3016 if self
3017 .tcp
3018 .borrow_mut()
3019 .create_session(sock, token, wait_time, proxy)
3020 .is_err()
3021 {
3022 break;
3023 }
3024 }
3025 Protocol::HTTPListen => {
3026 let proxy = self.http.clone();
3027 if self
3028 .http
3029 .borrow_mut()
3030 .create_session(sock, token, wait_time, proxy)
3031 .is_err()
3032 {
3033 break;
3034 }
3035 }
3036 Protocol::HTTPSListen => {
3037 if self
3038 .https
3039 .borrow_mut()
3040 .create_session(sock, token, wait_time, self.https.clone())
3041 .is_err()
3042 {
3043 break;
3044 }
3045 }
3046 _ => panic!("should not call accept() on a HTTP, HTTPS or TCP session"),
3047 };
3048 let nb_before = self.sessions.borrow().nb_connections;
3049 self.sessions.borrow_mut().incr();
3050 debug_assert_eq!(
3052 self.sessions.borrow().nb_connections,
3053 nb_before + 1,
3054 "create_sessions must account exactly one new connection per created session"
3055 );
3056 }
3057
3058 gauge!(names::accept_queue::CONNECTIONS, self.accept_queue.len());
3059 }
3060
3061 pub fn ready(&mut self, token: Token, events: Ready) {
3062 trace!("PROXY\t{:?} got events: {:?}", token, events);
3063
3064 let session_token = token.0;
3065 if self.sessions.borrow().slab.contains(session_token) {
3066 let protocol = self.sessions.borrow().slab[session_token]
3068 .borrow()
3069 .protocol();
3070 match protocol {
3079 Protocol::HTTPListen | Protocol::HTTPSListen | Protocol::TCPListen => {
3080 if events.is_readable() {
3082 self.accept_ready.insert(ListenToken(token.0));
3083 if self.sessions.borrow().can_accept {
3084 self.accept(ListenToken(token.0), protocol);
3085 }
3086 return;
3087 }
3088
3089 if events.is_writable() {
3090 error!(
3091 "received writable for listener {:?}, this should not happen",
3092 token
3093 );
3094 return;
3095 }
3096
3097 if events.is_hup() {
3098 error!("should not happen: server {:?} closed", token);
3099 return;
3100 }
3101
3102 unreachable!();
3103 }
3104 _ => {}
3105 }
3106
3107 let session = self.sessions.borrow_mut().slab[session_token].clone();
3108 session.borrow_mut().update_readiness(token, events);
3109 if session.borrow_mut().ready(session.clone()) {
3110 debug!(
3111 "Server killing session from ready: token={:?}, protocol={:?}, events={:?}",
3112 token, protocol, events
3113 );
3114 self.kill_session(session);
3115 }
3116 }
3117 }
3118
3119 pub fn timeout(&mut self, token: Token) {
3120 trace!("PROXY\t{:?} got timeout", token);
3121
3122 let session_token = token.0;
3123 if self.sessions.borrow().slab.contains(session_token) {
3124 let session = self.sessions.borrow_mut().slab[session_token].clone();
3125 if session.borrow_mut().timeout(token) {
3126 debug!(
3127 "Server killing session from timeout: token={:?}, protocol={:?}",
3128 token,
3129 session.borrow().protocol()
3130 );
3131 self.kill_session(session);
3132 }
3133 }
3134 }
3135
3136 pub fn handle_remaining_readiness(&mut self) {
3137 if self.sessions.borrow().can_accept && !self.accept_ready.is_empty() {
3140 while let Some(token) = self
3141 .accept_ready
3142 .iter()
3143 .next()
3144 .map(|token| ListenToken(token.0))
3145 {
3146 let protocol = self.sessions.borrow().slab[token.0].borrow().protocol();
3147 self.accept(token, protocol);
3148 if !self.sessions.borrow().can_accept || self.accept_ready.is_empty() {
3149 break;
3150 }
3151 }
3152 }
3153 }
3154 fn block_channel(&mut self) {
3155 if let Err(e) = self.channel.blocking() {
3156 error!("Could not block channel: {}", e);
3157 }
3158 }
3159 fn unblock_channel(&mut self) {
3160 if let Err(e) = self.channel.nonblocking() {
3161 error!("Could not block channel: {}", e);
3162 }
3163 }
3164
3165 fn evict_least_active_sessions(&self, count: usize) -> usize {
3177 if count == 0 {
3178 return 0;
3179 }
3180
3181 let tokens = {
3182 let sessions = self.sessions.borrow();
3183 let mut candidates: Vec<(Token, Instant)> = sessions
3184 .slab
3185 .iter()
3186 .filter(|(_, session)| {
3187 !matches!(
3188 session.borrow().protocol(),
3189 Protocol::HTTPListen
3190 | Protocol::HTTPSListen
3191 | Protocol::TCPListen
3192 | Protocol::UDPListen
3193 | Protocol::Channel
3194 | Protocol::Metrics
3195 | Protocol::Timer
3196 )
3197 })
3198 .map(|(_, session)| {
3199 let s = session.borrow();
3200 (s.frontend_token(), s.last_event())
3201 })
3202 .collect();
3203
3204 if candidates.is_empty() {
3207 return 0;
3208 }
3209
3210 let pivot = count.min(candidates.len()) - 1;
3211 candidates.select_nth_unstable_by_key(pivot, |&(_, last_event)| last_event);
3212
3213 candidates[..=pivot]
3214 .iter()
3215 .map(|&(token, _)| token)
3216 .collect::<HashSet<Token>>()
3217 };
3218
3219 let evicted = tokens.len();
3220 self.shut_down_sessions_by_frontend_tokens(tokens);
3221 evicted
3222 }
3223}
3224
3225fn worker_response_error<S: ToString, T: ToString>(request_id: S, error: T) -> WorkerResponse {
3228 error!(
3229 "error on request {}, {}",
3230 request_id.to_string(),
3231 error.to_string()
3232 );
3233 WorkerResponse::error(request_id, error)
3234}
3235
3236pub struct ListenSession {
3237 pub protocol: Protocol,
3238}
3239
3240impl ProxySession for ListenSession {
3241 fn last_event(&self) -> Instant {
3242 Instant::now()
3243 }
3244
3245 fn print_session(&self) {}
3246
3247 fn frontend_token(&self) -> Token {
3248 Token(0)
3249 }
3250
3251 fn protocol(&self) -> Protocol {
3252 self.protocol
3253 }
3254
3255 fn ready(&mut self, _session: Rc<RefCell<dyn ProxySession>>) -> SessionIsToBeClosed {
3256 false
3257 }
3258
3259 fn shutting_down(&mut self) -> SessionIsToBeClosed {
3260 false
3261 }
3262
3263 fn update_readiness(&mut self, _token: Token, _events: Ready) {}
3264
3265 fn close(&mut self) {}
3266
3267 fn timeout(&mut self, _token: Token) -> SessionIsToBeClosed {
3268 error!(
3269 "called ProxySession::timeout(token={:?}, time) on ListenSession {{ protocol: {:?} }}",
3270 _token, self.protocol
3271 );
3272 false
3273 }
3274}
3275
#[cfg(test)]
mod accept_telemetry_tests {
    //! Tests for the per-source bucketing used by the accept telemetry.
    use super::*;

    /// Parses a literal socket address fixture.
    fn sock(text: &str) -> SocketAddr {
        text.parse().unwrap()
    }

    /// Two IPv4 peers sharing a /24 prefix must share a bucket.
    #[test]
    fn per_source_bucket_collapses_ipv4_slash24() {
        let (first, second) = (sock("203.0.113.5:1234"), sock("203.0.113.250:9999"));
        assert_eq!(
            per_source_bucket(&first),
            per_source_bucket(&second),
            "addresses in the same /24 must land in the same bucket"
        );
    }

    /// Two IPv6 peers sharing a /48 prefix must share a bucket.
    #[test]
    fn per_source_bucket_collapses_ipv6_slash48() {
        let (first, second) = (
            sock("[2001:db8:1234::1]:443"),
            sock("[2001:db8:1234:abcd::ffff]:8443"),
        );
        assert_eq!(
            per_source_bucket(&first),
            per_source_bucket(&second),
            "addresses in the same /48 must land in the same bucket"
        );
    }

    /// The key table has exactly `PER_SOURCE_BUCKETS` zero-padded entries.
    #[test]
    fn per_source_bucket_keys_are_bounded() {
        assert_eq!(PER_SOURCE_BUCKET_KEYS.len(), PER_SOURCE_BUCKETS);
        PER_SOURCE_BUCKET_KEYS
            .iter()
            .enumerate()
            .for_each(|(index, key)| {
                let wanted = format!("client.connect.per_source.bucket_{index:03}");
                assert_eq!(*key, wanted.as_str());
            });
    }

    /// Distinct /24 subnets should spread over many buckets.
    #[test]
    fn per_source_bucket_distributes_distinct_subnets() {
        let hits: std::collections::HashSet<_> = (0..200u8)
            .map(|octet| sock(&format!("10.0.{octet}.42:80")))
            .map(|peer| per_source_bucket(&peer))
            .collect();
        assert!(
            hits.len() >= 100,
            "expected at least 100 distinct buckets across 200 /24s, got {}",
            hits.len()
        );
    }
}
3335
#[cfg(test)]
mod eviction_tests {
    //! Checks the `select_nth_unstable_by_key` selection step used by
    //! `evict_least_active_sessions` to pick the oldest sessions.
    use std::collections::HashSet;
    use std::time::{Duration, Instant};

    use mio::Token;

    /// Mirrors the eviction selection: returns the tokens of the `count`
    /// entries with the oldest instants (all of them if fewer exist).
    fn pick_oldest(candidates: &mut [(Token, Instant)], count: usize) -> HashSet<Token> {
        let pivot = count.min(candidates.len()) - 1;
        candidates.select_nth_unstable_by_key(pivot, |&(_, seen)| seen);
        candidates[..=pivot].iter().map(|&(token, _)| token).collect()
    }

    #[test]
    fn select_nth_finds_oldest_sessions() {
        let now = Instant::now();
        let ago = |secs: u64| now - Duration::from_secs(secs);
        let mut candidates = [
            (Token(1), ago(10)),
            (Token(2), ago(50)),
            (Token(3), ago(5)),
            (Token(4), ago(30)),
            (Token(5), ago(20)),
        ];

        let selected = pick_oldest(&mut candidates, 2);

        assert_eq!(selected.len(), 2);
        assert!(
            selected.contains(&Token(2)),
            "should contain 50s-old session"
        );
        assert!(
            selected.contains(&Token(4)),
            "should contain 30s-old session"
        );
    }

    #[test]
    fn select_nth_with_count_exceeding_candidates() {
        let mut candidates = [(Token(1), Instant::now() - Duration::from_secs(10))];

        let selected = pick_oldest(&mut candidates, 5);

        assert_eq!(selected.len(), 1);
        assert!(selected.contains(&Token(1)));
    }
}