Skip to main content

sozu_lib/
udp.rs

1//! UDP load-balancing I/O shell + mio event-loop wiring (issue #1273).
2//!
3//! This module is the **impure** half of the UDP datapath: it owns every
4//! syscall, the buffer copies, the per-flow connected upstream sockets, the
5//! timer arming, the `BackendMap`/health/metrics edges, and the slab/token
6//! bookkeeping. It drives the pure sans-io core in
7//! [`crate::protocol::udp`](crate::protocol::udp) (the `UdpManager` /
8//! `UdpFlow` two-level split) through `ManagerInput` / `Output`.
9//!
10//! Architecture (mirrors `tcp.rs`, but UDP is **one-listener-many-flows**):
11//! - [`UdpListener`] wraps the single `mio::net::UdpSocket` a listener binds.
12//!   There is **no accept loop** — a readable event means "datagrams waiting",
13//!   not "a new connection".
14//! - [`UdpProxy`] holds the listeners, the per-listener [`UdpManager`]s, the
15//!   shared `BackendMap`, the session slab and the registry. It does **not**
16//!   implement `ProxyConfiguration` / `L7Proxy` (their signatures are
17//!   `TcpStream`-bound); the server calls its inherent
18//!   [`notify`](UdpProxy::notify) directly.
19//! - [`UdpListenerSession`] is the `ProxySession` the server's generic
20//!   readiness path drives. One session backs one listener; its
21//!   `update_readiness` demuxes by token (listener-token = client recv;
22//!   upstream-token = backend recv). It owns the per-flow connected sockets and
23//!   the `upstream_token -> FlowId` map.
24//!
25//! UDP never goes through `accept()` / `create_session()`: a `Protocol::UDP`
26//! listener readable event falls through `Server::ready`'s generic arm into
27//! `ProxySession::ready`.
28//!
29//! Long-form lifecycle (datapath, NAT return, teardown, control plane,
30//! hardening): `lib/src/protocol/udp/LIFECYCLE.md`.
31
32use std::{
33    cell::RefCell,
34    collections::{BTreeMap, HashMap, VecDeque, hash_map::Entry},
35    io::ErrorKind,
36    net::SocketAddr,
37    os::unix::io::AsRawFd,
38    rc::Rc,
39    time::{Duration, Instant},
40};
41
42use mio::{Interest, Registry, Token, net::UdpSocket, unix::SourceFd};
43use sozu_command::{
44    logging::ansi_palette,
45    proto::command::{
46        Cluster, LoadBalancingAlgorithms, LoadMetric, RequestUdpFrontend, UdpAffinityKey,
47        UdpListenerConfig, UpdateUdpListenerConfig, WorkerRequest, WorkerResponse,
48        request::RequestType,
49    },
50};
51
52use crate::metrics::names;
53use crate::{
54    CachedTags, ListenerError, ListenerHandler, Protocol, ProxyError, ProxySession,
55    SessionIsToBeClosed,
56    backends::BackendMap,
57    pool::Pool,
58    protocol::udp::{
59        CloseReason, ClusterConfig, ConfigEvent, DropReason, FlowId, ManagerInput, MetricEvent,
60        Output, UdpManager,
61    },
62    server::{SessionManager, TIMER},
63    socket::{udp_bind, udp_connect},
64    sozu_command::{ready::Ready, state::ClusterId},
65};
66
67mod health;
68pub use health::UdpHealthChecker;
69
/// Build the per-session log prefix (tag `UDP`) for a listener session.
///
/// `$self` must expose a `listener_token` whose `.0` field is printed and an
/// `address` formatted with `Display`. Colors come from [`ansi_palette`], so the
/// `colored` flag is honoured: bold bright-white protocol label, light-grey
/// keyword, gray keys and bright-white values.
macro_rules! log_context {
    ($self:expr) => {{
        let (label, off, keyword, key, value) = ansi_palette();
        format!(
            "[- - - -]\t{open}UDP{reset}\t{grey}Listener{reset}({gray}token{reset}={white}{token}{reset}, {gray}address{reset}={white}{address}{reset})\t >>>",
            token = $self.listener_token.0,
            address = $self.address,
            open = label,
            reset = off,
            grey = keyword,
            gray = key,
            white = value,
        )
    }};
}
88
/// Log prefix for [`UdpProxy`]-level callbacks (notify, listener add/remove,
/// soft/hard stop). These operate on the listener/token maps rather than on a
/// single session, so there is no token to print: only a `UDP` label, which is
/// rendered bold bright-white when colors are enabled.
macro_rules! log_module_context {
    () => {{
        let palette = sozu_command::logging::ansi_palette();
        format!("{open}UDP{reset}\t >>>", open = palette.0, reset = palette.1)
    }};
}
98
/// Per-flow log prefix (tag `UDP-FLOW`).
///
/// Prints the flow id and client address with `Display` and the backend with
/// `Debug` (so an optional backend renders as `Some(..)` / `None`). This keeps
/// every line belonging to one flow easy to filter.
macro_rules! log_flow_context {
    ($flow:expr, $client:expr, $backend:expr) => {{
        let palette = sozu_command::logging::ansi_palette();
        format!(
            "[- - - -]\t{open}UDP-FLOW{reset}\t{grey}Flow{reset}({gray}id{reset}={white}{id}{reset}, {gray}client{reset}={white}{client}{reset}, {gray}backend{reset}={white}{backend:?}{reset})\t >>>",
            id = $flow,
            client = $client,
            backend = $backend,
            open = palette.0,
            reset = palette.1,
            grey = palette.2,
            gray = palette.3,
            white = palette.4,
        )
    }};
}
117
/// Per-flow cap on the upstream write queue: datagrams waiting to go out to one
/// backend over that flow's connected socket. The queue only has to absorb a
/// transient `EWOULDBLOCK` (kernel send buffer momentarily full) on a single
/// flow's traffic. A small bound is therefore enough, and it stops a slow or
/// stalled backend from growing memory. Once the cap is reached, further
/// datagrams are dropped and counted as `udp.datagrams.dropped.wq_full`. UDP is
/// best-effort, and the client retries.
const UPSTREAM_WRITE_QUEUE_CAP: usize = 64;
/// Per-listener cap on the client-return write queue: replies sent back to
/// many clients through the single listener socket. It is larger than
/// [`UPSTREAM_WRITE_QUEUE_CAP`] because every flow's return traffic shares it.
const CLIENT_WRITE_QUEUE_CAP: usize = 256;
129
/// Result of one send attempt made by the callback given to
/// [`WriteQueue::drain`]. It tells the drain loop whether to pop the front
/// datagram and continue, or to stop until the next WRITABLE event.
enum SendOutcome {
    /// The datagram left the socket: pop it and move on to the next one.
    Sent,
    /// The socket returned `WouldBlock`: keep the datagram at the front of the
    /// queue and stop draining.
    WouldBlock,
    /// A hard send error (e.g. ECONNREFUSED): pop the datagram as lost and keep
    /// draining the rest. `drain` only pops; counting the drop is up to the
    /// callback.
    Dropped,
}

/// Bounded FIFO of datagrams parked until their socket becomes writable.
///
/// Only the slow path uses it. An egress `send`/`send_to` that succeeds
/// immediately never enqueues anything; entries are added only after the
/// kernel send buffer reported `WouldBlock`. When the queue is full, the
/// *incoming* datagram is refused and the queued ones keep their order (they
/// are closer to leaving the socket).
///
/// Each entry is `(destination, payload)`. The per-flow upstream queue ignores
/// the destination, because a connected socket's `send` has an implicit peer.
/// The listener's client-return queue uses it as the `send_to` target. One type
/// serves both so the drain loop and its unit tests are shared.
struct WriteQueue {
    /// Pending `(destination, payload)` pairs, oldest at the front.
    queue: VecDeque<(SocketAddr, Vec<u8>)>,
    /// Maximum number of pending datagrams; `push` refuses anything beyond it.
    cap: usize,
}

impl WriteQueue {
    /// Create an empty queue that holds at most `cap` datagrams.
    fn new(cap: usize) -> Self {
        Self {
            cap,
            queue: VecDeque::new(),
        }
    }

    /// `true` when nothing is waiting to be sent.
    fn is_empty(&self) -> bool {
        self.queue.is_empty()
    }

    /// Number of datagrams currently queued. Test-only: at runtime, callers
    /// rely on [`is_empty`](Self::is_empty) and the result of `drain`.
    #[cfg(test)]
    fn len(&self) -> usize {
        self.queue.len()
    }

    /// Try to append `payload` for destination `dst`.
    ///
    /// Returns `true` when the datagram was queued. Returns `false` when `cap`
    /// datagrams are already pending; nothing is stored in that case, and the
    /// caller drops the datagram and counts it as `wq_full`.
    #[must_use]
    fn push(&mut self, dst: SocketAddr, payload: Vec<u8>) -> bool {
        let has_room = self.queue.len() < self.cap;
        if has_room {
            self.queue.push_back((dst, payload));
            // `push` is the only place the queue grows and it checks for room
            // first, so the depth can never exceed `cap`.
            debug_assert!(
                self.queue.len() <= self.cap,
                "WriteQueue overran its cap: len {} > cap {}",
                self.queue.len(),
                self.cap,
            );
        }
        has_room
    }

