Skip to main content

sozu_lib/
server.rs

1//! event loop management
2use std::{
3    cell::RefCell,
4    collections::{HashMap, HashSet, VecDeque, hash_map::Entry},
5    hash::{DefaultHasher, Hash, Hasher},
6    io::Error as IoError,
7    net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr},
8    os::unix::io::{AsRawFd, FromRawFd},
9    rc::Rc,
10    str::FromStr,
11    sync::LazyLock,
12    time::{Duration, Instant},
13};
14
15use mio::{
16    Events, Interest, Poll, Token,
17    net::{TcpListener as MioTcpListener, TcpStream, UdpSocket as MioUdpSocket},
18};
19use slab::Slab;
20use sozu_command::{
21    channel::Channel,
22    config::MetricDetailLevel,
23    logging,
24    proto::command::{
25        ActivateListener, AddBackend, CertificatesWithFingerprints, Cluster, ClusterHashes,
26        ClusterInformations, DeactivateListener, Event, EventKind, HttpListenerConfig,
27        HttpsListenerConfig, InitialState, ListenerType, LoadBalancingAlgorithms, LoadMetric,
28        MetricDetail, MetricsConfiguration, RemoveBackend, Request, ResponseContent,
29        ResponseStatus, ServerConfig, TcpListenerConfig as CommandTcpListener,
30        UdpListenerConfig as CommandUdpListener, UpdateHttpListenerConfig,
31        UpdateHttpsListenerConfig, UpdateTcpListenerConfig, UpdateUdpListenerConfig, WorkerRequest,
32        WorkerResponse, request::RequestType, response_content::ContentType,
33    },
34    ready::Ready,
35    scm_socket::{Listeners, ScmSocket, ScmSocketError},
36    state::ConfigState,
37};
38
39use crate::metrics::names;
40use crate::{
41    AcceptError, Protocol, ProxyConfiguration, ProxySession, SessionIsToBeClosed,
42    backends::{Backend, BackendMap},
43    features::FEATURES,
44    health_check::HealthChecker,
45    http, https,
46    metrics::METRICS,
47    pool::Pool,
48    tcp,
49    timer::Timer,
50    udp,
51};
52
/// Number of retries to perform on a server after a connection failure.
pub const CONN_RETRIES: u8 = 3;
55
/// Number of bounded buckets for the per-source connect-rate counter.
///
/// `incr!` requires a `&'static str`, so per-IP labelling would either need
/// runtime `Box::leak` per unique source (unbounded under SYN flood — a
/// metric-cardinality blow-up) or a fixed bucket table. We pick the bucket
/// table: `PER_SOURCE_BUCKETS` static labels precomputed on first use, each
/// masked subnet hashes into one of them.
///
/// Bucket-noise vs per-IP fidelity is a deliberate trade. Operators wanting
/// per-IP attribution should pair these counters with structured access logs
/// or a downstream rate-limiter; the metric here is for "is some /24 spamming
/// us right now?", not "which IP exactly". A fixed bucket count keeps the
/// memory + UDP statsd cost flat regardless of attacker effort.
pub const PER_SOURCE_BUCKETS: usize = 256;

/// Pre-leaked `&'static str` table for per-source bucket counters.
///
/// `incr!` requires `&'static str`; the labels are leaked exactly once, at
/// first access (`LazyLock`), for `PER_SOURCE_BUCKETS` keys (~10 KB of heap
/// with 256 buckets). The leak is bounded by `PER_SOURCE_BUCKETS` and never
/// grows with traffic.
static PER_SOURCE_BUCKET_KEYS: LazyLock<[&'static str; PER_SOURCE_BUCKETS]> = LazyLock::new(|| {
    std::array::from_fn(|i| {
        // e.g. "client.connect.per_source.bucket_042"
        let key: &'static str =
            Box::leak(format!("client.connect.per_source.bucket_{i:03}").into_boxed_str());
        key
    })
});

/// Mask an IP address to its bounded prefix (/24 for IPv4, /48 for IPv6) and
/// hash it into one of `PER_SOURCE_BUCKETS` slots.
///
/// IPv4-mapped IPv6 peers (`::ffff:a.b.c.d`, which is how IPv4 clients show
/// up on a dual-stack listener) are first converted back to IPv4. Without
/// that step the /48 mask would keep only the all-zero prefix and every IPv4
/// client would land in the same bucket.
///
/// The hash is `DefaultHasher::new()`, which is *not* randomly seeded: the
/// mapping is the same in every process built against the same standard
/// library, but the algorithm is unspecified and may change between Rust
/// releases. That is fine for telemetry; do not persist bucket ids or rely
/// on them across differently-built binaries.
fn per_source_bucket(peer: &SocketAddr) -> &'static str {
    let mut hasher = DefaultHasher::new();
    match peer.ip().to_canonical() {
        IpAddr::V4(v4) => {
            // /24 mask: keep first three octets, zero the host portion.
            let [a, b, c, _] = v4.octets();
            Ipv4Addr::new(a, b, c, 0).hash(&mut hasher);
        }
        IpAddr::V6(v6) => {
            // /48 mask: keep first 6 bytes, zero the rest.
            let mut masked = [0u8; 16];
            masked[..6].copy_from_slice(&v6.octets()[..6]);
            Ipv6Addr::from(masked).hash(&mut hasher);
        }
    }
    // Reduce in u64 *before* narrowing so the bucket does not depend on the
    // target's pointer width, whatever the bucket count is.
    let idx = (hasher.finish() % PER_SOURCE_BUCKETS as u64) as usize;
    PER_SOURCE_BUCKET_KEYS[idx]
}
109
/// Period between two `accept_queue.saturated_seconds` ticks. The counter is
/// incremented once per period while [`SessionManager::can_accept`] is `false`,
/// distinguishing "queue spent N seconds at max" from "queue briefly hit max"
/// — the binary `accept_queue.backpressure` gauge collapses that duration.
const ACCEPT_SATURATION_TICK: Duration = Duration::from_secs(1);

/// Command channel type of a worker: a [`Channel`] parameterised with
/// [`WorkerResponse`] and [`WorkerRequest`]. `Server::try_new_from_config`
/// receives it as `worker_to_main_channel`.
pub type ProxyChannel = Channel<WorkerResponse, WorkerRequest>;

thread_local! {
  /// Per-thread FIFO of pending [`WorkerResponse`]s. Filled by
  /// [`push_queue`] and [`push_event`].
  // NOTE(review): the consumer of this queue is outside this chunk —
  // presumably the run loop drains it onto the channel; confirm.
  pub static QUEUE: RefCell<VecDeque<WorkerResponse>> = const { RefCell::new(VecDeque::new()) };
}

thread_local! {
  /// Per-thread [`Timer`] keyed by mio [`Token`], created with
  /// `Timer::default()` on first access.
  pub static TIMER: RefCell<Timer<Token>> = RefCell::new(Timer::default());
}
125
126pub fn push_queue(message: WorkerResponse) {
127    QUEUE.with(|queue| {
128        (*queue.borrow_mut()).push_back(message);
129    });
130}
131
132pub fn push_event(event: Event) {
133    QUEUE.with(|queue| {
134        (*queue.borrow_mut()).push_back(WorkerResponse {
135            id: "EVENT".to_string(),
136            message: String::new(),
137            status: ResponseStatus::Processing.into(),
138            content: Some(ContentType::Event(event).into()),
139        });
140    });
141}
142
143/// Build the `WorkerMetricDetailStatus` content payload returned in
144/// every successful `SetMetricDetail` worker response. The master
145/// collects these across the fan-out and assembles them into
146/// `MetricDetailStatus.workers[<worker_id>]` so the TUI sees each
147/// worker's actual aggregator state instead of the master's view.
148fn worker_metric_detail_status_content(
149    configured: MetricDetailLevel,
150    effective: MetricDetailLevel,
151    previous_effective: MetricDetailLevel,
152    active_lease_count: u32,
153) -> ResponseContent {
154    use sozu_command::proto::command::WorkerMetricDetailStatus;
155    ContentType::WorkerMetricDetailStatus(WorkerMetricDetailStatus {
156        configured: MetricDetail::from(configured) as i32,
157        effective: MetricDetail::from(effective) as i32,
158        previous_effective: MetricDetail::from(previous_effective) as i32,
159        active_lease_count,
160    })
161    .into()
162}
163
164/// Build a `METRIC_DETAIL_CHANGED` event carrying the worker-local
165/// transition payload (previous/effective levels + transition kind).
166/// `client_id` is `Some(_)` for explicit apply/clear, `None` for the
167/// polled janitor's bulk expiry. The master folds this Event into the
168/// audit log alongside operator-initiated transitions emitted at the
169/// dispatch site in `bin/src/command/requests.rs::worker_request`.
170fn push_metric_detail_transition(
171    previous: MetricDetailLevel,
172    effective: MetricDetailLevel,
173    transition_kind: &'static str,
174    client_id: Option<String>,
175) {
176    use sozu_command::proto::command::MetricDetailTransition;
177    // No-op when nothing actually changed. Defence-in-depth — every
178    // caller already gates on `previous != effective`, but
179    // double-checking here means future call sites can't accidentally
180    // emit a "ghost" transition.
181    if previous == effective {
182        return;
183    }
184    push_event(Event {
185        kind: EventKind::MetricDetailChanged as i32,
186        cluster_id: None,
187        backend_id: None,
188        address: None,
189        metric_detail: Some(MetricDetailTransition {
190            previous_effective: MetricDetail::from(previous) as i32,
191            effective: MetricDetail::from(effective) as i32,
192            transition_kind: transition_kind.to_owned(),
193            client_id,
194        }),
195    });
196}
197
/// Newtype over a raw `usize` index that identifies a listener.
#[derive(Copy, Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct ListenToken(pub usize);

/// Newtype over a raw `usize` index that identifies a session.
#[derive(Copy, Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct SessionToken(pub usize);

/// Wrap a raw index as a [`ListenToken`].
impl From<usize> for ListenToken {
    fn from(raw: usize) -> Self {
        Self(raw)
    }
}

/// Unwrap a [`ListenToken`] back to its raw index.
impl From<ListenToken> for usize {
    fn from(ListenToken(raw): ListenToken) -> Self {
        raw
    }
}

/// Wrap a raw index as a [`SessionToken`].
impl From<usize> for SessionToken {
    fn from(raw: usize) -> Self {
        Self(raw)
    }
}

