1use std::{
3 cell::RefCell,
4 collections::{HashMap, HashSet, VecDeque, hash_map::Entry},
5 hash::{DefaultHasher, Hash, Hasher},
6 io::Error as IoError,
7 net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr},
8 os::unix::io::{AsRawFd, FromRawFd},
9 rc::Rc,
10 str::FromStr,
11 sync::LazyLock,
12 time::{Duration, Instant},
13};
14
15use mio::{
16 Events, Interest, Poll, Token,
17 net::{TcpListener as MioTcpListener, TcpStream},
18};
19use slab::Slab;
20use sozu_command::{
21 channel::Channel,
22 config::MetricDetailLevel,
23 logging,
24 proto::command::{
25 ActivateListener, AddBackend, CertificatesWithFingerprints, Cluster, ClusterHashes,
26 ClusterInformations, DeactivateListener, Event, EventKind, HttpListenerConfig,
27 HttpsListenerConfig, InitialState, ListenerType, LoadBalancingAlgorithms, LoadMetric,
28 MetricDetail, MetricsConfiguration, RemoveBackend, Request, ResponseContent,
29 ResponseStatus, ServerConfig, TcpListenerConfig as CommandTcpListener,
30 UpdateHttpListenerConfig, UpdateHttpsListenerConfig, UpdateTcpListenerConfig,
31 WorkerRequest, WorkerResponse, request::RequestType, response_content::ContentType,
32 },
33 ready::Ready,
34 scm_socket::{Listeners, ScmSocket, ScmSocketError},
35 state::ConfigState,
36};
37
38use crate::metrics::names;
39use crate::{
40 AcceptError, Protocol, ProxyConfiguration, ProxySession, SessionIsToBeClosed,
41 backends::{Backend, BackendMap},
42 features::FEATURES,
43 health_check::HealthChecker,
44 http, https,
45 metrics::METRICS,
46 pool::Pool,
47 tcp,
48 timer::Timer,
49};
50
/// Retry count exposed to the rest of the crate.
/// NOTE(review): not used in this chunk — presumably the number of backend connection
/// attempts before giving up; confirm at the use sites.
pub const CONN_RETRIES: u8 = 3;
53
/// Number of buckets used to spread client connections by source network for metrics.
pub const PER_SOURCE_BUCKETS: usize = 256;

/// Pre-rendered metric keys `client.connect.per_source.bucket_000` ..
/// `client.connect.per_source.bucket_255`, one per bucket.
///
/// The strings are leaked once, on first use, so that lookups hand out `&'static str`
/// without allocating on the accept path.
static PER_SOURCE_BUCKET_KEYS: LazyLock<[&'static str; PER_SOURCE_BUCKETS]> = LazyLock::new(|| {
    let mut keys: [&'static str; PER_SOURCE_BUCKETS] = [""; PER_SOURCE_BUCKETS];
    for (i, slot) in keys.iter_mut().enumerate() {
        let owned = format!("client.connect.per_source.bucket_{i:03}");
        *slot = Box::leak(owned.into_boxed_str());
    }
    keys
});

/// Maps a peer address to its per-source metric key.
///
/// IPv4 peers are grouped by `/24` and IPv6 peers by `/48`; the masked network is hashed
/// with `DefaultHasher::new()` (fixed keys, so the mapping is stable) and reduced modulo
/// [`PER_SOURCE_BUCKETS`]. The port is ignored.
fn per_source_bucket(peer: &SocketAddr) -> &'static str {
    let mut hasher = DefaultHasher::new();
    // Dual-stack sockets report IPv4 clients as IPv4-mapped IPv6 (`::ffff:a.b.c.d`).
    // Without canonicalisation their first 48 bits are all zero, so every IPv4 client
    // would fall into the same `::/48` bucket instead of being grouped by `/24`.
    match peer.ip().to_canonical() {
        IpAddr::V4(v4) => {
            let [a, b, c, _] = v4.octets();
            Ipv4Addr::new(a, b, c, 0).hash(&mut hasher);
        }
        IpAddr::V6(v6) => {
            let octets = v6.octets();
            let mut masked_octets = [0u8; 16];
            masked_octets[..6].copy_from_slice(&octets[..6]);
            Ipv6Addr::from(masked_octets).hash(&mut hasher);
        }
    }
    let idx = (hasher.finish() as usize) % PER_SOURCE_BUCKETS;
    PER_SOURCE_BUCKET_KEYS[idx]
}
107
/// Granularity of the accept-saturation metric: at most once per tick, the event loop
/// increments `accept_queue::SATURATED_SECONDS` if accepting is currently paused.
const ACCEPT_SATURATION_TICK: Duration = Duration::from_secs(1);

/// Command channel between this worker and the main process: the worker writes
/// [`WorkerResponse`]s and reads [`WorkerRequest`]s.
pub type ProxyChannel = Channel<WorkerResponse, WorkerRequest>;

thread_local! {
    /// Per-thread outbound queue of responses/events for the main process; drained by
    /// `Server::send_queue` when the command channel is writable.
    pub static QUEUE: RefCell<VecDeque<WorkerResponse>> = const { RefCell::new(VecDeque::new()) };
}

thread_local! {
    /// Per-thread timer yielding the `Token`s whose timeouts expired; polled by the event
    /// loop when `Token(1)` fires or when the next poll date has passed.
    pub static TIMER: RefCell<Timer<Token>> = RefCell::new(Timer::default());
}
123
124pub fn push_queue(message: WorkerResponse) {
125 QUEUE.with(|queue| {
126 (*queue.borrow_mut()).push_back(message);
127 });
128}
129
130pub fn push_event(event: Event) {
131 QUEUE.with(|queue| {
132 (*queue.borrow_mut()).push_back(WorkerResponse {
133 id: "EVENT".to_string(),
134 message: String::new(),
135 status: ResponseStatus::Processing.into(),
136 content: Some(ContentType::Event(event).into()),
137 });
138 });
139}
140
141fn worker_metric_detail_status_content(
147 configured: MetricDetailLevel,
148 effective: MetricDetailLevel,
149 previous_effective: MetricDetailLevel,
150 active_lease_count: u32,
151) -> ResponseContent {
152 use sozu_command::proto::command::WorkerMetricDetailStatus;
153 ContentType::WorkerMetricDetailStatus(WorkerMetricDetailStatus {
154 configured: MetricDetail::from(configured) as i32,
155 effective: MetricDetail::from(effective) as i32,
156 previous_effective: MetricDetail::from(previous_effective) as i32,
157 active_lease_count,
158 })
159 .into()
160}
161
162fn push_metric_detail_transition(
169 previous: MetricDetailLevel,
170 effective: MetricDetailLevel,
171 transition_kind: &'static str,
172 client_id: Option<String>,
173) {
174 use sozu_command::proto::command::MetricDetailTransition;
175 if previous == effective {
180 return;
181 }
182 push_event(Event {
183 kind: EventKind::MetricDetailChanged as i32,
184 cluster_id: None,
185 backend_id: None,
186 address: None,
187 metric_detail: Some(MetricDetailTransition {
188 previous_effective: MetricDetail::from(previous) as i32,
189 effective: MetricDetail::from(effective) as i32,
190 transition_kind: transition_kind.to_owned(),
191 client_id,
192 }),
193 });
194}
195
/// Newtype over a raw `usize` token identifying a listener.
#[derive(Copy, Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct ListenToken(pub usize);
/// Newtype over a raw `usize` token identifying a session (its slab key).
#[derive(Copy, Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct SessionToken(pub usize);

impl From<usize> for ListenToken {
    fn from(raw: usize) -> Self {
        Self(raw)
    }
}

impl From<ListenToken> for usize {
    fn from(ListenToken(raw): ListenToken) -> Self {
        raw
    }
}

impl From<usize> for SessionToken {
    fn from(raw: usize) -> Self {
        Self(raw)
    }
}