    /// Hand queued datagrams to `send`, oldest first.
    ///
    /// `Sent` and `Dropped` both pop the front entry and continue. `WouldBlock`
    /// leaves the front entry in place and stops, so it is retried on the next
    /// writable event. Returns `true` once the queue is empty, which lets the
    /// caller go back from WRITABLE to READABLE-only interest.
    fn drain<F: FnMut(&SocketAddr, &[u8]) -> SendOutcome>(&mut self, mut send: F) -> bool {
        loop {
            let Some((dst, payload)) = self.queue.front() else {
                break;
            };
            if matches!(send(dst, payload), SendOutcome::WouldBlock) {
                break;
            }
            self.queue.pop_front();
        }
        self.queue.is_empty()
    }
}
212
213/// One UDP listener: a single `mio::net::UdpSocket` plus its routing config.
214/// Unlike a TCP listener there is no accept loop — a readable event is a batch
215/// of datagrams the session drains to `WouldBlock`.
216pub struct UdpListener {
217    active: SessionIsToBeClosed,
218    address: SocketAddr,
219    cluster_id: Option<String>,
220    config: UdpListenerConfig,
221    socket: Option<UdpSocket>,
222    tags: BTreeMap<String, CachedTags>,
223    token: Token,
224}
225
226impl ListenerHandler for UdpListener {
227    fn get_addr(&self) -> &SocketAddr {
228        &self.address
229    }
230
231    fn get_tags(&self, key: &str) -> Option<&CachedTags> {
232        self.tags.get(key)
233    }
234
235    fn set_tags(&mut self, key: String, tags: Option<BTreeMap<String, String>>) {
236        match tags {
237            Some(tags) => self.tags.insert(key, CachedTags::new(tags)),
238            None => self.tags.remove(&key),
239        };
240    }
241
242    fn protocol(&self) -> Protocol {
243        Protocol::UDP
244    }
245
246    fn public_address(&self) -> SocketAddr {
247        self.config
248            .public_address
249            .map(|addr| addr.into())
250            .unwrap_or(self.address)
251    }
252}
253
254impl UdpListener {
255    fn new(config: UdpListenerConfig, token: Token) -> Result<UdpListener, ListenerError> {
256        Ok(UdpListener {
257            cluster_id: None,
258            socket: None,
259            token,
260            address: config.address.into(),
261            config,
262            active: false,
263            tags: BTreeMap::new(),
264        })
265    }
266
267    /// Bind (or adopt an SCM-passed) socket and register it `READABLE`. The
268    /// READABLE registration is what drives `Server::ready` for this listener —
269    /// there is no accept path.
270    pub fn activate(
271        &mut self,
272        registry: &Registry,
273        udp_socket: Option<UdpSocket>,
274    ) -> Result<Token, ProxyError> {
275        if self.active {
276            return Ok(self.token);
277        }
278
279        let mut socket = match udp_socket {
280            Some(socket) => socket,
281            None => {
282                let address: SocketAddr = self.config.address.into();
283                udp_bind(address).map_err(|e| ProxyError::BindToSocket(address, e))?
284            }
285        };
286
287        registry
288            .register(&mut socket, self.token, Interest::READABLE)
289            .map_err(ProxyError::RegisterListener)?;
290
291        self.socket = Some(socket);
292        self.active = true;
293        Ok(self.token)
294    }
295
296    /// Apply a partial-update patch to this UDP listener's live config. Fields
297    /// absent in the patch (`None`) are preserved.
298    pub fn update_config(&mut self, patch: &UpdateUdpListenerConfig) {
299        if let Some(v) = patch.public_address {
300            self.config.public_address = Some(v);
301        }
302        if let Some(v) = patch.front_timeout {
303            self.config.front_timeout = v;
304        }
305        if let Some(v) = patch.back_timeout {
306            self.config.back_timeout = v;
307        }
308        if let Some(v) = patch.max_rx_datagram_size {
309            self.config.max_rx_datagram_size = v;
310        }
311        if let Some(v) = patch.max_flows {
312            self.config.max_flows = v;
313        }
314    }
315}
316
317/// The UDP proxy. Holds the listeners, one `UdpManager` per listener token, the
318/// shared `BackendMap`, the session slab and a cloned registry. Does NOT
319/// implement `ProxyConfiguration` / `L7Proxy`; the server drives it through the
320/// inherent [`notify`](Self::notify) plus the activate/give-back helpers.
321pub struct UdpProxy {
322    fronts: HashMap<String, Token>,
323    backends: Rc<RefCell<BackendMap>>,
324    listeners: HashMap<Token, Rc<RefCell<UdpListener>>>,
325    /// The built listener session per listener token. The server inserts the
326    /// same `Rc` into the slab at the listener token; the proxy keeps a clone so
327    /// it can drive per-flow teardown (soft/hard stop) without downcasting the
328    /// slab's `dyn ProxySession`. Cleared on listener removal/stop.
329    listener_sessions: HashMap<Token, Rc<RefCell<UdpListenerSession>>>,
330    /// One sans-io manager per listener token, sharing the listener's lifecycle.
331    managers: HashMap<Token, Rc<RefCell<UdpManager>>>,
332    /// Cluster routing for each listener token (set by the UDP frontend).
333    cluster_for_listener: HashMap<Token, ClusterId>,
334    /// Last `AddCluster`-supplied UDP knobs per cluster id. Cached so a frontend
335    /// added AFTER its cluster still picks up `responses` / `requests` / PPv2 /
336    /// affinity — `AddCluster` and `AddUdpFrontend` arrive in either order, and
337    /// neither must clobber the other's contribution to a manager's
338    /// `ClusterConfig`.
339    cluster_udp_config: HashMap<ClusterId, sozu_command::proto::command::UdpClusterConfig>,
340    registry: Registry,
341    sessions: Rc<RefCell<SessionManager>>,
342    #[allow(dead_code)]
343    pool: Rc<RefCell<Pool>>,
344    /// Fixed hash seed injected once into every manager. It is deliberately the
345    /// same constant on every worker and across restarts so HRW/Maglev affinity
346    /// is reproducible cluster-wide — a per-worker random seed would scatter the
347    /// same flow key onto a different backend on each worker and reshuffle it on
348    /// every restart, defeating the documented affinity-stability contract (see
349    /// [`crate::load_balancing::DEFAULT_HASH_SEED`]).
350    hash_seed: u64,
351    /// Global `max_connections` (from `ServerConfig`). Used to clamp a UDP
352    /// listener's auto `max_flows` so a single listener cannot inflate the
353    /// shared `SessionManager` slab and starve HTTP/TCP.
354    max_connections: usize,
355    /// Global `buffer_size` (from `ServerConfig`). Used to clamp a listener's
356    /// `max_rx_datagram_size` so a hostile `AddUdpListener` cannot allocate a
357    /// multi-GB recv buffer.
358    buffer_size: usize,
359    /// Endpoint-bound active health prober (TCP probe + hysteresis +
360    /// fail-open). Driven from the server event loop via
361    /// [`UdpProxy::health_poll`] / [`UdpProxy::health_ready`].
362    health: UdpHealthChecker,
363}
364
365impl UdpProxy {
366    pub fn new(
367        registry: Registry,
368        sessions: Rc<RefCell<SessionManager>>,
369        pool: Rc<RefCell<Pool>>,
370        backends: Rc<RefCell<BackendMap>>,
371        max_connections: usize,
372        buffer_size: usize,
373    ) -> UdpProxy {
374        // Fixed seed, identical on every worker and across restarts: HRW/Maglev
375        // affinity must be reproducible cluster-wide. A per-worker/per-restart
376        // random seed (as a `process::id()` + `Instant::now()` mix would be)
377        // would route the same flow key to a different backend on each worker
378        // and reshuffle it on every restart. Reuse the LB module's canonical
379        // affinity seed so UDP and the HTTP/TCP affinity hashers agree.
380        let hash_seed = crate::load_balancing::DEFAULT_HASH_SEED;
381        UdpProxy {
382            backends,
383            listeners: HashMap::new(),
384            listener_sessions: HashMap::new(),
385            managers: HashMap::new(),
386            cluster_for_listener: HashMap::new(),
387            cluster_udp_config: HashMap::new(),
388            fronts: HashMap::new(),
389            registry,
390            sessions,
391            pool,
392            hash_seed,
393            max_connections,
394            buffer_size,
395            health: UdpHealthChecker::new(),
396        }
397    }
398
399    /// Drive the UDP health prober one event-loop step (server calls this once
400    /// per iteration, mirroring `HealthChecker::poll`). Non-blocking.
401    pub fn health_poll(&mut self) {
402        let registry = self.registry.try_clone();
403        if let Ok(registry) = registry {
404            self.health.poll(&self.backends, &registry);
405        }
406    }
407
408    /// Record mio readiness for a UDP health-probe socket.
409    pub fn health_ready(&mut self, token: Token) {
410        self.health.ready(token);
411    }
412
413    /// Whether `token` is a UDP health-probe socket this proxy owns.
414    pub fn health_owns_token(&self, token: Token) -> bool {
415        self.health.owns_token(token)
416    }
417
418    pub fn add_listener(
419        &mut self,
420        config: UdpListenerConfig,
421        token: Token,
422    ) -> Result<Token, ProxyError> {
423        match self.listeners.entry(token) {
424            Entry::Vacant(entry) => {
425                let mut config = config;
426                let max_flows = effective_max_flows(config.max_flows, self.max_connections);
427                // Defense in depth: cap the recv-buffer sizing to `buffer_size`
428                // so a hostile `AddUdpListener` (`max_rx_datagram_size =
429                // u32::MAX`) can't allocate a multi-GB buffer. Write the clamped
430                // value back into the stored config so the manager AND the
431                // session's `recv_buf` (sized from `config` in `new()`) agree on
432                // the same bound.
433                let max_rx = clamp_max_rx(config.max_rx_datagram_size as usize, self.buffer_size);
434                config.max_rx_datagram_size = max_rx as u32;
435                let front = Duration::from_secs(u64::from(config.front_timeout));
436                let back = Duration::from_secs(u64::from(config.back_timeout));
437                let listener = UdpListener::new(config, token).map_err(ProxyError::AddListener)?;
438                entry.insert(Rc::new(RefCell::new(listener)));
439                let cluster_cfg = ClusterConfig {
440                    front_timeout: front,
441                    back_timeout: back,
442                    ..Default::default()
443                };
444                self.managers.insert(
445                    token,
446                    Rc::new(RefCell::new(UdpManager::new(
447                        cluster_cfg,
448                        max_flows,
449                        max_rx,
450                        self.hash_seed,
451                    ))),
452                );
453                Ok(token)
454            }
455            _ => Err(ProxyError::ListenerAlreadyPresent),
456        }
457    }
458
459    pub fn remove_listener(&mut self, address: SocketAddr) -> SessionIsToBeClosed {
460        let len = self.listeners.len();
461        let mut removed_tokens = Vec::new();
462        self.listeners.retain(|token, l| {
463            if l.borrow().address == address {
464                removed_tokens.push(*token);
465                false
466            } else {
467                true
468            }
469        });
470        let now = Instant::now();
471        for token in removed_tokens {
472            self.cluster_for_listener.remove(&token);
473            // Drive per-flow teardown THROUGH the manager (emits FlowEvicted +
474            // CloseFlow per flow) BEFORE dropping the manager, so the
475            // active-flows gauge is decremented once per flow and the per-flow
476            // upstream sockets + slab slots are freed. Removing the manager first
477            // would silently leak the gauge by N.
478            if let Some(session) = self.listener_sessions.remove(&token) {
479                session.borrow_mut().close_all_flows(now);
480            }
481            self.managers.remove(&token);
482        }
483        self.listeners.len() < len
484    }
485
486    pub fn activate_listener(
487        &self,
488        addr: &SocketAddr,
489        udp_socket: Option<UdpSocket>,
490    ) -> Result<Token, ProxyError> {
491        let listener = self
492            .listeners
493            .values()
494            .find(|listener| listener.borrow().address == *addr)
495            .ok_or(ProxyError::NoListenerFound(*addr))?;
496
497        listener.borrow_mut().activate(&self.registry, udp_socket)
498    }
499
500    /// Build the [`UdpListenerSession`] that drives this listener's datagrams.
501    /// The server inserts the returned session into the slab **at the listener
502    /// token**, replacing the `ListenSession` placeholder, so the generic
503    /// readiness path reaches `UdpListenerSession::update_readiness`. Returns
504    /// `None` if the listener token is unknown.
505    pub fn build_session(&mut self, token: Token) -> Option<Rc<RefCell<UdpListenerSession>>> {
506        let listener = self.listeners.get(&token)?.clone();
507        let manager = self.managers.get(&token)?.clone();
508        let registry = self.registry.try_clone().ok()?;
509        let session = Rc::new(RefCell::new(UdpListenerSession::new(
510            listener,
511            manager,
512            self.backends.clone(),
513            registry,
514            self.sessions.clone(),
515            token,
516        )));
517        // Keep a clone so soft/hard stop can drive per-flow teardown.
518        self.listener_sessions.insert(token, session.clone());
519        Some(session)
520    }
521
522    pub fn give_back_listeners(&mut self) -> Vec<(SocketAddr, UdpSocket)> {
523        self.listeners
524            .values()
525            .filter_map(|listener| {
526                let mut owned = listener.borrow_mut();
527                if let Some(socket) = owned.socket.take() {
528                    owned.active = false;
529                    return Some((owned.address, socket));
530                }
531                None
532            })
533            .collect()
534    }
535
536    pub fn give_back_listener(
537        &mut self,
538        address: SocketAddr,
539    ) -> Result<(Token, UdpSocket), ProxyError> {
540        let listener = self
541            .listeners
542            .values()
543            .find(|listener| listener.borrow().address == address)
544            .ok_or(ProxyError::NoListenerFound(address))?;
545
546        let (token, taken) = {
547            let mut owned = listener.borrow_mut();
548            let taken = owned.socket.take().ok_or(ProxyError::UnactivatedListener)?;
549            owned.active = false;
550            (owned.token, taken)
551        };
552        // Deactivating removes the listener from service: tear down its active
553        // flows THROUGH the manager so the per-flow upstream slab slots + fds
554        // don't dangle and the active-flows gauge is decremented once per flow
555        // (the manager is RETAINED here — not removed from `self.managers` — so
556        // `close_all` also resets its flow table, keeping manager and shell
557        // consistent for a later reactivate).
558        if let Some(session) = self.listener_sessions.remove(&token) {
559            session.borrow_mut().close_all_flows(Instant::now());
560        }
561        Ok((token, taken))
562    }
563
564    pub fn update_listener(&mut self, patch: UpdateUdpListenerConfig) -> Result<(), ProxyError> {
565        let address: SocketAddr = patch.address.into();
566        let listener = self
567            .listeners
568            .values()
569            .find(|l| l.borrow().address == address)
570            .ok_or(ProxyError::NoListenerFound(address))?;
571        {
572            let mut l = listener.borrow_mut();
573            l.update_config(&patch);
574            // Clamp the stored rx size in place (defense in depth): keep the
575            // listener config, the manager, and the session's `recv_buf` agreeing
576            // on the same `buffer_size`-bounded value.
577            l.config.max_rx_datagram_size =
578                clamp_max_rx(l.config.max_rx_datagram_size as usize, self.buffer_size) as u32;
579        }
580
581        // Reflect the timeout / cap / rx-size changes into the manager for new
582        // flows. Existing flows keep their captured config (stable contract).
583        if let Some(token) = self
584            .listeners
585            .iter()
586            .find(|(_, l)| l.borrow().address == address)
587            .map(|(t, _)| *t)
588            && let Some(mgr) = self.managers.get(&token)
589        {
590            let now = Instant::now();
591            let (cfg, max_flows, max_rx) = {
592                let l = listener.borrow();
593                (
594                    self.cluster_config_for(&l, token),
595                    effective_max_flows(l.config.max_flows, self.max_connections),
596                    // Defense in depth: clamp the raised rx size to `buffer_size`
597                    // so a hostile `UpdateUdpListener` can't grow the recv buffer
598                    // past the global cap.
599                    clamp_max_rx(l.config.max_rx_datagram_size as usize, self.buffer_size),
600                )
601            };
602            {
603                let mut m = mgr.borrow_mut();
604                m.handle_input(ManagerInput::Config(ConfigEvent::SetCluster(cfg)), now);
605                m.handle_input(
606                    ManagerInput::Config(ConfigEvent::SetMaxFlows(max_flows)),
607                    now,
608                );
609                m.handle_input(
610                    ManagerInput::Config(ConfigEvent::SetMaxRxDatagramSize(max_rx)),
611                    now,
612                );
613            }
614            // Resize the live session's recv scratch to match the (clamped) new
615            // rx size. Without this, a config that RAISES `max_rx_datagram_size`
616            // would leave `recv_buf` at its old (smaller) length: `recv_from`
617            // would kernel-truncate datagrams between the old and new size while
618            // the manager's `len > max_rx` check (now the larger value) passes
619            // them, forwarding them truncated. The `+ 1` mirrors `new()`: it lets
620            // the manager observe `len == max_rx + 1 > max_rx` and drop an
621            // oversized datagram (`DropReason::Truncated`) instead of silently
622            // truncating it. Existing flows keep their captured config; only the
623            // shared recv buffer tracks the live rx size.
624            if let Some(session) = self.listener_sessions.get(&token) {
625                session.borrow_mut().resize_recv_buf(max_rx);
626            }
627        }
628        Ok(())
629    }
630
631    pub fn add_udp_front(&mut self, front: RequestUdpFrontend) -> Result<(), ProxyError> {
632        let address = front.address.into();
633        let token = {
634            let mut listener = self
635                .listeners
636                .values()
637                .find(|l| l.borrow().address == address)
638                .ok_or(ProxyError::NoListenerFound(address))?
639                .borrow_mut();
640            self.fronts
641                .insert(front.cluster_id.to_string(), listener.token);
642            listener.set_tags(address.to_string(), Some(front.tags));
643            listener.cluster_id = Some(front.cluster_id.clone());
644            listener.token
645        };
646        self.cluster_for_listener
647            .insert(token, front.cluster_id.clone());
648
649        // Commit the cluster routing into the manager so admitted flows know
650        // which cluster to `SelectBackend` against.
651        if let Some(mgr) = self.managers.get(&token) {
652            let listener = self.listeners.get(&token).unwrap();
653            let cfg = {
654                let l = listener.borrow();
655                self.cluster_config_for(&l, token)
656            };
657            mgr.borrow_mut().handle_input(
658                ManagerInput::Config(ConfigEvent::SetCluster(cfg)),
659                Instant::now(),
660            );
661        }
662        Ok(())
663    }
664
665    pub fn remove_udp_front(&mut self, front: RequestUdpFrontend) -> Result<(), ProxyError> {
666        let address = front.address.into();
667        let token = {
668            let mut listener = match self
669                .listeners
670                .values()
671                .find(|l| l.borrow().address == address)
672            {
673                Some(l) => l.borrow_mut(),
674                None => return Err(ProxyError::NoListenerFound(address)),
675            };
676            listener.set_tags(address.to_string(), None);
677            if let Some(cluster_id) = listener.cluster_id.take() {
678                self.fronts.remove(&cluster_id);
679            }
680            listener.token
681        };
682        self.cluster_for_listener.remove(&token);
683        // Drop the routing in the manager — new datagrams now have no backend.
684        if let Some(mgr) = self.managers.get(&token) {
685            mgr.borrow_mut().handle_input(
686                ManagerInput::Config(ConfigEvent::SetCluster(ClusterConfig::default())),
687                Instant::now(),
688            );
689        }
690        Ok(())
691    }
692
693    /// Build a [`ClusterConfig`] for a listener from its current frontend
694    /// cluster routing + timeouts. The per-cluster knobs (responses/requests/
695    /// PPv2/affinity) are populated by [`apply_cluster`](Self::apply_cluster)
696    /// when an `AddCluster` carries a `udp` block; absent that they stay at the
697    /// proto defaults.
698    fn cluster_config_for(&self, listener: &UdpListener, _token: Token) -> ClusterConfig {
699        let cluster = listener.cluster_id.clone().unwrap_or_default();
700        let mut cfg = ClusterConfig {
701            cluster: cluster.clone(),
702            front_timeout: Duration::from_secs(u64::from(listener.config.front_timeout)),
703            back_timeout: Duration::from_secs(u64::from(listener.config.back_timeout)),
704            ..Default::default()
705        };
706        // Fold in the cluster's cached UDP knobs (set by a prior or later
707        // `AddCluster`) so the order of `AddCluster` vs `AddUdpFrontend` does not
708        // matter — both rebuilds of a manager's `ClusterConfig` converge on the
709        // same per-cluster contract.
710        if let Some(udp) = self.cluster_udp_config.get(&cluster) {
711            apply_udp_knobs(&mut cfg, udp);
712        }
713        cfg
714    }
715
716    /// Apply an `AddCluster` to every listener routing to it: fold the UDP
717    /// cluster knobs (affinity / responses / requests / PPv2) into the
718    /// manager's `ClusterConfig`, and rebuild the LB policy (HRW/Maglev table)
719    /// via the shared `BackendMap`.
720    fn apply_cluster(&mut self, cluster: &Cluster) {
721        // 1. LB policy (HRW/Maglev rebuild happens inside the BackendMap).
722        self.backends
723            .borrow_mut()
724            .set_load_balancing_policy_for_cluster(
725                &cluster.cluster_id,
726                LoadBalancingAlgorithms::try_from(cluster.load_balancing).unwrap_or_default(),
727                cluster
728                    .load_metric
729                    .and_then(|n| LoadMetric::try_from(n).ok()),
730            );
731
732        // 1b. Health settings from the cluster's `udp.health` block, if any.
733        // Mode HEALTH_OFF / no health block disables probing for this cluster.
734        let health_settings = cluster.udp.as_ref().and_then(|udp| {
735            udp.health.as_ref().and_then(|h| {
736                let mode = h
737                    .mode
738                    .and_then(|m| sozu_command::proto::command::UdpHealthMode::try_from(m).ok());
739                match mode {
740                    Some(sozu_command::proto::command::UdpHealthMode::HealthOff) => None,
741                    // TCP_PROBE (default when a health block is present) and
742                    // UDP_PROBE both schedule the TCP-probe state machine; the
743                    // app UDP-probe payload rides along for the secondary check.
744                    _ => Some(health::UdpHealthSettings::from_proto(h)),
745                }
746            })
747        });
748        self.health
749            .set_cluster(&cluster.cluster_id, health_settings, &self.registry);
750
751        // 1c. Cache the UDP knobs so a frontend added in EITHER order picks them
752        // up via `cluster_config_for`. An `AddCluster` without a `udp` block
753        // clears the cache (back to proto defaults).
754        match &cluster.udp {
755            Some(udp) => {
756                self.cluster_udp_config
757                    .insert(cluster.cluster_id.clone(), udp.clone());
758            }
759            None => {
760                self.cluster_udp_config.remove(&cluster.cluster_id);
761            }
762        }
763
764        // 2. Per-cluster UDP knobs into the managers routing to this cluster.
765        // `cluster_config_for` now folds the cached knobs in, so the rebuild is
766        // identical regardless of whether the frontend or the cluster came
767        // first.
768        let now = Instant::now();
769        let tokens: Vec<Token> = self
770            .cluster_for_listener
771            .iter()
772            .filter(|(_, c)| **c == cluster.cluster_id)
773            .map(|(t, _)| *t)
774            .collect();
775        for token in tokens {
776            let Some(listener) = self.listeners.get(&token) else {
777                continue;
778            };
779            let cfg = {
780                let l = listener.borrow();
781                self.cluster_config_for(&l, token)
782            };
783            if let Some(mgr) = self.managers.get(&token) {
784                mgr.borrow_mut()
785                    .handle_input(ManagerInput::Config(ConfigEvent::SetCluster(cfg)), now);
786            }
787        }
788    }
789
790    /// Inherent dispatch entry point — the server calls this directly (UDP does
791    /// not implement `ProxyConfiguration`). Handles UDP frontends, cluster
792    /// config, listener removal, and stop. No accept / create_session.
793    pub fn notify(&mut self, message: WorkerRequest) -> WorkerResponse {
794        let request_type = match message.content.request_type {
795            Some(t) => t,
796            None => return WorkerResponse::error(message.id, "Empty request"),
797        };
798        match request_type {
799            RequestType::AddUdpFrontend(front) => match self.add_udp_front(front) {
800                Ok(()) => WorkerResponse::ok(message.id),
801                Err(err) => WorkerResponse::error(message.id, err),
802            },
803            RequestType::RemoveUdpFrontend(front) => match self.remove_udp_front(front) {
804                Ok(()) => WorkerResponse::ok(message.id),
805                Err(err) => WorkerResponse::error(message.id, err),
806            },
807            RequestType::AddCluster(cluster) => {
808                self.apply_cluster(&cluster);
809                WorkerResponse::ok(message.id)
810            }
811            RequestType::RemoveCluster(cluster_id) => {
812                let tokens: Vec<Token> = self
813                    .cluster_for_listener
814                    .iter()
815                    .filter(|(_, c)| **c == cluster_id)
816                    .map(|(t, _)| *t)
817                    .collect();
818                for token in tokens {
819                    if let Some(mgr) = self.managers.get(&token) {
820                        mgr.borrow_mut().handle_input(
821                            ManagerInput::Config(ConfigEvent::SetCluster(ClusterConfig::default())),
822                            Instant::now(),
823                        );
824                    }
825                }
826                self.cluster_udp_config.remove(&cluster_id);
827                self.health.remove_cluster(&cluster_id, &self.registry);
828                WorkerResponse::ok(message.id)
829            }
830            RequestType::SoftStop(_) => {
831                info!(
832                    "{} {} processing soft shutdown",
833                    log_module_context!(),
834                    message.id
835                );
836                // Drain: admit no new flows. Then actively tear down existing
837                // flows so the worker reaches `base_sessions_count` and exits
838                // promptly instead of waiting out each flow's idle timeout. UDP
839                // has no half-sent response to preserve, so an immediate flow
840                // teardown on soft-stop is the right graceful behavior (a stray
841                // in-flight reply may be lost, which is acceptable for a
842                // best-effort datagram proxy).
843                let now = Instant::now();
844                for mgr in self.managers.values() {
845                    mgr.borrow_mut()
846                        .handle_input(ManagerInput::Config(ConfigEvent::Drain), now);
847                }
848                // Drive teardown through each manager (FlowEvicted + CloseFlow
849                // per flow) so the active-flows gauge balances to zero.
850                for session in self.listener_sessions.values() {
851                    session.borrow_mut().close_all_flows(now);
852                }
853                self.listener_sessions.clear();
854                let listeners: HashMap<_, _> = self.listeners.drain().collect();
855                for (_, l) in listeners.iter() {
856                    l.borrow_mut()
857                        .socket
858                        .take()
859                        .map(|mut sock| self.registry.deregister(&mut sock));
860                }
861                WorkerResponse::processing(message.id)
862            }
863            RequestType::HardStop(_) => {
864                info!("{} {} hard shutdown", log_module_context!(), message.id);
865                let now = Instant::now();
866                // Drive teardown through each manager (FlowEvicted + CloseFlow
867                // per flow) so the active-flows gauge balances to zero before the
868                // managers are dropped below.
869                for session in self.listener_sessions.values() {
870                    session.borrow_mut().close_all_flows(now);
871                }
872                self.listener_sessions.clear();
873                let mut listeners: HashMap<_, _> = self.listeners.drain().collect();
874                for (_, l) in listeners.drain() {
875                    l.borrow_mut()
876                        .socket
877                        .take()
878                        .map(|mut sock| self.registry.deregister(&mut sock));
879                }
880                self.managers.clear();
881                WorkerResponse::ok(message.id)
882            }
883            RequestType::Status(_) => {
884                info!("{} {} status", log_module_context!(), message.id);
885                WorkerResponse::ok(message.id)
886            }
887            RequestType::RemoveListener(remove) => {
888                if !self.remove_listener(remove.address.into()) {
889                    WorkerResponse::error(
890                        message.id,
891                        format!("no UDP listener to remove at address {:?}", remove.address),
892                    )
893                } else {
894                    WorkerResponse::ok(message.id)
895                }
896            }
897            command => {
898                debug!(
899                    "{} {} unsupported message for UDP proxy, ignoring {:?}",
900                    log_module_context!(),
901                    message.id,
902                    command
903                );
904                WorkerResponse::error(message.id, "unsupported message")
905            }
906        }
907    }
908}
909
910/// Fold a proto [`UdpClusterConfig`](sozu_command::proto::command::UdpClusterConfig)
911/// into a [`ClusterConfig`], applying the proto defaults for absent fields.
912/// Single source of truth so `apply_cluster` (live push) and
913/// `cluster_config_for` (frontend add / rebuild) never diverge.
914fn apply_udp_knobs(cfg: &mut ClusterConfig, udp: &sozu_command::proto::command::UdpClusterConfig) {
915    cfg.affinity_with_port = matches!(
916        udp.affinity_key
917            .and_then(|k| UdpAffinityKey::try_from(k).ok()),
918        Some(UdpAffinityKey::SourceIpPort)
919    );
920    cfg.responses = udp.responses.unwrap_or(0);
921    cfg.requests = udp.requests.unwrap_or(0);
922    cfg.send_proxy_protocol = udp.send_proxy_protocol.unwrap_or(false);
923    cfg.proxy_protocol_every_datagram = udp.proxy_protocol_every_datagram.unwrap_or(false);
924}
925
/// Auto `max_flows` used when the soft `RLIMIT_NOFILE` limit cannot be read
/// (or on non-unix targets).
const DEFAULT_AUTO_MAX_FLOWS: usize = 1_024;
928
929/// `max_flows == 0` means "auto": ~70% of the soft `RLIMIT_NOFILE`, so the fd
930/// budget adapts to the host without hand-tuning (one fd + one slab slot per
931/// flow). Falls back to a conservative constant when the limit can't be read.
932///
933/// The auto derivation is clamped to `slab_headroom` (the global
934/// `max_connections`): every admitted flow consumes one shared `SessionManager`
935/// slab slot, so an unclamped ~70%-of-RLIMIT value on a host with a very large
936/// fd limit could try to inflate the slab to hundreds of thousands of entries
937/// and starve HTTP/TCP. An *explicitly configured* `max_flows` is honoured as-is
938/// (the operator opted in); only the auto value is capped. A `slab_headroom` of
939/// 0 (no connection budget configured) disables the clamp.
940fn effective_max_flows(configured: u32, slab_headroom: usize) -> usize {
941    if configured != 0 {
942        return configured as usize;
943    }
944    let auto = {
945        #[cfg(unix)]
946        {
947            let mut limit = libc::rlimit {
948                rlim_cur: 0,
949                rlim_max: 0,
950            };
951            // SAFETY: `getrlimit` writes a fully-initialised `rlimit` into
952            // `limit`; we read the result only on success (`== 0`).
953            let ret = unsafe { libc::getrlimit(libc::RLIMIT_NOFILE, &mut limit) };
954            if ret == 0 && limit.rlim_cur > 0 {
955                let soft = limit.rlim_cur;
956                ((soft.saturating_mul(7)) / 10).max(1) as usize
957            } else {
958                DEFAULT_AUTO_MAX_FLOWS
959            }
960        }
961        #[cfg(not(unix))]
962        {
963            DEFAULT_AUTO_MAX_FLOWS
964        }
965    };
966    if slab_headroom == 0 {
967        auto
968    } else {
969        auto.min(slab_headroom).max(1)
970    }
971}
972
/// Cap a configured `max_rx_datagram_size` at the global `buffer_size`.
///
/// This is defense in depth. The per-session `recv_buf` is sized `max_rx + 1`,
/// so a raw worker `AddUdpListener`/`UpdateUdpListener` carrying e.g.
/// `max_rx_datagram_size = u32::MAX` must NOT be able to allocate a ~4 GB
/// buffer. A `buffer_size` of 0 means "unset" and leaves the value untouched.
fn clamp_max_rx(configured: usize, buffer_size: usize) -> usize {
    match buffer_size {
        0 => configured,
        cap => configured.min(cap),
    }
}
985
/// The `ProxySession` backing one UDP listener.
///
/// The server's generic readiness path drives it, since UDP is not in the
/// listen-accept arm. It owns the per-flow connected upstream sockets, their
/// bounded write queues and the shell-side flow bookkeeping maps. Readiness
/// events are demuxed by token: the listener token carries client datagrams,
/// and each upstream token belongs to one flow's connected socket.
pub struct UdpListenerSession {
    /// The listener this session serves.
    listener: Rc<RefCell<UdpListener>>,
    /// The sans-io manager for this listener.
    manager: Rc<RefCell<UdpManager>>,
    /// Shared backend map for LB selection.
    backends: Rc<RefCell<BackendMap>>,
    /// Cloned mio registry for per-flow socket (de)registration.
    registry: Registry,
    /// Slab the per-flow upstream tokens are inserted into. Each upstream
    /// token's slot holds a clone of the entry stored under the listener token
    /// (multi-token pattern), so every token reaches this same session.
    sessions: Rc<RefCell<SessionManager>>,
    /// The listener's own token (client recv + timer key).
    listener_token: Token,
    /// Listener address (for logging).
    address: SocketAddr,
    /// Per-flow connected upstream sockets, keyed by their slab token.
    upstream_sockets: HashMap<Token, UdpSocket>,
    /// Per-flow bounded egress queues for the forward path, keyed by the
    /// upstream token. A datagram lands here only when the connected upstream
    /// socket returns `WouldBlock`; the socket is then reregistered
    /// `READABLE | WRITABLE` and drained on the next writable event. Dropped
    /// together with the socket on flow close (no leak, gauge stays correct).
    upstream_write_queues: HashMap<Token, WriteQueue>,
    /// Bounded egress queue for the client-return path (replies fanned back
    /// through the single listener socket via `send_to`). Engaged only when the
    /// listener socket returns `WouldBlock`; the listener is then reregistered
    /// `READABLE | WRITABLE` and drained on its next writable event.
    client_write_queue: WriteQueue,
    /// `upstream_token -> FlowId` for NAT-return demux (backend reply -> flow).
    upstream_to_flow: HashMap<Token, FlowId>,
    /// `FlowId -> upstream_token`, the inverse of `upstream_to_flow`; used to
    /// find the flow's socket on send and to tear it down on close.
    flow_to_upstream: HashMap<FlowId, Token>,
    /// `FlowId -> admission Instant` for `udp.flow.duration` on close.
    flow_started: HashMap<FlowId, Instant>,
    /// `FlowId -> (client, backend)` for the close access log.
    flow_endpoints: HashMap<FlowId, (SocketAddr, Option<SocketAddr>)>,
    /// The client source whose datagram is currently being drained. Lets
    /// `on_send_to_backend` resolve the right per-flow upstream socket for an
    /// already-established flow (where no `OpenUpstream` precedes the send).
    /// `None` while draining backend-side or timer-driven outputs.
    in_flight_client: Option<SocketAddr>,
    /// The flow whose upstream was just opened in this drain pass. Covers the
    /// new-flow path, where `OpenUpstream{flow}` immediately precedes the first
    /// `SendToBackend`.
    in_flight_flow: Option<FlowId>,
    /// Shadow of the manager's flow table: normalised client key -> `FlowId`.
    /// Lets the shell resolve the owning flow for a `SendToBackend` on an
    /// established flow from the in-flight client source. Kept in lockstep with
    /// `OpenUpstream` / `CloseFlow`.
    client_key_to_flow: HashMap<SocketAddr, FlowId>,
    /// Reusable recv scratch buffer, sized `max_rx_datagram_size + 1`. The
    /// extra byte lets an oversized datagram be detected as too long instead
    /// of arriving kernel-truncated.
    recv_buf: Vec<u8>,
    /// The currently-armed `TIMER` handle, so a re-arm cancels the previous
    /// deadline instead of leaking timer-slab entries (the manager only emits a
    /// fresh `ArmTimer` when the deadline actually changes).
    timer_handle: Option<crate::timer::Timeout>,
}
1047
1048impl UdpListenerSession {
1049    #[allow(clippy::too_many_arguments)]
1050    pub fn new(
1051        listener: Rc<RefCell<UdpListener>>,
1052        manager: Rc<RefCell<UdpManager>>,
1053        backends: Rc<RefCell<BackendMap>>,
1054        registry: Registry,
1055        sessions: Rc<RefCell<SessionManager>>,
1056        listener_token: Token,
1057    ) -> UdpListenerSession {
1058        let (address, max_rx) = {
1059            let l = listener.borrow();
1060            (l.address, l.config.max_rx_datagram_size as usize)
1061        };
1062        UdpListenerSession {
1063            listener,
1064            manager,
1065            backends,
1066            registry,
1067            sessions,
1068            listener_token,
1069            address,
1070            upstream_sockets: HashMap::new(),
1071            upstream_write_queues: HashMap::new(),
1072            client_write_queue: WriteQueue::new(CLIENT_WRITE_QUEUE_CAP),
1073            upstream_to_flow: HashMap::new(),
1074            flow_to_upstream: HashMap::new(),
1075            flow_started: HashMap::new(),
1076            flow_endpoints: HashMap::new(),
1077            in_flight_client: None,
1078            in_flight_flow: None,
1079            client_key_to_flow: HashMap::new(),
1080            // Size the recv scratch to `max_rx + 1`, NOT `max_rx`: a UDP
1081            // `recv_from` truncates the datagram to the buffer length and
1082            // silently discards the tail. If the buffer were exactly `max_rx`,
1083            // an oversized datagram would arrive as a `max_rx`-byte payload —
1084            // indistinguishable from a legal one — and be forwarded truncated.
1085            // The extra byte lets the manager observe `len == max_rx + 1 >
1086            // max_rx` and drop it (`DropReason::Truncated`) instead.
1087            recv_buf: vec![0u8; max_rx.saturating_add(1).max(1)],
1088            timer_handle: None,
1089        }
1090    }
1091
1092    /// Resize the recv scratch buffer to `max_rx + 1` (the same `+ 1` sizing as
1093    /// [`new`](Self::new): the extra byte lets the manager observe an oversized
1094    /// datagram as `len == max_rx + 1 > max_rx` and drop it as `Truncated`
1095    /// rather than silently forwarding a kernel-truncated payload). Called from
1096    /// `UdpProxy::update_listener` when a config push changes the listener's
1097    /// `max_rx_datagram_size`. Resizing larger zero-fills the new tail; resizing
1098    /// smaller truncates the (idle, between-datagram) scratch — both are safe
1099    /// because the buffer holds no live datagram across calls.
1100    fn resize_recv_buf(&mut self, max_rx: usize) {
1101        self.recv_buf.resize(max_rx.saturating_add(1).max(1), 0u8);
1102    }
1103
1104    /// Normalise a client source the same way the active manager config keys
1105    /// flows (4-tuple when `affinity_with_port`, else source-IP with port 0).
1106    fn client_key(&self, src: SocketAddr) -> SocketAddr {
1107        let with_port = self.manager.borrow().affinity_with_port();
1108        if with_port {
1109            src
1110        } else {
1111            let mut s = src;
1112            s.set_port(0);
1113            s
1114        }
1115    }
1116
1117    /// Drain every datagram waiting on the listener socket into the manager.
1118    /// Edge-triggered epoll: loop `recv_from` to `WouldBlock`.
1119    fn ingest_client(&mut self, now: Instant) {
1120        // Pull the socket out behind a short borrow then operate on it.
1121        loop {
1122            let result = {
1123                let listener = self.listener.borrow();
1124                let Some(socket) = listener.socket.as_ref() else {
1125                    return;
1126                };
1127                socket.recv_from(&mut self.recv_buf)
1128            };
1129            match result {
1130                Ok((len, src)) => {
1131                    // Mark the in-flight client so `on_send_to_backend` can
1132                    // resolve the owning flow's upstream socket for an
1133                    // already-established flow (no `OpenUpstream` precedes it).
1134                    self.in_flight_client = Some(src);
1135                    self.in_flight_flow = None;
1136                    let len = len.min(self.recv_buf.len());
1137                    // Borrow the payload via a split so the mutable borrow of
1138                    // `self.manager` does not alias `self.recv_buf`.
1139                    let payload: &[u8] = &self.recv_buf[..len];
1140                    // SAFETY-free: `handle_input` only reads the slice; the
1141                    // borrow checker is satisfied because `recv_buf` and
1142                    // `manager` are disjoint fields.
1143                    let mgr = self.manager.clone();
1144                    mgr.borrow_mut()
1145                        .handle_input(ManagerInput::ClientDatagram { src, payload }, now);
1146                    // Drain outputs after each datagram to bound queue growth.
1147                    self.drain_outputs(now);
1148                    self.in_flight_client = None;
1149                }
1150                Err(ref e) if e.kind() == ErrorKind::WouldBlock => break,
1151                Err(ref e) if e.kind() == ErrorKind::Interrupted => continue,
1152                Err(e) => {
1153                    debug!(
1154                        "{} recv_from error on UDP listener: {}",
1155                        log_context!(self),
1156                        e
1157                    );
1158                    break;
1159                }
1160            }
1161        }
1162    }
1163
1164    /// Drain datagrams waiting on one flow's connected upstream socket into the
1165    /// manager as `BackendDatagram`s.
1166    fn ingest_upstream(&mut self, upstream_token: Token, now: Instant) {
1167        let Some(&flow) = self.upstream_to_flow.get(&upstream_token) else {
1168            return;
1169        };
1170        loop {
1171            let result = {
1172                let Some(socket) = self.upstream_sockets.get(&upstream_token) else {
1173                    return;
1174                };
1175                socket.recv(&mut self.recv_buf)
1176            };
1177            match result {
1178                Ok(len) => {
1179                    let len = len.min(self.recv_buf.len());
1180                    let payload: &[u8] = &self.recv_buf[..len];
1181                    let mgr = self.manager.clone();
1182                    mgr.borrow_mut()
1183                        .handle_input(ManagerInput::BackendDatagram { flow, payload }, now);
1184                    self.drain_outputs(now);
1185                }
1186                Err(ref e) if e.kind() == ErrorKind::WouldBlock => break,
1187                Err(ref e) if e.kind() == ErrorKind::Interrupted => continue,
1188                Err(e) => {
1189                    debug!(
1190                        "{} recv error on upstream socket: {}",
1191                        log_context!(self),
1192                        e
1193                    );
1194                    break;
1195                }
1196            }
1197        }
1198    }
1199
1200    /// Drain the manager's output queue, acting on each `Output`.
1201    fn drain_outputs(&mut self, now: Instant) {
1202        let mgr = self.manager.clone();
1203        loop {
1204            let out = mgr.borrow_mut().poll_output();
1205            let Some(out) = out else { break };
1206            match out {
1207                Output::SelectBackend { flow, cluster, key } => {
1208                    self.on_select_backend(flow, &cluster, key, now)
1209                }
1210                Output::OpenUpstream { flow, backend } => self.on_open_upstream(flow, backend, now),
1211                Output::SendToBackend(transmit) => self.on_send_to_backend(transmit),
1212                Output::SendToClient(transmit) => self.on_send_to_client(transmit),
1213                Output::ArmTimer(deadline) => self.arm_timer(deadline, now),
1214                Output::Metric(ev) => Self::record_metric(ev),
1215                Output::CloseFlow(flow) => self.on_close_flow(flow),
1216                Output::Drop(reason) => Self::record_drop(reason),
1217            }
1218        }
1219    }
1220
1221    fn on_select_backend(&mut self, flow: FlowId, cluster: &str, key: u64, now: Instant) {
1222        let resolved = self
1223            .backends
1224            .borrow_mut()
1225            .backend_from_cluster_id_with_key(cluster, Some(key));
1226        match resolved {
1227            Ok((backend, addr)) => {
1228                self.manager.borrow_mut().handle_input(
1229                    ManagerInput::BackendResolved {
1230                        flow,
1231                        backend,
1232                        addr,
1233                    },
1234                    now,
1235                );
1236            }
1237            Err(e) => {
1238                debug!(
1239                    "{} no backend for cluster {}: {}; aborting flow {}",
1240                    log_context!(self),
1241                    cluster,
1242                    e,
1243                    flow
1244                );
1245                incr!(names::udp::DROPPED_NO_BACKEND);
1246                // Abort the flow instead of leaving it parked AwaitingBackend:
1247                // the manager already counted it (FlowCreated, +1 gauge, slab +
1248                // admission slot), so without this it would squat a `max_flows`
1249                // slot for the full idle timeout while every later datagram is
1250                // dropped. `abort_flow` enqueues FlowEvicted + CloseFlow, which
1251                // the surrounding `drain_outputs` loop processes — freeing the
1252                // slot immediately and balancing the gauge via FlowEvicted.
1253                self.manager
1254                    .borrow_mut()
1255                    .abort_flow(flow, now, CloseReason::Aborted);
1256            }
1257        }
1258    }
1259
1260    fn on_open_upstream(&mut self, flow: FlowId, backend: SocketAddr, now: Instant) {
1261        let mut socket = match udp_connect(backend) {
1262            Ok(socket) => socket,
1263            Err(e) => {
1264                // EMFILE/ENFILE/connect refusal → shed this flow, never panic.
1265                warn!(
1266                    "{} could not open upstream socket to {}: {}; shedding flow {}",
1267                    log_context!(self),
1268                    backend,
1269                    e,
1270                    flow
1271                );
1272                incr!(names::udp::FLOWS_SHED);
1273                // The manager already moved this flow toward Established and
1274                // counted it; abort it so the `max_flows` slot frees immediately
1275                // (FlowEvicted balances the gauge) instead of squatting until the
1276                // idle timeout. Keep the FLOWS_SHED metric above for the
1277                // EMFILE/refused case. `abort_flow` enqueues FlowEvicted +
1278                // CloseFlow for the surrounding `drain_outputs` loop.
1279                self.manager
1280                    .borrow_mut()
1281                    .abort_flow(flow, now, CloseReason::Aborted);
1282                return;
1283            }
1284        };
1285        // Multi-token pattern (template tcp.rs:1029-1053): a fresh slab slot
1286        // under the SAME listener-session Rc, registered READABLE so its
1287        // readiness reaches `Server::ready` → demuxed back to this session by
1288        // `update_readiness`. The flow-table cap (`max_flows`) already bounds
1289        // how many upstream sockets/slots can exist, so the slab cannot grow
1290        // unbounded here.
1291        let upstream_token = {
1292            let mut s = self.sessions.borrow_mut();
1293            let listener_session = s.slab[self.listener_token.0].clone();
1294            let entry = s.slab.vacant_entry();
1295            let token = Token(entry.key());
1296            entry.insert(listener_session);
1297            token
1298        };
1299        if let Err(e) = self
1300            .registry
1301            .register(&mut socket, upstream_token, Interest::READABLE)
1302        {
1303            error!(
1304                "{} could not register upstream socket: {}",
1305                log_context!(self),
1306                e
1307            );
1308            self.sessions.borrow_mut().slab.try_remove(upstream_token.0);
1309            // The flow is Established in the manager but has no usable upstream
1310            // socket: abort it so its `max_flows` slot frees now (FlowEvicted
1311            // balances the gauge) rather than squatting until idle timeout.
1312            self.manager
1313                .borrow_mut()
1314                .abort_flow(flow, now, CloseReason::Aborted);
1315            return;
1316        }
1317        self.upstream_sockets.insert(upstream_token, socket);
1318        self.upstream_to_flow.insert(upstream_token, flow);
1319        self.flow_to_upstream.insert(flow, upstream_token);
1320        // `flow_to_upstream` and `upstream_to_flow` are inverse maps: the token
1321        // in one points back to the flow in the other. A broken inverse would let
1322        // a backend reply demux to the wrong client (a NAT-return mismatch).
1323        debug_assert_eq!(
1324            self.upstream_to_flow.get(&upstream_token),
1325            Some(&flow),
1326            "upstream_to_flow must map the new token back to its flow"
1327        );
1328        debug_assert_eq!(
1329            self.flow_to_upstream.get(&flow),
1330            Some(&upstream_token),
1331            "flow_to_upstream must map the flow back to its upstream token"
1332        );
1333        self.flow_started.insert(flow, Instant::now());
1334        // The client source for this flow is the one currently in flight.
1335        let client = self.in_flight_client.unwrap_or(self.address);
1336        self.flow_endpoints.insert(flow, (client, Some(backend)));
1337        if let Some(src) = self.in_flight_client {
1338            let key = self.client_key(src);
1339            self.client_key_to_flow.insert(key, flow);
1340            // The shadow flow-table only ever holds live flows: the flow we just
1341            // mapped must have a live upstream token (it is the one we just
1342            // opened). Pairs the on-close drop in `on_close_flow`.
1343            debug_assert!(
1344                self.flow_to_upstream.contains_key(&flow),
1345                "client_key_to_flow points at flow {flow} with no live upstream token"
1346            );
1347        }
1348        // This flow's first SendToBackend (if any) follows immediately.
1349        self.in_flight_flow = Some(flow);
1350    }
1351
1352    fn on_send_to_backend(&mut self, transmit: crate::protocol::udp::Transmit) {
1353        // Resolve the owning flow precisely (NOT by `transmit.dst`, which two
1354        // flows to the same backend would alias — sending on the wrong
1355        // connected socket misroutes that backend's reply to the wrong client).
1356        //   * new flow:  `OpenUpstream{flow}` set `in_flight_flow` just before.
1357        //   * established flow: resolve via the in-flight client source through
1358        //     the shell-side `client_key -> flow` shadow of the flow table.
1359        let flow = self.in_flight_flow.or_else(|| {
1360            self.in_flight_client
1361                .map(|src| self.client_key(src))
1362                .and_then(|key| self.client_key_to_flow.get(&key).copied())
1363        });
1364        let token = flow.and_then(|f| self.flow_to_upstream.get(&f).copied());
1365        let Some(token) = token else {
1366            // No resolved flow / upstream socket: this is not a queue-full drop.
1367            incr!(names::udp::DROPPED_UNKNOWN_FLOW);
1368            return;
1369        };
1370        let Some(socket) = self.upstream_sockets.get(&token) else {
1371            // Socket already gone (flow closed mid-drain): unknown-flow, not
1372            // queue-full.
1373            incr!(names::udp::DROPPED_UNKNOWN_FLOW);
1374            return;
1375        };
1376        // If a queue is already backed up for this flow, preserve FIFO order —
1377        // do NOT jump the line with the fast-path `send`. Append (or drop on
1378        // overflow) and let the WRITABLE drain catch up.
1379        if let Some(q) = self.upstream_write_queues.get_mut(&token)
1380            && !q.is_empty()
1381        {
1382            if !q.push(transmit.dst, transmit.payload) {
1383                debug!("{} upstream write queue full, dropping", log_context!(self));
1384                incr!(names::udp::DROPPED_WQ_FULL);
1385            }
1386            return;
1387        }
1388        match socket.send(&transmit.payload) {
1389            Ok(_) => {}
1390            Err(ref e) if e.kind() == ErrorKind::WouldBlock => {
1391                // Kernel send buffer full: enqueue (bounded) and arm WRITABLE so
1392                // the next writable event drains it. Drop + metric only at cap.
1393                let q = self
1394                    .upstream_write_queues
1395                    .entry(token)
1396                    .or_insert_with(|| WriteQueue::new(UPSTREAM_WRITE_QUEUE_CAP));
1397                if q.push(transmit.dst, transmit.payload) {
1398                    self.arm_upstream_writable(token);
1399                } else {
1400                    debug!("{} upstream write queue full, dropping", log_context!(self));
1401                    incr!(names::udp::DROPPED_WQ_FULL);
1402                }
1403            }
1404            Err(e) => {
1405                debug!("{} upstream send error: {}", log_context!(self), e);
1406                // Hard send error (e.g. ECONNREFUSED), not a queue-full drop.
1407                incr!(names::udp::DROPPED_SEND_ERROR);
1408            }
1409        }
1410    }
1411
1412    /// Reregister a flow's connected upstream socket for `READABLE | WRITABLE`
1413    /// so a queued forward datagram gets a writable wake (the edge-triggered
1414    /// analog of `signal_pending_write`). Idempotent enough — mio coalesces a
1415    /// repeated interest set.
1416    fn arm_upstream_writable(&mut self, token: Token) {
1417        if let Some(socket) = self.upstream_sockets.get_mut(&token)
1418            && let Err(e) =
1419                self.registry
1420                    .reregister(socket, token, Interest::READABLE | Interest::WRITABLE)
1421        {
1422            debug!(
1423                "{} could not arm WRITABLE on upstream socket: {}",
1424                log_context!(self),
1425                e
1426            );
1427        }
1428    }
1429
1430    /// Drop a flow's upstream socket back to `READABLE`-only once its write queue
1431    /// has fully drained, so an empty socket no longer wakes the loop on every
1432    /// writable edge (a permanently-WRITABLE UDP socket would busy-loop).
1433    fn disarm_upstream_writable(&mut self, token: Token) {
1434        if let Some(socket) = self.upstream_sockets.get_mut(&token)
1435            && let Err(e) = self.registry.reregister(socket, token, Interest::READABLE)
1436        {
1437            debug!(
1438                "{} could not disarm WRITABLE on upstream socket: {}",
1439                log_context!(self),
1440                e
1441            );
1442        }
1443    }
1444
1445    /// Drain a flow's upstream write queue on a writable event. Re-sends in FIFO
1446    /// order until `WouldBlock` or empty; on empty, drops WRITABLE interest.
1447    fn drain_upstream_queue(&mut self, token: Token) {
1448        let Some(mut queue) = self.upstream_write_queues.remove(&token) else {
1449            return;
1450        };
1451        let socket = self.upstream_sockets.get(&token);
1452        let Some(socket) = socket else {
1453            // Socket gone (flow closed mid-drain): discard the queue.
1454            return;
1455        };
1456        let emptied = queue.drain(|_dst, payload| match socket.send(payload) {
1457            Ok(_) => SendOutcome::Sent,
1458            Err(ref e) if e.kind() == ErrorKind::WouldBlock => SendOutcome::WouldBlock,
1459            Err(_) => SendOutcome::Dropped,
1460        });
1461        if emptied {
1462            self.disarm_upstream_writable(token);
1463        } else {
1464            // Still backed up: put the queue back and keep WRITABLE armed.
1465            self.upstream_write_queues.insert(token, queue);
1466        }
1467    }
1468
1469    fn on_send_to_client(&mut self, transmit: crate::protocol::udp::Transmit) {
1470        // Preserve FIFO: if the client-return queue is already backed up, append
1471        // (or drop on overflow) rather than jumping the line via the fast path.
1472        if !self.client_write_queue.is_empty() {
1473            if !self.client_write_queue.push(transmit.dst, transmit.payload) {
1474                debug!("{} client write queue full, dropping", log_context!(self));
1475                incr!(names::udp::DROPPED_WQ_FULL);
1476            }
1477            return;
1478        }
1479        let send_result = {
1480            let listener = self.listener.borrow();
1481            let Some(socket) = listener.socket.as_ref() else {
1482                return;
1483            };
1484            socket.send_to(&transmit.payload, transmit.dst)
1485        };
1486        match send_result {
1487            Ok(_) => {}
1488            Err(ref e) if e.kind() == ErrorKind::WouldBlock => {
1489                // Listener send buffer full: enqueue (bounded) + arm WRITABLE on
1490                // the listener so the next writable event drains it.
1491                if self.client_write_queue.push(transmit.dst, transmit.payload) {
1492                    self.arm_client_writable();
1493                } else {
1494                    debug!("{} client write queue full, dropping", log_context!(self));
1495                    incr!(names::udp::DROPPED_WQ_FULL);
1496                }
1497            }
1498            Err(e) => {
1499                debug!("{} client send_to error: {}", log_context!(self), e);
1500                // Hard send_to error, not a queue-full drop.
1501                incr!(names::udp::DROPPED_SEND_ERROR);
1502            }
1503        }
1504    }
1505
1506    /// Reregister the listener socket for `READABLE | WRITABLE` so a queued
1507    /// client-return datagram gets a writable wake. The listener token routes
1508    /// the writable event back into `update_readiness`.
1509    fn arm_client_writable(&mut self) {
1510        let listener = self.listener.borrow();
1511        let fd = match listener.socket.as_ref() {
1512            Some(socket) => socket.as_raw_fd(),
1513            None => return,
1514        };
1515        if let Err(e) = self.registry.reregister(
1516            &mut SourceFd(&fd),
1517            self.listener_token,
1518            Interest::READABLE | Interest::WRITABLE,
1519        ) {
1520            debug!(
1521                "{} could not arm WRITABLE on listener socket: {}",
1522                log_context!(self),
1523                e
1524            );
1525        }
1526    }
1527
1528    /// Drop the listener socket back to `READABLE`-only once the client-return
1529    /// queue is empty (a permanently-WRITABLE listener would busy-loop).
1530    fn disarm_client_writable(&mut self) {
1531        let listener = self.listener.borrow();
1532        let fd = match listener.socket.as_ref() {
1533            Some(socket) => socket.as_raw_fd(),
1534            None => return,
1535        };
1536        if let Err(e) =
1537            self.registry
1538                .reregister(&mut SourceFd(&fd), self.listener_token, Interest::READABLE)
1539        {
1540            debug!(
1541                "{} could not disarm WRITABLE on listener socket: {}",
1542                log_context!(self),
1543                e
1544            );
1545        }
1546    }
1547
1548    /// Drain the client-return write queue on a listener writable event. Re-sends
1549    /// in FIFO order until `WouldBlock` or empty; on empty, drops WRITABLE.
1550    fn drain_client_queue(&mut self) {
1551        let mut queue = std::mem::replace(&mut self.client_write_queue, WriteQueue::new(0));
1552        let emptied = {
1553            let listener = self.listener.borrow();
1554            let Some(socket) = listener.socket.as_ref() else {
1555                // Listener socket gone: discard the queue and restore an empty one.
1556                self.client_write_queue = WriteQueue::new(CLIENT_WRITE_QUEUE_CAP);
1557                return;
1558            };
1559            queue.drain(|dst, payload| match socket.send_to(payload, *dst) {
1560                Ok(_) => SendOutcome::Sent,
1561                Err(ref e) if e.kind() == ErrorKind::WouldBlock => SendOutcome::WouldBlock,
1562                Err(_) => SendOutcome::Dropped,
1563            })
1564        };
1565        // Restore the (possibly still-backed-up) queue; preserve its capacity.
1566        queue.cap = CLIENT_WRITE_QUEUE_CAP;
1567        self.client_write_queue = queue;
1568        if emptied {
1569            self.disarm_client_writable();
1570        }
1571    }
1572
1573    /// Map the single manager-wide `ArmTimer(deadline)` onto the thread-local
1574    /// `TIMER`, keyed by the **listener token**: when it fires, the run loop
1575    /// calls `Server::timeout(listener_token)` → `session.timeout(token)` →
1576    /// `manager.handle_timeout(now)`.
1577    fn arm_timer(&mut self, deadline: Instant, now: Instant) {
1578        let delay = deadline.saturating_duration_since(now);
1579        TIMER.with(|timer| {
1580            let mut timer = timer.borrow_mut();
1581            // Cancel the previous deadline so re-arming does not leak timer-slab
1582            // entries. The manager only emits a fresh `ArmTimer` on a real
1583            // deadline change, so cancellations are rare.
1584            if let Some(old) = self.timer_handle.take() {
1585                let _ = timer.cancel_timeout(&old);
1586            }
1587            self.timer_handle = Some(timer.set_timeout(delay, self.listener_token));
1588        });
1589    }
1590
1591    fn on_close_flow(&mut self, flow: FlowId) {
1592        if let Some(token) = self.flow_to_upstream.remove(&flow) {
1593            if let Some(mut socket) = self.upstream_sockets.remove(&token) {
1594                if let Err(e) = self.registry.deregister(&mut socket) {
1595                    debug!("{} deregister upstream on close: {}", log_context!(self), e);
1596                }
1597            }
1598            // Drop any queued (un-drained) forward datagrams with the socket so
1599            // the per-flow queue cannot leak; gauge correctness is preserved
1600            // because the queue holds bytes, not flow-count state.
1601            self.upstream_write_queues.remove(&token);
1602            self.upstream_to_flow.remove(&token);
1603            self.sessions.borrow_mut().slab.try_remove(token.0);
1604            // On close, all per-flow maps drop the flow together: neither
1605            // direction of the upstream-token map may still reference it.
1606            debug_assert!(
1607                !self.upstream_to_flow.values().any(|&f| f == flow),
1608                "on_close_flow left upstream_to_flow referencing closed flow {flow}"
1609            );
1610            debug_assert!(
1611                !self.flow_to_upstream.contains_key(&flow),
1612                "on_close_flow left flow_to_upstream entry for closed flow {flow}"
1613            );
1614        }
1615        if let Some(started) = self.flow_started.remove(&flow) {
1616            let duration = started.elapsed();
1617            time!(names::udp::FLOW_DURATION, duration.as_millis());
1618        }
1619        let (client, backend) = self
1620            .flow_endpoints
1621            .remove(&flow)
1622            .unwrap_or((self.address, None));
1623        // Drop the shadow flow-table entry if it still points at this flow.
1624        let key = self.client_key(client);
1625        if self.client_key_to_flow.get(&key) == Some(&flow) {
1626            self.client_key_to_flow.remove(&key);
1627        }
1628        // The shadow flow-table must no longer map THIS flow id. A surviving
1629        // entry would misroute a later established-flow `SendToBackend` onto a
1630        // freed upstream token.
1631        debug_assert!(
1632            !self.client_key_to_flow.values().any(|&f| f == flow),
1633            "on_close_flow left client_key_to_flow referencing closed flow {flow}"
1634        );
1635        info!("{} flow closed", log_flow_context!(flow, client, backend));
1636    }
1637
1638    fn record_metric(ev: MetricEvent) {
1639        match ev {
1640            MetricEvent::FlowCreated => {
1641                incr!(names::udp::FLOWS_CREATED);
1642                gauge_add!(names::udp::ACTIVE_FLOWS, 1);
1643            }
1644            MetricEvent::FlowEvicted => {
1645                incr!(names::udp::FLOWS_EVICTED);
1646                gauge_add!(names::udp::ACTIVE_FLOWS, -1);
1647            }
1648            MetricEvent::FlowShed => {
1649                incr!(names::udp::FLOWS_SHED);
1650            }
1651            MetricEvent::DatagramIn(bytes) => {
1652                incr!(names::udp::DATAGRAMS_IN);
1653                count!(names::udp::BYTES_IN, bytes as i64);
1654            }
1655            MetricEvent::DatagramOut(bytes) => {
1656                incr!(names::udp::DATAGRAMS_OUT);
1657                count!(names::udp::BYTES_OUT, bytes as i64);
1658            }
1659            MetricEvent::DatagramDropped(reason) => Self::record_drop(reason),
1660        }
1661    }
1662
1663    fn record_drop(reason: DropReason) {
1664        incr!(names::udp::DATAGRAMS_DROPPED);
1665        match reason {
1666            DropReason::Invalid => incr!(names::udp::DROPPED_INVALID),
1667            DropReason::Truncated => incr!(names::udp::DROPPED_TRUNCATED),
1668            DropReason::NoBackend => incr!(names::udp::DROPPED_NO_BACKEND),
1669            DropReason::Shed => incr!(names::udp::DROPPED_SHED),
1670            DropReason::UnknownFlow => incr!(names::udp::DROPPED_UNKNOWN_FLOW),
1671        }
1672    }
1673
1674    /// Tear down every active flow on this listener **through the manager**, so
1675    /// each close emits `FlowEvicted` + `CloseFlow` exactly once and the shell's
1676    /// normal [`on_close_flow`](Self::on_close_flow) handler frees the upstream
1677    /// socket + slab slot and decrements `udp.active_flows`. Used on soft/hard
1678    /// stop, listener remove, and listener deactivate so the worker reaches its
1679    /// `base_sessions_count` and exits promptly instead of waiting out every
1680    /// flow's idle timeout — and so the active-flows gauge does not leak by N
1681    /// (the bug the old direct-teardown path had: it cleared the shell maps
1682    /// without telling the manager, so `FlowEvicted` never fired).
1683    ///
1684    /// On a deactivate where the manager is *retained* (not dropped), this also
1685    /// resets the manager's flow table to empty, keeping manager and shell
1686    /// consistent. The listener socket and the listener-session slab slot are
1687    /// left intact (the listener slot is part of `base_sessions_count` and
1688    /// reclaimed when the worker exits); only the connectionless per-flow slots
1689    /// — which are NOT counted in `nb_connections` and so removed without
1690    /// `decr` — are freed.
1691    pub fn close_all_flows(&mut self, now: Instant) {
1692        // Drive teardown through the manager: it emits one `FlowEvicted` +
1693        // `CloseFlow` per live flow into its output queue. Draining those runs
1694        // `record_metric(FlowEvicted)` (the single gauge decrement) and
1695        // `on_close_flow` (frees socket + slab slot + shell maps) per flow.
1696        self.manager.borrow_mut().close_all(now);
1697        self.drain_outputs(now);
1698        // After `close_all`, the manager's flow table is empty and it armed no
1699        // new timer (`reschedule` emits `ArmTimer` only on a real deadline
1700        // change, and an empty table has no deadline). Cancel any residual shell
1701        // timer so it can't fire against a now-flowless listener.
1702        if let Some(handle) = self.timer_handle.take() {
1703            TIMER.with(|timer| {
1704                let _ = timer.borrow_mut().cancel_timeout(&handle);
1705            });
1706        }
1707    }
1708}
1709
1710impl ProxySession for UdpListenerSession {
1711    fn protocol(&self) -> Protocol {
1712        Protocol::UDPListen
1713    }
1714
1715    fn update_readiness(&mut self, token: Token, events: Ready) {
1716        // WRITABLE first: a previously-`WouldBlock` socket can now accept queued
1717        // egress. Drain before reading so the kernel send buffer has room before
1718        // this pass enqueues more (and so a writable-only event still drains).
1719        if events.is_writable() {
1720            if token == self.listener_token {
1721                self.drain_client_queue();
1722            } else if self.upstream_to_flow.contains_key(&token) {
1723                self.drain_upstream_queue(token);
1724            }
1725        }
1726        if !events.is_readable() {
1727            return;
1728        }
1729        let now = Instant::now();
1730        if token == self.listener_token {
1731            self.ingest_client(now);
1732        } else if self.upstream_to_flow.contains_key(&token) {
1733            self.ingest_upstream(token, now);
1734        }
1735    }
1736
1737    fn ready(&mut self, _session: Rc<RefCell<dyn ProxySession>>) -> SessionIsToBeClosed {
1738        // All work happens in `update_readiness` (it has the firing token; the
1739        // generic `ready()` does not). Never close the listener session here —
1740        // it lives for the listener's lifetime.
1741        false
1742    }
1743
1744    fn timeout(&mut self, token: Token) -> SessionIsToBeClosed {
1745        if token == self.listener_token {
1746            let now = Instant::now();
1747            self.manager.borrow_mut().handle_timeout(now);
1748            self.drain_outputs(now);
1749            // Re-arm: the manager emits a fresh ArmTimer via poll_output if a
1750            // flow is still scheduled (handled inside drain_outputs). Nothing
1751            // to do here. Never close the listener on a flow timeout.
1752        }
1753        false
1754    }
1755
1756    fn close(&mut self) {
1757        // Tear down any still-live flows THROUGH the manager (FlowEvicted +
1758        // CloseFlow per flow) so `udp.active_flows` balances to zero even if the
1759        // server reaps this listener session directly without a prior
1760        // proxy-driven `close_all_flows`. In the common path
1761        // (`close_all_flows` already ran) the manager has no live flows, so this
1762        // is a cheap no-op. Also cancels the residual idle timer.
1763        self.close_all_flows(Instant::now());
1764        // Any leftover egress queues / shell maps are dropped here (the per-flow
1765        // ones were already freed by the manager-driven close above; this only
1766        // resets the client-return queue, which is not flow-state).
1767        self.upstream_write_queues.clear();
1768        self.client_write_queue = WriteQueue::new(CLIENT_WRITE_QUEUE_CAP);
1769        self.upstream_to_flow.clear();
1770        self.flow_to_upstream.clear();
1771        // Deregister + drop the listener socket. Never shutdown(Both) — UDP has
1772        // no connection to shut down; just deregister + drop the fds.
1773        let mut listener = self.listener.borrow_mut();
1774        if let Some(socket) = listener.socket.as_ref() {
1775            let fd = socket.as_raw_fd();
1776            let _ = self.registry.deregister(&mut SourceFd(&fd));
1777        }
1778        listener.active = false;
1779    }
1780
1781    fn last_event(&self) -> Instant {
1782        // The listener session lives for the listener's lifetime — it is never a
1783        // zombie even when idle. Mirror `ListenSession::last_event` (which
1784        // returns `now`) so the `zombie_check` (which has no listener-protocol
1785        // exclusion) never reaps a quiet UDP listener. Per-flow idle reaping is
1786        // handled by the manager's timer wheel, not the zombie sweep.
1787        Instant::now()
1788    }
1789
1790    fn print_session(&self) {
1791        error!(
1792            "{} UDP listener session: {} active flows, {} upstream sockets",
1793            log_context!(self),
1794            self.manager.borrow().flow_count(),
1795            self.upstream_sockets.len(),
1796        );
1797    }
1798
1799    fn frontend_token(&self) -> Token {
1800        self.listener_token
1801    }
1802
1803    fn shutting_down(&mut self) -> SessionIsToBeClosed {
1804        // The listener session is a *listener*, not a connection: it was never
1805        // counted by `SessionManager::incr` (only accepted sessions are), so it
1806        // MUST NOT be routed through the connection-counted
1807        // `shut_down_sessions_by_frontend_tokens` path — that calls `decr`,
1808        // which underflows `nb_connections` and panics (`assert!(nb != 0)` at
1809        // `server.rs`). This mirrors `ListenSession::shutting_down` (which also
1810        // returns `false`). Soft-stop draining is owned by
1811        // `UdpProxy::notify(SoftStop)`: it flips every manager to `Drain` and
1812        // deregisters the listener sockets, so existing flows reach teardown and
1813        // no new flow is admitted. The slot is reclaimed when the worker exits.
1814        false
1815    }
1816
1817    fn cluster_id(&self) -> Option<String> {
1818        self.listener.borrow().cluster_id.clone()
1819    }
1820}
1821
1822#[allow(unused_imports)]
1823pub(crate) use {log_context, log_flow_context, log_module_context};
1824
#[cfg(test)]
mod tests {
    use super::*;

