// sozu_lib/protocol/udp/mod.rs

//! Pure sans-io UDP load-balancing core.
//!
//! This module is the *pure* heart of the UDP datapath (issue #1273). It owns
//! admission, the virtual 4-tuple flow table, the three-knob teardown
//! state-machine, the LB-selection request protocol, and a single-deadline
//! timer scheduler with generation tokens. It performs **no I/O**: there is no
//! socket, no `Instant::now()` / `SystemTime`, no `rand`, and no `Arc<Mutex>`.
//! Time is injected as `now: Instant` parameters; the hash seed is injected at
//! construction. The single admission copy the design allows materialises the
//! borrowed recv buffer into an owned `Vec<u8>` ([`Transmit::payload`]).
//!
//! The I/O shell ([`crate::udp`]) owns every syscall, the buffer pool, the timer
//! wheel, the connected per-flow upstream sockets, the `BackendMap`, health
//! checks and metrics. It drives this core through [`ManagerInput`] / [`Output`].
//!
//! Two-level split (mirrors the H2 `ConnectionH2` / `Context` split in
//! `protocol/mux/`):
//! - [`UdpManager`](manager::UdpManager): flow table + admission + cap/shedding
//!   + flow-key extraction + LB request + timer scheduling.
//! - [`UdpFlow`](flow::UdpFlow): per-admitted-flow teardown counters, idle /
//!   lifetime deadlines, PPv2 bookkeeping, chosen backend, forward/return
//!   decisions, and a `timer_gen`.
//!
//! Long-form lifecycle (flow state machine, NAT return, teardown, hardening):
//! `lib/src/protocol/udp/LIFECYCLE.md`.

pub mod flow;
pub mod manager;
pub mod proxy_protocol;

use std::{net::SocketAddr, time::Duration};

use sozu_command::state::ClusterId;

pub use crate::protocol::udp::{
    flow::{CloseReason, UdpFlow},
    manager::{FlowKeyExtractor, SourceTupleExtractor, UdpManager},
};

/// Slab index of an admitted flow. Stable for the flow's lifetime; reused after
/// close. The shell maps `upstream_token -> FlowId` for the NAT return path.
///
/// Because slots are reused after close, an id held past its flow's teardown
/// may later name a different flow.
pub type FlowId = usize;

/// Backend identifier, mirroring [`sozu_command::state`]'s string backend ids
/// and `Backend::backend_id` in `lib/src/backends.rs`.
pub type BackendId = String;

/// The virtual flow key extracted from a client datagram. The default
/// [`SourceTupleExtractor`] keys on the real (pre-NAT) client source address;
/// `with_port` distinguishes the 2-tuple (source IP only) from the 4-tuple
/// (source IP + port). Other extractors may key differently — the trait is the
/// only seam — but the 4-tuple impl is the only one in scope.
///
/// The key is `Copy` and `Hash`, so it can be used directly as a map key.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
pub struct FlowKey {
    /// The client source address. When the extractor keys on source IP only,
    /// the port is normalised to `0`.
    pub src: SocketAddr,
}

impl FlowKey {
    /// Build a key from a client source address.
    ///
    /// * `src` — the client source address the datagram arrived from.
    /// * `with_port` — `true` keeps the source port in the key; `false`
    ///   normalises the port to `0`, so every port of one source IP maps to
    ///   the same key.
    ///
    /// Everything except the port is carried over untouched.
    #[must_use]
    pub fn from_src(src: SocketAddr, with_port: bool) -> Self {
        let mut key = FlowKey { src };
        if !with_port {
            // `set_port` edits the address in place, so for IPv6 the flowinfo
            // and scope id are kept as received (rebuilding the address with
            // `SocketAddr::new` would reset both to zero).
            key.src.set_port(0);
        }
        key
    }
}

/// An owned datagram ready to be written by the shell. `payload` is the single
/// admission copy (`Vec<u8>`); for the first upstream datagram of a PPv2 flow
/// the core has already prepended the v2 DGRAM header, so the shell writes
/// `payload` verbatim. `segment_size` reserves room for a future GSO/GRO fast
/// path and is `None` in phase 1.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct Transmit {
    /// Destination address (backend for upstream, real client for return).
    pub dst: SocketAddr,
    /// GSO segment size hint; `None` in phase 1 (no batching).
    pub segment_size: Option<usize>,
    /// Owned datagram bytes, PPv2-prefixed in place when applicable.
    pub payload: Vec<u8>,
}

