Skip to main content

cellos_supervisor/
per_flow.rs

1//! Real-time per-flow `network_flow_decision` events via nflog.
2//!
3//! Honest scope: this module is the *real-time* companion to
4//! [`crate::nft_counters`] (FC-38 Phase 1 post-run counter scrape). When the
5//! operator opts in via `CELLOS_FIRECRACKER_PER_FLOW_EBPF=1`, the supervisor:
6//!
7//! 1. Rewrites the cell's nft ruleset so every `accept` / `drop` verdict is
8//!    *also* logged via the netfilter `log group N prefix "cellos-flow ..."`
9//!    action ([`augment_ruleset_with_log_actions`]). The original verdict is
10//!    preserved so the rewrite is policy-neutral.
11//! 2. Spawns a dedicated OS thread inside the cell's network namespace
12//!    ([`spawn_per_flow_listener_in_netns`]) that opens a `NETLINK_NETFILTER`
13//!    socket, binds the configured nfnetlink_log group, and translates each
14//!    `NFULNL_PACKET_HDR` + payload into a [`cellos_core::NetworkFlowDecision`]
15//!    CloudEvent emitted live to the supervisor's [`cellos_core::ports::EventSink`].
16//!
17//! **Backend chosen**: `nflog` (`NFNL_SUBSYS_ULOG`). The specification calls
18//! for an eBPF-based tracer; eBPF is **deferred** because the workspace does
19//! not yet carry an eBPF crate (e.g. `aya` / `redbpf`) and Linux distros do
20//! not uniformly grant `CAP_BPF` to non-root supervisors. `nflog` requires
21//! only `CAP_NET_ADMIN` (already wired for nft application), works on every
22//! supported Linux kernel >= 3.13, and provides identical attribution
23//! granularity for accept/drop verdicts. The env var
24//! `CELLOS_FIRECRACKER_PER_FLOW_BACKEND=ebpf` is recognised but currently
25//! warns + falls back to nflog. When an eBPF backend lands, the
26//! [`PerFlowBackend`] enum here grows a real second arm and the listener
27//! spawn dispatches on it.
28//!
29//! ## Why a dedicated OS thread (mirrors `dns_proxy::spawn`)
30//!
31//! `setns(2)` mutates the *calling thread*'s netns association. Tokio's
32//! `spawn_blocking` workers are pooled — polluting one would leak the cell's
33//! netns into unrelated tasks. We therefore allocate a fresh `std::thread`,
34//! `setns` there, run the recv loop to completion, and let the OS reclaim
35//! the thread on exit.
36//!
37//! ## Pure parsing surface
38//!
39//! Everything except `spawn_per_flow_listener_in_netns` is pure: ruleset
40//! augmentation, nflog datagram decoding, L3/L4 attribution, decision
41//! building. The pure helpers are exercised by 18 unit tests on every
42//! platform; the Linux-only spawn helper is integration-tested under
43//! `tests/supervisor_per_flow_realtime.rs` (`#[ignore]`d, requires
44//! `CAP_NET_ADMIN`).
45
46#![allow(dead_code)]
47
48use std::sync::atomic::AtomicBool;
49use std::sync::Arc;
50
51use cellos_core::{NetworkFlowDecision, NetworkFlowDecisionOutcome, NetworkFlowDirection};
52
53// ────────────────────────────────────────────────────────────────────────
54// Env-var contract
55// ────────────────────────────────────────────────────────────────────────
56
/// D1 opt-in: when set to `1`, the supervisor augments the nft ruleset with
/// `log group N prefix "cellos-flow ..."` actions and spawns a per-flow
/// listener thread in the cell's netns.
///
/// [`build_activation_from_env`] compares the value for exact equality with
/// `"1"` (no trimming); any other value leaves this gate off.
pub const ENV_PER_FLOW_EBPF: &str = "CELLOS_FIRECRACKER_PER_FLOW_EBPF";

/// E7 / FC-38 Phase 2 alias for [`ENV_PER_FLOW_EBPF`]. The original env
/// var name retains the `_EBPF` suffix from when eBPF was the planned
/// backend; the implemented backend is nflog (eBPF deferred — see module
/// doc). Operators rolling out FC-38 Phase 2 on top of `nft` ergonomically
/// expect a backend-neutral flag name. Both env vars are honoured by
/// [`build_activation_from_env`]; setting either one to `"1"` opts in.
/// The legacy name remains stable for existing operators.
pub const ENV_PER_FLOW_REALTIME: &str = "CELLOS_PER_FLOW_REALTIME";

/// Optional override for the nfnetlink_log group bound by the listener.
/// Defaults to [`DEFAULT_NFLOG_GROUP`] when unset or unparseable.
///
/// The value is whitespace-trimmed and parsed as a `u16` by
/// [`build_activation_from_env`].
pub const ENV_PER_FLOW_NFLOG_GROUP: &str = "CELLOS_FIRECRACKER_PER_FLOW_NFLOG_GROUP";

/// Optional backend selector (legacy name). Recognised values: `nflog`
/// (default), `ebpf` (E7-4 host-side aya loader — attempts to load a BPF
/// program inside the cell's netns; falls back to nflog if eBPF startup
/// fails for any reason).
///
/// [`select_backend_from_env`] matches the value exactly: only the literal
/// lowercase `ebpf` (no surrounding whitespace) selects the eBPF backend.
pub const ENV_PER_FLOW_BACKEND: &str = "CELLOS_FIRECRACKER_PER_FLOW_BACKEND";

/// E7-5 alias for [`ENV_PER_FLOW_BACKEND`] — the spec-canonical name
/// without the `_FIRECRACKER_` infix. Both are honoured; the legacy name
/// retains precedence so operators that already shipped the long name
/// keep the same value.
pub const ENV_PER_FLOW_BACKEND_E7: &str = "CELLOS_PER_FLOW_BACKEND";

/// Default nflog group when the operator does not override it. Chosen
/// outside the typical 0..32 range many distros use for their stock chains
/// to minimise collisions in shared host netns observability tooling.
pub const DEFAULT_NFLOG_GROUP: u16 = 100;

