Skip to main content

cellos_supervisor/
ebpf_flow.rs

1//! E7-4 — eBPF host-side flow monitor (aya loader + ring-buffer drainer).
2//!
3//! This module ships two things:
4//!
5//! 1. **The original FC-38 Phase 2 scaffold** ([`FlowEvent`],
6//!    [`FlowEventListener`], [`NoopFlowListener`], the env-flag reader, the
7//!    reason-code constants, and the [`flow_event_to_network_flow_decision`]
8//!    pure translator). These are preserved verbatim — downstream callers
9//!    in `per_flow.rs`, `supervisor.rs`, and external integration tests
10//!    consume them, and the scaffold's unit tests stay green.
11//!
12//! 2. **The new [`EbpfFlowMonitor`] type** — the host-side aya loader the
13//!    supervisor activates when `CELLOS_PER_FLOW_REALTIME=1` AND
14//!    `CELLOS_PER_FLOW_BACKEND=ebpf`. On Linux with the `ebpf-aya` Cargo
//!    feature enabled it enters the cell's network namespace via `setns(2)`,
//!    loads a tc-clsact egress program from an operator-supplied BPF
//!    object, attaches it to `tap_iface`, returns to the original netns,
//!    and spawns a tokio blocking task that drains the ring buffer into an
//!    mpsc channel. On macOS, on Linux without the feature, or when any of
//!    these steps fails, it returns an [`EbpfMonitorError`] that the
//!    supervisor treats as "fall back to nflog" — never a fatal error.
22//!
23//! ## Why a separate type rather than another [`FlowEventListener`] impl
24//!
25//! The scaffold's [`FlowEventListener`] trait surfaces a single
26//! `drain_events` hook that the supervisor calls AFTER `child.wait()`
27//! returns, in the same `spawn_blocking` closure as the nft counter
28//! scrape. Ring-buffer-based eBPF flow monitors need to drain
29//! *continuously* (kernel ring buffers overflow if left unread) on a
30//! dedicated tokio task. The [`EbpfFlowMonitor::drain`] semantics
31//! (non-blocking pull from an mpsc channel) match that lifecycle far
32//! better than the trait's single-shot post-run hook. Both surfaces can
33//! coexist — the supervisor selects which one to use based on
34//! [`crate::per_flow::PerFlowBackend`].
35//!
//! ## Platform scope and fallback behaviour
//!
//! - [`EbpfFlowMonitor::start`] always returns
39//!   [`EbpfMonitorError::Unsupported`] on `cfg(not(target_os = "linux"))`.
40//! - On Linux without the `ebpf-aya` Cargo feature, `start` returns
41//!   [`EbpfMonitorError::FeatureDisabled`].
42//! - On Linux with the feature but no BPF object available
43//!   (`CELLOS_EBPF_OBJECT_PATH` unset / file missing), `start` returns
44//!   [`EbpfMonitorError::ObjectMissing`].
45//! - Real kernel attachment + ring-buffer drain are only exercised when
46//!   all three conditions are met.
47//!
48//! The supervisor's call site (`per_flow.rs`) catches any
49//! [`EbpfMonitorError`], emits a single `tracing::warn`, and reverts to
50//! the existing nflog path so per-flow telemetry is never silently lost.
51
52#![allow(dead_code)] // scaffold + feature-gated implementation; consumers vary
53
54use cellos_core::{NetworkFlowDecision, NetworkFlowDecisionOutcome, NetworkFlowDirection};
55
56// ────────────────────────────────────────────────────────────────────────
57// Original FC-38 Phase 2 scaffold (preserved for downstream callers)
58// ────────────────────────────────────────────────────────────────────────
59
/// Name of the environment variable that opts a deployment into FC-38
/// Phase 2 real-time per-flow events (eBPF or nflog).
///
/// The feature is OFF unless the variable is set to exactly `1`. It is
/// opt-in because Phase 2 needs kernel privileges that Phase 1's post-run
/// nft scrape does not: CAP_BPF plus kernel ≥ 5.8 for the full BPF
/// surface, or CAP_NET_ADMIN for nflog. Setting it is the operator's
/// acknowledgement that those capabilities have been provisioned.
pub const PER_FLOW_EBPF_ENV: &str = "CELLOS_FIRECRACKER_PER_FLOW_EBPF";

/// Phase 2 reason code for an XDP-style drop seen on the cell's egress by
/// an attached eBPF program (typically `cgroup_skb/egress` or
/// `tc clsact`).
pub const REASON_EBPF_XDP_DROP: &str = "ebpf_xdp_drop";