impl From<SessionToken> for usize {
    fn from(SessionToken(raw): SessionToken) -> Self {
        raw
    }
}
224
/// Shared book-keeping for the worker's slab of sessions: connection counting,
/// accept backpressure and per-(cluster, client IP) connection tracking.
pub struct SessionManager {
    /// Maximum number of concurrently counted client connections.
    pub max_connections: usize,
    /// Currently counted client connections (maintained by `incr` / `decr`).
    pub nb_connections: usize,
    /// `false` while accepting is paused by backpressure (set by `check_limits`,
    /// re-enabled by `decr`).
    pub can_accept: bool,
    /// Every registered entry (internal channel/timer/metrics entries and sessions),
    /// keyed by the raw token value.
    pub slab: Slab<Rc<RefCell<dyn ProxySession>>>,
    /// Default per-(cluster, IP) connection limit; 0 disables the limit.
    pub max_connections_per_ip: u64,
    /// Default retry-after value returned by `effective_retry_after`.
    /// NOTE(review): unit not visible here — presumably seconds; confirm.
    pub retry_after: u32,
    /// cluster id → client IP → number of tokens currently tracking that pair.
    connections_per_cluster_ip: HashMap<String, HashMap<IpAddr, usize>>,
    /// token → cluster id → client IPs recorded for that token; lets a token be untracked
    /// in one pass and keeps `track_cluster_ip` idempotent per (token, cluster, IP).
    cluster_ip_tracks: HashMap<Token, HashMap<String, HashSet<IpAddr>>>,
}
270
271impl SessionManager {
272 pub fn new(
273 slab: Slab<Rc<RefCell<dyn ProxySession>>>,
274 max_connections: usize,
275 max_connections_per_ip: u64,
276 retry_after: u32,
277 ) -> Rc<RefCell<Self>> {
278 Rc::new(RefCell::new(SessionManager {
279 max_connections,
280 nb_connections: 0,
281 can_accept: true,
282 slab,
283 max_connections_per_ip,
284 retry_after,
285 connections_per_cluster_ip: HashMap::new(),
286 cluster_ip_tracks: HashMap::new(),
287 }))
288 }
289
290 pub fn effective_max_connections_per_ip(&self, override_value: Option<u64>) -> u64 {
295 override_value.unwrap_or(self.max_connections_per_ip)
296 }
297
298 pub fn effective_retry_after(&self, override_value: Option<u32>) -> u32 {
302 override_value.unwrap_or(self.retry_after)
303 }
304
305 pub fn cluster_ip_at_limit(
317 &self,
318 token: Token,
319 cluster_id: &str,
320 ip: &IpAddr,
321 override_value: Option<u64>,
322 ) -> bool {
323 let limit = self.effective_max_connections_per_ip(override_value);
324 if limit == 0 {
325 return false;
326 }
327 if self
328 .cluster_ip_tracks
329 .get(&token)
330 .and_then(|by_cluster| by_cluster.get(cluster_id))
331 .is_some_and(|ips| ips.contains(ip))
332 {
333 return false;
334 }
335 self.connections_per_cluster_ip
336 .get(cluster_id)
337 .and_then(|by_ip| by_ip.get(ip))
338 .is_some_and(|c| (*c as u64) >= limit)
339 }
340
341 pub fn track_cluster_ip(&mut self, token: Token, cluster_id: String, ip: IpAddr) {
351 let inserted = self
352 .cluster_ip_tracks
353 .entry(token)
354 .or_default()
355 .entry(cluster_id.clone())
356 .or_default()
357 .insert(ip);
358 if inserted {
359 *self
360 .connections_per_cluster_ip
361 .entry(cluster_id)
362 .or_default()
363 .entry(ip)
364 .or_insert(0) += 1;
365 }
366 }
367
368 pub fn untrack_all_cluster_ip(&mut self, token: Token) {
375 let Some(by_cluster) = self.cluster_ip_tracks.remove(&token) else {
376 return;
377 };
378 for (cluster_id, ips) in by_cluster {
379 let Entry::Occupied(mut outer) = self.connections_per_cluster_ip.entry(cluster_id)
380 else {
381 continue;
382 };
383 for ip in ips {
384 if let Entry::Occupied(mut inner) = outer.get_mut().entry(ip) {
385 let count = inner.get_mut();
386 *count = count.saturating_sub(1);
387 if *count == 0 {
388 inner.remove();
389 }
390 }
391 }
392 if outer.get().is_empty() {
393 outer.remove();
394 }
395 }
396 }
397
398 pub fn clear_cluster_ip_tracking(&mut self) {
403 self.cluster_ip_tracks.clear();
404 self.connections_per_cluster_ip.clear();
405 }
406
407 pub fn at_capacity(&self) -> bool {
409 self.slab.len() >= self.accept_slab_threshold()
410 }
411
412 pub fn accept_slab_threshold(&self) -> usize {
424 10 + 2 * self.max_connections
425 }
426
427 pub fn check_limits(&mut self) -> bool {
430 if self.nb_connections >= self.max_connections {
431 error!("max number of session connection reached, flushing the accept queue");
432 gauge!(names::accept_queue::BACKPRESSURE, 1);
433 self.can_accept = false;
434 return false;
435 }
436
437 if self.at_capacity() {
438 error!("not enough memory to accept another session, flushing the accept queue");
439 error!(
440 "nb_connections: {}, max_connections: {}",
441 self.nb_connections, self.max_connections
442 );
443 gauge!(names::accept_queue::BACKPRESSURE, 1);
444 self.can_accept = false;
445
446 return false;
447 }
448
449 true
450 }
451
452 pub fn to_session(token: Token) -> SessionToken {
453 SessionToken(token.0)
454 }
455
456 pub fn incr(&mut self) {
457 self.nb_connections += 1;
458 assert!(self.nb_connections <= self.max_connections);
459 gauge!(names::client::CONNECTIONS, self.nb_connections);
465 }
466
467 pub fn decr(&mut self) {
470 assert!(self.nb_connections != 0);
471 self.nb_connections -= 1;
472 gauge!(names::client::CONNECTIONS, self.nb_connections);
473
474 if !self.can_accept && self.nb_connections < self.max_connections * 90 / 100 {
476 debug!(
477 "nb_connections = {}, max_connections = {}, starting to accept again",
478 self.nb_connections, self.max_connections
479 );
480 gauge!(names::accept_queue::BACKPRESSURE, 0);
481 self.can_accept = true;
482 }
483 }
484}
485
/// Errors that can abort the construction of a worker [`Server`].
#[derive(thiserror::Error, Debug)]
pub enum ServerError {
    /// `Poll::new()` failed.
    #[error("could not create event loop with MIO poll: {0}")]
    CreatePoll(IoError),
    /// Cloning the poll registry for a proxy failed.
    #[error("could not clone the MIO registry: {0}")]
    CloneRegistry(IoError),
    /// Registering the command channel with the poll failed.
    #[error("could not register the channel: {0}")]
    RegisterChannel(IoError),
    /// An operation on the SCM socket failed; `msg` names the step that failed.
    #[error("{msg}:{scm_err}")]
    ScmSocket {
        msg: String,
        scm_err: ScmSocketError,
    },
}
500
/// A worker: owns the MIO event loop, the command channel to the main process, the
/// HTTP/HTTPS/TCP proxies and the shared session manager.
pub struct Server {
    /// Time budget for connections in `accept_queue` (config `accept_queue_timeout`, seconds).
    accept_queue_timeout: Duration,
    /// Accepted connections not yet turned into sessions:
    /// (socket, listener token, protocol, instant, optional address).
    /// NOTE(review): the `Instant` is presumably the enqueue time and the address the peer
    /// address; confirm where the queue is filled.
    accept_queue: VecDeque<(
        TcpStream,
        ListenToken,
        Protocol,
        Instant,
        Option<SocketAddr>,
    )>,
    /// Config `evict_on_queue_full` (defaults to `false`).
    evict_on_queue_full: bool,
    /// Listeners with pending accept readiness.
    accept_ready: HashSet<ListenToken>,
    backends: Rc<RefCell<BackendMap>>,
    /// Slab length at construction (internal entries); a soft stop completes once the slab
    /// is back to this size.
    base_sessions_count: usize,
    /// Command channel to the main process (`Token(0)`).
    channel: ProxyChannel,
    config_state: ConfigState,
    /// Consecutive failed `poll()` calls.
    current_poll_errors: i32,
    health_checker: HealthChecker,
    http: Rc<RefCell<http::HttpProxy>>,
    https: Rc<RefCell<https::HttpsProxy>>,
    /// Slab length at the last soft-stop progress report.
    last_sessions_len: usize,
    /// When the last rate-limited soft-stop progress message was handled.
    last_shutting_down_message: Option<Instant>,
    last_zombie_check: Instant,
    /// Start of the current event-loop phase, used for loop and epoll timing metrics.
    loop_start: Instant,
    /// Construction time, reported as `process.uptime_seconds`.
    started_at: Instant,
    /// Last time the accept-saturation metric was evaluated.
    last_saturation_tick: Instant,
    /// Consecutive poll failures after which the worker panics.
    max_poll_errors: i32,
    pool: Rc<RefCell<Pool>>,
    pub poll: Poll,
    /// Upper bound on a single `poll()` wait.
    poll_timeout: Option<Duration>,
    /// Listen sockets received from the main process over SCM.
    scm_listeners: Option<Listeners>,
    scm: ScmSocket,
    sessions: Rc<RefCell<SessionManager>>,
    /// Next timer deadline.
    should_poll_at: Option<Instant>,
    /// Id of the pending `SoftStop` request while a soft stop is in progress.
    shutting_down: Option<String>,
    tcp: Rc<RefCell<tcp::TcpProxy>>,
    /// Both the period of the zombie check and the idle time after which a session counts
    /// as a zombie.
    zombie_check_interval: Duration,
}
575
576impl Server {
577 pub fn try_new_from_config(
578 worker_to_main_channel: ProxyChannel,
579 worker_to_main_scm: ScmSocket,
580 config: ServerConfig,
581 initial_state: InitialState,
582 expects_initial_status: bool,
583 ) -> Result<Self, ServerError> {
584 let event_loop = Poll::new().map_err(ServerError::CreatePoll)?;
585 if let Some(cap) = config.basic_auth_max_credential_bytes {
592 crate::protocol::mux::auth::set_max_decoded_credential_bytes(cap as usize);
593 }
594 #[cfg(all(target_os = "linux", feature = "splice"))]
601 if let Some(cap) = config.splice_pipe_capacity_bytes {
602 crate::splice::set_pipe_capacity(cap as usize);
603 }
604 let pool = Rc::new(RefCell::new(Pool::with_capacity(
605 config.min_buffers as usize,
606 config.max_buffers as usize,
607 config.buffer_size as usize,
608 )));
609 let backends = Rc::new(RefCell::new(BackendMap::new()));
610
611 let sessions: Rc<RefCell<SessionManager>> = SessionManager::new(
616 Slab::with_capacity(config.slab_capacity() as usize),
617 config.max_connections as usize,
618 config
619 .max_connections_per_ip
620 .unwrap_or(sozu_command::config::DEFAULT_MAX_CONNECTIONS_PER_IP),
621 config
622 .retry_after
623 .unwrap_or(sozu_command::config::DEFAULT_RETRY_AFTER),
624 );
625 {
626 let mut s = sessions.borrow_mut();
627 let entry = s.slab.vacant_entry();
628 trace!("taking token {:?} for channel", SessionToken(entry.key()));
629 entry.insert(Rc::new(RefCell::new(ListenSession {
630 protocol: Protocol::Channel,
631 })));
632 }
633 {
634 let mut s = sessions.borrow_mut();
635 let entry = s.slab.vacant_entry();
636 trace!("taking token {:?} for metrics", SessionToken(entry.key()));
637 entry.insert(Rc::new(RefCell::new(ListenSession {
638 protocol: Protocol::Timer,
639 })));
640 }
641 {
642 let mut s = sessions.borrow_mut();
643 let entry = s.slab.vacant_entry();
644 trace!("taking token {:?} for metrics", SessionToken(entry.key()));
645 entry.insert(Rc::new(RefCell::new(ListenSession {
646 protocol: Protocol::Metrics,
647 })));
648 }
649
650 Server::new(
651 event_loop,
652 worker_to_main_channel,
653 worker_to_main_scm,
654 sessions,
655 pool,
656 backends,
657 None,
658 None,
659 None,
660 config,
661 Some(initial_state),
662 expects_initial_status,
663 )
664 }
665
    /// Finishes building a worker [`Server`] around an already-created MIO `Poll`.
    ///
    /// Steps, in order:
    /// 1. registers the command `channel` on `Token(0)` (readable + writable);
    /// 2. registers the metrics socket, if there is one, on `Token(2)` (writable); this
    ///    panics if registration fails;
    /// 3. records the current slab length as `base_sessions_count`;
    /// 4. builds the HTTP, HTTPS and TCP proxies unless they were passed in, each with its
    ///    own clone of the poll registry;
    /// 5. replays `initial_state` requests through `notify_proxys`, then clears `QUEUE`,
    ///    discarding the responses that replay produced;
    /// 6. if `expects_initial_status`, blocks on the channel for a single request, which
    ///    must be a `Status` request (answered with an ok response); anything else panics;
    /// 7. receives the listen sockets over the SCM socket in blocking mode, then switches
    ///    the SCM socket back to non-blocking.
    ///
    /// # Errors
    /// `RegisterChannel`, `CloneRegistry`, or `ScmSocket` (SCM blocking toggle or listener
    /// reception failed).
    ///
    /// # Panics
    /// See steps 2 and 6.
    #[allow(clippy::too_many_arguments)]
    pub fn new(
        poll: Poll,
        mut channel: ProxyChannel,
        scm: ScmSocket,
        sessions: Rc<RefCell<SessionManager>>,
        pool: Rc<RefCell<Pool>>,
        backends: Rc<RefCell<BackendMap>>,
        http: Option<http::HttpProxy>,
        https: Option<https::HttpsProxy>,
        tcp: Option<tcp::TcpProxy>,
        server_config: ServerConfig,
        initial_state: Option<InitialState>,
        expects_initial_status: bool,
    ) -> Result<Self, ServerError> {
        FEATURES.with(|_features| {
            // NOTE(review): empty body; this only touches the `FEATURES` thread-local,
            // presumably to force its initialization on this thread. Confirm intent.
        });

        poll.registry()
            .register(
                &mut channel,
                Token(0),
                Interest::READABLE | Interest::WRITABLE,
            )
            .map_err(ServerError::RegisterChannel)?;

        METRICS.with(|metrics| {
            if let Some(sock) = (*metrics.borrow_mut()).socket_mut() {
                poll.registry()
                    .register(sock, Token(2), Interest::WRITABLE)
                    .expect("should register the metrics socket");
            }
        });

        // Entries present now (internal slots) are never closed by a soft stop.
        let base_sessions_count = sessions.borrow().slab.len();

        let http = Rc::new(RefCell::new(match http {
            Some(http) => http,
            None => {
                let registry = poll
                    .registry()
                    .try_clone()
                    .map_err(ServerError::CloneRegistry)?;

                http::HttpProxy::new(registry, sessions.clone(), pool.clone(), backends.clone())
            }
        }));

        let https = Rc::new(RefCell::new(match https {
            Some(https) => https,
            None => {
                let registry = poll
                    .registry()
                    .try_clone()
                    .map_err(ServerError::CloneRegistry)?;

                https::HttpsProxy::new(registry, sessions.clone(), pool.clone(), backends.clone())
            }
        }));

        let tcp = Rc::new(RefCell::new(match tcp {
            Some(tcp) => tcp,
            None => {
                let registry = poll
                    .registry()
                    .try_clone()
                    .map_err(ServerError::CloneRegistry)?;

                tcp::TcpProxy::new(registry, sessions.clone(), pool.clone(), backends.clone())
            }
        }));

        let mut server = Server {
            accept_queue_timeout: Duration::from_secs(u64::from(
                server_config.accept_queue_timeout,
            )),
            accept_queue: VecDeque::new(),
            evict_on_queue_full: server_config.evict_on_queue_full.unwrap_or(false),
            accept_ready: HashSet::new(),
            backends,
            base_sessions_count,
            channel,
            config_state: ConfigState::new(),
            current_poll_errors: 0,
            health_checker: HealthChecker::new(),
            http,
            https,
            last_sessions_len: 0,
            last_shutting_down_message: None,
            last_zombie_check: Instant::now(),
            loop_start: Instant::now(),
            started_at: Instant::now(),
            last_saturation_tick: Instant::now(),
            max_poll_errors: 10000,
            pool,
            poll_timeout: Some(Duration::from_millis(1000)),
            poll,
            scm_listeners: None,
            scm,
            sessions,
            should_poll_at: None,
            shutting_down: None,
            tcp,
            zombie_check_interval: Duration::from_secs(u64::from(
                server_config.zombie_check_interval,
            )),
        };

        if let Some(state) = initial_state {
            for request in state.requests {
                trace!("generating initial config request: {:#?}", request);
                server.notify_proxys(request);
            }

            // Drop the responses generated while replaying the initial state.
            QUEUE.with(|queue| {
                (*queue.borrow_mut()).clear();
            });
        }

        if expects_initial_status {
            // Synchronously wait for the main process's first request.
            server.block_channel();
            let msg = server.channel.read_message();
            debug!("got message: {:?}", msg);

            if let Ok(WorkerRequest {
                id,
                content:
                    Request {
                        request_type: Some(RequestType::Status(_)),
                    },
            }) = msg
            {
                if let Err(e) = server.channel.write_message(&WorkerResponse::ok(id)) {
                    error!("Could not send an ok to the main process: {}", e);
                }
            } else {
                panic!(
                    "plz give me a status request first when I start, you sent me this instead: {msg:?}"
                );
            }
            server.unblock_channel();
        }

        // Listen sockets are handed over by the main process through the SCM socket,
        // received here in blocking mode.
        info!("will try to receive listeners");
        server
            .scm
            .set_blocking(true)
            .map_err(|scm_err| ServerError::ScmSocket {
                msg: "Could not set the scm socket to blocking".to_string(),
                scm_err,
            })?;
        let listeners =
            server
                .scm
                .receive_listeners()
                .map_err(|scm_err| ServerError::ScmSocket {
                    msg: "could not receive listeners from the scm socket".to_string(),
                    scm_err,
                })?;
        server
            .scm
            .set_blocking(false)
            .map_err(|scm_err| ServerError::ScmSocket {
                msg: "Could not set the scm socket to unblocking".to_string(),
                scm_err,
            })?;
        info!("received listeners: {:?}", listeners);
        server.scm_listeners = Some(listeners);

        Ok(server)
    }
842
    /// Runs the worker's event loop until it is told to stop.
    ///
    /// Returns when the command channel reports read/write closed, or after a `HardStop`
    /// request is handled, or during a soft stop once `shut_down_sessions` reports that
    /// only the base entries remain.
    ///
    /// Fixed tokens: `Token(0)` is the command channel, `Token(1)` the timer and `Token(2)`
    /// the metrics socket. Tokens owned by the health checker are routed to it; every other
    /// token goes to `ready`.
    pub fn run(&mut self) {
        let mut events = Events::with_capacity(1024);
        self.last_sessions_len = self.sessions.borrow().slab.len();

        self.last_zombie_check = Instant::now();
        self.loop_start = Instant::now();

        loop {
            // Panics once `max_poll_errors` consecutive poll() calls have failed.
            self.check_for_poll_errors();

            let timeout = self.reset_loop_time_and_get_timeout();

            match self.poll.poll(&mut events, timeout) {
                Ok(_) => self.current_poll_errors = 0,
                Err(error) => {
                    error!("Error while polling events: {:?}", error);
                    self.current_poll_errors += 1;
                    continue;
                }
            }

            // `loop_start` was reset right before poll(), so this measures the poll wait.
            let after_epoll = Instant::now();
            time!(
                names::event_loop::EPOLL_TIME,
                (after_epoll - self.loop_start).as_millis()
            );
            self.loop_start = after_epoll;

            self.send_queue();

            for event in events.iter() {
                match event.token() {
                    // Command channel with the main process.
                    Token(0) => {
                        if event.is_error() {
                            error!("error reading from command channel");
                            continue;
                        }
                        // The main process is gone: stop the worker.
                        if event.is_read_closed() || event.is_write_closed() {
                            error!("command channel was closed");
                            return;
                        }
                        let ready = Ready::from(event);
                        self.channel.handle_events(ready);

                        // Alternate between reading requests and flushing responses until
                        // the channel reports no readiness at all.
                        loop {
                            // Pending responses: make sure writability is requested.
                            QUEUE.with(|queue| {
                                if !(*queue.borrow()).is_empty() {
                                    self.channel.interest.insert(Ready::WRITABLE);
                                }
                            });

                            if self.channel.readiness() == Ready::EMPTY {
                                break;
                            }

                            // `true` means a HardStop was handled: exit immediately.
                            if self.read_channel_messages_and_notify() {
                                return;
                            }

                            QUEUE.with(|queue| {
                                if !(*queue.borrow()).is_empty() {
                                    self.channel.interest.insert(Ready::WRITABLE);
                                }
                            });

                            self.send_queue();
                        }
                    }
                    // Timer slot: fire every expired timeout.
                    Token(1) => {
                        while let Some(t) = TIMER.with(|timer| timer.borrow_mut().poll()) {
                            self.timeout(t);
                        }
                    }
                    // Metrics socket (registered WRITABLE in `new`) reported readiness.
                    Token(2) => METRICS.with(|metrics| {
                        (*metrics.borrow_mut()).writable();
                    }),
                    token if self.health_checker.owns_token(token) => {
                        self.health_checker.ready(token);
                    }
                    token => self.ready(token, Ready::from(event)),
                }
            }

            // Timer deadline passed without a Token(1) event: fire expired timeouts here.
            if let Some(t) = self.should_poll_at.as_ref() {
                if *t <= Instant::now() {
                    while let Some(t) = TIMER.with(|timer| timer.borrow_mut().poll()) {
                        self.timeout(t);
                    }
                }
            }
            self.handle_remaining_readiness();
            self.create_sessions();

            self.should_poll_at = TIMER.with(|timer| timer.borrow().next_poll_date());

            self.zombie_check();
            self.health_checker
                .poll(&self.backends, self.poll.registry());

            // Connection and slab occupancy gauges.
            {
                let sessions = self.sessions.borrow();
                let nb_connections = sessions.nb_connections;
                let max_connections = sessions.max_connections;
                let slab_len = sessions.slab.len();
                let slab_capacity = sessions.slab.capacity();
                let accept_threshold = sessions.accept_slab_threshold();

                gauge!(names::client::CONNECTIONS, nb_connections);
                gauge!(names::client::CONNECTIONS_MAX, max_connections);
                if max_connections > 0 {
                    gauge!(
                        "client.connections_percent",
                        nb_connections * 100 / max_connections
                    );
                }

                gauge!(names::slab::ENTRIES, slab_len);
                gauge!(names::slab::CAPACITY, slab_capacity);
                if slab_capacity > 0 {
                    gauge!(names::slab::USAGE_PERCENT, slab_len * 100 / slab_capacity);
                }
                if accept_threshold > 0 {
                    gauge!(
                        "slab.accept_threshold_percent",
                        slab_len * 100 / accept_threshold
                    );
                }
            }
            // Buffer pool gauges.
            {
                let pool = self.pool.borrow();
                let used = pool.inner.used();
                let capacity = pool.inner.capacity();
                gauge!(names::buffer::IN_USE, used);
                gauge!(names::buffer::CAPACITY, capacity);
                if capacity > 0 {
                    gauge!(names::buffer::USAGE_PERCENT, used * 100 / capacity);
                }
            }
            // At most once per ACCEPT_SATURATION_TICK, count one saturated tick if
            // accepting is currently paused.
            let now = Instant::now();
            if now.duration_since(self.last_saturation_tick) >= ACCEPT_SATURATION_TICK {
                if !self.sessions.borrow().can_accept {
                    incr!(names::accept_queue::SATURATED_SECONDS);
                }
                self.last_saturation_tick = now;
            }
            gauge!(
                "process.uptime_seconds",
                self.started_at.elapsed().as_secs() as usize
            );
            // 0 while a soft stop is in progress, 1 otherwise.
            gauge!(
                "server.live",
                if self.shutting_down.is_some() { 0 } else { 1 }
            );
            METRICS.with(|metrics| {
                (*metrics.borrow_mut()).send_data();
            });

            if self.shutting_down.is_some() && self.shut_down_sessions() {
                return;
            }
        }
    }
1047
1048 fn check_for_poll_errors(&mut self) {
1049 if self.current_poll_errors >= self.max_poll_errors {
1050 error!(
1051 "Something is going very wrong. Last {} poll() calls failed, crashing..",
1052 self.current_poll_errors
1053 );
1054 panic!(
1055 "poll() calls failed {} times in a row",
1056 self.current_poll_errors
1057 );
1058 }
1059 }
1060
1061 fn reset_loop_time_and_get_timeout(&mut self) -> Option<Duration> {
1062 let now = Instant::now();
1063 time!(
1064 names::event_loop::EVENT_LOOP_TIME,
1065 (now - self.loop_start).as_millis()
1066 );
1067
1068 let mut timeout = match self.should_poll_at.as_ref() {
1069 None => self.poll_timeout,
1070 Some(i) => {
1071 if *i <= now {
1072 self.poll_timeout
1073 } else {
1074 let dur = *i - now;
1075 match self.poll_timeout {
1076 None => Some(dur),
1077 Some(t) => {
1078 if t < dur {
1079 Some(t)
1080 } else {
1081 Some(dur)
1082 }
1083 }
1084 }
1085 }
1086 }
1087 };
1088
1089 if self.shutting_down.is_some() {
1090 let shutdown_tick = Duration::from_millis(100);
1091 timeout = match timeout {
1092 None => Some(shutdown_tick),
1093 Some(current) => Some(current.min(shutdown_tick)),
1094 };
1095 }
1096
1097 self.loop_start = now;
1098 timeout
1099 }
1100
    /// Reads and dispatches every request currently available on the command channel.
    ///
    /// Returns `true` only after a `HardStop`: it has been forwarded through `notify`,
    /// acknowledged and the channel run, so the caller should leave the event loop.
    /// `SoftStop` stores its request id in `shutting_down` (answered later by
    /// `shut_down_sessions`) before being forwarded. `ReturnListenSockets` is answered
    /// directly. Every other request goes to `notify`.
    fn read_channel_messages_and_notify(&mut self) -> bool {
        if !self.channel.readiness().is_readable() {
            return false;
        }

        // Pull available bytes into the channel; a failure is only logged.
        if let Err(e) = self.channel.readable() {
            error!("error reading from channel: {:?}", e);
        }

        loop {
            let request = self.channel.read_message();
            debug!("Received request {:?}", request);
            match request {
                Ok(request) => match request.content.request_type {
                    Some(RequestType::HardStop(_)) => {
                        let req_id = request.id.clone();
                        self.notify(request);
                        // Answer synchronously: the loop exits right after this.
                        if let Err(e) = self.channel.write_message(&WorkerResponse::ok(req_id)) {
                            error!("Could not send ok response to the main process: {}", e);
                        }
                        if let Err(e) = self.channel.run() {
                            error!("Error while running the server channel: {}", e);
                        }
                        return true;
                    }
                    Some(RequestType::SoftStop(_)) => {
                        self.shutting_down = Some(request.id.clone());
                        self.last_sessions_len = self.sessions.borrow().slab.len();
                        self.notify(request);
                    }
                    Some(RequestType::ReturnListenSockets(_)) => {
                        info!("received ReturnListenSockets order");
                        match self.return_listen_sockets() {
                            Ok(_) => push_queue(WorkerResponse::ok(request.id)),
                            Err(error) => push_queue(worker_response_error(
                                request.id,
                                format!("Could not send listeners on scm socket: {error:?}"),
                            )),
                        }
                    }
                    _ => self.notify(request),
                },
                Err(_) => {
                    // NOTE(review): presumably no complete message is buffered yet. If the
                    // channel is still readable, read more bytes and try again; otherwise stop.
                    if (self.channel.interest & self.channel.readiness).is_readable() {
                        if let Err(e) = self.channel.readable() {
                            error!("error reading from channel: {:?}", e);
                        }
                        continue;
                    }
                    break;
                }
            }
        }
        false
    }
1159
1160 fn zombie_check(&mut self) {
1162 let now = Instant::now();
1163 if now - self.last_zombie_check < self.zombie_check_interval {
1164 return;
1165 }
1166 info!("zombie check");
1167 self.last_zombie_check = now;
1168
1169 let mut zombie_tokens = HashSet::new();
1170
1171 for (_index, session) in self
1173 .sessions
1174 .borrow_mut()
1175 .slab
1176 .iter_mut()
1177 .filter(|(_, c)| now - c.borrow().last_event() > self.zombie_check_interval)
1178 {
1179 let session_token = session.borrow().frontend_token();
1180 if !zombie_tokens.contains(&session_token) {
1181 session.borrow().print_session();
1182 zombie_tokens.insert(session_token);
1183 }
1184 }
1185
1186 let zombie_count = zombie_tokens.len() as i64;
1187 count!(names::misc::ZOMBIES, zombie_count);
1188
1189 let remaining_count = self.shut_down_sessions_by_frontend_tokens(zombie_tokens);
1190 info!(
1191 "removing {} zombies ({} remaining entries after close)",
1192 zombie_count, remaining_count
1193 );
1194 }
1195
1196 fn shut_down_sessions_by_frontend_tokens(&self, tokens: HashSet<Token>) -> usize {
1199 if tokens.is_empty() {
1200 return 0;
1201 }
1202
1203 for token in &tokens {
1205 if self.sessions.borrow().slab.contains(token.0) {
1206 let session = { self.sessions.borrow_mut().slab.remove(token.0) };
1207 session.borrow_mut().close();
1208 self.sessions.borrow_mut().decr();
1209 }
1210 }
1211
1212 let mut dangling_entries = HashSet::new();
1214 for (entry_key, session) in &self.sessions.borrow().slab {
1215 if tokens.contains(&session.borrow().frontend_token()) {
1216 dangling_entries.insert(entry_key);
1217 }
1218 }
1219
1220 let mut dangling_entries_count = 0;
1222 for entry_key in dangling_entries {
1223 let mut sessions = self.sessions.borrow_mut();
1224 if sessions.slab.contains(entry_key) {
1225 sessions.slab.remove(entry_key);
1226 dangling_entries_count += 1;
1227 }
1228 }
1229 dangling_entries_count
1230 }
1231
1232 fn shut_down_sessions(&mut self) -> bool {
1234 let sessions_count = self.sessions.borrow().slab.len();
1235 let mut sessions_to_shut_down = HashSet::new();
1236
1237 for (_key, session) in &self.sessions.borrow().slab {
1238 let mut session = session.borrow_mut();
1239 if session.shutting_down() {
1240 debug!(
1241 "Server killing session from shutting_down: token={:?}, protocol={:?}",
1242 session.frontend_token(),
1243 session.protocol()
1244 );
1245 sessions_to_shut_down.insert(Token(session.frontend_token().0));
1246 }
1247 }
1248 let _ = self.shut_down_sessions_by_frontend_tokens(sessions_to_shut_down);
1249
1250 let new_sessions_count = self.sessions.borrow().slab.len();
1251
1252 if new_sessions_count < sessions_count {
1253 let now = Instant::now();
1254 if let Some(last) = self.last_shutting_down_message {
1255 if (now - last) > Duration::from_secs(5) {
1256 info!(
1257 "closed {} sessions, {} sessions left, base_sessions_count = {}",
1258 sessions_count - new_sessions_count,
1259 new_sessions_count,
1260 self.base_sessions_count
1261 );
1262 }
1263 }
1264 self.last_shutting_down_message = Some(now);
1265 }
1266
1267 if new_sessions_count <= self.base_sessions_count {
1268 info!("last session stopped, shutting down!");
1269 if let Err(e) = self.channel.run() {
1270 error!("Error while running the server channel: {}", e);
1271 }
1272 let id = self
1274 .shutting_down
1275 .take()
1276 .expect("should have shut down correctly"); debug!("Responding OK to main process for request {}", id);
1279
1280 let proxy_response = WorkerResponse::ok(id);
1281 if let Err(e) = self.channel.write_message(&proxy_response) {
1282 error!("Could not write response to the main process: {}", e);
1283 }
1284 if let Err(e) = self.channel.run() {
1285 error!("Error while running the server channel: {}", e);
1286 }
1287 return true;
1288 }
1289
1290 if new_sessions_count < self.last_sessions_len {
1291 info!(
1292 "shutting down, {} slab elements remaining (base: {})",
1293 new_sessions_count - self.base_sessions_count,
1294 self.base_sessions_count
1295 );
1296 self.last_sessions_len = new_sessions_count;
1297 }
1298
1299 false
1300 }
1301
1302 fn kill_session(&self, session: Rc<RefCell<dyn ProxySession>>) {
1303 let token = session.borrow().frontend_token();
1304 let _ = self.shut_down_sessions_by_frontend_tokens(HashSet::from([token]));
1305 }
1306
1307 fn send_queue(&mut self) {
1308 if self.channel.readiness.is_writable() {
1309 QUEUE.with(|q| {
1310 let mut queue = q.borrow_mut();
1311 loop {
1312 if let Some(resp) = queue.pop_front() {
1313 debug!("Sending response {:?}", resp);
1314 if let Err(e) = self.channel.write_message(&resp) {
1315 error!("Could not write message {} on the channel: {}", resp, e);
1316 queue.push_front(resp);
1317 }
1318 }
1319
1320 if self.channel.back_buf.available_data() > 0 {
1321 if let Err(e) = self.channel.writable() {
1322 error!("error writing to channel: {:?}", e);
1323 }
1324 }
1325
1326 if !self.channel.readiness.is_writable() {
1327 break;
1328 }
1329
1330 if self.channel.back_buf.available_data() == 0 && queue.is_empty() {
1331 break;
1332 }
1333 }
1334 });
1335 }
1336 }
1337
1338 fn notify(&mut self, message: WorkerRequest) {
1339 let now = std::time::Instant::now();
1346 let lease_tick_transition = METRICS.with(|metrics| {
1352 let mut m = metrics.borrow_mut();
1353 if !m.lease_tick_due(now) {
1354 return None;
1355 }
1356 let previous = m.lease_tick(now)?;
1357 let effective = m.detail_effective();
1358 Some((previous, effective))
1359 });
1360 if let Some((previous, effective)) = lease_tick_transition {
1361 push_metric_detail_transition(previous, effective, "lease_tick_expired", None);
1368 }
1369 match &message.content.request_type {
1370 Some(RequestType::ConfigureMetrics(configuration)) => {
1371 match MetricsConfiguration::try_from(*configuration) {
1372 Ok(metrics_config) => {
1373 METRICS.with(|metrics| {
1374 (*metrics.borrow_mut()).configure(&metrics_config);
1375 push_queue(WorkerResponse::ok(message.id));
1376 });
1377 }
1378 Err(e) => {
1379 error!("Error configuring metrics: {}", e);
1380 push_queue(WorkerResponse::error(message.id, e));
1381 }
1382 }
1383 return;
1384 }
1385 Some(RequestType::QueryMetrics(query_metrics_options)) => {
1386 METRICS.with(|metrics| {
1387 match (*metrics.borrow_mut()).query(query_metrics_options) {
1388 Ok(c) => push_queue(WorkerResponse::ok_with_content(message.id, c)),
1389 Err(e) => {
1390 error!("Error querying metrics: {}", e);
1391 push_queue(WorkerResponse::error(message.id, e))
1392 }
1393 }
1394 });
1395 return;
1396 }
1397 Some(RequestType::SetMetricDetail(req)) => {
1404 let presented_binding = crate::metrics::PeerBinding {
1411 pid: req.peer_pid,
1412 session_ulid: req.peer_session_ulid.as_deref().and_then(|s| {
1418 rusty_ulid::Ulid::from_str(s)
1419 .map(u128::from)
1420 .ok()
1421 .or_else(|| u128::from_str_radix(s.trim_start_matches("0x"), 16).ok())
1422 }),
1423 };
1424 if req.clear.unwrap_or(false) {
1425 if req.client_id.len() > crate::metrics::LEASE_CLIENT_ID_MAX_BYTES {
1436 let msg = format!(
1437 "SetMetricDetail: clear client_id length {} exceeds {} bytes",
1438 req.client_id.len(),
1439 crate::metrics::LEASE_CLIENT_ID_MAX_BYTES,
1440 );
1441 error!("{}", msg);
1442 push_queue(WorkerResponse::error(message.id.clone(), msg));
1443 return;
1444 }
1445 let (outcome, effective_after, configured_after, lease_count_after) = METRICS
1453 .with(|metrics| {
1454 let mut m = metrics.borrow_mut();
1455 let outcome = m.lease_clear(&req.client_id, presented_binding);
1456 (
1457 outcome,
1458 m.detail_effective(),
1459 m.detail_configured(),
1460 m.lease_count(),
1461 )
1462 });
1463 match outcome {
1464 crate::metrics::LeaseClearOutcome::Cleared { previous_effective } => {
1465 push_metric_detail_transition(
1466 previous_effective,
1467 effective_after,
1468 "lease_clear",
1469 Some(req.client_id.clone()),
1470 );
1471 push_queue(WorkerResponse::ok_with_content(
1472 message.id.clone(),
1473 worker_metric_detail_status_content(
1474 configured_after,
1475 effective_after,
1476 previous_effective,
1477 lease_count_after,
1478 ),
1479 ));
1480 }
1481 crate::metrics::LeaseClearOutcome::NotFound => {
1482 push_queue(WorkerResponse::ok_with_content(
1486 message.id.clone(),
1487 worker_metric_detail_status_content(
1488 configured_after,
1489 effective_after,
1490 effective_after,
1491 lease_count_after,
1492 ),
1493 ));
1494 }
1495 crate::metrics::LeaseClearOutcome::Unauthorized => {
1496 let msg = "SetMetricDetail: clear refused (peer \
1507 binding does not match the apply-time owner)"
1508 .to_owned();
1509 error!("{}", msg);
1510 push_queue(WorkerResponse::error(message.id.clone(), msg));
1511 }
1512 }
1513 return;
1514 }
1515 let detail_proto = match req.detail {
1516 Some(d) => d,
1517 None => {
1518 let msg = "SetMetricDetail without `detail` and without `clear`".to_owned();
1525 error!("{}", msg);
1526 push_queue(WorkerResponse::error(message.id.clone(), msg));
1527 return;
1528 }
1529 };
1530 let detail_enum = match MetricDetail::try_from(detail_proto) {
1531 Ok(d) => d,
1532 Err(e) => {
1533 let msg =
1534 format!("SetMetricDetail: invalid MetricDetail variant {detail_proto}");
1535 error!("{}: {}", msg, e);
1536 push_queue(WorkerResponse::error(message.id.clone(), msg));
1537 return;
1538 }
1539 };
1540 let level = MetricDetailLevel::from(detail_enum);
1541 if let Some(t) = req.ttl_seconds
1550 && u64::from(t) > crate::metrics::LEASE_TTL_MAX.as_secs()
1551 {
1552 let msg = format!(
1553 "SetMetricDetail: ttl_seconds={t} exceeds LEASE_TTL_MAX={}",
1554 crate::metrics::LEASE_TTL_MAX.as_secs()
1555 );
1556 error!("{}", msg);
1557 push_queue(WorkerResponse::error(message.id.clone(), msg));
1558 return;
1559 }
1560 let ttl_seconds = req.ttl_seconds.filter(|&t| t > 0).unwrap_or_else(|| {
1561 u32::try_from(crate::metrics::LEASE_TTL_DEFAULT.as_secs()).unwrap_or(60)
1568 });
1569 let ttl = std::time::Duration::from_secs(ttl_seconds.into());
1570 let (outcome, configured_after, lease_count_after) = METRICS.with(|metrics| {
1571 let mut m = metrics.borrow_mut();
1572 let outcome =
1573 m.lease_apply(req.client_id.clone(), level, ttl, presented_binding);
1574 (outcome, m.detail_configured(), m.lease_count())
1575 });
1576 match outcome {
1577 crate::metrics::LeaseApplyOutcome::Applied {
1578 previous_effective,
1579 new_effective,
1580 } => {
1581 push_metric_detail_transition(
1582 previous_effective,
1583 new_effective,
1584 "lease_apply",
1585 Some(req.client_id.clone()),
1586 );
1587 push_queue(WorkerResponse::ok_with_content(
1588 message.id.clone(),
1589 worker_metric_detail_status_content(
1590 configured_after,
1591 new_effective,
1592 previous_effective,
1593 lease_count_after,
1594 ),
1595 ));
1596 }
1597 crate::metrics::LeaseApplyOutcome::ClientIdTooLong => {
1598 let msg = format!(
1599 "SetMetricDetail: client_id length {} exceeds {} bytes",
1600 req.client_id.len(),
1601 crate::metrics::LEASE_CLIENT_ID_MAX_BYTES,
1602 );
1603 error!("{}", msg);
1604 push_queue(WorkerResponse::error(message.id.clone(), msg));
1605 }
1606 crate::metrics::LeaseApplyOutcome::TableFull => {
1607 let msg = format!(
1614 "SetMetricDetail: lease table at capacity ({} entries); reject new \
1615 apply — operators must retry after an active lease expires or is \
1616 cleared",
1617 crate::metrics::LEASE_TABLE_CAP,
1618 );
1619 error!("{}", msg);
1620 push_queue(WorkerResponse::error(message.id.clone(), msg));
1621 }
1622 crate::metrics::LeaseApplyOutcome::TtlOutOfRange => {
1623 let msg = format!(
1629 "SetMetricDetail: ttl exceeds LEASE_TTL_MAX={} (internal contract \
1630 violation: dispatch gate should have rejected)",
1631 crate::metrics::LEASE_TTL_MAX.as_secs(),
1632 );
1633 error!("{}", msg);
1634 push_queue(WorkerResponse::error(message.id.clone(), msg));
1635 }
1636 crate::metrics::LeaseApplyOutcome::Unauthorized => {
1637 let msg = "SetMetricDetail: renewal refused (peer binding does not \
1646 match the apply-time owner)"
1647 .to_owned();
1648 error!("{}", msg);
1649 push_queue(WorkerResponse::error(message.id.clone(), msg));
1650 }
1651 }
1652 return;
1653 }
1654 Some(RequestType::Logging(logging_filter)) => {
1655 info!(
1656 "{} changing logging filter to {}",
1657 message.id, logging_filter
1658 );
1659 let (directives, _errors) = logging::parse_logging_spec(logging_filter);
1661 logging::LOGGER.with(|logger| {
1662 logger.borrow_mut().set_directives(directives);
1663 });
1664 push_queue(WorkerResponse::ok(message.id));
1665 return;
1666 }
1667 Some(RequestType::QueryClustersHashes(_)) => {
1668 push_queue(WorkerResponse::ok_with_content(
1669 message.id.clone(),
1670 ContentType::ClusterHashes(ClusterHashes {
1671 map: self.config_state.hash_state(),
1672 })
1673 .into(),
1674 ));
1675 return;
1676 }
1677 Some(RequestType::QueryClusterById(cluster_id)) => {
1678 push_queue(WorkerResponse::ok_with_content(
1679 message.id.clone(),
1680 ContentType::Clusters(ClusterInformations {
1681 vec: self
1682 .config_state
1683 .cluster_state(cluster_id)
1684 .map_or(vec![], |ci| vec![ci]),
1685 })
1686 .into(),
1687 ));
1688 }
1689 Some(RequestType::SetMaxConnectionsPerIp(limit)) => {
1690 let mut sessions = self.sessions.borrow_mut();
1691 let previous = sessions.max_connections_per_ip;
1692 sessions.max_connections_per_ip = *limit;
1693 if *limit == 0 {
1698 sessions.clear_cluster_ip_tracking();
1699 }
1700 info!(
1701 "{} updated global max_connections_per_ip from {} to {}",
1702 message.id, previous, limit
1703 );
1704 push_queue(WorkerResponse::ok(message.id));
1705 return;
1706 }
1707 Some(RequestType::QueryMaxConnectionsPerIp(_)) => {
1708 let limit = self.sessions.borrow().max_connections_per_ip;
1709 push_queue(WorkerResponse::ok_with_content(
1710 message.id,
1711 ContentType::MaxConnectionsPerIpLimit(
1712 sozu_command::proto::command::MaxConnectionsPerIpLimit { limit },
1713 )
1714 .into(),
1715 ));
1716 return;
1717 }
1718 Some(RequestType::QueryClustersByDomain(domain)) => {
1719 let cluster_ids = self
1720 .config_state
1721 .get_cluster_ids_by_domain(domain.hostname.clone(), domain.path.clone());
1722 let vec = cluster_ids
1723 .iter()
1724 .filter_map(|cluster_id| self.config_state.cluster_state(cluster_id))
1725 .collect();
1726
1727 push_queue(WorkerResponse::ok_with_content(
1728 message.id.clone(),
1729 ContentType::Clusters(ClusterInformations { vec }).into(),
1730 ));
1731 return;
1732 }
1733 Some(RequestType::QueryCertificatesFromWorkers(filters)) => {
1734 if filters.fingerprint.is_some() {
1735 let certs = self.config_state.get_certificates(filters.clone());
1736 let response = if !certs.is_empty() {
1737 WorkerResponse::ok_with_content(
1738 message.id.clone(),
1739 ContentType::CertificatesWithFingerprints(
1740 CertificatesWithFingerprints { certs },
1741 )
1742 .into(),
1743 )
1744 } else {
1745 worker_response_error(
1746 message.id.clone(),
1747 "Could not find certificate for this fingerprint",
1748 )
1749 };
1750 push_queue(response);
1751 return;
1752 }
1753 }
1756 _other_request => {}
1757 }
1758 self.notify_proxys(message);
1759 }
1760
1761 pub fn notify_proxys(&mut self, request: WorkerRequest) {
1762 if let Err(e) = self.config_state.dispatch(&request.content) {
1763 error!("Could not execute order on config state: {}", e);
1764 }
1765
1766 let req_id = request.id.clone();
1767
1768 match request.content.request_type {
1769 Some(RequestType::AddCluster(ref cluster)) => {
1770 if let Some(hc) = cluster.health_check.as_ref() {
1778 if let Err(reason) = sozu_command::config::validate_health_check_config(hc) {
1779 push_queue(worker_response_error(req_id, reason));
1780 return;
1781 }
1782 }
1783 self.add_cluster(cluster);
1784 METRICS.with(|metrics| {
1789 (*metrics.borrow_mut()).add_cluster(&cluster.cluster_id);
1790 });
1791 }
1793 Some(RequestType::RemoveCluster(ref cluster_id)) => {
1794 self.remove_health_check_state(cluster_id);
1795 METRICS.with(|metrics| {
1796 (*metrics.borrow_mut()).remove_cluster(cluster_id);
1797 });
1798 }
1800 Some(RequestType::SetHealthCheck(ref set)) => {
1801 if let Err(reason) = sozu_command::config::validate_health_check_config(&set.config)
1802 {
1803 push_queue(worker_response_error(req_id, reason));
1804 return;
1805 }
1806 self.backends
1807 .borrow_mut()
1808 .set_health_check_config(&set.cluster_id, Some(set.config.to_owned()));
1809 push_queue(WorkerResponse::ok(req_id));
1810 return;
1811 }
1812 Some(RequestType::RemoveHealthCheck(ref cluster_id)) => {
1813 self.remove_health_check_state(cluster_id);
1814 push_queue(WorkerResponse::ok(req_id));
1815 return;
1816 }
1817 Some(RequestType::AddBackend(ref backend)) => {
1818 push_queue(self.add_backend(&req_id, backend));
1819 return;
1820 }
1821 Some(RequestType::RemoveBackend(ref remove_backend)) => {
1822 push_queue(self.remove_backend(&req_id, remove_backend));
1823 return;
1824 }
1825 _ => {}
1826 };
1827
1828 let proxy_destinations = request.content.get_destinations();
1829 let mut notify_response = None;
1830 if proxy_destinations.to_http_proxy {
1831 notify_response = Some(self.http.borrow_mut().notify(request.clone()));
1832 }
1833 if proxy_destinations.to_https_proxy {
1834 let http_proxy_response = self.https.borrow_mut().notify(request.clone());
1835 if http_proxy_response.is_failure() || notify_response.is_none() {
1836 notify_response = Some(http_proxy_response);
1837 }
1838 }
1839 if proxy_destinations.to_tcp_proxy {
1840 let tcp_proxy_response = self.tcp.borrow_mut().notify(request.clone());
1841 if tcp_proxy_response.is_failure() || notify_response.is_none() {
1842 notify_response = Some(tcp_proxy_response);
1843 }
1844 }
1845 if let Some(response) = notify_response {
1846 push_queue(response);
1847 }
1848
1849 match request.content.request_type {
1850 Some(RequestType::AddHttpListener(listener)) => {
1852 push_queue(self.notify_add_http_listener(&req_id, listener));
1853 }
1854 Some(RequestType::AddHttpsListener(listener)) => {
1855 push_queue(self.notify_add_https_listener(&req_id, listener));
1856 }
1857 Some(RequestType::AddTcpListener(listener)) => {
1858 push_queue(self.notify_add_tcp_listener(&req_id, listener));
1859 }
1860 Some(RequestType::UpdateHttpListener(patch)) => {
1861 push_queue(self.notify_update_http_listener(&req_id, patch));
1862 }
1863 Some(RequestType::UpdateHttpsListener(patch)) => {
1864 push_queue(self.notify_update_https_listener(&req_id, patch));
1865 }
1866 Some(RequestType::UpdateTcpListener(patch)) => {
1867 push_queue(self.notify_update_tcp_listener(&req_id, patch));
1868 }
1869 Some(RequestType::RemoveListener(ref remove)) => {
1870 debug!("{} remove {:?} listener {:?}", req_id, remove.proxy, remove);
1871 self.base_sessions_count -= 1;
1872 let response = match ListenerType::try_from(remove.proxy) {
1873 Ok(ListenerType::Http) => self.http.borrow_mut().notify(request),
1874 Ok(ListenerType::Https) => self.https.borrow_mut().notify(request),
1875 Ok(ListenerType::Tcp) => self.tcp.borrow_mut().notify(request),
1876 Err(_) => WorkerResponse::error(req_id, "Wrong variant ListenerType"),
1877 };
1878 push_queue(response);
1879 }
1880 Some(RequestType::ActivateListener(ref activate)) => {
1881 push_queue(self.notify_activate_listener(&req_id, activate));
1882 }
1883 Some(RequestType::DeactivateListener(ref deactivate)) => {
1884 push_queue(self.notify_deactivate_listener(&req_id, deactivate));
1885 }
1886 _other_request => {}
1887 };
1888 }
1889
1890 fn add_cluster(&mut self, cluster: &Cluster) {
1891 let mut backends = self.backends.borrow_mut();
1892 backends.set_load_balancing_policy_for_cluster(
1893 &cluster.cluster_id,
1894 LoadBalancingAlgorithms::try_from(cluster.load_balancing).unwrap_or_default(),
1895 cluster
1896 .load_metric
1897 .and_then(|n| LoadMetric::try_from(n).ok()),
1898 );
1899 backends.set_health_check_config(&cluster.cluster_id, cluster.health_check.to_owned());
1900 backends.set_cluster_http2(&cluster.cluster_id, cluster.http2.unwrap_or(false));
1901 }
1902
1903 fn add_backend(&mut self, req_id: &str, add_backend: &AddBackend) -> WorkerResponse {
1904 let new_backend = Backend::new(
1905 &add_backend.backend_id,
1906 add_backend.address.into(),
1907 add_backend.sticky_id.clone(),
1908 add_backend.load_balancing_parameters,
1909 add_backend.backup,
1910 );
1911 self.backends
1912 .borrow_mut()
1913 .add_backend(&add_backend.cluster_id, new_backend);
1914
1915 WorkerResponse::ok(req_id)
1916 }
1917
1918 fn remove_health_check_state(&mut self, cluster_id: &str) {
1919 self.health_checker.remove_cluster(cluster_id);
1920 self.backends
1921 .borrow_mut()
1922 .health_check_configs
1923 .remove(cluster_id);
1924 }
1925
1926 fn remove_backend(&mut self, req_id: &str, backend: &RemoveBackend) -> WorkerResponse {
1927 let address = backend.address.into();
1928 let removed_ids = self
1936 .backends
1937 .borrow_mut()
1938 .remove_backend(&backend.cluster_id, &address);
1939 if removed_ids.is_empty() {
1940 METRICS.with(|metrics| {
1945 (*metrics.borrow_mut()).remove_backend(&backend.cluster_id, &backend.backend_id);
1946 });
1947 } else {
1948 METRICS.with(|metrics| {
1949 let mut metrics = metrics.borrow_mut();
1950 for id in &removed_ids {
1951 metrics.remove_backend(&backend.cluster_id, id);
1952 }
1953 });
1954 }
1955
1956 WorkerResponse::ok(req_id)
1957 }
1958
1959 fn notify_add_http_listener(
1960 &mut self,
1961 req_id: &str,
1962 listener: HttpListenerConfig,
1963 ) -> WorkerResponse {
1964 debug!("{} add http listener {:?}", req_id, listener);
1965
1966 if self.sessions.borrow().at_capacity() {
1967 return worker_response_error(req_id, "session list is full, cannot add a listener");
1968 }
1969
1970 let mut session_manager = self.sessions.borrow_mut();
1971 let entry = session_manager.slab.vacant_entry();
1972 let token = Token(entry.key());
1973
1974 match self.http.borrow_mut().add_listener(listener, token) {
1975 Ok(_token) => {
1976 entry.insert(Rc::new(RefCell::new(ListenSession {
1977 protocol: Protocol::HTTPListen,
1978 })));
1979 self.base_sessions_count += 1;
1980 WorkerResponse::ok(req_id)
1981 }
1982 Err(e) => worker_response_error(req_id, format!("Could not add HTTP listener: {e}")),
1983 }
1984 }
1985
1986 fn notify_add_https_listener(
1987 &mut self,
1988 req_id: &str,
1989 listener: HttpsListenerConfig,
1990 ) -> WorkerResponse {
1991 debug!("{} add https listener {:?}", req_id, listener);
1992
1993 if self.sessions.borrow().at_capacity() {
1994 return worker_response_error(req_id, "session list is full, cannot add a listener");
1995 }
1996
1997 let mut session_manager = self.sessions.borrow_mut();
1998 let entry = session_manager.slab.vacant_entry();
1999 let token = Token(entry.key());
2000
2001 match self
2002 .https
2003 .borrow_mut()
2004 .add_listener(listener.clone(), token)
2005 {
2006 Ok(_token) => {
2007 entry.insert(Rc::new(RefCell::new(ListenSession {
2008 protocol: Protocol::HTTPSListen,
2009 })));
2010 self.base_sessions_count += 1;
2011 WorkerResponse::ok(req_id)
2012 }
2013 Err(e) => worker_response_error(req_id, format!("Could not add HTTPS listener: {e}")),
2014 }
2015 }
2016
2017 fn notify_add_tcp_listener(
2018 &mut self,
2019 req_id: &str,
2020 listener: CommandTcpListener,
2021 ) -> WorkerResponse {
2022 debug!("{} add tcp listener {:?}", req_id, listener);
2023
2024 if self.sessions.borrow().at_capacity() {
2025 return worker_response_error(req_id, "session list is full, cannot add a listener");
2026 }
2027
2028 let mut session_manager = self.sessions.borrow_mut();
2029 let entry = session_manager.slab.vacant_entry();
2030 let token = Token(entry.key());
2031
2032 match self.tcp.borrow_mut().add_listener(listener, token) {
2033 Ok(_token) => {
2034 entry.insert(Rc::new(RefCell::new(ListenSession {
2035 protocol: Protocol::TCPListen,
2036 })));
2037 self.base_sessions_count += 1;
2038 WorkerResponse::ok(req_id)
2039 }
2040 Err(e) => worker_response_error(req_id, format!("Could not add TCP listener: {e}")),
2041 }
2042 }
2043
2044 fn notify_update_http_listener(
2045 &mut self,
2046 req_id: &str,
2047 patch: UpdateHttpListenerConfig,
2048 ) -> WorkerResponse {
2049 debug!("{} update http listener {:?}", req_id, patch.address);
2050 match self.http.borrow_mut().update_listener(patch) {
2051 Ok(()) => WorkerResponse::ok(req_id),
2052 Err(e) => worker_response_error(req_id, format!("Could not update HTTP listener: {e}")),
2053 }
2054 }
2055
2056 fn notify_update_https_listener(
2057 &mut self,
2058 req_id: &str,
2059 patch: UpdateHttpsListenerConfig,
2060 ) -> WorkerResponse {
2061 debug!("{} update https listener {:?}", req_id, patch.address);
2062 match self.https.borrow_mut().update_listener(patch) {
2063 Ok(()) => WorkerResponse::ok(req_id),
2064 Err(e) => {
2065 worker_response_error(req_id, format!("Could not update HTTPS listener: {e}"))
2066 }
2067 }
2068 }
2069
2070 fn notify_update_tcp_listener(
2071 &mut self,
2072 req_id: &str,
2073 patch: UpdateTcpListenerConfig,
2074 ) -> WorkerResponse {
2075 debug!("{} update tcp listener {:?}", req_id, patch.address);
2076 match self.tcp.borrow_mut().update_listener(patch) {
2077 Ok(()) => WorkerResponse::ok(req_id),
2078 Err(e) => worker_response_error(req_id, format!("Could not update TCP listener: {e}")),
2079 }
2080 }
2081
2082 fn notify_activate_listener(
2083 &mut self,
2084 req_id: &str,
2085 activate: &ActivateListener,
2086 ) -> WorkerResponse {
2087 debug!(
2088 "{} activate {:?} listener {:?}",
2089 req_id, activate.proxy, activate
2090 );
2091
2092 let address: std::net::SocketAddr = activate.address.into();
2093
2094 match ListenerType::try_from(activate.proxy) {
2095 Ok(ListenerType::Http) => {
2096 let listener = self
2097 .scm_listeners
2098 .as_mut()
2099 .and_then(|s| s.get_http(&address))
2100 .map(|fd| unsafe { MioTcpListener::from_raw_fd(fd) });
2105
2106 let activated_token = self.http.borrow_mut().activate_listener(&address, listener);
2107 match activated_token {
2108 Ok(token) => {
2109 self.accept(ListenToken(token.0), Protocol::HTTPListen);
2110 WorkerResponse::ok(req_id)
2111 }
2112 Err(activate_error) => worker_response_error(
2113 req_id,
2114 format!("Could not activate HTTP listener: {activate_error}"),
2115 ),
2116 }
2117 }
2118 Ok(ListenerType::Https) => {
2119 let listener = self
2120 .scm_listeners
2121 .as_mut()
2122 .and_then(|s| s.get_https(&address))
2123 .map(|fd| unsafe { MioTcpListener::from_raw_fd(fd) });
2128
2129 let activated_token = self
2130 .https
2131 .borrow_mut()
2132 .activate_listener(&address, listener);
2133 match activated_token {
2134 Ok(token) => {
2135 self.accept(ListenToken(token.0), Protocol::HTTPSListen);
2136 WorkerResponse::ok(req_id)
2137 }
2138 Err(activate_error) => worker_response_error(
2139 req_id,
2140 format!("Could not activate HTTPS listener: {activate_error}"),
2141 ),
2142 }
2143 }
2144 Ok(ListenerType::Tcp) => {
2145 let listener = self
2146 .scm_listeners
2147 .as_mut()
2148 .and_then(|s| s.get_tcp(&address))
2149 .map(|fd| unsafe { MioTcpListener::from_raw_fd(fd) });
2154
2155 let listener_token = self.tcp.borrow_mut().activate_listener(&address, listener);
2156 match listener_token {
2157 Ok(token) => {
2158 self.accept(ListenToken(token.0), Protocol::TCPListen);
2159 WorkerResponse::ok(req_id)
2160 }
2161 Err(activate_error) => worker_response_error(
2162 req_id,
2163 format!("Could not activate TCP listener: {activate_error}"),
2164 ),
2165 }
2166 }
2167 Err(_) => worker_response_error(req_id, "Wrong variant for ListenerType on request"),
2168 }
2169 }
2170
2171 fn notify_deactivate_listener(
2172 &mut self,
2173 req_id: &str,
2174 deactivate: &DeactivateListener,
2175 ) -> WorkerResponse {
2176 debug!(
2177 "{} deactivate {:?} listener {:?}",
2178 req_id, deactivate.proxy, deactivate
2179 );
2180
2181 let address: std::net::SocketAddr = deactivate.address.into();
2182
2183 match ListenerType::try_from(deactivate.proxy) {
2184 Ok(ListenerType::Http) => {
2185 let (token, mut listener) = match self.http.borrow_mut().give_back_listener(address)
2186 {
2187 Ok((token, listener)) => (token, listener),
2188 Err(e) => {
2189 return worker_response_error(
2190 req_id,
2191 format!(
2192 "Couldn't deactivate HTTP listener at address {address:?}: {e}"
2193 ),
2194 );
2195 }
2196 };
2197
2198 if let Err(e) = self.poll.registry().deregister(&mut listener) {
2199 error!(
2200 "error deregistering HTTP listen socket({:?}): {:?}",
2201 deactivate, e
2202 );
2203 }
2204
2205 {
2206 let mut sessions = self.sessions.borrow_mut();
2207 if sessions.slab.contains(token.0) {
2208 sessions.slab.remove(token.0);
2209 info!("removed listen token {:?}", token);
2210 }
2211 }
2212
2213 if deactivate.to_scm {
2214 self.unblock_scm_socket();
2215 let listeners = Listeners {
2216 http: vec![(address, listener.as_raw_fd())],
2217 tls: vec![],
2218 tcp: vec![],
2219 };
2220 info!("sending HTTP listener: {:?}", listeners);
2221 let res = self.scm.send_listeners(&listeners);
2222
2223 self.block_scm_socket();
2224
2225 info!("sent HTTP listener: {:?}", res);
2226 }
2227 WorkerResponse::ok(req_id)
2228 }
2229 Ok(ListenerType::Https) => {
2230 let (token, mut listener) = match self
2231 .https
2232 .borrow_mut()
2233 .give_back_listener(address)
2234 {
2235 Ok((token, listener)) => (token, listener),
2236 Err(e) => {
2237 return worker_response_error(
2238 req_id,
2239 format!(
2240 "Couldn't deactivate HTTPS listener at address {address:?}: {e}",
2241 ),
2242 );
2243 }
2244 };
2245 if let Err(e) = self.poll.registry().deregister(&mut listener) {
2246 error!(
2247 "error deregistering HTTPS listen socket({:?}): {:?}",
2248 deactivate, e
2249 );
2250 }
2251 if self.sessions.borrow().slab.contains(token.0) {
2252 self.sessions.borrow_mut().slab.remove(token.0);
2253 info!("removed listen token {:?}", token);
2254 }
2255
2256 if deactivate.to_scm {
2257 self.unblock_scm_socket();
2258 let listeners = Listeners {
2259 http: vec![],
2260 tls: vec![(address, listener.as_raw_fd())],
2261 tcp: vec![],
2262 };
2263 info!("sending HTTPS listener: {:?}", listeners);
2264 let res = self.scm.send_listeners(&listeners);
2265
2266 self.block_scm_socket();
2267
2268 info!("sent HTTPS listener: {:?}", res);
2269 }
2270 WorkerResponse::ok(req_id)
2271 }
2272 Ok(ListenerType::Tcp) => {
2273 let (token, mut listener) = match self.tcp.borrow_mut().give_back_listener(address)
2274 {
2275 Ok((token, listener)) => (token, listener),
2276 Err(e) => {
2277 return worker_response_error(
2278 req_id,
2279 format!(
2280 "Could not deactivate TCP listener at address {address:?}: {e}"
2281 ),
2282 );
2283 }
2284 };
2285
2286 if let Err(e) = self.poll.registry().deregister(&mut listener) {
2287 error!(
2288 "error deregistering TCP listen socket({:?}): {:?}",
2289 deactivate, e
2290 );
2291 }
2292 if self.sessions.borrow().slab.contains(token.0) {
2293 self.sessions.borrow_mut().slab.remove(token.0);
2294 info!("removed listen token {:?}", token);
2295 }
2296
2297 if deactivate.to_scm {
2298 self.unblock_scm_socket();
2299 let listeners = Listeners {
2300 http: vec![],
2301 tls: vec![],
2302 tcp: vec![(address, listener.as_raw_fd())],
2303 };
2304 info!("sending TCP listener: {:?}", listeners);
2305 let res = self.scm.send_listeners(&listeners);
2306
2307 self.block_scm_socket();
2308
2309 info!("sent TCP listener: {:?}", res);
2310 }
2311 WorkerResponse::ok(req_id)
2312 }
2313 Err(_) => worker_response_error(req_id, "Wrong variant for ListenerType on request"),
2314 }
2315 }
2316
    /// Hands every HTTP, HTTPS and TCP listen socket back over the SCM socket.
    ///
    /// Each proxy gives up all of its listeners, which are deregistered from the
    /// poll (failures are only logged). Their `(address, raw fd)` pairs are then
    /// sent in a single `Listeners` message. The SCM socket is non-blocking for
    /// the duration of this call and switched back to blocking mode after the send.
    ///
    /// NOTE(review): the listener vectors are dropped when this function returns,
    /// presumably closing this process's copies of the fds after the send; listen
    /// slab entries and `accept_ready` are not touched here — confirm callers
    /// do not rely on them afterwards.
    ///
    /// # Errors
    /// Returns the result of `send_listeners` unchanged.
    pub fn return_listen_sockets(&mut self) -> Result<(), ScmSocketError> {
        self.unblock_scm_socket();

        // HTTP: take the listeners out of the proxy and stop polling them.
        let mut http_listeners = self.http.borrow_mut().give_back_listeners();
        for &mut (_, ref mut sock) in http_listeners.iter_mut() {
            if let Err(e) = self.poll.registry().deregister(sock) {
                error!(
                    "error deregistering HTTP listen socket({:?}): {:?}",
                    sock, e
                );
            }
        }

        // HTTPS: same as above.
        let mut https_listeners = self.https.borrow_mut().give_back_listeners();
        for &mut (_, ref mut sock) in https_listeners.iter_mut() {
            if let Err(e) = self.poll.registry().deregister(sock) {
                error!(
                    "error deregistering HTTPS listen socket({:?}): {:?}",
                    sock, e
                );
            }
        }

        // TCP: same as above.
        let mut tcp_listeners = self.tcp.borrow_mut().give_back_listeners();
        for &mut (_, ref mut sock) in tcp_listeners.iter_mut() {
            if let Err(e) = self.poll.registry().deregister(sock) {
                error!("error deregistering TCP listen socket({:?}): {:?}", sock, e);
            }
        }

        // Only raw fds are sent; the listener objects stay owned by the local
        // vectors until the end of this function.
        let listeners = Listeners {
            http: http_listeners
                .iter()
                .map(|(addr, listener)| (*addr, listener.as_raw_fd()))
                .collect(),
            tls: https_listeners
                .iter()
                .map(|(addr, listener)| (*addr, listener.as_raw_fd()))
                .collect(),
            tcp: tcp_listeners
                .iter()
                .map(|(addr, listener)| (*addr, listener.as_raw_fd()))
                .collect(),
        };
        info!("sending default listeners: {:?}", listeners);
        let res = self.scm.send_listeners(&listeners);

        self.block_scm_socket();

        info!("sent default listeners: {:?}", res);
        res
    }
2371
2372 fn block_scm_socket(&mut self) {
2373 if let Err(e) = self.scm.set_blocking(true) {
2374 error!("Could not block scm socket: {}", e);
2375 }
2376 }
2377
2378 fn unblock_scm_socket(&mut self) {
2379 if let Err(e) = self.scm.set_blocking(false) {
2380 error!("Could not unblock scm socket: {}", e);
2381 }
2382 }
2383
2384 pub fn to_session(&self, token: Token) -> SessionToken {
2385 SessionToken(token.0)
2386 }
2387
2388 pub fn from_session(&self, token: SessionToken) -> Token {
2389 Token(token.0)
2390 }
2391
2392 pub fn accept(&mut self, token: ListenToken, protocol: Protocol) {
2393 let (proto_key, accepted_protocol) = match protocol {
2406 Protocol::TCPListen => ("listener.accepted.tcp", Protocol::TCPListen),
2407 Protocol::HTTPListen => ("listener.accepted.http", Protocol::HTTPListen),
2408 Protocol::HTTPSListen => ("listener.accepted.https", Protocol::HTTPSListen),
2409 other => {
2410 warn!(
2411 "accept() called with non-listen protocol {:?} on token {:?}; skipping",
2412 other, token
2413 );
2414 return;
2415 }
2416 };
2417
2418 loop {
2419 let result = match accepted_protocol {
2420 Protocol::TCPListen => self.tcp.borrow_mut().accept(token),
2421 Protocol::HTTPListen => self.http.borrow_mut().accept(token),
2422 Protocol::HTTPSListen => self.https.borrow_mut().accept(token),
2423 other => unreachable!(
2427 "accept dispatch reached non-listen protocol {:?} after outer guard",
2428 other
2429 ),
2430 };
2431 match result {
2432 Ok(sock) => {
2433 let peer = sock.peer_addr().ok();
2438 incr!(names::listener::ACCEPTED_TOTAL);
2439 incr!(proto_key);
2440 if let Some(peer_addr) = peer.as_ref() {
2441 incr!(per_source_bucket(peer_addr));
2442 }
2443 self.accept_queue.push_back((
2444 sock,
2445 token,
2446 accepted_protocol,
2447 Instant::now(),
2448 peer,
2449 ));
2450 }
2451 Err(AcceptError::WouldBlock) => {
2452 self.accept_ready.remove(&token);
2453 break;
2454 }
2455 Err(other) => {
2456 error!(
2457 "error accepting {:?} sockets: {:?}",
2458 accepted_protocol, other
2459 );
2460 self.accept_ready.remove(&token);
2461 break;
2462 }
2463 }
2464 }
2465
2466 gauge!(names::accept_queue::CONNECTIONS, self.accept_queue.len());
2467 }
2468
    /// Turns queued accepted sockets into proxy sessions.
    ///
    /// Entries are popped from the back of `accept_queue`, so the most recently
    /// accepted socket (`accept` pushes to the back) is handled first. For each
    /// entry:
    /// - The time spent in the queue is recorded. Entries older than
    ///   `accept_queue_timeout` are counted and skipped.
    /// - When the session limits are reached, a capped connection is counted.
    ///   Processing then stops, unless `evict_on_queue_full` is set and the
    ///   worker is not shutting down. In that case about 1% of
    ///   `max_connections` (at least one) of the least recently active sessions
    ///   are evicted, and processing stops if that frees nothing or is not enough.
    /// - The matching proxy creates the session, and the session count is
    ///   incremented. A creation error stops processing.
    ///
    /// The accept-queue gauge is updated at the end.
    pub fn create_sessions(&mut self) {
        while let Some((sock, token, protocol, timestamp, _peer)) = self.accept_queue.pop_back() {
            let wait_time = Instant::now() - timestamp;
            time!(names::accept_queue::WAIT_TIME, wait_time.as_millis());
            // Stale entry: `sock` is dropped here without creating a session.
            if wait_time > self.accept_queue_timeout {
                incr!(names::accept_queue::TIMEOUT);
                continue;
            }

            // NOTE(review): every `break` in this block drops the already-popped
            // `sock` (presumably closing that connection) while the remaining
            // queue entries are kept — confirm this is intended.
            if !self.sessions.borrow_mut().check_limits() {
                incr!(names::listener::CONNECTION_CAPPED);

                if !self.evict_on_queue_full {
                    break;
                }

                // Never evict sessions while the worker is shutting down.
                if self.shutting_down.is_some() {
                    break;
                }

                // Evict roughly 1% of the configured maximum, at least one session.
                let to_evict = (self.sessions.borrow().max_connections / 100).max(1);
                let evicted = self.evict_least_active_sessions(to_evict);
                if evicted == 0 {
                    warn!("evict_on_queue_full enabled but no candidate sessions to evict");
                    break;
                }

                count!(names::sessions::EVICTED, evicted as i64);
                warn!(
                    "evicted {} least recently active sessions to make room",
                    evicted
                );

                // Still over the limits after eviction: give up for this round.
                if !self.sessions.borrow_mut().check_limits() {
                    break;
                }
            }

            // Each proxy receives a clone of its own handle (`self.<proxy>.clone()`).
            match protocol {
                Protocol::TCPListen => {
                    let proxy = self.tcp.clone();
                    if self
                        .tcp
                        .borrow_mut()
                        .create_session(sock, token, wait_time, proxy)
                        .is_err()
                    {
                        break;
                    }
                }
                Protocol::HTTPListen => {
                    let proxy = self.http.clone();
                    if self
                        .http
                        .borrow_mut()
                        .create_session(sock, token, wait_time, proxy)
                        .is_err()
                    {
                        break;
                    }
                }
                Protocol::HTTPSListen => {
                    if self
                        .https
                        .borrow_mut()
                        .create_session(sock, token, wait_time, self.https.clone())
                        .is_err()
                    {
                        break;
                    }
                }
                _ => panic!("should not call accept() on a HTTP, HTTPS or TCP session"),
            };
            self.sessions.borrow_mut().incr();
        }

        gauge!(names::accept_queue::CONNECTIONS, self.accept_queue.len());
    }
2571
2572 pub fn ready(&mut self, token: Token, events: Ready) {
2573 trace!("PROXY\t{:?} got events: {:?}", token, events);
2574
2575 let session_token = token.0;
2576 if self.sessions.borrow().slab.contains(session_token) {
2577 let protocol = self.sessions.borrow().slab[session_token]
2579 .borrow()
2580 .protocol();
2581 match protocol {
2583 Protocol::HTTPListen | Protocol::HTTPSListen | Protocol::TCPListen => {
2584 if events.is_readable() {
2586 self.accept_ready.insert(ListenToken(token.0));
2587 if self.sessions.borrow().can_accept {
2588 self.accept(ListenToken(token.0), protocol);
2589 }
2590 return;
2591 }
2592
2593 if events.is_writable() {
2594 error!(
2595 "received writable for listener {:?}, this should not happen",
2596 token
2597 );
2598 return;
2599 }
2600
2601 if events.is_hup() {
2602 error!("should not happen: server {:?} closed", token);
2603 return;
2604 }
2605
2606 unreachable!();
2607 }
2608 _ => {}
2609 }
2610
2611 let session = self.sessions.borrow_mut().slab[session_token].clone();
2612 session.borrow_mut().update_readiness(token, events);
2613 if session.borrow_mut().ready(session.clone()) {
2614 debug!(
2615 "Server killing session from ready: token={:?}, protocol={:?}, events={:?}",
2616 token, protocol, events
2617 );
2618 self.kill_session(session);
2619 }
2620 }
2621 }
2622
2623 pub fn timeout(&mut self, token: Token) {
2624 trace!("PROXY\t{:?} got timeout", token);
2625
2626 let session_token = token.0;
2627 if self.sessions.borrow().slab.contains(session_token) {
2628 let session = self.sessions.borrow_mut().slab[session_token].clone();
2629 if session.borrow_mut().timeout(token) {
2630 debug!(
2631 "Server killing session from timeout: token={:?}, protocol={:?}",
2632 token,
2633 session.borrow().protocol()
2634 );
2635 self.kill_session(session);
2636 }
2637 }
2638 }
2639
2640 pub fn handle_remaining_readiness(&mut self) {
2641 if self.sessions.borrow().can_accept && !self.accept_ready.is_empty() {
2644 while let Some(token) = self
2645 .accept_ready
2646 .iter()
2647 .next()
2648 .map(|token| ListenToken(token.0))
2649 {
2650 let protocol = self.sessions.borrow().slab[token.0].borrow().protocol();
2651 self.accept(token, protocol);
2652 if !self.sessions.borrow().can_accept || self.accept_ready.is_empty() {
2653 break;
2654 }
2655 }
2656 }
2657 }
2658 fn block_channel(&mut self) {
2659 if let Err(e) = self.channel.blocking() {
2660 error!("Could not block channel: {}", e);
2661 }
2662 }
2663 fn unblock_channel(&mut self) {
2664 if let Err(e) = self.channel.nonblocking() {
2665 error!("Could not block channel: {}", e);
2666 }
2667 }
2668
2669 fn evict_least_active_sessions(&self, count: usize) -> usize {
2681 if count == 0 {
2682 return 0;
2683 }
2684
2685 let tokens = {
2686 let sessions = self.sessions.borrow();
2687 let mut candidates: Vec<(Token, Instant)> = sessions
2688 .slab
2689 .iter()
2690 .filter(|(_, session)| {
2691 !matches!(
2692 session.borrow().protocol(),
2693 Protocol::HTTPListen
2694 | Protocol::HTTPSListen
2695 | Protocol::TCPListen
2696 | Protocol::Channel
2697 | Protocol::Metrics
2698 | Protocol::Timer
2699 )
2700 })
2701 .map(|(_, session)| {
2702 let s = session.borrow();
2703 (s.frontend_token(), s.last_event())
2704 })
2705 .collect();
2706
2707 if candidates.is_empty() {
2710 return 0;
2711 }
2712
2713 let pivot = count.min(candidates.len()) - 1;
2714 candidates.select_nth_unstable_by_key(pivot, |&(_, last_event)| last_event);
2715
2716 candidates[..=pivot]
2717 .iter()
2718 .map(|&(token, _)| token)
2719 .collect::<HashSet<Token>>()
2720 };
2721
2722 let evicted = tokens.len();
2723 self.shut_down_sessions_by_frontend_tokens(tokens);
2724 evicted
2725 }
2726}
2727
2728fn worker_response_error<S: ToString, T: ToString>(request_id: S, error: T) -> WorkerResponse {
2731 error!(
2732 "error on request {}, {}",
2733 request_id.to_string(),
2734 error.to_string()
2735 );
2736 WorkerResponse::error(request_id, error)
2737}
2738
2739pub struct ListenSession {
2740 pub protocol: Protocol,
2741}
2742
2743impl ProxySession for ListenSession {
2744 fn last_event(&self) -> Instant {
2745 Instant::now()
2746 }
2747
2748 fn print_session(&self) {}
2749
2750 fn frontend_token(&self) -> Token {
2751 Token(0)
2752 }
2753
2754 fn protocol(&self) -> Protocol {
2755 self.protocol
2756 }
2757
2758 fn ready(&mut self, _session: Rc<RefCell<dyn ProxySession>>) -> SessionIsToBeClosed {
2759 false
2760 }
2761
2762 fn shutting_down(&mut self) -> SessionIsToBeClosed {
2763 false
2764 }
2765
2766 fn update_readiness(&mut self, _token: Token, _events: Ready) {}
2767
2768 fn close(&mut self) {}
2769
2770 fn timeout(&mut self, _token: Token) -> SessionIsToBeClosed {
2771 error!(
2772 "called ProxySession::timeout(token={:?}, time) on ListenSession {{ protocol: {:?} }}",
2773 _token, self.protocol
2774 );
2775 false
2776 }
2777}
2778
#[cfg(test)]
mod accept_telemetry_tests {
    use super::*;