/// Unwrap a [`SessionToken`] back to its raw index.
impl From<SessionToken> for usize {
    fn from(SessionToken(raw): SessionToken) -> Self {
        raw
    }
}
226
/// Bookkeeping for accepted sessions: the slab that stores them, the live
/// connection count and its cap, the accept gate, and the optional
/// per-(cluster, source-IP) connection accounting.
pub struct SessionManager {
    /// Upper bound on `nb_connections`; `check_limits` refuses new sessions
    /// once the live count reaches it.
    pub max_connections: usize,
    /// Live connection count, raised by `incr` and lowered by `decr`.
    pub nb_connections: usize,
    /// Accept gate: cleared by `check_limits` when a limit is hit, set again
    /// by `decr` once the live count has dropped far enough.
    pub can_accept: bool,
    /// Storage for every `ProxySession`; its fill level is compared against
    /// `accept_slab_threshold` by `at_capacity`.
    pub slab: Slab<Rc<RefCell<dyn ProxySession>>>,
    /// Default per-(cluster, source-IP) connection limit. `0` disables
    /// the feature; cluster-level overrides take precedence at check
    /// time.
    pub max_connections_per_ip: u64,
    /// Default `Retry-After` header value (seconds) for HTTP 429
    /// responses emitted on per-(cluster, source-IP) limit hit. `0`
    /// omits the header.
    pub retry_after: u32,
    /// Active **frontend connections** per `(cluster_id, source_ip)`.
    /// Each frontend session contributes AT MOST 1 to the count for any
    /// given `(cluster, ip)` pair, regardless of how many streams it
    /// multiplexes to that cluster (an H2 connection serving 100
    /// streams to cluster X from IP 1.2.3.4 still counts as 1). The
    /// counter is incremented the first time a session's
    /// `Router::connect` resolves to a fresh `(cluster, ip)` pair, and
    /// decremented when the session closes. Empty when the feature is
    /// unused.
    ///
    /// ── Why nested maps instead of `HashMap<(String, IpAddr), usize>` ──
    ///
    /// The per-request hot path (`cluster_ip_at_limit`, called from
    /// `mux/router::connect` for every cluster-resolving request) used
    /// to allocate a `String` to build the compound key on every
    /// lookup. Splitting the storage so the outer key is `String`
    /// lets the lookup take `&str` — `HashMap::get(cluster_id)` on a
    /// `HashMap<String, _>` accepts a borrow via the `Borrow<str>`
    /// impl. The hot path no longer allocates; the only `String` clone
    /// is in `track_cluster_ip`, which runs at most once per
    /// `(cluster, ip)` pair per session. Memory footprint is unchanged
    /// in steady state — entries are still reaped to zero on session
    /// close.
    connections_per_cluster_ip: HashMap<String, HashMap<IpAddr, usize>>,
    /// Reverse index: per-token map of `cluster_id` → set of source IPs
    /// already counted against `connections_per_cluster_ip`. Used to
    /// make `track_cluster_ip` idempotent within a session (so H2
    /// streams to the same cluster from the same client only consume
    /// one slot in the limit) and to drain a session's contributions on
    /// close. Same nesting rationale as above.
    cluster_ip_tracks: HashMap<Token, HashMap<String, HashSet<IpAddr>>>,
}
272
273impl SessionManager {
274    pub fn new(
275        slab: Slab<Rc<RefCell<dyn ProxySession>>>,
276        max_connections: usize,
277        max_connections_per_ip: u64,
278        retry_after: u32,
279    ) -> Rc<RefCell<Self>> {
280        Rc::new(RefCell::new(SessionManager {
281            max_connections,
282            nb_connections: 0,
283            can_accept: true,
284            slab,
285            max_connections_per_ip,
286            retry_after,
287            connections_per_cluster_ip: HashMap::new(),
288            cluster_ip_tracks: HashMap::new(),
289        }))
290    }
291
292    /// Resolve the effective per-(cluster, source-IP) limit. `override_value`
293    /// is the cluster-level setting from the proto `Cluster` message:
294    /// `None` inherits the global default, `Some(0)` is explicit
295    /// "unlimited", `Some(n > 0)` overrides.
296    pub fn effective_max_connections_per_ip(&self, override_value: Option<u64>) -> u64 {
297        override_value.unwrap_or(self.max_connections_per_ip)
298    }
299
300    /// Resolve the effective `Retry-After` header value. `Some(0)` (or
301    /// the global default of 0) signals "omit the header" — caller
302    /// must skip emission rather than render `Retry-After: 0`.
303    pub fn effective_retry_after(&self, override_value: Option<u32>) -> u32 {
304        override_value.unwrap_or(self.retry_after)
305    }
306
307    /// Returns `true` when admitting `token` to one more connection for
308    /// `(cluster, ip)` would exceed the resolved limit. `0` is treated
309    /// as unlimited. A token that already holds a slot for this
310    /// `(cluster, ip)` is NEVER at the limit — H2 sessions multiplex
311    /// many streams to the same cluster on a single connection, and
312    /// the limit governs distinct frontend connections, not streams.
313    ///
314    /// Hot-path: called for every cluster-resolving request from
315    /// `mux/router::connect`. The nested-map storage lets both lookups
316    /// borrow `cluster_id` and `ip`; no per-call allocation runs here
317    /// in steady state.
318    pub fn cluster_ip_at_limit(
319        &self,
320        token: Token,
321        cluster_id: &str,
322        ip: &IpAddr,
323        override_value: Option<u64>,
324    ) -> bool {
325        let limit = self.effective_max_connections_per_ip(override_value);
326        if limit == 0 {
327            return false;
328        }
329        // Pure query: the limit==0 branch already returned, so any work below
330        // runs only with a positive cap.
331        debug_assert!(
332            limit > 0,
333            "limit==0 (unlimited) must have returned before reaching the bounded check"
334        );
335        let already_tracked = self
336            .cluster_ip_tracks
337            .get(&token)
338            .and_then(|by_cluster| by_cluster.get(cluster_id))
339            .is_some_and(|ips| ips.contains(ip));
340        if already_tracked {
341            // Reverse-index/forward-count coherence: if this token already
342            // holds a slot for (cluster, ip), the forward count must be > 0
343            // (decrement-to-zero reaps the inner entry in untrack_all).
344            debug_assert!(
345                self.connections_per_cluster_ip
346                    .get(cluster_id)
347                    .and_then(|by_ip| by_ip.get(ip))
348                    .is_some_and(|c| *c > 0),
349                "a tracked (token, cluster, ip) slot must have a positive forward count"
350            );
351            return false;
352        }
353        self.connections_per_cluster_ip
354            .get(cluster_id)
355            .and_then(|by_ip| by_ip.get(ip))
356            .is_some_and(|c| (*c as u64) >= limit)
357    }
358
359    /// Account `token`'s active connection against `(cluster, ip)`.
360    /// Idempotent within a token: a second call for the same
361    /// `(cluster, ip)` is a no-op so H2 retries / multi-stream opens
362    /// to the same cluster do not double-count.
363    ///
364    /// Allocates a single owned `String` per `(token, cluster)` pair on
365    /// first observation — `entry(cluster_id.clone())` materialises a
366    /// new outer-map slot. Subsequent IPs under the same `(token,
367    /// cluster)` reuse the existing slot.
368    pub fn track_cluster_ip(&mut self, token: Token, cluster_id: String, ip: IpAddr) {
369        // Snapshot the forward count for this (cluster, ip) before the insert
370        // so we can pair-assert the delta. Ungated `let`: read only inside the
371        // debug_assert! below → optimised out in release (no E0425).
372        let count_before = self
373            .connections_per_cluster_ip
374            .get(&cluster_id)
375            .and_then(|by_ip| by_ip.get(&ip))
376            .copied()
377            .unwrap_or(0);
378        let inserted = self
379            .cluster_ip_tracks
380            .entry(token)
381            .or_default()
382            .entry(cluster_id.clone())
383            .or_default()
384            .insert(ip);
385        if inserted {
386            *self
387                .connections_per_cluster_ip
388                .entry(cluster_id.clone())
389                .or_default()
390                .entry(ip)
391                .or_insert(0) += 1;
392        }
393        // Postconditions: the reverse index now records this (token, cluster,
394        // ip), and the forward count advanced by exactly `inserted as usize`
395        // (idempotent: a repeat call for the same triple is a no-op on both).
396        debug_assert!(
397            self.cluster_ip_tracks
398                .get(&token)
399                .and_then(|by_cluster| by_cluster.get(&cluster_id))
400                .is_some_and(|ips| ips.contains(&ip)),
401            "track must leave the (token, cluster, ip) recorded in the reverse index"
402        );
403        debug_assert_eq!(
404            self.connections_per_cluster_ip
405                .get(&cluster_id)
406                .and_then(|by_ip| by_ip.get(&ip))
407                .copied()
408                .unwrap_or(0),
409            count_before + inserted as usize,
410            "forward count must advance by exactly 1 on first track, 0 on a repeat"
411        );
412        #[cfg(debug_assertions)]
413        self.check_invariants();
414    }
415
416    /// Drain every `(cluster, ip)` slot held by `token` and apply the
417    /// matching decrements. Called on session teardown only — there is
418    /// no per-stream untrack because the limit is per-connection, not
419    /// per-stream. Removes empty inner maps so the outer
420    /// `connections_per_cluster_ip` does not retain `(cluster_id,
421    /// empty_map)` orphans across cluster lifetimes.
422    pub fn untrack_all_cluster_ip(&mut self, token: Token) {
423        let Some(by_cluster) = self.cluster_ip_tracks.remove(&token) else {
424            return;
425        };
426        // The reverse index for this token was just drained by `remove`; no
427        // other code path re-inserts it within this call.
428        debug_assert!(
429            !self.cluster_ip_tracks.contains_key(&token),
430            "untrack_all must evict the token from the reverse index"
431        );
432        for (cluster_id, ips) in by_cluster {
433            let Entry::Occupied(mut outer) = self.connections_per_cluster_ip.entry(cluster_id)
434            else {
435                continue;
436            };
437            for ip in ips {
438                if let Entry::Occupied(mut inner) = outer.get_mut().entry(ip) {
439                    let count = inner.get_mut();
440                    *count = count.saturating_sub(1);
441                    if *count == 0 {
442                        inner.remove();
443                    }
444                }
445            }
446            if outer.get().is_empty() {
447                outer.remove();
448            }
449        }
450        // No orphan bookkeeping survives: the forward map must not retain a
451        // cluster with an empty inner ip-map, nor an ip whose count is zero.
452        debug_assert!(
453            self.connections_per_cluster_ip
454                .values()
455                .all(|by_ip| !by_ip.is_empty() && by_ip.values().all(|&c| c > 0)),
456            "untrack_all must not leave empty inner maps or zero-count ips behind"
457        );
458        #[cfg(debug_assertions)]
459        self.check_invariants();
460    }
461
462    /// Wipe every per-(cluster, source-IP) accounting bucket. Called by
463    /// the runtime `SetMaxConnectionsPerIp(0)` path so disabling the
464    /// feature does not leave dead bookkeeping behind that a future
465    /// re-enable would consult.
466    pub fn clear_cluster_ip_tracking(&mut self) {
467        self.cluster_ip_tracks.clear();
468        self.connections_per_cluster_ip.clear();
469        // Both halves of the per-(cluster, ip) accounting are now empty; a
470        // future re-enable starts from a clean slate.
471        debug_assert!(
472            self.cluster_ip_tracks.is_empty() && self.connections_per_cluster_ip.is_empty(),
473            "clear must wipe both the reverse index and the forward count map"
474        );
475        #[cfg(debug_assertions)]
476        self.check_invariants();
477    }
478
479    /// The slab is considered at capacity if it contains more sessions than twice max_connections
480    pub fn at_capacity(&self) -> bool {
481        self.slab.len() >= self.accept_slab_threshold()
482    }
483
484    /// The slab fill level at which `at_capacity` flips to true and the
485    /// accept queue is flushed. Reported as `slab.accept_threshold` so the
486    /// per-iteration `slab.accept_threshold_percent` gauge in the run loop
487    /// can chart proximity to this gate, distinct from raw slab usage.
488    ///
489    /// The constant `10 + 2 * max_connections` is the historical pre-knob
490    /// budget; configured slab capacity is
491    /// `10 + slab_entries_per_connection * max_connections` (see
492    /// `command/src/config.rs`) and can be larger, so `slab.usage_percent`
493    /// (against `slab.capacity()`) and `slab.accept_threshold_percent`
494    /// (against this gate) are emitted as independent gauges.
495    pub fn accept_slab_threshold(&self) -> usize {
496        let threshold = 10 + 2 * self.max_connections;
497        // The gate must leave headroom above the connection cap so listener /
498        // system slots (Channel, Metrics, Timer, listeners) are never starved
499        // by frontend connections alone.
500        debug_assert!(
501            threshold > self.max_connections,
502            "accept gate must sit strictly above max_connections to reserve system slots"
503        );
504        threshold
505    }
506
507    /// Check the number of connections against max_connections, and the slab capacity.
508    /// Returns false if limits are reached.
509    pub fn check_limits(&mut self) -> bool {
510        // Live-count invariant: the accounted connection count never exceeds
511        // the configured cap (incr() enforces this, decr() never underflows).
512        debug_assert!(
513            self.nb_connections <= self.max_connections,
514            "nb_connections must never exceed max_connections"
515        );
516        if self.nb_connections >= self.max_connections {
517            error!("max number of session connection reached, flushing the accept queue");
518            gauge!(names::accept_queue::BACKPRESSURE, 1);
519            self.can_accept = false;
520            // A negative result must have closed the accept gate.
521            debug_assert!(
522                !self.can_accept,
523                "refusing at the cap must clear can_accept"
524            );
525            return false;
526        }
527
528        if self.at_capacity() {
529            error!("not enough memory to accept another session, flushing the accept queue");
530            error!(
531                "nb_connections: {}, max_connections: {}",
532                self.nb_connections, self.max_connections
533            );
534            gauge!(names::accept_queue::BACKPRESSURE, 1);
535            self.can_accept = false;
536
537            debug_assert!(
538                !self.can_accept,
539                "refusing at slab capacity must clear can_accept"
540            );
541            return false;
542        }
543
544        // A positive result means there is room under both gates.
545        debug_assert!(
546            self.nb_connections < self.max_connections && !self.at_capacity(),
547            "check_limits returned room while a gate was actually saturated"
548        );
549        true
550    }
551
552    pub fn to_session(token: Token) -> SessionToken {
553        SessionToken(token.0)
554    }
555
556    pub fn incr(&mut self) {
557        let before = self.nb_connections;
558        self.nb_connections += 1;
559        assert!(self.nb_connections <= self.max_connections);
560        // The counter advances by exactly one per accepted session.
561        debug_assert_eq!(
562            self.nb_connections,
563            before + 1,
564            "incr must raise nb_connections by exactly one"
565        );
566        // `client.connections_max` and `client.connections_percent` are
567        // emitted from the run loop alongside `process.uptime_seconds` /
568        // `server.live` so all proxy gauges advance in lock-step. Keeping
569        // `client.connections` per-event preserves the high-resolution
570        // signal scrapers expect.
571        gauge!(names::client::CONNECTIONS, self.nb_connections);
572    }
573
574    /// Decrements the number of sessions, start accepting new connections
575    /// if the capacity limit of 90% has not been reached.
576    pub fn decr(&mut self) {
577        assert!(self.nb_connections != 0);
578        let before = self.nb_connections;
579        self.nb_connections -= 1;
580        // Mirror of incr: exactly one connection is released, no underflow.
581        debug_assert_eq!(
582            self.nb_connections,
583            before - 1,
584            "decr must lower nb_connections by exactly one"
585        );
586        gauge!(names::client::CONNECTIONS, self.nb_connections);
587
588        // do not be ready to accept right away, wait until we get back to 10% capacity
589        if !self.can_accept && self.nb_connections < self.max_connections * 90 / 100 {
590            debug!(
591                "nb_connections = {}, max_connections = {}, starting to accept again",
592                self.nb_connections, self.max_connections
593            );
594            gauge!(names::accept_queue::BACKPRESSURE, 0);
595            self.can_accept = true;
596        }
597    }
598
599    /// Full cross-field invariant sweep for the session manager. Called as a
600    /// `debug_assert!`-guarded postcondition from the mutating cluster-IP
601    /// tracking methods. Asserts logic-bug conditions only — every clause
602    /// holds on any well-formed manager regardless of traffic.
603    #[cfg(debug_assertions)]
604    fn check_invariants(&self) {
605        // 1. Live connection count never exceeds the configured cap.
606        debug_assert!(
607            self.nb_connections <= self.max_connections,
608            "nb_connections {} exceeds max_connections {}",
609            self.nb_connections,
610            self.max_connections
611        );
612        // 2. The forward count map never retains an empty inner ip-map or a
613        //    zero count (both are reaped on the last untrack).
614        debug_assert!(
615            self.connections_per_cluster_ip
616                .values()
617                .all(|by_ip| !by_ip.is_empty() && by_ip.values().all(|&c| c > 0)),
618            "connections_per_cluster_ip holds an empty inner map or a zero count"
619        );
620        // 3. Reverse-index → forward-count coherence: every (token, cluster,
621        //    ip) recorded in the reverse index must have a positive forward
622        //    count. The forward count is the sum of contributing tokens, so a
623        //    tracked slot can never point at a missing/zero count.
624        debug_assert!(
625            self.cluster_ip_tracks.values().all(|by_cluster| {
626                by_cluster.iter().all(|(cluster_id, ips)| {
627                    ips.iter().all(|ip| {
628                        self.connections_per_cluster_ip
629                            .get(cluster_id)
630                            .and_then(|by_ip| by_ip.get(ip))
631                            .is_some_and(|&c| c > 0)
632                    })
633                })
634            }),
635            "a tracked (token, cluster, ip) slot has no positive forward count"
636        );
637        // 4. The reverse index never retains an empty inner structure.
638        debug_assert!(
639            self.cluster_ip_tracks.values().all(|by_cluster| {
640                !by_cluster.is_empty() && by_cluster.values().all(|ips| !ips.is_empty())
641            }),
642            "cluster_ip_tracks retains an empty per-token or per-cluster entry"
643        );
644    }
645}
646
/// Errors reported while setting up a [`Server`].
#[derive(thiserror::Error, Debug)]
pub enum ServerError {
    /// The MIO `Poll` backing the event loop could not be created.
    #[error("could not create event loop with MIO poll: {0}")]
    CreatePoll(IoError),
    /// The MIO registry could not be cloned.
    #[error("could not clone the MIO registry: {0}")]
    CloneRegistry(IoError),
    /// The channel could not be registered.
    #[error("could not register the channel: {0}")]
    RegisterChannel(IoError),
    /// An SCM socket operation failed; `msg` gives the context and
    /// `scm_err` the underlying error.
    #[error("{msg}:{scm_err}")]
    ScmSocket {
        msg: String,
        scm_err: ScmSocketError,
    },
}
661
/// `Server` handles the event loop, the listeners, the sessions and
/// communication with the configuration channel.
///
/// A listener wraps a listen socket, the associated proxying protocols
/// (HTTP, HTTPS and TCP) and the routing configuration for clusters.
/// Listeners handle creating sessions from accepted sockets.
///
/// A session manages a "front" socket for a connected client, and all
/// of the associated data (back socket, protocol state machine, buffers,
/// metrics...).
///
/// `Server` gets configuration updates from the channel (domain/path routes,
/// backend server address...).
///
/// Listeners and sessions are all stored in a slab structure to index them
/// by a [Token], they all have to implement the [ProxySession] trait.
pub struct Server {
    accept_queue_timeout: Duration,
    /// Tuple layout: `(socket, listen token, protocol, accept time, peer
    /// address)`. The peer is captured via `TcpStream::peer_addr()` at accept
    /// time so the `client.connect.per_source.*` counter can be attributed
    /// without the socket having to be alive at session-creation time. The
    /// peer is `Option` because `peer_addr()` is best-effort: a peer that
    /// races to close before we read it is rare but possible.
    accept_queue: VecDeque<(
        TcpStream,
        ListenToken,
        Protocol,
        Instant,
        Option<SocketAddr>,
    )>,
    /// When the accept queue saturates and `check_limits` refuses, evict the
    /// oldest non-listener sessions to make room. Default off — see
    /// `command::config::DEFAULT_EVICT_ON_QUEUE_FULL` for the rationale.
    evict_on_queue_full: bool,
    accept_ready: HashSet<ListenToken>,
    backends: Rc<RefCell<BackendMap>>,
    base_sessions_count: usize,
    channel: ProxyChannel,
    config_state: ConfigState,
    current_poll_errors: i32,
    health_checker: HealthChecker,
    http: Rc<RefCell<http::HttpProxy>>,
    https: Rc<RefCell<https::HttpsProxy>>,
    last_sessions_len: usize,
    last_shutting_down_message: Option<Instant>,
    last_zombie_check: Instant,
    loop_start: Instant,
    /// Wall-clock anchor for the `process.uptime_seconds` gauge. Captured once
    /// in [`Server::new`]; never reset on hot upgrades (the new worker that
    /// inherits FDs is a fresh process and starts its own counter).
    started_at: Instant,
    /// Last time the 1Hz `accept_queue.saturated_seconds` ticker fired. The
    /// counter is incremented once per [`ACCEPT_SATURATION_TICK`] while
    /// `SessionManager::can_accept` is `false`, so dashboards can plot the
    /// time spent saturated rather than just whether saturation occurred.
    last_saturation_tick: Instant,
    max_poll_errors: i32, // TODO: make this configurable? this defaults to 10000 for now
    /// Shared reference to the buffer pool the protocol stacks check buffers
    /// in and out of. Held here so the run loop can sample
    /// `buffer.in_use` / `buffer.capacity` / `buffer.usage_percent` once per
    /// iteration without requiring each protocol module to expose its own
    /// snapshot.
    pool: Rc<RefCell<Pool>>,
    pub poll: Poll,
    poll_timeout: Option<Duration>, // TODO: make this configurable? this defaults to 1000 milliseconds for now
    scm_listeners: Option<Listeners>,
    scm: ScmSocket,
    /// Session bookkeeping (slab, connection counters, accept gate).
    sessions: Rc<RefCell<SessionManager>>,
    should_poll_at: Option<Instant>,
    shutting_down: Option<String>,
    tcp: Rc<RefCell<tcp::TcpProxy>>,
    udp: Rc<RefCell<udp::UdpProxy>>,
    zombie_check_interval: Duration,
}
737
738impl Server {
739    pub fn try_new_from_config(
740        worker_to_main_channel: ProxyChannel,
741        worker_to_main_scm: ScmSocket,
742        config: ServerConfig,
743        initial_state: InitialState,
744        expects_initial_status: bool,
745    ) -> Result<Self, ServerError> {
746        let event_loop = Poll::new().map_err(ServerError::CreatePoll)?;
747        // Commit the operator-configured Basic-auth credential cap (or
748        // keep the built-in default) once per worker process. The
749        // `OnceLock` rejects any later attempt to change the value, so
750        // calling here — before any L7 listener has accepted a request
751        // — guarantees the cap is in force the first time `mux::auth`
752        // runs.
753        if let Some(cap) = config.basic_auth_max_credential_bytes {
754            crate::protocol::mux::auth::set_max_decoded_credential_bytes(cap as usize);
755        }
756        // Same set-once-per-worker-boot pattern for the splice kernel-pipe
757        // capacity. The setter no-ops on `0` so an explicit zero in config
758        // does not collapse the pipe to PAGE_SIZE; the kernel still applies
759        // page-rounding and `/proc/sys/fs/pipe-max-size` clamping at
760        // SplicePipe::new time. Cfg-gated because the splice module only
761        // exists on Linux + `splice` feature.
762        #[cfg(all(target_os = "linux", feature = "splice"))]
763        if let Some(cap) = config.splice_pipe_capacity_bytes {
764            crate::splice::set_pipe_capacity(cap as usize);
765        }
766        let pool = Rc::new(RefCell::new(Pool::with_capacity(
767            config.min_buffers as usize,
768            config.max_buffers as usize,
769            config.buffer_size as usize,
770        )));
771        let backends = Rc::new(RefCell::new(BackendMap::new()));
772
773        // Note: slab_capacity uses 4x multiplier (up from 2x) to account for H2
774        // multiplexing where each session can have multiple backend connections.
775        // Newer `optional` proto fields fall through to the
776        // command-lib defaults when an older worker manager omits them.
777        let sessions: Rc<RefCell<SessionManager>> = SessionManager::new(
778            Slab::with_capacity(config.slab_capacity() as usize),
779            config.max_connections as usize,
780            config
781                .max_connections_per_ip
782                .unwrap_or(sozu_command::config::DEFAULT_MAX_CONNECTIONS_PER_IP),
783            config
784                .retry_after
785                .unwrap_or(sozu_command::config::DEFAULT_RETRY_AFTER),
786        );
787        {
788            let mut s = sessions.borrow_mut();
789            let entry = s.slab.vacant_entry();
790            trace!("taking token {:?} for channel", SessionToken(entry.key()));
791            entry.insert(Rc::new(RefCell::new(ListenSession {
792                protocol: Protocol::Channel,
793            })));
794        }
795        {
796            let mut s = sessions.borrow_mut();
797            let entry = s.slab.vacant_entry();
798            trace!("taking token {:?} for metrics", SessionToken(entry.key()));
799            entry.insert(Rc::new(RefCell::new(ListenSession {
800                protocol: Protocol::Timer,
801            })));
802        }
803        {
804            let mut s = sessions.borrow_mut();
805            let entry = s.slab.vacant_entry();
806            trace!("taking token {:?} for metrics", SessionToken(entry.key()));
807            entry.insert(Rc::new(RefCell::new(ListenSession {
808                protocol: Protocol::Metrics,
809            })));
810        }
811
812        Server::new(
813            event_loop,
814            worker_to_main_channel,
815            worker_to_main_scm,
816            sessions,
817            pool,
818            backends,
819            None,
820            None,
821            None,
822            config,
823            Some(initial_state),
824            expects_initial_status,
825        )
826    }
827
828    #[allow(clippy::too_many_arguments)]
829    pub fn new(
830        poll: Poll,
831        mut channel: ProxyChannel,
832        scm: ScmSocket,
833        sessions: Rc<RefCell<SessionManager>>,
834        pool: Rc<RefCell<Pool>>,
835        backends: Rc<RefCell<BackendMap>>,
836        http: Option<http::HttpProxy>,
837        https: Option<https::HttpsProxy>,
838        tcp: Option<tcp::TcpProxy>,
839        server_config: ServerConfig,
840        initial_state: Option<InitialState>,
841        expects_initial_status: bool,
842    ) -> Result<Self, ServerError> {
843        FEATURES.with(|_features| {
844            // initializing feature flags
845        });
846
847        poll.registry()
848            .register(
849                &mut channel,
850                Token(0),
851                Interest::READABLE | Interest::WRITABLE,
852            )
853            .map_err(ServerError::RegisterChannel)?;
854
855        METRICS.with(|metrics| {
856            if let Some(sock) = (*metrics.borrow_mut()).socket_mut() {
857                poll.registry()
858                    .register(sock, Token(2), Interest::WRITABLE)
859                    .expect("should register the metrics socket");
860            }
861        });
862
863        let base_sessions_count = sessions.borrow().slab.len();
864
865        let http = Rc::new(RefCell::new(match http {
866            Some(http) => http,
867            None => {
868                let registry = poll
869                    .registry()
870                    .try_clone()
871                    .map_err(ServerError::CloneRegistry)?;
872
873                http::HttpProxy::new(registry, sessions.clone(), pool.clone(), backends.clone())
874            }
875        }));
876
877        let https = Rc::new(RefCell::new(match https {
878            Some(https) => https,
879            None => {
880                let registry = poll
881                    .registry()
882                    .try_clone()
883                    .map_err(ServerError::CloneRegistry)?;
884
885                https::HttpsProxy::new(registry, sessions.clone(), pool.clone(), backends.clone())
886            }
887        }));
888
889        let tcp = Rc::new(RefCell::new(match tcp {
890            Some(tcp) => tcp,
891            None => {
892                let registry = poll
893                    .registry()
894                    .try_clone()
895                    .map_err(ServerError::CloneRegistry)?;
896
897                tcp::TcpProxy::new(registry, sessions.clone(), pool.clone(), backends.clone())
898            }
899        }));
900
901        // UDP proxy is constructed internally (no constructor parameter) so the
902        // public `Server::new` / `try_new_from_config` signatures stay
903        // unchanged — bin/ is not forced to change for construction.
904        let udp = Rc::new(RefCell::new({
905            let registry = poll
906                .registry()
907                .try_clone()
908                .map_err(ServerError::CloneRegistry)?;
909
910            udp::UdpProxy::new(
911                registry,
912                sessions.clone(),
913                pool.clone(),
914                backends.clone(),
915                server_config.max_connections as usize,
916                server_config.buffer_size as usize,
917            )
918        }));
919
920        let mut server = Server {
921            accept_queue_timeout: Duration::from_secs(u64::from(
922                server_config.accept_queue_timeout,
923            )),
924            accept_queue: VecDeque::new(),
925            evict_on_queue_full: server_config.evict_on_queue_full.unwrap_or(false),
926            accept_ready: HashSet::new(),
927            backends,
928            base_sessions_count,
929            channel,
930            config_state: ConfigState::new(),
931            current_poll_errors: 0,
932            health_checker: HealthChecker::new(),
933            http,
934            https,
935            last_sessions_len: 0, // to be reset on server run
936            last_shutting_down_message: None,
937            last_zombie_check: Instant::now(), // to be reset on server run
938            loop_start: Instant::now(),        // to be reset on server run
939            started_at: Instant::now(),        // captured once, never reset
940            last_saturation_tick: Instant::now(), // 1Hz saturation ticker anchor
941            max_poll_errors: 10000,            // TODO: make it configurable?
942            pool,
943            poll_timeout: Some(Duration::from_millis(1000)), // TODO: make it configurable?
944            poll,
945            scm_listeners: None,
946            scm,
947            sessions,
948            should_poll_at: None,
949            shutting_down: None,
950            tcp,
951            udp,
952            zombie_check_interval: Duration::from_secs(u64::from(
953                server_config.zombie_check_interval,
954            )),
955        };
956
957        // initialize the worker with the state we got from a file
958        if let Some(state) = initial_state {
959            for request in state.requests {
960                trace!("generating initial config request: {:#?}", request);
961                server.notify_proxys(request);
962            }
963
964            // do not send back answers to the initialization messages
965            QUEUE.with(|queue| {
966                (*queue.borrow_mut()).clear();
967            });
968        }
969
970        if expects_initial_status {
971            // the main process sends a Status message, so we can notify it
972            // when the initial state is loaded
973            server.block_channel();
974            let msg = server.channel.read_message();
975            debug!("got message: {:?}", msg);
976
977            if let Ok(WorkerRequest {
978                id,
979                content:
980                    Request {
981                        request_type: Some(RequestType::Status(_)),
982                    },
983            }) = msg
984            {
985                if let Err(e) = server.channel.write_message(&WorkerResponse::ok(id)) {
986                    error!("Could not send an ok to the main process: {}", e);
987                }
988            } else {
989                panic!(
990                    "plz give me a status request first when I start, you sent me this instead: {msg:?}"
991                );
992            }
993            server.unblock_channel();
994        }
995
996        info!("will try to receive listeners");
997        server
998            .scm
999            .set_blocking(true)
1000            .map_err(|scm_err| ServerError::ScmSocket {
1001                msg: "Could not set the scm socket to blocking".to_string(),
1002                scm_err,
1003            })?;
1004        let listeners =
1005            server
1006                .scm
1007                .receive_listeners()
1008                .map_err(|scm_err| ServerError::ScmSocket {
1009                    msg: "could not receive listeners from the scm socket".to_string(),
1010                    scm_err,
1011                })?;
1012        server
1013            .scm
1014            .set_blocking(false)
1015            .map_err(|scm_err| ServerError::ScmSocket {
1016                msg: "Could not set the scm socket to unblocking".to_string(),
1017                scm_err,
1018            })?;
1019        info!("received listeners: {:?}", listeners);
1020        server.scm_listeners = Some(listeners);
1021
1022        Ok(server)
1023    }
1024
1025    /// The server runs in a loop until a shutdown is ordered
1026    pub fn run(&mut self) {
1027        let mut events = Events::with_capacity(1024); // TODO: make event capacity configurable?
1028        self.last_sessions_len = self.sessions.borrow().slab.len();
1029
1030        self.last_zombie_check = Instant::now();
1031        self.loop_start = Instant::now();
1032
1033        loop {
1034            self.check_for_poll_errors();
1035
1036            let timeout = self.reset_loop_time_and_get_timeout();
1037
1038            match self.poll.poll(&mut events, timeout) {
1039                Ok(_) => self.current_poll_errors = 0,
1040                Err(error) => {
1041                    error!("Error while polling events: {:?}", error);
1042                    self.current_poll_errors += 1;
1043                    continue;
1044                }
1045            }
1046
1047            let after_epoll = Instant::now();
1048            time!(
1049                names::event_loop::EPOLL_TIME,
1050                (after_epoll - self.loop_start).as_millis()
1051            );
1052            self.loop_start = after_epoll;
1053
1054            self.send_queue();
1055
1056            for event in events.iter() {
1057                match event.token() {
1058                    // this is the command channel
1059                    Token(0) => {
1060                        if event.is_error() {
1061                            error!("error reading from command channel");
1062                            continue;
1063                        }
1064                        if event.is_read_closed() || event.is_write_closed() {
1065                            error!("command channel was closed");
1066                            return;
1067                        }
1068                        let ready = Ready::from(event);
1069                        self.channel.handle_events(ready);
1070
1071                        // loop here because iterations has borrow issues
1072                        loop {
1073                            QUEUE.with(|queue| {
1074                                if !(*queue.borrow()).is_empty() {
1075                                    self.channel.interest.insert(Ready::WRITABLE);
1076                                }
1077                            });
1078
1079                            //trace!("WORKER[{}] channel readiness={:?}, interest={:?}, queue={} elements",
1080                            //  line!(), self.channel.readiness, self.channel.interest, self.queue.len());
1081                            if self.channel.readiness() == Ready::EMPTY {
1082                                break;
1083                            }
1084
1085                            // exit the big loop if the message is HardStop
1086                            if self.read_channel_messages_and_notify() {
1087                                return;
1088                            }
1089
1090                            QUEUE.with(|queue| {
1091                                if !(*queue.borrow()).is_empty() {
1092                                    self.channel.interest.insert(Ready::WRITABLE);
1093                                }
1094                            });
1095
1096                            self.send_queue();
1097                        }
1098                    }
1099                    // timer tick
1100                    Token(1) => {
1101                        while let Some(t) = TIMER.with(|timer| timer.borrow_mut().poll()) {
1102                            self.timeout(t);
1103                        }
1104                    }
1105                    // metrics socket is writable
1106                    Token(2) => METRICS.with(|metrics| {
1107                        (*metrics.borrow_mut()).writable();
1108                    }),
1109                    // ListenToken: 1 listener <=> 1 token
1110                    // ProtocolToken (HTTP/HTTPS/TCP): 1 connection <=> 1 token
1111                    token if self.health_checker.owns_token(token) => {
1112                        self.health_checker.ready(token);
1113                    }
1114                    token if self.udp.borrow().health_owns_token(token) => {
1115                        self.udp.borrow_mut().health_ready(token);
1116                    }
1117                    token => self.ready(token, Ready::from(event)),
1118                }
1119            }
1120
1121            if let Some(t) = self.should_poll_at.as_ref() {
1122                if *t <= Instant::now() {
1123                    while let Some(t) = TIMER.with(|timer| timer.borrow_mut().poll()) {
1124                        //info!("polled for timeout: {:?}", t);
1125                        self.timeout(t);
1126                    }
1127                }
1128            }
1129            self.handle_remaining_readiness();
1130            self.create_sessions();
1131
1132            self.should_poll_at = TIMER.with(|timer| timer.borrow().next_poll_date());
1133
1134            self.zombie_check();
1135            self.health_checker
1136                .poll(&self.backends, self.poll.registry());
1137            // Drive the UDP endpoint health prober (TCP-probe + hysteresis +
1138            // fail-open). Non-blocking; no-op when no UDP cluster has health
1139            // configured.
1140            self.udp.borrow_mut().health_poll();
1141
1142            // Frontend session gauges. `client.connections` keeps the
1143            // per-event signal from `SessionManager::incr/decr`; the rest
1144            // follow the once-per-iteration batching contract this run loop
1145            // uses for `process.uptime_seconds` / `server.live`.
1146            //
1147            // `slab.usage_percent` charts pure slab utilisation against
1148            // `slab.capacity()`. `slab.accept_threshold_percent` charts how
1149            // close the slab is to the `at_capacity()` accept gate
1150            // (`10 + 2 * max_connections`, see
1151            // `SessionManager::accept_slab_threshold`). Configured slab
1152            // capacity can be larger than that gate (`slab_entries_per_connection`
1153            // > 2), so the two gauges are kept independent on purpose.
1154            {
1155                let sessions = self.sessions.borrow();
1156                let nb_connections = sessions.nb_connections;
1157                let max_connections = sessions.max_connections;
1158                let slab_len = sessions.slab.len();
1159                let slab_capacity = sessions.slab.capacity();
1160                let accept_threshold = sessions.accept_slab_threshold();
1161
1162                gauge!(names::client::CONNECTIONS, nb_connections);
1163                gauge!(names::client::CONNECTIONS_MAX, max_connections);
1164                if max_connections > 0 {
1165                    gauge!(
1166                        "client.connections_percent",
1167                        nb_connections * 100 / max_connections
1168                    );
1169                }
1170
1171                gauge!(names::slab::ENTRIES, slab_len);
1172                gauge!(names::slab::CAPACITY, slab_capacity);
1173                if slab_capacity > 0 {
1174                    gauge!(names::slab::USAGE_PERCENT, slab_len * 100 / slab_capacity);
1175                }
1176                if accept_threshold > 0 {
1177                    gauge!(
1178                        "slab.accept_threshold_percent",
1179                        slab_len * 100 / accept_threshold
1180                    );
1181                }
1182            }
1183            // Buffer pool gauges. `buffer.in_use` replaces the older
1184            // `buffer.number` (renamed in `lib/src/pool.rs` for naming
1185            // consistency with the surrounding `buffer.*` keys).
1186            // `buffer.usage_percent` is computed against the configured
1187            // `buffer.capacity` so dashboards can chart pool pressure.
1188            {
1189                let pool = self.pool.borrow();
1190                let used = pool.inner.used();
1191                let capacity = pool.inner.capacity();
1192                gauge!(names::buffer::IN_USE, used);
1193                gauge!(names::buffer::CAPACITY, capacity);
1194                if capacity > 0 {
1195                    gauge!(names::buffer::USAGE_PERCENT, used * 100 / capacity);
1196                }
1197            }
1198            // 1Hz tick for `accept_queue.saturated_seconds`. Increments once
1199            // per `ACCEPT_SATURATION_TICK` while `SessionManager::can_accept`
1200            // is `false`. Distinguishes "queue spent N seconds at max" from
1201            // "queue briefly hit max" — the binary `accept_queue.backpressure`
1202            // gauge collapses that duration. Sampled here rather than via a
1203            // dedicated mio timer because the run loop ticks at least once
1204            // per `poll_timeout` (1s by default), which is granular enough.
1205            let now = Instant::now();
1206            if now.duration_since(self.last_saturation_tick) >= ACCEPT_SATURATION_TICK {
1207                if !self.sessions.borrow().can_accept {
1208                    incr!(names::accept_queue::SATURATED_SECONDS);
1209                }
1210                self.last_saturation_tick = now;
1211            }
1212            // Process / runtime gauges sampled once per loop iteration. Same
1213            // batch as `client.connections` so dashboards see them update in
1214            // lock-step.
1215            gauge!(
1216                "process.uptime_seconds",
1217                self.started_at.elapsed().as_secs() as usize
1218            );
1219            // `server.live` flips to 0 once a graceful shutdown is requested,
1220            // matching Envoy's `server.live` semantics. L4 health checks
1221            // (HAProxy / cloud LBs) can poll this gauge to drain a worker
1222            // before the OS-level termination signal lands.
1223            gauge!(
1224                "server.live",
1225                if self.shutting_down.is_some() { 0 } else { 1 }
1226            );
1227            METRICS.with(|metrics| {
1228                (*metrics.borrow_mut()).send_data();
1229            });
1230
1231            if self.shutting_down.is_some() && self.shut_down_sessions() {
1232                return;
1233            }
1234        }
1235    }
1236
1237    fn check_for_poll_errors(&mut self) {
1238        if self.current_poll_errors >= self.max_poll_errors {
1239            error!(
1240                "Something is going very wrong. Last {} poll() calls failed, crashing..",
1241                self.current_poll_errors
1242            );
1243            panic!(
1244                "poll() calls failed {} times in a row",
1245                self.current_poll_errors
1246            );
1247        }
1248    }
1249
1250    fn reset_loop_time_and_get_timeout(&mut self) -> Option<Duration> {
1251        let now = Instant::now();
1252        time!(
1253            names::event_loop::EVENT_LOOP_TIME,
1254            (now - self.loop_start).as_millis()
1255        );
1256
1257        let mut timeout = match self.should_poll_at.as_ref() {
1258            None => self.poll_timeout,
1259            Some(i) => {
1260                if *i <= now {
1261                    self.poll_timeout
1262                } else {
1263                    let dur = *i - now;
1264                    match self.poll_timeout {
1265                        None => Some(dur),
1266                        Some(t) => {
1267                            if t < dur {
1268                                Some(t)
1269                            } else {
1270                                Some(dur)
1271                            }
1272                        }
1273                    }
1274                }
1275            }
1276        };
1277
1278        if self.shutting_down.is_some() {
1279            let shutdown_tick = Duration::from_millis(100);
1280            timeout = match timeout {
1281                None => Some(shutdown_tick),
1282                Some(current) => Some(current.min(shutdown_tick)),
1283            };
1284        }
1285
1286        self.loop_start = now;
1287        timeout
1288    }
1289
1290    /// Returns true if hardstop
1291    fn read_channel_messages_and_notify(&mut self) -> bool {
1292        if !self.channel.readiness().is_readable() {
1293            return false;
1294        }
1295
1296        if let Err(e) = self.channel.readable() {
1297            error!("error reading from channel: {:?}", e);
1298        }
1299
1300        loop {
1301            let request = self.channel.read_message();
1302            debug!("Received request {:?}", request);
1303            match request {
1304                Ok(request) => match request.content.request_type {
1305                    Some(RequestType::HardStop(_)) => {
1306                        let req_id = request.id.clone();
1307                        self.notify(request);
1308                        if let Err(e) = self.channel.write_message(&WorkerResponse::ok(req_id)) {
1309                            error!("Could not send ok response to the main process: {}", e);
1310                        }
1311                        if let Err(e) = self.channel.run() {
1312                            error!("Error while running the server channel: {}", e);
1313                        }
1314                        return true;
1315                    }
1316                    Some(RequestType::SoftStop(_)) => {
1317                        self.shutting_down = Some(request.id.clone());
1318                        self.last_sessions_len = self.sessions.borrow().slab.len();
1319                        self.notify(request);
1320                    }
1321                    Some(RequestType::ReturnListenSockets(_)) => {
1322                        info!("received ReturnListenSockets order");
1323                        match self.return_listen_sockets() {
1324                            Ok(_) => push_queue(WorkerResponse::ok(request.id)),
1325                            Err(error) => push_queue(worker_response_error(
1326                                request.id,
1327                                format!("Could not send listeners on scm socket: {error:?}"),
1328                            )),
1329                        }
1330                    }
1331                    _ => self.notify(request),
1332                },
1333                // Not an error per se, occurs when there is nothing to read
1334                Err(_) => {
1335                    // if the message was too large, we grow the buffer and retry to read if possible
1336                    if (self.channel.interest & self.channel.readiness).is_readable() {
1337                        if let Err(e) = self.channel.readable() {
1338                            error!("error reading from channel: {:?}", e);
1339                        }
1340                        continue;
1341                    }
1342                    break;
1343                }
1344            }
1345        }
1346        false
1347    }
1348
1349    /// Scans all sessions that have been inactive for longer than the configured interval
1350    fn zombie_check(&mut self) {
1351        let now = Instant::now();
1352        if now - self.last_zombie_check < self.zombie_check_interval {
1353            return;
1354        }
1355        info!("zombie check");
1356        // `now` is sampled this iteration and we only get here past the
1357        // interval gate, so the check timestamp advances monotonically.
1358        debug_assert!(
1359            now >= self.last_zombie_check,
1360            "zombie-check timestamp must never move backwards"
1361        );
1362        self.last_zombie_check = now;
1363
1364        let mut zombie_tokens = HashSet::new();
1365
1366        // find the zombie sessions
1367        for (_index, session) in self
1368            .sessions
1369            .borrow_mut()
1370            .slab
1371            .iter_mut()
1372            .filter(|(_, c)| now - c.borrow().last_event() > self.zombie_check_interval)
1373        {
1374            let session_token = session.borrow().frontend_token();
1375            if !zombie_tokens.contains(&session_token) {
1376                session.borrow().print_session();
1377                zombie_tokens.insert(session_token);
1378            }
1379        }
1380
1381        // Listen/system sessions report `Instant::now()` as their last event,
1382        // so `now - last_event` is ~0 and never exceeds the interval: a
1383        // listener can never be collected as a zombie. Assert the set is free
1384        // of listen protocols before we reap it.
1385        debug_assert!(
1386            !self.sessions.borrow().slab.iter().any(|(_, session)| {
1387                let s = session.borrow();
1388                zombie_tokens.contains(&s.frontend_token())
1389                    && matches!(
1390                        s.protocol(),
1391                        Protocol::HTTPListen | Protocol::HTTPSListen | Protocol::TCPListen
1392                    )
1393            }),
1394            "zombie reaping must never target a listener session"
1395        );
1396
1397        let zombie_count = zombie_tokens.len() as i64;
1398        count!(names::misc::ZOMBIES, zombie_count);
1399
1400        let remaining_count = self.shut_down_sessions_by_frontend_tokens(zombie_tokens);
1401        info!(
1402            "removing {} zombies ({} remaining entries after close)",
1403            zombie_count, remaining_count
1404        );
1405    }
1406
1407    /// Calls close on targeted sessions, yields the number of entries in the slab
1408    /// that were not properly removed
1409    fn shut_down_sessions_by_frontend_tokens(&self, tokens: HashSet<Token>) -> usize {
1410        if tokens.is_empty() {
1411            return 0;
1412        }
1413
1414        // close the sessions associated with the tokens
1415        for token in &tokens {
1416            if self.sessions.borrow().slab.contains(token.0) {
1417                let slab_before = self.sessions.borrow().slab.len();
1418                let session = { self.sessions.borrow_mut().slab.remove(token.0) };
1419                session.borrow_mut().close();
1420                self.sessions.borrow_mut().decr();
1421                // The removed token is truly gone afterwards. The slab may shrink
1422                // by MORE than one: `close()` also frees the session's backend
1423                // slab slot(s) (the multi-token pattern), so assert it shrank by
1424                // at least the frontend slot we just removed, not exactly one.
1425                debug_assert!(
1426                    !self.sessions.borrow().slab.contains(token.0),
1427                    "removed token must be absent from the slab"
1428                );
1429                debug_assert!(
1430                    self.sessions.borrow().slab.len() < slab_before,
1431                    "removing a present session must free at least its own slab slot"
1432                );
1433            }
1434        }
1435
1436        // find the entries of closed sessions in the session manager (they should not be there)
1437        let mut dangling_entries = HashSet::new();
1438        for (entry_key, session) in &self.sessions.borrow().slab {
1439            if tokens.contains(&session.borrow().frontend_token()) {
1440                dangling_entries.insert(entry_key);
1441            }
1442        }
1443
1444        // remove these from the session manager
1445        let mut dangling_entries_count = 0;
1446        for entry_key in dangling_entries {
1447            let mut sessions = self.sessions.borrow_mut();
1448            if sessions.slab.contains(entry_key) {
1449                sessions.slab.remove(entry_key);
1450                dangling_entries_count += 1;
1451            }
1452        }
1453        // Postcondition: no surviving slab entry still references any of the
1454        // closed frontend tokens — both the direct remove and the dangling
1455        // sweep together leave the slab clean of these sessions.
1456        debug_assert!(
1457            !self
1458                .sessions
1459                .borrow()
1460                .slab
1461                .iter()
1462                .any(|(_, session)| tokens.contains(&session.borrow().frontend_token())),
1463            "no slab entry may reference a closed frontend token after teardown"
1464        );
1465        dangling_entries_count
1466    }
1467
1468    /// Order sessions to shut down, check that they are all down
1469    fn shut_down_sessions(&mut self) -> bool {
1470        let sessions_count = self.sessions.borrow().slab.len();
1471        let mut sessions_to_shut_down = HashSet::new();
1472
1473        for (_key, session) in &self.sessions.borrow().slab {
1474            let mut session = session.borrow_mut();
1475            if session.shutting_down() {
1476                debug!(
1477                    "Server killing session from shutting_down: token={:?}, protocol={:?}",
1478                    session.frontend_token(),
1479                    session.protocol()
1480                );
1481                sessions_to_shut_down.insert(Token(session.frontend_token().0));
1482            }
1483        }
1484        let _ = self.shut_down_sessions_by_frontend_tokens(sessions_to_shut_down);
1485
1486        let new_sessions_count = self.sessions.borrow().slab.len();
1487
1488        if new_sessions_count < sessions_count {
1489            let now = Instant::now();
1490            if let Some(last) = self.last_shutting_down_message {
1491                if (now - last) > Duration::from_secs(5) {
1492                    info!(
1493                        "closed {} sessions, {} sessions left, base_sessions_count = {}",
1494                        sessions_count - new_sessions_count,
1495                        new_sessions_count,
1496                        self.base_sessions_count
1497                    );
1498                }
1499            }
1500            self.last_shutting_down_message = Some(now);
1501        }
1502
1503        if new_sessions_count <= self.base_sessions_count {
1504            info!("last session stopped, shutting down!");
1505            if let Err(e) = self.channel.run() {
1506                error!("Error while running the server channel: {}", e);
1507            }
1508            // self.block_channel();
1509            let id = self
1510                .shutting_down
1511                .take()
1512                .expect("should have shut down correctly"); // panicking here makes sense actually
1513
1514            debug!("Responding OK to main process for request {}", id);
1515
1516            let proxy_response = WorkerResponse::ok(id);
1517            if let Err(e) = self.channel.write_message(&proxy_response) {
1518                error!("Could not write response to the main process: {}", e);
1519            }
1520            if let Err(e) = self.channel.run() {
1521                error!("Error while running the server channel: {}", e);
1522            }
1523            return true;
1524        }
1525
1526        if new_sessions_count < self.last_sessions_len {
1527            info!(
1528                "shutting down, {} slab elements remaining (base: {})",
1529                new_sessions_count - self.base_sessions_count,
1530                self.base_sessions_count
1531            );
1532            self.last_sessions_len = new_sessions_count;
1533        }
1534
1535        false
1536    }
1537
1538    fn kill_session(&self, session: Rc<RefCell<dyn ProxySession>>) {
1539        let token = session.borrow().frontend_token();
1540        let _ = self.shut_down_sessions_by_frontend_tokens(HashSet::from([token]));
1541    }
1542
1543    fn send_queue(&mut self) {
1544        if self.channel.readiness.is_writable() {
1545            QUEUE.with(|q| {
1546                let mut queue = q.borrow_mut();
1547                loop {
1548                    if let Some(resp) = queue.pop_front() {
1549                        debug!("Sending response {:?}", resp);
1550                        if let Err(e) = self.channel.write_message(&resp) {
1551                            error!("Could not write message {} on the channel: {}", resp, e);
1552                            queue.push_front(resp);
1553                        }
1554                    }
1555
1556                    if self.channel.back_buf.available_data() > 0 {
1557                        if let Err(e) = self.channel.writable() {
1558                            error!("error writing to channel: {:?}", e);
1559                        }
1560                    }
1561
1562                    if !self.channel.readiness.is_writable() {
1563                        break;
1564                    }
1565
1566                    if self.channel.back_buf.available_data() == 0 && queue.is_empty() {
1567                        break;
1568                    }
1569                }
1570            });
1571        }
1572    }
1573
1574    fn notify(&mut self, message: WorkerRequest) {
1575        // Polled lease-expiry janitor: SetMetricDetail leases self-expire after
1576        // their TTL so a crashed `sozu top` cannot permanently elevate metrics
1577        // cardinality. The janitor runs at most every LEASE_TICK_INTERVAL,
1578        // gated by `lease_tick_due` so the hot path of `notify` doesn't pay
1579        // the HashMap walk on every iteration. Single-threaded worker, so
1580        // `borrow_mut` is safe here.
1581        let now = std::time::Instant::now();
1582        // Capture (previous, effective) before releasing the borrow so we
1583        // can emit an Event afterwards. Holding `METRICS.borrow_mut`
1584        // across `push_event` would re-enter the same thread-local from
1585        // inside `QUEUE.with` (safe but conceptually noisy); the
1586        // two-step split keeps the borrow scopes minimal.
1587        let lease_tick_transition = METRICS.with(|metrics| {
1588            let mut m = metrics.borrow_mut();
1589            if !m.lease_tick_due(now) {
1590                return None;
1591            }
1592            let previous = m.lease_tick(now)?;
1593            let effective = m.detail_effective();
1594            Some((previous, effective))
1595        });
1596        if let Some((previous, effective)) = lease_tick_transition {
1597            // The janitor retired one or more leases AND the effective
1598            // level moved. Surface the worker-local transition as an
1599            // Event so the master folds it into the audit log (closes
1600            // the gap where TUI-crashed lease expiry was previously
1601            // silent). `client_id` is `None` because the janitor may
1602            // have retired multiple leases at once.
1603            push_metric_detail_transition(previous, effective, "lease_tick_expired", None);
1604        }
1605        match &message.content.request_type {
1606            Some(RequestType::ConfigureMetrics(configuration)) => {
1607                match MetricsConfiguration::try_from(*configuration) {
1608                    Ok(metrics_config) => {
1609                        METRICS.with(|metrics| {
1610                            (*metrics.borrow_mut()).configure(&metrics_config);
1611                            push_queue(WorkerResponse::ok(message.id));
1612                        });
1613                    }
1614                    Err(e) => {
1615                        error!("Error configuring metrics: {}", e);
1616                        push_queue(WorkerResponse::error(message.id, e));
1617                    }
1618                }
1619                return;
1620            }
1621            Some(RequestType::QueryMetrics(query_metrics_options)) => {
1622                METRICS.with(|metrics| {
1623                    match (*metrics.borrow_mut()).query(query_metrics_options) {
1624                        Ok(c) => push_queue(WorkerResponse::ok_with_content(message.id, c)),
1625                        Err(e) => {
1626                            error!("Error querying metrics: {}", e);
1627                            push_queue(WorkerResponse::error(message.id, e))
1628                        }
1629                    }
1630                });
1631                return;
1632            }
1633            // Runtime cardinality lease verb — apply, renew, or clear a lease
1634            // on this worker's `Aggregator`. The lease bumps `effective` to
1635            // `max(configured, max(active leases))`; expiry runs on the polled
1636            // janitor below. Master-side aggregation into `MetricDetailStatus`
1637            // lands in a follow-up; for now the worker acks with a bare OK so
1638            // the existing `worker_request` fan-out path can collect.
1639            Some(RequestType::SetMetricDetail(req)) => {
1640                // Master populates the peer binding from the connecting
1641                // `ClientSession` before fan-out (`bin/src/command/
1642                // requests.rs::worker_request`). A pre-binding caller or a
1643                // platform without `SO_PEERCRED` yields `PeerBinding::default()`
1644                // — clears against that lease are accepted from anyone, per
1645                // the proto contract on `SetMetricDetail.peer_pid`.
1646                let presented_binding = crate::metrics::PeerBinding {
1647                    pid: req.peer_pid,
1648                    // Master sends Crockford-base32 ULIDs (`Ulid::to_string`);
1649                    // accept those, with a fallback hex parse for callers that
1650                    // happen to send `0x…` form. A failed parse degrades to
1651                    // `None` — the lease store treats that as "binding
1652                    // unknown" per the proto contract.
1653                    session_ulid: req.peer_session_ulid.as_deref().and_then(|s| {
1654                        rusty_ulid::Ulid::from_str(s)
1655                            .map(u128::from)
1656                            .ok()
1657                            .or_else(|| u128::from_str_radix(s.trim_start_matches("0x"), 16).ok())
1658                    }),
1659                };
1660                if req.clear.unwrap_or(false) {
1661                    // Defense-in-depth: the master pre-validates `client_id`
1662                    // length at the dispatch site, but worker IPC is not
1663                    // master-only — fuzz harnesses, serial_test-flagged
1664                    // integration tests, and future internal callers can
1665                    // issue an oversized clear directly. Mirror the apply
1666                    // path's `ClientIdTooLong` arm so an unbounded HashMap
1667                    // lookup is never driven by an operator-supplied string
1668                    // here either. The reason string echoes the byte length
1669                    // but not the operator bytes themselves (symmetric with
1670                    // the audit-column-smuggling guard on the apply path).
1671                    if req.client_id.len() > crate::metrics::LEASE_CLIENT_ID_MAX_BYTES {
1672                        let msg = format!(
1673                            "SetMetricDetail: clear client_id length {} exceeds {} bytes",
1674                            req.client_id.len(),
1675                            crate::metrics::LEASE_CLIENT_ID_MAX_BYTES,
1676                        );
1677                        error!("{}", msg);
1678                        push_queue(WorkerResponse::error(message.id.clone(), msg));
1679                        return;
1680                    }
1681                    // Capture transition fields + post-clear snapshot
1682                    // before releasing the borrow so we can emit an
1683                    // Event after AND build the WorkerMetricDetailStatus
1684                    // payload that the master folds into
1685                    // `MetricDetailStatus.workers[<worker_id>]`. Without
1686                    // this payload the master used its own view as a
1687                    // stand-in for the worker's per-aggregator state.
1688                    let (outcome, effective_after, configured_after, lease_count_after) = METRICS
1689                        .with(|metrics| {
1690                            let mut m = metrics.borrow_mut();
1691                            let outcome = m.lease_clear(&req.client_id, presented_binding);
1692                            (
1693                                outcome,
1694                                m.detail_effective(),
1695                                m.detail_configured(),
1696                                m.lease_count(),
1697                            )
1698                        });
1699                    match outcome {
1700                        crate::metrics::LeaseClearOutcome::Cleared { previous_effective } => {
1701                            push_metric_detail_transition(
1702                                previous_effective,
1703                                effective_after,
1704                                "lease_clear",
1705                                Some(req.client_id.clone()),
1706                            );
1707                            push_queue(WorkerResponse::ok_with_content(
1708                                message.id.clone(),
1709                                worker_metric_detail_status_content(
1710                                    configured_after,
1711                                    effective_after,
1712                                    previous_effective,
1713                                    lease_count_after,
1714                                ),
1715                            ));
1716                        }
1717                        crate::metrics::LeaseClearOutcome::NotFound => {
1718                            // Silent no-op: no lease existed for that
1719                            // id. The worker's state is unchanged so
1720                            // previous_effective == effective.
1721                            push_queue(WorkerResponse::ok_with_content(
1722                                message.id.clone(),
1723                                worker_metric_detail_status_content(
1724                                    configured_after,
1725                                    effective_after,
1726                                    effective_after,
1727                                    lease_count_after,
1728                                ),
1729                            ));
1730                        }
1731                        crate::metrics::LeaseClearOutcome::Unauthorized => {
1732                            // Do NOT echo `req.client_id` here: the operator-
1733                            // supplied bytes flow back through the master's
1734                            // worker→reason aggregation into the audit line's
1735                            // `reason=` column, which is sanitised for control
1736                            // bytes only. The dedicated `lease_id=` audit
1737                            // column already carries the operator string
1738                            // through `sanitize_for_audit_kv`, so re-embedding
1739                            // it here would let a value containing `,` or `=`
1740                            // forge a sibling KV pair against SIEM consumers
1741                            // that split on `, key=value`.
1742                            let msg = "SetMetricDetail: clear refused (peer \
1743                                 binding does not match the apply-time owner)"
1744                                .to_owned();
1745                            error!("{}", msg);
1746                            push_queue(WorkerResponse::error(message.id.clone(), msg));
1747                        }
1748                    }
1749                    return;
1750                }
1751                let detail_proto = match req.detail {
1752                    Some(d) => d,
1753                    None => {
1754                        // Operator-supplied `client_id` is intentionally
1755                        // omitted from the reason string: the dedicated
1756                        // `lease_id=` audit column carries it through the
1757                        // strict KV sanitiser. See the matching comment on
1758                        // the `Unauthorized` arm above for the column-
1759                        // smuggling rationale.
1760                        let msg = "SetMetricDetail without `detail` and without `clear`".to_owned();
1761                        error!("{}", msg);
1762                        push_queue(WorkerResponse::error(message.id.clone(), msg));
1763                        return;
1764                    }
1765                };
1766                let detail_enum = match MetricDetail::try_from(detail_proto) {
1767                    Ok(d) => d,
1768                    Err(e) => {
1769                        let msg =
1770                            format!("SetMetricDetail: invalid MetricDetail variant {detail_proto}");
1771                        error!("{}: {}", msg, e);
1772                        push_queue(WorkerResponse::error(message.id.clone(), msg));
1773                        return;
1774                    }
1775                };
1776                let level = MetricDetailLevel::from(detail_enum);
1777                // Bound the worst case BEFORE we touch the aggregator: the
1778                // proto contract on `SetMetricDetail.ttl_seconds` says the
1779                // worker rejects values larger than `LEASE_TTL_MAX` so a
1780                // stuck operator-side renewer (or a buggy third-party client)
1781                // cannot lock the worker into elevated cardinality. The
1782                // `Aggregator::lease_apply` clamp is still in place as a
1783                // defence-in-depth net for code paths that bypass this
1784                // dispatch (proto fuzzing, future internal callers).
1785                if let Some(t) = req.ttl_seconds
1786                    && u64::from(t) > crate::metrics::LEASE_TTL_MAX.as_secs()
1787                {
1788                    let msg = format!(
1789                        "SetMetricDetail: ttl_seconds={t} exceeds LEASE_TTL_MAX={}",
1790                        crate::metrics::LEASE_TTL_MAX.as_secs()
1791                    );
1792                    error!("{}", msg);
1793                    push_queue(WorkerResponse::error(message.id.clone(), msg));
1794                    return;
1795                }
1796                let ttl_seconds = req.ttl_seconds.filter(|&t| t > 0).unwrap_or_else(|| {
1797                    // The default fits in a u32 by construction
1798                    // (LEASE_TTL_DEFAULT = 60 s); the lossy `as u32` cast
1799                    // is replaced with a checked conversion so any
1800                    // future tweak past `u32::MAX` seconds (≈ 136 years)
1801                    // can't silently truncate. Falls through to 60 s on
1802                    // the theoretical overflow path.
1803                    u32::try_from(crate::metrics::LEASE_TTL_DEFAULT.as_secs()).unwrap_or(60)
1804                });
1805                let ttl = std::time::Duration::from_secs(ttl_seconds.into());
1806                let (outcome, configured_after, lease_count_after) = METRICS.with(|metrics| {
1807                    let mut m = metrics.borrow_mut();
1808                    let outcome =
1809                        m.lease_apply(req.client_id.clone(), level, ttl, presented_binding);
1810                    (outcome, m.detail_configured(), m.lease_count())
1811                });
1812                match outcome {
1813                    crate::metrics::LeaseApplyOutcome::Applied {
1814                        previous_effective,
1815                        new_effective,
1816                    } => {
1817                        push_metric_detail_transition(
1818                            previous_effective,
1819                            new_effective,
1820                            "lease_apply",
1821                            Some(req.client_id.clone()),
1822                        );
1823                        push_queue(WorkerResponse::ok_with_content(
1824                            message.id.clone(),
1825                            worker_metric_detail_status_content(
1826                                configured_after,
1827                                new_effective,
1828                                previous_effective,
1829                                lease_count_after,
1830                            ),
1831                        ));
1832                    }
1833                    crate::metrics::LeaseApplyOutcome::ClientIdTooLong => {
1834                        let msg = format!(
1835                            "SetMetricDetail: client_id length {} exceeds {} bytes",
1836                            req.client_id.len(),
1837                            crate::metrics::LEASE_CLIENT_ID_MAX_BYTES,
1838                        );
1839                        error!("{}", msg);
1840                        push_queue(WorkerResponse::error(message.id.clone(), msg));
1841                    }
1842                    crate::metrics::LeaseApplyOutcome::TableFull => {
1843                        // Same audit-column-smuggling guard as the
1844                        // `Unauthorized` and missing-detail arms: the
1845                        // operator-supplied `client_id` is already rendered
1846                        // safely through the strict KV sanitiser in the
1847                        // audit envelope's `lease_id=` column, so we keep
1848                        // it out of the reason string.
1849                        let msg = format!(
1850                            "SetMetricDetail: lease table at capacity ({} entries); reject new \
1851                             apply — operators must retry after an active lease expires or is \
1852                             cleared",
1853                            crate::metrics::LEASE_TABLE_CAP,
1854                        );
1855                        error!("{}", msg);
1856                        push_queue(WorkerResponse::error(message.id.clone(), msg));
1857                    }
1858                    crate::metrics::LeaseApplyOutcome::TtlOutOfRange => {
1859                        // Unreachable in the normal flow: the dispatch-time
1860                        // gate above already rejected ttl > LEASE_TTL_MAX.
1861                        // Surface explicitly so any future bypass (proto
1862                        // fuzzing, internal callers) fails loud rather
1863                        // than silently capping the lessor's intent.
1864                        let msg = format!(
1865                            "SetMetricDetail: ttl exceeds LEASE_TTL_MAX={} (internal contract \
1866                             violation: dispatch gate should have rejected)",
1867                            crate::metrics::LEASE_TTL_MAX.as_secs(),
1868                        );
1869                        error!("{}", msg);
1870                        push_queue(WorkerResponse::error(message.id.clone(), msg));
1871                    }
1872                    crate::metrics::LeaseApplyOutcome::Unauthorized => {
1873                        // A renewal arrived against an existing lease whose
1874                        // apply-time peer binding does not match the
1875                        // presented one. The `client_id` is intentionally
1876                        // omitted from the error string — the audit-log
1877                        // row already carries it in the dedicated
1878                        // `lease_id` column, and echoing it here would
1879                        // route operator-controlled bytes through the
1880                        // freeform reason field.
1881                        let msg = "SetMetricDetail: renewal refused (peer binding does not \
1882                                   match the apply-time owner)"
1883                            .to_owned();
1884                        error!("{}", msg);
1885                        push_queue(WorkerResponse::error(message.id.clone(), msg));
1886                    }
1887                }
1888                return;
1889            }
1890            Some(RequestType::Logging(logging_filter)) => {
1891                info!(
1892                    "{} changing logging filter to {}",
1893                    message.id, logging_filter
1894                );
1895                // there should not be any errors as it was already parsed by the main process
1896                let (directives, _errors) = logging::parse_logging_spec(logging_filter);
1897                logging::LOGGER.with(|logger| {
1898                    logger.borrow_mut().set_directives(directives);
1899                });
1900                push_queue(WorkerResponse::ok(message.id));
1901                return;
1902            }
1903            Some(RequestType::QueryClustersHashes(_)) => {
1904                push_queue(WorkerResponse::ok_with_content(
1905                    message.id.clone(),
1906                    ContentType::ClusterHashes(ClusterHashes {
1907                        map: self.config_state.hash_state(),
1908                    })
1909                    .into(),
1910                ));
1911                return;
1912            }
1913            Some(RequestType::QueryClusterById(cluster_id)) => {
1914                push_queue(WorkerResponse::ok_with_content(
1915                    message.id.clone(),
1916                    ContentType::Clusters(ClusterInformations {
1917                        vec: self
1918                            .config_state
1919                            .cluster_state(cluster_id)
1920                            .map_or(vec![], |ci| vec![ci]),
1921                    })
1922                    .into(),
1923                ));
1924            }
1925            Some(RequestType::SetMaxConnectionsPerIp(limit)) => {
1926                let mut sessions = self.sessions.borrow_mut();
1927                let previous = sessions.max_connections_per_ip;
1928                sessions.max_connections_per_ip = *limit;
1929                // Disabling the feature on the fly should not leave
1930                // stale `(cluster, ip)` entries behind: drain the
1931                // bookkeeping so a re-enable starts from a clean slate
1932                // and `cluster_ip_at_limit` does not consult dead state.
1933                if *limit == 0 {
1934                    sessions.clear_cluster_ip_tracking();
1935                }
1936                info!(
1937                    "{} updated global max_connections_per_ip from {} to {}",
1938                    message.id, previous, limit
1939                );
1940                push_queue(WorkerResponse::ok(message.id));
1941                return;
1942            }
1943            Some(RequestType::QueryMaxConnectionsPerIp(_)) => {
1944                let limit = self.sessions.borrow().max_connections_per_ip;
1945                push_queue(WorkerResponse::ok_with_content(
1946                    message.id,
1947                    ContentType::MaxConnectionsPerIpLimit(
1948                        sozu_command::proto::command::MaxConnectionsPerIpLimit { limit },
1949                    )
1950                    .into(),
1951                ));
1952                return;
1953            }
1954            Some(RequestType::QueryClustersByDomain(domain)) => {
1955                let cluster_ids = self
1956                    .config_state
1957                    .get_cluster_ids_by_domain(domain.hostname.clone(), domain.path.clone());
1958                let vec = cluster_ids
1959                    .iter()
1960                    .filter_map(|cluster_id| self.config_state.cluster_state(cluster_id))
1961                    .collect();
1962
1963                push_queue(WorkerResponse::ok_with_content(
1964                    message.id.clone(),
1965                    ContentType::Clusters(ClusterInformations { vec }).into(),
1966                ));
1967                return;
1968            }
1969            Some(RequestType::QueryCertificatesFromWorkers(filters)) => {
1970                if filters.fingerprint.is_some() {
1971                    let certs = self.config_state.get_certificates(filters.clone());
1972                    let response = if !certs.is_empty() {
1973                        WorkerResponse::ok_with_content(
1974                            message.id.clone(),
1975                            ContentType::CertificatesWithFingerprints(
1976                                CertificatesWithFingerprints { certs },
1977                            )
1978                            .into(),
1979                        )
1980                    } else {
1981                        worker_response_error(
1982                            message.id.clone(),
1983                            "Could not find certificate for this fingerprint",
1984                        )
1985                    };
1986                    push_queue(response);
1987                    return;
1988                }
1989                // if all certificates are queried, or filtered by domain name,
1990                // the request will be handled by the https proxy
1991            }
1992            _other_request => {}
1993        }
1994        self.notify_proxys(message);
1995    }
1996
1997    pub fn notify_proxys(&mut self, request: WorkerRequest) {
1998        if let Err(e) = self.config_state.dispatch(&request.content) {
1999            error!("Could not execute order on config state: {}", e);
2000        }
2001
2002        let req_id = request.id.clone();
2003
2004        match request.content.request_type {
2005            Some(RequestType::AddCluster(ref cluster)) => {
2006                // Mirror the master-side ConfigState::add_cluster check so
2007                // off-channel paths (TOML reload, SaveState/LoadState, direct
2008                // API) that smuggle a malformed `cluster.health_check` cannot
2009                // arm the worker's BackendList::set_health_check_config with
2010                // a CRLF/NUL/C0 URI or zero thresholds. The SetHealthCheck
2011                // handler below already runs the same validation; this is the
2012                // AddCluster mirror.
2013                if let Some(hc) = cluster.health_check.as_ref() {
2014                    if let Err(reason) = sozu_command::config::validate_health_check_config(hc) {
2015                        push_queue(worker_response_error(req_id, reason));
2016                        return;
2017                    }
2018                }
2019                self.add_cluster(cluster);
2020                // Re-arm the metric drain tombstone in case this cluster id
2021                // was previously removed — without this the drain would
2022                // continue dropping every emission for the resurrected
2023                // cluster. Idempotent on a fresh id.
2024                METRICS.with(|metrics| {
2025                    (*metrics.borrow_mut()).add_cluster(&cluster.cluster_id);
2026                });
2027                //not returning because the message must still be handled by each proxy
2028            }
2029            Some(RequestType::RemoveCluster(ref cluster_id)) => {
2030                self.remove_health_check_state(cluster_id);
2031                METRICS.with(|metrics| {
2032                    (*metrics.borrow_mut()).remove_cluster(cluster_id);
2033                });
2034                //not returning because the message must still be handled by each proxy
2035            }
2036            Some(RequestType::SetHealthCheck(ref set)) => {
2037                if let Err(reason) = sozu_command::config::validate_health_check_config(&set.config)
2038                {
2039                    push_queue(worker_response_error(req_id, reason));
2040                    return;
2041                }
2042                self.backends
2043                    .borrow_mut()
2044                    .set_health_check_config(&set.cluster_id, Some(set.config.to_owned()));
2045                push_queue(WorkerResponse::ok(req_id));
2046                return;
2047            }
2048            Some(RequestType::RemoveHealthCheck(ref cluster_id)) => {
2049                self.remove_health_check_state(cluster_id);
2050                push_queue(WorkerResponse::ok(req_id));
2051                return;
2052            }
2053            Some(RequestType::AddBackend(ref backend)) => {
2054                push_queue(self.add_backend(&req_id, backend));
2055                return;
2056            }
2057            Some(RequestType::RemoveBackend(ref remove_backend)) => {
2058                push_queue(self.remove_backend(&req_id, remove_backend));
2059                return;
2060            }
2061            _ => {}
2062        };
2063
2064        let proxy_destinations = request.content.get_destinations();
2065        let mut notify_response = None;
2066        if proxy_destinations.to_http_proxy {
2067            notify_response = Some(self.http.borrow_mut().notify(request.clone()));
2068        }
2069        if proxy_destinations.to_https_proxy {
2070            let http_proxy_response = self.https.borrow_mut().notify(request.clone());
2071            if http_proxy_response.is_failure() || notify_response.is_none() {
2072                notify_response = Some(http_proxy_response);
2073            }
2074        }
2075        if proxy_destinations.to_tcp_proxy {
2076            let tcp_proxy_response = self.tcp.borrow_mut().notify(request.clone());
2077            if tcp_proxy_response.is_failure() || notify_response.is_none() {
2078                notify_response = Some(tcp_proxy_response);
2079            }
2080        }
2081        if proxy_destinations.to_udp_proxy {
2082            let udp_proxy_response = self.udp.borrow_mut().notify(request.clone());
2083            if udp_proxy_response.is_failure() || notify_response.is_none() {
2084                notify_response = Some(udp_proxy_response);
2085            }
2086        }
2087        if let Some(response) = notify_response {
2088            push_queue(response);
2089        }
2090
2091        match request.content.request_type {
2092            // special case for adding listeners, because we need to register a listener
2093            Some(RequestType::AddHttpListener(listener)) => {
2094                push_queue(self.notify_add_http_listener(&req_id, listener));
2095            }
2096            Some(RequestType::AddHttpsListener(listener)) => {
2097                push_queue(self.notify_add_https_listener(&req_id, listener));
2098            }
2099            Some(RequestType::AddTcpListener(listener)) => {
2100                push_queue(self.notify_add_tcp_listener(&req_id, listener));
2101            }
2102            Some(RequestType::AddUdpListener(listener)) => {
2103                push_queue(self.notify_add_udp_listener(&req_id, listener));
2104            }
2105            Some(RequestType::UpdateHttpListener(patch)) => {
2106                push_queue(self.notify_update_http_listener(&req_id, patch));
2107            }
2108            Some(RequestType::UpdateHttpsListener(patch)) => {
2109                push_queue(self.notify_update_https_listener(&req_id, patch));
2110            }
2111            Some(RequestType::UpdateTcpListener(patch)) => {
2112                push_queue(self.notify_update_tcp_listener(&req_id, patch));
2113            }
2114            Some(RequestType::UpdateUdpListener(patch)) => {
2115                push_queue(self.notify_update_udp_listener(&req_id, patch));
2116            }
2117            Some(RequestType::RemoveListener(ref remove)) => {
2118                debug!("{} remove {:?} listener {:?}", req_id, remove.proxy, remove);
2119                // We only remove a listener that was previously added, so the
2120                // base count is at least 1 — the subtraction cannot underflow.
2121                debug_assert!(
2122                    self.base_sessions_count > 0,
2123                    "removing a listener with base_sessions_count == 0 would underflow"
2124                );
2125                self.base_sessions_count -= 1;
2126                let response = match ListenerType::try_from(remove.proxy) {
2127                    Ok(ListenerType::Http) => self.http.borrow_mut().notify(request),
2128                    Ok(ListenerType::Https) => self.https.borrow_mut().notify(request),
2129                    Ok(ListenerType::Tcp) => self.tcp.borrow_mut().notify(request),
2130                    Ok(ListenerType::Udp) => self.udp.borrow_mut().notify(request),
2131                    Err(_) => WorkerResponse::error(req_id, "Wrong variant ListenerType"),
2132                };
2133                push_queue(response);
2134            }
2135            Some(RequestType::ActivateListener(ref activate)) => {
2136                push_queue(self.notify_activate_listener(&req_id, activate));
2137            }
2138            Some(RequestType::DeactivateListener(ref deactivate)) => {
2139                push_queue(self.notify_deactivate_listener(&req_id, deactivate));
2140            }
2141            _other_request => {}
2142        };
2143    }
2144
2145    fn add_cluster(&mut self, cluster: &Cluster) {
2146        let mut backends = self.backends.borrow_mut();
2147        backends.set_load_balancing_policy_for_cluster(
2148            &cluster.cluster_id,
2149            LoadBalancingAlgorithms::try_from(cluster.load_balancing).unwrap_or_default(),
2150            cluster
2151                .load_metric
2152                .and_then(|n| LoadMetric::try_from(n).ok()),
2153        );
2154        backends.set_health_check_config(&cluster.cluster_id, cluster.health_check.to_owned());
2155        backends.set_cluster_http2(&cluster.cluster_id, cluster.http2.unwrap_or(false));
2156    }
2157
2158    fn add_backend(&mut self, req_id: &str, add_backend: &AddBackend) -> WorkerResponse {
2159        let new_backend = Backend::new(
2160            &add_backend.backend_id,
2161            add_backend.address.into(),
2162            add_backend.sticky_id.clone(),
2163            add_backend.load_balancing_parameters,
2164            add_backend.backup,
2165        );
2166        self.backends
2167            .borrow_mut()
2168            .add_backend(&add_backend.cluster_id, new_backend);
2169
2170        WorkerResponse::ok(req_id)
2171    }
2172
2173    fn remove_health_check_state(&mut self, cluster_id: &str) {
2174        self.health_checker.remove_cluster(cluster_id);
2175        self.backends
2176            .borrow_mut()
2177            .health_check_configs
2178            .remove(cluster_id);
2179    }
2180
2181    fn remove_backend(&mut self, req_id: &str, backend: &RemoveBackend) -> WorkerResponse {
2182        let address = backend.address.into();
2183        // Runtime removal is address-keyed and drops every backend at this
2184        // address (A/B test, weighted variant, dedup race). The metrics
2185        // layer is id-keyed — fan out one `remove_backend` per actually-
2186        // removed id so the two identities stay in sync. Without this the
2187        // `backend_id` field on the IPC message could name "A" while the
2188        // runtime dropped both "A" and "B" at the same address, leaving
2189        // "B"'s metrics rows orphaned forever.
2190        let removed_ids = self
2191            .backends
2192            .borrow_mut()
2193            .remove_backend(&backend.cluster_id, &address);
2194        if removed_ids.is_empty() {
2195            // Edge case: BackendList returned nothing (address never
2196            // existed in this cluster). Honour the request's stated id
2197            // anyway so a no-op request still tidies any orphan metric
2198            // row from a prior identity-drift state.
2199            METRICS.with(|metrics| {
2200                (*metrics.borrow_mut()).remove_backend(&backend.cluster_id, &backend.backend_id);
2201            });
2202        } else {
2203            METRICS.with(|metrics| {
2204                let mut metrics = metrics.borrow_mut();
2205                for id in &removed_ids {
2206                    metrics.remove_backend(&backend.cluster_id, id);
2207                }
2208            });
2209        }
2210
2211        WorkerResponse::ok(req_id)
2212    }
2213
2214    fn notify_add_http_listener(
2215        &mut self,
2216        req_id: &str,
2217        listener: HttpListenerConfig,
2218    ) -> WorkerResponse {
2219        debug!("{} add http listener {:?}", req_id, listener);
2220
2221        if self.sessions.borrow().at_capacity() {
2222            return worker_response_error(req_id, "session list is full, cannot add a listener");
2223        }
2224
2225        let mut session_manager = self.sessions.borrow_mut();
2226        // The vacant entry's key is free now and becomes the listener's token.
2227        let slab_before = session_manager.slab.len();
2228        debug_assert!(
2229            !session_manager
2230                .slab
2231                .contains(session_manager.slab.vacant_key()),
2232            "the next vacant slab key must be free before insertion"
2233        );
2234        let entry = session_manager.slab.vacant_entry();
2235        let token = Token(entry.key());
2236
2237        match self.http.borrow_mut().add_listener(listener, token) {
2238            Ok(_token) => {
2239                entry.insert(Rc::new(RefCell::new(ListenSession {
2240                    protocol: Protocol::HTTPListen,
2241                })));
2242                // The listener session occupies exactly the token's slab key,
2243                // and the slab grew by exactly one slot.
2244                debug_assert!(
2245                    session_manager.slab.contains(token.0),
2246                    "listener insert must occupy the token's slab key"
2247                );
2248                debug_assert_eq!(
2249                    session_manager.slab.len(),
2250                    slab_before + 1,
2251                    "adding a listener must occupy exactly one slab slot"
2252                );
2253                self.base_sessions_count += 1;
2254                WorkerResponse::ok(req_id)
2255            }
2256            Err(e) => worker_response_error(req_id, format!("Could not add HTTP listener: {e}")),
2257        }
2258    }
2259
2260    fn notify_add_https_listener(
2261        &mut self,
2262        req_id: &str,
2263        listener: HttpsListenerConfig,
2264    ) -> WorkerResponse {
2265        debug!("{} add https listener {:?}", req_id, listener);
2266
2267        if self.sessions.borrow().at_capacity() {
2268            return worker_response_error(req_id, "session list is full, cannot add a listener");
2269        }
2270
2271        let mut session_manager = self.sessions.borrow_mut();
2272        let slab_before = session_manager.slab.len();
2273        debug_assert!(
2274            !session_manager
2275                .slab
2276                .contains(session_manager.slab.vacant_key()),
2277            "the next vacant slab key must be free before insertion"
2278        );
2279        let entry = session_manager.slab.vacant_entry();
2280        let token = Token(entry.key());
2281
2282        match self
2283            .https
2284            .borrow_mut()
2285            .add_listener(listener.clone(), token)
2286        {
2287            Ok(_token) => {
2288                entry.insert(Rc::new(RefCell::new(ListenSession {
2289                    protocol: Protocol::HTTPSListen,
2290                })));
2291                debug_assert!(
2292                    session_manager.slab.contains(token.0),
2293                    "listener insert must occupy the token's slab key"
2294                );
2295                debug_assert_eq!(
2296                    session_manager.slab.len(),
2297                    slab_before + 1,
2298                    "adding a listener must occupy exactly one slab slot"
2299                );
2300                self.base_sessions_count += 1;
2301                WorkerResponse::ok(req_id)
2302            }
2303            Err(e) => worker_response_error(req_id, format!("Could not add HTTPS listener: {e}")),
2304        }
2305    }
2306
2307    fn notify_add_tcp_listener(
2308        &mut self,
2309        req_id: &str,
2310        listener: CommandTcpListener,
2311    ) -> WorkerResponse {
2312        debug!("{} add tcp listener {:?}", req_id, listener);
2313
2314        if self.sessions.borrow().at_capacity() {
2315            return worker_response_error(req_id, "session list is full, cannot add a listener");
2316        }
2317
2318        let mut session_manager = self.sessions.borrow_mut();
2319        let slab_before = session_manager.slab.len();
2320        debug_assert!(
2321            !session_manager
2322                .slab
2323                .contains(session_manager.slab.vacant_key()),
2324            "the next vacant slab key must be free before insertion"
2325        );
2326        let entry = session_manager.slab.vacant_entry();
2327        let token = Token(entry.key());
2328
2329        match self.tcp.borrow_mut().add_listener(listener, token) {
2330            Ok(_token) => {
2331                entry.insert(Rc::new(RefCell::new(ListenSession {
2332                    protocol: Protocol::TCPListen,
2333                })));
2334                debug_assert!(
2335                    session_manager.slab.contains(token.0),
2336                    "listener insert must occupy the token's slab key"
2337                );
2338                debug_assert_eq!(
2339                    session_manager.slab.len(),
2340                    slab_before + 1,
2341                    "adding a listener must occupy exactly one slab slot"
2342                );
2343                self.base_sessions_count += 1;
2344                WorkerResponse::ok(req_id)
2345            }
2346            Err(e) => worker_response_error(req_id, format!("Could not add TCP listener: {e}")),
2347        }
2348    }
2349
2350    fn notify_add_udp_listener(
2351        &mut self,
2352        req_id: &str,
2353        listener: CommandUdpListener,
2354    ) -> WorkerResponse {
2355        debug!("{} add udp listener {:?}", req_id, listener);
2356
2357        if self.sessions.borrow().at_capacity() {
2358            return worker_response_error(req_id, "session list is full, cannot add a listener");
2359        }
2360
2361        let mut session_manager = self.sessions.borrow_mut();
2362        let entry = session_manager.slab.vacant_entry();
2363        let token = Token(entry.key());
2364
2365        match self.udp.borrow_mut().add_listener(listener, token) {
2366            Ok(_token) => {
2367                entry.insert(Rc::new(RefCell::new(ListenSession {
2368                    protocol: Protocol::UDPListen,
2369                })));
2370                self.base_sessions_count += 1;
2371                WorkerResponse::ok(req_id)
2372            }
2373            Err(e) => worker_response_error(req_id, format!("Could not add UDP listener: {e}")),
2374        }
2375    }
2376
2377    fn notify_update_udp_listener(
2378        &mut self,
2379        req_id: &str,
2380        patch: UpdateUdpListenerConfig,
2381    ) -> WorkerResponse {
2382        debug!("{} update udp listener {:?}", req_id, patch.address);
2383        match self.udp.borrow_mut().update_listener(patch) {
2384            Ok(()) => WorkerResponse::ok(req_id),
2385            Err(e) => worker_response_error(req_id, format!("Could not update UDP listener: {e}")),
2386        }
2387    }
2388
2389    fn notify_update_http_listener(
2390        &mut self,
2391        req_id: &str,
2392        patch: UpdateHttpListenerConfig,
2393    ) -> WorkerResponse {
2394        debug!("{} update http listener {:?}", req_id, patch.address);
2395        match self.http.borrow_mut().update_listener(patch) {
2396            Ok(()) => WorkerResponse::ok(req_id),
2397            Err(e) => worker_response_error(req_id, format!("Could not update HTTP listener: {e}")),
2398        }
2399    }
2400
2401    fn notify_update_https_listener(
2402        &mut self,
2403        req_id: &str,
2404        patch: UpdateHttpsListenerConfig,
2405    ) -> WorkerResponse {
2406        debug!("{} update https listener {:?}", req_id, patch.address);
2407        match self.https.borrow_mut().update_listener(patch) {
2408            Ok(()) => WorkerResponse::ok(req_id),
2409            Err(e) => {
2410                worker_response_error(req_id, format!("Could not update HTTPS listener: {e}"))
2411            }
2412        }
2413    }
2414
2415    fn notify_update_tcp_listener(
2416        &mut self,
2417        req_id: &str,
2418        patch: UpdateTcpListenerConfig,
2419    ) -> WorkerResponse {
2420        debug!("{} update tcp listener {:?}", req_id, patch.address);
2421        match self.tcp.borrow_mut().update_listener(patch) {
2422            Ok(()) => WorkerResponse::ok(req_id),
2423            Err(e) => worker_response_error(req_id, format!("Could not update TCP listener: {e}")),
2424        }
2425    }
2426
2427    fn notify_activate_listener(
2428        &mut self,
2429        req_id: &str,
2430        activate: &ActivateListener,
2431    ) -> WorkerResponse {
2432        debug!(
2433            "{} activate {:?} listener {:?}",
2434            req_id, activate.proxy, activate
2435        );
2436
2437        let address: std::net::SocketAddr = activate.address.into();
2438
2439        match ListenerType::try_from(activate.proxy) {
2440            Ok(ListenerType::Http) => {
2441                let listener = self
2442                    .scm_listeners
2443                    .as_mut()
2444                    .and_then(|s| s.get_http(&address))
2445                    // SAFETY: `fd` was just received from the supervisor via SCM_RIGHTS
2446                    // (see `command/src/scm_socket.rs`) and is not owned elsewhere — the
2447                    // `ScmListeners` map removes it on `get_http`. Ownership transfers to
2448                    // the mio wrapper, whose `Drop` closes the descriptor.
2449                    .map(|fd| unsafe { MioTcpListener::from_raw_fd(fd) });
2450
2451                let activated_token = self.http.borrow_mut().activate_listener(&address, listener);
2452                match activated_token {
2453                    Ok(token) => {
2454                        self.accept(ListenToken(token.0), Protocol::HTTPListen);
2455                        WorkerResponse::ok(req_id)
2456                    }
2457                    Err(activate_error) => worker_response_error(
2458                        req_id,
2459                        format!("Could not activate HTTP listener: {activate_error}"),
2460                    ),
2461                }
2462            }
2463            Ok(ListenerType::Https) => {
2464                let listener = self
2465                    .scm_listeners
2466                    .as_mut()
2467                    .and_then(|s| s.get_https(&address))
2468                    // SAFETY: `fd` was just received from the supervisor via SCM_RIGHTS
2469                    // (see `command/src/scm_socket.rs`) and is not owned elsewhere — the
2470                    // `ScmListeners` map removes it on `get_https`. Ownership transfers to
2471                    // the mio wrapper, whose `Drop` closes the descriptor.
2472                    .map(|fd| unsafe { MioTcpListener::from_raw_fd(fd) });
2473
2474                let activated_token = self
2475                    .https
2476                    .borrow_mut()
2477                    .activate_listener(&address, listener);
2478                match activated_token {
2479                    Ok(token) => {
2480                        self.accept(ListenToken(token.0), Protocol::HTTPSListen);
2481                        WorkerResponse::ok(req_id)
2482                    }
2483                    Err(activate_error) => worker_response_error(
2484                        req_id,
2485                        format!("Could not activate HTTPS listener: {activate_error}"),
2486                    ),
2487                }
2488            }
2489            Ok(ListenerType::Tcp) => {
2490                let listener = self
2491                    .scm_listeners
2492                    .as_mut()
2493                    .and_then(|s| s.get_tcp(&address))
2494                    // SAFETY: `fd` was just received from the supervisor via SCM_RIGHTS
2495                    // (see `command/src/scm_socket.rs`) and is not owned elsewhere — the
2496                    // `ScmListeners` map removes it on `get_tcp`. Ownership transfers to
2497                    // the mio wrapper, whose `Drop` closes the descriptor.
2498                    .map(|fd| unsafe { MioTcpListener::from_raw_fd(fd) });
2499
2500                let listener_token = self.tcp.borrow_mut().activate_listener(&address, listener);
2501                match listener_token {
2502                    Ok(token) => {
2503                        self.accept(ListenToken(token.0), Protocol::TCPListen);
2504                        WorkerResponse::ok(req_id)
2505                    }
2506                    Err(activate_error) => worker_response_error(
2507                        req_id,
2508                        format!("Could not activate TCP listener: {activate_error}"),
2509                    ),
2510                }
2511            }
2512            Ok(ListenerType::Udp) => {
2513                let socket = self
2514                    .scm_listeners
2515                    .as_mut()
2516                    .and_then(|s| s.get_udp(&address))
2517                    // SAFETY: `fd` was just received from the supervisor via
2518                    // SCM_RIGHTS (see `command/src/scm_socket.rs`) and is not
2519                    // owned elsewhere — `ScmListeners::get_udp` removes it from
2520                    // the map. Ownership transfers to the mio `UdpSocket`
2521                    // wrapper, whose `Drop` closes the descriptor. `O_NONBLOCK`
2522                    // + `SO_REUSE*` are file-description flags preserved across
2523                    // SCM + exec.
2524                    .map(|fd| unsafe { MioUdpSocket::from_raw_fd(fd) });
2525
2526                let activated_token = self.udp.borrow_mut().activate_listener(&address, socket);
2527                match activated_token {
2528                    Ok(token) => {
2529                        // UDP never uses accept()/create_session: replace the
2530                        // `ListenSession` placeholder at the listener token with
2531                        // the real `UdpListenerSession` so the READABLE
2532                        // registration drives `Server::ready`'s generic path
2533                        // into `UdpListenerSession::update_readiness`.
2534                        if let Some(session) = self.udp.borrow_mut().build_session(token) {
2535                            let mut sessions = self.sessions.borrow_mut();
2536                            if sessions.slab.contains(token.0) {
2537                                sessions.slab[token.0] = session;
2538                            }
2539                        }
2540                        WorkerResponse::ok(req_id)
2541                    }
2542                    Err(activate_error) => worker_response_error(
2543                        req_id,
2544                        format!("Could not activate UDP listener: {activate_error}"),
2545                    ),
2546                }
2547            }
2548            Err(_) => worker_response_error(req_id, "Wrong variant for ListenerType on request"),
2549        }
2550    }
2551
2552    fn notify_deactivate_listener(
2553        &mut self,
2554        req_id: &str,
2555        deactivate: &DeactivateListener,
2556    ) -> WorkerResponse {
2557        debug!(
2558            "{} deactivate {:?} listener {:?}",
2559            req_id, deactivate.proxy, deactivate
2560        );
2561
2562        let address: std::net::SocketAddr = deactivate.address.into();
2563
2564        match ListenerType::try_from(deactivate.proxy) {
2565            Ok(ListenerType::Http) => {
2566                let (token, mut listener) = match self.http.borrow_mut().give_back_listener(address)
2567                {
2568                    Ok((token, listener)) => (token, listener),
2569                    Err(e) => {
2570                        return worker_response_error(
2571                            req_id,
2572                            format!(
2573                                "Couldn't deactivate HTTP listener at address {address:?}: {e}"
2574                            ),
2575                        );
2576                    }
2577                };
2578
2579                if let Err(e) = self.poll.registry().deregister(&mut listener) {
2580                    error!(
2581                        "error deregistering HTTP listen socket({:?}): {:?}",
2582                        deactivate, e
2583                    );
2584                }
2585
2586                {
2587                    let mut sessions = self.sessions.borrow_mut();
2588                    if sessions.slab.contains(token.0) {
2589                        sessions.slab.remove(token.0);
2590                        info!("removed listen token {:?}", token);
2591                    }
2592                }
2593
2594                if deactivate.to_scm {
2595                    self.unblock_scm_socket();
2596                    let listeners = Listeners {
2597                        http: vec![(address, listener.as_raw_fd())],
2598                        tls: vec![],
2599                        tcp: vec![],
2600                        udp: vec![],
2601                    };
2602                    info!("sending HTTP listener: {:?}", listeners);
2603                    let res = self.scm.send_listeners(&listeners);
2604
2605                    self.block_scm_socket();
2606
2607                    info!("sent HTTP listener: {:?}", res);
2608                }
2609                WorkerResponse::ok(req_id)
2610            }
2611            Ok(ListenerType::Https) => {
2612                let (token, mut listener) = match self
2613                    .https
2614                    .borrow_mut()
2615                    .give_back_listener(address)
2616                {
2617                    Ok((token, listener)) => (token, listener),
2618                    Err(e) => {
2619                        return worker_response_error(
2620                            req_id,
2621                            format!(
2622                                "Couldn't deactivate HTTPS listener at address {address:?}: {e}",
2623                            ),
2624                        );
2625                    }
2626                };
2627                if let Err(e) = self.poll.registry().deregister(&mut listener) {
2628                    error!(
2629                        "error deregistering HTTPS listen socket({:?}): {:?}",
2630                        deactivate, e
2631                    );
2632                }
2633                if self.sessions.borrow().slab.contains(token.0) {
2634                    self.sessions.borrow_mut().slab.remove(token.0);
2635                    info!("removed listen token {:?}", token);
2636                }
2637
2638                if deactivate.to_scm {
2639                    self.unblock_scm_socket();
2640                    let listeners = Listeners {
2641                        http: vec![],
2642                        tls: vec![(address, listener.as_raw_fd())],
2643                        tcp: vec![],
2644                        udp: vec![],
2645                    };
2646                    info!("sending HTTPS listener: {:?}", listeners);
2647                    let res = self.scm.send_listeners(&listeners);
2648
2649                    self.block_scm_socket();
2650
2651                    info!("sent HTTPS listener: {:?}", res);
2652                }
2653                WorkerResponse::ok(req_id)
2654            }
2655            Ok(ListenerType::Tcp) => {
2656                let (token, mut listener) = match self.tcp.borrow_mut().give_back_listener(address)
2657                {
2658                    Ok((token, listener)) => (token, listener),
2659                    Err(e) => {
2660                        return worker_response_error(
2661                            req_id,
2662                            format!(
2663                                "Could not deactivate TCP listener at address {address:?}: {e}"
2664                            ),
2665                        );
2666                    }
2667                };
2668
2669                if let Err(e) = self.poll.registry().deregister(&mut listener) {
2670                    error!(
2671                        "error deregistering TCP listen socket({:?}): {:?}",
2672                        deactivate, e
2673                    );
2674                }
2675                if self.sessions.borrow().slab.contains(token.0) {
2676                    self.sessions.borrow_mut().slab.remove(token.0);
2677                    info!("removed listen token {:?}", token);
2678                }
2679
2680                if deactivate.to_scm {
2681                    self.unblock_scm_socket();
2682                    let listeners = Listeners {
2683                        http: vec![],
2684                        tls: vec![],
2685                        tcp: vec![(address, listener.as_raw_fd())],
2686                        udp: vec![],
2687                    };
2688                    info!("sending TCP listener: {:?}", listeners);
2689                    let res = self.scm.send_listeners(&listeners);
2690
2691                    self.block_scm_socket();
2692
2693                    info!("sent TCP listener: {:?}", res);
2694                }
2695                WorkerResponse::ok(req_id)
2696            }
2697            Ok(ListenerType::Udp) => {
2698                let (token, mut listener) = match self.udp.borrow_mut().give_back_listener(address)
2699                {
2700                    Ok((token, listener)) => (token, listener),
2701                    Err(e) => {
2702                        return worker_response_error(
2703                            req_id,
2704                            format!(
2705                                "Could not deactivate UDP listener at address {address:?}: {e}"
2706                            ),
2707                        );
2708                    }
2709                };
2710
2711                if let Err(e) = self.poll.registry().deregister(&mut listener) {
2712                    error!(
2713                        "error deregistering UDP listen socket({:?}): {:?}",
2714                        deactivate, e
2715                    );
2716                }
2717                if self.sessions.borrow().slab.contains(token.0) {
2718                    self.sessions.borrow_mut().slab.remove(token.0);
2719                    info!("removed listen token {:?}", token);
2720                }
2721
2722                if deactivate.to_scm {
2723                    self.unblock_scm_socket();
2724                    let listeners = Listeners {
2725                        http: vec![],
2726                        tls: vec![],
2727                        tcp: vec![],
2728                        udp: vec![(address, listener.as_raw_fd())],
2729                    };
2730                    info!("sending UDP listener: {:?}", listeners);
2731                    let res = self.scm.send_listeners(&listeners);
2732
2733                    self.block_scm_socket();
2734
2735                    info!("sent UDP listener: {:?}", res);
2736                }
2737                WorkerResponse::ok(req_id)
2738            }
2739            Err(_) => worker_response_error(req_id, "Wrong variant for ListenerType on request"),
2740        }
2741    }
2742
2743    /// Send all socket addresses and file descriptors of all proxies, via the scm socket
2744    pub fn return_listen_sockets(&mut self) -> Result<(), ScmSocketError> {
2745        self.unblock_scm_socket();
2746
2747        let mut http_listeners = self.http.borrow_mut().give_back_listeners();
2748        for &mut (_, ref mut sock) in http_listeners.iter_mut() {
2749            if let Err(e) = self.poll.registry().deregister(sock) {
2750                error!(
2751                    "error deregistering HTTP listen socket({:?}): {:?}",
2752                    sock, e
2753                );
2754            }
2755        }
2756
2757        let mut https_listeners = self.https.borrow_mut().give_back_listeners();
2758        for &mut (_, ref mut sock) in https_listeners.iter_mut() {
2759            if let Err(e) = self.poll.registry().deregister(sock) {
2760                error!(
2761                    "error deregistering HTTPS listen socket({:?}): {:?}",
2762                    sock, e
2763                );
2764            }
2765        }
2766
2767        let mut tcp_listeners = self.tcp.borrow_mut().give_back_listeners();
2768        for &mut (_, ref mut sock) in tcp_listeners.iter_mut() {
2769            if let Err(e) = self.poll.registry().deregister(sock) {
2770                error!("error deregistering TCP listen socket({:?}): {:?}", sock, e);
2771            }
2772        }
2773
2774        let mut udp_listeners = self.udp.borrow_mut().give_back_listeners();
2775        for &mut (_, ref mut sock) in udp_listeners.iter_mut() {
2776            if let Err(e) = self.poll.registry().deregister(sock) {
2777                error!("error deregistering UDP listen socket({:?}): {:?}", sock, e);
2778            }
2779        }
2780
2781        // use as_raw_fd because the listeners should be dropped after sending them
2782        let listeners = Listeners {
2783            http: http_listeners
2784                .iter()
2785                .map(|(addr, listener)| (*addr, listener.as_raw_fd()))
2786                .collect(),
2787            tls: https_listeners
2788                .iter()
2789                .map(|(addr, listener)| (*addr, listener.as_raw_fd()))
2790                .collect(),
2791            tcp: tcp_listeners
2792                .iter()
2793                .map(|(addr, listener)| (*addr, listener.as_raw_fd()))
2794                .collect(),
2795            udp: udp_listeners
2796                .iter()
2797                .map(|(addr, listener)| (*addr, listener.as_raw_fd()))
2798                .collect(),
2799        };
2800        // Each handed-back listener is collected exactly once: the assembled
2801        // fd lists mirror the give_back lists one-to-one (the maps above are
2802        // straight `.iter().map().collect()` with no filtering or dedup).
2803        debug_assert_eq!(
2804            listeners.http.len(),
2805            http_listeners.len(),
2806            "every HTTP listener must be collected exactly once"
2807        );
2808        debug_assert_eq!(
2809            listeners.tls.len(),
2810            https_listeners.len(),
2811            "every HTTPS listener must be collected exactly once"
2812        );
2813        debug_assert_eq!(
2814            listeners.tcp.len(),
2815            tcp_listeners.len(),
2816            "every TCP listener must be collected exactly once"
2817        );
2818        info!("sending default listeners: {:?}", listeners);
2819        let res = self.scm.send_listeners(&listeners);
2820
2821        self.block_scm_socket();
2822
2823        info!("sent default listeners: {:?}", res);
2824        res
2825    }
2826
2827    fn block_scm_socket(&mut self) {
2828        if let Err(e) = self.scm.set_blocking(true) {
2829            error!("Could not block scm socket: {}", e);
2830        }
2831    }
2832
2833    fn unblock_scm_socket(&mut self) {
2834        if let Err(e) = self.scm.set_blocking(false) {
2835            error!("Could not unblock scm socket: {}", e);
2836        }
2837    }
2838
2839    pub fn to_session(&self, token: Token) -> SessionToken {
2840        SessionToken(token.0)
2841    }
2842
2843    pub fn from_session(&self, token: SessionToken) -> Token {
2844        Token(token.0)
2845    }
2846
2847    pub fn accept(&mut self, token: ListenToken, protocol: Protocol) {
2848        // Per-protocol counter key. Keeping the namespace static (3 keys +
2849        // aggregate) is a deliberate cardinality cap: per-listener-address
2850        // labelling would require runtime `Box::leak` because `incr!` takes
2851        // `&'static str`, and listener addresses can be reconfigured at
2852        // runtime by the control plane. Operators wanting per-listener
2853        // attribution should correlate with the listener-protocol breakdown
2854        // below.
2855        //
2856        // Non-listen protocols reach this code only on an invariant break
2857        // upstream (`ready()` dispatched a non-listen `Protocol` to
2858        // `accept()`). Log and return rather than panicking — defense in
2859        // depth on the accept path, which is process-fatal if it aborts.
2860        let (proto_key, accepted_protocol) = match protocol {
2861            Protocol::TCPListen => ("listener.accepted.tcp", Protocol::TCPListen),
2862            Protocol::HTTPListen => ("listener.accepted.http", Protocol::HTTPListen),
2863            Protocol::HTTPSListen => ("listener.accepted.https", Protocol::HTTPSListen),
2864            other => {
2865                warn!(
2866                    "accept() called with non-listen protocol {:?} on token {:?}; skipping",
2867                    other, token
2868                );
2869                return;
2870            }
2871        };
2872
2873        // Past the guard, `accepted_protocol` is one of the three listen
2874        // variants — the inner dispatch's `unreachable!` arm relies on this.
2875        debug_assert!(
2876            matches!(
2877                accepted_protocol,
2878                Protocol::TCPListen | Protocol::HTTPListen | Protocol::HTTPSListen
2879            ),
2880            "accept dispatch must run with a listen protocol only"
2881        );
2882
2883        loop {
2884            let result = match accepted_protocol {
2885                Protocol::TCPListen => self.tcp.borrow_mut().accept(token),
2886                Protocol::HTTPListen => self.http.borrow_mut().accept(token),
2887                Protocol::HTTPSListen => self.https.borrow_mut().accept(token),
2888                // The outer match populates `accepted_protocol` only with the
2889                // three listen variants and returns early otherwise — this
2890                // arm is structurally unreachable.
2891                other => unreachable!(
2892                    "accept dispatch reached non-listen protocol {:?} after outer guard",
2893                    other
2894                ),
2895            };
2896            match result {
2897                Ok(sock) => {
2898                    // peer_addr() is one syscall (`getpeername(2)`) and runs
2899                    // exactly once per accepted socket. It can fail if the
2900                    // peer raced to close — recorded as `None` and silently
2901                    // skipped for the per-source counter.
2902                    let peer = sock.peer_addr().ok();
2903                    incr!(names::listener::ACCEPTED_TOTAL);
2904                    incr!(proto_key);
2905                    if let Some(peer_addr) = peer.as_ref() {
2906                        incr!(per_source_bucket(peer_addr));
2907                    }
2908                    let queue_before = self.accept_queue.len();
2909                    self.accept_queue.push_back((
2910                        sock,
2911                        token,
2912                        accepted_protocol,
2913                        Instant::now(),
2914                        peer,
2915                    ));
2916                    // One accepted socket enqueues exactly one entry.
2917                    debug_assert_eq!(
2918                        self.accept_queue.len(),
2919                        queue_before + 1,
2920                        "each accepted socket must enqueue exactly one entry"
2921                    );
2922                }
2923                Err(AcceptError::WouldBlock) => {
2924                    self.accept_ready.remove(&token);
2925                    break;
2926                }
2927                Err(other) => {
2928                    error!(
2929                        "error accepting {:?} sockets: {:?}",
2930                        accepted_protocol, other
2931                    );
2932                    self.accept_ready.remove(&token);
2933                    break;
2934                }
2935            }
2936        }
2937
2938        gauge!(names::accept_queue::CONNECTIONS, self.accept_queue.len());
2939    }
2940
    /// Turn queued accepted sockets into sessions.
    ///
    /// Entries are taken from the back of `accept_queue`, i.e. the most
    /// recently pushed socket first (`accept` pushes at the back). For each
    /// entry:
    ///
    /// - a socket that waited longer than `accept_queue_timeout` is dropped
    ///   and counted, then the loop moves on to the next entry;
    /// - when `check_limits` refuses the connection, the refusal is counted
    ///   and, if `evict_on_queue_full` is set and the worker is not shutting
    ///   down, least-active sessions are evicted before re-checking; the
    ///   loop stops whenever room cannot be made;
    /// - otherwise the proxy matching the listener protocol creates the
    ///   session; a creation error stops the loop.
    ///
    /// The accept-queue gauge is refreshed before returning.
    pub fn create_sessions(&mut self) {
        while let Some((sock, token, protocol, timestamp, _peer)) = self.accept_queue.pop_back() {
            // Time the socket spent in the queue since it was accepted.
            let wait_time = Instant::now() - timestamp;
            time!(names::accept_queue::WAIT_TIME, wait_time.as_millis());
            if wait_time > self.accept_queue_timeout {
                incr!(names::accept_queue::TIMEOUT);
                continue;
            }

            if !self.sessions.borrow_mut().check_limits() {
                // The socket we just popped will not be served, plus every
                // remaining queued socket below `break` will time out.
                // `listener.connection_capped` counts the popped socket so
                // the counter aligns with `check_limits` invocations rather
                // than with queue depth at the time of refusal.
                incr!(names::listener::CONNECTION_CAPPED);

                if !self.evict_on_queue_full {
                    break;
                }

                // Skip eviction during graceful shutdown — defeats the
                // shutting_down semantics and is wasted work since the
                // worker is winding down anyway.
                if self.shutting_down.is_some() {
                    break;
                }

                // Evict 1% of `max_connections` per iteration. Conservative
                // ratio: large enough to make meaningful progress clearing
                // the accept queue, small enough to limit collateral damage
                // to active sessions. The cap loop re-checks limits after
                // each eviction round, so multiple rounds can run if the
                // queue has many pending connections. Decoupled from
                // `slab_entries_per_connection` (which only sizes the slab,
                // not `max_connections`).
                let to_evict = (self.sessions.borrow().max_connections / 100).max(1);
                let evicted = self.evict_least_active_sessions(to_evict);
                if evicted == 0 {
                    // Informational, not an invariant break: the worker may
                    // be at boot, or every active session is a system
                    // protocol (Channel/Metrics/Timer/listeners) and is
                    // ineligible. Stay at warn so operators see it in info-
                    // level production logs.
                    warn!("evict_on_queue_full enabled but no candidate sessions to evict");
                    break;
                }

                count!(names::sessions::EVICTED, evicted as i64);
                warn!(
                    "evicted {} least recently active sessions to make room",
                    evicted
                );

                // Eviction may not have freed enough room: give up on this
                // round if the limits still refuse the connection.
                if !self.sessions.borrow_mut().check_limits() {
                    break;
                }
            }

            //FIXME: check the timestamp
            //TODO: create_session should return the session and
            // the server should insert it in the SessionManager
            // The accept path only ever enqueues listen protocols, so the
            // `_ => panic!` arm below is genuinely unreachable. Assert the set
            // here so a future enqueue regression trips in debug, not prod.
            debug_assert!(
                matches!(
                    protocol,
                    Protocol::TCPListen | Protocol::HTTPListen | Protocol::HTTPSListen
                ),
                "accept queue must only hold listen protocols, got {protocol:?}"
            );
            // Each proxy receives a clone of its own handle alongside the
            // socket; any creation error stops processing the queue.
            match protocol {
                Protocol::TCPListen => {
                    let proxy = self.tcp.clone();
                    if self
                        .tcp
                        .borrow_mut()
                        .create_session(sock, token, wait_time, proxy)
                        .is_err()
                    {
                        break;
                    }
                }
                Protocol::HTTPListen => {
                    let proxy = self.http.clone();
                    if self
                        .http
                        .borrow_mut()
                        .create_session(sock, token, wait_time, proxy)
                        .is_err()
                    {
                        break;
                    }
                }
                Protocol::HTTPSListen => {
                    if self
                        .https
                        .borrow_mut()
                        .create_session(sock, token, wait_time, self.https.clone())
                        .is_err()
                    {
                        break;
                    }
                }
                _ => panic!("should not call accept() on a HTTP, HTTPS or TCP session"),
            };
            let nb_before = self.sessions.borrow().nb_connections;
            self.sessions.borrow_mut().incr();
            // A successfully created session bumps the live count by one.
            debug_assert_eq!(
                self.sessions.borrow().nb_connections,
                nb_before + 1,
                "create_sessions must account exactly one new connection per created session"
            );
        }

        gauge!(names::accept_queue::CONNECTIONS, self.accept_queue.len());
    }
3060
3061    pub fn ready(&mut self, token: Token, events: Ready) {
3062        trace!("PROXY\t{:?} got events: {:?}", token, events);
3063
3064        let session_token = token.0;
3065        if self.sessions.borrow().slab.contains(session_token) {
3066            //info!("sessions contains {:?}", session_token);
3067            let protocol = self.sessions.borrow().slab[session_token]
3068                .borrow()
3069                .protocol();
3070            // NOTE: `token` is NOT necessarily the session's frontend token. A
3071            // session is registered under BOTH its frontend and its backend slab
3072            // slots (the multi-token pattern: `connect_to_backend` inserts the
3073            // same session Rc under a second vacant key), so `ready()` is also
3074            // dispatched with the backend token, where `frontend_token() !=
3075            // session_token`. There is therefore no `token == frontend_token`
3076            // identity to assert here.
3077            //info!("protocol: {:?}", protocol);
3078            match protocol {
3079                Protocol::HTTPListen | Protocol::HTTPSListen | Protocol::TCPListen => {
3080                    //info!("PROTOCOL IS LISTEN");
3081                    if events.is_readable() {
3082                        self.accept_ready.insert(ListenToken(token.0));
3083                        if self.sessions.borrow().can_accept {
3084                            self.accept(ListenToken(token.0), protocol);
3085                        }
3086                        return;
3087                    }
3088
3089                    if events.is_writable() {
3090                        error!(
3091                            "received writable for listener {:?}, this should not happen",
3092                            token
3093                        );
3094                        return;
3095                    }
3096
3097                    if events.is_hup() {
3098                        error!("should not happen: server {:?} closed", token);
3099                        return;
3100                    }
3101
3102                    unreachable!();
3103                }
3104                _ => {}
3105            }
3106
3107            let session = self.sessions.borrow_mut().slab[session_token].clone();
3108            session.borrow_mut().update_readiness(token, events);
3109            if session.borrow_mut().ready(session.clone()) {
3110                debug!(
3111                    "Server killing session from ready: token={:?}, protocol={:?}, events={:?}",
3112                    token, protocol, events
3113                );
3114                self.kill_session(session);
3115            }
3116        }
3117    }
3118
3119    pub fn timeout(&mut self, token: Token) {
3120        trace!("PROXY\t{:?} got timeout", token);
3121
3122        let session_token = token.0;
3123        if self.sessions.borrow().slab.contains(session_token) {
3124            let session = self.sessions.borrow_mut().slab[session_token].clone();
3125            if session.borrow_mut().timeout(token) {
3126                debug!(
3127                    "Server killing session from timeout: token={:?}, protocol={:?}",
3128                    token,
3129                    session.borrow().protocol()
3130                );
3131                self.kill_session(session);
3132            }
3133        }
3134    }
3135
3136    pub fn handle_remaining_readiness(&mut self) {
3137        // try to accept again after handling all session events,
3138        // since we might have released a few session slots
3139        if self.sessions.borrow().can_accept && !self.accept_ready.is_empty() {
3140            while let Some(token) = self
3141                .accept_ready
3142                .iter()
3143                .next()
3144                .map(|token| ListenToken(token.0))
3145            {
3146                let protocol = self.sessions.borrow().slab[token.0].borrow().protocol();
3147                self.accept(token, protocol);
3148                if !self.sessions.borrow().can_accept || self.accept_ready.is_empty() {
3149                    break;
3150                }
3151            }
3152        }
3153    }
3154    fn block_channel(&mut self) {
3155        if let Err(e) = self.channel.blocking() {
3156            error!("Could not block channel: {}", e);
3157        }
3158    }
3159    fn unblock_channel(&mut self) {
3160        if let Err(e) = self.channel.nonblocking() {
3161            error!("Could not block channel: {}", e);
3162        }
3163    }
3164
3165    /// Evict the `count` least-recently-active non-listener sessions and
3166    /// return how many tokens were enqueued for shutdown. Used by
3167    /// `create_sessions` when the accept queue is saturated and the
3168    /// `evict_on_queue_full` knob is set.
3169    ///
3170    /// Uses `select_nth_unstable_by_key` (introselect, O(n) average) to
3171    /// partition the oldest `count` sessions in-place rather than a full
3172    /// O(n log n) sort. The candidate `Vec` is unavoidable because of
3173    /// `RefCell` borrow rules — the immutable borrow on `self.sessions`
3174    /// must drop before `shut_down_sessions_by_frontend_tokens` can take
3175    /// its mutable borrow.
3176    fn evict_least_active_sessions(&self, count: usize) -> usize {
3177        if count == 0 {
3178            return 0;
3179        }
3180
3181        let tokens = {
3182            let sessions = self.sessions.borrow();
3183            let mut candidates: Vec<(Token, Instant)> = sessions
3184                .slab
3185                .iter()
3186                .filter(|(_, session)| {
3187                    !matches!(
3188                        session.borrow().protocol(),
3189                        Protocol::HTTPListen
3190                            | Protocol::HTTPSListen
3191                            | Protocol::TCPListen
3192                            | Protocol::UDPListen
3193                            | Protocol::Channel
3194                            | Protocol::Metrics
3195                            | Protocol::Timer
3196                    )
3197                })
3198                .map(|(_, session)| {
3199                    let s = session.borrow();
3200                    (s.frontend_token(), s.last_event())
3201                })
3202                .collect();
3203
3204            // Early return is load-bearing: the `pivot` computation below
3205            // does `count.min(len) - 1`, which underflows on empty input.
3206            if candidates.is_empty() {
3207                return 0;
3208            }
3209
3210            let pivot = count.min(candidates.len()) - 1;
3211            candidates.select_nth_unstable_by_key(pivot, |&(_, last_event)| last_event);
3212
3213            candidates[..=pivot]
3214                .iter()
3215                .map(|&(token, _)| token)
3216                .collect::<HashSet<Token>>()
3217        };
3218
3219        let evicted = tokens.len();
3220        self.shut_down_sessions_by_frontend_tokens(tokens);
3221        evicted
3222    }
3223}
3224
3225/// log the error together with the request id
3226/// create a WorkerResponse
3227fn worker_response_error<S: ToString, T: ToString>(request_id: S, error: T) -> WorkerResponse {
3228    error!(
3229        "error on request {}, {}",
3230        request_id.to_string(),
3231        error.to_string()
3232    );
3233    WorkerResponse::error(request_id, error)
3234}
3235
/// Minimal session type that only carries a protocol.
///
/// Its `ProxySession` implementation reports `protocol` and treats every
/// other callback as a no-op.
pub struct ListenSession {
    /// Protocol returned by `ProxySession::protocol` for this entry.
    pub protocol: Protocol,
}
3239
3240impl ProxySession for ListenSession {
3241    fn last_event(&self) -> Instant {
3242        Instant::now()
3243    }
3244
3245    fn print_session(&self) {}
3246
3247    fn frontend_token(&self) -> Token {
3248        Token(0)
3249    }
3250
3251    fn protocol(&self) -> Protocol {
3252        self.protocol
3253    }
3254
3255    fn ready(&mut self, _session: Rc<RefCell<dyn ProxySession>>) -> SessionIsToBeClosed {
3256        false
3257    }
3258
3259    fn shutting_down(&mut self) -> SessionIsToBeClosed {
3260        false
3261    }
3262
3263    fn update_readiness(&mut self, _token: Token, _events: Ready) {}
3264
3265    fn close(&mut self) {}
3266
3267    fn timeout(&mut self, _token: Token) -> SessionIsToBeClosed {
3268        error!(
3269            "called ProxySession::timeout(token={:?}, time) on ListenSession {{ protocol: {:?} }}",
3270            _token, self.protocol
3271        );
3272        false
3273    }
3274}
3275
#[cfg(test)]
mod accept_telemetry_tests {
    use super::*;