89/// Inputs the shell feeds into the manager. Borrows the recv buffer; the core
90/// copies into an owned `Vec<u8>` only on admission.
91#[derive(Debug)]
92pub enum ManagerInput<'a> {
93    /// A datagram from a client. Admitted into an existing/new flow or dropped.
94    ClientDatagram {
95        /// Real (pre-NAT) client source address.
96        src: SocketAddr,
97        /// Borrowed datagram bytes.
98        payload: &'a [u8],
99    },
100    /// A datagram from a backend, tagged by the shell with the owning flow
101    /// (`upstream_token -> FlowId`). Drives the symmetric NAT return path.
102    BackendDatagram {
103        /// Flow that owns the connected upstream socket this arrived on.
104        flow: FlowId,
105        /// Borrowed datagram bytes.
106        payload: &'a [u8],
107    },
108    /// A control-plane / health event (add/remove backend, LB algo, timeouts,
109    /// per-cluster knobs, drain). Never allocates a flow.
110    Config(ConfigEvent),
111    /// Reply to an earlier [`Output::SelectBackend`]: the shell resolved the
112    /// backend via the `BackendMap` and is committing it to the flow.
113    BackendResolved {
114        /// Flow awaiting a backend.
115        flow: FlowId,
116        /// Resolved backend identifier.
117        backend: BackendId,
118        /// Resolved backend address for the connected upstream socket.
119        addr: SocketAddr,
120    },
121}
122
123/// Outputs the manager emits; the shell drains them via `poll_output` until
124/// `None`. The shell owns the actual syscalls and timer wheel.
125#[derive(Clone, Debug, PartialEq, Eq)]
126pub enum Output {
127    /// A new flow needs a backend. The shell consults the `BackendMap` with
128    /// `(cluster, key)` and replies with [`ManagerInput::BackendResolved`].
129    SelectBackend {
130        /// Flow awaiting a backend.
131        flow: FlowId,
132        /// Cluster the flow's listener routes to.
133        cluster: ClusterId,
134        /// Affinity hash for HRW / Maglev selection (`key % M`, `max hash`).
135        key: u64,
136    },
137    /// The shell should `connect()` a fresh upstream socket for this flow and
138    /// register `upstream_token -> flow` for NAT return demux.
139    OpenUpstream {
140        /// Flow that owns the upstream socket.
141        flow: FlowId,
142        /// Backend address to connect to.
143        backend: SocketAddr,
144    },
145    /// Write an owned datagram to the backend (PPv2-prefixed when applicable).
146    SendToBackend(Transmit),
147    /// Write an owned datagram back to the real client (symmetric NAT return).
148    SendToClient(Transmit),
149    /// Re-arm the single manager-wide timer at this absolute deadline. The
150    /// shell owns the wheel; the core only ever requests one deadline.
151    ArmTimer(std::time::Instant),
152    /// A metric event for the shell to translate into `udp.*` counters/gauges.
153    Metric(MetricEvent),
154    /// Tear down a flow (idle / responses-reached / requests-reached / drain).
155    /// The shell closes the upstream socket, frees the slab slot, and
156    /// decrements `udp.active_flows`.
157    CloseFlow(FlowId),
158    /// A datagram was dropped before allocating any flow/buffer/socket
159    /// ("silence is a virtue"). Carries the reason for metrics/logs.
160    Drop(DropReason),
161}
162
/// Why a datagram was dropped. Maps onto `udp.datagrams.dropped` by-reason.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum DropReason {
    /// Datagram failed validation (empty, or the extractor rejected it).
    Invalid,
    /// Datagram exceeded the configured `max_rx_datagram_size` (truncation /
    /// `MSG_TRUNC` surrogate at this layer).
    Truncated,
    /// No cluster/backend is configured for the listener.
    NoBackend,
    /// Flow-table cap (`max_flows`) reached — shed the new flow, protect the
    /// existing ones.
    Shed,
    /// A backend datagram referenced an unknown / already-closed flow.
    UnknownFlow,
}