    /// Two peers inside the same IPv4 /24 share one bucket.
    #[test]
    fn per_source_bucket_collapses_ipv4_slash24() {
        let first: SocketAddr = "203.0.113.5:1234".parse().unwrap();
        let second: SocketAddr = "203.0.113.250:9999".parse().unwrap();
        let (bucket_first, bucket_second) = (per_source_bucket(&first), per_source_bucket(&second));
        assert_eq!(
            bucket_first, bucket_second,
            "addresses in the same /24 must land in the same bucket"
        );
    }

    /// Two peers inside the same IPv6 /48 share one bucket.
    #[test]
    fn per_source_bucket_collapses_ipv6_slash48() {
        let first: SocketAddr = "[2001:db8:1234::1]:443".parse().unwrap();
        let second: SocketAddr = "[2001:db8:1234:abcd::ffff]:8443".parse().unwrap();
        let (bucket_first, bucket_second) = (per_source_bucket(&first), per_source_bucket(&second));
        assert_eq!(
            bucket_first, bucket_second,
            "addresses in the same /48 must land in the same bucket"
        );
    }

    /// The key table has exactly `PER_SOURCE_BUCKETS` entries, each named after its index.
    #[test]
    fn per_source_bucket_keys_are_bounded() {
        let keys = &*PER_SOURCE_BUCKET_KEYS;
        assert_eq!(keys.len(), PER_SOURCE_BUCKETS);
        keys.iter().enumerate().for_each(|(index, key)| {
            let expected = format!("client.connect.per_source.bucket_{index:03}");
            assert_eq!(*key, expected.as_str());
        });
    }