    /// Parse a socket address literal; malformed test input panics.
    fn addr(literal: &str) -> SocketAddr {
        literal.parse().unwrap()
    }

    /// The host octet must not influence the bucket: two IPv4 peers of the
    /// same /24 share one label.
    #[test]
    fn per_source_bucket_collapses_ipv4_slash24() {
        let low_host = addr("203.0.113.5:1234");
        let high_host = addr("203.0.113.250:9999");
        assert_eq!(
            per_source_bucket(&low_host),
            per_source_bucket(&high_host),
            "addresses in the same /24 must land in the same bucket"
        );
    }

    /// Two IPv6 peers of the same /48 share one label.
    #[test]
    fn per_source_bucket_collapses_ipv6_slash48() {
        let first = addr("[2001:db8:1234::1]:443");
        let second = addr("[2001:db8:1234:abcd::ffff]:8443");
        assert_eq!(
            per_source_bucket(&first),
            per_source_bucket(&second),
            "addresses in the same /48 must land in the same bucket"
        );
    }

    /// The label table holds exactly `PER_SOURCE_BUCKETS` entries, each
    /// following the zero-padded naming scheme — this is what bounds the
    /// metric cardinality.
    #[test]
    fn per_source_bucket_keys_are_bounded() {
        assert_eq!(PER_SOURCE_BUCKET_KEYS.len(), PER_SOURCE_BUCKETS);
        for (index, key) in PER_SOURCE_BUCKET_KEYS.iter().enumerate() {
            let expected = format!("client.connect.per_source.bucket_{index:03}");
            assert_eq!(*key, expected.as_str());
        }
    }