/// nft `log` action prefix used to tag augmented rules. Distinguishes
/// CellOS-emitted log packets from any other unrelated logging the operator
/// may have configured on the host. The decoder filters on this prefix
/// substring before building the decision event.
///
/// [`augment_ruleset_with_log_actions`] appends the verdict to this base,
/// producing `cellos-flow accept` / `cellos-flow drop`.
pub const LOG_PREFIX_BASE: &str = "cellos-flow";
97
/// Backend selection result of [`select_backend_from_env`].
///
/// The value is resolved once and stored on [`PerFlowActivation::backend`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum PerFlowBackend {
    /// nfnetlink_log via `NETLINK_NETFILTER` socket. Default.
    Nflog,
    /// eBPF tracer (deferred — selecting this currently warns + falls back
    /// to nflog at runtime).
    ///
    /// NOTE(review): the docs on [`select_backend_from_env`] describe an
    /// attempted eBPF start with nflog fallback rather than a pure
    /// "deferred" stub — confirm which description is current.
    Ebpf,
}
107
/// Activation context for the per-flow listener.
///
/// Constructed once per `Supervisor::run` when the env gate is on; consumed
/// by [`spawn_per_flow_listener_in_netns`] when the workload child PID is
/// known. Mirrors the shape of [`crate::dns_proxy::spawn::EventSinkEmitter`]
/// + activation pair.
///
/// [`build_decision`] copies `cell_id`, `run_id`, `policy_digest`,
/// `keyset_id` and `issuer_kid` into every emitted event.
pub struct PerFlowActivation {
    /// Portable cell identifier (stamped into emitted events).
    pub cell_id: String,
    /// Run UUID/identifier (stamped into emitted events).
    pub run_id: String,
    /// nfnetlink_log group bound by the listener (matches the `log group N`
    /// action in the augmented ruleset). [`build_activation_from_env`]
    /// falls back to [`DEFAULT_NFLOG_GROUP`] when no valid override is set.
    pub nflog_group: u16,
    /// Backend in effect (resolves at activation time so the warning fires
    /// once per run rather than per packet).
    pub backend: PerFlowBackend,
    /// Optional sha256 digest of the policy bundle, stamped into events.
    pub policy_digest: Option<String>,
    /// Optional trust-keyset id stamped into events.
    pub keyset_id: Option<String>,
    /// Optional issuer kid stamped into events.
    pub issuer_kid: Option<String>,
}
132
133/// Read the env vars and return `Some(PerFlowActivation)` when the gate is on.
134///
135/// Returns `None` when [`ENV_PER_FLOW_EBPF`] is unset / not `1`. Always
136/// returns `Some` when the gate is on; backend selection / group parsing
137/// degrades to defaults rather than failing.
138pub fn build_activation_from_env(
139    cell_id: &str,
140    run_id: &str,
141    policy_digest: Option<String>,
142    keyset_id: Option<String>,
143    issuer_kid: Option<String>,
144) -> Option<PerFlowActivation> {
145    // Either the legacy `_EBPF` name or the backend-neutral `_REALTIME`
146    // alias opts in. We deliberately accept both rather than aliasing one
147    // to the other in shell rc files: operators can pick whichever name
148    // matches their telemetry convention without coordinating with the
149    // supervisor binary's env-var contract.
150    let ebpf_on = std::env::var(ENV_PER_FLOW_EBPF).as_deref() == Ok("1");
151    let realtime_on = std::env::var(ENV_PER_FLOW_REALTIME).as_deref() == Ok("1");
152    if !ebpf_on && !realtime_on {
153        return None;
154    }
155    let nflog_group = std::env::var(ENV_PER_FLOW_NFLOG_GROUP)
156        .ok()
157        .and_then(|s| s.trim().parse::<u16>().ok())
158        .unwrap_or(DEFAULT_NFLOG_GROUP);
159    let backend = select_backend_from_env();
160    Some(PerFlowActivation {
161        cell_id: cell_id.to_string(),
162        run_id: run_id.to_string(),
163        nflog_group,
164        backend,
165        policy_digest,
166        keyset_id,
167        issuer_kid,
168    })
169}
170
171/// Resolve the backend from [`ENV_PER_FLOW_BACKEND`] (legacy) or
172/// [`ENV_PER_FLOW_BACKEND_E7`] (spec-canonical name).
173///
174/// `ebpf` is honoured: [`crate::ebpf_flow::EbpfFlowMonitor::start`] will
175/// be attempted at activation time. Runtime fallback to nflog happens
176/// *inside* the activation path when the monitor returns an
177/// [`crate::ebpf_flow::EbpfMonitorError`] (Linux-only build, missing BPF
178/// object, attach failure, etc.) — operators see a single
179/// `tracing::warn` per-cell and the cell still emits per-flow events
180/// via nflog.
181///
182/// **Precedence**: when both env vars are set, the legacy name wins.
183/// This matches the same shape as [`build_activation_from_env`] where
184/// both opt-in names are accepted but the legacy one is the precedent.
185/// Operators migrating to the new name should clear the legacy one to
186/// avoid surprise.
187///
188/// Any value other than `ebpf` (including unset / empty / typos) maps
189/// to [`PerFlowBackend::Nflog`] — silently, because the supervisor
190/// pre-Phase-2 already defaulted to nflog and unknown values must not
191/// regress the existing path.
192pub fn select_backend_from_env() -> PerFlowBackend {
193    let legacy = std::env::var(ENV_PER_FLOW_BACKEND).ok();
194    let canonical = std::env::var(ENV_PER_FLOW_BACKEND_E7).ok();
195    let resolved = legacy.as_deref().or(canonical.as_deref()).unwrap_or("");
196    match resolved {
197        "ebpf" => PerFlowBackend::Ebpf,
198        _ => PerFlowBackend::Nflog,
199    }
200}
201
202// ────────────────────────────────────────────────────────────────────────
203// Pure ruleset augmentation
204// ────────────────────────────────────────────────────────────────────────
205
206/// Idempotently rewrite an nft ruleset string so every `accept` / `drop`
207/// verdict is preceded by a `log group N prefix "cellos-flow {accept|drop}"`
208/// action.
209///
210/// **Idempotent**: a second call on already-augmented output is a no-op;
211/// rules whose tail already contains a `log group ` prefix are left alone.
212///
213/// **Skipped lines**: loopback shortcut (`oif "lo" accept`) and policy
214/// declarations (`policy drop;` / `type filter hook ...`) are NOT augmented
215/// — we only care about the explicit allow / deny attribution lines.
216///
217/// The original verdict is preserved verbatim at the end of the rewritten
218/// rule so the rewrite is policy-neutral. The `log` action is a side-effect
219/// in nftables; control flow continues to the trailing `accept` / `drop`.
220pub fn augment_ruleset_with_log_actions(ruleset: &str, group: u16) -> String {
221    let mut out = String::with_capacity(ruleset.len() + 64);
222    for (idx, line) in ruleset.lines().enumerate() {
223        if idx > 0 {
224            out.push('\n');
225        }
226        let augmented = augment_line(line, group);
227        out.push_str(&augmented);
228    }
229    // Preserve trailing newline if present so callers diffing strings byte
230    // for byte don't see a phantom change.
231    if ruleset.ends_with('\n') && !out.ends_with('\n') {
232        out.push('\n');
233    }
234    out
235}
236
237fn augment_line(line: &str, group: u16) -> String {
238    let trimmed = line.trim_start();
239    let leading_ws_len = line.len() - trimmed.len();
240    // Skip already-augmented lines (idempotency).
241    if trimmed.contains("log group ") {
242        return line.to_string();
243    }
244    // Skip the loopback shortcut — we don't audit `lo` traffic.
245    if trimmed.starts_with("oif \"lo\" accept") {
246        return line.to_string();
247    }
248    // Skip chain / policy / brace lines — no verdict to augment.
249    if trimmed.starts_with("policy ")
250        || trimmed.starts_with("type filter hook")
251        || trimmed.starts_with("chain ")
252        || trimmed.starts_with("table ")
253        || trimmed == "}"
254        || trimmed == "{"
255        || trimmed.is_empty()
256    {
257        return line.to_string();
258    }
259    // Determine the trailing verdict (must be the last whitespace-separated
260    // token). We only augment lines whose verdict is `accept` or `drop`; any
261    // other terminator (return, jump, queue, ...) is left untouched.
262    let last_tok = trimmed.split_whitespace().last().unwrap_or("");
263    let verdict = match last_tok {
264        "accept" => "accept",
265        "drop" => "drop",
266        _ => return line.to_string(),
267    };
268    // Reuse the leading whitespace from the original line so indentation is
269    // preserved in the rewritten ruleset.
270    let prefix = &line[..leading_ws_len];
271    let body_without_verdict = trimmed
272        .rsplit_once(char::is_whitespace)
273        .map(|(head, _)| head)
274        .unwrap_or("");
275    format!(
276        "{prefix}{body_without_verdict} log group {group} prefix \"{LOG_PREFIX_BASE} {verdict}\" {verdict}"
277    )
278}
279
280// ────────────────────────────────────────────────────────────────────────
281// Pure datagram decode (used by the Linux listener + unit tests)
282// ────────────────────────────────────────────────────────────────────────
283
/// One decoded nflog datagram — what the listener pulls out of an
/// `NFULNL_PACKET_HDR` netlink message.
///
/// `prefix` is the operator-set prefix string from the augmented `log group`
/// action; the listener filters on `prefix.starts_with(LOG_PREFIX_BASE)` so
/// stray nflog traffic on the same group from unrelated host tooling
/// doesn't pollute the cell's audit trail.
///
/// Produced by [`decode_nflog_datagram`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct DecodedNflog {
    /// `cellos-flow accept` or `cellos-flow drop` (the verdict suffix is the
    /// classifier's signal). Decoded from `NFULA_PREFIX` up to the first NUL
    /// byte, with invalid UTF-8 replaced lossily.
    pub prefix: String,
    /// Raw L3 packet payload from `NFULA_PAYLOAD`. Empty when copy-mode is
    /// `META` rather than `PACKET` (i.e. when the attribute is absent).
    pub payload: Vec<u8>,
}
300
/// Errors returned by [`decode_nflog_datagram`].
#[derive(Debug, thiserror::Error)]
pub enum NflogDecodeError {
    /// The body is shorter than one 4-byte attribute header; carries the
    /// observed body length in bytes.
    #[error("nflog datagram too short ({0} bytes)")]
    TooShort(usize),
    /// The attribute stream was walked without finding an `NFULA_PREFIX`
    /// attribute.
    #[error("nflog datagram missing required attributes")]
    MissingAttrs,
}
309
// nfnetlink/nflog attribute type constants (from `linux/netfilter/nfnetlink_log.h`,
// `enum nfulnl_attr_type`). Replicated here so we don't pull in an external
// crate just for a handful of integers. The values are a stable kernel ABI.
pub(crate) const NFULA_PACKET_HDR: u16 = 1;
pub(crate) const NFULA_PAYLOAD: u16 = 9;
pub(crate) const NFULA_PREFIX: u16 = 10;

// nflog command + copy-mode constants.
//
// `enum nfulnl_msg_config_cmds` is declared in the kernel header as
// NONE = 0, BIND = 1, UNBIND = 2, PF_BIND = 3, PF_UNBIND = 4. The PF
// values below follow that order; sending 4 as "PF_BIND" would actually
// request PF_UNBIND.
pub(crate) const NFULNL_CFG_CMD_BIND: u8 = 1;
pub(crate) const NFULNL_CFG_CMD_PF_BIND: u8 = 3;
pub(crate) const NFULNL_CFG_CMD_PF_UNBIND: u8 = 4;
// Copy modes: NONE = 0, META = 1, PACKET = 2.
pub(crate) const NFULNL_COPY_PACKET: u8 = 2;

/// nfnetlink subsys id for nflog (`NFNL_SUBSYS_ULOG`).
pub(crate) const NFNL_SUBSYS_ULOG: u8 = 4;
/// `NFULNL_MSG_PACKET` — the message type carrying captured packets.
pub(crate) const NFULNL_MSG_PACKET: u8 = 0;
/// `NFULNL_MSG_CONFIG` — used for bind / copy-mode commands.
pub(crate) const NFULNL_MSG_CONFIG: u8 = 1;