    use std::cell::Cell;

    #[test]
    fn effective_max_flows_explicit_value_is_used() {
        // An explicit value is honoured verbatim, ignoring the slab headroom
        // clamp (the operator opted in).
        assert_eq!(effective_max_flows(42, 0), 42);
        assert_eq!(effective_max_flows(42, 10), 42);
    }

    #[test]
    fn effective_max_flows_auto_is_positive() {
        // 0 = auto: derives ~70% of RLIMIT_NOFILE, always >= 1.
        assert!(effective_max_flows(0, 0) >= 1);
    }

    #[test]
    fn effective_max_flows_auto_is_clamped_to_slab_headroom() {
        // Auto derivation is capped at the slab headroom (max_connections) so a
        // UDP listener can't inflate the shared slab. A headroom of 0 disables
        // the clamp.
        assert!(effective_max_flows(0, 4) <= 4);
        assert!(effective_max_flows(0, 4) >= 1);
    }

    #[test]
    fn clamp_max_rx_respects_buffer_size() {
        // The configured rx size is clamped to buffer_size; an unset (0)
        // buffer_size leaves it untouched.
        assert_eq!(clamp_max_rx(u32::MAX as usize, 16_384), 16_384);
        assert_eq!(clamp_max_rx(1_024, 16_384), 1_024);
        assert_eq!(clamp_max_rx(u32::MAX as usize, 0), u32::MAX as usize);
    }