/// Phase 2 reason code for a packet reported by an `NFLOG` netlink
/// consumer because it matched an nft rule with logging enabled.
pub const REASON_NFLOG_MATCH: &str = "nflog_match";
78
79/// Returns `true` when the operator has explicitly opted into Phase 2
80/// eBPF/nflog real-time per-flow events via the
81/// [`PER_FLOW_EBPF_ENV`] env var.
82pub fn is_per_flow_ebpf_enabled() -> bool {
83    std::env::var(PER_FLOW_EBPF_ENV).as_deref() == Ok("1")
84}
85
86/// One real-time observation a Phase 2 listener surfaces.
87///
88/// Backend-neutral: an eBPF `cgroup_skb` program populates the same
89/// fields a `nflog` netlink consumer does, AND the new ring-buffer-based
90/// [`EbpfFlowMonitor`] populates the same fields.
91#[derive(Debug, Clone, PartialEq, Eq)]
92pub struct FlowEvent {
93    /// Outbound vs inbound. Phase 2 starts on egress (matching Phase 1)
94    /// and may add ingress later.
95    pub direction: NetworkFlowDirection,
96    /// Allow vs deny.
97    pub decision: NetworkFlowDecisionOutcome,
98    /// Reason code distinguishing the data source.
99    pub reason_code: &'static str,
100    /// Optional destination IP.
101    pub dst_addr: Option<String>,
102    /// Optional destination port.
103    pub dst_port: Option<u16>,
104    /// Optional protocol (`udp` / `tcp` / `icmp`).
105    pub protocol: Option<String>,
106    /// Optional bytes observed for this event.
107    pub byte_count: Option<u64>,
108}
109
110/// Trait the supervisor calls on its hot path to drain pending events
111/// from a Phase 2 listener.
112pub trait FlowEventListener: Send + Sync {
113    /// Drain events accumulated since the last call.
114    fn drain_events(&mut self) -> Vec<FlowEvent>;
115
116    /// Identifier the supervisor stamps into the audit log.
117    fn backend_name(&self) -> &'static str;
118}
119
120/// Default Phase 2 listener: yields zero events, identifies as `"noop"`.
121#[derive(Debug, Default)]
122pub struct NoopFlowListener;
123
124impl FlowEventListener for NoopFlowListener {
125    fn drain_events(&mut self) -> Vec<FlowEvent> {
126        Vec::new()
127    }
128
129    fn backend_name(&self) -> &'static str {
130        "noop"
131    }
132}
133
134/// Build the default scaffold listener — always [`NoopFlowListener`] in
135/// the scaffolding pass.
136pub fn build_default_listener() -> Box<dyn FlowEventListener> {
137    Box::new(NoopFlowListener)
138}
139
140/// Translate a [`FlowEvent`] into a [`NetworkFlowDecision`] payload
141/// suitable for emitting on the existing
142/// `dev.cellos.events.cell.observability.v1.network_flow_decision` channel.
143#[allow(clippy::too_many_arguments)]
144pub fn flow_event_to_network_flow_decision(
145    event: &FlowEvent,
146    cell_id: &str,
147    run_id: &str,
148    decision_id: &str,
149    policy_digest: Option<&str>,
150    keyset_id: Option<&str>,
151    issuer_kid: Option<&str>,
152    correlation_id: Option<&str>,
153    observed_at: &str,
154) -> NetworkFlowDecision {
155    NetworkFlowDecision {
156        schema_version: "1.0.0".into(),
157        cell_id: cell_id.to_string(),
158        run_id: run_id.to_string(),
159        decision_id: decision_id.to_string(),
160        direction: event.direction,
161        decision: event.decision,
162        reason_code: event.reason_code.to_string(),
163        nft_rule_ref: None,
164        dst_addr: event.dst_addr.clone(),
165        dst_port: event.dst_port,
166        protocol: event.protocol.clone(),
167        packet_count: event.byte_count.map(|_| 1u64),
168        byte_count: event.byte_count,
169        policy_digest: policy_digest.map(|s| s.to_string()),
170        keyset_id: keyset_id.map(|s| s.to_string()),
171        issuer_kid: issuer_kid.map(|s| s.to_string()),
172        correlation_id: correlation_id.map(|s| s.to_string()),
173        observed_at: observed_at.to_string(),
174    }
175}
176
177// ────────────────────────────────────────────────────────────────────────
178// E7-4 — EbpfFlowMonitor (host-side aya loader)
179// ────────────────────────────────────────────────────────────────────────
180
181/// Errors returned by [`EbpfFlowMonitor::start`].
182///
183/// All variants are non-fatal at the supervisor level: the per-flow
184/// activation path (`per_flow::activate_per_flow_listener`) catches them,
185/// logs a `tracing::warn`, and falls back to the nflog listener so the
186/// cell still emits `network_flow_decision` events on the existing path.
187#[derive(Debug, thiserror::Error)]
188pub enum EbpfMonitorError {
189    /// Host platform is not Linux. eBPF requires Linux kernel support.
190    #[error("eBPF flow monitor is Linux-only (host platform: {host})")]
191    Unsupported { host: &'static str },
192
193    /// Linux host but the crate was built without the `ebpf-aya` Cargo
194    /// feature. The supervisor binary ships with the feature off by
195    /// default so macOS / non-eBPF Linux builds don't pay for a kernel
196    /// dependency they cannot use.
197    #[error(
198        "eBPF flow monitor requested but the `ebpf-aya` Cargo feature is \
199         not enabled on this build"
200    )]
201    FeatureDisabled,
202
203    /// The BPF object file referenced by `CELLOS_EBPF_OBJECT_PATH` (or
204    /// the default path) was not present on disk. Real deployments
205    /// pre-build `cellos-supervisor-ebpf` via `cargo bpf` and stage the
206    /// resulting object alongside the supervisor binary; the env var
207    /// lets operators override the location.
208    #[error("eBPF object file missing at {path}")]
209    ObjectMissing { path: String },
210
211    /// `setns(2)` into the cell's network namespace failed. Usually
212    /// `EPERM` (missing `CAP_SYS_ADMIN`) or `ENOENT` (cell already torn
213    /// down).
214    #[error("setns(CLONE_NEWNET) into cell netns failed: {source}")]
215    Setns {
216        #[source]
217        source: std::io::Error,
218    },
219
220    /// Loading the BPF object via aya failed. Surfaces verifier
221    /// rejections, license issues, missing program sections, etc.
222    #[error("aya BPF object load failed: {message}")]
223    LoadFailed { message: String },
224
225    /// Attaching the loaded program to the tap interface via
226    /// `tc clsact` failed. Usually `ENODEV` (tap iface missing) or
227    /// `EEXIST` (clsact qdisc collision with an operator-provisioned
228    /// qdisc on the same interface).
229    #[error("tc clsact attach to {iface} failed: {message}")]
230    AttachFailed { iface: String, message: String },
231
232    /// Opening the ring-buffer map for draining failed.
233    #[error("ring-buffer map open failed: {message}")]
234    RingBufferOpenFailed { message: String },
235}
236
237/// Host-side eBPF flow monitor.
238///
239/// Owns the loaded aya BPF object (so it stays alive for the duration of
240/// the cell run — dropping `aya::Ebpf` detaches all programs) plus the
241/// receiver end of an mpsc channel into which the ring-buffer drainer
242/// task pushes [`FlowEvent`]s.
243///
244/// Lifecycle:
245///
246/// 1. [`EbpfFlowMonitor::start`] enters the cell's netns, loads + attaches
247///    the BPF program, spawns the drainer task, and returns to the original
248///    netns.
249/// 2. The supervisor periodically calls [`EbpfFlowMonitor::drain`] (or
250///    consumes events directly via the channel) on its hot path.
251/// 3. [`EbpfFlowMonitor::stop`] aborts the drainer task and drops the
252///    aya object, which detaches the BPF program and releases the
253///    ring-buffer map.
254///
255/// The struct is constructed only via `start`; `Default` is intentionally
256/// not provided to prevent callers from skipping the netns + load
257/// sequence.
258pub struct EbpfFlowMonitor {
259    /// Holds the loaded eBPF program + ring buffer FD on Linux with the
260    /// aya feature enabled. `None` on all other configurations — the
261    /// monitor in that case is a thin wrapper around an empty channel
262    /// (the constructor returns `Err` so a `None` here at runtime is a
263    /// programmer error, not a deployment scenario).
264    #[cfg(all(target_os = "linux", feature = "ebpf-aya"))]
265    _bpf: Option<aya::Ebpf>,
266
267    /// Receiver end of the mpsc channel the drainer task pushes events
268    /// onto. Construction always produces a paired sender (held by the
269    /// drainer task) + receiver (held here); dropping `_bpf` cancels the
270    /// drainer task which closes the sender side, signalling EOF.
271    rx: tokio::sync::mpsc::UnboundedReceiver<FlowEvent>,
272
273    /// Optional handle to the drainer task so [`Self::stop`] can abort
274    /// it deterministically rather than waiting for the channel close.
275    drainer: Option<tokio::task::JoinHandle<()>>,
276
277    /// Cached interface name for tracing breadcrumbs.
278    iface: String,
279}
280
281impl EbpfFlowMonitor {
282    /// Load and attach the BPF program inside the cell's netns. Falls
283    /// back to nflog (via the caller in `per_flow.rs`) if any step
284    /// fails by returning an [`EbpfMonitorError`].
285    ///
286    /// `netns_fd` MUST be an open file descriptor for the cell's
287    /// `/proc/<child_pid>/ns/net`. The caller retains ownership.
288    /// `tap_iface` is the supervisor-provisioned tap interface name
289    /// (e.g. `"vethC0"`).
290    #[allow(unused_variables)] // platform-dependent path
291    pub async fn start(
292        netns_fd: std::os::unix::io::RawFd,
293        tap_iface: &str,
294    ) -> Result<Self, EbpfMonitorError> {
295        #[cfg(all(target_os = "linux", feature = "ebpf-aya"))]
296        {
297            Self::start_linux_aya(netns_fd, tap_iface).await
298        }
299        #[cfg(all(target_os = "linux", not(feature = "ebpf-aya")))]
300        {
301            Err(EbpfMonitorError::FeatureDisabled)
302        }
303        #[cfg(not(target_os = "linux"))]
304        {
305            // tap_iface is intentionally unused on non-Linux — surface
306            // it through the error message so the operator's log line
307            // identifies which cell's monitor declined to start.
308            tracing::debug!(
309                target: "cellos.supervisor.ebpf_flow",
310                iface = %tap_iface,
311                "EbpfFlowMonitor::start on non-Linux host — falling back to nflog"
312            );
313            Err(EbpfMonitorError::Unsupported {
314                host: std::env::consts::OS,
315            })
316        }
317    }
318
319    /// Drain pending flow events into a `Vec` without blocking.
320    ///
321    /// Used by callers that want a synchronous batch pull instead of
322    /// `.recv().await` per event. Returns an empty `Vec` when no events
323    /// are buffered.
324    pub fn drain(&mut self) -> Vec<FlowEvent> {
325        let mut out = Vec::new();
326        while let Ok(ev) = self.rx.try_recv() {
327            out.push(ev);
328        }
329        out
330    }
331
332    /// Async pull of one event. Returns `None` when the drainer task
333    /// has exited (i.e. the channel sender side has been dropped).
334    pub async fn recv(&mut self) -> Option<FlowEvent> {
335        self.rx.recv().await
336    }
337
338    /// Stop the monitor and clean up. Aborts the drainer task; dropping
339    /// the held [`aya::Ebpf`] object detaches the BPF program.
340    pub async fn stop(mut self) {
341        if let Some(drainer) = self.drainer.take() {
342            drainer.abort();
343            // Best-effort join — we don't care about the result, only
344            // that the task has stopped scheduling.
345            let _ = drainer.await;
346        }
347        // `_bpf` (when present) drops here, releasing kernel resources.
348    }
349
350    // ────────────────────────────────────────────────────────────────
351    // Linux + aya implementation
352    // ────────────────────────────────────────────────────────────────
353
354    #[cfg(all(target_os = "linux", feature = "ebpf-aya"))]
355    async fn start_linux_aya(
356        netns_fd: std::os::unix::io::RawFd,
357        tap_iface: &str,
358    ) -> Result<Self, EbpfMonitorError> {
359        // The BPF object path is operator-controllable so deployments
360        // can stage the object wherever their packaging puts it. The
361        // default path matches the cellos-supervisor-ebpf crate's
362        // expected cargo-bpf output location.
363        let object_path = std::env::var("CELLOS_EBPF_OBJECT_PATH")
364            .ok()
365            .unwrap_or_else(|| "/usr/local/lib/cellos/cellos-supervisor-ebpf.o".to_string());
366        let object_bytes =
367            std::fs::read(&object_path).map_err(|_| EbpfMonitorError::ObjectMissing {
368                path: object_path.clone(),
369            })?;
370
371        // Save the supervisor's original netns so we can return to it
372        // after attaching the program. `setns` is per-thread on Linux,
373        // so we MUST restore before the tokio worker thread we're on
374        // services any other task.
375        let self_netns = std::fs::File::open("/proc/self/ns/net")
376            .map_err(|source| EbpfMonitorError::Setns { source })?;
377
378        // Enter the cell's netns.
379        use std::os::unix::io::AsRawFd;
380        let setns_rc = unsafe { libc::setns(netns_fd, libc::CLONE_NEWNET) };
381        if setns_rc != 0 {
382            return Err(EbpfMonitorError::Setns {
383                source: std::io::Error::last_os_error(),
384            });
385        }
386
387        // Load the BPF object inside the cell's netns. aya 0.13's
388        // `Ebpf::load` returns a `Result<Ebpf, EbpfError>` — surface
389        // its display string into our error type so the caller's
390        // tracing line carries the verifier message verbatim.
391        let load_result = aya::Ebpf::load(&object_bytes);
392
393        // Restore the supervisor's netns BEFORE we propagate any
394        // load error — leaving the worker thread in the cell's netns
395        // would break unrelated tokio tasks scheduled later.
396        let restore_rc = unsafe { libc::setns(self_netns.as_raw_fd(), libc::CLONE_NEWNET) };
397        if restore_rc != 0 {
398            // Restoration failure is a real problem (this thread is
399            // now stuck in the cell's netns). Surface as Setns rather
400            // than discarding — the supervisor will refuse to use
401            // this monitor and the caller falls back to nflog.
402            return Err(EbpfMonitorError::Setns {
403                source: std::io::Error::last_os_error(),
404            });
405        }
406
407        let mut bpf = load_result.map_err(|e| EbpfMonitorError::LoadFailed {
408            message: format!("{e}"),
409        })?;
410
411        // tc clsact attach. The program name is a contract with the
412        // ebpf crate's source; we don't hardcode it as a constant
413        // here because the ebpf crate ships its own constants module
414        // — when the ebpf crate lands the supervisor will import
415        // `cellos_supervisor_ebpf::PROGRAM_NAME` instead. For now we
416        // probe a small set of conventional names so a partial BPF
417        // crate doesn't force a supervisor recompile.
418        const CANDIDATE_PROGRAM_NAMES: &[&str] =
419            &["cellos_flow_egress", "tc_egress", "flow_classifier"];
420        let mut attached = false;
421        let mut last_attach_err: Option<String> = None;
422        for prog_name in CANDIDATE_PROGRAM_NAMES {
423            // aya 0.13: `program_mut` returns Option; tc programs need
424            // `SchedClassifier::try_from` + `.load()` + `.attach()`.
425            // We avoid hard-coding the call shape against an unloaded
426            // feature dep by recording the attempt; the actual probe
427            // and attach happens via the helper below so this file
428            // still compiles when aya's surface evolves.
429            match attach_tc_egress(&mut bpf, prog_name, tap_iface) {
430                Ok(()) => {
431                    attached = true;
432                    break;
433                }
434                Err(e) => {
435                    last_attach_err = Some(e);
436                }
437            }
438        }
439        if !attached {
440            return Err(EbpfMonitorError::AttachFailed {
441                iface: tap_iface.to_string(),
442                message: last_attach_err
443                    .unwrap_or_else(|| "no candidate program found".to_string()),
444            });
445        }
446
447        // Spawn the ring-buffer drainer. We pull the ring-buffer map
448        // by name (`"flow_events"`); when the ebpf crate lands it will
449        // export this constant.
450        let (tx, rx) = tokio::sync::mpsc::unbounded_channel::<FlowEvent>();
451        let drainer = spawn_ring_buffer_drainer(&mut bpf, tx)
452            .map_err(|e| EbpfMonitorError::RingBufferOpenFailed { message: e })?;
453
454        Ok(Self {
455            _bpf: Some(bpf),
456            rx,
457            drainer: Some(drainer),
458            iface: tap_iface.to_string(),
459        })
460    }
461}
462
463// ────────────────────────────────────────────────────────────────────
464// Linux+aya helpers — intentionally narrow so this file stays
465// compilable on macOS regardless of aya's API drift.
466// ────────────────────────────────────────────────────────────────
467
/// Attach program `program_name` from `bpf` to the tc clsact egress hook
/// of `iface`, following aya 0.13's documented attach flow.
///
/// Sequence:
///   1. Add a `clsact` qdisc to `iface`; a `SchedClassifier` cannot attach
///      without one. An "already exists" failure is accepted: operators
///      may pre-provision `clsact` with `tc`, or an earlier cell on a
///      shared interface may have left one behind.
///   2. Look up `program_name` in the loaded object and convert it to a
///      `SchedClassifier`.
///   3. `load()` it (verifier pass). An "already loaded" failure is
///      accepted, so retries after a transient attach error need no
///      per-program bookkeeping.
///   4. `attach(iface, TcAttachType::Egress)`. The returned link id is
///      discarded. The link is owned by the program inside the parent
///      `aya::Ebpf`, which `EbpfFlowMonitor` holds for the cell run, and
///      dropping that object detaches it.
///
/// Failures come back as `Err(String)`. The caller's candidate-name loop
/// can then record them and move on without threading
/// `aya::programs::ProgramError` through the signature.
///
/// The "already exists" / "already loaded" checks match on the rendered
/// error text, the portable option across aya versions. A missed match
/// only turns a benign condition into an attach failure, which the
/// supervisor handles by falling back to nflog.
#[cfg(all(target_os = "linux", feature = "ebpf-aya"))]
fn attach_tc_egress(bpf: &mut aya::Ebpf, program_name: &str, iface: &str) -> Result<(), String> {
    use aya::programs::{tc, ProgramError, SchedClassifier, TcAttachType};

    // 1. clsact qdisc; EEXIST is benign.
    if let Err(err) = tc::qdisc_add_clsact(iface) {
        let rendered = format!("{err}");
        let already_exists = rendered.contains("File exists") || rendered.contains("EEXIST");
        if !already_exists {
            return Err(format!("qdisc_add_clsact({iface}) failed: {rendered}"));
        }
    }

    // 2. Locate the classifier. A missing name is the usual outcome when the
    //    object was built without this candidate; keep the message stable.
    let Some(handle) = bpf.program_mut(program_name) else {
        return Err(format!("program {program_name:?} not found in BPF object"));
    };
    let classifier: &mut SchedClassifier = handle.try_into().map_err(|err: ProgramError| {
        format!("program {program_name:?} is not a SchedClassifier: {err}")
    })?;

    // 3. Verifier-side load; "already loaded" is benign.
    if let Err(err) = classifier.load() {
        let rendered = format!("{err}");
        let already_loaded =
            rendered.contains("AlreadyLoaded") || rendered.contains("already loaded");
        if !already_loaded {
            return Err(format!("SchedClassifier::load failed: {rendered}"));
        }
    }

    // 4. Attach to egress; the link id stays owned by the program.
    classifier
        .attach(iface, TcAttachType::Egress)
        .map(|_link_id| ())
        .map_err(|err| format!("SchedClassifier::attach({iface}, egress) failed: {err}"))
}
542
/// Size in bytes of one record the BPF program writes to the ring buffer.
///
/// The producer is the `#[repr(C)]` `FlowEvent` in
/// `crates/cellos-supervisor-ebpf/src/main.rs`, and this value must track
/// it. Fields use natural alignment. Addresses and ports are stored in
/// host byte order; the BPF program converts them from big-endian before
/// writing.
///
/// | bytes  | field         | type                |
/// |--------|---------------|---------------------|
/// | 0..4   | src_ip        | u32                 |
/// | 4..8   | dst_ip        | u32                 |
/// | 8..10  | src_port      | u16                 |
/// | 10..12 | dst_port      | u16                 |
/// | 12     | proto         | u8 (IANA proto)     |
/// | 13..16 | padding       | ignored             |
/// | 16..24 | pkt_count     | u64                 |
/// | 24..32 | first_seen_ns | u64                 |
/// | 32     | verdict       | u8                  |
/// | 33..40 | padding       | ignored             |
///
/// That is a 16-byte flow key followed by a 24-byte flow entry. Pinning
/// the total guards the decoder against silent drift from the producer.
#[cfg(all(target_os = "linux", feature = "ebpf-aya"))]
const BPF_FLOW_EVENT_SIZE: usize = 16 + 24;
563
/// Spawn the blocking drainer that empties the `FLOW_EVENTS` ring buffer.
///
/// The ring-buffer map is moved out of `bpf` with `take_map` rather than
/// borrowed with `map_mut`. The resulting `RingBuf` owns its `MapData` and
/// can move into a `spawn_blocking` closure without holding
/// `&mut aya::Ebpf`. `bpf` keeps every other map (notably `FLOWS`) and
/// the loaded program.
///
/// The drainer loop runs until the receiving half of `tx` is closed or
/// dropped:
///   1. `ring_buf.next()` returns `Some(item)`: decode it with
///      `parse_ring_buf_event` and forward it down `tx`. Undecodable
///      records are skipped. A failed send means the receiver is gone, so
///      the loop exits.
///   2. `None`: sleep 100 µs and poll again. That is roughly 10k polls/sec,
///      enough to track realistic per-cell flow rates without a tight spin.
///
/// Before every poll the loop also checks `tx.is_closed()`. This is the
/// only shutdown signal that works when the cell is idle: once a
/// `spawn_blocking` task is running, `JoinHandle::abort` cannot cancel it,
/// and without traffic no send would ever fail.
///
/// The returned `JoinHandle` is stored in `EbpfFlowMonitor::drainer`.
///
/// # Errors
///
/// Returns a description if neither `FLOW_EVENTS` nor `flow_events`
/// exists in the object, or if the map is not a ring buffer.
#[cfg(all(target_os = "linux", feature = "ebpf-aya"))]
fn spawn_ring_buffer_drainer(
    bpf: &mut aya::Ebpf,
    tx: tokio::sync::mpsc::UnboundedSender<FlowEvent>,
) -> Result<tokio::task::JoinHandle<()>, String> {
    use aya::maps::RingBuf;

    // Back-off between polls of an empty ring buffer.
    const IDLE_POLL_INTERVAL: std::time::Duration = std::time::Duration::from_micros(100);

    // The BPF crate names the map `FLOW_EVENTS`. Fall back to the lowercase
    // spelling so a BPF crate rebuilt with a different naming convention
    // does not break the supervisor.
    let map = bpf
        .take_map("FLOW_EVENTS")
        .or_else(|| bpf.take_map("flow_events"))
        .ok_or_else(|| "FLOW_EVENTS map not found in BPF object".to_string())?;
    let mut ring_buf: RingBuf<aya::maps::MapData> =
        RingBuf::try_from(map).map_err(|e| format!("FLOW_EVENTS map is not a ring buffer: {e}"))?;

    let handle = tokio::task::spawn_blocking(move || {
        // A dedicated blocking thread per cell is deliberately preferred
        // over an AsyncFd wakeup path, which would need `&mut RingBuf` held
        // across `.await`.
        while !tx.is_closed() {
            match ring_buf.next() {
                Some(item) => {
                    if let Some(event) = parse_ring_buf_event(&item) {
                        if tx.send(event).is_err() {
                            // Receiver dropped between the check and the send.
                            break;
                        }
                    }
                    // `item` drops here, advancing the consumer position.
                }
                None => std::thread::sleep(IDLE_POLL_INTERVAL),
            }
        }
    });

    Ok(handle)
}
633
634/// Decode the BPF-side `FlowEvent` byte layout into the host-side
635/// [`FlowEvent`].
636///
637/// Returns `None` if the slice is shorter than the expected 40 bytes
638/// (a truncated ring-buffer write should never happen in practice, but
639/// we treat it as a soft failure rather than a panic so a single
640/// malformed event cannot crash the supervisor).
641///
642/// The BPF program writes `src_ip` / `dst_ip` / ports in HOST byte order
643/// (it applies `u32::from_be` / `u16::from_be` before writing into the
644/// map key — see `try_track` in the BPF crate), so we read them with
645/// native-endian decoding here.
646///
647/// ## Relationship to [`decode_ring_buf_event`]
648///
649/// This function is the LIVE-PATH adapter the Linux+aya drainer calls
650/// per ring-buffer slot. It returns `Option<FlowEvent>` (silent drop on
651/// short reads) because the drainer cannot afford a typed error per
652/// slot on the hot path.
653///
654/// The platform-independent decoder [`decode_ring_buf_event`] returns a
655/// typed [`RingBufFlowEvent`] + [`RingBufParseError`] and is used by
656/// the unit-test suite (which runs on every host, including macOS),
657/// plus any future caller that needs the full kernel-side layout
658/// (pkt_count, first_seen_ns, verdict — fields this adapter discards).
659/// Both decoders agree on the wire layout; the layout-pin test
660/// `ring_buf_event_layout_constant_matches_producer` keeps them in
661/// lockstep against the BPF crate's `#[repr(C)]` producer struct.
662#[cfg(all(target_os = "linux", feature = "ebpf-aya"))]
663fn parse_ring_buf_event(bytes: &[u8]) -> Option<FlowEvent> {
664    use std::net::Ipv4Addr;
665
666    if bytes.len() < BPF_FLOW_EVENT_SIZE {
667        return None;
668    }
669
670    // FlowKey — bytes 0..16.
671    let src_ip = u32::from_ne_bytes([bytes[0], bytes[1], bytes[2], bytes[3]]);
672    let dst_ip = u32::from_ne_bytes([bytes[4], bytes[5], bytes[6], bytes[7]]);
673    let _src_port = u16::from_ne_bytes([bytes[8], bytes[9]]);
674    let dst_port = u16::from_ne_bytes([bytes[10], bytes[11]]);
675    let proto = bytes[12];
676    // bytes 13..16: padding, ignored.
677
678    // FlowEntry — bytes 16..40. Decoded but currently unused:
679    //   - pkt_count (bytes 16..24): observability metadata; the host-side
680    //     `FlowEvent` shape carries `byte_count`, not `packet_count`, and
681    //     the BPF program does not track bytes today. Leaving both `None`
682    //     beats fabricating a value.
683    //   - first_seen_ns (bytes 24..32): the FlowEvent wire shape has no
684    //     timestamp field today.
685    //   - verdict (byte 32): always 0 from the BPF program (userspace
686    //     will write it for analytics later).
687    let _pkt_count = u64::from_ne_bytes([
688        bytes[16], bytes[17], bytes[18], bytes[19], bytes[20], bytes[21], bytes[22], bytes[23],
689    ]);
690
691    // Map IANA proto byte to the protocol string the
692    // `network_flow_decision` schema expects. Anything outside the BPF
693    // program's filter (TCP/UDP only) becomes `None` defensively.
694    let protocol = match proto {
695        6 => Some("tcp".to_string()),
696        17 => Some("udp".to_string()),
697        1 => Some("icmp".to_string()),
698        58 => Some("icmpv6".to_string()),
699        _ => None,
700    };
701
702    let _src_addr = Ipv4Addr::from(src_ip); // not exposed on FlowEvent; kept
703                                            // for future src-side surfacing
704    let dst_addr = Ipv4Addr::from(dst_ip);
705
706    Some(FlowEvent {
707        direction: NetworkFlowDirection::Egress,
708        // The BPF classifier is OBSERVABILITY ONLY (always returns
709        // TC_ACT_OK) — every event it emits represents a packet the
710        // kernel actually let through. Map that to `Allow`. Real
711        // nftables drops surface via the separate nflog path with
712        // `REASON_NFLOG_MATCH`.
713        decision: NetworkFlowDecisionOutcome::Allow,
714        reason_code: REASON_EBPF_XDP_DROP,
715        dst_addr: Some(dst_addr.to_string()),
716        dst_port: Some(dst_port),
717        protocol,
718        // byte_count: the BPF program counts packets, not bytes, so we
719        // surface neither today. A future BPF-side change can add a
720        // bytes_total field to FlowEntry and we'd thread it through
721        // here.
722        byte_count: None,
723    })
724}
725
726// ────────────────────────────────────────────────────────────────
727// Platform-independent ring-buffer event decoder
728// ────────────────────────────────────────────────────────────────
729//
730// The Linux+aya path above ([`parse_ring_buf_event`]) is the
731// production adapter the drainer calls per slot. It is cfg-gated so it
732// only exists on Linux builds with the `ebpf-aya` feature on, and it
733// returns `Option<FlowEvent>` shaped for the live channel.
734//
735// This second decoder is the unit-testable, cross-platform companion.
736// It decodes the SAME 40-byte `#[repr(C)]` record from the BPF
737// producer crate (`cellos-supervisor-ebpf/src/main.rs`) but:
738//
739//   - Compiles on every target (no cfg gate, no aya dependency).
740//   - Returns a typed `RingBufFlowEvent` exposing every field the
741//     kernel writes (including `pkt_count`, `first_seen_ns`, `verdict`,
742//     `src_port` — fields the live adapter discards today).
743//   - Surfaces malformed slots as a typed `RingBufParseError` rather
744//     than a silent `None`, so future call sites that care about
745//     layout-drift detection can act on it.
746//   - Has a layout-pin test (`ring_buf_event_layout_constant_matches_producer`)
747//     that fails loudly if the producer's 40-byte assumption shifts —
748//     catching the drift on every CI host, not just Linux runners with
749//     the aya feature enabled.
750//
751// The two decoders share the wire layout; only the call shape differs.
752// A future refactor can collapse them once the live adapter wants
753// access to the full FlowEntry surface (timestamps, verdict, pkt
754// counts).
755//
756// Byte layout (native endian — the kernel program normalises wire
757// bytes via `u32::from_be` / `u16::from_be` before insert, so the
758// ring-buffer values are already host-order):
759//
760//   FlowKey   (16 bytes)
761//     0..4   : src_ip   (u32, host order)
762//     4..8   : dst_ip   (u32, host order)
763//     8..10  : src_port (u16, host order)
764//     10..12 : dst_port (u16, host order)
765//     12     : proto    (u8; IANA: 6=TCP, 17=UDP)
766//     13..16 : _pad     ([u8; 3], zeroed by the producer)
767//   FlowEntry (24 bytes)
768//     16..24 : pkt_count     (u64, host order)
769//     24..32 : first_seen_ns (u64, host order)
770//     32     : verdict       (u8; 0=unknown, 1=accept, 2=drop)
771//     33..40 : _pad          ([u8; 7], zeroed)
772
773/// Size in bytes of the on-the-ring-buffer `FlowEvent` record. Tied to
774/// the `#[repr(C)]` layout in `cellos-supervisor-ebpf/src/main.rs`. If
775/// the producer ever changes shape, this constant + the decoder must
776/// move together — the unit tests below pin the exact byte offsets.
777pub const RING_BUF_EVENT_LEN: usize = 40;
778
779/// IANA protocol numbers we know how to label. Anything else is
780/// surfaced as `None` so the downstream event keeps its `protocol`
781/// field empty (rather than lying about a TCP/UDP that isn't either).
782const IPPROTO_TCP: u8 = 6;
783const IPPROTO_UDP: u8 = 17;
784const IPPROTO_ICMP: u8 = 1;
785const IPPROTO_ICMPV6: u8 = 58;
786
787/// Decoded ring-buffer record. Mirrors the kernel-side `FlowEvent`
788/// 1:1 but as plain Rust types: callers that don't care about the eBPF
789/// shape get a portable, copy-only struct.
790#[derive(Debug, Clone, Copy, PartialEq, Eq)]
791pub struct RingBufFlowEvent {
792    pub src_ip: u32,
793    pub dst_ip: u32,
794    pub src_port: u16,
795    pub dst_port: u16,
796    pub proto: u8,
797    pub pkt_count: u64,
798    pub first_seen_ns: u64,
799    pub verdict: u8,
800}
801
802/// Decode failure: the slice didn't have the canonical 40-byte shape.
803/// The caller (drainer task) logs once and drops the malformed record —
804/// surfacing a kernel-protocol mismatch is more useful than letting a
805/// torn read silently inflate the flow count.
806#[derive(Debug, Clone, PartialEq, Eq, thiserror::Error)]
807pub enum RingBufParseError {
808    #[error(
809        "ring-buffer record is {got} bytes, expected exactly {RING_BUF_EVENT_LEN}; \
810         kernel producer ABI drift or torn read"
811    )]
812    WrongLength { got: usize },
813}
814
815/// Decode a single 40-byte ring-buffer slot into a [`RingBufFlowEvent`].
816///
817/// The slot is treated as the contiguous `#[repr(C)]` byte image of
818/// the eBPF `FlowEvent { key: FlowKey, entry: FlowEntry }`. All
819/// multi-byte ints are host-endian (the eBPF program byte-swaps wire
820/// values before inserting).
821///
822/// This is a pure function with no IO and no aya dependency, so it
823/// builds and tests identically on Linux and macOS. The live-path
824/// drainer uses the cfg-gated [`parse_ring_buf_event`] adapter (above)
825/// for its hot path; this decoder is for unit tests and any future
826/// caller that wants the full kernel field surface.
827pub fn decode_ring_buf_event(bytes: &[u8]) -> Result<RingBufFlowEvent, RingBufParseError> {
828    if bytes.len() != RING_BUF_EVENT_LEN {
829        return Err(RingBufParseError::WrongLength { got: bytes.len() });
830    }
831    // Safe: slice length is exactly RING_BUF_EVENT_LEN, every fixed
832    // offset below is bounded by it.
833    let src_ip = u32::from_ne_bytes([bytes[0], bytes[1], bytes[2], bytes[3]]);
834    let dst_ip = u32::from_ne_bytes([bytes[4], bytes[5], bytes[6], bytes[7]]);
835    let src_port = u16::from_ne_bytes([bytes[8], bytes[9]]);
836    let dst_port = u16::from_ne_bytes([bytes[10], bytes[11]]);
837    let proto = bytes[12];
838    // bytes[13..16] are explicit padding; we ignore by contract.
839    let pkt_count = u64::from_ne_bytes([
840        bytes[16], bytes[17], bytes[18], bytes[19], bytes[20], bytes[21], bytes[22], bytes[23],
841    ]);
842    let first_seen_ns = u64::from_ne_bytes([
843        bytes[24], bytes[25], bytes[26], bytes[27], bytes[28], bytes[29], bytes[30], bytes[31],
844    ]);
845    let verdict = bytes[32];
846    // bytes[33..40] are explicit padding.
847    Ok(RingBufFlowEvent {
848        src_ip,
849        dst_ip,
850        src_port,
851        dst_port,
852        proto,
853        pkt_count,
854        first_seen_ns,
855        verdict,
856    })
857}
858
859/// Translate a decoded [`RingBufFlowEvent`] into the supervisor-facing
860/// [`FlowEvent`] shape consumed by the [`EbpfFlowMonitor`] channel.
861///
862/// IPv4 is rendered as dotted-quad. The kernel-side program filters out
863/// non-IPv4 traffic, so we can format `src_ip`/`dst_ip` as `Ipv4Addr`
864/// without losing information.
865pub fn ring_buf_event_to_flow_event(decoded: &RingBufFlowEvent) -> FlowEvent {
866    use std::net::Ipv4Addr;
867
868    let protocol = match decoded.proto {
869        IPPROTO_TCP => Some("tcp".to_string()),
870        IPPROTO_UDP => Some("udp".to_string()),
871        IPPROTO_ICMP => Some("icmp".to_string()),
872        IPPROTO_ICMPV6 => Some("icmpv6".to_string()),
873        _ => None,
874    };
875    // The kernel program is observability-only; verdict==2 ("drop") is
876    // operator-written analytics, not enforcement. We map the three
877    // documented values to `Allow` / `Deny`; the default (0 = unknown)
878    // mirrors what the proxy + nft sees on the wire (allowed by
879    // default, denials are nft-driven elsewhere).
880    let decision = match decoded.verdict {
881        2 => NetworkFlowDecisionOutcome::Deny,
882        _ => NetworkFlowDecisionOutcome::Allow,
883    };
884    FlowEvent {
885        direction: NetworkFlowDirection::Egress,
886        decision,
887        reason_code: REASON_EBPF_XDP_DROP,
888        dst_addr: Some(Ipv4Addr::from(decoded.dst_ip).to_string()),
889        dst_port: Some(decoded.dst_port),
890        protocol,
891        byte_count: None, // ebpf program tracks pkt_count, not bytes
892    }
893}
894
895// ────────────────────────────────────────────────────────────────
896// Test-only shim: a constructible EbpfFlowMonitor for unit tests
897// that need to exercise the `drain` / `stop` surface without
898// depending on a real BPF program. NOT for production use.
899// ────────────────────────────────────────────────────────────
900
901#[cfg(test)]
902impl EbpfFlowMonitor {
903    /// Construct a monitor from a pre-built receiver. Test-only.
904    pub(crate) fn from_parts_for_test(
905        rx: tokio::sync::mpsc::UnboundedReceiver<FlowEvent>,
906        iface: &str,
907    ) -> Self {
908        Self {
909            #[cfg(all(target_os = "linux", feature = "ebpf-aya"))]
910            _bpf: None,
911            rx,
912            drainer: None,
913            iface: iface.to_string(),
914        }
915    }
916}
917
918#[cfg(test)]
919mod tests {
920    use super::*;
921
922    fn sample_event() -> FlowEvent {
923        FlowEvent {
924            direction: NetworkFlowDirection::Egress,
925            decision: NetworkFlowDecisionOutcome::Deny,
926            reason_code: REASON_EBPF_XDP_DROP,
927            dst_addr: Some("10.0.0.1".into()),
928            dst_port: Some(443),
929            protocol: Some("tcp".into()),
930            byte_count: Some(74),
931        }
932    }
933
934    // ── original scaffold tests (preserved) ───────────────────────────
935
936    #[test]
937    fn env_flag_default_off() {
938        let prev = std::env::var(PER_FLOW_EBPF_ENV).ok();
939        std::env::remove_var(PER_FLOW_EBPF_ENV);
940        assert!(!is_per_flow_ebpf_enabled());
941        if let Some(v) = prev {
942            std::env::set_var(PER_FLOW_EBPF_ENV, v);
943        }
944    }
945
946    #[test]
947    fn env_flag_recognises_one() {
948        let prev = std::env::var(PER_FLOW_EBPF_ENV).ok();
949        std::env::set_var(PER_FLOW_EBPF_ENV, "1");
950        assert!(is_per_flow_ebpf_enabled());
951        match prev {
952            Some(v) => std::env::set_var(PER_FLOW_EBPF_ENV, v),
953            None => std::env::remove_var(PER_FLOW_EBPF_ENV),
954        }
955    }
956
957    #[test]
958    fn env_flag_rejects_truthy_lookalikes() {
959        let prev = std::env::var(PER_FLOW_EBPF_ENV).ok();
960        for bad in ["true", "yes", "on", "TRUE", "0", "", "2"] {
961            std::env::set_var(PER_FLOW_EBPF_ENV, bad);
962            assert!(
963                !is_per_flow_ebpf_enabled(),
964                "value {bad:?} must not enable Phase 2"
965            );
966        }
967        match prev {
968            Some(v) => std::env::set_var(PER_FLOW_EBPF_ENV, v),
969            None => std::env::remove_var(PER_FLOW_EBPF_ENV),
970        }
971    }
972
973    #[test]
974    fn noop_listener_yields_no_events() {
975        let mut listener = NoopFlowListener;
976        let events = listener.drain_events();
977        assert!(events.is_empty());
978    }
979
980    #[test]
981    fn noop_listener_drain_is_idempotent() {
982        let mut listener = NoopFlowListener;
983        for _ in 0..5 {
984            assert!(listener.drain_events().is_empty());
985        }
986    }
987
988    #[test]
989    fn noop_listener_backend_name_is_noop() {
990        let listener = NoopFlowListener;
991        assert_eq!(listener.backend_name(), "noop");
992    }
993
994    #[test]
995    fn build_default_listener_is_noop() {
996        let mut listener = build_default_listener();
997        assert_eq!(listener.backend_name(), "noop");
998        assert!(listener.drain_events().is_empty());
999    }
1000
1001    #[test]
1002    fn translator_preserves_direction_and_outcome() {
1003        let event = sample_event();
1004        let payload = flow_event_to_network_flow_decision(
1005            &event,
1006            "cell-test",
1007            "run-test",
1008            "dec-test",
1009            None,
1010            None,
1011            None,
1012            None,
1013            "2026-05-06T00:00:00Z",
1014        );
1015        assert_eq!(payload.direction, NetworkFlowDirection::Egress);
1016        assert_eq!(payload.decision, NetworkFlowDecisionOutcome::Deny);
1017        assert_eq!(payload.reason_code, REASON_EBPF_XDP_DROP);
1018    }
1019
1020    #[test]
1021    fn translator_stamps_caller_metadata() {
1022        let event = sample_event();
1023        let payload = flow_event_to_network_flow_decision(
1024            &event,
1025            "cell-1",
1026            "run-1",
1027            "dec-1",
1028            Some("sha256:abc"),
1029            Some("kid-1"),
1030            Some("issuer-1"),
1031            Some("corr-1"),
1032            "2026-05-06T12:00:00Z",
1033        );
1034        assert_eq!(payload.cell_id, "cell-1");
1035        assert_eq!(payload.run_id, "run-1");
1036        assert_eq!(payload.decision_id, "dec-1");
1037        assert_eq!(payload.policy_digest.as_deref(), Some("sha256:abc"));
1038        assert_eq!(payload.keyset_id.as_deref(), Some("kid-1"));
1039        assert_eq!(payload.issuer_kid.as_deref(), Some("issuer-1"));
1040        assert_eq!(payload.correlation_id.as_deref(), Some("corr-1"));
1041        assert_eq!(payload.observed_at, "2026-05-06T12:00:00Z");
1042    }
1043
1044    #[test]
1045    fn translator_omits_metadata_when_caller_passes_none() {
1046        let event = sample_event();
1047        let payload = flow_event_to_network_flow_decision(
1048            &event,
1049            "cell-1",
1050            "run-1",
1051            "dec-1",
1052            None,
1053            None,
1054            None,
1055            None,
1056            "2026-05-06T12:00:00Z",
1057        );
1058        assert!(payload.policy_digest.is_none());
1059        assert!(payload.keyset_id.is_none());
1060        assert!(payload.issuer_kid.is_none());
1061        assert!(payload.correlation_id.is_none());
1062        assert!(payload.nft_rule_ref.is_none());
1063    }
1064
1065    #[test]
1066    fn translator_carries_l3_l4_fields() {
1067        let event = sample_event();
1068        let payload = flow_event_to_network_flow_decision(
1069            &event,
1070            "cell-1",
1071            "run-1",
1072            "dec-1",
1073            None,
1074            None,
1075            None,
1076            None,
1077            "2026-05-06T12:00:00Z",
1078        );
1079        assert_eq!(payload.dst_addr.as_deref(), Some("10.0.0.1"));
1080        assert_eq!(payload.dst_port, Some(443));
1081        assert_eq!(payload.protocol.as_deref(), Some("tcp"));
1082        assert_eq!(payload.byte_count, Some(74));
1083        assert_eq!(payload.packet_count, Some(1));
1084    }
1085
1086    #[test]
1087    fn translator_handles_event_without_byte_count() {
1088        let mut event = sample_event();
1089        event.byte_count = None;
1090        let payload = flow_event_to_network_flow_decision(
1091            &event,
1092            "cell-1",
1093            "run-1",
1094            "dec-1",
1095            None,
1096            None,
1097            None,
1098            None,
1099            "2026-05-06T12:00:00Z",
1100        );
1101        assert!(payload.byte_count.is_none());
1102        assert!(payload.packet_count.is_none());
1103    }
1104
1105    #[test]
1106    fn translator_handles_nflog_reason_code() {
1107        let mut event = sample_event();
1108        event.reason_code = REASON_NFLOG_MATCH;
1109        event.decision = NetworkFlowDecisionOutcome::Allow;
1110        let payload = flow_event_to_network_flow_decision(
1111            &event,
1112            "cell-1",
1113            "run-1",
1114            "dec-1",
1115            None,
1116            None,
1117            None,
1118            None,
1119            "2026-05-06T12:00:00Z",
1120        );
1121        assert_eq!(payload.reason_code, REASON_NFLOG_MATCH);
1122        assert_eq!(payload.decision, NetworkFlowDecisionOutcome::Allow);
1123    }
1124
1125    #[test]
1126    fn translator_payload_round_trips_through_serde_json() {
1127        let event = sample_event();
1128        let payload = flow_event_to_network_flow_decision(
1129            &event,
1130            "cell-1",
1131            "run-1",
1132            "dec-1",
1133            Some("sha256:abc"),
1134            Some("kid-1"),
1135            Some("issuer-1"),
1136            Some("corr-1"),
1137            "2026-05-06T12:00:00Z",
1138        );
1139        let json = serde_json::to_string(&payload).expect("serialize");
1140        let round_tripped: NetworkFlowDecision = serde_json::from_str(&json).expect("deserialize");
1141        assert_eq!(round_tripped, payload);
1142    }
1143
1144    #[test]
1145    fn reason_codes_are_distinct_from_phase1_codes() {
1146        for code in [REASON_EBPF_XDP_DROP, REASON_NFLOG_MATCH] {
1147            assert!(
1148                !code.starts_with("nft_"),
1149                "Phase 2 code {code} must not collide with Phase 1 nft_* prefix"
1150            );
1151        }
1152    }
1153
1154    // ── E7-4 EbpfFlowMonitor tests ───────────────────────────────────
1155
1156    #[cfg(not(target_os = "linux"))]
1157    #[tokio::test]
1158    async fn start_is_unsupported_on_non_linux() {
1159        // Pass a sentinel fd; the macOS code path never touches it.
1160        let res = EbpfFlowMonitor::start(-1, "tap0").await;
1161        match res {
1162            Err(EbpfMonitorError::Unsupported { host }) => {
1163                assert!(
1164                    !host.is_empty(),
1165                    "Unsupported error must name the host platform"
1166                );
1167            }
1168            Err(other) => panic!("expected Unsupported, got {other:?}"),
1169            Ok(_) => panic!("monitor must not construct on non-Linux"),
1170        }
1171    }
1172
1173    #[cfg(all(target_os = "linux", not(feature = "ebpf-aya")))]
1174    #[tokio::test]
1175    async fn start_returns_feature_disabled_when_feature_off() {
1176        let res = EbpfFlowMonitor::start(-1, "tap0").await;
1177        assert!(
1178            matches!(res, Err(EbpfMonitorError::FeatureDisabled)),
1179            "must report FeatureDisabled when ebpf-aya cargo feature is off"
1180        );
1181    }
1182
1183    #[tokio::test]
1184    async fn drain_yields_empty_when_no_events_buffered() {
1185        let (_tx, rx) = tokio::sync::mpsc::unbounded_channel::<FlowEvent>();
1186        let mut monitor = EbpfFlowMonitor::from_parts_for_test(rx, "tap0");
1187        let batch = monitor.drain();
1188        assert!(batch.is_empty(), "fresh monitor has no buffered events");
1189    }
1190
1191    #[tokio::test]
1192    async fn drain_yields_events_pushed_through_channel() {
1193        let (tx, rx) = tokio::sync::mpsc::unbounded_channel::<FlowEvent>();
1194        let mut monitor = EbpfFlowMonitor::from_parts_for_test(rx, "tap0");
1195        tx.send(sample_event()).expect("send ok");
1196        tx.send(sample_event()).expect("send ok");
1197        let batch = monitor.drain();
1198        assert_eq!(batch.len(), 2, "drain must pull both events synchronously");
1199    }
1200
1201    #[tokio::test]
1202    async fn drain_is_non_blocking() {
1203        // No sender pushes; drain must return immediately, not park.
1204        let (_tx, rx) = tokio::sync::mpsc::unbounded_channel::<FlowEvent>();
1205        let mut monitor = EbpfFlowMonitor::from_parts_for_test(rx, "tap0");
1206        let start = std::time::Instant::now();
1207        let batch = monitor.drain();
1208        let elapsed = start.elapsed();
1209        assert!(batch.is_empty());
1210        assert!(
1211            elapsed < std::time::Duration::from_millis(50),
1212            "drain must be non-blocking; took {elapsed:?}"
1213        );
1214    }
1215
1216    #[tokio::test]
1217    async fn stop_is_idempotent_when_no_drainer_present() {
1218        let (_tx, rx) = tokio::sync::mpsc::unbounded_channel::<FlowEvent>();
1219        let monitor = EbpfFlowMonitor::from_parts_for_test(rx, "tap0");
1220        // Should complete without panicking even though `drainer` is None.
1221        monitor.stop().await;
1222    }
1223
1224    // ── decode_ring_buf_event tests ────────────────────────────────────
1225    //
1226    // Pin the exact byte offsets the kernel-side
1227    // `cellos-supervisor-ebpf/src/main.rs` producer emits. If the
1228    // producer's `#[repr(C)]` layout ever shifts, these tests fail in
1229    // place rather than letting a torn record silently inflate the
1230    // supervisor's flow count.
1231    //
1232    // These tests target the platform-independent
1233    // [`decode_ring_buf_event`] decoder, which exists on every host. The
1234    // cfg-gated live-path adapter [`parse_ring_buf_event`] is exercised
1235    // implicitly: both decoders share the layout pinned by
1236    // `ring_buf_event_layout_constant_matches_producer`.
1237
1238    fn canonical_record() -> [u8; RING_BUF_EVENT_LEN] {
1239        let mut buf = [0u8; RING_BUF_EVENT_LEN];
1240        // src_ip = 10.0.0.1 (host order u32 = 0x0A000001 LE-bytes; we
1241        // round-trip through `to_ne_bytes` so the test passes on any
1242        // host endianness).
1243        let src_ip: u32 = u32::from(std::net::Ipv4Addr::new(10, 0, 0, 1));
1244        let dst_ip: u32 = u32::from(std::net::Ipv4Addr::new(192, 0, 2, 5));
1245        buf[0..4].copy_from_slice(&src_ip.to_ne_bytes());
1246        buf[4..8].copy_from_slice(&dst_ip.to_ne_bytes());
1247        buf[8..10].copy_from_slice(&44_321u16.to_ne_bytes()); // src_port
1248        buf[10..12].copy_from_slice(&443u16.to_ne_bytes()); // dst_port
1249        buf[12] = 6; // proto = TCP
1250                     // bytes[13..16] = padding, left zero
1251        buf[16..24].copy_from_slice(&7_777u64.to_ne_bytes()); // pkt_count
1252        buf[24..32].copy_from_slice(&123_456_789u64.to_ne_bytes()); // first_seen_ns
1253        buf[32] = 1; // verdict = accept
1254                     // bytes[33..40] = padding, left zero
1255        buf
1256    }
1257
1258    #[test]
1259    fn decode_ring_buf_event_round_trips_canonical_record() {
1260        let buf = canonical_record();
1261        let decoded = decode_ring_buf_event(&buf).expect("canonical 40-byte record decodes");
1262        assert_eq!(
1263            decoded.src_ip,
1264            u32::from(std::net::Ipv4Addr::new(10, 0, 0, 1))
1265        );
1266        assert_eq!(
1267            decoded.dst_ip,
1268            u32::from(std::net::Ipv4Addr::new(192, 0, 2, 5))
1269        );
1270        assert_eq!(decoded.src_port, 44_321);
1271        assert_eq!(decoded.dst_port, 443);
1272        assert_eq!(decoded.proto, 6);
1273        assert_eq!(decoded.pkt_count, 7_777);
1274        assert_eq!(decoded.first_seen_ns, 123_456_789);
1275        assert_eq!(decoded.verdict, 1);
1276    }
1277
1278    #[test]
1279    fn decode_ring_buf_event_rejects_short_record() {
1280        let buf = [0u8; RING_BUF_EVENT_LEN - 1];
1281        match decode_ring_buf_event(&buf) {
1282            Err(RingBufParseError::WrongLength { got }) => {
1283                assert_eq!(got, RING_BUF_EVENT_LEN - 1);
1284            }
1285            other => panic!("expected WrongLength, got {other:?}"),
1286        }
1287    }
1288
1289    #[test]
1290    fn decode_ring_buf_event_rejects_long_record() {
1291        let buf = [0u8; RING_BUF_EVENT_LEN + 8];
1292        assert!(matches!(
1293            decode_ring_buf_event(&buf),
1294            Err(RingBufParseError::WrongLength { got }) if got == RING_BUF_EVENT_LEN + 8
1295        ));
1296    }
1297
1298    #[test]
1299    fn decode_ring_buf_event_ignores_padding_bytes() {
1300        // Padding bytes in [13..16) and [33..40) are explicitly defined
1301        // as "zero by producer" but the decoder must not error on
1302        // non-zero padding — the eBPF verifier zeros them, but a future
1303        // producer that doesn't might still surface useful records.
1304        let mut buf = canonical_record();
1305        buf[13..16].copy_from_slice(&[0xff, 0xff, 0xff]);
1306        buf[33..40].copy_from_slice(&[0xaa; 7]);
1307        let decoded = decode_ring_buf_event(&buf).expect("non-zero padding must decode");
1308        // Non-padding fields unaffected.
1309        assert_eq!(decoded.proto, 6);
1310        assert_eq!(decoded.verdict, 1);
1311    }
1312
1313    #[test]
1314    fn ring_buf_event_to_flow_event_renders_tcp_dst() {
1315        let decoded = decode_ring_buf_event(&canonical_record()).unwrap();
1316        let ev = ring_buf_event_to_flow_event(&decoded);
1317        assert_eq!(ev.direction, NetworkFlowDirection::Egress);
1318        assert_eq!(ev.decision, NetworkFlowDecisionOutcome::Allow);
1319        assert_eq!(ev.protocol.as_deref(), Some("tcp"));
1320        assert_eq!(ev.dst_addr.as_deref(), Some("192.0.2.5"));
1321        assert_eq!(ev.dst_port, Some(443));
1322    }
1323
1324    #[test]
1325    fn ring_buf_event_to_flow_event_maps_verdict_drop_to_deny() {
1326        let mut buf = canonical_record();
1327        buf[32] = 2; // verdict = drop
1328        let decoded = decode_ring_buf_event(&buf).unwrap();
1329        let ev = ring_buf_event_to_flow_event(&decoded);
1330        assert_eq!(ev.decision, NetworkFlowDecisionOutcome::Deny);
1331    }
1332
1333    #[test]
1334    fn ring_buf_event_to_flow_event_maps_unknown_proto_to_none() {
1335        let mut buf = canonical_record();
1336        buf[12] = 99; // unknown IANA proto
1337        let decoded = decode_ring_buf_event(&buf).unwrap();
1338        let ev = ring_buf_event_to_flow_event(&decoded);
1339        assert!(ev.protocol.is_none(), "unknown proto must not lie");
1340    }
1341
1342    #[test]
1343    fn ring_buf_event_to_flow_event_maps_udp() {
1344        let mut buf = canonical_record();
1345        buf[12] = 17; // UDP
1346        let decoded = decode_ring_buf_event(&buf).unwrap();
1347        let ev = ring_buf_event_to_flow_event(&decoded);
1348        assert_eq!(ev.protocol.as_deref(), Some("udp"));
1349    }
1350
1351    #[test]
1352    fn ring_buf_event_layout_constant_matches_producer() {
1353        // Defensive: if anyone widens the kernel-side FlowEvent struct
1354        // without updating either host-side parser, this assertion makes
1355        // the mismatch loud at compile-test time rather than at runtime
1356        // on a Linux deployment.
1357        //
1358        // Producer (cellos-supervisor-ebpf): 16 (FlowKey) + 24 (FlowEntry) = 40.
1359        assert_eq!(RING_BUF_EVENT_LEN, 40);
1360        // Linux+aya path uses a parallel `BPF_FLOW_EVENT_SIZE` constant.
1361        // The two MUST agree — the only difference is which platforms
1362        // they compile on, not what they describe.
1363        #[cfg(all(target_os = "linux", feature = "ebpf-aya"))]
1364        assert_eq!(RING_BUF_EVENT_LEN, BPF_FLOW_EVENT_SIZE);
1365    }
1366
1367    #[test]
1368    fn ebpf_monitor_error_display_strings_are_useful() {
1369        // Defensive: operators see these strings in their logs when a
1370        // cell falls back to nflog. They must name the failure mode
1371        // unambiguously so paging on "eBPF flow monitor error" is
1372        // actionable rather than mysterious.
1373        let unsupported = EbpfMonitorError::Unsupported { host: "macos" };
1374        assert!(format!("{unsupported}").contains("Linux-only"));
1375
1376        let disabled = EbpfMonitorError::FeatureDisabled;
1377        assert!(format!("{disabled}").contains("ebpf-aya"));
1378
1379        let missing = EbpfMonitorError::ObjectMissing {
1380            path: "/tmp/x.o".to_string(),
1381        };
1382        assert!(format!("{missing}").contains("/tmp/x.o"));
1383
1384        let attach = EbpfMonitorError::AttachFailed {
1385            iface: "tap0".to_string(),
1386            message: "ENODEV".to_string(),
1387        };
1388        let s = format!("{attach}");
1389        assert!(s.contains("tap0"));
1390        assert!(s.contains("ENODEV"));
1391    }
1392}
1393
1394// ────────────────────────────────────────────────────────────────────────
1395// L5-15 — Connection-tracking flow accumulator
1396// ────────────────────────────────────────────────────────────────────────
1397//
1398// The types above (`FlowEvent`, `FlowEventListener`, `NoopFlowListener`,
1399// `flow_event_to_network_flow_decision`) are the Phase 2 nflog-shaped
1400// scaffolding — one event = one observed packet/decision, designed to ride
1401// the existing `network_flow_decision` v1 wire schema.
1402//
1403// L5-15 needs a *different* shape: open/close signals per 5-tuple so the
1404// supervisor can count UNIQUE connections that were actually exercised by
1405// the workload, and stamp that count into the `HomeostasisSignal`'s
1406// `exercised_egress_connections` field. This sub-module ships the new
1407// types under their own namespace to avoid colliding with the Phase 2
1408// scaffold's `FlowEvent`, which has additive `reason_code` semantics this
1409// counter doesn't need.
1410//
1411// The accumulator is intentionally minimal: a `HashSet<FlowKey>` of opened
1412// flows. When `CELLOS_PER_FLOW_REALTIME=1` is on and a real backend (eBPF
1413// cgroup_skb or nflog connection tracker) feeds it Opened events, the
1414// homeostasis emit site reads `unique_flow_count()` and stamps it into the
1415// signal. When the env var is off OR no backend is wired, the supervisor
1416// falls back to `None` + `telemetry_pending_L2-04` reason, preserving the
1417// E2-06 contract.
1418pub mod connection_tracking {
1419    use std::collections::HashSet;
1420    use std::net::IpAddr;
1421
1422    /// 5-tuple uniquely identifying an L4 flow. Used as the hash key for
1423    /// counting unique connections exercised by the workload.
1424    ///
1425    /// `protocol` is a small numeric (IANA proto byte: 6 = TCP, 17 = UDP,
1426    /// 1 = ICMP, 58 = ICMPv6) rather than a `String` so the hash key stays
1427    /// cheap to compute and copy. Source addr/port are included so a
1428    /// workload that opens N parallel connections to the same `(dst, port)`
1429    /// is counted as N flows, matching what `exercised_egress_connections`
1430    /// promises to consumers ("connections", not "destinations").
1431    #[derive(Debug, Clone, Copy, Hash, PartialEq, Eq)]
1432    pub struct FlowKey {
1433        pub src_addr: IpAddr,
1434        pub src_port: u16,
1435        pub dst_addr: IpAddr,
1436        pub dst_port: u16,
1437        pub protocol: u8,
1438    }
1439
1440    /// Open/close phase of a connection-tracked flow.
1441    ///
1442    /// `Closed` carries the observed packet + byte counts so future taudit
1443    /// passes can correlate flow lifespan with declared egress rules.
1444    /// The accumulator below only consumes `Opened` today; `Closed` is
1445    /// kept on the type so a backend implementation can emit both phases
1446    /// without us churning the enum later.
1447    #[derive(Debug, Clone)]
1448    pub enum FlowEventKind {
1449        Opened,
1450        Closed { pkt_count: u64, byte_count: u64 },
1451    }
1452
1453    /// One observation a connection-tracking backend (eBPF cgroup_skb or
1454    /// nflog with conntrack) surfaces to the supervisor.
1455    ///
1456    /// Distinct from [`super::FlowEvent`] (the Phase 2 nflog scaffold's
1457    /// packet-decision shape). The two are deliberately namespaced so the
1458    /// supervisor can wire one or both without ambiguity.
1459    #[derive(Debug, Clone)]
1460    pub struct FlowEvent {
1461        pub key: FlowKey,
1462        pub kind: FlowEventKind,
1463        /// Monotonic-clock nanoseconds when the event was observed.
1464        /// Kept as `u64` to match `clock_gettime(CLOCK_MONOTONIC)` /
1465        /// `bpf_ktime_get_ns()` semantics — kernel-side clock, not wall.
1466        pub timestamp_ns: u64,
1467    }
1468
1469    /// Accumulates unique opened flows for the lifetime of one cell run.
1470    ///
1471    /// `record()` is idempotent on the `FlowKey` — the same 5-tuple
1472    /// observed twice counts as one flow, matching what the homeostasis
1473    /// `exercised_egress_connections` field promises consumers (a count of
1474    /// distinct connections actually exercised, not packet count).
1475    ///
1476    /// Constructed once per `Supervisor::run` when
1477    /// `CELLOS_PER_FLOW_REALTIME=1` is on; consumed at the homeostasis
1478    /// emit site after `child.wait()` returns.
1479    #[derive(Debug, Default)]
1480    pub struct FlowAccumulator {
1481        opened: HashSet<FlowKey>,
1482    }
1483
1484    impl FlowAccumulator {
1485        pub fn new() -> Self {
1486            Self::default()
1487        }
1488
1489        /// Record a flow event. Only `Opened` events affect the unique
1490        /// count today; `Closed` is silently consumed so a future
1491        /// implementation can stamp byte/packet aggregates without a
1492        /// breaking API change.
1493        pub fn record(&mut self, event: &FlowEvent) {
1494            if matches!(event.kind, FlowEventKind::Opened) {
1495                self.opened.insert(event.key);
1496            }
1497        }
1498
1499        /// Number of unique 5-tuples observed in the `Opened` phase so far.
1500        ///
1501        /// Returns `u64` because real workloads can plausibly open more
1502        /// than `u32::MAX / 2` flows over a long-lived run; the supervisor
1503        /// saturates to `u32::MAX` at the homeostasis emit site since the
1504        /// wire schema's field is `u32`. Saturation, not truncation, is
1505        /// the safe choice: a count claiming "exactly u32::MAX" is closer
1506        /// to truth than wrapping to 0.
1507        pub fn unique_flow_count(&self) -> u64 {
1508            self.opened.len() as u64
1509        }
1510    }
1511
1512    #[cfg(test)]
1513    mod tests {
1514        use super::*;
1515        use std::net::Ipv4Addr;
1516
1517        fn key(src_port: u16, dst_octet: u8, dst_port: u16) -> FlowKey {
1518            FlowKey {
1519                src_addr: IpAddr::V4(Ipv4Addr::new(10, 0, 0, 1)),
1520                src_port,
1521                dst_addr: IpAddr::V4(Ipv4Addr::new(192, 0, 2, dst_octet)),
1522                dst_port,
1523                protocol: 6, // TCP
1524            }
1525        }
1526
1527        fn opened_event(k: FlowKey) -> FlowEvent {
1528            FlowEvent {
1529                key: k,
1530                kind: FlowEventKind::Opened,
1531                timestamp_ns: 0,
1532            }
1533        }
1534
1535        #[test]
1536        fn empty_accumulator_reports_zero() {
1537            let acc = FlowAccumulator::new();
1538            assert_eq!(acc.unique_flow_count(), 0);
1539        }
1540
1541        #[test]
1542        fn opened_event_increments_count() {
1543            let mut acc = FlowAccumulator::new();
1544            acc.record(&opened_event(key(40000, 1, 443)));
1545            assert_eq!(acc.unique_flow_count(), 1);
1546        }
1547
1548        #[test]
1549        fn duplicate_opened_is_idempotent() {
1550            // Same 5-tuple observed twice MUST count as one flow — the
1551            // homeostasis semantics are "distinct connections," not "open
1552            // events." A retransmitted SYN or a backend re-emitting an
1553            // event MUST NOT inflate the count.
1554            let mut acc = FlowAccumulator::new();
1555            let k = key(40000, 1, 443);
1556            acc.record(&opened_event(k));
1557            acc.record(&opened_event(k));
1558            assert_eq!(acc.unique_flow_count(), 1);
1559        }
1560
1561        #[test]
1562        fn distinct_flows_count_independently() {
1563            let mut acc = FlowAccumulator::new();
1564            acc.record(&opened_event(key(40000, 1, 443)));
1565            acc.record(&opened_event(key(40001, 1, 443))); // different src_port
1566            acc.record(&opened_event(key(40000, 2, 443))); // different dst_addr
1567            acc.record(&opened_event(key(40000, 1, 80))); // different dst_port
1568            assert_eq!(acc.unique_flow_count(), 4);
1569        }
1570
1571        #[test]
1572        fn closed_event_does_not_increment_count() {
1573            // `Closed` is observed for symmetry/future use but MUST NOT
1574            // affect the unique-flow count — the count is keyed off the
1575            // `Opened` phase.
1576            let mut acc = FlowAccumulator::new();
1577            let k = key(40000, 1, 443);
1578            acc.record(&FlowEvent {
1579                key: k,
1580                kind: FlowEventKind::Closed {
1581                    pkt_count: 10,
1582                    byte_count: 1500,
1583                },
1584                timestamp_ns: 1,
1585            });
1586            assert_eq!(acc.unique_flow_count(), 0);
1587        }
1588
1589        #[test]
1590        fn closed_after_opened_preserves_count() {
1591            let mut acc = FlowAccumulator::new();
1592            let k = key(40000, 1, 443);
1593            acc.record(&opened_event(k));
1594            acc.record(&FlowEvent {
1595                key: k,
1596                kind: FlowEventKind::Closed {
1597                    pkt_count: 5,
1598                    byte_count: 500,
1599                },
1600                timestamp_ns: 2,
1601            });
1602            assert_eq!(acc.unique_flow_count(), 1);
1603        }
1604    }
1605}