Skip to main content

sozu_lib/
server.rs

1//! event loop management
2use std::{
3    cell::RefCell,
4    collections::{HashMap, HashSet, VecDeque, hash_map::Entry},
5    hash::{DefaultHasher, Hash, Hasher},
6    io::Error as IoError,
7    net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr},
8    os::unix::io::{AsRawFd, FromRawFd},
9    rc::Rc,
10    str::FromStr,
11    sync::LazyLock,
12    time::{Duration, Instant},
13};
14
15use mio::{
16    Events, Interest, Poll, Token,
17    net::{TcpListener as MioTcpListener, TcpStream},
18};
19use slab::Slab;
20use sozu_command::{
21    channel::Channel,
22    config::MetricDetailLevel,
23    logging,
24    proto::command::{
25        ActivateListener, AddBackend, CertificatesWithFingerprints, Cluster, ClusterHashes,
26        ClusterInformations, DeactivateListener, Event, EventKind, HttpListenerConfig,
27        HttpsListenerConfig, InitialState, ListenerType, LoadBalancingAlgorithms, LoadMetric,
28        MetricDetail, MetricsConfiguration, RemoveBackend, Request, ResponseContent,
29        ResponseStatus, ServerConfig, TcpListenerConfig as CommandTcpListener,
30        UpdateHttpListenerConfig, UpdateHttpsListenerConfig, UpdateTcpListenerConfig,
31        WorkerRequest, WorkerResponse, request::RequestType, response_content::ContentType,
32    },
33    ready::Ready,
34    scm_socket::{Listeners, ScmSocket, ScmSocketError},
35    state::ConfigState,
36};
37
38use crate::metrics::names;
39use crate::{
40    AcceptError, Protocol, ProxyConfiguration, ProxySession, SessionIsToBeClosed,
41    backends::{Backend, BackendMap},
42    features::FEATURES,
43    health_check::HealthChecker,
44    http, https,
45    metrics::METRICS,
46    pool::Pool,
47    tcp,
48    timer::Timer,
49};
50
51// Number of retries to perform on a server after a connection failure
52pub const CONN_RETRIES: u8 = 3;
53
54/// Number of bounded buckets for the per-source connect-rate counter.
55///
56/// `incr!` requires a `&'static str`, so per-IP labelling would either need
57/// runtime `Box::leak` per unique source (unbounded under SYN flood — direct
58/// OWASP A05 / NIST SP 800-92 cardinality-blow-up risk) or a fixed bucket
59/// table. We pick the bucket table: 256 static labels precomputed at startup,
60/// each masked subnet hashes into one of them.
61///
62/// Bucket-noise vs per-IP fidelity is a deliberate trade. Operators wanting
63/// per-IP attribution should pair these counters with structured access logs
64/// or a downstream rate-limiter; the metric here is for "is some /24 spamming
65/// us right now?", not "which IP exactly". 256 buckets keep the memory + UDP
66/// statsd cost flat regardless of attacker effort.
67pub const PER_SOURCE_BUCKETS: usize = 256;
68
69/// Pre-leaked `&'static str` table for per-source bucket counters.
70/// `incr!` requires `&'static str`; we leak once at first access (LazyLock)
71/// for `PER_SOURCE_BUCKETS` keys, totalling ~10 KB heap. The leak is bounded
72/// by `PER_SOURCE_BUCKETS` and never grows with traffic.
73static PER_SOURCE_BUCKET_KEYS: LazyLock<[&'static str; PER_SOURCE_BUCKETS]> = LazyLock::new(|| {
74    let mut keys: [&'static str; PER_SOURCE_BUCKETS] = [""; PER_SOURCE_BUCKETS];
75    for (i, slot) in keys.iter_mut().enumerate() {
76        // e.g. "client.connect.per_source.bucket_042"
77        let owned = format!("client.connect.per_source.bucket_{i:03}");
78        *slot = Box::leak(owned.into_boxed_str());
79    }
80    keys
81});
82
83/// Mask an IP address to its bounded prefix (/24 for IPv4, /48 for IPv6) and
84/// hash it into one of `PER_SOURCE_BUCKETS` slots. The hash is `DefaultHasher`,
85/// which is deterministic within a process but salted across runs — fine for
86/// telemetry, not suitable for cross-host correlation.
87fn per_source_bucket(peer: &SocketAddr) -> &'static str {
88    let mut hasher = DefaultHasher::new();
89    match peer.ip() {
90        IpAddr::V4(v4) => {
91            let octets = v4.octets();
92            // /24 mask: keep first three octets, zero the host portion.
93            let masked = Ipv4Addr::new(octets[0], octets[1], octets[2], 0);
94            masked.hash(&mut hasher);
95        }
96        IpAddr::V6(v6) => {
97            let octets = v6.octets();
98            // /48 mask: keep first 6 bytes, zero the rest.
99            let mut masked_octets = [0u8; 16];
100            masked_octets[..6].copy_from_slice(&octets[..6]);
101            Ipv6Addr::from(masked_octets).hash(&mut hasher);
102        }
103    }
104    let idx = (hasher.finish() as usize) % PER_SOURCE_BUCKETS;
105    PER_SOURCE_BUCKET_KEYS[idx]
106}
107
108/// Period between two `accept_queue.saturated_seconds` ticks. The counter is
109/// incremented once per period while [`SessionManager::can_accept`] is `false`,
110/// distinguishing "queue spent N seconds at max" from "queue briefly hit max"
111/// — the binary `accept_queue.backpressure` gauge collapses that duration.
112const ACCEPT_SATURATION_TICK: Duration = Duration::from_secs(1);
113
114pub type ProxyChannel = Channel<WorkerResponse, WorkerRequest>;
115
116thread_local! {
117  pub static QUEUE: RefCell<VecDeque<WorkerResponse>> = const { RefCell::new(VecDeque::new()) };
118}
119
120thread_local! {
121  pub static TIMER: RefCell<Timer<Token>> = RefCell::new(Timer::default());
122}
123
124pub fn push_queue(message: WorkerResponse) {
125    QUEUE.with(|queue| {
126        (*queue.borrow_mut()).push_back(message);
127    });
128}
129
130pub fn push_event(event: Event) {
131    QUEUE.with(|queue| {
132        (*queue.borrow_mut()).push_back(WorkerResponse {
133            id: "EVENT".to_string(),
134            message: String::new(),
135            status: ResponseStatus::Processing.into(),
136            content: Some(ContentType::Event(event).into()),
137        });
138    });
139}
140
141/// Build the `WorkerMetricDetailStatus` content payload returned in
142/// every successful `SetMetricDetail` worker response. The master
143/// collects these across the fan-out and assembles them into
144/// `MetricDetailStatus.workers[<worker_id>]` so the TUI sees each
145/// worker's actual aggregator state instead of the master's view.
146fn worker_metric_detail_status_content(
147    configured: MetricDetailLevel,
148    effective: MetricDetailLevel,
149    previous_effective: MetricDetailLevel,
150    active_lease_count: u32,
151) -> ResponseContent {
152    use sozu_command::proto::command::WorkerMetricDetailStatus;
153    ContentType::WorkerMetricDetailStatus(WorkerMetricDetailStatus {
154        configured: MetricDetail::from(configured) as i32,
155        effective: MetricDetail::from(effective) as i32,
156        previous_effective: MetricDetail::from(previous_effective) as i32,
157        active_lease_count,
158    })
159    .into()
160}
161
162/// Build a `METRIC_DETAIL_CHANGED` event carrying the worker-local
163/// transition payload (previous/effective levels + transition kind).
164/// `client_id` is `Some(_)` for explicit apply/clear, `None` for the
165/// polled janitor's bulk expiry. The master folds this Event into the
166/// audit log alongside operator-initiated transitions emitted at the
167/// dispatch site in `bin/src/command/requests.rs::worker_request`.
168fn push_metric_detail_transition(
169    previous: MetricDetailLevel,
170    effective: MetricDetailLevel,
171    transition_kind: &'static str,
172    client_id: Option<String>,
173) {
174    use sozu_command::proto::command::MetricDetailTransition;
175    // No-op when nothing actually changed. Defence-in-depth — every
176    // caller already gates on `previous != effective`, but
177    // double-checking here means future call sites can't accidentally
178    // emit a "ghost" transition.
179    if previous == effective {
180        return;
181    }
182    push_event(Event {
183        kind: EventKind::MetricDetailChanged as i32,
184        cluster_id: None,
185        backend_id: None,
186        address: None,
187        metric_detail: Some(MetricDetailTransition {
188            previous_effective: MetricDetail::from(previous) as i32,
189            effective: MetricDetail::from(effective) as i32,
190            transition_kind: transition_kind.to_owned(),
191            client_id,
192        }),
193    });
194}
195
196#[derive(Copy, Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
197pub struct ListenToken(pub usize);
198#[derive(Copy, Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
199pub struct SessionToken(pub usize);
200
201impl From<usize> for ListenToken {
202    fn from(val: usize) -> ListenToken {
203        ListenToken(val)
204    }
205}
206
207impl From<ListenToken> for usize {
208    fn from(val: ListenToken) -> usize {
209        val.0
210    }
211}
212
213impl From<usize> for SessionToken {
214    fn from(val: usize) -> SessionToken {
215        SessionToken(val)
216    }
217}
218
219impl From<SessionToken> for usize {
220    fn from(val: SessionToken) -> usize {
221        val.0
222    }
223}
224
225pub struct SessionManager {
226    pub max_connections: usize,
227    pub nb_connections: usize,
228    pub can_accept: bool,
229    pub slab: Slab<Rc<RefCell<dyn ProxySession>>>,
230    /// Default per-(cluster, source-IP) connection limit. `0` disables
231    /// the feature; cluster-level overrides take precedence at check
232    /// time.
233    pub max_connections_per_ip: u64,
234    /// Default `Retry-After` header value (seconds) for HTTP 429
235    /// responses emitted on per-(cluster, source-IP) limit hit. `0`
236    /// omits the header.
237    pub retry_after: u32,
238    /// Active **frontend connections** per `(cluster_id, source_ip)`.
239    /// Each frontend session contributes AT MOST 1 to the count for any
240    /// given `(cluster, ip)` pair, regardless of how many streams it
241    /// multiplexes to that cluster (an H2 connection serving 100
242    /// streams to cluster X from IP 1.2.3.4 still counts as 1). The
243    /// counter is incremented the first time a session's
244    /// `Router::connect` resolves to a fresh `(cluster, ip)` pair, and
245    /// decremented when the session closes. Empty when the feature is
246    /// unused.
247    ///
248    /// ── Why nested maps instead of `HashMap<(String, IpAddr), usize>` ──
249    ///
250    /// The per-request hot path (`cluster_ip_at_limit`, called from
251    /// `mux/router::connect` for every cluster-resolving request) used
252    /// to allocate a `String` to build the compound key on every
253    /// lookup. Splitting the storage so the outer key is `String`
254    /// lets the lookup take `&str` — `HashMap::get(cluster_id)` on a
255    /// `HashMap<String, _>` accepts a borrow via the `Borrow<str>`
256    /// impl. The hot path no longer allocates; the only `String` clone
257    /// is in `track_cluster_ip`, which runs at most once per
258    /// `(cluster, ip)` pair per session. Memory footprint is unchanged
259    /// in steady state — entries are still reaped to zero on session
260    /// close.
261    connections_per_cluster_ip: HashMap<String, HashMap<IpAddr, usize>>,
262    /// Reverse index: per-token map of `cluster_id` → set of source IPs
263    /// already counted against `connections_per_cluster_ip`. Used to
264    /// make `track_cluster_ip` idempotent within a session (so H2
265    /// streams to the same cluster from the same client only consume
266    /// one slot in the limit) and to drain a session's contributions on
267    /// close. Same nesting rationale as above.
268    cluster_ip_tracks: HashMap<Token, HashMap<String, HashSet<IpAddr>>>,
269}
270
271impl SessionManager {
272    pub fn new(
273        slab: Slab<Rc<RefCell<dyn ProxySession>>>,
274        max_connections: usize,
275        max_connections_per_ip: u64,
276        retry_after: u32,
277    ) -> Rc<RefCell<Self>> {
278        Rc::new(RefCell::new(SessionManager {
279            max_connections,
280            nb_connections: 0,
281            can_accept: true,
282            slab,
283            max_connections_per_ip,
284            retry_after,
285            connections_per_cluster_ip: HashMap::new(),
286            cluster_ip_tracks: HashMap::new(),
287        }))
288    }
289
290    /// Resolve the effective per-(cluster, source-IP) limit. `override_value`
291    /// is the cluster-level setting from the proto `Cluster` message:
292    /// `None` inherits the global default, `Some(0)` is explicit
293    /// "unlimited", `Some(n > 0)` overrides.
294    pub fn effective_max_connections_per_ip(&self, override_value: Option<u64>) -> u64 {
295        override_value.unwrap_or(self.max_connections_per_ip)
296    }
297
298    /// Resolve the effective `Retry-After` header value. `Some(0)` (or
299    /// the global default of 0) signals "omit the header" — caller
300    /// must skip emission rather than render `Retry-After: 0`.
301    pub fn effective_retry_after(&self, override_value: Option<u32>) -> u32 {
302        override_value.unwrap_or(self.retry_after)
303    }
304
305    /// Returns `true` when admitting `token` to one more connection for
306    /// `(cluster, ip)` would exceed the resolved limit. `0` is treated
307    /// as unlimited. A token that already holds a slot for this
308    /// `(cluster, ip)` is NEVER at the limit — H2 sessions multiplex
309    /// many streams to the same cluster on a single connection, and
310    /// the limit governs distinct frontend connections, not streams.
311    ///
312    /// Hot-path: called for every cluster-resolving request from
313    /// `mux/router::connect`. The nested-map storage lets both lookups
314    /// borrow `cluster_id` and `ip`; no per-call allocation runs here
315    /// in steady state.
316    pub fn cluster_ip_at_limit(
317        &self,
318        token: Token,
319        cluster_id: &str,
320        ip: &IpAddr,
321        override_value: Option<u64>,
322    ) -> bool {
323        let limit = self.effective_max_connections_per_ip(override_value);
324        if limit == 0 {
325            return false;
326        }
327        if self
328            .cluster_ip_tracks
329            .get(&token)
330            .and_then(|by_cluster| by_cluster.get(cluster_id))
331            .is_some_and(|ips| ips.contains(ip))
332        {
333            return false;
334        }
335        self.connections_per_cluster_ip
336            .get(cluster_id)
337            .and_then(|by_ip| by_ip.get(ip))
338            .is_some_and(|c| (*c as u64) >= limit)
339    }
340
341    /// Account `token`'s active connection against `(cluster, ip)`.
342    /// Idempotent within a token: a second call for the same
343    /// `(cluster, ip)` is a no-op so H2 retries / multi-stream opens
344    /// to the same cluster do not double-count.
345    ///
346    /// Allocates a single owned `String` per `(token, cluster)` pair on
347    /// first observation — `entry(cluster_id.clone())` materialises a
348    /// new outer-map slot. Subsequent IPs under the same `(token,
349    /// cluster)` reuse the existing slot.
350    pub fn track_cluster_ip(&mut self, token: Token, cluster_id: String, ip: IpAddr) {
351        let inserted = self
352            .cluster_ip_tracks
353            .entry(token)
354            .or_default()
355            .entry(cluster_id.clone())
356            .or_default()
357            .insert(ip);
358        if inserted {
359            *self
360                .connections_per_cluster_ip
361                .entry(cluster_id)
362                .or_default()
363                .entry(ip)
364                .or_insert(0) += 1;
365        }
366    }
367
368    /// Drain every `(cluster, ip)` slot held by `token` and apply the
369    /// matching decrements. Called on session teardown only — there is
370    /// no per-stream untrack because the limit is per-connection, not
371    /// per-stream. Removes empty inner maps so the outer
372    /// `connections_per_cluster_ip` does not retain `(cluster_id,
373    /// empty_map)` orphans across cluster lifetimes.
374    pub fn untrack_all_cluster_ip(&mut self, token: Token) {
375        let Some(by_cluster) = self.cluster_ip_tracks.remove(&token) else {
376            return;
377        };
378        for (cluster_id, ips) in by_cluster {
379            let Entry::Occupied(mut outer) = self.connections_per_cluster_ip.entry(cluster_id)
380            else {
381                continue;
382            };
383            for ip in ips {
384                if let Entry::Occupied(mut inner) = outer.get_mut().entry(ip) {
385                    let count = inner.get_mut();
386                    *count = count.saturating_sub(1);
387                    if *count == 0 {
388                        inner.remove();
389                    }
390                }
391            }
392            if outer.get().is_empty() {
393                outer.remove();
394            }
395        }
396    }
397
398    /// Wipe every per-(cluster, source-IP) accounting bucket. Called by
399    /// the runtime `SetMaxConnectionsPerIp(0)` path so disabling the
400    /// feature does not leave dead bookkeeping behind that a future
401    /// re-enable would consult.
402    pub fn clear_cluster_ip_tracking(&mut self) {
403        self.cluster_ip_tracks.clear();
404        self.connections_per_cluster_ip.clear();
405    }
406
407    /// The slab is considered at capacity if it contains more sessions than twice max_connections
408    pub fn at_capacity(&self) -> bool {
409        self.slab.len() >= self.accept_slab_threshold()
410    }
411
412    /// The slab fill level at which `at_capacity` flips to true and the
413    /// accept queue is flushed. Reported as `slab.accept_threshold` so the
414    /// per-iteration `slab.accept_threshold_percent` gauge in the run loop
415    /// can chart proximity to this gate, distinct from raw slab usage.
416    ///
417    /// The constant `10 + 2 * max_connections` is the historical pre-knob
418    /// budget; configured slab capacity is
419    /// `10 + slab_entries_per_connection * max_connections` (see
420    /// `command/src/config.rs`) and can be larger, so `slab.usage_percent`
421    /// (against `slab.capacity()`) and `slab.accept_threshold_percent`
422    /// (against this gate) are emitted as independent gauges.
423    pub fn accept_slab_threshold(&self) -> usize {
424        10 + 2 * self.max_connections
425    }
426
427    /// Check the number of connections against max_connections, and the slab capacity.
428    /// Returns false if limits are reached.
429    pub fn check_limits(&mut self) -> bool {
430        if self.nb_connections >= self.max_connections {
431            error!("max number of session connection reached, flushing the accept queue");
432            gauge!(names::accept_queue::BACKPRESSURE, 1);
433            self.can_accept = false;
434            return false;
435        }
436
437        if self.at_capacity() {
438            error!("not enough memory to accept another session, flushing the accept queue");
439            error!(
440                "nb_connections: {}, max_connections: {}",
441                self.nb_connections, self.max_connections
442            );
443            gauge!(names::accept_queue::BACKPRESSURE, 1);
444            self.can_accept = false;
445
446            return false;
447        }
448
449        true
450    }
451
452    pub fn to_session(token: Token) -> SessionToken {
453        SessionToken(token.0)
454    }
455
456    pub fn incr(&mut self) {
457        self.nb_connections += 1;
458        assert!(self.nb_connections <= self.max_connections);
459        // `client.connections_max` and `client.connections_percent` are
460        // emitted from the run loop alongside `process.uptime_seconds` /
461        // `server.live` so all proxy gauges advance in lock-step. Keeping
462        // `client.connections` per-event preserves the high-resolution
463        // signal scrapers expect.
464        gauge!(names::client::CONNECTIONS, self.nb_connections);
465    }
466
467    /// Decrements the number of sessions, start accepting new connections
468    /// if the capacity limit of 90% has not been reached.
469    pub fn decr(&mut self) {
470        assert!(self.nb_connections != 0);
471        self.nb_connections -= 1;
472        gauge!(names::client::CONNECTIONS, self.nb_connections);
473
474        // do not be ready to accept right away, wait until we get back to 10% capacity
475        if !self.can_accept && self.nb_connections < self.max_connections * 90 / 100 {
476            debug!(
477                "nb_connections = {}, max_connections = {}, starting to accept again",
478                self.nb_connections, self.max_connections
479            );
480            gauge!(names::accept_queue::BACKPRESSURE, 0);
481            self.can_accept = true;
482        }
483    }
484}
485
486#[derive(thiserror::Error, Debug)]
487pub enum ServerError {
488    #[error("could not create event loop with MIO poll: {0}")]
489    CreatePoll(IoError),
490    #[error("could not clone the MIO registry: {0}")]
491    CloneRegistry(IoError),
492    #[error("could not register the channel: {0}")]
493    RegisterChannel(IoError),
494    #[error("{msg}:{scm_err}")]
495    ScmSocket {
496        msg: String,
497        scm_err: ScmSocketError,
498    },
499}
500
501/// `Server` handles the event loop, the listeners, the sessions and
502/// communication with the configuration channel.
503///
504/// A listener wraps a listen socket, the associated proxying protocols
505/// (HTTP, HTTPS and TCP) and the routing configuration for clusters.
506/// Listeners handle creating sessions from accepted sockets.
507///
508/// A session manages a "front" socket for a connected client, and all
509/// of the associated data (back socket, protocol state machine, buffers,
510/// metrics...).
511///
512/// `Server` gets configuration updates from the channel (domIN/path routes,
513/// backend server address...).
514///
515/// Listeners and sessions are all stored in a slab structure to index them
516/// by a [Token], they all have to implement the [ProxySession] trait.
517pub struct Server {
518    accept_queue_timeout: Duration,
519    /// Tuple layout: `(socket, listen token, protocol, accept time, peer
520    /// address)`. The peer is captured via `TcpStream::peer_addr()` at accept
521    /// time so the `client.connect.per_source.*` counter can be attributed
522    /// without the socket having to be alive at session-creation time. The
523    /// peer is `Option` because `peer_addr()` is best-effort: a peer that
524    /// races to close before we read it is rare but possible.
525    accept_queue: VecDeque<(
526        TcpStream,
527        ListenToken,
528        Protocol,
529        Instant,
530        Option<SocketAddr>,
531    )>,
532    /// When the accept queue saturates and `check_limits` refuses, evict the
533    /// oldest non-listener sessions to make room. Default off — see
534    /// `command::config::DEFAULT_EVICT_ON_QUEUE_FULL` for the rationale.
535    evict_on_queue_full: bool,
536    accept_ready: HashSet<ListenToken>,
537    backends: Rc<RefCell<BackendMap>>,
538    base_sessions_count: usize,
539    channel: ProxyChannel,
540    config_state: ConfigState,
541    current_poll_errors: i32,
542    health_checker: HealthChecker,
543    http: Rc<RefCell<http::HttpProxy>>,
544    https: Rc<RefCell<https::HttpsProxy>>,
545    last_sessions_len: usize,
546    last_shutting_down_message: Option<Instant>,
547    last_zombie_check: Instant,
548    loop_start: Instant,
549    /// Wall-clock anchor for the `process.uptime_seconds` gauge. Captured once
550    /// in [`Server::new`]; never reset on hot upgrades (the new worker that
551    /// inherits FDs is a fresh process and starts its own counter).
552    started_at: Instant,
553    /// Last time the 1Hz `accept_queue.saturated_seconds` ticker fired. The
554    /// counter is incremented once per [`ACCEPT_SATURATION_TICK`] while
555    /// `SessionManager::can_accept` is `false`, so dashboards can plot the
556    /// time spent saturated rather than just whether saturation occurred.
557    last_saturation_tick: Instant,
558    max_poll_errors: i32, // TODO: make this configurable? this defaults to 10000 for now
559    /// Shared reference to the buffer pool the protocol stacks check buffers
560    /// in and out of. Held here so the run loop can sample
561    /// `buffer.in_use` / `buffer.capacity` / `buffer.usage_percent` once per
562    /// iteration without requiring each protocol module to expose its own
563    /// snapshot.
564    pool: Rc<RefCell<Pool>>,
565    pub poll: Poll,
566    poll_timeout: Option<Duration>, // TODO: make this configurable? this defaults to 1000 milliseconds for now
567    scm_listeners: Option<Listeners>,
568    scm: ScmSocket,
569    sessions: Rc<RefCell<SessionManager>>,
570    should_poll_at: Option<Instant>,
571    shutting_down: Option<String>,
572    tcp: Rc<RefCell<tcp::TcpProxy>>,
573    zombie_check_interval: Duration,
574}
575
576impl Server {
577    pub fn try_new_from_config(
578        worker_to_main_channel: ProxyChannel,
579        worker_to_main_scm: ScmSocket,
580        config: ServerConfig,
581        initial_state: InitialState,
582        expects_initial_status: bool,
583    ) -> Result<Self, ServerError> {
584        let event_loop = Poll::new().map_err(ServerError::CreatePoll)?;
585        // Commit the operator-configured Basic-auth credential cap (or
586        // keep the built-in default) once per worker process. The
587        // `OnceLock` rejects any later attempt to change the value, so
588        // calling here — before any L7 listener has accepted a request
589        // — guarantees the cap is in force the first time `mux::auth`
590        // runs.
591        if let Some(cap) = config.basic_auth_max_credential_bytes {
592            crate::protocol::mux::auth::set_max_decoded_credential_bytes(cap as usize);
593        }
594        // Same set-once-per-worker-boot pattern for the splice kernel-pipe
595        // capacity. The setter no-ops on `0` so an explicit zero in config
596        // does not collapse the pipe to PAGE_SIZE; the kernel still applies
597        // page-rounding and `/proc/sys/fs/pipe-max-size` clamping at
598        // SplicePipe::new time. Cfg-gated because the splice module only
599        // exists on Linux + `splice` feature.
600        #[cfg(all(target_os = "linux", feature = "splice"))]
601        if let Some(cap) = config.splice_pipe_capacity_bytes {
602            crate::splice::set_pipe_capacity(cap as usize);
603        }
604        let pool = Rc::new(RefCell::new(Pool::with_capacity(
605            config.min_buffers as usize,
606            config.max_buffers as usize,
607            config.buffer_size as usize,
608        )));
609        let backends = Rc::new(RefCell::new(BackendMap::new()));
610
611        // Note: slab_capacity uses 4x multiplier (up from 2x) to account for H2
612        // multiplexing where each session can have multiple backend connections.
613        // Newer `optional` proto fields fall through to the
614        // command-lib defaults when an older worker manager omits them.
615        let sessions: Rc<RefCell<SessionManager>> = SessionManager::new(
616            Slab::with_capacity(config.slab_capacity() as usize),
617            config.max_connections as usize,
618            config
619                .max_connections_per_ip
620                .unwrap_or(sozu_command::config::DEFAULT_MAX_CONNECTIONS_PER_IP),
621            config
622                .retry_after
623                .unwrap_or(sozu_command::config::DEFAULT_RETRY_AFTER),
624        );
625        {
626            let mut s = sessions.borrow_mut();
627            let entry = s.slab.vacant_entry();
628            trace!("taking token {:?} for channel", SessionToken(entry.key()));
629            entry.insert(Rc::new(RefCell::new(ListenSession {
630                protocol: Protocol::Channel,
631            })));
632        }
633        {
634            let mut s = sessions.borrow_mut();
635            let entry = s.slab.vacant_entry();
636            trace!("taking token {:?} for metrics", SessionToken(entry.key()));
637            entry.insert(Rc::new(RefCell::new(ListenSession {
638                protocol: Protocol::Timer,
639            })));
640        }
641        {
642            let mut s = sessions.borrow_mut();
643            let entry = s.slab.vacant_entry();
644            trace!("taking token {:?} for metrics", SessionToken(entry.key()));
645            entry.insert(Rc::new(RefCell::new(ListenSession {
646                protocol: Protocol::Metrics,
647            })));
648        }
649
650        Server::new(
651            event_loop,
652            worker_to_main_channel,
653            worker_to_main_scm,
654            sessions,
655            pool,
656            backends,
657            None,
658            None,
659            None,
660            config,
661            Some(initial_state),
662            expects_initial_status,
663        )
664    }
665
666    #[allow(clippy::too_many_arguments)]
667    pub fn new(
668        poll: Poll,
669        mut channel: ProxyChannel,
670        scm: ScmSocket,
671        sessions: Rc<RefCell<SessionManager>>,
672        pool: Rc<RefCell<Pool>>,
673        backends: Rc<RefCell<BackendMap>>,
674        http: Option<http::HttpProxy>,
675        https: Option<https::HttpsProxy>,
676        tcp: Option<tcp::TcpProxy>,
677        server_config: ServerConfig,
678        initial_state: Option<InitialState>,
679        expects_initial_status: bool,
680    ) -> Result<Self, ServerError> {
681        FEATURES.with(|_features| {
682            // initializing feature flags
683        });
684
685        poll.registry()
686            .register(
687                &mut channel,
688                Token(0),
689                Interest::READABLE | Interest::WRITABLE,
690            )
691            .map_err(ServerError::RegisterChannel)?;
692
693        METRICS.with(|metrics| {
694            if let Some(sock) = (*metrics.borrow_mut()).socket_mut() {
695                poll.registry()
696                    .register(sock, Token(2), Interest::WRITABLE)
697                    .expect("should register the metrics socket");
698            }
699        });
700
701        let base_sessions_count = sessions.borrow().slab.len();
702
703        let http = Rc::new(RefCell::new(match http {
704            Some(http) => http,
705            None => {
706                let registry = poll
707                    .registry()
708                    .try_clone()
709                    .map_err(ServerError::CloneRegistry)?;
710
711                http::HttpProxy::new(registry, sessions.clone(), pool.clone(), backends.clone())
712            }
713        }));
714
715        let https = Rc::new(RefCell::new(match https {
716            Some(https) => https,
717            None => {
718                let registry = poll
719                    .registry()
720                    .try_clone()
721                    .map_err(ServerError::CloneRegistry)?;
722
723                https::HttpsProxy::new(registry, sessions.clone(), pool.clone(), backends.clone())
724            }
725        }));
726
727        let tcp = Rc::new(RefCell::new(match tcp {
728            Some(tcp) => tcp,
729            None => {
730                let registry = poll
731                    .registry()
732                    .try_clone()
733                    .map_err(ServerError::CloneRegistry)?;
734
735                tcp::TcpProxy::new(registry, sessions.clone(), pool.clone(), backends.clone())
736            }
737        }));
738
739        let mut server = Server {
740            accept_queue_timeout: Duration::from_secs(u64::from(
741                server_config.accept_queue_timeout,
742            )),
743            accept_queue: VecDeque::new(),
744            evict_on_queue_full: server_config.evict_on_queue_full.unwrap_or(false),
745            accept_ready: HashSet::new(),
746            backends,
747            base_sessions_count,
748            channel,
749            config_state: ConfigState::new(),
750            current_poll_errors: 0,
751            health_checker: HealthChecker::new(),
752            http,
753            https,
754            last_sessions_len: 0, // to be reset on server run
755            last_shutting_down_message: None,
756            last_zombie_check: Instant::now(), // to be reset on server run
757            loop_start: Instant::now(),        // to be reset on server run
758            started_at: Instant::now(),        // captured once, never reset
759            last_saturation_tick: Instant::now(), // 1Hz saturation ticker anchor
760            max_poll_errors: 10000,            // TODO: make it configurable?
761            pool,
762            poll_timeout: Some(Duration::from_millis(1000)), // TODO: make it configurable?
763            poll,
764            scm_listeners: None,
765            scm,
766            sessions,
767            should_poll_at: None,
768            shutting_down: None,
769            tcp,
770            zombie_check_interval: Duration::from_secs(u64::from(
771                server_config.zombie_check_interval,
772            )),
773        };
774
775        // initialize the worker with the state we got from a file
776        if let Some(state) = initial_state {
777            for request in state.requests {
778                trace!("generating initial config request: {:#?}", request);
779                server.notify_proxys(request);
780            }
781
782            // do not send back answers to the initialization messages
783            QUEUE.with(|queue| {
784                (*queue.borrow_mut()).clear();
785            });
786        }
787
788        if expects_initial_status {
789            // the main process sends a Status message, so we can notify it
790            // when the initial state is loaded
791            server.block_channel();
792            let msg = server.channel.read_message();
793            debug!("got message: {:?}", msg);
794
795            if let Ok(WorkerRequest {
796                id,
797                content:
798                    Request {
799                        request_type: Some(RequestType::Status(_)),
800                    },
801            }) = msg
802            {
803                if let Err(e) = server.channel.write_message(&WorkerResponse::ok(id)) {
804                    error!("Could not send an ok to the main process: {}", e);
805                }
806            } else {
807                panic!(
808                    "plz give me a status request first when I start, you sent me this instead: {msg:?}"
809                );
810            }
811            server.unblock_channel();
812        }
813
814        info!("will try to receive listeners");
815        server
816            .scm
817            .set_blocking(true)
818            .map_err(|scm_err| ServerError::ScmSocket {
819                msg: "Could not set the scm socket to blocking".to_string(),
820                scm_err,
821            })?;
822        let listeners =
823            server
824                .scm
825                .receive_listeners()
826                .map_err(|scm_err| ServerError::ScmSocket {
827                    msg: "could not receive listeners from the scm socket".to_string(),
828                    scm_err,
829                })?;
830        server
831            .scm
832            .set_blocking(false)
833            .map_err(|scm_err| ServerError::ScmSocket {
834                msg: "Could not set the scm socket to unblocking".to_string(),
835                scm_err,
836            })?;
837        info!("received listeners: {:?}", listeners);
838        server.scm_listeners = Some(listeners);
839
840        Ok(server)
841    }
842
843    /// The server runs in a loop until a shutdown is ordered
844    pub fn run(&mut self) {
845        let mut events = Events::with_capacity(1024); // TODO: make event capacity configurable?
846        self.last_sessions_len = self.sessions.borrow().slab.len();
847
848        self.last_zombie_check = Instant::now();
849        self.loop_start = Instant::now();
850
851        loop {
852            self.check_for_poll_errors();
853
854            let timeout = self.reset_loop_time_and_get_timeout();
855
856            match self.poll.poll(&mut events, timeout) {
857                Ok(_) => self.current_poll_errors = 0,
858                Err(error) => {
859                    error!("Error while polling events: {:?}", error);
860                    self.current_poll_errors += 1;
861                    continue;
862                }
863            }
864
865            let after_epoll = Instant::now();
866            time!(
867                names::event_loop::EPOLL_TIME,
868                (after_epoll - self.loop_start).as_millis()
869            );
870            self.loop_start = after_epoll;
871
872            self.send_queue();
873
874            for event in events.iter() {
875                match event.token() {
876                    // this is the command channel
877                    Token(0) => {
878                        if event.is_error() {
879                            error!("error reading from command channel");
880                            continue;
881                        }
882                        if event.is_read_closed() || event.is_write_closed() {
883                            error!("command channel was closed");
884                            return;
885                        }
886                        let ready = Ready::from(event);
887                        self.channel.handle_events(ready);
888
889                        // loop here because iterations has borrow issues
890                        loop {
891                            QUEUE.with(|queue| {
892                                if !(*queue.borrow()).is_empty() {
893                                    self.channel.interest.insert(Ready::WRITABLE);
894                                }
895                            });
896
897                            //trace!("WORKER[{}] channel readiness={:?}, interest={:?}, queue={} elements",
898                            //  line!(), self.channel.readiness, self.channel.interest, self.queue.len());
899                            if self.channel.readiness() == Ready::EMPTY {
900                                break;
901                            }
902
903                            // exit the big loop if the message is HardStop
904                            if self.read_channel_messages_and_notify() {
905                                return;
906                            }
907
908                            QUEUE.with(|queue| {
909                                if !(*queue.borrow()).is_empty() {
910                                    self.channel.interest.insert(Ready::WRITABLE);
911                                }
912                            });
913
914                            self.send_queue();
915                        }
916                    }
917                    // timer tick
918                    Token(1) => {
919                        while let Some(t) = TIMER.with(|timer| timer.borrow_mut().poll()) {
920                            self.timeout(t);
921                        }
922                    }
923                    // metrics socket is writable
924                    Token(2) => METRICS.with(|metrics| {
925                        (*metrics.borrow_mut()).writable();
926                    }),
927                    // ListenToken: 1 listener <=> 1 token
928                    // ProtocolToken (HTTP/HTTPS/TCP): 1 connection <=> 1 token
929                    token if self.health_checker.owns_token(token) => {
930                        self.health_checker.ready(token);
931                    }
932                    token => self.ready(token, Ready::from(event)),
933                }
934            }
935
936            if let Some(t) = self.should_poll_at.as_ref() {
937                if *t <= Instant::now() {
938                    while let Some(t) = TIMER.with(|timer| timer.borrow_mut().poll()) {
939                        //info!("polled for timeout: {:?}", t);
940                        self.timeout(t);
941                    }
942                }
943            }
944            self.handle_remaining_readiness();
945            self.create_sessions();
946
947            self.should_poll_at = TIMER.with(|timer| timer.borrow().next_poll_date());
948
949            self.zombie_check();
950            self.health_checker
951                .poll(&self.backends, self.poll.registry());
952
953            // Frontend session gauges. `client.connections` keeps the
954            // per-event signal from `SessionManager::incr/decr`; the rest
955            // follow the once-per-iteration batching contract this run loop
956            // uses for `process.uptime_seconds` / `server.live`.
957            //
958            // `slab.usage_percent` charts pure slab utilisation against
959            // `slab.capacity()`. `slab.accept_threshold_percent` charts how
960            // close the slab is to the `at_capacity()` accept gate
961            // (`10 + 2 * max_connections`, see
962            // `SessionManager::accept_slab_threshold`). Configured slab
963            // capacity can be larger than that gate (`slab_entries_per_connection`
964            // > 2), so the two gauges are kept independent on purpose.
965            {
966                let sessions = self.sessions.borrow();
967                let nb_connections = sessions.nb_connections;
968                let max_connections = sessions.max_connections;
969                let slab_len = sessions.slab.len();
970                let slab_capacity = sessions.slab.capacity();
971                let accept_threshold = sessions.accept_slab_threshold();
972
973                gauge!(names::client::CONNECTIONS, nb_connections);
974                gauge!(names::client::CONNECTIONS_MAX, max_connections);
975                if max_connections > 0 {
976                    gauge!(
977                        "client.connections_percent",
978                        nb_connections * 100 / max_connections
979                    );
980                }
981
982                gauge!(names::slab::ENTRIES, slab_len);
983                gauge!(names::slab::CAPACITY, slab_capacity);
984                if slab_capacity > 0 {
985                    gauge!(names::slab::USAGE_PERCENT, slab_len * 100 / slab_capacity);
986                }
987                if accept_threshold > 0 {
988                    gauge!(
989                        "slab.accept_threshold_percent",
990                        slab_len * 100 / accept_threshold
991                    );
992                }
993            }
994            // Buffer pool gauges. `buffer.in_use` replaces the older
995            // `buffer.number` (renamed in `lib/src/pool.rs` for naming
996            // consistency with the surrounding `buffer.*` keys).
997            // `buffer.usage_percent` is computed against the configured
998            // `buffer.capacity` so dashboards can chart pool pressure.
999            {
1000                let pool = self.pool.borrow();
1001                let used = pool.inner.used();
1002                let capacity = pool.inner.capacity();
1003                gauge!(names::buffer::IN_USE, used);
1004                gauge!(names::buffer::CAPACITY, capacity);
1005                if capacity > 0 {
1006                    gauge!(names::buffer::USAGE_PERCENT, used * 100 / capacity);
1007                }
1008            }
1009            // 1Hz tick for `accept_queue.saturated_seconds`. Increments once
1010            // per `ACCEPT_SATURATION_TICK` while `SessionManager::can_accept`
1011            // is `false`. Distinguishes "queue spent N seconds at max" from
1012            // "queue briefly hit max" — the binary `accept_queue.backpressure`
1013            // gauge collapses that duration. Sampled here rather than via a
1014            // dedicated mio timer because the run loop ticks at least once
1015            // per `poll_timeout` (1s by default), which is granular enough.
1016            let now = Instant::now();
1017            if now.duration_since(self.last_saturation_tick) >= ACCEPT_SATURATION_TICK {
1018                if !self.sessions.borrow().can_accept {
1019                    incr!(names::accept_queue::SATURATED_SECONDS);
1020                }
1021                self.last_saturation_tick = now;
1022            }
1023            // Process / runtime gauges sampled once per loop iteration. Same
1024            // batch as `client.connections` so dashboards see them update in
1025            // lock-step.
1026            gauge!(
1027                "process.uptime_seconds",
1028                self.started_at.elapsed().as_secs() as usize
1029            );
1030            // `server.live` flips to 0 once a graceful shutdown is requested,
1031            // matching Envoy's `server.live` semantics. L4 health checks
1032            // (HAProxy / cloud LBs) can poll this gauge to drain a worker
1033            // before the OS-level termination signal lands.
1034            gauge!(
1035                "server.live",
1036                if self.shutting_down.is_some() { 0 } else { 1 }
1037            );
1038            METRICS.with(|metrics| {
1039                (*metrics.borrow_mut()).send_data();
1040            });
1041
1042            if self.shutting_down.is_some() && self.shut_down_sessions() {
1043                return;
1044            }
1045        }
1046    }
1047
1048    fn check_for_poll_errors(&mut self) {
1049        if self.current_poll_errors >= self.max_poll_errors {
1050            error!(
1051                "Something is going very wrong. Last {} poll() calls failed, crashing..",
1052                self.current_poll_errors
1053            );
1054            panic!(
1055                "poll() calls failed {} times in a row",
1056                self.current_poll_errors
1057            );
1058        }
1059    }
1060
1061    fn reset_loop_time_and_get_timeout(&mut self) -> Option<Duration> {
1062        let now = Instant::now();
1063        time!(
1064            names::event_loop::EVENT_LOOP_TIME,
1065            (now - self.loop_start).as_millis()
1066        );
1067
1068        let mut timeout = match self.should_poll_at.as_ref() {
1069            None => self.poll_timeout,
1070            Some(i) => {
1071                if *i <= now {
1072                    self.poll_timeout
1073                } else {
1074                    let dur = *i - now;
1075                    match self.poll_timeout {
1076                        None => Some(dur),
1077                        Some(t) => {
1078                            if t < dur {
1079                                Some(t)
1080                            } else {
1081                                Some(dur)
1082                            }
1083                        }
1084                    }
1085                }
1086            }
1087        };
1088
1089        if self.shutting_down.is_some() {
1090            let shutdown_tick = Duration::from_millis(100);
1091            timeout = match timeout {
1092                None => Some(shutdown_tick),
1093                Some(current) => Some(current.min(shutdown_tick)),
1094            };
1095        }
1096
1097        self.loop_start = now;
1098        timeout
1099    }
1100
1101    /// Returns true if hardstop
1102    fn read_channel_messages_and_notify(&mut self) -> bool {
1103        if !self.channel.readiness().is_readable() {
1104            return false;
1105        }
1106
1107        if let Err(e) = self.channel.readable() {
1108            error!("error reading from channel: {:?}", e);
1109        }
1110
1111        loop {
1112            let request = self.channel.read_message();
1113            debug!("Received request {:?}", request);
1114            match request {
1115                Ok(request) => match request.content.request_type {
1116                    Some(RequestType::HardStop(_)) => {
1117                        let req_id = request.id.clone();
1118                        self.notify(request);
1119                        if let Err(e) = self.channel.write_message(&WorkerResponse::ok(req_id)) {
1120                            error!("Could not send ok response to the main process: {}", e);
1121                        }
1122                        if let Err(e) = self.channel.run() {
1123                            error!("Error while running the server channel: {}", e);
1124                        }
1125                        return true;
1126                    }
1127                    Some(RequestType::SoftStop(_)) => {
1128                        self.shutting_down = Some(request.id.clone());
1129                        self.last_sessions_len = self.sessions.borrow().slab.len();
1130                        self.notify(request);
1131                    }
1132                    Some(RequestType::ReturnListenSockets(_)) => {
1133                        info!("received ReturnListenSockets order");
1134                        match self.return_listen_sockets() {
1135                            Ok(_) => push_queue(WorkerResponse::ok(request.id)),
1136                            Err(error) => push_queue(worker_response_error(
1137                                request.id,
1138                                format!("Could not send listeners on scm socket: {error:?}"),
1139                            )),
1140                        }
1141                    }
1142                    _ => self.notify(request),
1143                },
1144                // Not an error per se, occurs when there is nothing to read
1145                Err(_) => {
1146                    // if the message was too large, we grow the buffer and retry to read if possible
1147                    if (self.channel.interest & self.channel.readiness).is_readable() {
1148                        if let Err(e) = self.channel.readable() {
1149                            error!("error reading from channel: {:?}", e);
1150                        }
1151                        continue;
1152                    }
1153                    break;
1154                }
1155            }
1156        }
1157        false
1158    }
1159
1160    /// Scans all sessions that have been inactive for longer than the configured interval
1161    fn zombie_check(&mut self) {
1162        let now = Instant::now();
1163        if now - self.last_zombie_check < self.zombie_check_interval {
1164            return;
1165        }
1166        info!("zombie check");
1167        self.last_zombie_check = now;
1168
1169        let mut zombie_tokens = HashSet::new();
1170
1171        // find the zombie sessions
1172        for (_index, session) in self
1173            .sessions
1174            .borrow_mut()
1175            .slab
1176            .iter_mut()
1177            .filter(|(_, c)| now - c.borrow().last_event() > self.zombie_check_interval)
1178        {
1179            let session_token = session.borrow().frontend_token();
1180            if !zombie_tokens.contains(&session_token) {
1181                session.borrow().print_session();
1182                zombie_tokens.insert(session_token);
1183            }
1184        }
1185
1186        let zombie_count = zombie_tokens.len() as i64;
1187        count!(names::misc::ZOMBIES, zombie_count);
1188
1189        let remaining_count = self.shut_down_sessions_by_frontend_tokens(zombie_tokens);
1190        info!(
1191            "removing {} zombies ({} remaining entries after close)",
1192            zombie_count, remaining_count
1193        );
1194    }
1195
1196    /// Calls close on targeted sessions, yields the number of entries in the slab
1197    /// that were not properly removed
1198    fn shut_down_sessions_by_frontend_tokens(&self, tokens: HashSet<Token>) -> usize {
1199        if tokens.is_empty() {
1200            return 0;
1201        }
1202
1203        // close the sessions associated with the tokens
1204        for token in &tokens {
1205            if self.sessions.borrow().slab.contains(token.0) {
1206                let session = { self.sessions.borrow_mut().slab.remove(token.0) };
1207                session.borrow_mut().close();
1208                self.sessions.borrow_mut().decr();
1209            }
1210        }
1211
1212        // find the entries of closed sessions in the session manager (they should not be there)
1213        let mut dangling_entries = HashSet::new();
1214        for (entry_key, session) in &self.sessions.borrow().slab {
1215            if tokens.contains(&session.borrow().frontend_token()) {
1216                dangling_entries.insert(entry_key);
1217            }
1218        }
1219
1220        // remove these from the session manager
1221        let mut dangling_entries_count = 0;
1222        for entry_key in dangling_entries {
1223            let mut sessions = self.sessions.borrow_mut();
1224            if sessions.slab.contains(entry_key) {
1225                sessions.slab.remove(entry_key);
1226                dangling_entries_count += 1;
1227            }
1228        }
1229        dangling_entries_count
1230    }
1231
1232    /// Order sessions to shut down, check that they are all down
1233    fn shut_down_sessions(&mut self) -> bool {
1234        let sessions_count = self.sessions.borrow().slab.len();
1235        let mut sessions_to_shut_down = HashSet::new();
1236
1237        for (_key, session) in &self.sessions.borrow().slab {
1238            let mut session = session.borrow_mut();
1239            if session.shutting_down() {
1240                debug!(
1241                    "Server killing session from shutting_down: token={:?}, protocol={:?}",
1242                    session.frontend_token(),
1243                    session.protocol()
1244                );
1245                sessions_to_shut_down.insert(Token(session.frontend_token().0));
1246            }
1247        }
1248        let _ = self.shut_down_sessions_by_frontend_tokens(sessions_to_shut_down);
1249
1250        let new_sessions_count = self.sessions.borrow().slab.len();
1251
1252        if new_sessions_count < sessions_count {
1253            let now = Instant::now();
1254            if let Some(last) = self.last_shutting_down_message {
1255                if (now - last) > Duration::from_secs(5) {
1256                    info!(
1257                        "closed {} sessions, {} sessions left, base_sessions_count = {}",
1258                        sessions_count - new_sessions_count,
1259                        new_sessions_count,
1260                        self.base_sessions_count
1261                    );
1262                }
1263            }
1264            self.last_shutting_down_message = Some(now);
1265        }
1266
1267        if new_sessions_count <= self.base_sessions_count {
1268            info!("last session stopped, shutting down!");
1269            if let Err(e) = self.channel.run() {
1270                error!("Error while running the server channel: {}", e);
1271            }
1272            // self.block_channel();
1273            let id = self
1274                .shutting_down
1275                .take()
1276                .expect("should have shut down correctly"); // panicking here makes sense actually
1277
1278            debug!("Responding OK to main process for request {}", id);
1279
1280            let proxy_response = WorkerResponse::ok(id);
1281            if let Err(e) = self.channel.write_message(&proxy_response) {
1282                error!("Could not write response to the main process: {}", e);
1283            }
1284            if let Err(e) = self.channel.run() {
1285                error!("Error while running the server channel: {}", e);
1286            }
1287            return true;
1288        }
1289
1290        if new_sessions_count < self.last_sessions_len {
1291            info!(
1292                "shutting down, {} slab elements remaining (base: {})",
1293                new_sessions_count - self.base_sessions_count,
1294                self.base_sessions_count
1295            );
1296            self.last_sessions_len = new_sessions_count;
1297        }
1298
1299        false
1300    }
1301
1302    fn kill_session(&self, session: Rc<RefCell<dyn ProxySession>>) {
1303        let token = session.borrow().frontend_token();
1304        let _ = self.shut_down_sessions_by_frontend_tokens(HashSet::from([token]));
1305    }
1306
1307    fn send_queue(&mut self) {
1308        if self.channel.readiness.is_writable() {
1309            QUEUE.with(|q| {
1310                let mut queue = q.borrow_mut();
1311                loop {
1312                    if let Some(resp) = queue.pop_front() {
1313                        debug!("Sending response {:?}", resp);
1314                        if let Err(e) = self.channel.write_message(&resp) {
1315                            error!("Could not write message {} on the channel: {}", resp, e);
1316                            queue.push_front(resp);
1317                        }
1318                    }
1319
1320                    if self.channel.back_buf.available_data() > 0 {
1321                        if let Err(e) = self.channel.writable() {
1322                            error!("error writing to channel: {:?}", e);
1323                        }
1324                    }
1325
1326                    if !self.channel.readiness.is_writable() {
1327                        break;
1328                    }
1329
1330                    if self.channel.back_buf.available_data() == 0 && queue.is_empty() {
1331                        break;
1332                    }
1333                }
1334            });
1335        }
1336    }
1337
1338    fn notify(&mut self, message: WorkerRequest) {
1339        // Polled lease-expiry janitor: SetMetricDetail leases self-expire after
1340        // their TTL so a crashed `sozu top` cannot permanently elevate metrics
1341        // cardinality. The janitor runs at most every LEASE_TICK_INTERVAL,
1342        // gated by `lease_tick_due` so the hot path of `notify` doesn't pay
1343        // the HashMap walk on every iteration. Single-threaded worker, so
1344        // `borrow_mut` is safe here.
1345        let now = std::time::Instant::now();
1346        // Capture (previous, effective) before releasing the borrow so we
1347        // can emit an Event afterwards. Holding `METRICS.borrow_mut`
1348        // across `push_event` would re-enter the same thread-local from
1349        // inside `QUEUE.with` (safe but conceptually noisy); the
1350        // two-step split keeps the borrow scopes minimal.
1351        let lease_tick_transition = METRICS.with(|metrics| {
1352            let mut m = metrics.borrow_mut();
1353            if !m.lease_tick_due(now) {
1354                return None;
1355            }
1356            let previous = m.lease_tick(now)?;
1357            let effective = m.detail_effective();
1358            Some((previous, effective))
1359        });
1360        if let Some((previous, effective)) = lease_tick_transition {
1361            // The janitor retired one or more leases AND the effective
1362            // level moved. Surface the worker-local transition as an
1363            // Event so the master folds it into the audit log (closes
1364            // the gap where TUI-crashed lease expiry was previously
1365            // silent). `client_id` is `None` because the janitor may
1366            // have retired multiple leases at once.
1367            push_metric_detail_transition(previous, effective, "lease_tick_expired", None);
1368        }
1369        match &message.content.request_type {
1370            Some(RequestType::ConfigureMetrics(configuration)) => {
1371                match MetricsConfiguration::try_from(*configuration) {
1372                    Ok(metrics_config) => {
1373                        METRICS.with(|metrics| {
1374                            (*metrics.borrow_mut()).configure(&metrics_config);
1375                            push_queue(WorkerResponse::ok(message.id));
1376                        });
1377                    }
1378                    Err(e) => {
1379                        error!("Error configuring metrics: {}", e);
1380                        push_queue(WorkerResponse::error(message.id, e));
1381                    }
1382                }
1383                return;
1384            }
1385            Some(RequestType::QueryMetrics(query_metrics_options)) => {
1386                METRICS.with(|metrics| {
1387                    match (*metrics.borrow_mut()).query(query_metrics_options) {
1388                        Ok(c) => push_queue(WorkerResponse::ok_with_content(message.id, c)),
1389                        Err(e) => {
1390                            error!("Error querying metrics: {}", e);
1391                            push_queue(WorkerResponse::error(message.id, e))
1392                        }
1393                    }
1394                });
1395                return;
1396            }
1397            // Runtime cardinality lease verb — apply, renew, or clear a lease
1398            // on this worker's `Aggregator`. The lease bumps `effective` to
1399            // `max(configured, max(active leases))`; expiry runs on the polled
1400            // janitor below. Master-side aggregation into `MetricDetailStatus`
1401            // lands in a follow-up; for now the worker acks with a bare OK so
1402            // the existing `worker_request` fan-out path can collect.
1403            Some(RequestType::SetMetricDetail(req)) => {
1404                // Master populates the peer binding from the connecting
1405                // `ClientSession` before fan-out (`bin/src/command/
1406                // requests.rs::worker_request`). A pre-binding caller or a
1407                // platform without `SO_PEERCRED` yields `PeerBinding::default()`
1408                // — clears against that lease are accepted from anyone, per
1409                // the proto contract on `SetMetricDetail.peer_pid`.
1410                let presented_binding = crate::metrics::PeerBinding {
1411                    pid: req.peer_pid,
1412                    // Master sends Crockford-base32 ULIDs (`Ulid::to_string`);
1413                    // accept those, with a fallback hex parse for callers that
1414                    // happen to send `0x…` form. A failed parse degrades to
1415                    // `None` — the lease store treats that as "binding
1416                    // unknown" per the proto contract.
1417                    session_ulid: req.peer_session_ulid.as_deref().and_then(|s| {
1418                        rusty_ulid::Ulid::from_str(s)
1419                            .map(u128::from)
1420                            .ok()
1421                            .or_else(|| u128::from_str_radix(s.trim_start_matches("0x"), 16).ok())
1422                    }),
1423                };
1424                if req.clear.unwrap_or(false) {
1425                    // Defense-in-depth: the master pre-validates `client_id`
1426                    // length at the dispatch site, but worker IPC is not
1427                    // master-only — fuzz harnesses, serial_test-flagged
1428                    // integration tests, and future internal callers can
1429                    // issue an oversized clear directly. Mirror the apply
1430                    // path's `ClientIdTooLong` arm so an unbounded HashMap
1431                    // lookup is never driven by an operator-supplied string
1432                    // here either. The reason string echoes the byte length
1433                    // but not the operator bytes themselves (symmetric with
1434                    // the audit-column-smuggling guard on the apply path).
1435                    if req.client_id.len() > crate::metrics::LEASE_CLIENT_ID_MAX_BYTES {
1436                        let msg = format!(
1437                            "SetMetricDetail: clear client_id length {} exceeds {} bytes",
1438                            req.client_id.len(),
1439                            crate::metrics::LEASE_CLIENT_ID_MAX_BYTES,
1440                        );
1441                        error!("{}", msg);
1442                        push_queue(WorkerResponse::error(message.id.clone(), msg));
1443                        return;
1444                    }
1445                    // Capture transition fields + post-clear snapshot
1446                    // before releasing the borrow so we can emit an
1447                    // Event after AND build the WorkerMetricDetailStatus
1448                    // payload that the master folds into
1449                    // `MetricDetailStatus.workers[<worker_id>]`. Without
1450                    // this payload the master used its own view as a
1451                    // stand-in for the worker's per-aggregator state.
1452                    let (outcome, effective_after, configured_after, lease_count_after) = METRICS
1453                        .with(|metrics| {
1454                            let mut m = metrics.borrow_mut();
1455                            let outcome = m.lease_clear(&req.client_id, presented_binding);
1456                            (
1457                                outcome,
1458                                m.detail_effective(),
1459                                m.detail_configured(),
1460                                m.lease_count(),
1461                            )
1462                        });
1463                    match outcome {
1464                        crate::metrics::LeaseClearOutcome::Cleared { previous_effective } => {
1465                            push_metric_detail_transition(
1466                                previous_effective,
1467                                effective_after,
1468                                "lease_clear",
1469                                Some(req.client_id.clone()),
1470                            );
1471                            push_queue(WorkerResponse::ok_with_content(
1472                                message.id.clone(),
1473                                worker_metric_detail_status_content(
1474                                    configured_after,
1475                                    effective_after,
1476                                    previous_effective,
1477                                    lease_count_after,
1478                                ),
1479                            ));
1480                        }
1481                        crate::metrics::LeaseClearOutcome::NotFound => {
1482                            // Silent no-op: no lease existed for that
1483                            // id. The worker's state is unchanged so
1484                            // previous_effective == effective.
1485                            push_queue(WorkerResponse::ok_with_content(
1486                                message.id.clone(),
1487                                worker_metric_detail_status_content(
1488                                    configured_after,
1489                                    effective_after,
1490                                    effective_after,
1491                                    lease_count_after,
1492                                ),
1493                            ));
1494                        }
1495                        crate::metrics::LeaseClearOutcome::Unauthorized => {
1496                            // Do NOT echo `req.client_id` here: the operator-
1497                            // supplied bytes flow back through the master's
1498                            // worker→reason aggregation into the audit line's
1499                            // `reason=` column, which is sanitised for control
1500                            // bytes only. The dedicated `lease_id=` audit
1501                            // column already carries the operator string
1502                            // through `sanitize_for_audit_kv`, so re-embedding
1503                            // it here would let a value containing `,` or `=`
1504                            // forge a sibling KV pair against SIEM consumers
1505                            // that split on `, key=value`.
1506                            let msg = "SetMetricDetail: clear refused (peer \
1507                                 binding does not match the apply-time owner)"
1508                                .to_owned();
1509                            error!("{}", msg);
1510                            push_queue(WorkerResponse::error(message.id.clone(), msg));
1511                        }
1512                    }
1513                    return;
1514                }
1515                let detail_proto = match req.detail {
1516                    Some(d) => d,
1517                    None => {
1518                        // Operator-supplied `client_id` is intentionally
1519                        // omitted from the reason string: the dedicated
1520                        // `lease_id=` audit column carries it through the
1521                        // strict KV sanitiser. See the matching comment on
1522                        // the `Unauthorized` arm above for the column-
1523                        // smuggling rationale.
1524                        let msg = "SetMetricDetail without `detail` and without `clear`".to_owned();
1525                        error!("{}", msg);
1526                        push_queue(WorkerResponse::error(message.id.clone(), msg));
1527                        return;
1528                    }
1529                };
1530                let detail_enum = match MetricDetail::try_from(detail_proto) {
1531                    Ok(d) => d,
1532                    Err(e) => {
1533                        let msg =
1534                            format!("SetMetricDetail: invalid MetricDetail variant {detail_proto}");
1535                        error!("{}: {}", msg, e);
1536                        push_queue(WorkerResponse::error(message.id.clone(), msg));
1537                        return;
1538                    }
1539                };
1540                let level = MetricDetailLevel::from(detail_enum);
1541                // Bound the worst case BEFORE we touch the aggregator: the
1542                // proto contract on `SetMetricDetail.ttl_seconds` says the
1543                // worker rejects values larger than `LEASE_TTL_MAX` so a
1544                // stuck operator-side renewer (or a buggy third-party client)
1545                // cannot lock the worker into elevated cardinality. The
1546                // `Aggregator::lease_apply` clamp is still in place as a
1547                // defence-in-depth net for code paths that bypass this
1548                // dispatch (proto fuzzing, future internal callers).
1549                if let Some(t) = req.ttl_seconds
1550                    && u64::from(t) > crate::metrics::LEASE_TTL_MAX.as_secs()
1551                {
1552                    let msg = format!(
1553                        "SetMetricDetail: ttl_seconds={t} exceeds LEASE_TTL_MAX={}",
1554                        crate::metrics::LEASE_TTL_MAX.as_secs()
1555                    );
1556                    error!("{}", msg);
1557                    push_queue(WorkerResponse::error(message.id.clone(), msg));
1558                    return;
1559                }
1560                let ttl_seconds = req.ttl_seconds.filter(|&t| t > 0).unwrap_or_else(|| {
1561                    // The default fits in a u32 by construction
1562                    // (LEASE_TTL_DEFAULT = 60 s); the lossy `as u32` cast
1563                    // is replaced with a checked conversion so any
1564                    // future tweak past `u32::MAX` seconds (≈ 136 years)
1565                    // can't silently truncate. Falls through to 60 s on
1566                    // the theoretical overflow path.
1567                    u32::try_from(crate::metrics::LEASE_TTL_DEFAULT.as_secs()).unwrap_or(60)
1568                });
1569                let ttl = std::time::Duration::from_secs(ttl_seconds.into());
1570                let (outcome, configured_after, lease_count_after) = METRICS.with(|metrics| {
1571                    let mut m = metrics.borrow_mut();
1572                    let outcome =
1573                        m.lease_apply(req.client_id.clone(), level, ttl, presented_binding);
1574                    (outcome, m.detail_configured(), m.lease_count())
1575                });
1576                match outcome {
1577                    crate::metrics::LeaseApplyOutcome::Applied {
1578                        previous_effective,
1579                        new_effective,
1580                    } => {
1581                        push_metric_detail_transition(
1582                            previous_effective,
1583                            new_effective,
1584                            "lease_apply",
1585                            Some(req.client_id.clone()),
1586                        );
1587                        push_queue(WorkerResponse::ok_with_content(
1588                            message.id.clone(),
1589                            worker_metric_detail_status_content(
1590                                configured_after,
1591                                new_effective,
1592                                previous_effective,
1593                                lease_count_after,
1594                            ),
1595                        ));
1596                    }
1597                    crate::metrics::LeaseApplyOutcome::ClientIdTooLong => {
1598                        let msg = format!(
1599                            "SetMetricDetail: client_id length {} exceeds {} bytes",
1600                            req.client_id.len(),
1601                            crate::metrics::LEASE_CLIENT_ID_MAX_BYTES,
1602                        );
1603                        error!("{}", msg);
1604                        push_queue(WorkerResponse::error(message.id.clone(), msg));
1605                    }
1606                    crate::metrics::LeaseApplyOutcome::TableFull => {
1607                        // Same audit-column-smuggling guard as the
1608                        // `Unauthorized` and missing-detail arms: the
1609                        // operator-supplied `client_id` is already rendered
1610                        // safely through the strict KV sanitiser in the
1611                        // audit envelope's `lease_id=` column, so we keep
1612                        // it out of the reason string.
1613                        let msg = format!(
1614                            "SetMetricDetail: lease table at capacity ({} entries); reject new \
1615                             apply — operators must retry after an active lease expires or is \
1616                             cleared",
1617                            crate::metrics::LEASE_TABLE_CAP,
1618                        );
1619                        error!("{}", msg);
1620                        push_queue(WorkerResponse::error(message.id.clone(), msg));
1621                    }
1622                    crate::metrics::LeaseApplyOutcome::TtlOutOfRange => {
1623                        // Unreachable in the normal flow: the dispatch-time
1624                        // gate above already rejected ttl > LEASE_TTL_MAX.
1625                        // Surface explicitly so any future bypass (proto
1626                        // fuzzing, internal callers) fails loud rather
1627                        // than silently capping the lessor's intent.
1628                        let msg = format!(
1629                            "SetMetricDetail: ttl exceeds LEASE_TTL_MAX={} (internal contract \
1630                             violation: dispatch gate should have rejected)",
1631                            crate::metrics::LEASE_TTL_MAX.as_secs(),
1632                        );
1633                        error!("{}", msg);
1634                        push_queue(WorkerResponse::error(message.id.clone(), msg));
1635                    }
1636                    crate::metrics::LeaseApplyOutcome::Unauthorized => {
1637                        // A renewal arrived against an existing lease whose
1638                        // apply-time peer binding does not match the
1639                        // presented one. The `client_id` is intentionally
1640                        // omitted from the error string — the audit-log
1641                        // row already carries it in the dedicated
1642                        // `lease_id` column, and echoing it here would
1643                        // route operator-controlled bytes through the
1644                        // freeform reason field.
1645                        let msg = "SetMetricDetail: renewal refused (peer binding does not \
1646                                   match the apply-time owner)"
1647                            .to_owned();
1648                        error!("{}", msg);
1649                        push_queue(WorkerResponse::error(message.id.clone(), msg));
1650                    }
1651                }
1652                return;
1653            }
1654            Some(RequestType::Logging(logging_filter)) => {
1655                info!(
1656                    "{} changing logging filter to {}",
1657                    message.id, logging_filter
1658                );
1659                // there should not be any errors as it was already parsed by the main process
1660                let (directives, _errors) = logging::parse_logging_spec(logging_filter);
1661                logging::LOGGER.with(|logger| {
1662                    logger.borrow_mut().set_directives(directives);
1663                });
1664                push_queue(WorkerResponse::ok(message.id));
1665                return;
1666            }
1667            Some(RequestType::QueryClustersHashes(_)) => {
1668                push_queue(WorkerResponse::ok_with_content(
1669                    message.id.clone(),
1670                    ContentType::ClusterHashes(ClusterHashes {
1671                        map: self.config_state.hash_state(),
1672                    })
1673                    .into(),
1674                ));
1675                return;
1676            }
1677            Some(RequestType::QueryClusterById(cluster_id)) => {
1678                push_queue(WorkerResponse::ok_with_content(
1679                    message.id.clone(),
1680                    ContentType::Clusters(ClusterInformations {
1681                        vec: self
1682                            .config_state
1683                            .cluster_state(cluster_id)
1684                            .map_or(vec![], |ci| vec![ci]),
1685                    })
1686                    .into(),
1687                ));
1688            }
1689            Some(RequestType::SetMaxConnectionsPerIp(limit)) => {
1690                let mut sessions = self.sessions.borrow_mut();
1691                let previous = sessions.max_connections_per_ip;
1692                sessions.max_connections_per_ip = *limit;
1693                // Disabling the feature on the fly should not leave
1694                // stale `(cluster, ip)` entries behind: drain the
1695                // bookkeeping so a re-enable starts from a clean slate
1696                // and `cluster_ip_at_limit` does not consult dead state.
1697                if *limit == 0 {
1698                    sessions.clear_cluster_ip_tracking();
1699                }
1700                info!(
1701                    "{} updated global max_connections_per_ip from {} to {}",
1702                    message.id, previous, limit
1703                );
1704                push_queue(WorkerResponse::ok(message.id));
1705                return;
1706            }
1707            Some(RequestType::QueryMaxConnectionsPerIp(_)) => {
1708                let limit = self.sessions.borrow().max_connections_per_ip;
1709                push_queue(WorkerResponse::ok_with_content(
1710                    message.id,
1711                    ContentType::MaxConnectionsPerIpLimit(
1712                        sozu_command::proto::command::MaxConnectionsPerIpLimit { limit },
1713                    )
1714                    .into(),
1715                ));
1716                return;
1717            }
1718            Some(RequestType::QueryClustersByDomain(domain)) => {
1719                let cluster_ids = self
1720                    .config_state
1721                    .get_cluster_ids_by_domain(domain.hostname.clone(), domain.path.clone());
1722                let vec = cluster_ids
1723                    .iter()
1724                    .filter_map(|cluster_id| self.config_state.cluster_state(cluster_id))
1725                    .collect();
1726
1727                push_queue(WorkerResponse::ok_with_content(
1728                    message.id.clone(),
1729                    ContentType::Clusters(ClusterInformations { vec }).into(),
1730                ));
1731                return;
1732            }
1733            Some(RequestType::QueryCertificatesFromWorkers(filters)) => {
1734                if filters.fingerprint.is_some() {
1735                    let certs = self.config_state.get_certificates(filters.clone());
1736                    let response = if !certs.is_empty() {
1737                        WorkerResponse::ok_with_content(
1738                            message.id.clone(),
1739                            ContentType::CertificatesWithFingerprints(
1740                                CertificatesWithFingerprints { certs },
1741                            )
1742                            .into(),
1743                        )
1744                    } else {
1745                        worker_response_error(
1746                            message.id.clone(),
1747                            "Could not find certificate for this fingerprint",
1748                        )
1749                    };
1750                    push_queue(response);
1751                    return;
1752                }
1753                // if all certificates are queried, or filtered by domain name,
1754                // the request will be handled by the https proxy
1755            }
1756            _other_request => {}
1757        }
1758        self.notify_proxys(message);
1759    }
1760
1761    pub fn notify_proxys(&mut self, request: WorkerRequest) {
1762        if let Err(e) = self.config_state.dispatch(&request.content) {
1763            error!("Could not execute order on config state: {}", e);
1764        }
1765
1766        let req_id = request.id.clone();
1767
1768        match request.content.request_type {
1769            Some(RequestType::AddCluster(ref cluster)) => {
1770                // Mirror the master-side ConfigState::add_cluster check so
1771                // off-channel paths (TOML reload, SaveState/LoadState, direct
1772                // API) that smuggle a malformed `cluster.health_check` cannot
1773                // arm the worker's BackendList::set_health_check_config with
1774                // a CRLF/NUL/C0 URI or zero thresholds. The SetHealthCheck
1775                // handler below already runs the same validation; this is the
1776                // AddCluster mirror.
1777                if let Some(hc) = cluster.health_check.as_ref() {
1778                    if let Err(reason) = sozu_command::config::validate_health_check_config(hc) {
1779                        push_queue(worker_response_error(req_id, reason));
1780                        return;
1781                    }
1782                }
1783                self.add_cluster(cluster);
1784                // Re-arm the metric drain tombstone in case this cluster id
1785                // was previously removed — without this the drain would
1786                // continue dropping every emission for the resurrected
1787                // cluster. Idempotent on a fresh id.
1788                METRICS.with(|metrics| {
1789                    (*metrics.borrow_mut()).add_cluster(&cluster.cluster_id);
1790                });
1791                //not returning because the message must still be handled by each proxy
1792            }
1793            Some(RequestType::RemoveCluster(ref cluster_id)) => {
1794                self.remove_health_check_state(cluster_id);
1795                METRICS.with(|metrics| {
1796                    (*metrics.borrow_mut()).remove_cluster(cluster_id);
1797                });
1798                //not returning because the message must still be handled by each proxy
1799            }
1800            Some(RequestType::SetHealthCheck(ref set)) => {
1801                if let Err(reason) = sozu_command::config::validate_health_check_config(&set.config)
1802                {
1803                    push_queue(worker_response_error(req_id, reason));
1804                    return;
1805                }
1806                self.backends
1807                    .borrow_mut()
1808                    .set_health_check_config(&set.cluster_id, Some(set.config.to_owned()));
1809                push_queue(WorkerResponse::ok(req_id));
1810                return;
1811            }
1812            Some(RequestType::RemoveHealthCheck(ref cluster_id)) => {
1813                self.remove_health_check_state(cluster_id);
1814                push_queue(WorkerResponse::ok(req_id));
1815                return;
1816            }
1817            Some(RequestType::AddBackend(ref backend)) => {
1818                push_queue(self.add_backend(&req_id, backend));
1819                return;
1820            }
1821            Some(RequestType::RemoveBackend(ref remove_backend)) => {
1822                push_queue(self.remove_backend(&req_id, remove_backend));
1823                return;
1824            }
1825            _ => {}
1826        };
1827
1828        let proxy_destinations = request.content.get_destinations();
1829        let mut notify_response = None;
1830        if proxy_destinations.to_http_proxy {
1831            notify_response = Some(self.http.borrow_mut().notify(request.clone()));
1832        }
1833        if proxy_destinations.to_https_proxy {
1834            let http_proxy_response = self.https.borrow_mut().notify(request.clone());
1835            if http_proxy_response.is_failure() || notify_response.is_none() {
1836                notify_response = Some(http_proxy_response);
1837            }
1838        }
1839        if proxy_destinations.to_tcp_proxy {
1840            let tcp_proxy_response = self.tcp.borrow_mut().notify(request.clone());
1841            if tcp_proxy_response.is_failure() || notify_response.is_none() {
1842                notify_response = Some(tcp_proxy_response);
1843            }
1844        }
1845        if let Some(response) = notify_response {
1846            push_queue(response);
1847        }
1848
1849        match request.content.request_type {
1850            // special case for adding listeners, because we need to register a listener
1851            Some(RequestType::AddHttpListener(listener)) => {
1852                push_queue(self.notify_add_http_listener(&req_id, listener));
1853            }
1854            Some(RequestType::AddHttpsListener(listener)) => {
1855                push_queue(self.notify_add_https_listener(&req_id, listener));
1856            }
1857            Some(RequestType::AddTcpListener(listener)) => {
1858                push_queue(self.notify_add_tcp_listener(&req_id, listener));
1859            }
1860            Some(RequestType::UpdateHttpListener(patch)) => {
1861                push_queue(self.notify_update_http_listener(&req_id, patch));
1862            }
1863            Some(RequestType::UpdateHttpsListener(patch)) => {
1864                push_queue(self.notify_update_https_listener(&req_id, patch));
1865            }
1866            Some(RequestType::UpdateTcpListener(patch)) => {
1867                push_queue(self.notify_update_tcp_listener(&req_id, patch));
1868            }
1869            Some(RequestType::RemoveListener(ref remove)) => {
1870                debug!("{} remove {:?} listener {:?}", req_id, remove.proxy, remove);
1871                self.base_sessions_count -= 1;
1872                let response = match ListenerType::try_from(remove.proxy) {
1873                    Ok(ListenerType::Http) => self.http.borrow_mut().notify(request),
1874                    Ok(ListenerType::Https) => self.https.borrow_mut().notify(request),
1875                    Ok(ListenerType::Tcp) => self.tcp.borrow_mut().notify(request),
1876                    Err(_) => WorkerResponse::error(req_id, "Wrong variant ListenerType"),
1877                };
1878                push_queue(response);
1879            }
1880            Some(RequestType::ActivateListener(ref activate)) => {
1881                push_queue(self.notify_activate_listener(&req_id, activate));
1882            }
1883            Some(RequestType::DeactivateListener(ref deactivate)) => {
1884                push_queue(self.notify_deactivate_listener(&req_id, deactivate));
1885            }
1886            _other_request => {}
1887        };
1888    }
1889
1890    fn add_cluster(&mut self, cluster: &Cluster) {
1891        let mut backends = self.backends.borrow_mut();
1892        backends.set_load_balancing_policy_for_cluster(
1893            &cluster.cluster_id,
1894            LoadBalancingAlgorithms::try_from(cluster.load_balancing).unwrap_or_default(),
1895            cluster
1896                .load_metric
1897                .and_then(|n| LoadMetric::try_from(n).ok()),
1898        );
1899        backends.set_health_check_config(&cluster.cluster_id, cluster.health_check.to_owned());
1900        backends.set_cluster_http2(&cluster.cluster_id, cluster.http2.unwrap_or(false));
1901    }
1902
1903    fn add_backend(&mut self, req_id: &str, add_backend: &AddBackend) -> WorkerResponse {
1904        let new_backend = Backend::new(
1905            &add_backend.backend_id,
1906            add_backend.address.into(),
1907            add_backend.sticky_id.clone(),
1908            add_backend.load_balancing_parameters,
1909            add_backend.backup,
1910        );
1911        self.backends
1912            .borrow_mut()
1913            .add_backend(&add_backend.cluster_id, new_backend);
1914
1915        WorkerResponse::ok(req_id)
1916    }
1917
1918    fn remove_health_check_state(&mut self, cluster_id: &str) {
1919        self.health_checker.remove_cluster(cluster_id);
1920        self.backends
1921            .borrow_mut()
1922            .health_check_configs
1923            .remove(cluster_id);
1924    }
1925
1926    fn remove_backend(&mut self, req_id: &str, backend: &RemoveBackend) -> WorkerResponse {
1927        let address = backend.address.into();
1928        // Runtime removal is address-keyed and drops every backend at this
1929        // address (A/B test, weighted variant, dedup race). The metrics
1930        // layer is id-keyed — fan out one `remove_backend` per actually-
1931        // removed id so the two identities stay in sync. Without this the
1932        // `backend_id` field on the IPC message could name "A" while the
1933        // runtime dropped both "A" and "B" at the same address, leaving
1934        // "B"'s metrics rows orphaned forever.
1935        let removed_ids = self
1936            .backends
1937            .borrow_mut()
1938            .remove_backend(&backend.cluster_id, &address);
1939        if removed_ids.is_empty() {
1940            // Edge case: BackendList returned nothing (address never
1941            // existed in this cluster). Honour the request's stated id
1942            // anyway so a no-op request still tidies any orphan metric
1943            // row from a prior identity-drift state.
1944            METRICS.with(|metrics| {
1945                (*metrics.borrow_mut()).remove_backend(&backend.cluster_id, &backend.backend_id);
1946            });
1947        } else {
1948            METRICS.with(|metrics| {
1949                let mut metrics = metrics.borrow_mut();
1950                for id in &removed_ids {
1951                    metrics.remove_backend(&backend.cluster_id, id);
1952                }
1953            });
1954        }
1955
1956        WorkerResponse::ok(req_id)
1957    }
1958
1959    fn notify_add_http_listener(
1960        &mut self,
1961        req_id: &str,
1962        listener: HttpListenerConfig,
1963    ) -> WorkerResponse {
1964        debug!("{} add http listener {:?}", req_id, listener);
1965
1966        if self.sessions.borrow().at_capacity() {
1967            return worker_response_error(req_id, "session list is full, cannot add a listener");
1968        }
1969
1970        let mut session_manager = self.sessions.borrow_mut();
1971        let entry = session_manager.slab.vacant_entry();
1972        let token = Token(entry.key());
1973
1974        match self.http.borrow_mut().add_listener(listener, token) {
1975            Ok(_token) => {
1976                entry.insert(Rc::new(RefCell::new(ListenSession {
1977                    protocol: Protocol::HTTPListen,
1978                })));
1979                self.base_sessions_count += 1;
1980                WorkerResponse::ok(req_id)
1981            }
1982            Err(e) => worker_response_error(req_id, format!("Could not add HTTP listener: {e}")),
1983        }
1984    }
1985
1986    fn notify_add_https_listener(
1987        &mut self,
1988        req_id: &str,
1989        listener: HttpsListenerConfig,
1990    ) -> WorkerResponse {
1991        debug!("{} add https listener {:?}", req_id, listener);
1992
1993        if self.sessions.borrow().at_capacity() {
1994            return worker_response_error(req_id, "session list is full, cannot add a listener");
1995        }
1996
1997        let mut session_manager = self.sessions.borrow_mut();
1998        let entry = session_manager.slab.vacant_entry();
1999        let token = Token(entry.key());
2000
2001        match self
2002            .https
2003            .borrow_mut()
2004            .add_listener(listener.clone(), token)
2005        {
2006            Ok(_token) => {
2007                entry.insert(Rc::new(RefCell::new(ListenSession {
2008                    protocol: Protocol::HTTPSListen,
2009                })));
2010                self.base_sessions_count += 1;
2011                WorkerResponse::ok(req_id)
2012            }
2013            Err(e) => worker_response_error(req_id, format!("Could not add HTTPS listener: {e}")),
2014        }
2015    }
2016
2017    fn notify_add_tcp_listener(
2018        &mut self,
2019        req_id: &str,
2020        listener: CommandTcpListener,
2021    ) -> WorkerResponse {
2022        debug!("{} add tcp listener {:?}", req_id, listener);
2023
2024        if self.sessions.borrow().at_capacity() {
2025            return worker_response_error(req_id, "session list is full, cannot add a listener");
2026        }
2027
2028        let mut session_manager = self.sessions.borrow_mut();
2029        let entry = session_manager.slab.vacant_entry();
2030        let token = Token(entry.key());
2031
2032        match self.tcp.borrow_mut().add_listener(listener, token) {
2033            Ok(_token) => {
2034                entry.insert(Rc::new(RefCell::new(ListenSession {
2035                    protocol: Protocol::TCPListen,
2036                })));
2037                self.base_sessions_count += 1;
2038                WorkerResponse::ok(req_id)
2039            }
2040            Err(e) => worker_response_error(req_id, format!("Could not add TCP listener: {e}")),
2041        }
2042    }
2043
2044    fn notify_update_http_listener(
2045        &mut self,
2046        req_id: &str,
2047        patch: UpdateHttpListenerConfig,
2048    ) -> WorkerResponse {
2049        debug!("{} update http listener {:?}", req_id, patch.address);
2050        match self.http.borrow_mut().update_listener(patch) {
2051            Ok(()) => WorkerResponse::ok(req_id),
2052            Err(e) => worker_response_error(req_id, format!("Could not update HTTP listener: {e}")),
2053        }
2054    }
2055
2056    fn notify_update_https_listener(
2057        &mut self,
2058        req_id: &str,
2059        patch: UpdateHttpsListenerConfig,
2060    ) -> WorkerResponse {
2061        debug!("{} update https listener {:?}", req_id, patch.address);
2062        match self.https.borrow_mut().update_listener(patch) {
2063            Ok(()) => WorkerResponse::ok(req_id),
2064            Err(e) => {
2065                worker_response_error(req_id, format!("Could not update HTTPS listener: {e}"))
2066            }
2067        }
2068    }
2069
2070    fn notify_update_tcp_listener(
2071        &mut self,
2072        req_id: &str,
2073        patch: UpdateTcpListenerConfig,
2074    ) -> WorkerResponse {
2075        debug!("{} update tcp listener {:?}", req_id, patch.address);
2076        match self.tcp.borrow_mut().update_listener(patch) {
2077            Ok(()) => WorkerResponse::ok(req_id),
2078            Err(e) => worker_response_error(req_id, format!("Could not update TCP listener: {e}")),
2079        }
2080    }
2081
2082    fn notify_activate_listener(
2083        &mut self,
2084        req_id: &str,
2085        activate: &ActivateListener,
2086    ) -> WorkerResponse {
2087        debug!(
2088            "{} activate {:?} listener {:?}",
2089            req_id, activate.proxy, activate
2090        );
2091
2092        let address: std::net::SocketAddr = activate.address.into();
2093
2094        match ListenerType::try_from(activate.proxy) {
2095            Ok(ListenerType::Http) => {
2096                let listener = self
2097                    .scm_listeners
2098                    .as_mut()
2099                    .and_then(|s| s.get_http(&address))
2100                    // SAFETY: `fd` was just received from the supervisor via SCM_RIGHTS
2101                    // (see `command/src/scm_socket.rs`) and is not owned elsewhere — the
2102                    // `ScmListeners` map removes it on `get_http`. Ownership transfers to
2103                    // the mio wrapper, whose `Drop` closes the descriptor.
2104                    .map(|fd| unsafe { MioTcpListener::from_raw_fd(fd) });
2105
2106                let activated_token = self.http.borrow_mut().activate_listener(&address, listener);
2107                match activated_token {
2108                    Ok(token) => {
2109                        self.accept(ListenToken(token.0), Protocol::HTTPListen);
2110                        WorkerResponse::ok(req_id)
2111                    }
2112                    Err(activate_error) => worker_response_error(
2113                        req_id,
2114                        format!("Could not activate HTTP listener: {activate_error}"),
2115                    ),
2116                }
2117            }
2118            Ok(ListenerType::Https) => {
2119                let listener = self
2120                    .scm_listeners
2121                    .as_mut()
2122                    .and_then(|s| s.get_https(&address))
2123                    // SAFETY: `fd` was just received from the supervisor via SCM_RIGHTS
2124                    // (see `command/src/scm_socket.rs`) and is not owned elsewhere — the
2125                    // `ScmListeners` map removes it on `get_https`. Ownership transfers to
2126                    // the mio wrapper, whose `Drop` closes the descriptor.
2127                    .map(|fd| unsafe { MioTcpListener::from_raw_fd(fd) });
2128
2129                let activated_token = self
2130                    .https
2131                    .borrow_mut()
2132                    .activate_listener(&address, listener);
2133                match activated_token {
2134                    Ok(token) => {
2135                        self.accept(ListenToken(token.0), Protocol::HTTPSListen);
2136                        WorkerResponse::ok(req_id)
2137                    }
2138                    Err(activate_error) => worker_response_error(
2139                        req_id,
2140                        format!("Could not activate HTTPS listener: {activate_error}"),
2141                    ),
2142                }
2143            }
2144            Ok(ListenerType::Tcp) => {
2145                let listener = self
2146                    .scm_listeners
2147                    .as_mut()
2148                    .and_then(|s| s.get_tcp(&address))
2149                    // SAFETY: `fd` was just received from the supervisor via SCM_RIGHTS
2150                    // (see `command/src/scm_socket.rs`) and is not owned elsewhere — the
2151                    // `ScmListeners` map removes it on `get_tcp`. Ownership transfers to
2152                    // the mio wrapper, whose `Drop` closes the descriptor.
2153                    .map(|fd| unsafe { MioTcpListener::from_raw_fd(fd) });
2154
2155                let listener_token = self.tcp.borrow_mut().activate_listener(&address, listener);
2156                match listener_token {
2157                    Ok(token) => {
2158                        self.accept(ListenToken(token.0), Protocol::TCPListen);
2159                        WorkerResponse::ok(req_id)
2160                    }
2161                    Err(activate_error) => worker_response_error(
2162                        req_id,
2163                        format!("Could not activate TCP listener: {activate_error}"),
2164                    ),
2165                }
2166            }
2167            Err(_) => worker_response_error(req_id, "Wrong variant for ListenerType on request"),
2168        }
2169    }
2170
2171    fn notify_deactivate_listener(
2172        &mut self,
2173        req_id: &str,
2174        deactivate: &DeactivateListener,
2175    ) -> WorkerResponse {
2176        debug!(
2177            "{} deactivate {:?} listener {:?}",
2178            req_id, deactivate.proxy, deactivate
2179        );
2180
2181        let address: std::net::SocketAddr = deactivate.address.into();
2182
2183        match ListenerType::try_from(deactivate.proxy) {
2184            Ok(ListenerType::Http) => {
2185                let (token, mut listener) = match self.http.borrow_mut().give_back_listener(address)
2186                {
2187                    Ok((token, listener)) => (token, listener),
2188                    Err(e) => {
2189                        return worker_response_error(
2190                            req_id,
2191                            format!(
2192                                "Couldn't deactivate HTTP listener at address {address:?}: {e}"
2193                            ),
2194                        );
2195                    }
2196                };
2197
2198                if let Err(e) = self.poll.registry().deregister(&mut listener) {
2199                    error!(
2200                        "error deregistering HTTP listen socket({:?}): {:?}",
2201                        deactivate, e
2202                    );
2203                }
2204
2205                {
2206                    let mut sessions = self.sessions.borrow_mut();
2207                    if sessions.slab.contains(token.0) {
2208                        sessions.slab.remove(token.0);
2209                        info!("removed listen token {:?}", token);
2210                    }
2211                }
2212
2213                if deactivate.to_scm {
2214                    self.unblock_scm_socket();
2215                    let listeners = Listeners {
2216                        http: vec![(address, listener.as_raw_fd())],
2217                        tls: vec![],
2218                        tcp: vec![],
2219                    };
2220                    info!("sending HTTP listener: {:?}", listeners);
2221                    let res = self.scm.send_listeners(&listeners);
2222
2223                    self.block_scm_socket();
2224
2225                    info!("sent HTTP listener: {:?}", res);
2226                }
2227                WorkerResponse::ok(req_id)
2228            }
2229            Ok(ListenerType::Https) => {
2230                let (token, mut listener) = match self
2231                    .https
2232                    .borrow_mut()
2233                    .give_back_listener(address)
2234                {
2235                    Ok((token, listener)) => (token, listener),
2236                    Err(e) => {
2237                        return worker_response_error(
2238                            req_id,
2239                            format!(
2240                                "Couldn't deactivate HTTPS listener at address {address:?}: {e}",
2241                            ),
2242                        );
2243                    }
2244                };
2245                if let Err(e) = self.poll.registry().deregister(&mut listener) {
2246                    error!(
2247                        "error deregistering HTTPS listen socket({:?}): {:?}",
2248                        deactivate, e
2249                    );
2250                }
2251                if self.sessions.borrow().slab.contains(token.0) {
2252                    self.sessions.borrow_mut().slab.remove(token.0);
2253                    info!("removed listen token {:?}", token);
2254                }
2255
2256                if deactivate.to_scm {
2257                    self.unblock_scm_socket();
2258                    let listeners = Listeners {
2259                        http: vec![],
2260                        tls: vec![(address, listener.as_raw_fd())],
2261                        tcp: vec![],
2262                    };
2263                    info!("sending HTTPS listener: {:?}", listeners);
2264                    let res = self.scm.send_listeners(&listeners);
2265
2266                    self.block_scm_socket();
2267
2268                    info!("sent HTTPS listener: {:?}", res);
2269                }
2270                WorkerResponse::ok(req_id)
2271            }
2272            Ok(ListenerType::Tcp) => {
2273                let (token, mut listener) = match self.tcp.borrow_mut().give_back_listener(address)
2274                {
2275                    Ok((token, listener)) => (token, listener),
2276                    Err(e) => {
2277                        return worker_response_error(
2278                            req_id,
2279                            format!(
2280                                "Could not deactivate TCP listener at address {address:?}: {e}"
2281                            ),
2282                        );
2283                    }
2284                };
2285
2286                if let Err(e) = self.poll.registry().deregister(&mut listener) {
2287                    error!(
2288                        "error deregistering TCP listen socket({:?}): {:?}",
2289                        deactivate, e
2290                    );
2291                }
2292                if self.sessions.borrow().slab.contains(token.0) {
2293                    self.sessions.borrow_mut().slab.remove(token.0);
2294                    info!("removed listen token {:?}", token);
2295                }
2296
2297                if deactivate.to_scm {
2298                    self.unblock_scm_socket();
2299                    let listeners = Listeners {
2300                        http: vec![],
2301                        tls: vec![],
2302                        tcp: vec![(address, listener.as_raw_fd())],
2303                    };
2304                    info!("sending TCP listener: {:?}", listeners);
2305                    let res = self.scm.send_listeners(&listeners);
2306
2307                    self.block_scm_socket();
2308
2309                    info!("sent TCP listener: {:?}", res);
2310                }
2311                WorkerResponse::ok(req_id)
2312            }
2313            Err(_) => worker_response_error(req_id, "Wrong variant for ListenerType on request"),
2314        }
2315    }
2316
2317    /// Send all socket addresses and file descriptors of all proxies, via the scm socket
2318    pub fn return_listen_sockets(&mut self) -> Result<(), ScmSocketError> {
2319        self.unblock_scm_socket();
2320
2321        let mut http_listeners = self.http.borrow_mut().give_back_listeners();
2322        for &mut (_, ref mut sock) in http_listeners.iter_mut() {
2323            if let Err(e) = self.poll.registry().deregister(sock) {
2324                error!(
2325                    "error deregistering HTTP listen socket({:?}): {:?}",
2326                    sock, e
2327                );
2328            }
2329        }
2330
2331        let mut https_listeners = self.https.borrow_mut().give_back_listeners();
2332        for &mut (_, ref mut sock) in https_listeners.iter_mut() {
2333            if let Err(e) = self.poll.registry().deregister(sock) {
2334                error!(
2335                    "error deregistering HTTPS listen socket({:?}): {:?}",
2336                    sock, e
2337                );
2338            }
2339        }
2340
2341        let mut tcp_listeners = self.tcp.borrow_mut().give_back_listeners();
2342        for &mut (_, ref mut sock) in tcp_listeners.iter_mut() {
2343            if let Err(e) = self.poll.registry().deregister(sock) {
2344                error!("error deregistering TCP listen socket({:?}): {:?}", sock, e);
2345            }
2346        }
2347
2348        // use as_raw_fd because the listeners should be dropped after sending them
2349        let listeners = Listeners {
2350            http: http_listeners
2351                .iter()
2352                .map(|(addr, listener)| (*addr, listener.as_raw_fd()))
2353                .collect(),
2354            tls: https_listeners
2355                .iter()
2356                .map(|(addr, listener)| (*addr, listener.as_raw_fd()))
2357                .collect(),
2358            tcp: tcp_listeners
2359                .iter()
2360                .map(|(addr, listener)| (*addr, listener.as_raw_fd()))
2361                .collect(),
2362        };
2363        info!("sending default listeners: {:?}", listeners);
2364        let res = self.scm.send_listeners(&listeners);
2365
2366        self.block_scm_socket();
2367
2368        info!("sent default listeners: {:?}", res);
2369        res
2370    }
2371
2372    fn block_scm_socket(&mut self) {
2373        if let Err(e) = self.scm.set_blocking(true) {
2374            error!("Could not block scm socket: {}", e);
2375        }
2376    }
2377
2378    fn unblock_scm_socket(&mut self) {
2379        if let Err(e) = self.scm.set_blocking(false) {
2380            error!("Could not unblock scm socket: {}", e);
2381        }
2382    }
2383
2384    pub fn to_session(&self, token: Token) -> SessionToken {
2385        SessionToken(token.0)
2386    }
2387
2388    pub fn from_session(&self, token: SessionToken) -> Token {
2389        Token(token.0)
2390    }
2391
2392    pub fn accept(&mut self, token: ListenToken, protocol: Protocol) {
2393        // Per-protocol counter key. Keeping the namespace static (3 keys +
2394        // aggregate) is a deliberate cardinality cap: per-listener-address
2395        // labelling would require runtime `Box::leak` because `incr!` takes
2396        // `&'static str`, and listener addresses can be reconfigured at
2397        // runtime by the control plane. Operators wanting per-listener
2398        // attribution should correlate with the listener-protocol breakdown
2399        // below.
2400        //
2401        // Non-listen protocols reach this code only on an invariant break
2402        // upstream (`ready()` dispatched a non-listen `Protocol` to
2403        // `accept()`). Log and return rather than panicking — defense in
2404        // depth on the accept path, which is process-fatal if it aborts.
2405        let (proto_key, accepted_protocol) = match protocol {
2406            Protocol::TCPListen => ("listener.accepted.tcp", Protocol::TCPListen),
2407            Protocol::HTTPListen => ("listener.accepted.http", Protocol::HTTPListen),
2408            Protocol::HTTPSListen => ("listener.accepted.https", Protocol::HTTPSListen),
2409            other => {
2410                warn!(
2411                    "accept() called with non-listen protocol {:?} on token {:?}; skipping",
2412                    other, token
2413                );
2414                return;
2415            }
2416        };
2417
2418        loop {
2419            let result = match accepted_protocol {
2420                Protocol::TCPListen => self.tcp.borrow_mut().accept(token),
2421                Protocol::HTTPListen => self.http.borrow_mut().accept(token),
2422                Protocol::HTTPSListen => self.https.borrow_mut().accept(token),
2423                // The outer match populates `accepted_protocol` only with the
2424                // three listen variants and returns early otherwise — this
2425                // arm is structurally unreachable.
2426                other => unreachable!(
2427                    "accept dispatch reached non-listen protocol {:?} after outer guard",
2428                    other
2429                ),
2430            };
2431            match result {
2432                Ok(sock) => {
2433                    // peer_addr() is one syscall (`getpeername(2)`) and runs
2434                    // exactly once per accepted socket. It can fail if the
2435                    // peer raced to close — recorded as `None` and silently
2436                    // skipped for the per-source counter.
2437                    let peer = sock.peer_addr().ok();
2438                    incr!(names::listener::ACCEPTED_TOTAL);
2439                    incr!(proto_key);
2440                    if let Some(peer_addr) = peer.as_ref() {
2441                        incr!(per_source_bucket(peer_addr));
2442                    }
2443                    self.accept_queue.push_back((
2444                        sock,
2445                        token,
2446                        accepted_protocol,
2447                        Instant::now(),
2448                        peer,
2449                    ));
2450                }
2451                Err(AcceptError::WouldBlock) => {
2452                    self.accept_ready.remove(&token);
2453                    break;
2454                }
2455                Err(other) => {
2456                    error!(
2457                        "error accepting {:?} sockets: {:?}",
2458                        accepted_protocol, other
2459                    );
2460                    self.accept_ready.remove(&token);
2461                    break;
2462                }
2463            }
2464        }
2465
2466        gauge!(names::accept_queue::CONNECTIONS, self.accept_queue.len());
2467    }
2468
2469    pub fn create_sessions(&mut self) {
2470        while let Some((sock, token, protocol, timestamp, _peer)) = self.accept_queue.pop_back() {
2471            let wait_time = Instant::now() - timestamp;
2472            time!(names::accept_queue::WAIT_TIME, wait_time.as_millis());
2473            if wait_time > self.accept_queue_timeout {
2474                incr!(names::accept_queue::TIMEOUT);
2475                continue;
2476            }
2477
2478            if !self.sessions.borrow_mut().check_limits() {
2479                // The socket we just popped will not be served, plus every
2480                // remaining queued socket below `break` will time out.
2481                // `listener.connection_capped` counts the popped socket so
2482                // the counter aligns with `check_limits` invocations rather
2483                // than with queue depth at the time of refusal.
2484                incr!(names::listener::CONNECTION_CAPPED);
2485
2486                if !self.evict_on_queue_full {
2487                    break;
2488                }
2489
2490                // Skip eviction during graceful shutdown — defeats the
2491                // shutting_down semantics and is wasted work since the
2492                // worker is winding down anyway.
2493                if self.shutting_down.is_some() {
2494                    break;
2495                }
2496
2497                // Evict 1% of `max_connections` per iteration. Conservative
2498                // ratio: large enough to make meaningful progress clearing
2499                // the accept queue, small enough to limit collateral damage
2500                // to active sessions. The cap loop re-checks limits after
2501                // each eviction round, so multiple rounds can run if the
2502                // queue has many pending connections. Decoupled from
2503                // `slab_entries_per_connection` (which only sizes the slab,
2504                // not `max_connections`).
2505                let to_evict = (self.sessions.borrow().max_connections / 100).max(1);
2506                let evicted = self.evict_least_active_sessions(to_evict);
2507                if evicted == 0 {
2508                    // Informational, not an invariant break: the worker may
2509                    // be at boot, or every active session is a system
2510                    // protocol (Channel/Metrics/Timer/listeners) and is
2511                    // ineligible. Stay at warn so operators see it in info-
2512                    // level production logs.
2513                    warn!("evict_on_queue_full enabled but no candidate sessions to evict");
2514                    break;
2515                }
2516
2517                count!(names::sessions::EVICTED, evicted as i64);
2518                warn!(
2519                    "evicted {} least recently active sessions to make room",
2520                    evicted
2521                );
2522
2523                if !self.sessions.borrow_mut().check_limits() {
2524                    break;
2525                }
2526            }
2527
2528            //FIXME: check the timestamp
2529            //TODO: create_session should return the session and
2530            // the server should insert it in the the SessionManager
2531            match protocol {
2532                Protocol::TCPListen => {
2533                    let proxy = self.tcp.clone();
2534                    if self
2535                        .tcp
2536                        .borrow_mut()
2537                        .create_session(sock, token, wait_time, proxy)
2538                        .is_err()
2539                    {
2540                        break;
2541                    }
2542                }
2543                Protocol::HTTPListen => {
2544                    let proxy = self.http.clone();
2545                    if self
2546                        .http
2547                        .borrow_mut()
2548                        .create_session(sock, token, wait_time, proxy)
2549                        .is_err()
2550                    {
2551                        break;
2552                    }
2553                }
2554                Protocol::HTTPSListen => {
2555                    if self
2556                        .https
2557                        .borrow_mut()
2558                        .create_session(sock, token, wait_time, self.https.clone())
2559                        .is_err()
2560                    {
2561                        break;
2562                    }
2563                }
2564                _ => panic!("should not call accept() on a HTTP, HTTPS or TCP session"),
2565            };
2566            self.sessions.borrow_mut().incr();
2567        }
2568
2569        gauge!(names::accept_queue::CONNECTIONS, self.accept_queue.len());
2570    }
2571
2572    pub fn ready(&mut self, token: Token, events: Ready) {
2573        trace!("PROXY\t{:?} got events: {:?}", token, events);
2574
2575        let session_token = token.0;
2576        if self.sessions.borrow().slab.contains(session_token) {
2577            //info!("sessions contains {:?}", session_token);
2578            let protocol = self.sessions.borrow().slab[session_token]
2579                .borrow()
2580                .protocol();
2581            //info!("protocol: {:?}", protocol);
2582            match protocol {
2583                Protocol::HTTPListen | Protocol::HTTPSListen | Protocol::TCPListen => {
2584                    //info!("PROTOCOL IS LISTEN");
2585                    if events.is_readable() {
2586                        self.accept_ready.insert(ListenToken(token.0));
2587                        if self.sessions.borrow().can_accept {
2588                            self.accept(ListenToken(token.0), protocol);
2589                        }
2590                        return;
2591                    }
2592
2593                    if events.is_writable() {
2594                        error!(
2595                            "received writable for listener {:?}, this should not happen",
2596                            token
2597                        );
2598                        return;
2599                    }
2600
2601                    if events.is_hup() {
2602                        error!("should not happen: server {:?} closed", token);
2603                        return;
2604                    }
2605
2606                    unreachable!();
2607                }
2608                _ => {}
2609            }
2610
2611            let session = self.sessions.borrow_mut().slab[session_token].clone();
2612            session.borrow_mut().update_readiness(token, events);
2613            if session.borrow_mut().ready(session.clone()) {
2614                debug!(
2615                    "Server killing session from ready: token={:?}, protocol={:?}, events={:?}",
2616                    token, protocol, events
2617                );
2618                self.kill_session(session);
2619            }
2620        }
2621    }
2622
2623    pub fn timeout(&mut self, token: Token) {
2624        trace!("PROXY\t{:?} got timeout", token);
2625
2626        let session_token = token.0;
2627        if self.sessions.borrow().slab.contains(session_token) {
2628            let session = self.sessions.borrow_mut().slab[session_token].clone();
2629            if session.borrow_mut().timeout(token) {
2630                debug!(
2631                    "Server killing session from timeout: token={:?}, protocol={:?}",
2632                    token,
2633                    session.borrow().protocol()
2634                );
2635                self.kill_session(session);
2636            }
2637        }
2638    }
2639
2640    pub fn handle_remaining_readiness(&mut self) {
2641        // try to accept again after handling all session events,
2642        // since we might have released a few session slots
2643        if self.sessions.borrow().can_accept && !self.accept_ready.is_empty() {
2644            while let Some(token) = self
2645                .accept_ready
2646                .iter()
2647                .next()
2648                .map(|token| ListenToken(token.0))
2649            {
2650                let protocol = self.sessions.borrow().slab[token.0].borrow().protocol();
2651                self.accept(token, protocol);
2652                if !self.sessions.borrow().can_accept || self.accept_ready.is_empty() {
2653                    break;
2654                }
2655            }
2656        }
2657    }
2658    fn block_channel(&mut self) {
2659        if let Err(e) = self.channel.blocking() {
2660            error!("Could not block channel: {}", e);
2661        }
2662    }
2663    fn unblock_channel(&mut self) {
2664        if let Err(e) = self.channel.nonblocking() {
2665            error!("Could not block channel: {}", e);
2666        }
2667    }
2668
2669    /// Evict the `count` least-recently-active non-listener sessions and
2670    /// return how many tokens were enqueued for shutdown. Used by
2671    /// `create_sessions` when the accept queue is saturated and the
2672    /// `evict_on_queue_full` knob is set.
2673    ///
2674    /// Uses `select_nth_unstable_by_key` (introselect, O(n) average) to
2675    /// partition the oldest `count` sessions in-place rather than a full
2676    /// O(n log n) sort. The candidate `Vec` is unavoidable because of
2677    /// `RefCell` borrow rules — the immutable borrow on `self.sessions`
2678    /// must drop before `shut_down_sessions_by_frontend_tokens` can take
2679    /// its mutable borrow.
2680    fn evict_least_active_sessions(&self, count: usize) -> usize {
2681        if count == 0 {
2682            return 0;
2683        }
2684
2685        let tokens = {
2686            let sessions = self.sessions.borrow();
2687            let mut candidates: Vec<(Token, Instant)> = sessions
2688                .slab
2689                .iter()
2690                .filter(|(_, session)| {
2691                    !matches!(
2692                        session.borrow().protocol(),
2693                        Protocol::HTTPListen
2694                            | Protocol::HTTPSListen
2695                            | Protocol::TCPListen
2696                            | Protocol::Channel
2697                            | Protocol::Metrics
2698                            | Protocol::Timer
2699                    )
2700                })
2701                .map(|(_, session)| {
2702                    let s = session.borrow();
2703                    (s.frontend_token(), s.last_event())
2704                })
2705                .collect();
2706
2707            // Early return is load-bearing: the `pivot` computation below
2708            // does `count.min(len) - 1`, which underflows on empty input.
2709            if candidates.is_empty() {
2710                return 0;
2711            }
2712
2713            let pivot = count.min(candidates.len()) - 1;
2714            candidates.select_nth_unstable_by_key(pivot, |&(_, last_event)| last_event);
2715
2716            candidates[..=pivot]
2717                .iter()
2718                .map(|&(token, _)| token)
2719                .collect::<HashSet<Token>>()
2720        };
2721
2722        let evicted = tokens.len();
2723        self.shut_down_sessions_by_frontend_tokens(tokens);
2724        evicted
2725    }
2726}
2727
2728/// log the error together with the request id
2729/// create a WorkerResponse
2730fn worker_response_error<S: ToString, T: ToString>(request_id: S, error: T) -> WorkerResponse {
2731    error!(
2732        "error on request {}, {}",
2733        request_id.to_string(),
2734        error.to_string()
2735    );
2736    WorkerResponse::error(request_id, error)
2737}
2738
2739pub struct ListenSession {
2740    pub protocol: Protocol,
2741}
2742
2743impl ProxySession for ListenSession {
2744    fn last_event(&self) -> Instant {
2745        Instant::now()
2746    }
2747
2748    fn print_session(&self) {}
2749
2750    fn frontend_token(&self) -> Token {
2751        Token(0)
2752    }
2753
2754    fn protocol(&self) -> Protocol {
2755        self.protocol
2756    }
2757
2758    fn ready(&mut self, _session: Rc<RefCell<dyn ProxySession>>) -> SessionIsToBeClosed {
2759        false
2760    }
2761
2762    fn shutting_down(&mut self) -> SessionIsToBeClosed {
2763        false
2764    }
2765
2766    fn update_readiness(&mut self, _token: Token, _events: Ready) {}
2767
2768    fn close(&mut self) {}
2769
2770    fn timeout(&mut self, _token: Token) -> SessionIsToBeClosed {
2771        error!(
2772            "called ProxySession::timeout(token={:?}, time) on ListenSession {{ protocol: {:?} }}",
2773            _token, self.protocol
2774        );
2775        false
2776    }
2777}
2778
2779#[cfg(test)]
2780mod accept_telemetry_tests {
2781    use super::*;
2782
2783    /// Two IPv4 addresses sharing the same /24 must hash to the same bucket;
2784    /// the masking logic guarantees this regardless of the host octet.
2785    #[test]
2786    fn per_source_bucket_collapses_ipv4_slash24() {
2787        let a: SocketAddr = "203.0.113.5:1234".parse().unwrap();
2788        let b: SocketAddr = "203.0.113.250:9999".parse().unwrap();
2789        assert_eq!(
2790            per_source_bucket(&a),
2791            per_source_bucket(&b),
2792            "addresses in the same /24 must land in the same bucket"
2793        );
2794    }
2795
2796    /// Two IPv6 addresses sharing the same /48 must hash to the same bucket.
2797    #[test]
2798    fn per_source_bucket_collapses_ipv6_slash48() {
2799        let a: SocketAddr = "[2001:db8:1234::1]:443".parse().unwrap();
2800        let b: SocketAddr = "[2001:db8:1234:abcd::ffff]:8443".parse().unwrap();
2801        assert_eq!(
2802            per_source_bucket(&a),
2803            per_source_bucket(&b),
2804            "addresses in the same /48 must land in the same bucket"
2805        );
2806    }
2807
2808    /// Every bucket label must be one of `PER_SOURCE_BUCKETS` precomputed
2809    /// statics — the cardinality cap is the load-bearing property.
2810    #[test]
2811    fn per_source_bucket_keys_are_bounded() {
2812        assert_eq!(PER_SOURCE_BUCKET_KEYS.len(), PER_SOURCE_BUCKETS);
2813        for (i, key) in PER_SOURCE_BUCKET_KEYS.iter().enumerate() {
2814            let expected = format!("client.connect.per_source.bucket_{i:03}");
2815            assert_eq!(*key, expected.as_str());
2816        }
2817    }
2818
2819    /// A modest sample across distinct /24 prefixes should hit a healthy
2820    /// number of distinct buckets — guards against the hash collapsing.
2821    #[test]
2822    fn per_source_bucket_distributes_distinct_subnets() {
2823        let mut hits = std::collections::HashSet::new();
2824        for i in 0..200u8 {
2825            let addr: SocketAddr = format!("10.0.{i}.42:80").parse().unwrap();
2826            hits.insert(per_source_bucket(&addr));
2827        }
2828        // With 200 distinct /24 prefixes hashed into 256 buckets we expect
2829        // many distinct labels — assert a conservative lower bound that
2830        // tolerates birthday collisions.
2831        assert!(
2832            hits.len() >= 100,
2833            "expected at least 100 distinct buckets across 200 /24s, got {}",
2834            hits.len()
2835        );
2836    }
2837}
2838
2839#[cfg(test)]
2840mod eviction_tests {
2841    use std::collections::HashSet;
2842    use std::time::{Duration, Instant};
2843
2844    use mio::Token;
2845
2846    /// `select_nth_unstable_by_key` partitions in O(n) so that the first
2847    /// `pivot + 1` entries are the `pivot + 1` smallest by key. This guards
2848    /// against a future refactor swapping the comparator orientation.
2849    #[test]
2850    fn select_nth_finds_oldest_sessions() {
2851        let now = Instant::now();
2852        let mut candidates = [
2853            (Token(1), now - Duration::from_secs(10)), // 10s old
2854            (Token(2), now - Duration::from_secs(50)), // 50s old (oldest)
2855            (Token(3), now - Duration::from_secs(5)),  // 5s old (newest)
2856            (Token(4), now - Duration::from_secs(30)), // 30s old
2857            (Token(5), now - Duration::from_secs(20)), // 20s old
2858        ];
2859
2860        let count = 2;
2861        let pivot = count.min(candidates.len()) - 1;
2862        candidates.select_nth_unstable_by_key(pivot, |&(_, last_event)| last_event);
2863
2864        let selected: HashSet<Token> = candidates[..=pivot]
2865            .iter()
2866            .map(|&(token, _)| token)
2867            .collect();
2868
2869        assert_eq!(selected.len(), 2);
2870        assert!(
2871            selected.contains(&Token(2)),
2872            "should contain 50s-old session"
2873        );
2874        assert!(
2875            selected.contains(&Token(4)),
2876            "should contain 30s-old session"
2877        );
2878    }
2879
2880    /// When `count` exceeds available candidates, the pivot collapses to
2881    /// `len - 1` so we evict everything; this test pins that behaviour
2882    /// against a future refactor that might silently truncate.
2883    #[test]
2884    fn select_nth_with_count_exceeding_candidates() {
2885        let now = Instant::now();
2886        let mut candidates = [(Token(1), now - Duration::from_secs(10))];
2887
2888        let count = 5;
2889        let pivot = count.min(candidates.len()) - 1;
2890        candidates.select_nth_unstable_by_key(pivot, |&(_, last_event)| last_event);
2891
2892        let selected: HashSet<Token> = candidates[..=pivot]
2893            .iter()
2894            .map(|&(token, _)| token)
2895            .collect();
2896
2897        assert_eq!(selected.len(), 1);
2898        assert!(selected.contains(&Token(1)));
2899    }
2900}