    fn addr(port: u16) -> SocketAddr {
        SocketAddr::from(([127, 0, 0, 1], port))
    }

    /// A fake socket whose `send` outcome is scripted per call. Lets the
    /// `WriteQueue` state machine be exercised with zero real I/O: we feed a
    /// sequence of `SendOutcome`s and record what was sent in order.
    struct FakeSocket {
        /// Outcomes returned by successive `send` calls (front = next).
        script: RefCell<VecDeque<SendOutcome>>,
        /// Payloads that `Sent`/`Dropped` actually consumed, in order.
        consumed: RefCell<Vec<Vec<u8>>>,
        /// Default outcome once the script is exhausted.
        default: Cell<bool>, // true = Sent, false = WouldBlock
    }

    impl FakeSocket {
        fn new(script: Vec<SendOutcome>, default_sent: bool) -> Self {
            FakeSocket {
                script: RefCell::new(script.into()),
                consumed: RefCell::new(Vec::new()),
                default: Cell::new(default_sent),
            }
        }

        fn send(&self, payload: &[u8]) -> SendOutcome {
            // Scripted outcome first. The default fallback is only built once
            // the script runs dry (`unwrap_or_else` keeps it lazy).
            let outcome = self.script.borrow_mut().pop_front().unwrap_or_else(|| {
                if self.default.get() {
                    SendOutcome::Sent
                } else {
                    SendOutcome::WouldBlock
                }
            });
            if matches!(outcome, SendOutcome::Sent | SendOutcome::Dropped) {
                self.consumed.borrow_mut().push(payload.to_vec());
            }
            outcome
        }
    }