    /// Distinct /24 subnets spread over a reasonable share of the buckets.
    #[test]
    fn per_source_bucket_distributes_distinct_subnets() {
        let distinct: std::collections::HashSet<&'static str> = (0..200u8)
            .map(|third_octet| {
                let addr: SocketAddr = format!("10.0.{third_octet}.42:80").parse().unwrap();
                per_source_bucket(&addr)
            })
            .collect();
        assert!(
            distinct.len() >= 100,
            "expected at least 100 distinct buckets across 200 /24s, got {}",
            distinct.len()
        );
    }
}
2838
#[cfg(test)]
mod eviction_tests {
    use std::collections::HashSet;
    use std::time::{Duration, Instant};

    use mio::Token;

    /// Selection step used by the eviction code: partition around the
    /// `count`-th oldest entry and return the tokens up to and including it.
    fn pick_oldest(candidates: &mut [(Token, Instant)], count: usize) -> HashSet<Token> {
        let pivot = count.min(candidates.len()) - 1;
        candidates.select_nth_unstable_by_key(pivot, |&(_, last_event)| last_event);
        candidates[..=pivot].iter().map(|&(token, _)| token).collect()
    }

    #[test]
    fn select_nth_finds_oldest_sessions() {
        let now = Instant::now();
        let ago = |secs| now - Duration::from_secs(secs);
        let mut candidates = [
            (Token(1), ago(10)),
            (Token(2), ago(50)),
            (Token(3), ago(5)),
            (Token(4), ago(30)),
            (Token(5), ago(20)),
        ];

        let selected = pick_oldest(&mut candidates, 2);

        assert_eq!(selected.len(), 2);
        assert!(
            selected.contains(&Token(2)),
            "should contain 50s-old session"
        );
        assert!(
            selected.contains(&Token(4)),
            "should contain 30s-old session"
        );
    }

    #[test]
    fn select_nth_with_count_exceeding_candidates() {
        let now = Instant::now();
        let mut candidates = [(Token(1), now - Duration::from_secs(10))];

        let selected = pick_oldest(&mut candidates, 5);

        assert_eq!(selected.len(), 1);
        assert!(selected.contains(&Token(1)));
    }
}