/// netlink socket family literal (`AF_NETLINK`). libc carries the constant
/// but we pin it here so the pure decoder unit-tests don't need libc on
/// non-Linux hosts.
pub(crate) const AF_NETLINK_LITERAL: i32 = 16;
/// `NETLINK_NETFILTER` socket protocol literal.
pub(crate) const NETLINK_NETFILTER_LITERAL: i32 = 12;
336
337/// Decode the *body* of an `NFULNL_MSG_PACKET` netlink message — i.e. the
338/// bytes following the `nlmsghdr` + `nfgenmsg` headers — into a
339/// [`DecodedNflog`]. The walking logic understands the TLV ("attribute")
340/// stream nflog uses: pairs of `(nla_len: u16, nla_type: u16, value...)`
341/// padded to 4-byte alignment.
342///
343/// The minimal set of attributes Phase 1 consumes is `NFULA_PREFIX` (the
344/// `log group ... prefix "..."` operator string) and `NFULA_PAYLOAD` (the
345/// captured L3 packet bytes; empty when copy-mode is `META`). Other
346/// attributes — hwaddr, indev, outdev, mark, ts, uid, gid — are skipped.
347pub fn decode_nflog_datagram(body: &[u8]) -> Result<DecodedNflog, NflogDecodeError> {
348    if body.len() < 4 {
349        return Err(NflogDecodeError::TooShort(body.len()));
350    }
351    let mut prefix: Option<String> = None;
352    let mut payload: Option<Vec<u8>> = None;
353
354    let mut cursor = 0usize;
355    while cursor + 4 <= body.len() {
356        let nla_len = u16::from_ne_bytes([body[cursor], body[cursor + 1]]) as usize;
357        let nla_type = u16::from_ne_bytes([body[cursor + 2], body[cursor + 3]]);
358        if nla_len < 4 || cursor + nla_len > body.len() {
359            break;
360        }
361        let value_start = cursor + 4;
362        let value_end = cursor + nla_len;
363        let value = &body[value_start..value_end];
364        match nla_type {
365            NFULA_PREFIX => {
366                // NUL-terminated string.
367                let s = value
368                    .iter()
369                    .position(|b| *b == 0)
370                    .map(|n| &value[..n])
371                    .unwrap_or(value);
372                prefix = Some(String::from_utf8_lossy(s).into_owned());
373            }
374            NFULA_PAYLOAD => {
375                payload = Some(value.to_vec());
376            }
377            _ => {}
378        }
379        // Advance to the next attribute, padded to 4 bytes.
380        cursor += (nla_len + 3) & !3;
381    }
382    let prefix = prefix.ok_or(NflogDecodeError::MissingAttrs)?;
383    Ok(DecodedNflog {
384        prefix,
385        payload: payload.unwrap_or_default(),
386    })
387}
388
/// L3/L4 attribution distilled from an nflog payload.
///
/// All fields are optional because nflog can be configured with copy-mode
/// `META` (no payload at all) or because the captured packet may be a
/// non-IP frame (very rare in practice — netfilter only hooks IP).
///
/// L5-15: `src_addr` / `src_port` / `protocol_byte` are populated alongside
/// the dst fields so the listener can derive a [`FlowKey`] suitable for
/// `FlowAccumulator::record()`. The wire-emitted `NetworkFlowDecision`
/// schema only carries dst attribution today, so the src fields are
/// listener-internal — they exist on this struct to keep one parsing pass
/// rather than re-walking the payload.
///
/// [`FlowKey`]: crate::ebpf_flow::connection_tracking::FlowKey
#[derive(Debug, Default, Clone, PartialEq, Eq)]
pub struct FlowAttribution {
    /// Source IP address in textual form (IPv4 dotted-quad or IPv6).
    pub src_addr: Option<String>,
    /// Source port; only populated for TCP / UDP packets.
    pub src_port: Option<u16>,
    /// Destination IP address in textual form (IPv4 dotted-quad or IPv6).
    pub dst_addr: Option<String>,
    /// Destination port; only populated for TCP / UDP packets.
    pub dst_port: Option<u16>,
    /// Human-readable transport name: `tcp`, `udp`, `icmp` or `icmp6`.
    /// `None` for any other protocol number.
    pub protocol: Option<String>,
    /// IANA protocol byte (6 = TCP, 17 = UDP, 1 = ICMP, 58 = ICMPv6). Kept
    /// alongside the human-readable `protocol` string so the FlowKey hash
    /// stays cheap and matches the `FlowKey::protocol: u8` field directly.
    pub protocol_byte: Option<u8>,
}
415
416/// Decode the L3 + L4 headers of an nflog payload to extract destination
417/// IP / port / transport protocol. Returns a default-empty
418/// [`FlowAttribution`] for any parse failure — we deliberately degrade
419/// gracefully rather than drop the entire decision event.
420///
421/// Supported shapes:
422/// - IPv4 + TCP / UDP / ICMP
423/// - IPv6 + TCP / UDP / ICMPv6
424///
425/// Extension headers in IPv6 are NOT walked; an exotic packet with
426/// HBH/Routing/Fragment headers between the IPv6 fixed header and the
427/// transport will surface with `protocol: None` and no port.
428pub fn decode_l3_l4_attribution(payload: &[u8]) -> FlowAttribution {
429    if payload.is_empty() {
430        return FlowAttribution::default();
431    }
432    let version = payload[0] >> 4;
433    match version {
434        4 => decode_ipv4(payload),
435        6 => decode_ipv6(payload),
436        _ => FlowAttribution::default(),
437    }
438}
439
440fn decode_ipv4(p: &[u8]) -> FlowAttribution {
441    if p.len() < 20 {
442        return FlowAttribution::default();
443    }
444    let ihl = (p[0] & 0x0f) as usize * 4;
445    if ihl < 20 || p.len() < ihl {
446        return FlowAttribution::default();
447    }
448    let proto_byte = p[9];
449    let src = std::net::Ipv4Addr::new(p[12], p[13], p[14], p[15]).to_string();
450    let dst = std::net::Ipv4Addr::new(p[16], p[17], p[18], p[19]).to_string();
451    let mut attr = FlowAttribution {
452        src_addr: Some(src),
453        src_port: None,
454        dst_addr: Some(dst),
455        dst_port: None,
456        protocol: None,
457        protocol_byte: Some(proto_byte),
458    };
459    match proto_byte {
460        6 => {
461            attr.protocol = Some("tcp".to_string());
462            attr.src_port = parse_src_port(p, ihl);
463            attr.dst_port = parse_dst_port(p, ihl);
464        }
465        17 => {
466            attr.protocol = Some("udp".to_string());
467            attr.src_port = parse_src_port(p, ihl);
468            attr.dst_port = parse_dst_port(p, ihl);
469        }
470        1 => attr.protocol = Some("icmp".to_string()),
471        _ => {}
472    }
473    attr
474}
475
476fn decode_ipv6(p: &[u8]) -> FlowAttribution {
477    if p.len() < 40 {
478        return FlowAttribution::default();
479    }
480    let next_header = p[6];
481    let mut src_octets = [0u8; 16];
482    src_octets.copy_from_slice(&p[8..24]);
483    let src = std::net::Ipv6Addr::from(src_octets).to_string();
484    let mut dst_octets = [0u8; 16];
485    dst_octets.copy_from_slice(&p[24..40]);
486    let dst = std::net::Ipv6Addr::from(dst_octets).to_string();
487    let mut attr = FlowAttribution {
488        src_addr: Some(src),
489        src_port: None,
490        dst_addr: Some(dst),
491        dst_port: None,
492        protocol: None,
493        protocol_byte: Some(next_header),
494    };
495    match next_header {
496        6 => {
497            attr.protocol = Some("tcp".to_string());
498            attr.src_port = parse_src_port(p, 40);
499            attr.dst_port = parse_dst_port(p, 40);
500        }
501        17 => {
502            attr.protocol = Some("udp".to_string());
503            attr.src_port = parse_src_port(p, 40);
504            attr.dst_port = parse_dst_port(p, 40);
505        }
506        58 => attr.protocol = Some("icmp6".to_string()),
507        _ => {}
508    }
509    attr
510}
511
/// Read the big-endian source port: the first two bytes of the transport
/// header that starts at `l4_offset`.
///
/// Returns `None` unless at least four bytes (both port fields) are present
/// at that offset.
fn parse_src_port(p: &[u8], l4_offset: usize) -> Option<u16> {
    let ports = p.get(l4_offset..l4_offset + 4)?;
    Some(u16::from_be_bytes([ports[0], ports[1]]))
}
518
/// Read the big-endian destination port: bytes 2..4 of the transport header
/// that starts at `l4_offset`.
///
/// Returns `None` unless at least four bytes (both port fields) are present
/// at that offset.
fn parse_dst_port(p: &[u8], l4_offset: usize) -> Option<u16> {
    let ports = p.get(l4_offset..l4_offset + 4)?;
    Some(u16::from_be_bytes([ports[2], ports[3]]))
}
525
526/// L5-15 — derive a [`FlowKey`] from a parsed [`FlowAttribution`].
527///
528/// Returns `None` when any required 5-tuple field is missing — non-IP
529/// payloads, copy-mode `META` datagrams (no payload), or transport
530/// protocols without ports (ICMP, ICMPv6) all surface as `None`. The
531/// caller treats `None` as "skip the accumulator update" rather than
532/// fabricating a synthetic key.
533///
534/// [`FlowKey`]: crate::ebpf_flow::connection_tracking::FlowKey
535pub fn flow_key_from_attribution(
536    attr: &FlowAttribution,
537) -> Option<crate::ebpf_flow::connection_tracking::FlowKey> {
538    use crate::ebpf_flow::connection_tracking::FlowKey;
539    let src_addr: std::net::IpAddr = attr.src_addr.as_deref()?.parse().ok()?;
540    let dst_addr: std::net::IpAddr = attr.dst_addr.as_deref()?.parse().ok()?;
541    let src_port = attr.src_port?;
542    let dst_port = attr.dst_port?;
543    let protocol = attr.protocol_byte?;
544    Some(FlowKey {
545        src_addr,
546        src_port,
547        dst_addr,
548        dst_port,
549        protocol,
550    })
551}
552
553/// L5-15 — record an Opened flow into the shared accumulator, swallowing
554/// mutex poisoning (a poisoned accumulator is a bug elsewhere, but the
555/// listener MUST NOT panic on the recv path or the cell loses every
556/// subsequent per-flow event). On `PoisonError` we still walk through the
557/// inner accumulator via `into_inner()`-style recovery (`get_mut` on the
558/// guard) so the count remains correct even after the panic that poisoned
559/// the lock.
560pub fn record_opened_flow(
561    accumulator: &std::sync::Arc<
562        std::sync::Mutex<crate::ebpf_flow::connection_tracking::FlowAccumulator>,
563    >,
564    key: crate::ebpf_flow::connection_tracking::FlowKey,
565) {
566    use crate::ebpf_flow::connection_tracking::{FlowEvent, FlowEventKind};
567    let event = FlowEvent {
568        key,
569        kind: FlowEventKind::Opened,
570        timestamp_ns: 0,
571    };
572    match accumulator.lock() {
573        Ok(mut guard) => guard.record(&event),
574        Err(poisoned) => {
575            // Lock was poisoned by an unrelated panic. Continue recording
576            // rather than dropping the event — `unique_flow_count` is
577            // monotonic so a partially-updated set is still a strict
578            // undercount, never an overcount.
579            poisoned.into_inner().record(&event);
580        }
581    }
582}
583
584// ────────────────────────────────────────────────────────────────────────
585// Decision builder
586// ────────────────────────────────────────────────────────────────────────
587
588/// Compose a [`NetworkFlowDecision`] from a decoded nflog packet + the
589/// activation context.
590///
591/// `prefix` carries the verdict suffix (`cellos-flow accept` or
592/// `cellos-flow drop`). The `decision` enum + `reason_code` follow the same
593/// convention as [`crate::nft_counters::classify_rule_repr`] but with a
594/// per-flow flavour: `nft_log_accept` / `nft_log_drop` mark this as a
595/// real-time event rather than a post-run scrape attribution. Operators
596/// querying the audit trail can `.filter(reasonCode startsWith "nft_log_")`
597/// to isolate the per-flow stream.
598pub fn build_decision(
599    activation: &PerFlowActivation,
600    prefix: &str,
601    payload: &[u8],
602    observed_at: &str,
603) -> NetworkFlowDecision {
604    let attribution = decode_l3_l4_attribution(payload);
605    let (decision, reason_code) = if prefix.contains("accept") {
606        (NetworkFlowDecisionOutcome::Allow, "nft_log_accept")
607    } else if prefix.contains("drop") {
608        (NetworkFlowDecisionOutcome::Deny, "nft_log_drop")
609    } else {
610        // Unknown verdict — surface as deny default-drop so operators see
611        // it rather than silently dropping the event.
612        (NetworkFlowDecisionOutcome::Deny, "nft_log_unknown")
613    };
614    NetworkFlowDecision {
615        schema_version: "1.0.0".to_string(),
616        cell_id: activation.cell_id.clone(),
617        run_id: activation.run_id.clone(),
618        decision_id: uuid::Uuid::new_v4().to_string(),
619        direction: NetworkFlowDirection::Egress,
620        decision,
621        reason_code: reason_code.to_string(),
622        nft_rule_ref: None,
623        dst_addr: attribution.dst_addr,
624        dst_port: attribution.dst_port,
625        protocol: attribution.protocol,
626        packet_count: None,
627        byte_count: None,
628        policy_digest: activation.policy_digest.clone(),
629        keyset_id: activation.keyset_id.clone(),
630        issuer_kid: activation.issuer_kid.clone(),
631        correlation_id: None,
632        observed_at: observed_at.to_string(),
633    }
634}
635
636// ────────────────────────────────────────────────────────────────────────
637// Linux-only listener spawn
638// ────────────────────────────────────────────────────────────────────────
639
/// Handle returned by [`spawn_per_flow_listener_in_netns`].
///
/// On non-Linux targets the handle carries only the shutdown flag; the
/// `thread` field is compiled out.
pub struct PerFlowListenerHandle {
    /// Set to `true` to ask the listener to exit at the next iteration
    /// (within `LISTENER_RECV_TIMEOUT_MS` worst case).
    pub shutdown: Arc<AtomicBool>,
    /// Join handle for the listener OS thread. `Some` until [`Self::join`]
    /// consumes it.
    #[cfg(target_os = "linux")]
    pub thread: Option<std::thread::JoinHandle<PerFlowListenerStats>>,
}
650
/// Aggregate stats returned when the listener thread exits.
///
/// All counters start at zero (`Default`); an all-zero value is also what
/// the listener thread returns when it bails out before entering its loop
/// (e.g. when `setns` fails).
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq)]
pub struct PerFlowListenerStats {
    /// Total nflog datagrams the listener observed.
    pub datagrams_total: u64,
    /// Datagrams whose prefix matched [`LOG_PREFIX_BASE`] (i.e. originated
    /// from the augmented ruleset).
    pub datagrams_matched: u64,
    /// Datagrams that failed [`decode_nflog_datagram`].
    pub datagrams_decode_failed: u64,
    /// Events emitted to the supervisor's sink.
    pub events_emitted: u64,
}
664
665impl PerFlowListenerHandle {
666    /// Join the listener thread and return its cumulative stats.
667    #[cfg(target_os = "linux")]
668    pub fn join(&mut self) -> Option<PerFlowListenerStats> {
669        let handle = self.thread.take()?;
670        match handle.join() {
671            Ok(s) => Some(s),
672            Err(_) => {
673                tracing::warn!(
674                    target: "cellos.supervisor.per_flow",
675                    "per-flow listener thread panicked on join"
676                );
677                None
678            }
679        }
680    }
681
682    /// Stub join on non-Linux platforms.
683    #[cfg(not(target_os = "linux"))]
684    pub fn join(&mut self) -> Option<PerFlowListenerStats> {
685        None
686    }
687}
688
/// Worst-case shutdown latency: the listener's `SO_RCVTIMEO` is set to
/// 100ms so the recv loop checks the shutdown flag at that cadence.
///
/// Value is in milliseconds. NOTE(review): the code that applies this
/// timeout to the socket is outside this excerpt — confirm it is the only
/// consumer.
#[cfg(target_os = "linux")]
const LISTENER_RECV_TIMEOUT_MS: i64 = 100;
693
694/// Linux-only — spawn a dedicated OS thread, `setns(2)` into
695/// `/proc/<child_pid>/ns/net`, open a `NETLINK_NETFILTER` socket, bind the
696/// nflog group, and emit one [`NetworkFlowDecision`] CloudEvent per
697/// matching datagram until `shutdown` is set.
698///
699/// **Capability requirement**: the supervisor must hold `CAP_NET_ADMIN` to
700/// open `NETLINK_NETFILTER`. If the syscall fails with `EPERM` the listener
701/// thread emits a single `tracing::warn` and exits cleanly — the cell still
702/// runs, just without per-flow events.
703#[cfg(target_os = "linux")]
704pub fn spawn_per_flow_listener_in_netns(
705    child_pid: u32,
706    activation: PerFlowActivation,
707    sink: Arc<dyn cellos_core::ports::EventSink>,
708    shutdown: Arc<AtomicBool>,
709    accumulator: Option<
710        Arc<std::sync::Mutex<crate::ebpf_flow::connection_tracking::FlowAccumulator>>,
711    >,
712) -> std::io::Result<PerFlowListenerHandle> {
713    use std::fs::File;
714    use std::os::unix::io::AsRawFd;
715
716    let netns_path = format!("/proc/{child_pid}/ns/net");
717    let netns_file = File::open(&netns_path)
718        .map_err(|e| std::io::Error::new(e.kind(), format!("open netns at {netns_path}: {e}")))?;
719
720    let runtime_handle = tokio::runtime::Handle::try_current().ok();
721    let shutdown_for_thread = shutdown.clone();
722
723    let thread = std::thread::Builder::new()
724        .name(format!("cellos-per-flow-{child_pid}"))
725        .spawn(move || {
726            // SAFETY: setns is the documented Linux syscall for moving the
727            // calling thread into the namespace referenced by `fd`. The
728            // listener thread never returns to a tokio worker, so polluting
729            // its netns is intentional and isolated. `netns_file` is held
730            // for the lifetime of the thread.
731            let setns_rc = unsafe { libc::setns(netns_file.as_raw_fd(), libc::CLONE_NEWNET) };
732            if setns_rc != 0 {
733                let err = std::io::Error::last_os_error();
734                tracing::warn!(
735                    target: "cellos.supervisor.per_flow",
736                    error = %err,
737                    child_pid = child_pid,
738                    "setns(CLONE_NEWNET) failed — per-flow listener bailing"
739                );
740                return PerFlowListenerStats::default();
741            }
742            run_listener_loop(
743                activation,
744                sink,
745                shutdown_for_thread,
746                runtime_handle,
747                accumulator,
748            )
749        })?;
750
751    Ok(PerFlowListenerHandle {
752        shutdown,
753        thread: Some(thread),
754    })
755}
756
/// Non-Linux stand-in: netfilter / nflog do not exist off Linux, so no
/// listener is started. Every argument except `shutdown` is ignored; the
/// returned handle simply carries the caller's shutdown flag.
#[cfg(not(target_os = "linux"))]
pub fn spawn_per_flow_listener_in_netns(
    _child_pid: u32,
    _activation: PerFlowActivation,
    _sink: Arc<dyn cellos_core::ports::EventSink>,
    shutdown: Arc<AtomicBool>,
    _accumulator: Option<
        Arc<std::sync::Mutex<crate::ebpf_flow::connection_tracking::FlowAccumulator>>,
    >,
) -> std::io::Result<PerFlowListenerHandle> {
    let handle = PerFlowListenerHandle { shutdown };
    Ok(handle)
}
771
772#[cfg(target_os = "linux")]
773fn run_listener_loop(
774    activation: PerFlowActivation,
775    sink: Arc<dyn cellos_core::ports::EventSink>,
776    shutdown: Arc<AtomicBool>,
777    runtime_handle: Option<tokio::runtime::Handle>,
778    accumulator: Option<
779        Arc<std::sync::Mutex<crate::ebpf_flow::connection_tracking::FlowAccumulator>>,
780    >,
781) -> PerFlowListenerStats {
782    use std::os::unix::io::FromRawFd;
783    use std::os::unix::io::OwnedFd;
784
785    let mut stats = PerFlowListenerStats::default();
786
787    // Open AF_NETLINK / NETLINK_NETFILTER socket.
788    let sock_fd =
789        unsafe { libc::socket(libc::AF_NETLINK, libc::SOCK_RAW, NETLINK_NETFILTER_LITERAL) };
790    if sock_fd < 0 {
791        let err = std::io::Error::last_os_error();
792        tracing::warn!(
793            target: "cellos.supervisor.per_flow",
794            error = %err,
795            "socket(AF_NETLINK, NETLINK_NETFILTER) failed — per-flow listener bailing"
796        );
797        return stats;
798    }
799    // SAFETY: sock_fd is a fresh kernel-owned fd we just opened; OwnedFd
800    // takes responsibility for closing on drop, including on early returns
801    // from this function. The variable is bound (rather than `_`) so its
802    // Drop impl runs at function exit.
803    let _sock_guard = unsafe { OwnedFd::from_raw_fd(sock_fd) };
804
805    // Bind to PID 0 (kernel-assigned) — nflog only needs a valid sockaddr_nl.
806    let mut sa: libc::sockaddr_nl = unsafe { std::mem::zeroed() };
807    sa.nl_family = libc::AF_NETLINK as u16;
808    sa.nl_pid = 0;
809    sa.nl_groups = 0;
810    let bind_rc = unsafe {
811        libc::bind(
812            sock_fd,
813            &sa as *const _ as *const libc::sockaddr,
814            std::mem::size_of::<libc::sockaddr_nl>() as u32,
815        )
816    };
817    if bind_rc != 0 {
818        let err = std::io::Error::last_os_error();
819        tracing::warn!(
820            target: "cellos.supervisor.per_flow",
821            error = %err,
822            "bind() on netlink socket failed — per-flow listener bailing"
823        );
824        return stats;
825    }
826
827    // Configure SO_RCVTIMEO so the recv loop wakes every 100ms to check
828    // the shutdown flag.
829    let tv = libc::timeval {
830        tv_sec: 0,
831        tv_usec: (LISTENER_RECV_TIMEOUT_MS * 1000) as libc::suseconds_t,
832    };
833    let _ = unsafe {
834        libc::setsockopt(
835            sock_fd,
836            libc::SOL_SOCKET,
837            libc::SO_RCVTIMEO,
838            &tv as *const _ as *const libc::c_void,
839            std::mem::size_of::<libc::timeval>() as u32,
840        )
841    };
842
843    // PF_BIND for AF_INET + AF_INET6, then BIND group N, then COPY_PACKET mode.
844    if let Err(e) = send_nflog_cfg_pf_bind(sock_fd, libc::AF_INET as u16) {
845        tracing::warn!(target: "cellos.supervisor.per_flow", error=%e, "PF_BIND v4 failed");
846        return stats;
847    }
848    if let Err(e) = send_nflog_cfg_pf_bind(sock_fd, libc::AF_INET6 as u16) {
849        tracing::warn!(target: "cellos.supervisor.per_flow", error=%e, "PF_BIND v6 failed");
850        return stats;
851    }
852    if let Err(e) = send_nflog_cfg_bind_group(sock_fd, activation.nflog_group) {
853        tracing::warn!(target: "cellos.supervisor.per_flow", error=%e, "BIND group failed");
854        return stats;
855    }
856    if let Err(e) = send_nflog_cfg_copy_packet(sock_fd, activation.nflog_group) {
857        tracing::warn!(target: "cellos.supervisor.per_flow", error=%e, "COPY_PACKET failed");
858        return stats;
859    }
860
861    // Recv loop.
862    let mut buf = vec![0u8; 65536];
863    use std::sync::atomic::Ordering;
864    while !shutdown.load(Ordering::SeqCst) {
865        let n = unsafe { libc::recv(sock_fd, buf.as_mut_ptr() as *mut libc::c_void, buf.len(), 0) };
866        if n < 0 {
867            let err = std::io::Error::last_os_error();
868            // EAGAIN / EWOULDBLOCK is the timeout firing — loop & re-check shutdown.
869            // EWOULDBLOCK == EAGAIN on Linux glibc/musl, so one arm covers both.
870            if matches!(err.raw_os_error(), Some(libc::EAGAIN)) {
871                continue;
872            }
873            tracing::debug!(
874                target: "cellos.supervisor.per_flow",
875                error = %err,
876                "recv() error — exiting listener loop"
877            );
878            break;
879        }
880        let received = &buf[..n as usize];
881        stats.datagrams_total += 1;
882        let mut offset = 0usize;
883        while offset < received.len() {
884            // Parse nlmsghdr (16 bytes on 64-bit).
885            if received.len() - offset < 16 {
886                break;
887            }
888            let nlmsg_len = u32::from_ne_bytes([
889                received[offset],
890                received[offset + 1],
891                received[offset + 2],
892                received[offset + 3],
893            ]) as usize;
894            let nlmsg_type = u16::from_ne_bytes([received[offset + 4], received[offset + 5]]);
895            if nlmsg_len < 16 || offset + nlmsg_len > received.len() {
896                break;
897            }
898            let subsys = (nlmsg_type >> 8) as u8;
899            let msg_kind = (nlmsg_type & 0xff) as u8;
900            // Skip headers: nlmsghdr (16) + nfgenmsg (4) = 20 bytes.
901            let body_start = offset + 16 + 4;
902            if subsys == NFNL_SUBSYS_ULOG
903                && msg_kind == NFULNL_MSG_PACKET
904                && body_start <= offset + nlmsg_len
905            {
906                let body = &received[body_start..offset + nlmsg_len];
907                match decode_nflog_datagram(body) {
908                    Ok(decoded) => {
909                        if decoded.prefix.starts_with(LOG_PREFIX_BASE) {
910                            stats.datagrams_matched += 1;
911                            // L5-15 — populate the FlowAccumulator from the
912                            // nflog event path so `exercised_egress_connections`
913                            // reports the real count even when eBPF isn't wired.
914                            // We only count `accept` verdicts: a dropped packet
915                            // means the workload TRIED to open a flow that
916                            // policy denied; the homeostasis signal counts
917                            // ACTUALLY EXERCISED connections (a denied syn is
918                            // not an exercised connection).
919                            if decoded.prefix.contains("accept") {
920                                if let Some(acc) = accumulator.as_ref() {
921                                    let attribution = decode_l3_l4_attribution(&decoded.payload);
922                                    if let Some(key) = flow_key_from_attribution(&attribution) {
923                                        record_opened_flow(acc, key);
924                                    }
925                                }
926                            }
927                            let now = chrono::Utc::now().to_rfc3339();
928                            let decision = build_decision(
929                                &activation,
930                                &decoded.prefix,
931                                &decoded.payload,
932                                &now,
933                            );
934                            match cloud_event_v1_network_flow_decision(
935                                "cellos-supervisor",
936                                &now,
937                                &decision,
938                            ) {
939                                Ok(event) => {
940                                    if let Some(rt) = runtime_handle.as_ref() {
941                                        let sink = sink.clone();
942                                        rt.spawn(async move {
943                                            if let Err(e) = sink.emit(&event).await {
944                                                tracing::warn!(
945                                                    target: "cellos.supervisor.per_flow",
946                                                    error = %e,
947                                                    "sink emit failed for network_flow_decision event"
948                                                );
949                                            }
950                                        });
951                                        stats.events_emitted += 1;
952                                    }
953                                }
954                                Err(e) => {
955                                    tracing::debug!(
956                                        target: "cellos.supervisor.per_flow",
957                                        error = %e,
958                                        "network_flow_decision envelope build failed"
959                                    );
960                                }
961                            }
962                        }
963                    }
964                    Err(_) => {
965                        stats.datagrams_decode_failed += 1;
966                    }
967                }
968            }
969            // 4-byte align next message.
970            offset += (nlmsg_len + 3) & !3;
971        }
972    }
973    stats
974}
975
976#[cfg(target_os = "linux")]
977fn send_nflog_cfg_pf_bind(sock_fd: i32, pf: u16) -> std::io::Result<()> {
978    // NFULA_CFG_CMD attribute carries one byte (the cmd id).
979    let payload = build_nfnl_cfg_msg(0, NFULNL_CFG_CMD_PF_BIND, pf);
980    send_netlink(sock_fd, &payload)
981}
982
983#[cfg(target_os = "linux")]
984fn send_nflog_cfg_bind_group(sock_fd: i32, group: u16) -> std::io::Result<()> {
985    let payload = build_nfnl_cfg_msg(group, NFULNL_CFG_CMD_BIND, 0);
986    send_netlink(sock_fd, &payload)
987}
988
989#[cfg(target_os = "linux")]
990fn send_nflog_cfg_copy_packet(sock_fd: i32, group: u16) -> std::io::Result<()> {
991    // NFULA_CFG_MODE: u32 copy_range (0 = full) + u8 copy_mode + 3 bytes pad.
992    let mut buf = Vec::new();
993    // nlmsghdr: len=24 + attr=12 = 36 (filled in by build_nfnl_cfg_envelope).
994    let attr_type: u16 = 2; // NFULA_CFG_MODE
995    let nla_len: u16 = 4 + 4 + 4; // header(4) + copy_range(4) + copy_mode+pad(4)
996    buf.extend_from_slice(&nla_len.to_ne_bytes());
997    buf.extend_from_slice(&attr_type.to_ne_bytes());
998    buf.extend_from_slice(&0u32.to_be_bytes()); // copy_range = 0xffff (we send 0 → full pkt up to MTU)
999    buf.push(NFULNL_COPY_PACKET);
1000    buf.extend_from_slice(&[0u8; 3]); // pad
1001    let envelope = build_nfnl_cfg_envelope(group, &buf);
1002    send_netlink(sock_fd, &envelope)
1003}
1004
1005/// Build a CFG message carrying a single `NFULA_CFG_CMD` attribute.
1006#[cfg(target_os = "linux")]
1007fn build_nfnl_cfg_msg(group: u16, cmd: u8, pf: u16) -> Vec<u8> {
1008    // attribute: nla_len=8 (4 header + 4 padded body), nla_type=1 (NFULA_CFG_CMD)
1009    let mut attr = Vec::new();
1010    let nla_len: u16 = 4 + 1 + 3; // header(4) + 1 byte cmd + 3 byte pad → align to 4
1011    attr.extend_from_slice(&nla_len.to_ne_bytes());
1012    attr.extend_from_slice(&1u16.to_ne_bytes()); // NFULA_CFG_CMD
1013    attr.push(cmd);
1014    attr.extend_from_slice(&[0u8; 3]); // pad
1015    let _ = pf; // currently embedded in nfgenmsg.res_id (caller sets it via group)
1016    build_nfnl_cfg_envelope(group, &attr)
1017}
1018
1019/// Build the full netlink envelope: nlmsghdr (16) + nfgenmsg (4) + attrs.
1020#[cfg(target_os = "linux")]
1021fn build_nfnl_cfg_envelope(group: u16, attrs: &[u8]) -> Vec<u8> {
1022    let total_len: u32 = 16 + 4 + attrs.len() as u32;
1023    let mut out = Vec::with_capacity(total_len as usize);
1024    // nlmsghdr.
1025    out.extend_from_slice(&total_len.to_ne_bytes());
1026    // type = NFNL_SUBSYS_ULOG << 8 | NFULNL_MSG_CONFIG
1027    let nlmsg_type: u16 = ((NFNL_SUBSYS_ULOG as u16) << 8) | NFULNL_MSG_CONFIG as u16;
1028    out.extend_from_slice(&nlmsg_type.to_ne_bytes());
1029    // flags = NLM_F_REQUEST | NLM_F_ACK
1030    let flags: u16 = (libc::NLM_F_REQUEST | libc::NLM_F_ACK) as u16;
1031    out.extend_from_slice(&flags.to_ne_bytes());
1032    // seq = 0, pid = 0
1033    out.extend_from_slice(&0u32.to_ne_bytes());
1034    out.extend_from_slice(&0u32.to_ne_bytes());
1035    // nfgenmsg: family(u8) + version(u8) + res_id(u16 BE = nflog group)
1036    out.push(0); // family unspec
1037    out.push(0); // version 0
1038    out.extend_from_slice(&group.to_be_bytes()); // res_id is BE in netfilter
1039    out.extend_from_slice(attrs);
1040    out
1041}
1042
1043#[cfg(target_os = "linux")]
1044fn send_netlink(sock_fd: i32, payload: &[u8]) -> std::io::Result<()> {
1045    let mut sa: libc::sockaddr_nl = unsafe { std::mem::zeroed() };
1046    sa.nl_family = libc::AF_NETLINK as u16;
1047    let n = unsafe {
1048        libc::sendto(
1049            sock_fd,
1050            payload.as_ptr() as *const libc::c_void,
1051            payload.len(),
1052            0,
1053            &sa as *const _ as *const libc::sockaddr,
1054            std::mem::size_of::<libc::sockaddr_nl>() as u32,
1055        )
1056    };
1057    if n < 0 {
1058        return Err(std::io::Error::last_os_error());
1059    }
1060    Ok(())
1061}
1062
1063// ────────────────────────────────────────────────────────────────────────
// Tests (unit tests — env-driven activation, ruleset augmentation, parsing + classification + decision builder)
1065// ────────────────────────────────────────────────────────────────────────
1066
1067#[cfg(test)]
1068mod tests {
1069    use super::*;
1070    use std::sync::Mutex;
1071
1072    /// Tests in this module that mutate any of the per-flow env vars
1073    /// (`CELLOS_FIRECRACKER_PER_FLOW_EBPF`, `CELLOS_PER_FLOW_REALTIME`,
1074    /// `CELLOS_FIRECRACKER_PER_FLOW_BACKEND`, `CELLOS_PER_FLOW_BACKEND`) must
1075    /// hold this lock for their entire duration. cargo runs tests in parallel
1076    /// by default and the env namespace is process-global — without
1077    /// serialisation, the activation/selector tests race each other and the
1078    /// "unset means None" contract flakes when a sibling test has just
1079    /// written a value between the `remove_var` and the `build_activation_from_env`
1080    /// call. Same pattern as `dns_proxy::upstream::tests::ENV_LOCK` and
1081    /// `linux_seccomp::tests::ENV_LOCK`.
1082    static ENV_LOCK: Mutex<()> = Mutex::new(());
1083
1084    fn sample_activation() -> PerFlowActivation {
1085        PerFlowActivation {
1086            cell_id: "cell-test".to_string(),
1087            run_id: "run-uuid".to_string(),
1088            nflog_group: 100,
1089            backend: PerFlowBackend::Nflog,
1090            policy_digest: Some("sha256:deadbeef".to_string()),
1091            keyset_id: Some("keyset-1".to_string()),
1092            issuer_kid: Some("kid-1".to_string()),
1093        }
1094    }
1095
1096    // ── env-driven activation ─────────────────────────────────────────
1097    #[test]
1098    fn build_activation_returns_none_when_env_unset() {
1099        // ENV_LOCK serialises this test against the realtime/legacy
1100        // activation-on tests below. Without the lock, a sibling test
1101        // setting `CELLOS_PER_FLOW_REALTIME=1` between our `remove_var`
1102        // and the `build_activation_from_env` call would return Some and
1103        // flake this test. We also clear both opt-in vars — the original
1104        // version only cleared `_EBPF`, missing the `_REALTIME` alias.
1105        let _g = ENV_LOCK.lock().unwrap_or_else(|p| p.into_inner());
1106        let prev_ebpf = std::env::var(ENV_PER_FLOW_EBPF).ok();
1107        let prev_rt = std::env::var(ENV_PER_FLOW_REALTIME).ok();
1108        // SAFETY: env mutation in tests is serialised by ENV_LOCK above.
1109        unsafe {
1110            std::env::remove_var(ENV_PER_FLOW_EBPF);
1111            std::env::remove_var(ENV_PER_FLOW_REALTIME);
1112        }
1113        let act = build_activation_from_env("c", "r", None, None, None);
1114        assert!(act.is_none());
1115        // Restore prior values so we don't leak state to subsequent tests
1116        // that might run while still holding the lock-released window.
1117        unsafe {
1118            match prev_ebpf {
1119                Some(v) => std::env::set_var(ENV_PER_FLOW_EBPF, v),
1120                None => std::env::remove_var(ENV_PER_FLOW_EBPF),
1121            }
1122            match prev_rt {
1123                Some(v) => std::env::set_var(ENV_PER_FLOW_REALTIME, v),
1124                None => std::env::remove_var(ENV_PER_FLOW_REALTIME),
1125            }
1126        }
1127    }
1128
1129    #[test]
1130    fn select_backend_defaults_to_nflog() {
1131        let _g = ENV_LOCK.lock().unwrap_or_else(|p| p.into_inner());
1132        let prev_legacy = std::env::var(ENV_PER_FLOW_BACKEND).ok();
1133        let prev_canonical = std::env::var(ENV_PER_FLOW_BACKEND_E7).ok();
1134        unsafe {
1135            std::env::remove_var(ENV_PER_FLOW_BACKEND);
1136            std::env::remove_var(ENV_PER_FLOW_BACKEND_E7);
1137        }
1138        assert_eq!(select_backend_from_env(), PerFlowBackend::Nflog);
1139        unsafe {
1140            if let Some(v) = prev_legacy {
1141                std::env::set_var(ENV_PER_FLOW_BACKEND, v);
1142            }
1143            if let Some(v) = prev_canonical {
1144                std::env::set_var(ENV_PER_FLOW_BACKEND_E7, v);
1145            }
1146        }
1147    }
1148
1149    #[test]
1150    fn select_backend_ebpf_returns_ebpf_variant() {
1151        // E7-5: the selector now returns `PerFlowBackend::Ebpf` when
1152        // operators opt in. Runtime fallback to nflog (on BPF object
1153        // missing, attach failure, non-Linux host) happens *inside*
1154        // the activation path, not here. This change is observable:
1155        // callers can now log which backend was selected vs which one
1156        // actually attached.
1157        let _g = ENV_LOCK.lock().unwrap_or_else(|p| p.into_inner());
1158        let prev_legacy = std::env::var(ENV_PER_FLOW_BACKEND).ok();
1159        let prev_canonical = std::env::var(ENV_PER_FLOW_BACKEND_E7).ok();
1160        unsafe {
1161            std::env::remove_var(ENV_PER_FLOW_BACKEND_E7);
1162            std::env::set_var(ENV_PER_FLOW_BACKEND, "ebpf");
1163        }
1164        assert_eq!(select_backend_from_env(), PerFlowBackend::Ebpf);
1165        unsafe {
1166            match prev_legacy {
1167                Some(v) => std::env::set_var(ENV_PER_FLOW_BACKEND, v),
1168                None => std::env::remove_var(ENV_PER_FLOW_BACKEND),
1169            }
1170            match prev_canonical {
1171                Some(v) => std::env::set_var(ENV_PER_FLOW_BACKEND_E7, v),
1172                None => std::env::remove_var(ENV_PER_FLOW_BACKEND_E7),
1173            }
1174        }
1175    }
1176
1177    #[test]
1178    fn select_backend_honours_e7_canonical_env_name() {
1179        // E7-5: the spec-canonical env name is CELLOS_PER_FLOW_BACKEND
1180        // (no _FIRECRACKER_ infix). Setting only that var must opt in
1181        // to eBPF.
1182        let _g = ENV_LOCK.lock().unwrap_or_else(|p| p.into_inner());
1183        let prev_legacy = std::env::var(ENV_PER_FLOW_BACKEND).ok();
1184        let prev_canonical = std::env::var(ENV_PER_FLOW_BACKEND_E7).ok();
1185        unsafe {
1186            std::env::remove_var(ENV_PER_FLOW_BACKEND);
1187            std::env::set_var(ENV_PER_FLOW_BACKEND_E7, "ebpf");
1188        }
1189        assert_eq!(select_backend_from_env(), PerFlowBackend::Ebpf);
1190        unsafe {
1191            match prev_legacy {
1192                Some(v) => std::env::set_var(ENV_PER_FLOW_BACKEND, v),
1193                None => std::env::remove_var(ENV_PER_FLOW_BACKEND),
1194            }
1195            match prev_canonical {
1196                Some(v) => std::env::set_var(ENV_PER_FLOW_BACKEND_E7, v),
1197                None => std::env::remove_var(ENV_PER_FLOW_BACKEND_E7),
1198            }
1199        }
1200    }
1201
1202    #[test]
1203    fn select_backend_legacy_name_wins_over_canonical_when_both_set() {
1204        // E7-5 precedence: legacy var wins on conflict.
1205        let _g = ENV_LOCK.lock().unwrap_or_else(|p| p.into_inner());
1206        let prev_legacy = std::env::var(ENV_PER_FLOW_BACKEND).ok();
1207        let prev_canonical = std::env::var(ENV_PER_FLOW_BACKEND_E7).ok();
1208        unsafe {
1209            std::env::set_var(ENV_PER_FLOW_BACKEND, "nflog");
1210            std::env::set_var(ENV_PER_FLOW_BACKEND_E7, "ebpf");
1211        }
1212        assert_eq!(
1213            select_backend_from_env(),
1214            PerFlowBackend::Nflog,
1215            "legacy var must win when both set"
1216        );
1217        unsafe {
1218            match prev_legacy {
1219                Some(v) => std::env::set_var(ENV_PER_FLOW_BACKEND, v),
1220                None => std::env::remove_var(ENV_PER_FLOW_BACKEND),
1221            }
1222            match prev_canonical {
1223                Some(v) => std::env::set_var(ENV_PER_FLOW_BACKEND_E7, v),
1224                None => std::env::remove_var(ENV_PER_FLOW_BACKEND_E7),
1225            }
1226        }
1227    }
1228
1229    #[test]
1230    fn select_backend_unknown_value_defaults_to_nflog() {
1231        let _g = ENV_LOCK.lock().unwrap_or_else(|p| p.into_inner());
1232        let prev_legacy = std::env::var(ENV_PER_FLOW_BACKEND).ok();
1233        let prev_canonical = std::env::var(ENV_PER_FLOW_BACKEND_E7).ok();
1234        unsafe {
1235            std::env::remove_var(ENV_PER_FLOW_BACKEND_E7);
1236            std::env::set_var(ENV_PER_FLOW_BACKEND, "xdp"); // typo / unknown
1237        }
1238        assert_eq!(select_backend_from_env(), PerFlowBackend::Nflog);
1239        unsafe {
1240            match prev_legacy {
1241                Some(v) => std::env::set_var(ENV_PER_FLOW_BACKEND, v),
1242                None => std::env::remove_var(ENV_PER_FLOW_BACKEND),
1243            }
1244            match prev_canonical {
1245                Some(v) => std::env::set_var(ENV_PER_FLOW_BACKEND_E7, v),
1246                None => std::env::remove_var(ENV_PER_FLOW_BACKEND_E7),
1247            }
1248        }
1249    }
1250
1251    // ── ruleset augmentation ──────────────────────────────────────────
1252    #[test]
1253    fn augment_rewrites_accept_verdict() {
1254        let input = "    ip daddr 10.0.0.1 tcp dport 443 accept";
1255        let out = augment_ruleset_with_log_actions(input, 100);
1256        assert!(out.contains("log group 100 prefix \"cellos-flow accept\""));
1257        assert!(out.ends_with("accept"));
1258    }
1259
1260    #[test]
1261    fn augment_rewrites_drop_verdict() {
1262        let input = "    udp dport 53 drop";
1263        let out = augment_ruleset_with_log_actions(input, 200);
1264        assert!(out.contains("log group 200 prefix \"cellos-flow drop\""));
1265        assert!(out.ends_with("drop"));
1266    }
1267
1268    #[test]
1269    fn augment_skips_loopback_shortcut() {
1270        let input = "    oif \"lo\" accept";
1271        let out = augment_ruleset_with_log_actions(input, 100);
1272        assert_eq!(out, input);
1273    }
1274
1275    #[test]
1276    fn augment_skips_policy_lines() {
1277        let input = "    type filter hook output priority 0; policy drop;";
1278        let out = augment_ruleset_with_log_actions(input, 100);
1279        assert_eq!(out, input);
1280    }
1281
1282    #[test]
1283    fn augment_is_idempotent() {
1284        let input = "    ip daddr 10.0.0.1 tcp dport 443 accept";
1285        let once = augment_ruleset_with_log_actions(input, 100);
1286        let twice = augment_ruleset_with_log_actions(&once, 100);
1287        assert_eq!(once, twice);
1288    }
1289
1290    #[test]
1291    fn augment_preserves_chain_structure() {
1292        let input = "table inet cellos_test {\n  chain output {\n    type filter hook output priority 0; policy drop;\n    oif \"lo\" accept\n    udp dport 53 drop\n  }\n}";
1293        let out = augment_ruleset_with_log_actions(input, 100);
1294        assert!(out.contains("table inet cellos_test {"));
1295        assert!(out.contains("chain output {"));
1296        assert!(out.contains("policy drop;"));
1297        assert!(out.contains("oif \"lo\" accept"));
1298        assert!(out.contains("log group 100 prefix \"cellos-flow drop\" drop"));
1299        assert!(out.ends_with("}"));
1300    }
1301
1302    // ── L3/L4 attribution ─────────────────────────────────────────────
1303    #[test]
1304    fn decode_l4_ipv4_tcp_extracts_dst() {
1305        // Minimal IPv4 + TCP header. ihl=5 (20 bytes), proto=6, dst=10.0.0.1,
1306        // tcp dport=443.
1307        let mut p = vec![0u8; 40];
1308        p[0] = 0x45;
1309        p[9] = 6; // TCP
1310        p[16] = 10;
1311        p[17] = 0;
1312        p[18] = 0;
1313        p[19] = 1;
1314        p[22] = 0x01; // dst port high byte
1315        p[23] = 0xbb; // dst port low byte (443)
1316        let attr = decode_l3_l4_attribution(&p);
1317        assert_eq!(attr.dst_addr.as_deref(), Some("10.0.0.1"));
1318        assert_eq!(attr.dst_port, Some(443));
1319        assert_eq!(attr.protocol.as_deref(), Some("tcp"));
1320    }
1321
1322    #[test]
1323    fn decode_l4_ipv4_udp_extracts_dst() {
1324        let mut p = vec![0u8; 40];
1325        p[0] = 0x45;
1326        p[9] = 17; // UDP
1327        p[16] = 1;
1328        p[17] = 1;
1329        p[18] = 1;
1330        p[19] = 1;
1331        p[22] = 0x00; // dport high
1332        p[23] = 0x35; // dport low (53)
1333        let attr = decode_l3_l4_attribution(&p);
1334        assert_eq!(attr.dst_addr.as_deref(), Some("1.1.1.1"));
1335        assert_eq!(attr.dst_port, Some(53));
1336        assert_eq!(attr.protocol.as_deref(), Some("udp"));
1337    }
1338
1339    #[test]
1340    fn decode_l4_ipv4_icmp() {
1341        let mut p = vec![0u8; 28];
1342        p[0] = 0x45;
1343        p[9] = 1; // ICMP
1344        p[16] = 8;
1345        p[17] = 8;
1346        p[18] = 8;
1347        p[19] = 8;
1348        let attr = decode_l3_l4_attribution(&p);
1349        assert_eq!(attr.dst_addr.as_deref(), Some("8.8.8.8"));
1350        assert_eq!(attr.protocol.as_deref(), Some("icmp"));
1351        assert_eq!(attr.dst_port, None);
1352    }
1353
1354    #[test]
1355    fn decode_l4_ipv6_tcp_extracts_dst() {
1356        let mut p = vec![0u8; 60];
1357        p[0] = 0x60; // version 6
1358        p[6] = 6; // next header = TCP
1359                  // dst = 2001:db8::1
1360        p[24] = 0x20;
1361        p[25] = 0x01;
1362        p[26] = 0x0d;
1363        p[27] = 0xb8;
1364        p[39] = 0x01;
1365        p[42] = 0x01;
1366        p[43] = 0xbb; // dport 443
1367        let attr = decode_l3_l4_attribution(&p);
1368        assert_eq!(attr.dst_addr.as_deref(), Some("2001:db8::1"));
1369        assert_eq!(attr.dst_port, Some(443));
1370        assert_eq!(attr.protocol.as_deref(), Some("tcp"));
1371    }
1372
1373    #[test]
1374    fn decode_l4_ipv6_icmp6() {
1375        let mut p = vec![0u8; 48];
1376        p[0] = 0x60;
1377        p[6] = 58; // ICMPv6
1378        let attr = decode_l3_l4_attribution(&p);
1379        assert_eq!(attr.protocol.as_deref(), Some("icmp6"));
1380        assert_eq!(attr.dst_port, None);
1381    }
1382
1383    #[test]
1384    fn decode_l4_handles_empty_payload() {
1385        let attr = decode_l3_l4_attribution(&[]);
1386        assert!(attr.dst_addr.is_none());
1387        assert!(attr.protocol.is_none());
1388    }
1389
1390    // ── nflog datagram decode ─────────────────────────────────────────
1391    #[test]
1392    fn decode_nflog_extracts_prefix_and_payload() {
1393        // Synthesize a TLV stream with NFULA_PREFIX = "cellos-flow accept\0"
1394        // followed by NFULA_PAYLOAD = some bytes.
1395        let mut buf = Vec::new();
1396        let prefix_str = b"cellos-flow accept\0";
1397        let nla_len: u16 = 4 + prefix_str.len() as u16;
1398        buf.extend_from_slice(&nla_len.to_ne_bytes());
1399        buf.extend_from_slice(&NFULA_PREFIX.to_ne_bytes());
1400        buf.extend_from_slice(prefix_str);
1401        // align to 4
1402        while buf.len() % 4 != 0 {
1403            buf.push(0);
1404        }
1405        // payload TLV
1406        let pkt = b"abcdef";
1407        let nla_len2: u16 = 4 + pkt.len() as u16;
1408        buf.extend_from_slice(&nla_len2.to_ne_bytes());
1409        buf.extend_from_slice(&NFULA_PAYLOAD.to_ne_bytes());
1410        buf.extend_from_slice(pkt);
1411        while buf.len() % 4 != 0 {
1412            buf.push(0);
1413        }
1414        let decoded = decode_nflog_datagram(&buf).expect("decode ok");
1415        assert_eq!(decoded.prefix, "cellos-flow accept");
1416        assert_eq!(decoded.payload, pkt);
1417    }
1418
1419    #[test]
1420    fn decode_nflog_round_trip_with_packet_attribution() {
1421        // Synthetic round-trip: build a TLV stream containing an IPv4+TCP
1422        // payload, decode, attribute, build a decision. End-to-end pure
1423        // exercise of the decoder.
1424        let prefix_str = b"cellos-flow drop\0";
1425        let mut packet = vec![0u8; 40];
1426        packet[0] = 0x45;
1427        packet[9] = 6; // TCP
1428        packet[16] = 192;
1429        packet[17] = 0;
1430        packet[18] = 2;
1431        packet[19] = 1;
1432        packet[22] = 0x01;
1433        packet[23] = 0xbb;
1434        let mut buf = Vec::new();
1435        let nla_len: u16 = 4 + prefix_str.len() as u16;
1436        buf.extend_from_slice(&nla_len.to_ne_bytes());
1437        buf.extend_from_slice(&NFULA_PREFIX.to_ne_bytes());
1438        buf.extend_from_slice(prefix_str);
1439        while buf.len() % 4 != 0 {
1440            buf.push(0);
1441        }
1442        let nla_len2: u16 = 4 + packet.len() as u16;
1443        buf.extend_from_slice(&nla_len2.to_ne_bytes());
1444        buf.extend_from_slice(&NFULA_PAYLOAD.to_ne_bytes());
1445        buf.extend_from_slice(&packet);
1446        while buf.len() % 4 != 0 {
1447            buf.push(0);
1448        }
1449        let decoded = decode_nflog_datagram(&buf).expect("decode ok");
1450        assert_eq!(decoded.prefix, "cellos-flow drop");
1451        let activation = sample_activation();
1452        let decision = build_decision(
1453            &activation,
1454            &decoded.prefix,
1455            &decoded.payload,
1456            "2026-01-01T00:00:00Z",
1457        );
1458        assert_eq!(decision.decision, NetworkFlowDecisionOutcome::Deny);
1459        assert_eq!(decision.reason_code, "nft_log_drop");
1460        assert_eq!(decision.dst_addr.as_deref(), Some("192.0.2.1"));
1461        assert_eq!(decision.dst_port, Some(443));
1462        assert_eq!(decision.protocol.as_deref(), Some("tcp"));
1463        assert_eq!(decision.cell_id, "cell-test");
1464        assert_eq!(decision.run_id, "run-uuid");
1465    }
1466
1467    #[test]
1468    fn decode_nflog_errors_on_short_input() {
1469        let err = decode_nflog_datagram(&[0u8; 2]).expect_err("must error");
1470        assert!(matches!(err, NflogDecodeError::TooShort(_)));
1471    }
1472
1473    #[test]
1474    fn decode_nflog_errors_on_missing_prefix() {
1475        // Only payload, no prefix attribute → MissingAttrs
1476        let pkt = vec![0u8; 8];
1477        let mut buf = Vec::new();
1478        let nla_len: u16 = 4 + pkt.len() as u16;
1479        buf.extend_from_slice(&nla_len.to_ne_bytes());
1480        buf.extend_from_slice(&NFULA_PAYLOAD.to_ne_bytes());
1481        buf.extend_from_slice(&pkt);
1482        let err = decode_nflog_datagram(&buf).expect_err("must error");
1483        assert!(matches!(err, NflogDecodeError::MissingAttrs));
1484    }
1485
1486    // ── decision builder ──────────────────────────────────────────────
1487    #[test]
1488    fn build_decision_for_accept_yields_allow() {
1489        let activation = sample_activation();
1490        let payload = vec![0u8; 0];
1491        let d = build_decision(&activation, "cellos-flow accept", &payload, "now");
1492        assert_eq!(d.decision, NetworkFlowDecisionOutcome::Allow);
1493        assert_eq!(d.reason_code, "nft_log_accept");
1494    }
1495
1496    #[test]
1497    fn build_decision_for_drop_yields_deny() {
1498        let activation = sample_activation();
1499        let payload = vec![0u8; 0];
1500        let d = build_decision(&activation, "cellos-flow drop", &payload, "now");
1501        assert_eq!(d.decision, NetworkFlowDecisionOutcome::Deny);
1502        assert_eq!(d.reason_code, "nft_log_drop");
1503    }
1504
1505    #[test]
1506    fn build_decision_unknown_prefix_falls_back_to_deny() {
1507        let activation = sample_activation();
1508        let d = build_decision(&activation, "cellos-flow weirdo", &[], "now");
1509        assert_eq!(d.decision, NetworkFlowDecisionOutcome::Deny);
1510        assert_eq!(d.reason_code, "nft_log_unknown");
1511    }
1512
1513    // ── env-alias activation (E7 / FC-38 Phase 2) ─────────────────────
1514    //
1515    // Both `CELLOS_FIRECRACKER_PER_FLOW_EBPF=1` (legacy) and
1516    // `CELLOS_PER_FLOW_REALTIME=1` (backend-neutral) opt the cell in. The
1517    // mutation guards below restore the prior ambient values so the tests
1518    // are order-independent — std's test harness runs unit tests in
1519    // parallel by default and unrelated tests in this module also touch
1520    // these vars.
1521
1522    fn save_realtime_env() -> (Option<String>, Option<String>) {
1523        let prev_ebpf = std::env::var(ENV_PER_FLOW_EBPF).ok();
1524        let prev_rt = std::env::var(ENV_PER_FLOW_REALTIME).ok();
1525        // SAFETY: env mutation in tests is gated by harness isolation
1526        // discipline shared across this module.
1527        unsafe {
1528            std::env::remove_var(ENV_PER_FLOW_EBPF);
1529            std::env::remove_var(ENV_PER_FLOW_REALTIME);
1530        }
1531        (prev_ebpf, prev_rt)
1532    }
1533
1534    fn restore_realtime_env(prev: (Option<String>, Option<String>)) {
1535        unsafe {
1536            match prev.0 {
1537                Some(v) => std::env::set_var(ENV_PER_FLOW_EBPF, v),
1538                None => std::env::remove_var(ENV_PER_FLOW_EBPF),
1539            }
1540            match prev.1 {
1541                Some(v) => std::env::set_var(ENV_PER_FLOW_REALTIME, v),
1542                None => std::env::remove_var(ENV_PER_FLOW_REALTIME),
1543            }
1544        }
1545    }
1546
1547    #[test]
1548    fn build_activation_off_when_neither_env_var_set() {
1549        let _g = ENV_LOCK.lock().unwrap_or_else(|p| p.into_inner());
1550        let prev = save_realtime_env();
1551        let act = build_activation_from_env("c", "r", None, None, None);
1552        assert!(act.is_none(), "no env vars → no activation");
1553        restore_realtime_env(prev);
1554    }
1555
1556    #[test]
1557    fn build_activation_on_via_legacy_ebpf_env() {
1558        let _g = ENV_LOCK.lock().unwrap_or_else(|p| p.into_inner());
1559        let prev = save_realtime_env();
1560        unsafe {
1561            std::env::set_var(ENV_PER_FLOW_EBPF, "1");
1562        }
1563        let act = build_activation_from_env("cell-x", "run-x", None, None, None);
1564        assert!(act.is_some(), "legacy _EBPF=1 must still opt in");
1565        restore_realtime_env(prev);
1566    }
1567
1568    #[test]
1569    fn build_activation_on_via_realtime_alias() {
1570        let _g = ENV_LOCK.lock().unwrap_or_else(|p| p.into_inner());
1571        let prev = save_realtime_env();
1572        unsafe {
1573            std::env::set_var(ENV_PER_FLOW_REALTIME, "1");
1574        }
1575        let act = build_activation_from_env("cell-x", "run-x", None, None, None);
1576        assert!(act.is_some(), "CELLOS_PER_FLOW_REALTIME=1 must opt in");
1577        let act = act.unwrap();
1578        assert_eq!(act.cell_id, "cell-x");
1579        assert_eq!(act.run_id, "run-x");
1580        assert_eq!(act.nflog_group, DEFAULT_NFLOG_GROUP);
1581        restore_realtime_env(prev);
1582    }
1583
1584    #[test]
1585    fn build_activation_rejects_truthy_lookalikes_for_realtime() {
1586        let _g = ENV_LOCK.lock().unwrap_or_else(|p| p.into_inner());
1587        let prev = save_realtime_env();
1588        for bad in ["true", "yes", "on", "TRUE", "0", "", "2"] {
1589            unsafe {
1590                std::env::set_var(ENV_PER_FLOW_REALTIME, bad);
1591            }
1592            let act = build_activation_from_env("c", "r", None, None, None);
1593            assert!(
1594                act.is_none(),
1595                "value {bad:?} must not enable per-flow realtime"
1596            );
1597        }
1598        restore_realtime_env(prev);
1599    }
1600
1601    #[test]
1602    fn flow_event_payload_serialises_as_cloud_event_schema_compatible_json() {
1603        // E7 unit test: a FlowEvent / NetworkFlowDecision built via
1604        // build_decision serialises to JSON whose top-level fields match
1605        // the `network_flow_decision_data_v1` schema (cellId, runId,
1606        // decisionId, direction, decision, reasonCode, observedAt).
1607        let activation = sample_activation();
1608        let payload = vec![0u8; 0];
1609        let decision = build_decision(
1610            &activation,
1611            "cellos-flow accept",
1612            &payload,
1613            "2026-05-16T00:00:00Z",
1614        );
1615        let json = serde_json::to_value(&decision).expect("serialise");
1616        let obj = json.as_object().expect("object");
1617        for required in [
1618            "cellId",
1619            "runId",
1620            "decisionId",
1621            "direction",
1622            "decision",
1623            "reasonCode",
1624            "observedAt",
1625        ] {
1626            assert!(
1627                obj.contains_key(required),
1628                "missing required field {required}; payload={json}"
1629            );
1630        }
1631        assert_eq!(obj["direction"], "egress");
1632        assert_eq!(obj["decision"], "allow");
1633        assert_eq!(obj["reasonCode"], "nft_log_accept");
1634    }
1635
1636    // ── L5-15 nflog → FlowAccumulator wiring ──────────────────────────────
1637    //
1638    // These tests exercise the *pure* parsing + record path so they run on
1639    // every platform (the listener `setns(2)` spawn is Linux-only and
1640    // covered by `tests/supervisor_per_flow_realtime.rs`). The contract
1641    // under test: when the nflog listener decodes a `cellos-flow accept`
1642    // datagram with a valid 5-tuple, the shared accumulator's
1643    // `unique_flow_count()` increments by one. Duplicate observations of
1644    // the same 5-tuple do NOT inflate the count — that's the homeostasis
1645    // semantics: "distinct connections," not "packet observations."
1646
1647    /// Construct a minimal IPv4+TCP payload with caller-controlled 5-tuple
1648    /// bytes. Mirrors the synthetic packets used by the existing
1649    /// `decode_l4_ipv4_tcp_extracts_dst` test but adds src bytes so a
1650    /// FlowKey can be derived end-to-end.
1651    fn ipv4_tcp_payload(src: [u8; 4], src_port: u16, dst: [u8; 4], dst_port: u16) -> Vec<u8> {
1652        let mut p = vec![0u8; 40];
1653        p[0] = 0x45; // IPv4, ihl=5
1654        p[9] = 6; // TCP
1655        p[12..16].copy_from_slice(&src);
1656        p[16..20].copy_from_slice(&dst);
1657        // TCP header starts at byte 20 (ihl=20). Source port = bytes 20-21,
1658        // dest port = bytes 22-23, both network-byte-order.
1659        p[20..22].copy_from_slice(&src_port.to_be_bytes());
1660        p[22..24].copy_from_slice(&dst_port.to_be_bytes());
1661        p
1662    }
1663
1664    #[test]
1665    fn nflog_populates_flow_accumulator() {
1666        // Parse a mock nflog accept event for a single 5-tuple and verify
1667        // the accumulator reports `unique_flow_count() == 1`.
1668        let payload = ipv4_tcp_payload([10, 0, 0, 5], 40000, [192, 0, 2, 1], 443);
1669
1670        let attribution = decode_l3_l4_attribution(&payload);
1671        let key = flow_key_from_attribution(&attribution).expect("5-tuple fully populated");
1672
1673        let accumulator = std::sync::Arc::new(std::sync::Mutex::new(
1674            crate::ebpf_flow::connection_tracking::FlowAccumulator::new(),
1675        ));
1676        record_opened_flow(&accumulator, key);
1677
1678        let count = accumulator
1679            .lock()
1680            .expect("acquire lock")
1681            .unique_flow_count();
1682        assert_eq!(
1683            count, 1,
1684            "single nflog-derived 5-tuple must yield unique_flow_count() == 1"
1685        );
1686    }
1687
1688    #[test]
1689    fn duplicate_nflog_entries_are_deduplicated() {
1690        // The same 5-tuple observed via two distinct nflog datagrams (e.g.
1691        // SYN + a retransmit, or an accept rule that fires twice for the
1692        // same connection's first and second packet) MUST count as one
1693        // exercised connection. The accumulator's dedup is keyed on
1694        // FlowKey, so the same parsed key inserted twice still yields a
1695        // count of 1.
1696        let payload = ipv4_tcp_payload([10, 0, 0, 5], 40000, [192, 0, 2, 1], 443);
1697
1698        let attribution_a = decode_l3_l4_attribution(&payload);
1699        let key_a = flow_key_from_attribution(&attribution_a).expect("attribution a");
1700
1701        // Re-parse to simulate a second datagram landing on the listener
1702        // thread — separate FlowKey value, same logical 5-tuple.
1703        let attribution_b = decode_l3_l4_attribution(&payload);
1704        let key_b = flow_key_from_attribution(&attribution_b).expect("attribution b");
1705
1706        let accumulator = std::sync::Arc::new(std::sync::Mutex::new(
1707            crate::ebpf_flow::connection_tracking::FlowAccumulator::new(),
1708        ));
1709        record_opened_flow(&accumulator, key_a);
1710        record_opened_flow(&accumulator, key_b);
1711
1712        let count = accumulator
1713            .lock()
1714            .expect("acquire lock")
1715            .unique_flow_count();
1716        assert_eq!(
1717            count, 1,
1718            "two nflog datagrams for the same 5-tuple must dedup to one connection"
1719        );
1720    }
1721}