    #[test]
    fn write_queue_push_until_full_then_drops() {
        let mut q = WriteQueue::new(2);
        assert!(q.is_empty());
        assert!(q.push(addr(1), vec![1]));
        assert!(q.push(addr(2), vec![2]));
        assert_eq!(q.len(), 2);
        // At capacity: the third push is rejected (caller drops + counts).
        assert!(!q.push(addr(3), vec![3]));
        assert_eq!(q.len(), 2);
    }

    #[test]
    fn write_queue_drains_in_fifo_order_on_writable() {
        let mut q = WriteQueue::new(8);
        for i in 0..4u8 {
            assert!(q.push(addr(i as u16), vec![i]));
        }
        // All sends succeed: drain empties the queue, FIFO preserved.
        let sock = FakeSocket::new(vec![], true);
        let emptied = q.drain(|_dst, payload| sock.send(payload));
        assert!(emptied);
        assert!(q.is_empty());
        assert_eq!(
            *sock.consumed.borrow(),
            vec![vec![0u8], vec![1u8], vec![2u8], vec![3u8]]
        );
    }

    #[test]
    fn write_queue_stops_on_wouldblock_and_resumes() {
        let mut q = WriteQueue::new(8);
        for i in 0..3u8 {
            assert!(q.push(addr(i as u16), vec![i]));
        }
        // First send ok, then WouldBlock: drain sends one and stops, leaving two.
        let sock = FakeSocket::new(vec![SendOutcome::Sent, SendOutcome::WouldBlock], false);
        let emptied = q.drain(|_dst, payload| sock.send(payload));
        assert!(!emptied);
        assert_eq!(q.len(), 2);
        assert_eq!(*sock.consumed.borrow(), vec![vec![0u8]]);
        // Front is still the second datagram (FIFO preserved across the stall).
        assert_eq!(q.queue.front().unwrap().1, vec![1u8]);
        // Second writable event: now everything goes through.
        let sock2 = FakeSocket::new(vec![], true);
        let emptied2 = q.drain(|_dst, payload| sock2.send(payload));
        assert!(emptied2);
        assert!(q.is_empty());
        assert_eq!(*sock2.consumed.borrow(), vec![vec![1u8], vec![2u8]]);
    }

