sozu_lib/protocol/udp/mod.rs
//! Pure sans-io UDP load-balancing core.
//!
//! This module is the *pure* heart of the UDP datapath (issue #1273). It owns
//! admission, the virtual 4-tuple flow table, the three-knob teardown
//! state-machine, the LB-selection request protocol, and a single-deadline
//! timer scheduler with generation tokens. It performs **no I/O**: there is no
//! socket, no `Instant::now()` / `SystemTime`, no `rand`, and no `Arc<Mutex>`.
//! Time is injected as `now: Instant` parameters; the hash seed is injected at
//! construction. The single admission copy the design allows materialises the
//! borrowed recv buffer into an owned `Vec<u8>` ([`Transmit::payload`]).
//!
//! The I/O shell ([`crate::udp`]) owns every syscall, the buffer pool, the timer
//! wheel, the connected per-flow upstream sockets, the `BackendMap`, health
//! checks and metrics. It drives this core through [`ManagerInput`] / [`Output`].
//!
//! Two-level split (mirrors the H2 `ConnectionH2` / `Context` split in
//! `protocol/mux/`):
//! - [`UdpManager`](manager::UdpManager): flow table, admission, cap/shedding,
//!   flow-key extraction, LB request, and timer scheduling.
//! - [`UdpFlow`](flow::UdpFlow): per-admitted-flow teardown counters, idle /
//!   lifetime deadlines, PPv2 bookkeeping, chosen backend, forward/return
//!   decisions, and a `timer_gen`.
//!
//! Long-form lifecycle (flow state machine, NAT return, teardown, hardening):
//! `lib/src/protocol/udp/LIFECYCLE.md`.
26
27pub mod flow;
28pub mod manager;
29pub mod proxy_protocol;
30
31use std::{net::SocketAddr, time::Duration};
32
33use sozu_command::state::ClusterId;
34
35pub use crate::protocol::udp::{
36 flow::{CloseReason, UdpFlow},
37 manager::{FlowKeyExtractor, SourceTupleExtractor, UdpManager},
38};
39
/// Index of an admitted flow in the flow slab. The value does not change while
/// the flow is alive and may be handed out again once the flow has closed. For
/// the NAT return path the shell keeps an `upstream_token -> FlowId` mapping.
pub type FlowId = usize;
43
/// Identifier of a backend. It is a plain string, in line with the string
/// backend ids of [`sozu_command::state`] and with `Backend::backend_id` in
/// `lib/src/backends.rs`.
pub type BackendId = String;
47
/// Virtual flow key derived from a client datagram.
///
/// The default [`SourceTupleExtractor`] builds it from the real (pre-NAT)
/// client source address. The `with_port` switch selects between the 2-tuple
/// (source IP only) and the 4-tuple (source IP + port). The extractor trait is
/// the only seam through which other keying schemes could be plugged in, but
/// the 4-tuple implementation is the only one in scope.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
pub struct FlowKey {
    /// Client source address. Its port is normalised to `0` when the
    /// extractor keys on the source IP only.
    pub src: SocketAddr,
}

