Skip to main content

sozu_lib/protocol/udp/
flow.rs

1//! Per-flow UDP state machine (sans-io).
2//!
3//! A [`UdpFlow`] is the per-admitted-flow half of the two-level split. It owns
4//! the three-knob teardown counters (`responses` / `requests` / idle), the idle
5//! and lifetime deadlines, PPv2-first-datagram bookkeeping, the real (pre-NAT)
6//! client address, the chosen backend, and the forward/return decisions. It
7//! carries a `timer_gen` generation token so a stale wheel expiry cannot close
8//! a flow that has since seen traffic.
9//!
10//! No socket, no clock, no rand: every time-dependent method takes `now:
11//! Instant`. The manager owns the slab; this type is the slot payload.
12
13use std::{net::SocketAddr, time::Instant};
14
15use crate::protocol::udp::ClusterConfig;
16
/// The cause recorded when a flow is torn down. Reported in the access log
/// at close time.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum CloseReason {
    /// No datagram was seen in either direction before the idle timeout
    /// elapsed.
    Idle,
    /// The flow returned its configured number of `responses` (for example a
    /// DNS exchange configured for a single reply).
    ResponsesReached,
    /// The flow forwarded its configured number of `requests`.
    RequestsReached,
    /// The owning listener is draining.
    Drain,
    /// The shell gave up on the flow before it served any traffic: either the
    /// upstream `connect()` failed (EMFILE / refused) or no backend could be
    /// resolved. Kept separate from `Idle` so the access log makes it clear
    /// the flow was never established instead of suggesting a timeout.
    Aborted,
}
34
/// Where a flow stands relative to its backend.
///
/// Admission places a flow in [`AwaitingBackend`](FlowPhase::AwaitingBackend).
/// Once the shell has resolved a backend and opened the upstream, the flow
/// moves to [`Established`](FlowPhase::Established). Teardown reaps it.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum FlowPhase {
    /// `SelectBackend` has been emitted and `BackendResolved` has not arrived
    /// yet. A client datagram received during this window is held in a
    /// single-slot buffer so the first datagram survives the gap between
    /// admission and upstream open.
    AwaitingBackend,
    /// A backend is resolved and the upstream is open; datagrams travel in
    /// both directions.
    Established,
    /// Teardown has been requested; the manager will emit `CloseFlow` and
    /// release the slot.
    Closing,
}
50
51/// Per-admitted-flow state. Slab slot payload owned by the manager.
52#[derive(Clone, Debug)]
53pub struct UdpFlow {
54    /// Real (pre-NAT) client source address — the symmetric NAT return target
55    /// and the PPv2 source address.
56    pub client: SocketAddr,
57    /// Resolved backend id, set on `BackendResolved`.
58    pub backend_id: Option<String>,
59    /// Resolved backend address, set on `BackendResolved`.
60    pub backend_addr: Option<SocketAddr>,
61    /// Lifecycle phase.
62    pub phase: FlowPhase,
63    /// Captured per-cluster knobs (responses/requests/timeouts/PPv2). Captured
64    /// at admission so a mid-flow reconfig does not change a live flow's
65    /// teardown contract (stable affinity).
66    pub config: ClusterConfig,
67
68    /// Client datagrams forwarded so far (counts toward `requests`).
69    pub requests_seen: u32,
70    /// Backend replies returned so far (counts toward `responses`).
71    pub responses_seen: u32,
72
73    /// Absolute idle deadline; reset on every datagram in either direction.
74    pub idle_deadline: Instant,
75    /// Generation token. Incremented every time the idle deadline is pushed
76    /// back. A wheel expiry only closes the flow when its captured generation
77    /// still matches — defeating the stale-close busy-loop bug.
78    pub timer_gen: u64,
79
80    /// True until the first upstream datagram is sent; gates PPv2 prefixing
81    /// when `proxy_protocol_every_datagram` is false (first-datagram-only).
82    pub first_upstream_pending: bool,
83
84    /// One-slot buffer for a client datagram that arrived while
85    /// [`AwaitingBackend`](FlowPhase::AwaitingBackend). Flushed on
86    /// `BackendResolved`. A second datagram in the window replaces it (newest
87    /// wins) rather than allocating an unbounded queue.
88    pub pending_payload: Option<Vec<u8>>,
89}
90
91impl UdpFlow {
92    /// Create a flow for `client`, awaiting a backend, with its idle deadline
93    /// armed `front_timeout` from `now`.
94    pub fn new(client: SocketAddr, config: ClusterConfig, now: Instant) -> Self {
95        let idle_deadline = now + config.front_timeout;
96        UdpFlow {
97            client,
98            backend_id: None,
99            backend_addr: None,
100            phase: FlowPhase::AwaitingBackend,
101            first_upstream_pending: config.send_proxy_protocol,
102            config,
103            requests_seen: 0,
104            responses_seen: 0,
105            idle_deadline,
106            timer_gen: 0,
107            pending_payload: None,
108        }
109    }
110
111    /// Push the idle deadline back to `now + timeout` and bump the generation
112    /// token so any in-flight wheel expiry for the old deadline is invalidated.
113    /// Returns the *new* generation so the manager can re-arm the wheel.
114    pub fn touch(&mut self, timeout: std::time::Duration, now: Instant) -> u64 {
115        // Generation token: a `touch` MUST advance `timer_gen` so any in-flight
116        // wheel expiry captured against the old generation no longer matches and
117        // cannot close a flow that has since seen traffic. Snapshot the old
118        // value and pair-assert (positive: the new value is returned; negative:
119        // it differs from the old, even across the wrapping boundary).
120        // Not `#[cfg(debug_assertions)]`-gated: `debug_assert_ne!` compiles its
121        // arguments in every profile (it only gates *execution*), so a snapshot
122        // it reads must exist in release too. The read is dead code in release and
123        // the optimizer drops the binding — zero cost, but it must still compile.
124        let old_gen = self.timer_gen;
125        self.idle_deadline = now + timeout;
126        self.timer_gen = self.timer_gen.wrapping_add(1);
127        debug_assert_ne!(
128            self.timer_gen, old_gen,
129            "touch must advance the generation token (stale expiry would still match)"
130        );
131        self.timer_gen
132    }
133
134    /// Record that one client datagram was actually *forwarded* upstream: bump
135    /// the `requests` counter and refresh the front idle deadline. Returns the
136    /// new generation token. Call this only at a real forward site — a datagram
137    /// merely buffered while [`AwaitingBackend`](FlowPhase::AwaitingBackend) and
138    /// later overwritten (newest-wins) must NOT count, or a burst during await
139    /// could trip the `requests` cap having delivered fewer than `requests`
140    /// datagrams. Use [`touch`](Self::touch) for the buffer-only idle refresh.
141    pub fn on_client_datagram(&mut self, now: Instant) -> u64 {
142        // A real forward is only ever recorded on an Established flow (the buffer
143        // flush in `on_backend_resolved` transitions to Established first). A
144        // forward on an AwaitingBackend flow would mean we sent upstream before a
145        // backend was resolved — a routing bug.
146        debug_assert_eq!(
147            self.phase,
148            FlowPhase::Established,
149            "on_client_datagram (a real forward) requires an Established flow"
150        );
151        // `requests_seen` is monotonic non-decreasing (a forward only ever bumps
152        // it) and saturates rather than wraps — so the count never silently
153        // resets to a small value and re-arms an already-exhausted cap.
154        // Always declared (not debug-gated): the `debug_assert!` below compiles
155        // this read in release too; dead there, dropped by the optimizer.
156        let before = self.requests_seen;
157        self.requests_seen = self.requests_seen.saturating_add(1);
158        debug_assert!(
159            self.requests_seen >= before,
160            "requests_seen must be monotonic non-decreasing"
161        );
162        self.touch(self.config.front_timeout, now)
163    }
164
165    /// Record that one backend reply was returned; refresh the back idle
166    /// deadline. Returns the new generation token.
167    pub fn on_backend_datagram(&mut self, now: Instant) -> u64 {
168        // A backend reply can only arrive on an Established flow (the upstream
169        // socket is opened on establish). A reply for an AwaitingBackend flow
170        // would mean a datagram on a not-yet-connected upstream — impossible.
171        debug_assert_eq!(
172            self.phase,
173            FlowPhase::Established,
174            "on_backend_datagram requires an Established flow"
175        );
176        // `responses_seen` is monotonic non-decreasing and saturating.
177        // Always declared (not debug-gated): the `debug_assert!` below compiles
178        // this read in release too; dead there, dropped by the optimizer.
179        let before = self.responses_seen;
180        self.responses_seen = self.responses_seen.saturating_add(1);
181        debug_assert!(
182            self.responses_seen >= before,
183            "responses_seen must be monotonic non-decreasing"
184        );
185        self.touch(self.config.back_timeout, now)
186    }
187
188    /// Transition the flow to `next`, asserting the move is legal. The lifecycle
189    /// is strictly forward: `AwaitingBackend → Established → Closing`, with a
190    /// self-loop allowed only into `Closing` (idempotent close). A backward move
191    /// (`Established → AwaitingBackend`) or skipping straight from
192    /// `AwaitingBackend → Closing` is allowed *only* into `Closing` (a flow may
193    /// be aborted before it establishes); every other transition is a bug.
194    ///
195    /// Debug-only guard — the assignment itself is unconditional so release
196    /// behavior is identical.
197    pub fn set_phase(&mut self, next: FlowPhase) {
198        #[cfg(debug_assertions)]
199        {
200            let legal = match (self.phase, next) {
201                // Forward edges.
202                (FlowPhase::AwaitingBackend, FlowPhase::Established) => true,
203                // Either live phase may be torn down (normal close or abort).
204                (FlowPhase::AwaitingBackend, FlowPhase::Closing) => true,
205                (FlowPhase::Established, FlowPhase::Closing) => true,
206                // No legal backward or skipping edge, and no Awaiting/Established
207                // self-loop (callers set Established / Closing exactly once).
208                _ => false,
209            };
210            debug_assert!(
211                legal,
212                "illegal flow phase transition {:?} -> {next:?}",
213                self.phase,
214            );
215        }
216        self.phase = next;
217    }
218
219    /// Whether the `requests` knob has been exhausted (`0` = unlimited).
220    pub fn requests_exhausted(&self) -> bool {
221        self.config.requests != 0 && self.requests_seen >= self.config.requests
222    }
223
224    /// Whether the `responses` knob has been exhausted (`0` = unlimited). A DNS
225    /// flow with `responses = 1` closes after its single reply.
226    pub fn responses_exhausted(&self) -> bool {
227        self.config.responses != 0 && self.responses_seen >= self.config.responses
228    }
229
230    /// The teardown reason if any knob is exhausted, else `None`. Idle is
231    /// handled separately by the manager via the timer wheel.
232    pub fn teardown_reason(&self) -> Option<CloseReason> {
233        let reason = if self.responses_exhausted() {
234            Some(CloseReason::ResponsesReached)
235        } else if self.requests_exhausted() {
236            Some(CloseReason::RequestsReached)
237        } else {
238            None
239        };
240        // Pair-assert the boundary (positive + negative space): a teardown reason
241        // is returned IFF at least one cap is truly exhausted, and `None` is
242        // returned IFF neither cap is. A reason without an exhausted cap (or an
243        // exhausted cap with no reason) is a silent correctness bug — the flow
244        // would close early or never close on its cap.
245        #[cfg(debug_assertions)]
246        {
247            let exhausted = self.responses_exhausted() || self.requests_exhausted();
248            debug_assert_eq!(
249                reason.is_some(),
250                exhausted,
251                "teardown_reason boundary: Some={} but exhausted={} (req {}/{}, resp {}/{})",
252                reason.is_some(),
253                exhausted,
254                self.requests_seen,
255                self.config.requests,
256                self.responses_seen,
257                self.config.responses,
258            );
259        }
260        reason
261    }
262
263    /// Whether this upstream datagram should carry a PPv2 DGRAM prefix, given
264    /// the cluster's first-datagram-only vs every-datagram policy. Marks the
265    /// first-datagram bookkeeping as consumed.
266    pub fn take_proxy_protocol(&mut self) -> bool {
267        if !self.config.send_proxy_protocol {
268            return false;
269        }
270        if self.config.proxy_protocol_every_datagram {
271            return true;
272        }
273        // First-datagram-only: prefix exactly once.
274        if self.first_upstream_pending {
275            self.first_upstream_pending = false;
276            true
277        } else {
278            false
279        }
280    }
281}