Skip to main content

sozu_lib/protocol/udp/
manager.rs

1//! The sans-io [`UdpManager`]: admission, flow table, LB request, timers.
2//!
3//! `UdpManager` owns the flow table (`HashMap<FlowKey, FlowId>` over a
4//! `slab::Slab<UdpFlow>`), admission ("allocate nothing for unknown / over-cap
5//! / invalid datagrams"), the flow-table cap and shedding, pluggable flow-key
6//! extraction ([`FlowKeyExtractor`]), the LB-selection *request* for new flows
7//! (it emits [`Output::SelectBackend`]; the shell does the actual LB), and the
8//! timer scheduling: a **single armed manager-wide deadline** plus per-flow
9//! **generation tokens** so a stale expiry can never close a refreshed flow.
10//!
11//! Pure: every entry point that depends on time takes `now: Instant`; the hash
12//! seed is injected at construction. The shell drives the manager with
//! [`ManagerInput`] and drains [`Output`] via [`UdpManager::poll_output`].
14
15use std::{
16    collections::{HashMap, VecDeque},
17    hash::{Hash, Hasher},
18    net::SocketAddr,
19    time::Instant,
20};
21
22use slab::Slab;
23
24use crate::protocol::udp::{
25    ClusterConfig, ConfigEvent, DropReason, FlowId, FlowKey, ManagerInput, MetricEvent, Output,
26    Transmit,
27    flow::{CloseReason, FlowPhase, UdpFlow},
28    proxy_protocol::prepend_dgram_header,
29};
30
31/// Extracts a [`FlowKey`] from an admitted client datagram. The default
32/// [`SourceTupleExtractor`] keys on the real client source address (2-tuple
33/// source-IP or 4-tuple source-IP+port per cluster config). The trait is the
34/// only seam for alternative keying (e.g. a QUIC-CID extractor, a non-goal);
35/// the 4-tuple impl is the only one in scope.
36pub trait FlowKeyExtractor {
37    /// Compute the flow key for a datagram from `src`. Returns `None` to reject
38    /// the datagram (the manager then emits `Drop(Invalid)` and allocates
39    /// nothing). `cfg` is the listener's active cluster config.
40    fn flow_key(&self, src: SocketAddr, payload: &[u8], cfg: &ClusterConfig) -> Option<FlowKey>;
41}
42
/// Default [`FlowKeyExtractor`]: keys a flow on the real client source
/// address, following the cluster's `affinity_with_port` knob (4-tuple when
/// set, 2-tuple otherwise).
#[derive(Default, Debug, Copy, Clone)]
pub struct SourceTupleExtractor;
47
48impl FlowKeyExtractor for SourceTupleExtractor {
49    fn flow_key(&self, src: SocketAddr, payload: &[u8], cfg: &ClusterConfig) -> Option<FlowKey> {
50        // "Silence is a virtue": an empty datagram is not a valid flow trigger.
51        if payload.is_empty() {
52            return None;
53        }
54        Some(FlowKey::from_src(src, cfg.affinity_with_port))
55    }
56}
57
58/// The pure UDP flow manager. Generic over the flow-key extractor so the seam
59/// stays type-checked; the shell instantiates it with [`SourceTupleExtractor`].
60pub struct UdpManager<E: FlowKeyExtractor = SourceTupleExtractor> {
61    /// `FlowKey -> FlowId` lookup for tracked-flow reuse (two-tier selection).
62    table: HashMap<FlowKey, FlowId>,
63    /// Slab of admitted flows; the slot index is the [`FlowId`].
64    flows: Slab<UdpFlow>,
65    /// Flow-table cap. New flows beyond this are shed (drop + metric).
66    max_flows: usize,
67    /// Maximum accepted rx datagram size; larger datagrams are dropped as
68    /// truncated.
69    max_rx_datagram_size: usize,
70    /// Active cluster routing + per-cluster knobs for *new* flows.
71    cluster: ClusterConfig,
72    /// Per-worker hash seed, injected once at construction; persisted across
73    /// reconfig for stable affinity.
74    hash_seed: u64,
75    /// Pluggable flow-key extractor.
76    extractor: E,
77    /// Draining: admit no new flows; let existing ones reach teardown.
78    draining: bool,
79
80    /// FIFO of outputs the shell drains via [`poll_output`].
81    outputs: VecDeque<Output>,
82    /// The single armed manager-wide deadline currently reflected to the shell
83    /// via the last `ArmTimer`. `None` means no timer is armed.
84    armed_deadline: Option<Instant>,
85
86    /// High-water mark of every `max_flows` cap ever set (construction +
87    /// `SetMaxFlows`). The live cap can be shrunk below `flows.len()` by a
88    /// `SetMaxFlows`, so `flows.len() <= max_flows` is NOT an invariant; but a
89    /// flow can only ever have been admitted under *some* cap that was in force
90    /// at admission, so `flows.len() <= max_flows_high_water` always holds.
91    /// Debug-only — only read by [`check_invariants`](Self::check_invariants).
92    #[cfg(debug_assertions)]
93    max_flows_high_water: usize,
94}
95
96impl UdpManager<SourceTupleExtractor> {
97    /// Construct a manager with the default 4-tuple/2-tuple source extractor.
98    pub fn new(
99        cluster: ClusterConfig,
100        max_flows: usize,
101        max_rx_datagram_size: usize,
102        hash_seed: u64,
103    ) -> Self {
104        Self::with_extractor(
105            cluster,
106            max_flows,
107            max_rx_datagram_size,
108            hash_seed,
109            SourceTupleExtractor,
110        )
111    }
112}
113
114impl<E: FlowKeyExtractor> UdpManager<E> {
115    /// Construct a manager with a custom flow-key extractor.
116    pub fn with_extractor(
117        cluster: ClusterConfig,
118        max_flows: usize,
119        max_rx_datagram_size: usize,
120        hash_seed: u64,
121        extractor: E,
122    ) -> Self {
123        UdpManager {
124            table: HashMap::new(),
125            flows: Slab::new(),
126            max_flows,
127            max_rx_datagram_size,
128            cluster,
129            hash_seed,
130            extractor,
131            draining: false,
132            outputs: VecDeque::new(),
133            armed_deadline: None,
134            #[cfg(debug_assertions)]
135            max_flows_high_water: max_flows,
136        }
137    }
138
139    // ---- introspection (used by log macros / tests / the shell) ------------
140
141    /// Number of currently admitted flows. Mirrors `udp.active_flows`.
142    pub fn flow_count(&self) -> usize {
143        self.flows.len()
144    }
145
146    /// The configured flow-table cap.
147    pub fn max_flows(&self) -> usize {
148        self.max_flows
149    }
150
151    /// Whether the listener is draining.
152    pub fn is_draining(&self) -> bool {
153        self.draining
154    }
155
156    /// Whether the active cluster config keys flows on the 4-tuple (source
157    /// IP + port) rather than source IP only. The shell needs this to mirror
158    /// the manager's flow keying for its `SendToBackend` socket resolution.
159    pub fn affinity_with_port(&self) -> bool {
160        self.cluster.affinity_with_port
161    }
162
163    /// Borrow a flow by id (for the shell's access log on close).
164    pub fn flow(&self, flow: FlowId) -> Option<&UdpFlow> {
165        self.flows.get(flow)
166    }
167
168    // ---- input -------------------------------------------------------------
169
170    /// Feed one input into the manager. Pure: `now` is injected.
171    pub fn handle_input(&mut self, input: ManagerInput<'_>, now: Instant) {
172        match input {
173            ManagerInput::ClientDatagram { src, payload } => {
174                self.on_client_datagram(src, payload, now)
175            }
176            ManagerInput::BackendDatagram { flow, payload } => {
177                self.on_backend_datagram(flow, payload, now)
178            }
179            ManagerInput::Config(event) => self.on_config(event, now),
180            ManagerInput::BackendResolved {
181                flow,
182                backend,
183                addr,
184            } => self.on_backend_resolved(flow, backend, addr, now),
185        }
186        // Post-condition: the public method ran to completion under the caller's
187        // lock, so every structural invariant must hold again.
188        self.debug_assert_invariants();
189    }
190
191    /// Tear down a single flow on demand, emitting the same outputs a normal
192    /// idle close produces — `Output::Metric(MetricEvent::FlowEvicted)` then
193    /// `Output::CloseFlow(flow)` — so the shell draining `poll_output()`
194    /// decrements `udp.active_flows` and frees the upstream socket exactly once.
195    ///
196    /// The shell calls this when it cannot establish a flow it just admitted:
197    /// the upstream `connect()` failed (EMFILE / connection refused) or no
198    /// backend resolved. Without it the flow would sit `AwaitingBackend` /
199    /// `Established` pinning a `max_flows` slot for the full idle timeout.
200    ///
201    /// Idempotent: a missing flow or one already `Closing` is a no-op — no
202    /// double-evict, no gauge underflow. Works in either `AwaitingBackend` or
203    /// `Established`. `now` is accepted for signature symmetry with the other
204    /// time-driven entry points (the teardown itself is time-independent).
205    pub fn abort_flow(&mut self, flow: FlowId, _now: Instant, reason: CloseReason) {
206        self.close_flow(flow, reason);
207        self.debug_assert_invariants();
208    }
209
210    /// Tear down EVERY live flow, emitting — for each — the same outputs a
211    /// normal idle close produces (`Output::Metric(MetricEvent::FlowEvicted)`
212    /// then `Output::CloseFlow(flow)`), so the shell draining `poll_output()`
213    /// decrements `udp.active_flows` and frees each upstream socket exactly once
214    /// per flow.
215    ///
216    /// The shell calls this on listener remove / deactivate / soft-stop before
217    /// dropping the manager, so the active-flows gauge does not leak. Draining
218    /// the flow table + slab to zero; a flow already `Closing` is skipped
219    /// (idempotent, no double-evict, no underflow). After this returns,
220    /// `flow_count() == 0` and no timer is armed.
221    pub fn close_all(&mut self, now: Instant) {
222        // Snapshot the live ids first: `close_flow` mutates the slab, so we must
223        // not iterate it while closing. `Closing` slots are already gone from
224        // the slab (`close_flow` removes them), so every id here is live.
225        let live: Vec<FlowId> = self.flows.iter().map(|(id, _)| id).collect();
226        for flow_id in live {
227            self.abort_flow(flow_id, now, CloseReason::Drain);
228        }
229        // Post: the table and slab are drained to zero and no timer is armed.
230        #[cfg(debug_assertions)]
231        {
232            debug_assert_eq!(self.flow_count(), 0, "close_all must drain every flow");
233            debug_assert!(
234                self.table.is_empty(),
235                "close_all must clear every table entry"
236            );
237            debug_assert!(
238                self.armed_deadline.is_none(),
239                "close_all must leave no armed timer"
240            );
241        }
242        self.debug_assert_invariants();
243    }
244
245    fn on_client_datagram(&mut self, src: SocketAddr, payload: &[u8], now: Instant) {
246        // Over-size check first — never allocate for a truncated datagram.
247        if payload.len() > self.max_rx_datagram_size {
248            self.drop_datagram(DropReason::Truncated);
249            return;
250        }
251        // No backend cluster configured → nothing to route to.
252        if self.cluster.cluster.is_empty() {
253            self.drop_datagram(DropReason::NoBackend);
254            return;
255        }
256        // Extract the flow key; rejection allocates nothing.
257        let key = match self.extractor.flow_key(src, payload, &self.cluster) {
258            Some(key) => key,
259            None => {
260                self.drop_datagram(DropReason::Invalid);
261                return;
262            }
263        };
264
265        // Tracked flow → reuse its backend (two-tier selection).
266        if let Some(&flow_id) = self.table.get(&key) {
267            self.forward_on_existing_flow(flow_id, payload, now);
268            return;
269        }
270
271        // New flow. Draining or at cap → shed, allocate nothing.
272        // Drop / shed paths ("silence is a virtue"): a reject must allocate
273        // nothing, so `flows.len()` is unchanged across it. Snapshot the count
274        // to pair-assert that on every early return below. Not debug-gated: the
275        // `debug_assert_eq!`s that read it compile in release too (execution only
276        // is gated); the read is dead there and the optimizer drops the binding.
277        let flows_before_admit = self.flows.len();
278        if self.draining {
279            self.drop_datagram(DropReason::Shed);
280            debug_assert_eq!(
281                self.flows.len(),
282                flows_before_admit,
283                "drain shed must allocate no flow"
284            );
285            return;
286        }
287        if self.flows.len() >= self.max_flows {
288            self.outputs
289                .push_back(Output::Metric(MetricEvent::FlowShed));
290            self.drop_datagram(DropReason::Shed);
291            debug_assert_eq!(
292                self.flows.len(),
293                flows_before_admit,
294                "cap shed must allocate no flow"
295            );
296            return;
297        }
298
299        // Admit. Pre-conditions (positive + negative space): we are on the admit
300        // path, so there is room under the live cap AND the key is NOT already
301        // tracked (a tracked key would have been served above without a new
302        // slot — a double-insert would orphan the previous flow and leak a slab
303        // slot).
304        debug_assert!(
305            self.flows.len() < self.max_flows,
306            "admit path entered while at/over the cap"
307        );
308        debug_assert!(
309            !self.table.contains_key(&key),
310            "admit path entered for a key already in the table (would orphan a flow)"
311        );
312
313        // Admit: one slab slot + one copy of the payload (the design's single
314        // admission copy). The flow is parked AwaitingBackend with the datagram
315        // buffered until the shell resolves a backend. `requests_seen` stays 0:
316        // the datagram is only buffered here, and is counted as a forward when
317        // it is actually flushed in `on_backend_resolved` — so `requests`
318        // measures real forwards, not buffered-and-maybe-discarded datagrams.
319        let mut flow = UdpFlow::new(src, self.cluster.clone(), now);
320        flow.pending_payload = Some(payload.to_vec());
321        let key_hash = self.affinity_hash(&flow);
322        let flow_id = self.flows.insert(flow);
323        self.table.insert(key, flow_id);
324
325        // Post (admission): exactly one slot was added and the key now maps to
326        // it. Pairs the pre-conditions above (grew by exactly 1, not 0 or 2).
327        debug_assert_eq!(
328            self.flows.len(),
329            flows_before_admit + 1,
330            "admission must add exactly one flow"
331        );
332        debug_assert_eq!(
333            self.table.get(&key),
334            Some(&flow_id),
335            "admission must map the key to the new flow"
336        );
337
338        self.outputs
339            .push_back(Output::Metric(MetricEvent::FlowCreated));
340        self.outputs.push_back(Output::SelectBackend {
341            flow: flow_id,
342            cluster: self.cluster.cluster.clone(),
343            key: key_hash,
344        });
345        // Arm the idle timer for the freshly-admitted flow.
346        self.reschedule();
347    }
348
349    fn forward_on_existing_flow(&mut self, flow_id: FlowId, payload: &[u8], now: Instant) {
350        let Some(flow) = self.flows.get_mut(flow_id) else {
351            // Table/slab desync should be impossible, but never panic on it.
352            self.drop_datagram(DropReason::UnknownFlow);
353            return;
354        };
355
356        match flow.phase {
357            FlowPhase::AwaitingBackend => {
358                // Backend not yet resolved: buffer (newest-wins, one slot) and
359                // refresh idle ONLY. Do not count this toward `requests` — the
360                // previously-buffered datagram is now discarded and was never
361                // forwarded. The single surviving buffered datagram is counted
362                // when it is actually flushed in `on_backend_resolved`.
363                flow.pending_payload = Some(payload.to_vec());
364                flow.touch(flow.config.front_timeout, now);
365                self.reschedule();
366            }
367            FlowPhase::Established => {
368                flow.on_client_datagram(now);
369                let backend = flow
370                    .backend_addr
371                    .expect("Established flow always has a backend address");
372                let mut out = payload.to_vec();
373                if flow.take_proxy_protocol() {
374                    prepend_dgram_header(&mut out, flow.client, backend);
375                }
376                let teardown = flow.teardown_reason();
377                self.outputs
378                    .push_back(Output::Metric(MetricEvent::DatagramIn(payload.len())));
379                self.outputs.push_back(Output::SendToBackend(Transmit {
380                    dst: backend,
381                    segment_size: None,
382                    payload: out,
383                }));
384                if let Some(reason) = teardown {
385                    self.close_flow(flow_id, reason);
386                } else {
387                    self.reschedule();
388                }
389            }
390            FlowPhase::Closing => {
391                // Racing datagram against a teardown already decided: drop it.
392                self.drop_datagram(DropReason::Shed);
393            }
394        }
395    }
396
397    fn on_backend_resolved(
398        &mut self,
399        flow_id: FlowId,
400        backend: String,
401        addr: SocketAddr,
402        now: Instant,
403    ) {
404        let Some(flow) = self.flows.get_mut(flow_id) else {
405            // Flow was reaped (idle/drain) before the shell resolved a backend.
406            self.drop_datagram(DropReason::UnknownFlow);
407            return;
408        };
409        if flow.phase != FlowPhase::AwaitingBackend {
410            // Duplicate / late resolution; ignore without allocating.
411            return;
412        }
413        flow.backend_id = Some(backend);
414        flow.backend_addr = Some(addr);
415        flow.set_phase(FlowPhase::Established);
416
417        // Open the connected upstream socket (the shell registers
418        // upstream_token -> flow for NAT return).
419        self.outputs.push_back(Output::OpenUpstream {
420            flow: flow_id,
421            backend: addr,
422        });
423
424        // Flush the buffered first datagram, if any. This is the real forward
425        // site for the admission datagram, so count it toward `requests` here
426        // (refreshing the front idle deadline at the same time) — not at
427        // admission, where it was only buffered.
428        let pending = flow.pending_payload.take();
429        if let Some(mut payload) = pending {
430            let payload_len = payload.len();
431            flow.on_client_datagram(now);
432            if flow.take_proxy_protocol() {
433                prepend_dgram_header(&mut payload, flow.client, addr);
434            }
435            let teardown = flow.teardown_reason();
436            self.outputs
437                .push_back(Output::Metric(MetricEvent::DatagramIn(payload_len)));
438            self.outputs.push_back(Output::SendToBackend(Transmit {
439                dst: addr,
440                segment_size: None,
441                payload,
442            }));
443            if let Some(reason) = teardown {
444                self.close_flow(flow_id, reason);
445                return;
446            }
447            // `on_client_datagram` already refreshed the deadline + generation.
448            self.reschedule();
449            return;
450        }
451        // No buffered datagram: refresh idle and re-arm (phase changed; the
452        // deadline was set at `new()`).
453        let _gen = self
454            .flows
455            .get_mut(flow_id)
456            .map(|f| f.touch(self.cluster.front_timeout, now));
457        self.reschedule();
458    }
459
460    fn on_backend_datagram(&mut self, flow_id: FlowId, payload: &[u8], now: Instant) {
461        if payload.len() > self.max_rx_datagram_size {
462            self.drop_datagram(DropReason::Truncated);
463            return;
464        }
465        let Some(flow) = self.flows.get_mut(flow_id) else {
466            self.drop_datagram(DropReason::UnknownFlow);
467            return;
468        };
469        if flow.phase != FlowPhase::Established {
470            self.drop_datagram(DropReason::UnknownFlow);
471            return;
472        }
473        flow.on_backend_datagram(now);
474        let client = flow.client;
475        let teardown = flow.teardown_reason();
476        self.outputs
477            .push_back(Output::Metric(MetricEvent::DatagramOut(payload.len())));
478        self.outputs.push_back(Output::SendToClient(Transmit {
479            dst: client,
480            segment_size: None,
481            payload: payload.to_vec(),
482        }));
483        if let Some(reason) = teardown {
484            self.close_flow(flow_id, reason);
485        } else {
486            self.reschedule();
487        }
488    }
489
490    fn on_config(&mut self, event: ConfigEvent, _now: Instant) {
491        match event {
492            ConfigEvent::SetCluster(cfg) => self.cluster = cfg,
493            ConfigEvent::SetMaxFlows(n) => {
494                self.max_flows = n;
495                // Track the high-water mark: a `SetMaxFlows` may shrink the live
496                // cap below `flows.len()` (documented), so the only durable cap
497                // bound is the largest cap ever in force.
498                #[cfg(debug_assertions)]
499                {
500                    self.max_flows_high_water = self.max_flows_high_water.max(n);
501                }
502            }
503            ConfigEvent::SetMaxRxDatagramSize(n) => self.max_rx_datagram_size = n,
504            ConfigEvent::Drain => self.draining = true,
505        }
506    }
507
508    // ---- timers ------------------------------------------------------------
509
510    /// Fire all flows whose idle deadline has elapsed at `now`. A flow is only
511    /// closed if its generation token still matches the scheduled deadline —
512    /// generation mismatch means the flow saw traffic and was rescheduled, so
513    /// the stale expiry is ignored (defeats the busy-loop / stale-close bug).
514    pub fn handle_timeout(&mut self, now: Instant) {
515        // Collect due flow ids first to avoid borrowing the slab while mutating.
516        let due: Vec<FlowId> = self
517            .flows
518            .iter()
519            .filter(|(_, flow)| flow.idle_deadline <= now)
520            .map(|(id, _)| id)
521            .collect();
522        for flow_id in due {
523            // Re-check under current state (a flow may have been closed by an
524            // earlier iteration's teardown, though here ids are disjoint).
525            if let Some(flow) = self.flows.get(flow_id) {
526                if flow.idle_deadline <= now && flow.phase != FlowPhase::Closing {
527                    self.close_flow(flow_id, CloseReason::Idle);
528                }
529            }
530        }
531        self.reschedule();
532
533        // Strict-advance guard: after firing every flow due at `now`, the next
534        // armed deadline (if any) MUST be strictly greater than `now`. A
535        // deadline `<= now` would make the shell immediately re-fire and spin —
536        // the canonical sans-io busy-loop bug. This is the real reason the
537        // generation tokens + `reschedule` exist.
538        #[cfg(debug_assertions)]
539        if let Some(next) = self.armed_deadline {
540            debug_assert!(
541                next > now,
542                "UdpManager::poll_timeout must strictly advance past a firing: \
543                 armed {next:?} <= fired_at {now:?} (busy-loop)"
544            );
545        }
546
547        // Post: every flow due at `now` was reaped — no live flow may retain a
548        // deadline `<= now`. Pair with the strict-advance guard above: that one
549        // proves the next *armed* deadline advanced, this one proves no *flow*
550        // was left behind due (a leak the armed-deadline check alone misses,
551        // since min() over an empty set is None regardless of stragglers).
552        #[cfg(debug_assertions)]
553        for (id, flow) in self.flows.iter() {
554            debug_assert!(
555                flow.idle_deadline > now,
556                "FlowId {id} still due after handle_timeout: deadline {:?} <= now {now:?}",
557                flow.idle_deadline,
558            );
559        }
560        self.debug_assert_invariants();
561    }
562
563    /// The next manager-wide deadline, or `None` if no flow is armed. After a
564    /// [`handle_timeout`] at deadline `d`, the value returned here is guaranteed
565    /// `> d` (or `None`) — the strict-advance invariant `handle_timeout` asserts
566    /// in debug builds, which is what stops the shell busy-looping.
567    pub fn poll_timeout(&self) -> Option<Instant> {
568        self.armed_deadline
569    }
570
571    /// Drain the next queued output, or `None` when the queue is empty.
572    pub fn poll_output(&mut self) -> Option<Output> {
573        self.outputs.pop_front()
574    }
575
576    // ---- internals ---------------------------------------------------------
577
578    /// Recompute the earliest flow deadline and emit `ArmTimer` only when it
579    /// changes, so the shell re-arms its wheel exactly once per real change.
580    fn reschedule(&mut self) {
581        let next = self
582            .flows
583            .iter()
584            .filter(|(_, f)| f.phase != FlowPhase::Closing)
585            .map(|(_, f)| f.idle_deadline)
586            .min();
587        if next != self.armed_deadline {
588            self.armed_deadline = next;
589            if let Some(deadline) = next {
590                self.outputs.push_back(Output::ArmTimer(deadline));
591            }
592        }
593    }
594
595    /// Tear down a flow: remove it from the table + slab, emit `CloseFlow` and
596    /// the eviction metric. Idempotent — a missing flow is a no-op (never an
597    /// underflow). Re-arms the manager timer.
598    fn close_flow(&mut self, flow_id: FlowId, _reason: CloseReason) {
599        let Some(flow) = self.flows.get_mut(flow_id) else {
600            return;
601        };
602        if flow.phase == FlowPhase::Closing {
603            return;
604        }
605        flow.set_phase(FlowPhase::Closing);
606        let key = FlowKey::from_src(flow.client, self.cluster.affinity_with_port);
607        // Remove the table entry only if it still points at this flow; a
608        // recreated flow under the same key must not be unmapped.
609        if self.table.get(&key) == Some(&flow_id) {
610            self.table.remove(&key);
611        } else {
612            // The flow was keyed when admitted; recompute via its own config to
613            // be robust to a mid-flow affinity change.
614            let own_key = FlowKey::from_src(flow.client, flow.config.affinity_with_port);
615            if self.table.get(&own_key) == Some(&flow_id) {
616                self.table.remove(&own_key);
617            }
618        }
619        self.flows.remove(flow_id);
620        self.outputs
621            .push_back(Output::Metric(MetricEvent::FlowEvicted));
622        self.outputs.push_back(Output::CloseFlow(flow_id));
623        self.reschedule();
624
625        // Post: the slot is gone from the slab AND no table key still maps to it
626        // (a stale table key would dangle — caught by check_invariants (1), but
627        // assert it here too so the local failure points at close_flow). Pair:
628        // positive = removed from slab; negative = no key still references it.
629        #[cfg(debug_assertions)]
630        {
631            debug_assert!(
632                !self.flows.contains(flow_id),
633                "close_flow left FlowId {flow_id} in the slab"
634            );
635            debug_assert!(
636                self.table.values().all(|&id| id != flow_id),
637                "close_flow left a table entry mapping to the removed FlowId {flow_id}"
638            );
639        }
640    }
641
642    /// Emit a drop with its by-reason metric. Allocates nothing per the
643    /// "silence is a virtue" posture.
644    fn drop_datagram(&mut self, reason: DropReason) {
645        // "Silence is a virtue": a drop allocates no flow and frees none — the
646        // slab is untouched across the reject. Snapshot + pair-assert so a future
647        // edit that accidentally mutates the slab on a drop path is caught loudly.
648        // Not debug-gated: the `debug_assert_eq!` reads it in release too (only
649        // execution is gated); dead there, dropped by the optimizer.
650        let flows_before_drop = self.flows.len();
651        self.outputs
652            .push_back(Output::Metric(MetricEvent::DatagramDropped(reason)));
653        self.outputs.push_back(Output::Drop(reason));
654        debug_assert_eq!(
655            self.flows.len(),
656            flows_before_drop,
657            "a drop must allocate nothing and free nothing (flows.len() unchanged)"
658        );
659    }
660
661    /// Affinity hash for a flow: `hash(seed, affinity_key)`. The shell feeds
662    /// this into HRW (`max hash`) / Maglev (`key % M`) / RR (ignores it).
663    fn affinity_hash(&self, flow: &UdpFlow) -> u64 {
664        let mut hasher = std::collections::hash_map::DefaultHasher::new();
665        self.hash_seed.hash(&mut hasher);
666        if flow.config.affinity_with_port {
667            flow.client.hash(&mut hasher);
668        } else {
669            flow.client.ip().hash(&mut hasher);
670        }
671        hasher.finish()
672    }
673
674    /// TigerStyle invariant sweep (TigerBeetle / FoundationDB style). A single
675    /// full check of every structural invariant the manager must preserve,
676    /// asserted at the END of every public mutating method via
677    /// [`debug_assert_invariants`](Self::debug_assert_invariants). Compiled out
678    /// entirely in release (`#[cfg(debug_assertions)]`); on in every test / e2e /
679    /// fuzz / dev build. It must NEVER change runtime behavior — it only reads.
680    ///
681    /// The right place for the sweep is a *post-condition*: these public methods
682    /// run to completion under the caller's lock, so the table/slab/timer state
683    /// is fully reconciled by the time the method returns.
684    #[cfg(debug_assertions)]
685    fn check_invariants(&self) {
686        use std::collections::HashSet;
687
688        // (3) flow_count() is exactly the slab population.
689        debug_assert_eq!(
690            self.flow_count(),
691            self.flows.len(),
692            "flow_count() must equal flows.len()"
693        );
694
695        // (1) Table -> slab consistency + (2) table injectivity: every FlowId in
696        // the table points at a live slab slot, and no two keys share a FlowId.
697        let mut seen_ids: HashSet<FlowId> = HashSet::with_capacity(self.table.len());
698        for (key, &id) in self.table.iter() {
699            debug_assert!(
700                self.flows.contains(id),
701                "table key {key:?} maps to FlowId {id} absent from the slab (dangling key)"
702            );
703            debug_assert!(
704                seen_ids.insert(id),
705                "table injectivity violated: FlowId {id} is the target of two distinct FlowKeys"
706            );
707        }
708        // Pair (negative space): a live flow that is reachable from the table is
709        // mapped exactly once — there is never a live flow with two table keys.
710        debug_assert!(
711            seen_ids.len() <= self.flows.len(),
712            "more distinct table targets than live flows"
713        );
714
715        // Per-flow invariants over the slab.
716        let mut min_live_deadline: Option<Instant> = None;
717        let mut live_count = 0usize;
718        for (id, flow) in self.flows.iter() {
719            // (4) No slab flow is Closing. close_flow sets Closing then removes
720            // the slot in the same call, so a Closing flow must never persist.
721            // Pair: positive space — every live flow is Awaiting or Established.
722            debug_assert_ne!(
723                flow.phase,
724                FlowPhase::Closing,
725                "FlowId {id} persists in the slab while Closing (close_flow must remove it)"
726            );
727            debug_assert!(
728                matches!(
729                    flow.phase,
730                    FlowPhase::AwaitingBackend | FlowPhase::Established
731                ),
732                "FlowId {id} has an unexpected live phase {:?}",
733                flow.phase
734            );
735
736            // (5) Established <=> backend_addr.is_some(); AwaitingBackend <=>
737            // backend_addr.is_none() (positive + negative space).
738            match flow.phase {
739                FlowPhase::Established => debug_assert!(
740                    flow.backend_addr.is_some(),
741                    "Established FlowId {id} has no backend address"
742                ),
743                FlowPhase::AwaitingBackend => debug_assert!(
744                    flow.backend_addr.is_none(),
745                    "AwaitingBackend FlowId {id} already carries a backend address"
746                ),
747                FlowPhase::Closing => {}
748            }
749
750            // (7) Counters within caps OR a teardown is due. A flow whose cap is
751            // exhausted must report a teardown reason; an exhausted flow that
752            // reports None would be an immortal flow (the cap silently lost).
753            if flow.requests_exhausted() || flow.responses_exhausted() {
754                debug_assert!(
755                    flow.teardown_reason().is_some(),
756                    "FlowId {id} exhausted a cap (req {}/{}, resp {}/{}) but reports no teardown",
757                    flow.requests_seen,
758                    flow.config.requests,
759                    flow.responses_seen,
760                    flow.config.responses,
761                );
762            } else {
763                // Pair (negative): a flow within both caps must NOT report a
764                // cap-driven teardown (idle teardown is handled by the timer,
765                // not teardown_reason()).
766                debug_assert!(
767                    flow.teardown_reason().is_none(),
768                    "FlowId {id} reports a teardown while within both caps"
769                );
770            }
771
772            live_count += 1;
773            min_live_deadline = Some(match min_live_deadline {
774                Some(d) => d.min(flow.idle_deadline),
775                None => flow.idle_deadline,
776            });
777        }
778
779        // The high-water cap bounds the live population at all times (the live
780        // cap itself can be shrunk below flows.len() by SetMaxFlows, so we assert
781        // against the largest cap ever in force, not max_flows).
782        debug_assert!(
783            self.flows.len() <= self.max_flows_high_water,
784            "live flows {} exceed the high-water cap {}",
785            self.flows.len(),
786            self.max_flows_high_water,
787        );
788
789        // (6) Timer coherence: armed_deadline is Some IFF at least one (non-
790        // Closing) live flow exists, and when Some it equals the minimum
791        // idle_deadline over live flows. Closing flows are never in the slab
792        // (invariant 4), so "live" == "slab" here.
793        debug_assert_eq!(
794            self.armed_deadline.is_some(),
795            live_count > 0,
796            "timer coherence: armed_deadline.is_some() ({}) must match having live flows ({})",
797            self.armed_deadline.is_some(),
798            live_count > 0,
799        );
800        if let Some(armed) = self.armed_deadline {
801            debug_assert_eq!(
802                Some(armed),
803                min_live_deadline,
804                "timer coherence: armed deadline {armed:?} must equal the minimum live idle deadline {min_live_deadline:?}"
805            );
806        }
807    }
808
809    /// Run the full [`check_invariants`](Self::check_invariants) sweep, but only
810    /// in debug builds. A thin wrapper so call sites read as one line and so the
811    /// whole sweep — and any read it performs — is dead-stripped in release.
812    #[inline]
813    fn debug_assert_invariants(&self) {
814        #[cfg(debug_assertions)]
815        self.check_invariants();
816    }
817}
818
819#[cfg(test)]
820mod tests {
821    use std::{
822        net::{IpAddr, Ipv4Addr},
823        time::Duration,
824    };
825
826    use super::*;
827
828    fn cluster(name: &str) -> ClusterConfig {
829        ClusterConfig {
830            cluster: name.to_owned(),
831            front_timeout: Duration::from_secs(30),
832            back_timeout: Duration::from_secs(30),
833            ..Default::default()
834        }
835    }
836
837    fn client(n: u8, port: u16) -> SocketAddr {
838        SocketAddr::new(IpAddr::V4(Ipv4Addr::new(10, 0, 0, n)), port)
839    }
840
841    fn backend() -> SocketAddr {
842        SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 5300)
843    }
844
845    /// Drain all outputs into a Vec for assertions.
846    fn drain(mgr: &mut UdpManager) -> Vec<Output> {
847        let mut out = Vec::new();
848        while let Some(o) = mgr.poll_output() {
849            out.push(o);
850        }
851        out
852    }
853
854    #[test]
855    fn unknown_datagram_allocates_nothing_when_no_cluster() {
856        let mut mgr = UdpManager::new(ClusterConfig::default(), 16, 65535, 0xABCD);
857        let now = Instant::now();
858        mgr.handle_input(
859            ManagerInput::ClientDatagram {
860                src: client(1, 1000),
861                payload: b"hi",
862            },
863            now,
864        );
865        assert_eq!(mgr.flow_count(), 0);
866        let outs = drain(&mut mgr);
867        assert!(
868            outs.iter()
869                .any(|o| matches!(o, Output::Drop(DropReason::NoBackend)))
870        );
871        assert!(
872            !outs
873                .iter()
874                .any(|o| matches!(o, Output::SelectBackend { .. }))
875        );
876    }
877
878    #[test]
879    fn empty_datagram_is_invalid() {
880        let mut mgr = UdpManager::new(cluster("dns"), 16, 65535, 1);
881        mgr.handle_input(
882            ManagerInput::ClientDatagram {
883                src: client(1, 1000),
884                payload: b"",
885            },
886            Instant::now(),
887        );
888        assert_eq!(mgr.flow_count(), 0);
889        let outs = drain(&mut mgr);
890        assert!(
891            outs.iter()
892                .any(|o| matches!(o, Output::Drop(DropReason::Invalid)))
893        );
894    }
895
896    #[test]
897    fn truncated_datagram_dropped_before_admission() {
898        let mut mgr = UdpManager::new(cluster("dns"), 16, 4, 1);
899        mgr.handle_input(
900            ManagerInput::ClientDatagram {
901                src: client(1, 1000),
902                payload: b"toolong",
903            },
904            Instant::now(),
905        );
906        assert_eq!(mgr.flow_count(), 0);
907        let outs = drain(&mut mgr);
908        assert!(
909            outs.iter()
910                .any(|o| matches!(o, Output::Drop(DropReason::Truncated)))
911        );
912    }
913
914    #[test]
915    fn new_flow_requests_backend_then_forwards_buffered_datagram() {
916        let mut mgr = UdpManager::new(cluster("dns"), 16, 65535, 7);
917        let now = Instant::now();
918        let src = client(1, 1000);
919        mgr.handle_input(
920            ManagerInput::ClientDatagram {
921                src,
922                payload: b"query",
923            },
924            now,
925        );
926        assert_eq!(mgr.flow_count(), 1);
927        let outs = drain(&mut mgr);
928        let select = outs
929            .iter()
930            .find_map(|o| match o {
931                Output::SelectBackend { flow, cluster, .. } => Some((*flow, cluster.clone())),
932                _ => None,
933            })
934            .expect("SelectBackend emitted");
935        assert_eq!(select.1, "dns");
936        // No SendToBackend yet — backend not resolved.
937        assert!(!outs.iter().any(|o| matches!(o, Output::SendToBackend(_))));
938
939        mgr.handle_input(
940            ManagerInput::BackendResolved {
941                flow: select.0,
942                backend: "b1".to_owned(),
943                addr: backend(),
944            },
945            now,
946        );
947        let outs = drain(&mut mgr);
948        assert!(
949            outs.iter()
950                .any(|o| matches!(o, Output::OpenUpstream { .. }))
951        );
952        let sent = outs
953            .iter()
954            .find_map(|o| match o {
955                Output::SendToBackend(t) => Some(t.clone()),
956                _ => None,
957            })
958            .expect("buffered datagram flushed on resolve");
959        assert_eq!(sent.dst, backend());
960        assert_eq!(sent.payload, b"query");
961    }
962
963    #[test]
964    fn tracked_flow_reuses_backend() {
965        let mut mgr = UdpManager::new(cluster("dns"), 16, 65535, 7);
966        let now = Instant::now();
967        let src = client(1, 1000);
968        mgr.handle_input(
969            ManagerInput::ClientDatagram {
970                src,
971                payload: b"q1",
972            },
973            now,
974        );
975        let select_flow = drain(&mut mgr)
976            .into_iter()
977            .find_map(|o| match o {
978                Output::SelectBackend { flow, .. } => Some(flow),
979                _ => None,
980            })
981            .unwrap();
982        mgr.handle_input(
983            ManagerInput::BackendResolved {
984                flow: select_flow,
985                backend: "b1".to_owned(),
986                addr: backend(),
987            },
988            now,
989        );
990        drain(&mut mgr);
991
992        // Second datagram from same source: no new SelectBackend, direct send.
993        mgr.handle_input(
994            ManagerInput::ClientDatagram {
995                src,
996                payload: b"q2",
997            },
998            now,
999        );
1000        assert_eq!(mgr.flow_count(), 1);
1001        let outs = drain(&mut mgr);
1002        assert!(
1003            !outs
1004                .iter()
1005                .any(|o| matches!(o, Output::SelectBackend { .. }))
1006        );
1007        assert!(
1008            outs.iter()
1009                .any(|o| matches!(o, Output::SendToBackend(t) if t.payload == b"q2"))
1010        );
1011    }
1012
1013    #[test]
1014    fn responses_one_closes_flow_after_single_reply() {
1015        let mut cfg = cluster("dns");
1016        cfg.responses = 1;
1017        let mut mgr = UdpManager::new(cfg, 16, 65535, 7);
1018        let now = Instant::now();
1019        let src = client(1, 1000);
1020        mgr.handle_input(ManagerInput::ClientDatagram { src, payload: b"q" }, now);
1021        let flow = drain(&mut mgr)
1022            .into_iter()
1023            .find_map(|o| match o {
1024                Output::SelectBackend { flow, .. } => Some(flow),
1025                _ => None,
1026            })
1027            .unwrap();
1028        mgr.handle_input(
1029            ManagerInput::BackendResolved {
1030                flow,
1031                backend: "b1".to_owned(),
1032                addr: backend(),
1033            },
1034            now,
1035        );
1036        drain(&mut mgr);
1037
1038        // Single backend reply closes the flow.
1039        mgr.handle_input(
1040            ManagerInput::BackendDatagram {
1041                flow,
1042                payload: b"answer",
1043            },
1044            now,
1045        );
1046        let outs = drain(&mut mgr);
1047        assert!(
1048            outs.iter()
1049                .any(|o| matches!(o, Output::SendToClient(t) if t.payload == b"answer"))
1050        );
1051        assert!(
1052            outs.iter()
1053                .any(|o| matches!(o, Output::CloseFlow(f) if *f == flow))
1054        );
1055        assert_eq!(mgr.flow_count(), 0);
1056    }
1057
1058    #[test]
1059    fn requests_cap_closes_flow() {
1060        let mut cfg = cluster("syslog");
1061        cfg.requests = 2;
1062        let mut mgr = UdpManager::new(cfg, 16, 65535, 7);
1063        let now = Instant::now();
1064        let src = client(1, 1000);
1065        mgr.handle_input(ManagerInput::ClientDatagram { src, payload: b"1" }, now);
1066        let flow = drain(&mut mgr)
1067            .into_iter()
1068            .find_map(|o| match o {
1069                Output::SelectBackend { flow, .. } => Some(flow),
1070                _ => None,
1071            })
1072            .unwrap();
1073        mgr.handle_input(
1074            ManagerInput::BackendResolved {
1075                flow,
1076                backend: "b1".to_owned(),
1077                addr: backend(),
1078            },
1079            now,
1080        );
1081        drain(&mut mgr);
1082        // requests_seen is now 1 (the admission datagram). Second hits the cap.
1083        mgr.handle_input(ManagerInput::ClientDatagram { src, payload: b"2" }, now);
1084        let outs = drain(&mut mgr);
1085        assert!(outs.iter().any(|o| matches!(o, Output::CloseFlow(_))));
1086        assert_eq!(mgr.flow_count(), 0);
1087    }
1088
1089    #[test]
1090    fn flow_table_full_sheds_new_flow() {
1091        let mut mgr = UdpManager::new(cluster("dns"), 1, 65535, 7);
1092        let now = Instant::now();
1093        mgr.handle_input(
1094            ManagerInput::ClientDatagram {
1095                src: client(1, 1000),
1096                payload: b"a",
1097            },
1098            now,
1099        );
1100        assert_eq!(mgr.flow_count(), 1);
1101        drain(&mut mgr);
1102        // Second distinct source over cap → shed.
1103        mgr.handle_input(
1104            ManagerInput::ClientDatagram {
1105                src: client(2, 1000),
1106                payload: b"b",
1107            },
1108            now,
1109        );
1110        assert_eq!(mgr.flow_count(), 1);
1111        let outs = drain(&mut mgr);
1112        assert!(
1113            outs.iter()
1114                .any(|o| matches!(o, Output::Metric(MetricEvent::FlowShed)))
1115        );
1116        assert!(
1117            outs.iter()
1118                .any(|o| matches!(o, Output::Drop(DropReason::Shed)))
1119        );
1120    }
1121
1122    #[test]
1123    fn idle_timeout_closes_flow() {
1124        let mut cfg = cluster("dns");
1125        cfg.front_timeout = Duration::from_secs(10);
1126        let mut mgr = UdpManager::new(cfg, 16, 65535, 7);
1127        let now = Instant::now();
1128        mgr.handle_input(
1129            ManagerInput::ClientDatagram {
1130                src: client(1, 1000),
1131                payload: b"q",
1132            },
1133            now,
1134        );
1135        drain(&mut mgr);
1136        let deadline = mgr.poll_timeout().expect("timer armed");
1137        assert!(deadline >= now + Duration::from_secs(10));
1138        // Fire after the deadline.
1139        mgr.handle_timeout(now + Duration::from_secs(11));
1140        let outs = drain(&mut mgr);
1141        assert!(outs.iter().any(|o| matches!(o, Output::CloseFlow(_))));
1142        assert_eq!(mgr.flow_count(), 0);
1143        assert!(mgr.poll_timeout().is_none());
1144    }
1145
1146    #[test]
1147    fn idle_race_resolved_by_generation_token() {
1148        // A datagram refreshes the deadline; the stale expiry must NOT close.
1149        let mut cfg = cluster("dns");
1150        cfg.front_timeout = Duration::from_secs(10);
1151        let mut mgr = UdpManager::new(cfg, 16, 65535, 7);
1152        let now = Instant::now();
1153        let src = client(1, 1000);
1154        mgr.handle_input(ManagerInput::ClientDatagram { src, payload: b"q" }, now);
1155        let flow = drain(&mut mgr)
1156            .into_iter()
1157            .find_map(|o| match o {
1158                Output::SelectBackend { flow, .. } => Some(flow),
1159                _ => None,
1160            })
1161            .unwrap();
1162        mgr.handle_input(
1163            ManagerInput::BackendResolved {
1164                flow,
1165                backend: "b1".to_owned(),
1166                addr: backend(),
1167            },
1168            now,
1169        );
1170        drain(&mut mgr);
1171        let gen0 = mgr.flow(flow).unwrap().timer_gen;
1172        // Datagram at t=5 refreshes deadline to t=15 and bumps generation.
1173        let t5 = now + Duration::from_secs(5);
1174        mgr.handle_input(
1175            ManagerInput::ClientDatagram {
1176                src,
1177                payload: b"q2",
1178            },
1179            t5,
1180        );
1181        drain(&mut mgr);
1182        let gen1 = mgr.flow(flow).unwrap().timer_gen;
1183        assert_ne!(gen0, gen1, "generation token must bump on touch");
1184        // Stale expiry at the original t=10 deadline must NOT close (deadline is
1185        // now t=15).
1186        mgr.handle_timeout(now + Duration::from_secs(10));
1187        assert_eq!(mgr.flow_count(), 1, "refreshed flow survives stale expiry");
1188        // The real deadline at t=15 closes it.
1189        mgr.handle_timeout(now + Duration::from_secs(16));
1190        assert_eq!(mgr.flow_count(), 0);
1191    }
1192
1193    #[test]
1194    fn drain_sheds_new_flows_but_keeps_existing() {
1195        let mut mgr = UdpManager::new(cluster("dns"), 16, 65535, 7);
1196        let now = Instant::now();
1197        let src = client(1, 1000);
1198        mgr.handle_input(ManagerInput::ClientDatagram { src, payload: b"q" }, now);
1199        drain(&mut mgr);
1200        mgr.handle_input(ManagerInput::Config(ConfigEvent::Drain), now);
1201        // New flow shed.
1202        mgr.handle_input(
1203            ManagerInput::ClientDatagram {
1204                src: client(2, 1000),
1205                payload: b"q",
1206            },
1207            now,
1208        );
1209        assert_eq!(mgr.flow_count(), 1, "existing flow kept, new flow shed");
1210        let outs = drain(&mut mgr);
1211        assert!(
1212            outs.iter()
1213                .any(|o| matches!(o, Output::Drop(DropReason::Shed)))
1214        );
1215    }
1216
1217    #[test]
1218    fn reconfig_midflow_preserves_existing_flow_contract() {
1219        let mut cfg = cluster("dns");
1220        cfg.responses = 0;
1221        let mut mgr = UdpManager::new(cfg, 16, 65535, 7);
1222        let now = Instant::now();
1223        let src = client(1, 1000);
1224        mgr.handle_input(ManagerInput::ClientDatagram { src, payload: b"q" }, now);
1225        let flow = drain(&mut mgr)
1226            .into_iter()
1227            .find_map(|o| match o {
1228                Output::SelectBackend { flow, .. } => Some(flow),
1229                _ => None,
1230            })
1231            .unwrap();
1232        mgr.handle_input(
1233            ManagerInput::BackendResolved {
1234                flow,
1235                backend: "b1".to_owned(),
1236                addr: backend(),
1237            },
1238            now,
1239        );
1240        drain(&mut mgr);
1241        // Reconfigure to responses=1; the live flow keeps responses=0 (captured).
1242        let mut newcfg = cluster("dns");
1243        newcfg.responses = 1;
1244        mgr.handle_input(ManagerInput::Config(ConfigEvent::SetCluster(newcfg)), now);
1245        // A reply does NOT close the existing flow (it captured responses=0).
1246        mgr.handle_input(
1247            ManagerInput::BackendDatagram {
1248                flow,
1249                payload: b"reply",
1250            },
1251            now,
1252        );
1253        assert_eq!(mgr.flow_count(), 1);
1254    }
1255
1256    #[test]
1257    fn reaper_drains_active_flows_to_zero() {
1258        let mut cfg = cluster("dns");
1259        cfg.front_timeout = Duration::from_secs(5);
1260        let mut mgr = UdpManager::new(cfg, 64, 65535, 7);
1261        let now = Instant::now();
1262        for n in 1..=10u8 {
1263            mgr.handle_input(
1264                ManagerInput::ClientDatagram {
1265                    src: client(n, 1000),
1266                    payload: b"q",
1267                },
1268                now,
1269            );
1270        }
1271        assert_eq!(mgr.flow_count(), 10);
1272        drain(&mut mgr);
1273        // Reaper after all idle deadlines pass.
1274        mgr.handle_timeout(now + Duration::from_secs(6));
1275        assert_eq!(mgr.flow_count(), 0, "reaper drains every flow, no leak");
1276        let outs = drain(&mut mgr);
1277        let closes = outs
1278            .iter()
1279            .filter(|o| matches!(o, Output::CloseFlow(_)))
1280            .count();
1281        assert_eq!(closes, 10);
1282        assert!(mgr.poll_timeout().is_none(), "no armed timer after drain");
1283    }
1284
1285    #[test]
1286    fn proxy_protocol_first_datagram_only() {
1287        let mut cfg = cluster("dns");
1288        cfg.send_proxy_protocol = true;
1289        cfg.proxy_protocol_every_datagram = false;
1290        let mut mgr = UdpManager::new(cfg, 16, 65535, 7);
1291        let now = Instant::now();
1292        let src = client(1, 1000);
1293        mgr.handle_input(
1294            ManagerInput::ClientDatagram {
1295                src,
1296                payload: b"q1",
1297            },
1298            now,
1299        );
1300        let flow = drain(&mut mgr)
1301            .into_iter()
1302            .find_map(|o| match o {
1303                Output::SelectBackend { flow, .. } => Some(flow),
1304                _ => None,
1305            })
1306            .unwrap();
1307        mgr.handle_input(
1308            ManagerInput::BackendResolved {
1309                flow,
1310                backend: "b1".to_owned(),
1311                addr: backend(),
1312            },
1313            now,
1314        );
1315        // First flushed datagram carries the PPv2 prefix.
1316        let first = drain(&mut mgr)
1317            .into_iter()
1318            .find_map(|o| match o {
1319                Output::SendToBackend(t) => Some(t.payload),
1320                _ => None,
1321            })
1322            .unwrap();
1323        assert!(first.len() > 2, "PPv2 header prepended to first datagram");
1324        assert_eq!(&first[..4], &[0x0D, 0x0A, 0x0D, 0x0A]);
1325        assert_eq!(first[12], 0x21);
1326        assert_eq!(first[13], 0x12);
1327        assert_eq!(&first[first.len() - 2..], b"q1");
1328
1329        // Second datagram: NO prefix.
1330        mgr.handle_input(
1331            ManagerInput::ClientDatagram {
1332                src,
1333                payload: b"q2",
1334            },
1335            now,
1336        );
1337        let second = drain(&mut mgr)
1338            .into_iter()
1339            .find_map(|o| match o {
1340                Output::SendToBackend(t) => Some(t.payload),
1341                _ => None,
1342            })
1343            .unwrap();
1344        assert_eq!(second, b"q2", "no PPv2 prefix on subsequent datagrams");
1345    }
1346
1347    /// Helper: admit a flow from `src`, resolve it to `backend()`, and return
1348    /// its FlowId. Drains the manager between steps. Leaves the flow
1349    /// `Established`.
1350    fn establish(mgr: &mut UdpManager, src: SocketAddr, now: Instant) -> FlowId {
1351        mgr.handle_input(ManagerInput::ClientDatagram { src, payload: b"q" }, now);
1352        let flow = drain(mgr)
1353            .into_iter()
1354            .find_map(|o| match o {
1355                Output::SelectBackend { flow, .. } => Some(flow),
1356                _ => None,
1357            })
1358            .unwrap();
1359        mgr.handle_input(
1360            ManagerInput::BackendResolved {
1361                flow,
1362                backend: "b1".to_owned(),
1363                addr: backend(),
1364            },
1365            now,
1366        );
1367        drain(mgr);
1368        flow
1369    }
1370
1371    #[test]
1372    fn close_all_evicts_every_flow_exactly_once() {
1373        let mut cfg = cluster("dns");
1374        cfg.front_timeout = Duration::from_secs(30);
1375        let mut mgr = UdpManager::new(cfg, 64, 65535, 7);
1376        let now = Instant::now();
1377        // N flows: a mix of Established and AwaitingBackend.
1378        const N: usize = 6;
1379        let mut flow_ids = Vec::new();
1380        for n in 1..=4u8 {
1381            flow_ids.push(establish(&mut mgr, client(n, 1000), now));
1382        }
1383        // Two flows left AwaitingBackend (admitted, not resolved).
1384        for n in 5..=6u8 {
1385            mgr.handle_input(
1386                ManagerInput::ClientDatagram {
1387                    src: client(n, 1000),
1388                    payload: b"q",
1389                },
1390                now,
1391            );
1392            let flow = drain(&mut mgr)
1393                .into_iter()
1394                .find_map(|o| match o {
1395                    Output::SelectBackend { flow, .. } => Some(flow),
1396                    _ => None,
1397                })
1398                .unwrap();
1399            flow_ids.push(flow);
1400        }
1401        assert_eq!(mgr.flow_count(), N);
1402
1403        mgr.close_all(now);
1404        let outs = drain(&mut mgr);
1405        let evicted = outs
1406            .iter()
1407            .filter(|o| matches!(o, Output::Metric(MetricEvent::FlowEvicted)))
1408            .count();
1409        let closed = outs
1410            .iter()
1411            .filter(|o| matches!(o, Output::CloseFlow(_)))
1412            .count();
1413        assert_eq!(evicted, N, "one FlowEvicted per live flow");
1414        assert_eq!(closed, N, "one CloseFlow per live flow");
1415        assert_eq!(mgr.flow_count(), 0, "flow table + slab drained to zero");
1416        assert!(
1417            mgr.poll_timeout().is_none(),
1418            "no armed timer after close_all"
1419        );
1420    }
1421
1422    #[test]
1423    fn close_all_is_idempotent_and_empty_safe() {
1424        let mut mgr = UdpManager::new(cluster("dns"), 16, 65535, 7);
1425        let now = Instant::now();
1426        // Empty manager: close_all is a no-op, no outputs.
1427        mgr.close_all(now);
1428        assert!(drain(&mut mgr).is_empty());
1429        assert_eq!(mgr.flow_count(), 0);
1430
1431        // Now one flow; close it twice. The second pass emits nothing (no
1432        // double-evict / underflow).
1433        establish(&mut mgr, client(1, 1000), now);
1434        assert_eq!(mgr.flow_count(), 1);
1435        mgr.close_all(now);
1436        let first = drain(&mut mgr);
1437        assert_eq!(
1438            first
1439                .iter()
1440                .filter(|o| matches!(o, Output::CloseFlow(_)))
1441                .count(),
1442            1
1443        );
1444        mgr.close_all(now);
1445        assert!(drain(&mut mgr).is_empty(), "second close_all emits nothing");
1446        assert_eq!(mgr.flow_count(), 0);
1447    }
1448
1449    #[test]
1450    fn abort_flow_closes_established_flow() {
1451        let mut mgr = UdpManager::new(cluster("dns"), 16, 65535, 7);
1452        let now = Instant::now();
1453        let flow = establish(&mut mgr, client(1, 1000), now);
1454        assert_eq!(mgr.flow_count(), 1);
1455
1456        mgr.abort_flow(flow, now, CloseReason::Aborted);
1457        let outs = drain(&mut mgr);
1458        assert!(
1459            outs.iter()
1460                .any(|o| matches!(o, Output::Metric(MetricEvent::FlowEvicted)))
1461        );
1462        assert!(
1463            outs.iter()
1464                .any(|o| matches!(o, Output::CloseFlow(f) if *f == flow))
1465        );
1466        assert_eq!(mgr.flow_count(), 0);
1467
1468        // Idempotent: aborting again emits nothing, no underflow.
1469        mgr.abort_flow(flow, now, CloseReason::Aborted);
1470        assert!(drain(&mut mgr).is_empty());
1471        assert_eq!(mgr.flow_count(), 0);
1472    }
1473
1474    #[test]
1475    fn abort_flow_closes_awaiting_backend_flow() {
1476        // Simulates udp_connect failing / no backend resolving: the flow never
1477        // leaves AwaitingBackend and must still free its slot immediately.
1478        let mut mgr = UdpManager::new(cluster("dns"), 16, 65535, 7);
1479        let now = Instant::now();
1480        mgr.handle_input(
1481            ManagerInput::ClientDatagram {
1482                src: client(1, 1000),
1483                payload: b"q",
1484            },
1485            now,
1486        );
1487        let flow = drain(&mut mgr)
1488            .into_iter()
1489            .find_map(|o| match o {
1490                Output::SelectBackend { flow, .. } => Some(flow),
1491                _ => None,
1492            })
1493            .unwrap();
1494        assert_eq!(mgr.flow_count(), 1);
1495        assert_eq!(mgr.flow(flow).unwrap().phase, FlowPhase::AwaitingBackend);
1496
1497        mgr.abort_flow(flow, now, CloseReason::Aborted);
1498        let outs = drain(&mut mgr);
1499        assert!(
1500            outs.iter()
1501                .any(|o| matches!(o, Output::Metric(MetricEvent::FlowEvicted)))
1502        );
1503        assert!(
1504            outs.iter()
1505                .any(|o| matches!(o, Output::CloseFlow(f) if *f == flow))
1506        );
1507        assert_eq!(mgr.flow_count(), 0, "AwaitingBackend slot freed by abort");
1508        assert!(mgr.poll_timeout().is_none());
1509        // The freed key is reusable: a new datagram from the same source admits
1510        // a fresh flow rather than colliding with the aborted one.
1511        mgr.handle_input(
1512            ManagerInput::ClientDatagram {
1513                src: client(1, 1000),
1514                payload: b"q2",
1515            },
1516            now,
1517        );
1518        assert_eq!(mgr.flow_count(), 1);
1519    }
1520
1521    #[test]
1522    fn abort_flow_unknown_id_is_noop() {
1523        let mut mgr = UdpManager::new(cluster("dns"), 16, 65535, 7);
1524        let now = Instant::now();
1525        mgr.abort_flow(999, now, CloseReason::Aborted);
1526        assert!(drain(&mut mgr).is_empty());
1527        assert_eq!(mgr.flow_count(), 0);
1528    }
1529
1530    #[test]
1531    fn requests_counts_forwards_not_buffered_during_await() {
1532        // requests = 2. A burst of client datagrams arrives while the flow is
1533        // still AwaitingBackend; all but the newest are discarded (newest-wins)
1534        // and never forwarded — so they must NOT count toward `requests`. After
1535        // resolve, the single buffered datagram is forwarded (count = 1). Only a
1536        // SECOND real forward (post-establish) should reach the cap.
1537        let mut cfg = cluster("syslog");
1538        cfg.requests = 2;
1539        let mut mgr = UdpManager::new(cfg, 16, 65535, 7);
1540        let now = Instant::now();
1541        let src = client(1, 1000);
1542
1543        // Admission datagram → AwaitingBackend, buffered (not counted yet).
1544        mgr.handle_input(ManagerInput::ClientDatagram { src, payload: b"1" }, now);
1545        let flow = drain(&mut mgr)
1546            .into_iter()
1547            .find_map(|o| match o {
1548                Output::SelectBackend { flow, .. } => Some(flow),
1549                _ => None,
1550            })
1551            .unwrap();
1552        // Burst during await: 3 more datagrams, all overwriting the one slot.
1553        for p in [b"2".as_slice(), b"3", b"4"] {
1554            mgr.handle_input(ManagerInput::ClientDatagram { src, payload: p }, now);
1555        }
1556        drain(&mut mgr);
1557        // Still alive, still awaiting — the burst did NOT trip the cap.
1558        assert_eq!(mgr.flow_count(), 1, "await burst must not close the flow");
1559        assert_eq!(
1560            mgr.flow(flow).unwrap().requests_seen,
1561            0,
1562            "buffered-only datagrams must not count toward requests"
1563        );
1564
1565        // Resolve: the single surviving buffered datagram is forwarded (count 1).
1566        mgr.handle_input(
1567            ManagerInput::BackendResolved {
1568                flow,
1569                backend: "b1".to_owned(),
1570                addr: backend(),
1571            },
1572            now,
1573        );
1574        let outs = drain(&mut mgr);
1575        assert!(
1576            outs.iter()
1577                .any(|o| matches!(o, Output::SendToBackend(t) if t.payload == b"4")),
1578            "newest buffered datagram is the one forwarded"
1579        );
1580        assert!(
1581            !outs.iter().any(|o| matches!(o, Output::CloseFlow(_))),
1582            "one forward must not reach requests=2"
1583        );
1584        assert_eq!(mgr.flow_count(), 1);
1585        assert_eq!(mgr.flow(flow).unwrap().requests_seen, 1);
1586
1587        // A real second forward (Established) reaches the cap and closes.
1588        mgr.handle_input(ManagerInput::ClientDatagram { src, payload: b"5" }, now);
1589        let outs = drain(&mut mgr);
1590        assert!(
1591            outs.iter()
1592                .any(|o| matches!(o, Output::SendToBackend(t) if t.payload == b"5"))
1593        );
1594        assert!(
1595            outs.iter().any(|o| matches!(o, Output::CloseFlow(_))),
1596            "second real forward reaches requests=2"
1597        );
1598        assert_eq!(mgr.flow_count(), 0);
1599    }
1600
1601    // ---- quickcheck property tests (zero sockets, injected Instants) -------
1602
1603    use quickcheck::{Arbitrary, Gen, quickcheck};
1604
1605    /// One step of an abstract client-side workload: a datagram from one of a
1606    /// bounded set of sources, or a clock advance that fires the reaper.
1607    #[derive(Clone, Debug)]
1608    enum Step {
1609        /// Client datagram from source `id % 8`.
1610        Client(u8),
1611        /// Advance the injected clock by `secs` seconds and fire the reaper.
1612        Tick(u8),
1613    }
1614
1615    impl Arbitrary for Step {
1616        fn arbitrary(g: &mut Gen) -> Self {
1617            if bool::arbitrary(g) {
1618                Step::Client(u8::arbitrary(g))
1619            } else {
1620                Step::Tick(u8::arbitrary(g) % 40)
1621            }
1622        }
1623    }
1624
1625    /// Property: across any interleaving of datagrams and clock ticks, the flow
1626    /// count never exceeds `max_flows`, and a final long tick reaps every flow
1627    /// back to zero — no leak, no gauge underflow (`CloseFlow` count ==
1628    /// `FlowCreated` count). The busy-loop strict-advance invariant is asserted
1629    /// inside `handle_timeout` itself (debug builds), so every `Tick` exercises
1630    /// it for free.
1631    #[test]
1632    fn prop_flow_invariants() {
1633        fn prop(steps: Vec<Step>) -> bool {
1634            const MAX_FLOWS: usize = 4;
1635            let mut cfg = cluster("dns");
1636            cfg.front_timeout = Duration::from_secs(10);
1637            let mut mgr = UdpManager::new(cfg, MAX_FLOWS, 65535, 0x5EED);
1638            let base = Instant::now();
1639            let mut now = base;
1640            let mut created = 0usize;
1641            let mut closed = 0usize;
1642
1643            for step in steps {
1644                match step {
1645                    Step::Client(id) => {
1646                        let src = client(id % 8, 9000 + (id % 8) as u16);
1647                        mgr.handle_input(ManagerInput::ClientDatagram { src, payload: b"q" }, now);
1648                    }
1649                    Step::Tick(secs) => {
1650                        now += Duration::from_secs(secs as u64);
1651                        mgr.handle_timeout(now);
1652                    }
1653                }
1654                // Drain outputs, tallying create/close.
1655                while let Some(out) = mgr.poll_output() {
1656                    match out {
1657                        Output::Metric(MetricEvent::FlowCreated) => created += 1,
1658                        Output::CloseFlow(_) => closed += 1,
1659                        _ => {}
1660                    }
1661                }
1662                // The cap is never exceeded.
1663                if mgr.flow_count() > MAX_FLOWS {
1664                    return false;
1665                }
1666            }
1667
1668            // Final long tick must reap everything: drains to zero, no timer.
1669            now += Duration::from_secs(60);
1670            mgr.handle_timeout(now);
1671            while let Some(out) = mgr.poll_output() {
1672                if let Output::CloseFlow(_) = out {
1673                    closed += 1;
1674                }
1675            }
1676            mgr.flow_count() == 0 && mgr.poll_timeout().is_none() && created == closed
1677        }
1678        quickcheck(prop as fn(Vec<Step>) -> bool);
1679    }
1680
1681    /// Property: an idle-timeout race is always resolved by generation tokens —
1682    /// for any refresh strictly inside the timeout window, a stale expiry at the
1683    /// original deadline never closes a refreshed flow, and the refreshed
1684    /// deadline eventually does.
1685    #[test]
1686    fn prop_generation_token_defeats_stale_close() {
1687        fn prop(refresh_offset: u8) -> bool {
1688            let timeout = 20u64;
1689            // Refresh at 1..=timeout-1 seconds (strictly inside the window).
1690            let offset = 1 + (refresh_offset as u64 % (timeout - 1));
1691            let mut cfg = cluster("dns");
1692            cfg.front_timeout = Duration::from_secs(timeout);
1693            let mut mgr = UdpManager::new(cfg, 8, 65535, 1);
1694            let now = Instant::now();
1695            let src = client(1, 1000);
1696            mgr.handle_input(ManagerInput::ClientDatagram { src, payload: b"q" }, now);
1697            while mgr.poll_output().is_some() {}
1698
1699            // Refresh inside the window: bumps the generation, pushes the
1700            // deadline forward.
1701            let refreshed_at = now + Duration::from_secs(offset);
1702            mgr.handle_input(
1703                ManagerInput::ClientDatagram {
1704                    src,
1705                    payload: b"q2",
1706                },
1707                refreshed_at,
1708            );
1709            while mgr.poll_output().is_some() {}
1710
1711            // Stale expiry at the ORIGINAL deadline must not close the flow.
1712            mgr.handle_timeout(now + Duration::from_secs(timeout));
1713            while mgr.poll_output().is_some() {}
1714            if mgr.flow_count() != 1 {
1715                return false;
1716            }
1717            // The refreshed deadline eventually closes it.
1718            mgr.handle_timeout(refreshed_at + Duration::from_secs(timeout + 1));
1719            while mgr.poll_output().is_some() {}
1720            mgr.flow_count() == 0
1721        }
1722        quickcheck(prop as fn(u8) -> bool);
1723    }
1724}