impl FlowKey {
    /// Creates the key for the client source address `src`.
    ///
    /// With `with_port` set, the address is stored untouched. Otherwise only
    /// the port is rewritten to `0`; the rest of the address is left as is.
    pub fn from_src(src: SocketAddr, with_port: bool) -> Self {
        let mut keyed = src;
        if !with_port {
            // IP-only affinity: every source port of a client maps to one key.
            keyed.set_port(0);
        }
        Self { src: keyed }
    }
}
73
/// A datagram the core hands to the shell for writing.
///
/// `payload` is owned (`Vec<u8>`) and is the single admission copy. On the
/// first upstream datagram of a PPv2 flow the core has already put the v2
/// DGRAM header in front of the bytes, so the shell sends `payload` as-is.
/// `segment_size` is a placeholder for a future GSO/GRO fast path and stays
/// `None` in phase 1.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct Transmit {
    /// Where to send: the backend for upstream traffic, the real client for
    /// return traffic.
    pub dst: SocketAddr,
    /// Hint for the GSO segment size. Phase 1 does no batching: `None`.
    pub segment_size: Option<usize>,
    /// The owned bytes of the datagram, carrying the PPv2 prefix in place
    /// when one applies.
    pub payload: Vec<u8>,
}
88
89/// Inputs the shell feeds into the manager. Borrows the recv buffer; the core
90/// copies into an owned `Vec<u8>` only on admission.
91#[derive(Debug)]
92pub enum ManagerInput<'a> {
93 /// A datagram from a client. Admitted into an existing/new flow or dropped.
94 ClientDatagram {
95 /// Real (pre-NAT) client source address.
96 src: SocketAddr,
97 /// Borrowed datagram bytes.
98 payload: &'a [u8],
99 },
100 /// A datagram from a backend, tagged by the shell with the owning flow
101 /// (`upstream_token -> FlowId`). Drives the symmetric NAT return path.
102 BackendDatagram {
103 /// Flow that owns the connected upstream socket this arrived on.
104 flow: FlowId,
105 /// Borrowed datagram bytes.
106 payload: &'a [u8],
107 },
108 /// A control-plane / health event (add/remove backend, LB algo, timeouts,
109 /// per-cluster knobs, drain). Never allocates a flow.
110 Config(ConfigEvent),
111 /// Reply to an earlier [`Output::SelectBackend`]: the shell resolved the
112 /// backend via the `BackendMap` and is committing it to the flow.
113 BackendResolved {
114 /// Flow awaiting a backend.
115 flow: FlowId,
116 /// Resolved backend identifier.
117 backend: BackendId,
118 /// Resolved backend address for the connected upstream socket.
119 addr: SocketAddr,
120 },
121}
122
123/// Outputs the manager emits; the shell drains them via `poll_output` until
124/// `None`. The shell owns the actual syscalls and timer wheel.
125#[derive(Clone, Debug, PartialEq, Eq)]
126pub enum Output {
127 /// A new flow needs a backend. The shell consults the `BackendMap` with
128 /// `(cluster, key)` and replies with [`ManagerInput::BackendResolved`].
129 SelectBackend {
130 /// Flow awaiting a backend.
131 flow: FlowId,
132 /// Cluster the flow's listener routes to.
133 cluster: ClusterId,
134 /// Affinity hash for HRW / Maglev selection (`key % M`, `max hash`).
135 key: u64,
136 },
137 /// The shell should `connect()` a fresh upstream socket for this flow and
138 /// register `upstream_token -> flow` for NAT return demux.
139 OpenUpstream {
140 /// Flow that owns the upstream socket.
141 flow: FlowId,
142 /// Backend address to connect to.
143 backend: SocketAddr,
144 },
145 /// Write an owned datagram to the backend (PPv2-prefixed when applicable).
146 SendToBackend(Transmit),
147 /// Write an owned datagram back to the real client (symmetric NAT return).
148 SendToClient(Transmit),
149 /// Re-arm the single manager-wide timer at this absolute deadline. The
150 /// shell owns the wheel; the core only ever requests one deadline.
151 ArmTimer(std::time::Instant),
152 /// A metric event for the shell to translate into `udp.*` counters/gauges.
153 Metric(MetricEvent),
154 /// Tear down a flow (idle / responses-reached / requests-reached / drain).
155 /// The shell closes the upstream socket, frees the slab slot, and
156 /// decrements `udp.active_flows`.
157 CloseFlow(FlowId),
158 /// A datagram was dropped before allocating any flow/buffer/socket
159 /// ("silence is a virtue"). Carries the reason for metrics/logs.
160 Drop(DropReason),
161}
162
/// Reason a datagram was discarded. Each reason maps onto the by-reason
/// `udp.datagrams.dropped` metric.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum DropReason {
    /// Validation failed: the datagram was empty, or the extractor rejected
    /// it.
    Invalid,
    /// The datagram was larger than the configured `max_rx_datagram_size`
    /// (stands in for truncation / `MSG_TRUNC` at this layer).
    Truncated,
    /// The listener has no cluster/backend configured.
    NoBackend,
    /// The flow-table cap (`max_flows`) was hit: the new flow is shed so the
    /// existing ones stay protected.
    Shed,
    /// A backend datagram pointed at an unknown or already-closed flow.
    UnknownFlow,
}
179
180/// Metric events the core asks the shell to record. The shell owns the
181/// `incr!`/`count!`/`gauge!`/`time!` macros (`lib/src/metrics/`).
182#[derive(Clone, Copy, Debug, PartialEq, Eq)]
183pub enum MetricEvent {
184 /// A new flow was admitted (`udp.flows.created`, `udp.active_flows += 1`).
185 FlowCreated,
186 /// A flow was torn down by idle/teardown/drain (`udp.flows.evicted`,
187 /// `udp.active_flows -= 1`).
188 FlowEvicted,
189 /// A new flow was shed at the cap (`udp.flows.shed`).
190 FlowShed,
191 /// A client→backend datagram was forwarded (`udp.datagrams.in`,
192 /// `udp.bytes.in`). Carries the *payload* byte count (excludes any PPv2
193 /// prefix the core adds).
194 DatagramIn(usize),
195 /// A backend→client datagram was returned (`udp.datagrams.out`,
196 /// `udp.bytes.out`).
197 DatagramOut(usize),
198 /// A datagram was dropped, by reason (`udp.datagrams.dropped`).
199 DatagramDropped(DropReason),
200}
201
202/// Per-cluster knobs the shell binds to a listener's flows. Mirrors the
203/// `UdpClusterConfig` proto but stays in pure-core terms. Defaults match the
204/// proto defaults.
205#[derive(Clone, Debug, PartialEq, Eq)]
206pub struct ClusterConfig {
207 /// Cluster the listener routes to.
208 pub cluster: ClusterId,
209 /// Key on `src_ip + src_port` (true) vs `src_ip` only (false). Maps the
210 /// `UdpAffinityKey` proto enum.
211 pub affinity_with_port: bool,
212 /// Expected replies per flow before close. `0` = unlimited (DNS = 1).
213 pub responses: u32,
214 /// Max client datagrams per flow before close. `0` = unlimited.
215 pub requests: u32,
216 /// Idle timeout (client direction). Resets on every datagram.
217 pub front_timeout: Duration,
218 /// Idle timeout (upstream direction). Resets on every reply.
219 pub back_timeout: Duration,
220 /// Prepend a PPv2 DGRAM header to upstream datagrams.
221 pub send_proxy_protocol: bool,
222 /// PPv2 on every datagram (true) vs first-datagram-only (false, default).
223 pub proxy_protocol_every_datagram: bool,
224}
225
226impl Default for ClusterConfig {
227 fn default() -> Self {
228 ClusterConfig {
229 cluster: String::new(),
230 affinity_with_port: false,
231 responses: 0,
232 requests: 0,
233 front_timeout: Duration::from_secs(30),
234 back_timeout: Duration::from_secs(30),
235 send_proxy_protocol: false,
236 proxy_protocol_every_datagram: false,
237 }
238 }
239}
240
241/// Control-plane events the shell feeds the manager via
242/// [`ManagerInput::Config`]. Additive; an unknown event must never panic.
243#[derive(Clone, Debug, PartialEq, Eq)]
244pub enum ConfigEvent {
245 /// Replace the listener's cluster routing + per-cluster knobs. Applies to
246 /// new flows; existing flows keep their captured config (stable affinity).
247 SetCluster(ClusterConfig),
248 /// Update the flow-table cap. Shrinking does not evict existing flows; it
249 /// only sheds future ones.
250 SetMaxFlows(usize),
251 /// Update the maximum accepted rx datagram size.
252 SetMaxRxDatagramSize(usize),
253 /// Begin draining: admit no new flows; let existing ones reach teardown.
254 Drain,
255}
256
/// Builds the log-line prefix of the pure UDP core (tag `UDP`), in the manner
/// of the `log_context!` macro of `protocol/mux/mod.rs`.
///
/// The core has no socket or ULID of its own, so the leading bracket holds
/// placeholders only and the manager's admission counters are printed
/// instead: `$self.flow_count()`, `$self.max_flows()` and
/// `$self.is_draining()`. Colors come from
/// [`ansi_palette`](sozu_command::logging::ansi_palette), which honours the
/// colored flag.
#[allow(unused_macros)]
macro_rules! log_context {
    ($self:expr) => {{
        // Palette slots, named after the role each one plays in this prefix.
        let (tag_on, color_off, name_color, key_color, value_color) =
            sozu_command::logging::ansi_palette();
        format!(
            "[- - - -]\t{open}UDP{reset}\t{grey}Manager{reset}({gray}flows{reset}={white}{flows}{reset}, {gray}max_flows{reset}={white}{max_flows}{reset}, {gray}draining{reset}={white}{draining}{reset})\t >>>",
            open = tag_on,
            reset = color_off,
            grey = name_color,
            gray = key_color,
            white = value_color,
            flows = $self.flow_count(),
            max_flows = $self.max_flows(),
            draining = $self.is_draining(),
        )
    }};
}
278
/// Builds the per-flow log-line prefix (tag `UDP-FLOW`).
///
/// The leading bracket keeps the canonical four-slot shape with placeholders
/// only; the flow is identified right after the tag as
/// `Flow(id=…, client=…, backend=…)`, so flow lines stay filterable. `id` is
/// `$flow_id`, `client` is `$flow.client`, and `backend` is
/// `$flow.backend_addr` rendered with `Debug` formatting. Colors come from
/// [`ansi_palette`](sozu_command::logging::ansi_palette).
#[allow(unused_macros)]
macro_rules! log_context_lite {
    ($flow_id:expr, $flow:expr) => {{
        // Palette slots, named after the role each one plays in this prefix.
        let (tag_on, color_off, name_color, key_color, value_color) =
            sozu_command::logging::ansi_palette();
        format!(
            "[- - - -]\t{open}UDP-FLOW{reset}\t{grey}Flow{reset}({gray}id{reset}={white}{id}{reset}, {gray}client{reset}={white}{client}{reset}, {gray}backend{reset}={white}{backend:?}{reset})\t >>>",
            open = tag_on,
            reset = color_off,
            grey = name_color,
            gray = key_color,
            white = value_color,
            id = $flow_id,
            client = $flow.client,
            backend = $flow.backend_addr,
        )
    }};
}
299
300#[allow(unused_imports)]
301pub(crate) use {log_context, log_context_lite};