sozu_lib/protocol/udp/flow.rs
1//! Per-flow UDP state machine (sans-io).
2//!
3//! A [`UdpFlow`] is the per-admitted-flow half of the two-level split. It owns
4//! the three-knob teardown counters (`responses` / `requests` / idle), the idle
5//! and lifetime deadlines, PPv2-first-datagram bookkeeping, the real (pre-NAT)
6//! client address, the chosen backend, and the forward/return decisions. It
7//! carries a `timer_gen` generation token so a stale wheel expiry cannot close
8//! a flow that has since seen traffic.
9//!
10//! No socket, no clock, no rand: every time-dependent method takes `now:
11//! Instant`. The manager owns the slab; this type is the slot payload.
12
13use std::{net::SocketAddr, time::Instant};
14
15use crate::protocol::udp::ClusterConfig;
16
/// The cause recorded when a flow is torn down; reported in the access log
/// when the flow closes.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum CloseReason {
    /// No datagram was seen in either direction before the idle timeout
    /// elapsed.
    Idle,
    /// The flow returned as many backend replies as its `responses` knob
    /// allows (a DNS-style cluster would use `responses = 1`).
    ResponsesReached,
    /// The flow forwarded as many client datagrams as its `requests` knob
    /// allows.
    RequestsReached,
    /// The owning listener is draining.
    Drain,
    /// The shell gave up on the flow before it carried any traffic: either
    /// the upstream `connect()` failed (EMFILE / refused) or no backend could
    /// be resolved. Kept separate from [`Idle`](CloseReason::Idle) so the
    /// access log shows a flow that never established rather than one that
    /// appears to have timed out.
    Aborted,
}
34
/// Where a flow stands relative to its backend.
///
/// Admission creates the flow in
/// [`AwaitingBackend`](FlowPhase::AwaitingBackend); once the shell has
/// resolved a backend and opened the upstream it becomes
/// [`Established`](FlowPhase::Established); teardown reaps it.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum FlowPhase {
    /// `SelectBackend` has been emitted and `BackendResolved` has not arrived
    /// yet. A client datagram received during this window is held in a
    /// single-slot buffer so the first datagram is not dropped between
    /// admission and the upstream being opened.
    AwaitingBackend,
    /// A backend is resolved and the upstream is open; datagrams travel in
    /// both directions.
    Established,
    /// Flagged for teardown: the manager will emit `CloseFlow` and release
    /// the slot.
    Closing,
}
50
51/// Per-admitted-flow state. Slab slot payload owned by the manager.
52#[derive(Clone, Debug)]
53pub struct UdpFlow {
54 /// Real (pre-NAT) client source address — the symmetric NAT return target
55 /// and the PPv2 source address.
56 pub client: SocketAddr,
57 /// Resolved backend id, set on `BackendResolved`.
58 pub backend_id: Option<String>,
59 /// Resolved backend address, set on `BackendResolved`.
60 pub backend_addr: Option<SocketAddr>,
61 /// Lifecycle phase.
62 pub phase: FlowPhase,
63 /// Captured per-cluster knobs (responses/requests/timeouts/PPv2). Captured
64 /// at admission so a mid-flow reconfig does not change a live flow's
65 /// teardown contract (stable affinity).
66 pub config: ClusterConfig,
67
68 /// Client datagrams forwarded so far (counts toward `requests`).
69 pub requests_seen: u32,
70 /// Backend replies returned so far (counts toward `responses`).
71 pub responses_seen: u32,
72
73 /// Absolute idle deadline; reset on every datagram in either direction.
74 pub idle_deadline: Instant,
75 /// Generation token. Incremented every time the idle deadline is pushed
76 /// back. A wheel expiry only closes the flow when its captured generation
77 /// still matches — defeating the stale-close busy-loop bug.
78 pub timer_gen: u64,
79
80 /// True until the first upstream datagram is sent; gates PPv2 prefixing
81 /// when `proxy_protocol_every_datagram` is false (first-datagram-only).
82 pub first_upstream_pending: bool,
83
84 /// One-slot buffer for a client datagram that arrived while
85 /// [`AwaitingBackend`](FlowPhase::AwaitingBackend). Flushed on
86 /// `BackendResolved`. A second datagram in the window replaces it (newest
87 /// wins) rather than allocating an unbounded queue.
88 pub pending_payload: Option<Vec<u8>>,
89}
90
91impl UdpFlow {
92 /// Create a flow for `client`, awaiting a backend, with its idle deadline
93 /// armed `front_timeout` from `now`.
94 pub fn new(client: SocketAddr, config: ClusterConfig, now: Instant) -> Self {
95 let idle_deadline = now + config.front_timeout;
96 UdpFlow {
97 client,
98 backend_id: None,
99 backend_addr: None,
100 phase: FlowPhase::AwaitingBackend,
101 first_upstream_pending: config.send_proxy_protocol,
102 config,
103 requests_seen: 0,
104 responses_seen: 0,
105 idle_deadline,
106 timer_gen: 0,
107 pending_payload: None,
108 }
109 }
110
111 /// Push the idle deadline back to `now + timeout` and bump the generation
112 /// token so any in-flight wheel expiry for the old deadline is invalidated.
113 /// Returns the *new* generation so the manager can re-arm the wheel.
114 pub fn touch(&mut self, timeout: std::time::Duration, now: Instant) -> u64 {
115 // Generation token: a `touch` MUST advance `timer_gen` so any in-flight
116 // wheel expiry captured against the old generation no longer matches and
117 // cannot close a flow that has since seen traffic. Snapshot the old
118 // value and pair-assert (positive: the new value is returned; negative:
119 // it differs from the old, even across the wrapping boundary).
120 // Not `#[cfg(debug_assertions)]`-gated: `debug_assert_ne!` compiles its
121 // arguments in every profile (it only gates *execution*), so a snapshot
122 // it reads must exist in release too. The read is dead code in release and
123 // the optimizer drops the binding — zero cost, but it must still compile.
124 let old_gen = self.timer_gen;
125 self.idle_deadline = now + timeout;
126 self.timer_gen = self.timer_gen.wrapping_add(1);
127 debug_assert_ne!(
128 self.timer_gen, old_gen,
129 "touch must advance the generation token (stale expiry would still match)"
130 );
131 self.timer_gen
132 }
133
134 /// Record that one client datagram was actually *forwarded* upstream: bump
135 /// the `requests` counter and refresh the front idle deadline. Returns the
136 /// new generation token. Call this only at a real forward site — a datagram
137 /// merely buffered while [`AwaitingBackend`](FlowPhase::AwaitingBackend) and
138 /// later overwritten (newest-wins) must NOT count, or a burst during await
139 /// could trip the `requests` cap having delivered fewer than `requests`
140 /// datagrams. Use [`touch`](Self::touch) for the buffer-only idle refresh.
141 pub fn on_client_datagram(&mut self, now: Instant) -> u64 {
142 // A real forward is only ever recorded on an Established flow (the buffer
143 // flush in `on_backend_resolved` transitions to Established first). A
144 // forward on an AwaitingBackend flow would mean we sent upstream before a
145 // backend was resolved — a routing bug.
146 debug_assert_eq!(
147 self.phase,
148 FlowPhase::Established,
149 "on_client_datagram (a real forward) requires an Established flow"
150 );
151 // `requests_seen` is monotonic non-decreasing (a forward only ever bumps
152 // it) and saturates rather than wraps — so the count never silently
153 // resets to a small value and re-arms an already-exhausted cap.
154 // Always declared (not debug-gated): the `debug_assert!` below compiles
155 // this read in release too; dead there, dropped by the optimizer.
156 let before = self.requests_seen;
157 self.requests_seen = self.requests_seen.saturating_add(1);
158 debug_assert!(
159 self.requests_seen >= before,
160 "requests_seen must be monotonic non-decreasing"
161 );
162 self.touch(self.config.front_timeout, now)
163 }
164
165 /// Record that one backend reply was returned; refresh the back idle
166 /// deadline. Returns the new generation token.
167 pub fn on_backend_datagram(&mut self, now: Instant) -> u64 {
168 // A backend reply can only arrive on an Established flow (the upstream
169 // socket is opened on establish). A reply for an AwaitingBackend flow
170 // would mean a datagram on a not-yet-connected upstream — impossible.
171 debug_assert_eq!(
172 self.phase,
173 FlowPhase::Established,
174 "on_backend_datagram requires an Established flow"
175 );
176 // `responses_seen` is monotonic non-decreasing and saturating.
177 // Always declared (not debug-gated): the `debug_assert!` below compiles
178 // this read in release too; dead there, dropped by the optimizer.
179 let before = self.responses_seen;
180 self.responses_seen = self.responses_seen.saturating_add(1);
181 debug_assert!(
182 self.responses_seen >= before,
183 "responses_seen must be monotonic non-decreasing"
184 );
185 self.touch(self.config.back_timeout, now)
186 }
187
188 /// Transition the flow to `next`, asserting the move is legal. The lifecycle
189 /// is strictly forward: `AwaitingBackend → Established → Closing`, with a
190 /// self-loop allowed only into `Closing` (idempotent close). A backward move
191 /// (`Established → AwaitingBackend`) or skipping straight from
192 /// `AwaitingBackend → Closing` is allowed *only* into `Closing` (a flow may
193 /// be aborted before it establishes); every other transition is a bug.
194 ///
195 /// Debug-only guard — the assignment itself is unconditional so release
196 /// behavior is identical.
197 pub fn set_phase(&mut self, next: FlowPhase) {
198 #[cfg(debug_assertions)]
199 {
200 let legal = match (self.phase, next) {
201 // Forward edges.
202 (FlowPhase::AwaitingBackend, FlowPhase::Established) => true,
203 // Either live phase may be torn down (normal close or abort).
204 (FlowPhase::AwaitingBackend, FlowPhase::Closing) => true,
205 (FlowPhase::Established, FlowPhase::Closing) => true,
206 // No legal backward or skipping edge, and no Awaiting/Established
207 // self-loop (callers set Established / Closing exactly once).
208 _ => false,
209 };
210 debug_assert!(
211 legal,
212 "illegal flow phase transition {:?} -> {next:?}",
213 self.phase,
214 );
215 }
216 self.phase = next;
217 }
218
219 /// Whether the `requests` knob has been exhausted (`0` = unlimited).
220 pub fn requests_exhausted(&self) -> bool {
221 self.config.requests != 0 && self.requests_seen >= self.config.requests
222 }
223
224 /// Whether the `responses` knob has been exhausted (`0` = unlimited). A DNS
225 /// flow with `responses = 1` closes after its single reply.
226 pub fn responses_exhausted(&self) -> bool {
227 self.config.responses != 0 && self.responses_seen >= self.config.responses
228 }
229
230 /// The teardown reason if any knob is exhausted, else `None`. Idle is
231 /// handled separately by the manager via the timer wheel.
232 pub fn teardown_reason(&self) -> Option<CloseReason> {
233 let reason = if self.responses_exhausted() {
234 Some(CloseReason::ResponsesReached)
235 } else if self.requests_exhausted() {
236 Some(CloseReason::RequestsReached)
237 } else {
238 None
239 };
240 // Pair-assert the boundary (positive + negative space): a teardown reason
241 // is returned IFF at least one cap is truly exhausted, and `None` is
242 // returned IFF neither cap is. A reason without an exhausted cap (or an
243 // exhausted cap with no reason) is a silent correctness bug — the flow
244 // would close early or never close on its cap.
245 #[cfg(debug_assertions)]
246 {
247 let exhausted = self.responses_exhausted() || self.requests_exhausted();
248 debug_assert_eq!(
249 reason.is_some(),
250 exhausted,
251 "teardown_reason boundary: Some={} but exhausted={} (req {}/{}, resp {}/{})",
252 reason.is_some(),
253 exhausted,
254 self.requests_seen,
255 self.config.requests,
256 self.responses_seen,
257 self.config.responses,
258 );
259 }
260 reason
261 }
262
263 /// Whether this upstream datagram should carry a PPv2 DGRAM prefix, given
264 /// the cluster's first-datagram-only vs every-datagram policy. Marks the
265 /// first-datagram bookkeeping as consumed.
266 pub fn take_proxy_protocol(&mut self) -> bool {
267 if !self.config.send_proxy_protocol {
268 return false;
269 }
270 if self.config.proxy_protocol_every_datagram {
271 return true;
272 }
273 // First-datagram-only: prefix exactly once.
274 if self.first_upstream_pending {
275 self.first_upstream_pending = false;
276 true
277 } else {
278 false
279 }
280 }
281}