    /// Distinct /24 prefixes must spread over many buckets; a collapsing
    /// hash would fail this check.
    #[test]
    fn per_source_bucket_distributes_distinct_subnets() {
        let mut hits = std::collections::HashSet::new();
        for third_octet in 0..200u8 {
            let peer = addr(&format!("10.0.{third_octet}.42:80"));
            hits.insert(per_source_bucket(&peer));
        }
        // 200 prefixes cannot all get their own bucket once collisions are
        // accounted for, so only a conservative lower bound is asserted.
        let distinct = hits.len();
        assert!(
            distinct >= 100,
            "expected at least 100 distinct buckets across 200 /24s, got {}",
            distinct
        );
    }
}
3335
#[cfg(test)]
mod eviction_tests {
    use std::collections::HashSet;
    use std::time::{Duration, Instant};

    use mio::Token;

    /// Build a candidate whose last event happened `age_secs` before `now`.
    fn aged(id: usize, now: Instant, age_secs: u64) -> (Token, Instant) {
        (Token(id), now - Duration::from_secs(age_secs))
    }

    /// Same selection steps as the eviction code: partition around the
    /// pivot, then keep the tokens of the first `pivot + 1` entries.
    fn oldest_tokens(candidates: &mut [(Token, Instant)], count: usize) -> HashSet<Token> {
        let pivot = count.min(candidates.len()) - 1;
        candidates.select_nth_unstable_by_key(pivot, |&(_, last_event)| last_event);
        candidates[..=pivot]
            .iter()
            .map(|&(token, _)| token)
            .collect()
    }

    /// After `select_nth_unstable_by_key`, the first `pivot + 1` entries are
    /// the `pivot + 1` smallest keys, i.e. the oldest sessions. Guards
    /// against a refactor flipping the comparator orientation.
    #[test]
    fn select_nth_finds_oldest_sessions() {
        let now = Instant::now();
        let mut candidates = [
            aged(1, now, 10),
            aged(2, now, 50), // oldest
            aged(3, now, 5),  // newest
            aged(4, now, 30),
            aged(5, now, 20),
        ];

        let selected = oldest_tokens(&mut candidates, 2);

        assert_eq!(selected.len(), 2);
        assert!(
            selected.contains(&Token(2)),
            "should contain 50s-old session"
        );
        assert!(
            selected.contains(&Token(4)),
            "should contain 30s-old session"
        );
    }

    /// Asking for more sessions than exist clamps the pivot to `len - 1`,
    /// so every candidate is selected rather than silently truncated.
    #[test]
    fn select_nth_with_count_exceeding_candidates() {
        let now = Instant::now();
        let mut candidates = [aged(1, now, 10)];

        let selected = oldest_tokens(&mut candidates, 5);

        assert_eq!(selected.len(), 1);
        assert!(selected.contains(&Token(1)));
    }
}