180/// Metric events the core asks the shell to record. The shell owns the
181/// `incr!`/`count!`/`gauge!`/`time!` macros (`lib/src/metrics/`).
182#[derive(Clone, Copy, Debug, PartialEq, Eq)]
183pub enum MetricEvent {
184    /// A new flow was admitted (`udp.flows.created`, `udp.active_flows += 1`).
185    FlowCreated,
186    /// A flow was torn down by idle/teardown/drain (`udp.flows.evicted`,
187    /// `udp.active_flows -= 1`).
188    FlowEvicted,
189    /// A new flow was shed at the cap (`udp.flows.shed`).
190    FlowShed,
191    /// A client→backend datagram was forwarded (`udp.datagrams.in`,
192    /// `udp.bytes.in`). Carries the *payload* byte count (excludes any PPv2
193    /// prefix the core adds).
194    DatagramIn(usize),
195    /// A backend→client datagram was returned (`udp.datagrams.out`,
196    /// `udp.bytes.out`).
197    DatagramOut(usize),
198    /// A datagram was dropped, by reason (`udp.datagrams.dropped`).
199    DatagramDropped(DropReason),
200}
201
202/// Per-cluster knobs the shell binds to a listener's flows. Mirrors the
203/// `UdpClusterConfig` proto but stays in pure-core terms. Defaults match the
204/// proto defaults.
205#[derive(Clone, Debug, PartialEq, Eq)]
206pub struct ClusterConfig {
207    /// Cluster the listener routes to.
208    pub cluster: ClusterId,
209    /// Key on `src_ip + src_port` (true) vs `src_ip` only (false). Maps the
210    /// `UdpAffinityKey` proto enum.
211    pub affinity_with_port: bool,
212    /// Expected replies per flow before close. `0` = unlimited (DNS = 1).
213    pub responses: u32,
214    /// Max client datagrams per flow before close. `0` = unlimited.
215    pub requests: u32,
216    /// Idle timeout (client direction). Resets on every datagram.
217    pub front_timeout: Duration,
218    /// Idle timeout (upstream direction). Resets on every reply.
219    pub back_timeout: Duration,
220    /// Prepend a PPv2 DGRAM header to upstream datagrams.
221    pub send_proxy_protocol: bool,
222    /// PPv2 on every datagram (true) vs first-datagram-only (false, default).
223    pub proxy_protocol_every_datagram: bool,
224}
225
226impl Default for ClusterConfig {
227    fn default() -> Self {
228        ClusterConfig {
229            cluster: String::new(),
230            affinity_with_port: false,
231            responses: 0,
232            requests: 0,
233            front_timeout: Duration::from_secs(30),
234            back_timeout: Duration::from_secs(30),
235            send_proxy_protocol: false,
236            proxy_protocol_every_datagram: false,
237        }
238    }
239}
240
241/// Control-plane events the shell feeds the manager via
242/// [`ManagerInput::Config`]. Additive; an unknown event must never panic.
243#[derive(Clone, Debug, PartialEq, Eq)]
244pub enum ConfigEvent {
245    /// Replace the listener's cluster routing + per-cluster knobs. Applies to
246    /// new flows; existing flows keep their captured config (stable affinity).
247    SetCluster(ClusterConfig),
248    /// Update the flow-table cap. Shrinking does not evict existing flows; it
249    /// only sheds future ones.
250    SetMaxFlows(usize),
251    /// Update the maximum accepted rx datagram size.
252    SetMaxRxDatagramSize(usize),
253    /// Begin draining: admit no new flows; let existing ones reach teardown.
254    Drain,
255}
256
/// Logging prefix for the pure UDP core (tag `UDP`). Mirrors
/// `protocol/mux/mod.rs`'s `log_context!`. The core has no socket/ULID of its
/// own, so the manager-level prefix carries its admission counters; honours the
/// colored flag via [`ansi_palette`](sozu_command::logging::ansi_palette).
///
/// `$self` must expose `flow_count()`, `max_flows()` and `is_draining()`, each
/// returning something that implements `Display`. The expression is expanded
/// once per accessor (three times in total), so pass a cheap, side-effect-free
/// place such as `self`.
///
/// Expands to a `String`; the format string is the canonical bracketed prefix
/// and must stay byte-identical for log filtering.
// NOTE(review): `unused_macros` is presumably allowed because not every build
// configuration invokes this macro — confirm before removing.
#[allow(unused_macros)]
macro_rules! log_context {
    ($self:expr) => {{
        let (open, reset, grey, gray, white) = sozu_command::logging::ansi_palette();
        format!(
            "[- - - -]\t{open}UDP{reset}\t{grey}Manager{reset}({gray}flows{reset}={white}{flows}{reset}, {gray}max_flows{reset}={white}{max_flows}{reset}, {gray}draining{reset}={white}{draining}{reset})\t >>>",
            open = open,
            reset = reset,
            grey = grey,
            gray = gray,
            white = white,
            flows = $self.flow_count(),
            max_flows = $self.max_flows(),
            draining = $self.is_draining(),
        )
    }};
}

/// Per-flow logging prefix (tag `UDP-FLOW`). Renders the canonical
/// `[session req cluster backend]`-style bracket reduced to the flow's stable
/// id + client + backend, so flow lines stay filterable.
///
/// `$flow_id` is rendered with `Display`. `$flow` must expose a `client` field
/// (rendered with `Display`) and a `backend_addr` field (rendered with
/// `Debug`, i.e. `{:?}`).
///
/// Expands to a `String`; the format string must stay byte-identical for log
/// filtering.
// NOTE(review): `unused_macros` is presumably allowed because not every build
// configuration invokes this macro — confirm before removing.
#[allow(unused_macros)]
macro_rules! log_context_lite {
    ($flow_id:expr, $flow:expr) => {{
        let (open, reset, grey, gray, white) = sozu_command::logging::ansi_palette();
        format!(
            "[- - - -]\t{open}UDP-FLOW{reset}\t{grey}Flow{reset}({gray}id{reset}={white}{id}{reset}, {gray}client{reset}={white}{client}{reset}, {gray}backend{reset}={white}{backend:?}{reset})\t >>>",
            open = open,
            reset = reset,
            grey = grey,
            gray = gray,
            white = white,
            id = $flow_id,
            client = $flow.client,
            backend = $flow.backend_addr,
        )
    }};
}

300#[allow(unused_imports)]
301pub(crate) use {log_context, log_context_lite};