sozu_lib/udp.rs
1//! UDP load-balancing I/O shell + mio event-loop wiring (issue #1273).
2//!
3//! This module is the **impure** half of the UDP datapath: it owns every
4//! syscall, the buffer copies, the per-flow connected upstream sockets, the
5//! timer arming, the `BackendMap`/health/metrics edges, and the slab/token
6//! bookkeeping. It drives the pure sans-io core in
7//! [`crate::protocol::udp`](crate::protocol::udp) (the `UdpManager` /
8//! `UdpFlow` two-level split) through `ManagerInput` / `Output`.
9//!
10//! Architecture (mirrors `tcp.rs`, but UDP is **one-listener-many-flows**):
11//! - [`UdpListener`] wraps the single `mio::net::UdpSocket` a listener binds.
12//! There is **no accept loop** — a readable event means "datagrams waiting",
13//! not "a new connection".
14//! - [`UdpProxy`] holds the listeners, the per-listener [`UdpManager`]s, the
15//! shared `BackendMap`, the session slab and the registry. It does **not**
16//! implement `ProxyConfiguration` / `L7Proxy` (their signatures are
17//! `TcpStream`-bound); the server calls its inherent
18//! [`notify`](UdpProxy::notify) directly.
19//! - [`UdpListenerSession`] is the `ProxySession` the server's generic
20//! readiness path drives. One session backs one listener; its
21//! `update_readiness` demuxes by token (listener-token = client recv;
22//! upstream-token = backend recv). It owns the per-flow connected sockets and
23//! the `upstream_token -> FlowId` map.
24//!
25//! UDP never goes through `accept()` / `create_session()`: a `Protocol::UDP`
26//! listener readable event falls through `Server::ready`'s generic arm into
27//! `ProxySession::ready`.
28//!
29//! Long-form lifecycle (datapath, NAT return, teardown, control plane,
30//! hardening): `lib/src/protocol/udp/LIFECYCLE.md`.
31
32use std::{
33 cell::RefCell,
34 collections::{BTreeMap, HashMap, VecDeque, hash_map::Entry},
35 io::ErrorKind,
36 net::SocketAddr,
37 os::unix::io::AsRawFd,
38 rc::Rc,
39 time::{Duration, Instant},
40};
41
42use mio::{Interest, Registry, Token, net::UdpSocket, unix::SourceFd};
43use sozu_command::{
44 logging::ansi_palette,
45 proto::command::{
46 Cluster, LoadBalancingAlgorithms, LoadMetric, RequestUdpFrontend, UdpAffinityKey,
47 UdpListenerConfig, UpdateUdpListenerConfig, WorkerRequest, WorkerResponse,
48 request::RequestType,
49 },
50};
51
52use crate::metrics::names;
53use crate::{
54 CachedTags, ListenerError, ListenerHandler, Protocol, ProxyError, ProxySession,
55 SessionIsToBeClosed,
56 backends::BackendMap,
57 pool::Pool,
58 protocol::udp::{
59 CloseReason, ClusterConfig, ConfigEvent, DropReason, FlowId, ManagerInput, MetricEvent,
60 Output, UdpManager,
61 },
62 server::{SessionManager, TIMER},
63 socket::{udp_bind, udp_connect},
64 sozu_command::{ready::Ready, state::ClusterId},
65};
66
67mod health;
68pub use health::UdpHealthChecker;
69
/// Per-session log envelope (tag `UDP`): prefixes a log line with the
/// listener's token and bound address. Colors follow the unified scheme —
/// bold bright-white protocol label, light-grey keyword, gray keys and
/// bright-white values — and are only emitted when the `colored` flag is set,
/// as decided by [`ansi_palette`].
macro_rules! log_context {
    ($self:expr) => {{
        let palette = ansi_palette();
        format!(
            "[- - - -]\t{open}UDP{reset}\t{grey}Listener{reset}({gray}token{reset}={white}{token}{reset}, {gray}address{reset}={white}{address}{reset})\t >>>",
            open = palette.0,
            reset = palette.1,
            grey = palette.2,
            gray = palette.3,
            white = palette.4,
            token = $self.listener_token.0,
            address = $self.address,
        )
    }};
}
88
/// Module-wide log prefix for [`UdpProxy`] callbacks (notify, listener
/// add/remove, soft/hard stop). Those callbacks work on a listener/token map
/// rather than a single session, so only the `UDP` label is rendered (bold
/// bright-white when colored output is enabled).
macro_rules! log_module_context {
    () => {{
        let palette = sozu_command::logging::ansi_palette();
        format!("{open}UDP{reset}\t >>>", open = palette.0, reset = palette.1)
    }};
}
98
/// Per-flow log envelope (tag `UDP-FLOW`). Prints the flow's id, the client
/// address and the backend (with `Debug` formatting) so flow-level lines can
/// be filtered on any of them.
macro_rules! log_flow_context {
    ($flow:expr, $client:expr, $backend:expr) => {{
        let palette = sozu_command::logging::ansi_palette();
        format!(
            "[- - - -]\t{open}UDP-FLOW{reset}\t{grey}Flow{reset}({gray}id{reset}={white}{id}{reset}, {gray}client{reset}={white}{client}{reset}, {gray}backend{reset}={white}{backend:?}{reset})\t >>>",
            open = palette.0,
            reset = palette.1,
            grey = palette.2,
            gray = palette.3,
            white = palette.4,
            id = $flow,
            client = $client,
            backend = $backend,
        )
    }};
}
117
/// Maximum depth of the per-flow upstream write queue (the path towards one
/// backend). A connected upstream socket only carries a single flow, so a
/// small cap is enough to absorb a transient `EWOULDBLOCK` (kernel send
/// buffer momentarily full) while preventing a slow or stalled backend from
/// growing memory without bound. Beyond the cap the datagram is dropped
/// (`udp.datagrams.dropped.wq_full`): UDP is best-effort and the client
/// retries.
const UPSTREAM_WRITE_QUEUE_CAP: usize = 64;
/// Maximum depth of the per-listener client-return write queue (replies sent
/// back to many clients through the single listener socket). It is shared by
/// every flow's return traffic, hence four times the per-flow cap (256).
const CLIENT_WRITE_QUEUE_CAP: usize = 4 * UPSTREAM_WRITE_QUEUE_CAP;
129
/// What `WriteQueue::drain` should do after a single send attempt: keep
/// draining, stop because the socket went `WouldBlock` (the caller re-arms
/// WRITABLE), or drop the datagram after a hard error and keep draining the
/// rest.
///
/// The common value traits are derived so outcomes can be compared (e.g. in
/// tests) and printed with `{:?}` in diagnostics.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum SendOutcome {
    /// The datagram was written; pop it and continue.
    Sent,
    /// `WouldBlock`: leave the datagram at the front and stop draining.
    WouldBlock,
    /// A hard error (e.g. ECONNREFUSED): drop this datagram (the send
    /// callback is where it gets counted) and continue with the next one.
    Dropped,
}
141
/// Bounded FIFO of datagrams waiting for a writable socket.
///
/// The egress fast path (a `send`/`send_to` that succeeds immediately) never
/// touches it; it only comes into play once the kernel send buffer is full
/// (`WouldBlock`). When the queue is full, the already-queued datagrams keep
/// their order and the *incoming* one is rejected, because the queued ones
/// are closer to leaving the socket.
///
/// Entries are `(SocketAddr, Vec<u8>)`. The upstream variant ignores the
/// address (a connected socket's `send` has an implicit peer), while the
/// client-return variant uses it as the `send_to` destination. Using one type
/// for both keeps the drain loop and its unit tests shared.
struct WriteQueue {
    /// Maximum number of pending datagrams; `push` refuses anything beyond it.
    cap: usize,
    /// Pending datagrams, oldest at the front.
    queue: VecDeque<(SocketAddr, Vec<u8>)>,
}
156
157impl WriteQueue {
158 fn new(cap: usize) -> Self {
159 WriteQueue {
160 queue: VecDeque::new(),
161 cap,
162 }
163 }
164
165 fn is_empty(&self) -> bool {
166 self.queue.is_empty()
167 }
168
169 /// Current depth. Only used by the unit tests; the runtime drains by the
170 /// `is_empty` / `drain` return value, not a length read.
171 #[cfg(test)]
172 fn len(&self) -> usize {
173 self.queue.len()
174 }
175
176 /// Enqueue a datagram for later draining. Returns `true` if it was accepted,
177 /// `false` if the queue was at capacity (caller drops + counts `wq_full`).
178 #[must_use]
179 fn push(&mut self, dst: SocketAddr, payload: Vec<u8>) -> bool {
180 if self.queue.len() >= self.cap {
181 return false;
182 }
183 self.queue.push_back((dst, payload));
184 // Invariant: the bounded FIFO never grows past its cap. The guard above
185 // is the only growth site, so depth <= cap holds after every push.
186 debug_assert!(
187 self.queue.len() <= self.cap,
188 "WriteQueue overran its cap: len {} > cap {}",
189 self.queue.len(),
190 self.cap,
191 );
192 true
193 }
194
195 /// Drain in FIFO order, calling `send` for each datagram. Stops on the first
196 /// `WouldBlock` (leaving that datagram queued for the next writable event) or
197 /// when empty. Hard-errored datagrams are popped and counted via
198 /// `SendOutcome::Dropped`. Returns `true` if the queue is now empty (the
199 /// caller can drop WRITABLE interest back to READABLE-only).
200 fn drain<F: FnMut(&SocketAddr, &[u8]) -> SendOutcome>(&mut self, mut send: F) -> bool {
201 while let Some((dst, payload)) = self.queue.front() {
202 match send(dst, payload) {
203 SendOutcome::Sent | SendOutcome::Dropped => {
204 self.queue.pop_front();
205 }
206 SendOutcome::WouldBlock => break,
207 }
208 }
209 self.queue.is_empty()
210 }
211}
212
/// One UDP listener: a single `mio::net::UdpSocket` plus its routing config.
/// Unlike a TCP listener there is no accept loop — a readable event is a batch
/// of datagrams the session drains to `WouldBlock`.
pub struct UdpListener {
    /// Set to `true` by `activate` once the socket is bound/adopted and
    /// registered, and back to `false` when the socket is handed back. Typed
    /// `SessionIsToBeClosed`, but used here as a plain on/off flag.
    active: SessionIsToBeClosed,
    /// Address the listener binds to, taken from `config.address` at creation.
    address: SocketAddr,
    /// Cluster the current UDP frontend routes this listener to, if any.
    cluster_id: Option<String>,
    /// Live listener configuration, patched in place by `update_config`.
    config: UdpListenerConfig,
    /// The listening socket: `None` before activation and after give-back.
    socket: Option<UdpSocket>,
    /// Cached frontend tags; the UDP frontend stores them under its address
    /// string.
    tags: BTreeMap<String, CachedTags>,
    /// mio token the socket is registered under (also its key in the proxy's
    /// listener map).
    token: Token,
}
225
226impl ListenerHandler for UdpListener {
227 fn get_addr(&self) -> &SocketAddr {
228 &self.address
229 }
230
231 fn get_tags(&self, key: &str) -> Option<&CachedTags> {
232 self.tags.get(key)
233 }
234
235 fn set_tags(&mut self, key: String, tags: Option<BTreeMap<String, String>>) {
236 match tags {
237 Some(tags) => self.tags.insert(key, CachedTags::new(tags)),
238 None => self.tags.remove(&key),
239 };
240 }
241
242 fn protocol(&self) -> Protocol {
243 Protocol::UDP
244 }
245
246 fn public_address(&self) -> SocketAddr {
247 self.config
248 .public_address
249 .map(|addr| addr.into())
250 .unwrap_or(self.address)
251 }
252}
253
254impl UdpListener {
255 fn new(config: UdpListenerConfig, token: Token) -> Result<UdpListener, ListenerError> {
256 Ok(UdpListener {
257 cluster_id: None,
258 socket: None,
259 token,
260 address: config.address.into(),
261 config,
262 active: false,
263 tags: BTreeMap::new(),
264 })
265 }
266
267 /// Bind (or adopt an SCM-passed) socket and register it `READABLE`. The
268 /// READABLE registration is what drives `Server::ready` for this listener —
269 /// there is no accept path.
270 pub fn activate(
271 &mut self,
272 registry: &Registry,
273 udp_socket: Option<UdpSocket>,
274 ) -> Result<Token, ProxyError> {
275 if self.active {
276 return Ok(self.token);
277 }
278
279 let mut socket = match udp_socket {
280 Some(socket) => socket,
281 None => {
282 let address: SocketAddr = self.config.address.into();
283 udp_bind(address).map_err(|e| ProxyError::BindToSocket(address, e))?
284 }
285 };
286
287 registry
288 .register(&mut socket, self.token, Interest::READABLE)
289 .map_err(ProxyError::RegisterListener)?;
290
291 self.socket = Some(socket);
292 self.active = true;
293 Ok(self.token)
294 }
295
296 /// Apply a partial-update patch to this UDP listener's live config. Fields
297 /// absent in the patch (`None`) are preserved.
298 pub fn update_config(&mut self, patch: &UpdateUdpListenerConfig) {
299 if let Some(v) = patch.public_address {
300 self.config.public_address = Some(v);
301 }
302 if let Some(v) = patch.front_timeout {
303 self.config.front_timeout = v;
304 }
305 if let Some(v) = patch.back_timeout {
306 self.config.back_timeout = v;
307 }
308 if let Some(v) = patch.max_rx_datagram_size {
309 self.config.max_rx_datagram_size = v;
310 }
311 if let Some(v) = patch.max_flows {
312 self.config.max_flows = v;
313 }
314 }
315}
316
/// The UDP proxy. Holds the listeners, one `UdpManager` per listener token, the
/// shared `BackendMap`, the session slab and a cloned registry. Does NOT
/// implement `ProxyConfiguration` / `L7Proxy`; the server drives it through the
/// inherent [`notify`](Self::notify) plus the activate/give-back helpers.
pub struct UdpProxy {
    /// Cluster id -> token of the listener that cluster's UDP frontend was
    /// attached to. Written by `add_udp_front`, entries removed by
    /// `remove_udp_front`.
    fronts: HashMap<String, Token>,
    /// Backend map shared with the worker. `apply_cluster` sets each cluster's
    /// LB policy on it; it is also handed to every listener session and to
    /// the health prober.
    backends: Rc<RefCell<BackendMap>>,
    /// Every UDP listener, keyed by its listener token.
    listeners: HashMap<Token, Rc<RefCell<UdpListener>>>,
    /// The built listener session per listener token. The server inserts the
    /// same `Rc` into the slab at the listener token; the proxy keeps a clone so
    /// it can drive per-flow teardown (soft/hard stop) without downcasting the
    /// slab's `dyn ProxySession`. Cleared on listener removal/stop.
    listener_sessions: HashMap<Token, Rc<RefCell<UdpListenerSession>>>,
    /// One sans-io manager per listener token, sharing the listener's lifecycle.
    managers: HashMap<Token, Rc<RefCell<UdpManager>>>,
    /// Cluster routing for each listener token (set by the UDP frontend).
    cluster_for_listener: HashMap<Token, ClusterId>,
    /// Last `AddCluster`-supplied UDP knobs per cluster id. Cached so a frontend
    /// added AFTER its cluster still picks up `responses` / `requests` / PPv2 /
    /// affinity — `AddCluster` and `AddUdpFrontend` arrive in either order, and
    /// neither must clobber the other's contribution to a manager's
    /// `ClusterConfig`.
    cluster_udp_config: HashMap<ClusterId, sozu_command::proto::command::UdpClusterConfig>,
    /// mio registry used to register listener sockets on activation and to
    /// deregister them on soft/hard stop; each listener session and the health
    /// prober work against it (sessions get their own `try_clone`).
    registry: Registry,
    /// Session slab shared with the server; cloned into each listener session.
    sessions: Rc<RefCell<SessionManager>>,
    /// Buffer pool passed to the constructor. Nothing in this proxy reads it,
    /// hence the `dead_code` allowance — presumably kept for constructor parity
    /// with the other proxies. NOTE(review): confirm before removing.
    #[allow(dead_code)]
    pool: Rc<RefCell<Pool>>,
    /// Fixed hash seed injected once into every manager. It is deliberately the
    /// same constant on every worker and across restarts so HRW/Maglev affinity
    /// is reproducible cluster-wide — a per-worker random seed would scatter the
    /// same flow key onto a different backend on each worker and reshuffle it on
    /// every restart, defeating the documented affinity-stability contract (see
    /// [`crate::load_balancing::DEFAULT_HASH_SEED`]).
    hash_seed: u64,
    /// Global `max_connections` (from `ServerConfig`). Used to clamp a UDP
    /// listener's auto `max_flows` so a single listener cannot inflate the
    /// shared `SessionManager` slab and starve HTTP/TCP.
    max_connections: usize,
    /// Global `buffer_size` (from `ServerConfig`). Used to clamp a listener's
    /// `max_rx_datagram_size` so a hostile `AddUdpListener` cannot allocate a
    /// multi-GB recv buffer.
    buffer_size: usize,
    /// Endpoint-bound active health prober (TCP probe + hysteresis +
    /// fail-open). Driven from the server event loop via
    /// [`UdpProxy::health_poll`] / [`UdpProxy::health_ready`].
    health: UdpHealthChecker,
}
364
365impl UdpProxy {
366 pub fn new(
367 registry: Registry,
368 sessions: Rc<RefCell<SessionManager>>,
369 pool: Rc<RefCell<Pool>>,
370 backends: Rc<RefCell<BackendMap>>,
371 max_connections: usize,
372 buffer_size: usize,
373 ) -> UdpProxy {
374 // Fixed seed, identical on every worker and across restarts: HRW/Maglev
375 // affinity must be reproducible cluster-wide. A per-worker/per-restart
376 // random seed (as a `process::id()` + `Instant::now()` mix would be)
377 // would route the same flow key to a different backend on each worker
378 // and reshuffle it on every restart. Reuse the LB module's canonical
379 // affinity seed so UDP and the HTTP/TCP affinity hashers agree.
380 let hash_seed = crate::load_balancing::DEFAULT_HASH_SEED;
381 UdpProxy {
382 backends,
383 listeners: HashMap::new(),
384 listener_sessions: HashMap::new(),
385 managers: HashMap::new(),
386 cluster_for_listener: HashMap::new(),
387 cluster_udp_config: HashMap::new(),
388 fronts: HashMap::new(),
389 registry,
390 sessions,
391 pool,
392 hash_seed,
393 max_connections,
394 buffer_size,
395 health: UdpHealthChecker::new(),
396 }
397 }
398
399 /// Drive the UDP health prober one event-loop step (server calls this once
400 /// per iteration, mirroring `HealthChecker::poll`). Non-blocking.
401 pub fn health_poll(&mut self) {
402 let registry = self.registry.try_clone();
403 if let Ok(registry) = registry {
404 self.health.poll(&self.backends, ®istry);
405 }
406 }
407
408 /// Record mio readiness for a UDP health-probe socket.
409 pub fn health_ready(&mut self, token: Token) {
410 self.health.ready(token);
411 }
412
413 /// Whether `token` is a UDP health-probe socket this proxy owns.
414 pub fn health_owns_token(&self, token: Token) -> bool {
415 self.health.owns_token(token)
416 }
417
418 pub fn add_listener(
419 &mut self,
420 config: UdpListenerConfig,
421 token: Token,
422 ) -> Result<Token, ProxyError> {
423 match self.listeners.entry(token) {
424 Entry::Vacant(entry) => {
425 let mut config = config;
426 let max_flows = effective_max_flows(config.max_flows, self.max_connections);
427 // Defense in depth: cap the recv-buffer sizing to `buffer_size`
428 // so a hostile `AddUdpListener` (`max_rx_datagram_size =
429 // u32::MAX`) can't allocate a multi-GB buffer. Write the clamped
430 // value back into the stored config so the manager AND the
431 // session's `recv_buf` (sized from `config` in `new()`) agree on
432 // the same bound.
433 let max_rx = clamp_max_rx(config.max_rx_datagram_size as usize, self.buffer_size);
434 config.max_rx_datagram_size = max_rx as u32;
435 let front = Duration::from_secs(u64::from(config.front_timeout));
436 let back = Duration::from_secs(u64::from(config.back_timeout));
437 let listener = UdpListener::new(config, token).map_err(ProxyError::AddListener)?;
438 entry.insert(Rc::new(RefCell::new(listener)));
439 let cluster_cfg = ClusterConfig {
440 front_timeout: front,
441 back_timeout: back,
442 ..Default::default()
443 };
444 self.managers.insert(
445 token,
446 Rc::new(RefCell::new(UdpManager::new(
447 cluster_cfg,
448 max_flows,
449 max_rx,
450 self.hash_seed,
451 ))),
452 );
453 Ok(token)
454 }
455 _ => Err(ProxyError::ListenerAlreadyPresent),
456 }
457 }
458
459 pub fn remove_listener(&mut self, address: SocketAddr) -> SessionIsToBeClosed {
460 let len = self.listeners.len();
461 let mut removed_tokens = Vec::new();
462 self.listeners.retain(|token, l| {
463 if l.borrow().address == address {
464 removed_tokens.push(*token);
465 false
466 } else {
467 true
468 }
469 });
470 let now = Instant::now();
471 for token in removed_tokens {
472 self.cluster_for_listener.remove(&token);
473 // Drive per-flow teardown THROUGH the manager (emits FlowEvicted +
474 // CloseFlow per flow) BEFORE dropping the manager, so the
475 // active-flows gauge is decremented once per flow and the per-flow
476 // upstream sockets + slab slots are freed. Removing the manager first
477 // would silently leak the gauge by N.
478 if let Some(session) = self.listener_sessions.remove(&token) {
479 session.borrow_mut().close_all_flows(now);
480 }
481 self.managers.remove(&token);
482 }
483 self.listeners.len() < len
484 }
485
486 pub fn activate_listener(
487 &self,
488 addr: &SocketAddr,
489 udp_socket: Option<UdpSocket>,
490 ) -> Result<Token, ProxyError> {
491 let listener = self
492 .listeners
493 .values()
494 .find(|listener| listener.borrow().address == *addr)
495 .ok_or(ProxyError::NoListenerFound(*addr))?;
496
497 listener.borrow_mut().activate(&self.registry, udp_socket)
498 }
499
500 /// Build the [`UdpListenerSession`] that drives this listener's datagrams.
501 /// The server inserts the returned session into the slab **at the listener
502 /// token**, replacing the `ListenSession` placeholder, so the generic
503 /// readiness path reaches `UdpListenerSession::update_readiness`. Returns
504 /// `None` if the listener token is unknown.
505 pub fn build_session(&mut self, token: Token) -> Option<Rc<RefCell<UdpListenerSession>>> {
506 let listener = self.listeners.get(&token)?.clone();
507 let manager = self.managers.get(&token)?.clone();
508 let registry = self.registry.try_clone().ok()?;
509 let session = Rc::new(RefCell::new(UdpListenerSession::new(
510 listener,
511 manager,
512 self.backends.clone(),
513 registry,
514 self.sessions.clone(),
515 token,
516 )));
517 // Keep a clone so soft/hard stop can drive per-flow teardown.
518 self.listener_sessions.insert(token, session.clone());
519 Some(session)
520 }
521
522 pub fn give_back_listeners(&mut self) -> Vec<(SocketAddr, UdpSocket)> {
523 self.listeners
524 .values()
525 .filter_map(|listener| {
526 let mut owned = listener.borrow_mut();
527 if let Some(socket) = owned.socket.take() {
528 owned.active = false;
529 return Some((owned.address, socket));
530 }
531 None
532 })
533 .collect()
534 }
535
536 pub fn give_back_listener(
537 &mut self,
538 address: SocketAddr,
539 ) -> Result<(Token, UdpSocket), ProxyError> {
540 let listener = self
541 .listeners
542 .values()
543 .find(|listener| listener.borrow().address == address)
544 .ok_or(ProxyError::NoListenerFound(address))?;
545
546 let (token, taken) = {
547 let mut owned = listener.borrow_mut();
548 let taken = owned.socket.take().ok_or(ProxyError::UnactivatedListener)?;
549 owned.active = false;
550 (owned.token, taken)
551 };
552 // Deactivating removes the listener from service: tear down its active
553 // flows THROUGH the manager so the per-flow upstream slab slots + fds
554 // don't dangle and the active-flows gauge is decremented once per flow
555 // (the manager is RETAINED here — not removed from `self.managers` — so
556 // `close_all` also resets its flow table, keeping manager and shell
557 // consistent for a later reactivate).
558 if let Some(session) = self.listener_sessions.remove(&token) {
559 session.borrow_mut().close_all_flows(Instant::now());
560 }
561 Ok((token, taken))
562 }
563
564 pub fn update_listener(&mut self, patch: UpdateUdpListenerConfig) -> Result<(), ProxyError> {
565 let address: SocketAddr = patch.address.into();
566 let listener = self
567 .listeners
568 .values()
569 .find(|l| l.borrow().address == address)
570 .ok_or(ProxyError::NoListenerFound(address))?;
571 {
572 let mut l = listener.borrow_mut();
573 l.update_config(&patch);
574 // Clamp the stored rx size in place (defense in depth): keep the
575 // listener config, the manager, and the session's `recv_buf` agreeing
576 // on the same `buffer_size`-bounded value.
577 l.config.max_rx_datagram_size =
578 clamp_max_rx(l.config.max_rx_datagram_size as usize, self.buffer_size) as u32;
579 }
580
581 // Reflect the timeout / cap / rx-size changes into the manager for new
582 // flows. Existing flows keep their captured config (stable contract).
583 if let Some(token) = self
584 .listeners
585 .iter()
586 .find(|(_, l)| l.borrow().address == address)
587 .map(|(t, _)| *t)
588 && let Some(mgr) = self.managers.get(&token)
589 {
590 let now = Instant::now();
591 let (cfg, max_flows, max_rx) = {
592 let l = listener.borrow();
593 (
594 self.cluster_config_for(&l, token),
595 effective_max_flows(l.config.max_flows, self.max_connections),
596 // Defense in depth: clamp the raised rx size to `buffer_size`
597 // so a hostile `UpdateUdpListener` can't grow the recv buffer
598 // past the global cap.
599 clamp_max_rx(l.config.max_rx_datagram_size as usize, self.buffer_size),
600 )
601 };
602 {
603 let mut m = mgr.borrow_mut();
604 m.handle_input(ManagerInput::Config(ConfigEvent::SetCluster(cfg)), now);
605 m.handle_input(
606 ManagerInput::Config(ConfigEvent::SetMaxFlows(max_flows)),
607 now,
608 );
609 m.handle_input(
610 ManagerInput::Config(ConfigEvent::SetMaxRxDatagramSize(max_rx)),
611 now,
612 );
613 }
614 // Resize the live session's recv scratch to match the (clamped) new
615 // rx size. Without this, a config that RAISES `max_rx_datagram_size`
616 // would leave `recv_buf` at its old (smaller) length: `recv_from`
617 // would kernel-truncate datagrams between the old and new size while
618 // the manager's `len > max_rx` check (now the larger value) passes
619 // them, forwarding them truncated. The `+ 1` mirrors `new()`: it lets
620 // the manager observe `len == max_rx + 1 > max_rx` and drop an
621 // oversized datagram (`DropReason::Truncated`) instead of silently
622 // truncating it. Existing flows keep their captured config; only the
623 // shared recv buffer tracks the live rx size.
624 if let Some(session) = self.listener_sessions.get(&token) {
625 session.borrow_mut().resize_recv_buf(max_rx);
626 }
627 }
628 Ok(())
629 }
630
631 pub fn add_udp_front(&mut self, front: RequestUdpFrontend) -> Result<(), ProxyError> {
632 let address = front.address.into();
633 let token = {
634 let mut listener = self
635 .listeners
636 .values()
637 .find(|l| l.borrow().address == address)
638 .ok_or(ProxyError::NoListenerFound(address))?
639 .borrow_mut();
640 self.fronts
641 .insert(front.cluster_id.to_string(), listener.token);
642 listener.set_tags(address.to_string(), Some(front.tags));
643 listener.cluster_id = Some(front.cluster_id.clone());
644 listener.token
645 };
646 self.cluster_for_listener
647 .insert(token, front.cluster_id.clone());
648
649 // Commit the cluster routing into the manager so admitted flows know
650 // which cluster to `SelectBackend` against.
651 if let Some(mgr) = self.managers.get(&token) {
652 let listener = self.listeners.get(&token).unwrap();
653 let cfg = {
654 let l = listener.borrow();
655 self.cluster_config_for(&l, token)
656 };
657 mgr.borrow_mut().handle_input(
658 ManagerInput::Config(ConfigEvent::SetCluster(cfg)),
659 Instant::now(),
660 );
661 }
662 Ok(())
663 }
664
665 pub fn remove_udp_front(&mut self, front: RequestUdpFrontend) -> Result<(), ProxyError> {
666 let address = front.address.into();
667 let token = {
668 let mut listener = match self
669 .listeners
670 .values()
671 .find(|l| l.borrow().address == address)
672 {
673 Some(l) => l.borrow_mut(),
674 None => return Err(ProxyError::NoListenerFound(address)),
675 };
676 listener.set_tags(address.to_string(), None);
677 if let Some(cluster_id) = listener.cluster_id.take() {
678 self.fronts.remove(&cluster_id);
679 }
680 listener.token
681 };
682 self.cluster_for_listener.remove(&token);
683 // Drop the routing in the manager — new datagrams now have no backend.
684 if let Some(mgr) = self.managers.get(&token) {
685 mgr.borrow_mut().handle_input(
686 ManagerInput::Config(ConfigEvent::SetCluster(ClusterConfig::default())),
687 Instant::now(),
688 );
689 }
690 Ok(())
691 }
692
693 /// Build a [`ClusterConfig`] for a listener from its current frontend
694 /// cluster routing + timeouts. The per-cluster knobs (responses/requests/
695 /// PPv2/affinity) are populated by [`apply_cluster`](Self::apply_cluster)
696 /// when an `AddCluster` carries a `udp` block; absent that they stay at the
697 /// proto defaults.
698 fn cluster_config_for(&self, listener: &UdpListener, _token: Token) -> ClusterConfig {
699 let cluster = listener.cluster_id.clone().unwrap_or_default();
700 let mut cfg = ClusterConfig {
701 cluster: cluster.clone(),
702 front_timeout: Duration::from_secs(u64::from(listener.config.front_timeout)),
703 back_timeout: Duration::from_secs(u64::from(listener.config.back_timeout)),
704 ..Default::default()
705 };
706 // Fold in the cluster's cached UDP knobs (set by a prior or later
707 // `AddCluster`) so the order of `AddCluster` vs `AddUdpFrontend` does not
708 // matter — both rebuilds of a manager's `ClusterConfig` converge on the
709 // same per-cluster contract.
710 if let Some(udp) = self.cluster_udp_config.get(&cluster) {
711 apply_udp_knobs(&mut cfg, udp);
712 }
713 cfg
714 }
715
716 /// Apply an `AddCluster` to every listener routing to it: fold the UDP
717 /// cluster knobs (affinity / responses / requests / PPv2) into the
718 /// manager's `ClusterConfig`, and rebuild the LB policy (HRW/Maglev table)
719 /// via the shared `BackendMap`.
720 fn apply_cluster(&mut self, cluster: &Cluster) {
721 // 1. LB policy (HRW/Maglev rebuild happens inside the BackendMap).
722 self.backends
723 .borrow_mut()
724 .set_load_balancing_policy_for_cluster(
725 &cluster.cluster_id,
726 LoadBalancingAlgorithms::try_from(cluster.load_balancing).unwrap_or_default(),
727 cluster
728 .load_metric
729 .and_then(|n| LoadMetric::try_from(n).ok()),
730 );
731
732 // 1b. Health settings from the cluster's `udp.health` block, if any.
733 // Mode HEALTH_OFF / no health block disables probing for this cluster.
734 let health_settings = cluster.udp.as_ref().and_then(|udp| {
735 udp.health.as_ref().and_then(|h| {
736 let mode = h
737 .mode
738 .and_then(|m| sozu_command::proto::command::UdpHealthMode::try_from(m).ok());
739 match mode {
740 Some(sozu_command::proto::command::UdpHealthMode::HealthOff) => None,
741 // TCP_PROBE (default when a health block is present) and
742 // UDP_PROBE both schedule the TCP-probe state machine; the
743 // app UDP-probe payload rides along for the secondary check.
744 _ => Some(health::UdpHealthSettings::from_proto(h)),
745 }
746 })
747 });
748 self.health
749 .set_cluster(&cluster.cluster_id, health_settings, &self.registry);
750
751 // 1c. Cache the UDP knobs so a frontend added in EITHER order picks them
752 // up via `cluster_config_for`. An `AddCluster` without a `udp` block
753 // clears the cache (back to proto defaults).
754 match &cluster.udp {
755 Some(udp) => {
756 self.cluster_udp_config
757 .insert(cluster.cluster_id.clone(), udp.clone());
758 }
759 None => {
760 self.cluster_udp_config.remove(&cluster.cluster_id);
761 }
762 }
763
764 // 2. Per-cluster UDP knobs into the managers routing to this cluster.
765 // `cluster_config_for` now folds the cached knobs in, so the rebuild is
766 // identical regardless of whether the frontend or the cluster came
767 // first.
768 let now = Instant::now();
769 let tokens: Vec<Token> = self
770 .cluster_for_listener
771 .iter()
772 .filter(|(_, c)| **c == cluster.cluster_id)
773 .map(|(t, _)| *t)
774 .collect();
775 for token in tokens {
776 let Some(listener) = self.listeners.get(&token) else {
777 continue;
778 };
779 let cfg = {
780 let l = listener.borrow();
781 self.cluster_config_for(&l, token)
782 };
783 if let Some(mgr) = self.managers.get(&token) {
784 mgr.borrow_mut()
785 .handle_input(ManagerInput::Config(ConfigEvent::SetCluster(cfg)), now);
786 }
787 }
788 }
789
790 /// Inherent dispatch entry point — the server calls this directly (UDP does
791 /// not implement `ProxyConfiguration`). Handles UDP frontends, cluster
792 /// config, listener removal, and stop. No accept / create_session.
793 pub fn notify(&mut self, message: WorkerRequest) -> WorkerResponse {
794 let request_type = match message.content.request_type {
795 Some(t) => t,
796 None => return WorkerResponse::error(message.id, "Empty request"),
797 };
798 match request_type {
799 RequestType::AddUdpFrontend(front) => match self.add_udp_front(front) {
800 Ok(()) => WorkerResponse::ok(message.id),
801 Err(err) => WorkerResponse::error(message.id, err),
802 },
803 RequestType::RemoveUdpFrontend(front) => match self.remove_udp_front(front) {
804 Ok(()) => WorkerResponse::ok(message.id),
805 Err(err) => WorkerResponse::error(message.id, err),
806 },
807 RequestType::AddCluster(cluster) => {
808 self.apply_cluster(&cluster);
809 WorkerResponse::ok(message.id)
810 }
811 RequestType::RemoveCluster(cluster_id) => {
812 let tokens: Vec<Token> = self
813 .cluster_for_listener
814 .iter()
815 .filter(|(_, c)| **c == cluster_id)
816 .map(|(t, _)| *t)
817 .collect();
818 for token in tokens {
819 if let Some(mgr) = self.managers.get(&token) {
820 mgr.borrow_mut().handle_input(
821 ManagerInput::Config(ConfigEvent::SetCluster(ClusterConfig::default())),
822 Instant::now(),
823 );
824 }
825 }
826 self.cluster_udp_config.remove(&cluster_id);
827 self.health.remove_cluster(&cluster_id, &self.registry);
828 WorkerResponse::ok(message.id)
829 }
830 RequestType::SoftStop(_) => {
831 info!(
832 "{} {} processing soft shutdown",
833 log_module_context!(),
834 message.id
835 );
836 // Drain: admit no new flows. Then actively tear down existing
837 // flows so the worker reaches `base_sessions_count` and exits
838 // promptly instead of waiting out each flow's idle timeout. UDP
839 // has no half-sent response to preserve, so an immediate flow
840 // teardown on soft-stop is the right graceful behavior (a stray
841 // in-flight reply may be lost, which is acceptable for a
842 // best-effort datagram proxy).
843 let now = Instant::now();
844 for mgr in self.managers.values() {
845 mgr.borrow_mut()
846 .handle_input(ManagerInput::Config(ConfigEvent::Drain), now);
847 }
848 // Drive teardown through each manager (FlowEvicted + CloseFlow
849 // per flow) so the active-flows gauge balances to zero.
850 for session in self.listener_sessions.values() {
851 session.borrow_mut().close_all_flows(now);
852 }
853 self.listener_sessions.clear();
854 let listeners: HashMap<_, _> = self.listeners.drain().collect();
855 for (_, l) in listeners.iter() {
856 l.borrow_mut()
857 .socket
858 .take()
859 .map(|mut sock| self.registry.deregister(&mut sock));
860 }
861 WorkerResponse::processing(message.id)
862 }
863 RequestType::HardStop(_) => {
864 info!("{} {} hard shutdown", log_module_context!(), message.id);
865 let now = Instant::now();
866 // Drive teardown through each manager (FlowEvicted + CloseFlow
867 // per flow) so the active-flows gauge balances to zero before the
868 // managers are dropped below.
869 for session in self.listener_sessions.values() {
870 session.borrow_mut().close_all_flows(now);
871 }
872 self.listener_sessions.clear();
873 let mut listeners: HashMap<_, _> = self.listeners.drain().collect();
874 for (_, l) in listeners.drain() {
875 l.borrow_mut()
876 .socket
877 .take()
878 .map(|mut sock| self.registry.deregister(&mut sock));
879 }
880 self.managers.clear();
881 WorkerResponse::ok(message.id)
882 }
883 RequestType::Status(_) => {
884 info!("{} {} status", log_module_context!(), message.id);
885 WorkerResponse::ok(message.id)
886 }
887 RequestType::RemoveListener(remove) => {
888 if !self.remove_listener(remove.address.into()) {
889 WorkerResponse::error(
890 message.id,
891 format!("no UDP listener to remove at address {:?}", remove.address),
892 )
893 } else {
894 WorkerResponse::ok(message.id)
895 }
896 }
897 command => {
898 debug!(
899 "{} {} unsupported message for UDP proxy, ignoring {:?}",
900 log_module_context!(),
901 message.id,
902 command
903 );
904 WorkerResponse::error(message.id, "unsupported message")
905 }
906 }
907 }
908}
909
910/// Fold a proto [`UdpClusterConfig`](sozu_command::proto::command::UdpClusterConfig)
911/// into a [`ClusterConfig`], applying the proto defaults for absent fields.
912/// Single source of truth so `apply_cluster` (live push) and
913/// `cluster_config_for` (frontend add / rebuild) never diverge.
914fn apply_udp_knobs(cfg: &mut ClusterConfig, udp: &sozu_command::proto::command::UdpClusterConfig) {
915 cfg.affinity_with_port = matches!(
916 udp.affinity_key
917 .and_then(|k| UdpAffinityKey::try_from(k).ok()),
918 Some(UdpAffinityKey::SourceIpPort)
919 );
920 cfg.responses = udp.responses.unwrap_or(0);
921 cfg.requests = udp.requests.unwrap_or(0);
922 cfg.send_proxy_protocol = udp.send_proxy_protocol.unwrap_or(false);
923 cfg.proxy_protocol_every_datagram = udp.proxy_protocol_every_datagram.unwrap_or(false);
924}
925
926/// Fallback auto `max_flows` when `RLIMIT_NOFILE` can't be read.
927const DEFAULT_AUTO_MAX_FLOWS: usize = 1024;
928
929/// `max_flows == 0` means "auto": ~70% of the soft `RLIMIT_NOFILE`, so the fd
930/// budget adapts to the host without hand-tuning (one fd + one slab slot per
931/// flow). Falls back to a conservative constant when the limit can't be read.
932///
933/// The auto derivation is clamped to `slab_headroom` (the global
934/// `max_connections`): every admitted flow consumes one shared `SessionManager`
935/// slab slot, so an unclamped ~70%-of-RLIMIT value on a host with a very large
936/// fd limit could try to inflate the slab to hundreds of thousands of entries
937/// and starve HTTP/TCP. An *explicitly configured* `max_flows` is honoured as-is
938/// (the operator opted in); only the auto value is capped. A `slab_headroom` of
939/// 0 (no connection budget configured) disables the clamp.
940fn effective_max_flows(configured: u32, slab_headroom: usize) -> usize {
941 if configured != 0 {
942 return configured as usize;
943 }
944 let auto = {
945 #[cfg(unix)]
946 {
947 let mut limit = libc::rlimit {
948 rlim_cur: 0,
949 rlim_max: 0,
950 };
951 // SAFETY: `getrlimit` writes a fully-initialised `rlimit` into
952 // `limit`; we read the result only on success (`== 0`).
953 let ret = unsafe { libc::getrlimit(libc::RLIMIT_NOFILE, &mut limit) };
954 if ret == 0 && limit.rlim_cur > 0 {
955 let soft = limit.rlim_cur;
956 ((soft.saturating_mul(7)) / 10).max(1) as usize
957 } else {
958 DEFAULT_AUTO_MAX_FLOWS
959 }
960 }
961 #[cfg(not(unix))]
962 {
963 DEFAULT_AUTO_MAX_FLOWS
964 }
965 };
966 if slab_headroom == 0 {
967 auto
968 } else {
969 auto.min(slab_headroom).max(1)
970 }
971}
972
/// Cap a listener's configured `max_rx_datagram_size` at the global
/// `buffer_size`, as defense in depth.
///
/// The per-session `recv_buf` is allocated at `max_rx + 1` bytes. Without
/// this cap, a raw worker `AddUdpListener`/`UpdateUdpListener` carrying e.g.
/// `max_rx_datagram_size = u32::MAX` could make the worker allocate a ~4 GB
/// buffer. When `buffer_size` is 0 (unset), the configured value passes
/// through untouched.
fn clamp_max_rx(configured: usize, buffer_size: usize) -> usize {
    match buffer_size {
        // No global buffer budget: trust the listener's own value.
        0 => configured,
        ceiling => configured.min(ceiling),
    }
}
985
/// The `ProxySession` backing one UDP listener.
///
/// The server's generic readiness path drives this (UDP is not in the
/// listen-accept arm). For its listener, it owns every per-flow connected
/// upstream socket plus the shell-side bookkeeping that maps readiness events
/// and manager outputs back to flows. Events are demuxed by token: the
/// listener token versus the per-flow upstream tokens.
pub struct UdpListenerSession {
    /// The listener this session serves. Its socket carries client-side
    /// `recv_from` and the client-return `send_to`.
    listener: Rc<RefCell<UdpListener>>,
    /// The sans-io manager for this listener: fed `ManagerInput`s and polled
    /// for `Output`s.
    manager: Rc<RefCell<UdpManager>>,
    /// Shared backend map used to resolve `SelectBackend` outputs.
    backends: Rc<RefCell<BackendMap>>,
    /// Cloned mio registry for per-flow socket (de)registration and for
    /// re-arming the listener's interest set.
    registry: Registry,
    /// Slab that the per-flow upstream tokens are inserted into. Each upstream
    /// slot holds a clone of this listener session's own slab entry
    /// (multi-token pattern), so upstream readiness is routed back here.
    sessions: Rc<RefCell<SessionManager>>,
    /// The listener's own token. It keys client recv, listener WRITABLE wakes
    /// and the manager-wide timer.
    listener_token: Token,
    /// Listener address. Used for logging, and as the placeholder client
    /// address for a flow opened with no in-flight client source.
    address: SocketAddr,
    /// Per-flow connected upstream sockets, keyed by their slab token.
    upstream_sockets: HashMap<Token, UdpSocket>,
    /// Per-flow bounded egress queues for the forward path, keyed by the
    /// upstream token. A datagram lands here only when the connected upstream
    /// socket returns `WouldBlock`. The socket is then reregistered
    /// `READABLE | WRITABLE` and drained on the next writable event. The
    /// queue is dropped together with the socket on flow close, so it cannot
    /// leak and the flow gauge is unaffected.
    upstream_write_queues: HashMap<Token, WriteQueue>,
    /// Bounded egress queue for the client-return path (replies fanned back
    /// through the single listener socket via `send_to`). Engaged only when the
    /// listener socket returns `WouldBlock`. The listener is then reregistered
    /// `READABLE | WRITABLE` and drained on its next writable event.
    client_write_queue: WriteQueue,
    /// `upstream_token -> FlowId` for NAT-return demux; the inverse of
    /// `flow_to_upstream`.
    upstream_to_flow: HashMap<Token, FlowId>,
    /// `FlowId -> upstream_token`, used to tear the socket down on close.
    flow_to_upstream: HashMap<FlowId, Token>,
    /// `FlowId -> admission Instant`, used to record `udp.flow.duration` on close.
    flow_started: HashMap<FlowId, Instant>,
    /// `FlowId -> (client, backend)` for the close access log.
    flow_endpoints: HashMap<FlowId, (SocketAddr, Option<SocketAddr>)>,
    /// The client source whose datagram is currently being drained. Lets
    /// `on_send_to_backend` resolve the right per-flow upstream socket for an
    /// already-established flow (where no `OpenUpstream` precedes the send).
    /// `None` while draining backend-side or timer-driven outputs.
    in_flight_client: Option<SocketAddr>,
    /// The flow whose upstream was just opened in this drain pass. Covers the
    /// new-flow path, where `OpenUpstream{flow}` immediately precedes the
    /// first `SendToBackend`. Takes precedence over `in_flight_client` when
    /// resolving a `SendToBackend`.
    in_flight_flow: Option<FlowId>,
    /// Shadow of the manager's flow table: normalised client key → `FlowId`.
    /// Lets the shell resolve the owning flow for a `SendToBackend` on an
    /// established flow from the in-flight client source. Kept in lockstep with
    /// `OpenUpstream` / `CloseFlow`.
    client_key_to_flow: HashMap<SocketAddr, FlowId>,
    /// Reusable recv scratch buffer shared by the client and upstream recv
    /// paths. It is sized `max_rx_datagram_size + 1` so an oversized datagram
    /// is detectable instead of being silently truncated.
    recv_buf: Vec<u8>,
    /// The currently-armed `TIMER` handle, so a re-arm cancels the previous
    /// deadline instead of leaking timer-slab entries (the manager only emits a
    /// fresh `ArmTimer` when the deadline actually changes).
    timer_handle: Option<crate::timer::Timeout>,
}
1047
1048impl UdpListenerSession {
1049 #[allow(clippy::too_many_arguments)]
1050 pub fn new(
1051 listener: Rc<RefCell<UdpListener>>,
1052 manager: Rc<RefCell<UdpManager>>,
1053 backends: Rc<RefCell<BackendMap>>,
1054 registry: Registry,
1055 sessions: Rc<RefCell<SessionManager>>,
1056 listener_token: Token,
1057 ) -> UdpListenerSession {
1058 let (address, max_rx) = {
1059 let l = listener.borrow();
1060 (l.address, l.config.max_rx_datagram_size as usize)
1061 };
1062 UdpListenerSession {
1063 listener,
1064 manager,
1065 backends,
1066 registry,
1067 sessions,
1068 listener_token,
1069 address,
1070 upstream_sockets: HashMap::new(),
1071 upstream_write_queues: HashMap::new(),
1072 client_write_queue: WriteQueue::new(CLIENT_WRITE_QUEUE_CAP),
1073 upstream_to_flow: HashMap::new(),
1074 flow_to_upstream: HashMap::new(),
1075 flow_started: HashMap::new(),
1076 flow_endpoints: HashMap::new(),
1077 in_flight_client: None,
1078 in_flight_flow: None,
1079 client_key_to_flow: HashMap::new(),
1080 // Size the recv scratch to `max_rx + 1`, NOT `max_rx`: a UDP
1081 // `recv_from` truncates the datagram to the buffer length and
1082 // silently discards the tail. If the buffer were exactly `max_rx`,
1083 // an oversized datagram would arrive as a `max_rx`-byte payload —
1084 // indistinguishable from a legal one — and be forwarded truncated.
1085 // The extra byte lets the manager observe `len == max_rx + 1 >
1086 // max_rx` and drop it (`DropReason::Truncated`) instead.
1087 recv_buf: vec![0u8; max_rx.saturating_add(1).max(1)],
1088 timer_handle: None,
1089 }
1090 }
1091
1092 /// Resize the recv scratch buffer to `max_rx + 1` (the same `+ 1` sizing as
1093 /// [`new`](Self::new): the extra byte lets the manager observe an oversized
1094 /// datagram as `len == max_rx + 1 > max_rx` and drop it as `Truncated`
1095 /// rather than silently forwarding a kernel-truncated payload). Called from
1096 /// `UdpProxy::update_listener` when a config push changes the listener's
1097 /// `max_rx_datagram_size`. Resizing larger zero-fills the new tail; resizing
1098 /// smaller truncates the (idle, between-datagram) scratch — both are safe
1099 /// because the buffer holds no live datagram across calls.
1100 fn resize_recv_buf(&mut self, max_rx: usize) {
1101 self.recv_buf.resize(max_rx.saturating_add(1).max(1), 0u8);
1102 }
1103
1104 /// Normalise a client source the same way the active manager config keys
1105 /// flows (4-tuple when `affinity_with_port`, else source-IP with port 0).
1106 fn client_key(&self, src: SocketAddr) -> SocketAddr {
1107 let with_port = self.manager.borrow().affinity_with_port();
1108 if with_port {
1109 src
1110 } else {
1111 let mut s = src;
1112 s.set_port(0);
1113 s
1114 }
1115 }
1116
1117 /// Drain every datagram waiting on the listener socket into the manager.
1118 /// Edge-triggered epoll: loop `recv_from` to `WouldBlock`.
1119 fn ingest_client(&mut self, now: Instant) {
1120 // Pull the socket out behind a short borrow then operate on it.
1121 loop {
1122 let result = {
1123 let listener = self.listener.borrow();
1124 let Some(socket) = listener.socket.as_ref() else {
1125 return;
1126 };
1127 socket.recv_from(&mut self.recv_buf)
1128 };
1129 match result {
1130 Ok((len, src)) => {
1131 // Mark the in-flight client so `on_send_to_backend` can
1132 // resolve the owning flow's upstream socket for an
1133 // already-established flow (no `OpenUpstream` precedes it).
1134 self.in_flight_client = Some(src);
1135 self.in_flight_flow = None;
1136 let len = len.min(self.recv_buf.len());
1137 // Borrow the payload via a split so the mutable borrow of
1138 // `self.manager` does not alias `self.recv_buf`.
1139 let payload: &[u8] = &self.recv_buf[..len];
1140 // SAFETY-free: `handle_input` only reads the slice; the
1141 // borrow checker is satisfied because `recv_buf` and
1142 // `manager` are disjoint fields.
1143 let mgr = self.manager.clone();
1144 mgr.borrow_mut()
1145 .handle_input(ManagerInput::ClientDatagram { src, payload }, now);
1146 // Drain outputs after each datagram to bound queue growth.
1147 self.drain_outputs(now);
1148 self.in_flight_client = None;
1149 }
1150 Err(ref e) if e.kind() == ErrorKind::WouldBlock => break,
1151 Err(ref e) if e.kind() == ErrorKind::Interrupted => continue,
1152 Err(e) => {
1153 debug!(
1154 "{} recv_from error on UDP listener: {}",
1155 log_context!(self),
1156 e
1157 );
1158 break;
1159 }
1160 }
1161 }
1162 }
1163
1164 /// Drain datagrams waiting on one flow's connected upstream socket into the
1165 /// manager as `BackendDatagram`s.
1166 fn ingest_upstream(&mut self, upstream_token: Token, now: Instant) {
1167 let Some(&flow) = self.upstream_to_flow.get(&upstream_token) else {
1168 return;
1169 };
1170 loop {
1171 let result = {
1172 let Some(socket) = self.upstream_sockets.get(&upstream_token) else {
1173 return;
1174 };
1175 socket.recv(&mut self.recv_buf)
1176 };
1177 match result {
1178 Ok(len) => {
1179 let len = len.min(self.recv_buf.len());
1180 let payload: &[u8] = &self.recv_buf[..len];
1181 let mgr = self.manager.clone();
1182 mgr.borrow_mut()
1183 .handle_input(ManagerInput::BackendDatagram { flow, payload }, now);
1184 self.drain_outputs(now);
1185 }
1186 Err(ref e) if e.kind() == ErrorKind::WouldBlock => break,
1187 Err(ref e) if e.kind() == ErrorKind::Interrupted => continue,
1188 Err(e) => {
1189 debug!(
1190 "{} recv error on upstream socket: {}",
1191 log_context!(self),
1192 e
1193 );
1194 break;
1195 }
1196 }
1197 }
1198 }
1199
1200 /// Drain the manager's output queue, acting on each `Output`.
1201 fn drain_outputs(&mut self, now: Instant) {
1202 let mgr = self.manager.clone();
1203 loop {
1204 let out = mgr.borrow_mut().poll_output();
1205 let Some(out) = out else { break };
1206 match out {
1207 Output::SelectBackend { flow, cluster, key } => {
1208 self.on_select_backend(flow, &cluster, key, now)
1209 }
1210 Output::OpenUpstream { flow, backend } => self.on_open_upstream(flow, backend, now),
1211 Output::SendToBackend(transmit) => self.on_send_to_backend(transmit),
1212 Output::SendToClient(transmit) => self.on_send_to_client(transmit),
1213 Output::ArmTimer(deadline) => self.arm_timer(deadline, now),
1214 Output::Metric(ev) => Self::record_metric(ev),
1215 Output::CloseFlow(flow) => self.on_close_flow(flow),
1216 Output::Drop(reason) => Self::record_drop(reason),
1217 }
1218 }
1219 }
1220
1221 fn on_select_backend(&mut self, flow: FlowId, cluster: &str, key: u64, now: Instant) {
1222 let resolved = self
1223 .backends
1224 .borrow_mut()
1225 .backend_from_cluster_id_with_key(cluster, Some(key));
1226 match resolved {
1227 Ok((backend, addr)) => {
1228 self.manager.borrow_mut().handle_input(
1229 ManagerInput::BackendResolved {
1230 flow,
1231 backend,
1232 addr,
1233 },
1234 now,
1235 );
1236 }
1237 Err(e) => {
1238 debug!(
1239 "{} no backend for cluster {}: {}; aborting flow {}",
1240 log_context!(self),
1241 cluster,
1242 e,
1243 flow
1244 );
1245 incr!(names::udp::DROPPED_NO_BACKEND);
1246 // Abort the flow instead of leaving it parked AwaitingBackend:
1247 // the manager already counted it (FlowCreated, +1 gauge, slab +
1248 // admission slot), so without this it would squat a `max_flows`
1249 // slot for the full idle timeout while every later datagram is
1250 // dropped. `abort_flow` enqueues FlowEvicted + CloseFlow, which
1251 // the surrounding `drain_outputs` loop processes — freeing the
1252 // slot immediately and balancing the gauge via FlowEvicted.
1253 self.manager
1254 .borrow_mut()
1255 .abort_flow(flow, now, CloseReason::Aborted);
1256 }
1257 }
1258 }
1259
1260 fn on_open_upstream(&mut self, flow: FlowId, backend: SocketAddr, now: Instant) {
1261 let mut socket = match udp_connect(backend) {
1262 Ok(socket) => socket,
1263 Err(e) => {
1264 // EMFILE/ENFILE/connect refusal → shed this flow, never panic.
1265 warn!(
1266 "{} could not open upstream socket to {}: {}; shedding flow {}",
1267 log_context!(self),
1268 backend,
1269 e,
1270 flow
1271 );
1272 incr!(names::udp::FLOWS_SHED);
1273 // The manager already moved this flow toward Established and
1274 // counted it; abort it so the `max_flows` slot frees immediately
1275 // (FlowEvicted balances the gauge) instead of squatting until the
1276 // idle timeout. Keep the FLOWS_SHED metric above for the
1277 // EMFILE/refused case. `abort_flow` enqueues FlowEvicted +
1278 // CloseFlow for the surrounding `drain_outputs` loop.
1279 self.manager
1280 .borrow_mut()
1281 .abort_flow(flow, now, CloseReason::Aborted);
1282 return;
1283 }
1284 };
1285 // Multi-token pattern (template tcp.rs:1029-1053): a fresh slab slot
1286 // under the SAME listener-session Rc, registered READABLE so its
1287 // readiness reaches `Server::ready` → demuxed back to this session by
1288 // `update_readiness`. The flow-table cap (`max_flows`) already bounds
1289 // how many upstream sockets/slots can exist, so the slab cannot grow
1290 // unbounded here.
1291 let upstream_token = {
1292 let mut s = self.sessions.borrow_mut();
1293 let listener_session = s.slab[self.listener_token.0].clone();
1294 let entry = s.slab.vacant_entry();
1295 let token = Token(entry.key());
1296 entry.insert(listener_session);
1297 token
1298 };
1299 if let Err(e) = self
1300 .registry
1301 .register(&mut socket, upstream_token, Interest::READABLE)
1302 {
1303 error!(
1304 "{} could not register upstream socket: {}",
1305 log_context!(self),
1306 e
1307 );
1308 self.sessions.borrow_mut().slab.try_remove(upstream_token.0);
1309 // The flow is Established in the manager but has no usable upstream
1310 // socket: abort it so its `max_flows` slot frees now (FlowEvicted
1311 // balances the gauge) rather than squatting until idle timeout.
1312 self.manager
1313 .borrow_mut()
1314 .abort_flow(flow, now, CloseReason::Aborted);
1315 return;
1316 }
1317 self.upstream_sockets.insert(upstream_token, socket);
1318 self.upstream_to_flow.insert(upstream_token, flow);
1319 self.flow_to_upstream.insert(flow, upstream_token);
1320 // `flow_to_upstream` and `upstream_to_flow` are inverse maps: the token
1321 // in one points back to the flow in the other. A broken inverse would let
1322 // a backend reply demux to the wrong client (a NAT-return mismatch).
1323 debug_assert_eq!(
1324 self.upstream_to_flow.get(&upstream_token),
1325 Some(&flow),
1326 "upstream_to_flow must map the new token back to its flow"
1327 );
1328 debug_assert_eq!(
1329 self.flow_to_upstream.get(&flow),
1330 Some(&upstream_token),
1331 "flow_to_upstream must map the flow back to its upstream token"
1332 );
1333 self.flow_started.insert(flow, Instant::now());
1334 // The client source for this flow is the one currently in flight.
1335 let client = self.in_flight_client.unwrap_or(self.address);
1336 self.flow_endpoints.insert(flow, (client, Some(backend)));
1337 if let Some(src) = self.in_flight_client {
1338 let key = self.client_key(src);
1339 self.client_key_to_flow.insert(key, flow);
1340 // The shadow flow-table only ever holds live flows: the flow we just
1341 // mapped must have a live upstream token (it is the one we just
1342 // opened). Pairs the on-close drop in `on_close_flow`.
1343 debug_assert!(
1344 self.flow_to_upstream.contains_key(&flow),
1345 "client_key_to_flow points at flow {flow} with no live upstream token"
1346 );
1347 }
1348 // This flow's first SendToBackend (if any) follows immediately.
1349 self.in_flight_flow = Some(flow);
1350 }
1351
1352 fn on_send_to_backend(&mut self, transmit: crate::protocol::udp::Transmit) {
1353 // Resolve the owning flow precisely (NOT by `transmit.dst`, which two
1354 // flows to the same backend would alias — sending on the wrong
1355 // connected socket misroutes that backend's reply to the wrong client).
1356 // * new flow: `OpenUpstream{flow}` set `in_flight_flow` just before.
1357 // * established flow: resolve via the in-flight client source through
1358 // the shell-side `client_key -> flow` shadow of the flow table.
1359 let flow = self.in_flight_flow.or_else(|| {
1360 self.in_flight_client
1361 .map(|src| self.client_key(src))
1362 .and_then(|key| self.client_key_to_flow.get(&key).copied())
1363 });
1364 let token = flow.and_then(|f| self.flow_to_upstream.get(&f).copied());
1365 let Some(token) = token else {
1366 // No resolved flow / upstream socket: this is not a queue-full drop.
1367 incr!(names::udp::DROPPED_UNKNOWN_FLOW);
1368 return;
1369 };
1370 let Some(socket) = self.upstream_sockets.get(&token) else {
1371 // Socket already gone (flow closed mid-drain): unknown-flow, not
1372 // queue-full.
1373 incr!(names::udp::DROPPED_UNKNOWN_FLOW);
1374 return;
1375 };
1376 // If a queue is already backed up for this flow, preserve FIFO order —
1377 // do NOT jump the line with the fast-path `send`. Append (or drop on
1378 // overflow) and let the WRITABLE drain catch up.
1379 if let Some(q) = self.upstream_write_queues.get_mut(&token)
1380 && !q.is_empty()
1381 {
1382 if !q.push(transmit.dst, transmit.payload) {
1383 debug!("{} upstream write queue full, dropping", log_context!(self));
1384 incr!(names::udp::DROPPED_WQ_FULL);
1385 }
1386 return;
1387 }
1388 match socket.send(&transmit.payload) {
1389 Ok(_) => {}
1390 Err(ref e) if e.kind() == ErrorKind::WouldBlock => {
1391 // Kernel send buffer full: enqueue (bounded) and arm WRITABLE so
1392 // the next writable event drains it. Drop + metric only at cap.
1393 let q = self
1394 .upstream_write_queues
1395 .entry(token)
1396 .or_insert_with(|| WriteQueue::new(UPSTREAM_WRITE_QUEUE_CAP));
1397 if q.push(transmit.dst, transmit.payload) {
1398 self.arm_upstream_writable(token);
1399 } else {
1400 debug!("{} upstream write queue full, dropping", log_context!(self));
1401 incr!(names::udp::DROPPED_WQ_FULL);
1402 }
1403 }
1404 Err(e) => {
1405 debug!("{} upstream send error: {}", log_context!(self), e);
1406 // Hard send error (e.g. ECONNREFUSED), not a queue-full drop.
1407 incr!(names::udp::DROPPED_SEND_ERROR);
1408 }
1409 }
1410 }
1411
1412 /// Reregister a flow's connected upstream socket for `READABLE | WRITABLE`
1413 /// so a queued forward datagram gets a writable wake (the edge-triggered
1414 /// analog of `signal_pending_write`). Idempotent enough — mio coalesces a
1415 /// repeated interest set.
1416 fn arm_upstream_writable(&mut self, token: Token) {
1417 if let Some(socket) = self.upstream_sockets.get_mut(&token)
1418 && let Err(e) =
1419 self.registry
1420 .reregister(socket, token, Interest::READABLE | Interest::WRITABLE)
1421 {
1422 debug!(
1423 "{} could not arm WRITABLE on upstream socket: {}",
1424 log_context!(self),
1425 e
1426 );
1427 }
1428 }
1429
1430 /// Drop a flow's upstream socket back to `READABLE`-only once its write queue
1431 /// has fully drained, so an empty socket no longer wakes the loop on every
1432 /// writable edge (a permanently-WRITABLE UDP socket would busy-loop).
1433 fn disarm_upstream_writable(&mut self, token: Token) {
1434 if let Some(socket) = self.upstream_sockets.get_mut(&token)
1435 && let Err(e) = self.registry.reregister(socket, token, Interest::READABLE)
1436 {
1437 debug!(
1438 "{} could not disarm WRITABLE on upstream socket: {}",
1439 log_context!(self),
1440 e
1441 );
1442 }
1443 }
1444
1445 /// Drain a flow's upstream write queue on a writable event. Re-sends in FIFO
1446 /// order until `WouldBlock` or empty; on empty, drops WRITABLE interest.
1447 fn drain_upstream_queue(&mut self, token: Token) {
1448 let Some(mut queue) = self.upstream_write_queues.remove(&token) else {
1449 return;
1450 };
1451 let socket = self.upstream_sockets.get(&token);
1452 let Some(socket) = socket else {
1453 // Socket gone (flow closed mid-drain): discard the queue.
1454 return;
1455 };
1456 let emptied = queue.drain(|_dst, payload| match socket.send(payload) {
1457 Ok(_) => SendOutcome::Sent,
1458 Err(ref e) if e.kind() == ErrorKind::WouldBlock => SendOutcome::WouldBlock,
1459 Err(_) => SendOutcome::Dropped,
1460 });
1461 if emptied {
1462 self.disarm_upstream_writable(token);
1463 } else {
1464 // Still backed up: put the queue back and keep WRITABLE armed.
1465 self.upstream_write_queues.insert(token, queue);
1466 }
1467 }
1468
1469 fn on_send_to_client(&mut self, transmit: crate::protocol::udp::Transmit) {
1470 // Preserve FIFO: if the client-return queue is already backed up, append
1471 // (or drop on overflow) rather than jumping the line via the fast path.
1472 if !self.client_write_queue.is_empty() {
1473 if !self.client_write_queue.push(transmit.dst, transmit.payload) {
1474 debug!("{} client write queue full, dropping", log_context!(self));
1475 incr!(names::udp::DROPPED_WQ_FULL);
1476 }
1477 return;
1478 }
1479 let send_result = {
1480 let listener = self.listener.borrow();
1481 let Some(socket) = listener.socket.as_ref() else {
1482 return;
1483 };
1484 socket.send_to(&transmit.payload, transmit.dst)
1485 };
1486 match send_result {
1487 Ok(_) => {}
1488 Err(ref e) if e.kind() == ErrorKind::WouldBlock => {
1489 // Listener send buffer full: enqueue (bounded) + arm WRITABLE on
1490 // the listener so the next writable event drains it.
1491 if self.client_write_queue.push(transmit.dst, transmit.payload) {
1492 self.arm_client_writable();
1493 } else {
1494 debug!("{} client write queue full, dropping", log_context!(self));
1495 incr!(names::udp::DROPPED_WQ_FULL);
1496 }
1497 }
1498 Err(e) => {
1499 debug!("{} client send_to error: {}", log_context!(self), e);
1500 // Hard send_to error, not a queue-full drop.
1501 incr!(names::udp::DROPPED_SEND_ERROR);
1502 }
1503 }
1504 }
1505
1506 /// Reregister the listener socket for `READABLE | WRITABLE` so a queued
1507 /// client-return datagram gets a writable wake. The listener token routes
1508 /// the writable event back into `update_readiness`.
1509 fn arm_client_writable(&mut self) {
1510 let listener = self.listener.borrow();
1511 let fd = match listener.socket.as_ref() {
1512 Some(socket) => socket.as_raw_fd(),
1513 None => return,
1514 };
1515 if let Err(e) = self.registry.reregister(
1516 &mut SourceFd(&fd),
1517 self.listener_token,
1518 Interest::READABLE | Interest::WRITABLE,
1519 ) {
1520 debug!(
1521 "{} could not arm WRITABLE on listener socket: {}",
1522 log_context!(self),
1523 e
1524 );
1525 }
1526 }
1527
1528 /// Drop the listener socket back to `READABLE`-only once the client-return
1529 /// queue is empty (a permanently-WRITABLE listener would busy-loop).
1530 fn disarm_client_writable(&mut self) {
1531 let listener = self.listener.borrow();
1532 let fd = match listener.socket.as_ref() {
1533 Some(socket) => socket.as_raw_fd(),
1534 None => return,
1535 };
1536 if let Err(e) =
1537 self.registry
1538 .reregister(&mut SourceFd(&fd), self.listener_token, Interest::READABLE)
1539 {
1540 debug!(
1541 "{} could not disarm WRITABLE on listener socket: {}",
1542 log_context!(self),
1543 e
1544 );
1545 }
1546 }
1547
1548 /// Drain the client-return write queue on a listener writable event. Re-sends
1549 /// in FIFO order until `WouldBlock` or empty; on empty, drops WRITABLE.
1550 fn drain_client_queue(&mut self) {
1551 let mut queue = std::mem::replace(&mut self.client_write_queue, WriteQueue::new(0));
1552 let emptied = {
1553 let listener = self.listener.borrow();
1554 let Some(socket) = listener.socket.as_ref() else {
1555 // Listener socket gone: discard the queue and restore an empty one.
1556 self.client_write_queue = WriteQueue::new(CLIENT_WRITE_QUEUE_CAP);
1557 return;
1558 };
1559 queue.drain(|dst, payload| match socket.send_to(payload, *dst) {
1560 Ok(_) => SendOutcome::Sent,
1561 Err(ref e) if e.kind() == ErrorKind::WouldBlock => SendOutcome::WouldBlock,
1562 Err(_) => SendOutcome::Dropped,
1563 })
1564 };
1565 // Restore the (possibly still-backed-up) queue; preserve its capacity.
1566 queue.cap = CLIENT_WRITE_QUEUE_CAP;
1567 self.client_write_queue = queue;
1568 if emptied {
1569 self.disarm_client_writable();
1570 }
1571 }
1572
1573 /// Map the single manager-wide `ArmTimer(deadline)` onto the thread-local
1574 /// `TIMER`, keyed by the **listener token**: when it fires, the run loop
1575 /// calls `Server::timeout(listener_token)` → `session.timeout(token)` →
1576 /// `manager.handle_timeout(now)`.
1577 fn arm_timer(&mut self, deadline: Instant, now: Instant) {
1578 let delay = deadline.saturating_duration_since(now);
1579 TIMER.with(|timer| {
1580 let mut timer = timer.borrow_mut();
1581 // Cancel the previous deadline so re-arming does not leak timer-slab
1582 // entries. The manager only emits a fresh `ArmTimer` on a real
1583 // deadline change, so cancellations are rare.
1584 if let Some(old) = self.timer_handle.take() {
1585 let _ = timer.cancel_timeout(&old);
1586 }
1587 self.timer_handle = Some(timer.set_timeout(delay, self.listener_token));
1588 });
1589 }
1590
1591 fn on_close_flow(&mut self, flow: FlowId) {
1592 if let Some(token) = self.flow_to_upstream.remove(&flow) {
1593 if let Some(mut socket) = self.upstream_sockets.remove(&token) {
1594 if let Err(e) = self.registry.deregister(&mut socket) {
1595 debug!("{} deregister upstream on close: {}", log_context!(self), e);
1596 }
1597 }
1598 // Drop any queued (un-drained) forward datagrams with the socket so
1599 // the per-flow queue cannot leak; gauge correctness is preserved
1600 // because the queue holds bytes, not flow-count state.
1601 self.upstream_write_queues.remove(&token);
1602 self.upstream_to_flow.remove(&token);
1603 self.sessions.borrow_mut().slab.try_remove(token.0);
1604 // On close, all per-flow maps drop the flow together: neither
1605 // direction of the upstream-token map may still reference it.
1606 debug_assert!(
1607 !self.upstream_to_flow.values().any(|&f| f == flow),
1608 "on_close_flow left upstream_to_flow referencing closed flow {flow}"
1609 );
1610 debug_assert!(
1611 !self.flow_to_upstream.contains_key(&flow),
1612 "on_close_flow left flow_to_upstream entry for closed flow {flow}"
1613 );
1614 }
1615 if let Some(started) = self.flow_started.remove(&flow) {
1616 let duration = started.elapsed();
1617 time!(names::udp::FLOW_DURATION, duration.as_millis());
1618 }
1619 let (client, backend) = self
1620 .flow_endpoints
1621 .remove(&flow)
1622 .unwrap_or((self.address, None));
1623 // Drop the shadow flow-table entry if it still points at this flow.
1624 let key = self.client_key(client);
1625 if self.client_key_to_flow.get(&key) == Some(&flow) {
1626 self.client_key_to_flow.remove(&key);
1627 }
1628 // The shadow flow-table must no longer map THIS flow id. A surviving
1629 // entry would misroute a later established-flow `SendToBackend` onto a
1630 // freed upstream token.
1631 debug_assert!(
1632 !self.client_key_to_flow.values().any(|&f| f == flow),
1633 "on_close_flow left client_key_to_flow referencing closed flow {flow}"
1634 );
1635 info!("{} flow closed", log_flow_context!(flow, client, backend));
1636 }
1637
    /// Translate one manager `MetricEvent` into the shell's metric macros.
    ///
    /// `FlowCreated` / `FlowEvicted` are the only places that move the
    /// `udp.active_flows` gauge (+1 / -1), so every created flow must
    /// eventually produce exactly one `FlowEvicted` for the gauge to return to
    /// zero. `DatagramDropped` is delegated to [`record_drop`](Self::record_drop).
    fn record_metric(ev: MetricEvent) {
        match ev {
            MetricEvent::FlowCreated => {
                incr!(names::udp::FLOWS_CREATED);
                gauge_add!(names::udp::ACTIVE_FLOWS, 1);
            }
            MetricEvent::FlowEvicted => {
                incr!(names::udp::FLOWS_EVICTED);
                gauge_add!(names::udp::ACTIVE_FLOWS, -1);
            }
            MetricEvent::FlowShed => {
                incr!(names::udp::FLOWS_SHED);
            }
            // Byte counts are converted with `as i64`.
            // NOTE(review): this truncates if the payload type can exceed
            // i64::MAX; not a concern for datagram sizes, but confirm the type.
            MetricEvent::DatagramIn(bytes) => {
                incr!(names::udp::DATAGRAMS_IN);
                count!(names::udp::BYTES_IN, bytes as i64);
            }
            MetricEvent::DatagramOut(bytes) => {
                incr!(names::udp::DATAGRAMS_OUT);
                count!(names::udp::BYTES_OUT, bytes as i64);
            }
            MetricEvent::DatagramDropped(reason) => Self::record_drop(reason),
        }
    }
1662
1663 fn record_drop(reason: DropReason) {
1664 incr!(names::udp::DATAGRAMS_DROPPED);
1665 match reason {
1666 DropReason::Invalid => incr!(names::udp::DROPPED_INVALID),
1667 DropReason::Truncated => incr!(names::udp::DROPPED_TRUNCATED),
1668 DropReason::NoBackend => incr!(names::udp::DROPPED_NO_BACKEND),
1669 DropReason::Shed => incr!(names::udp::DROPPED_SHED),
1670 DropReason::UnknownFlow => incr!(names::udp::DROPPED_UNKNOWN_FLOW),
1671 }
1672 }
1673
1674 /// Tear down every active flow on this listener **through the manager**, so
1675 /// each close emits `FlowEvicted` + `CloseFlow` exactly once and the shell's
1676 /// normal [`on_close_flow`](Self::on_close_flow) handler frees the upstream
1677 /// socket + slab slot and decrements `udp.active_flows`. Used on soft/hard
1678 /// stop, listener remove, and listener deactivate so the worker reaches its
1679 /// `base_sessions_count` and exits promptly instead of waiting out every
1680 /// flow's idle timeout — and so the active-flows gauge does not leak by N
1681 /// (the bug the old direct-teardown path had: it cleared the shell maps
1682 /// without telling the manager, so `FlowEvicted` never fired).
1683 ///
1684 /// On a deactivate where the manager is *retained* (not dropped), this also
1685 /// resets the manager's flow table to empty, keeping manager and shell
1686 /// consistent. The listener socket and the listener-session slab slot are
1687 /// left intact (the listener slot is part of `base_sessions_count` and
1688 /// reclaimed when the worker exits); only the connectionless per-flow slots
1689 /// — which are NOT counted in `nb_connections` and so removed without
1690 /// `decr` — are freed.
1691 pub fn close_all_flows(&mut self, now: Instant) {
1692 // Drive teardown through the manager: it emits one `FlowEvicted` +
1693 // `CloseFlow` per live flow into its output queue. Draining those runs
1694 // `record_metric(FlowEvicted)` (the single gauge decrement) and
1695 // `on_close_flow` (frees socket + slab slot + shell maps) per flow.
1696 self.manager.borrow_mut().close_all(now);
1697 self.drain_outputs(now);
1698 // After `close_all`, the manager's flow table is empty and it armed no
1699 // new timer (`reschedule` emits `ArmTimer` only on a real deadline
1700 // change, and an empty table has no deadline). Cancel any residual shell
1701 // timer so it can't fire against a now-flowless listener.
1702 if let Some(handle) = self.timer_handle.take() {
1703 TIMER.with(|timer| {
1704 let _ = timer.borrow_mut().cancel_timeout(&handle);
1705 });
1706 }
1707 }
1708}
1709
1710impl ProxySession for UdpListenerSession {
1711 fn protocol(&self) -> Protocol {
1712 Protocol::UDPListen
1713 }
1714
1715 fn update_readiness(&mut self, token: Token, events: Ready) {
1716 // WRITABLE first: a previously-`WouldBlock` socket can now accept queued
1717 // egress. Drain before reading so the kernel send buffer has room before
1718 // this pass enqueues more (and so a writable-only event still drains).
1719 if events.is_writable() {
1720 if token == self.listener_token {
1721 self.drain_client_queue();
1722 } else if self.upstream_to_flow.contains_key(&token) {
1723 self.drain_upstream_queue(token);
1724 }
1725 }
1726 if !events.is_readable() {
1727 return;
1728 }
1729 let now = Instant::now();
1730 if token == self.listener_token {
1731 self.ingest_client(now);
1732 } else if self.upstream_to_flow.contains_key(&token) {
1733 self.ingest_upstream(token, now);
1734 }
1735 }
1736
1737 fn ready(&mut self, _session: Rc<RefCell<dyn ProxySession>>) -> SessionIsToBeClosed {
1738 // All work happens in `update_readiness` (it has the firing token; the
1739 // generic `ready()` does not). Never close the listener session here —
1740 // it lives for the listener's lifetime.
1741 false
1742 }
1743
1744 fn timeout(&mut self, token: Token) -> SessionIsToBeClosed {
1745 if token == self.listener_token {
1746 let now = Instant::now();
1747 self.manager.borrow_mut().handle_timeout(now);
1748 self.drain_outputs(now);
1749 // Re-arm: the manager emits a fresh ArmTimer via poll_output if a
1750 // flow is still scheduled (handled inside drain_outputs). Nothing
1751 // to do here. Never close the listener on a flow timeout.
1752 }
1753 false
1754 }
1755
1756 fn close(&mut self) {
1757 // Tear down any still-live flows THROUGH the manager (FlowEvicted +
1758 // CloseFlow per flow) so `udp.active_flows` balances to zero even if the
1759 // server reaps this listener session directly without a prior
1760 // proxy-driven `close_all_flows`. In the common path
1761 // (`close_all_flows` already ran) the manager has no live flows, so this
1762 // is a cheap no-op. Also cancels the residual idle timer.
1763 self.close_all_flows(Instant::now());
1764 // Any leftover egress queues / shell maps are dropped here (the per-flow
1765 // ones were already freed by the manager-driven close above; this only
1766 // resets the client-return queue, which is not flow-state).
1767 self.upstream_write_queues.clear();
1768 self.client_write_queue = WriteQueue::new(CLIENT_WRITE_QUEUE_CAP);
1769 self.upstream_to_flow.clear();
1770 self.flow_to_upstream.clear();
1771 // Deregister + drop the listener socket. Never shutdown(Both) — UDP has
1772 // no connection to shut down; just deregister + drop the fds.
1773 let mut listener = self.listener.borrow_mut();
1774 if let Some(socket) = listener.socket.as_ref() {
1775 let fd = socket.as_raw_fd();
1776 let _ = self.registry.deregister(&mut SourceFd(&fd));
1777 }
1778 listener.active = false;
1779 }
1780
1781 fn last_event(&self) -> Instant {
1782 // The listener session lives for the listener's lifetime — it is never a
1783 // zombie even when idle. Mirror `ListenSession::last_event` (which
1784 // returns `now`) so the `zombie_check` (which has no listener-protocol
1785 // exclusion) never reaps a quiet UDP listener. Per-flow idle reaping is
1786 // handled by the manager's timer wheel, not the zombie sweep.
1787 Instant::now()
1788 }
1789
1790 fn print_session(&self) {
1791 error!(
1792 "{} UDP listener session: {} active flows, {} upstream sockets",
1793 log_context!(self),
1794 self.manager.borrow().flow_count(),
1795 self.upstream_sockets.len(),
1796 );
1797 }
1798
1799 fn frontend_token(&self) -> Token {
1800 self.listener_token
1801 }
1802
1803 fn shutting_down(&mut self) -> SessionIsToBeClosed {
1804 // The listener session is a *listener*, not a connection: it was never
1805 // counted by `SessionManager::incr` (only accepted sessions are), so it
1806 // MUST NOT be routed through the connection-counted
1807 // `shut_down_sessions_by_frontend_tokens` path — that calls `decr`,
1808 // which underflows `nb_connections` and panics (`assert!(nb != 0)` at
1809 // `server.rs`). This mirrors `ListenSession::shutting_down` (which also
1810 // returns `false`). Soft-stop draining is owned by
1811 // `UdpProxy::notify(SoftStop)`: it flips every manager to `Drain` and
1812 // deregisters the listener sockets, so existing flows reach teardown and
1813 // no new flow is admitted. The slot is reclaimed when the worker exits.
1814 false
1815 }
1816
1817 fn cluster_id(&self) -> Option<String> {
1818 self.listener.borrow().cluster_id.clone()
1819 }
1820}
1821
1822#[allow(unused_imports)]
1823pub(crate) use {log_context, log_flow_context, log_module_context};
1824
1825#[cfg(test)]
1826mod tests {
1827 use super::*;
1828
1829 use std::cell::Cell;
1830
1831 #[test]
1832 fn effective_max_flows_explicit_value_is_used() {
1833 // An explicit value is honoured verbatim, ignoring the slab headroom
1834 // clamp (the operator opted in).
1835 assert_eq!(effective_max_flows(42, 0), 42);
1836 assert_eq!(effective_max_flows(42, 10), 42);
1837 }
1838
1839 #[test]
1840 fn effective_max_flows_auto_is_positive() {
1841 // 0 = auto: derives ~70% of RLIMIT_NOFILE, always >= 1.
1842 assert!(effective_max_flows(0, 0) >= 1);
1843 }
1844
1845 #[test]
1846 fn effective_max_flows_auto_is_clamped_to_slab_headroom() {
1847 // Auto derivation is capped at the slab headroom (max_connections) so a
1848 // UDP listener can't inflate the shared slab. A headroom of 0 disables
1849 // the clamp.
1850 assert!(effective_max_flows(0, 4) <= 4);
1851 assert!(effective_max_flows(0, 4) >= 1);
1852 }
1853
1854 #[test]
1855 fn clamp_max_rx_respects_buffer_size() {
1856 // The configured rx size is clamped to buffer_size; an unset (0)
1857 // buffer_size leaves it untouched.
1858 assert_eq!(clamp_max_rx(u32::MAX as usize, 16_384), 16_384);
1859 assert_eq!(clamp_max_rx(1_024, 16_384), 1_024);
1860 assert_eq!(clamp_max_rx(u32::MAX as usize, 0), u32::MAX as usize);
1861 }
1862
1863 fn addr(port: u16) -> SocketAddr {
1864 SocketAddr::from(([127, 0, 0, 1], port))
1865 }
1866
1867 /// A fake socket whose `send` outcome is scripted per call. Lets the
1868 /// `WriteQueue` state machine be exercised with zero real I/O: we feed a
1869 /// sequence of `SendOutcome`s and record what was sent in order.
1870 struct FakeSocket {
1871 /// Outcomes returned by successive `send` calls (front = next).
1872 script: RefCell<VecDeque<SendOutcome>>,
1873 /// Payloads that `Sent`/`Dropped` actually consumed, in order.
1874 consumed: RefCell<Vec<Vec<u8>>>,
1875 /// Default outcome once the script is exhausted.
1876 default: Cell<bool>, // true = Sent, false = WouldBlock
1877 }
1878
1879 impl FakeSocket {
1880 fn new(script: Vec<SendOutcome>, default_sent: bool) -> Self {
1881 FakeSocket {
1882 script: RefCell::new(script.into()),
1883 consumed: RefCell::new(Vec::new()),
1884 default: Cell::new(default_sent),
1885 }
1886 }
1887
1888 fn send(&self, payload: &[u8]) -> SendOutcome {
1889 let outcome = self.script.borrow_mut().pop_front().unwrap_or({
1890 if self.default.get() {
1891 SendOutcome::Sent
1892 } else {
1893 SendOutcome::WouldBlock
1894 }
1895 });
1896 if matches!(outcome, SendOutcome::Sent | SendOutcome::Dropped) {
1897 self.consumed.borrow_mut().push(payload.to_vec());
1898 }
1899 outcome
1900 }
1901 }
1902
1903 #[test]
1904 fn write_queue_push_until_full_then_drops() {
1905 let mut q = WriteQueue::new(2);
1906 assert!(q.is_empty());
1907 assert!(q.push(addr(1), vec![1]));
1908 assert!(q.push(addr(2), vec![2]));
1909 assert_eq!(q.len(), 2);
1910 // At capacity: the third push is rejected (caller drops + counts).
1911 assert!(!q.push(addr(3), vec![3]));
1912 assert_eq!(q.len(), 2);
1913 }
1914
1915 #[test]
1916 fn write_queue_drains_in_fifo_order_on_writable() {
1917 let mut q = WriteQueue::new(8);
1918 for i in 0..4u8 {
1919 assert!(q.push(addr(i as u16), vec![i]));
1920 }
1921 // All sends succeed: drain empties the queue, FIFO preserved.
1922 let sock = FakeSocket::new(vec![], true);
1923 let emptied = q.drain(|_dst, payload| sock.send(payload));
1924 assert!(emptied);
1925 assert!(q.is_empty());
1926 assert_eq!(
1927 *sock.consumed.borrow(),
1928 vec![vec![0u8], vec![1u8], vec![2u8], vec![3u8]]
1929 );
1930 }
1931
1932 #[test]
1933 fn write_queue_stops_on_wouldblock_and_resumes() {
1934 let mut q = WriteQueue::new(8);
1935 for i in 0..3u8 {
1936 assert!(q.push(addr(i as u16), vec![i]));
1937 }
1938 // First send ok, then WouldBlock: drain sends one and stops, leaving two.
1939 let sock = FakeSocket::new(vec![SendOutcome::Sent, SendOutcome::WouldBlock], false);
1940 let emptied = q.drain(|_dst, payload| sock.send(payload));
1941 assert!(!emptied);
1942 assert_eq!(q.len(), 2);
1943 assert_eq!(*sock.consumed.borrow(), vec![vec![0u8]]);
1944 // Front is still the second datagram (FIFO preserved across the stall).
1945 assert_eq!(q.queue.front().unwrap().1, vec![1u8]);
1946 // Second writable event: now everything goes through.
1947 let sock2 = FakeSocket::new(vec![], true);
1948 let emptied2 = q.drain(|_dst, payload| sock2.send(payload));
1949 assert!(emptied2);
1950 assert!(q.is_empty());
1951 assert_eq!(*sock2.consumed.borrow(), vec![vec![1u8], vec![2u8]]);
1952 }
1953
1954 #[test]
1955 fn write_queue_hard_error_drops_one_and_continues() {
1956 let mut q = WriteQueue::new(8);
1957 for i in 0..3u8 {
1958 assert!(q.push(addr(i as u16), vec![i]));
1959 }
1960 // Middle datagram hard-errors: it is popped + skipped, the rest proceed.
1961 let sock = FakeSocket::new(
1962 vec![SendOutcome::Sent, SendOutcome::Dropped, SendOutcome::Sent],
1963 true,
1964 );
1965 let emptied = q.drain(|_dst, payload| sock.send(payload));
1966 assert!(emptied);
1967 assert!(q.is_empty());
1968 // The dropped datagram (1) was consumed-for-accounting but the queue is
1969 // empty and the surviving datagrams (0, 2) went out in order.
1970 assert_eq!(
1971 *sock.consumed.borrow(),
1972 vec![vec![0u8], vec![1u8], vec![2u8]]
1973 );
1974 }
1975
1976 #[test]
1977 fn write_queue_empties_cleanly_when_already_empty() {
1978 let mut q = WriteQueue::new(4);
1979 let sock = FakeSocket::new(vec![], true);
1980 // Draining an empty queue is a no-op that reports emptied = true.
1981 let emptied = q.drain(|_dst, payload| sock.send(payload));
1982 assert!(emptied);
1983 assert!(q.is_empty());
1984 assert!(sock.consumed.borrow().is_empty());
1985 }
1986}