    #[test]
    fn write_queue_hard_error_drops_one_and_continues() {
        let mut q = WriteQueue::new(8);
        for i in 0..3u8 {
            assert!(q.push(addr(i as u16), vec![i]));
        }
        // Middle datagram hard-errors: it is popped + skipped, the rest proceed.
        let sock = FakeSocket::new(
            vec![SendOutcome::Sent, SendOutcome::Dropped, SendOutcome::Sent],
            true,
        );
        let emptied = q.drain(|_dst, payload| sock.send(payload));
        assert!(emptied);
        assert!(q.is_empty());
        // The dropped datagram (1) was consumed-for-accounting but the queue is
        // empty and the surviving datagrams (0, 2) went out in order.
        assert_eq!(
            *sock.consumed.borrow(),
            vec![vec![0u8], vec![1u8], vec![2u8]]
        );
    }

    #[test]
    fn write_queue_empties_cleanly_when_already_empty() {
        let mut q = WriteQueue::new(4);
        let sock = FakeSocket::new(vec![], true);
        // Draining an empty queue is a no-op that reports emptied = true.
        let emptied = q.drain(|_dst, payload| sock.send(payload));
        assert!(emptied);
        assert!(q.is_empty());
        assert!(sock.consumed.borrow().is_empty());
    }

    #[test]
    fn write_queue_accepts_new_pushes_after_draining() {
        // A full queue rejects pushes, but a successful drain frees its
        // capacity, so the next backed-up datagram can be queued again.
        let mut q = WriteQueue::new(1);
        assert!(q.push(addr(1), vec![1]));
        assert!(!q.push(addr(2), vec![2]));
        let sock = FakeSocket::new(vec![], true);
        assert!(q.drain(|_dst, payload| sock.send(payload)));
        assert!(q.is_empty());
        assert!(q.push(addr(3), vec![3]));
        assert_eq!(q.len(), 1);
        assert_eq!(*sock.consumed.borrow(), vec![vec![1u8]]);
    }
}