cellos_supervisor/ebpf_flow.rs
1//! E7-4 — eBPF host-side flow monitor (aya loader + ring-buffer drainer).
2//!
3//! This module ships two things:
4//!
5//! 1. **The original FC-38 Phase 2 scaffold** ([`FlowEvent`],
6//! [`FlowEventListener`], [`NoopFlowListener`], the env-flag reader, the
7//! reason-code constants, and the [`flow_event_to_network_flow_decision`]
8//! pure translator). These are preserved verbatim — downstream callers
9//! in `per_flow.rs`, `supervisor.rs`, and external integration tests
10//! consume them, and the scaffold's unit tests stay green.
11//!
12//! 2. **The new [`EbpfFlowMonitor`] type** — the host-side aya loader the
13//! supervisor activates when `CELLOS_PER_FLOW_REALTIME=1` AND
14//! `CELLOS_PER_FLOW_BACKEND=ebpf`. On Linux with the `ebpf-aya` Cargo
15//! feature enabled it `setns(2)`'s into the cell's network namespace,
16//! loads a tc-clsact egress program from an operator-supplied BPF
17//! object, attaches it to `tap_iface`, spawns a tokio task draining a
18//! ring buffer into an mpsc channel, and returns to the original
19//! netns. On macOS, on Linux without the feature, or when any of the
20//! above fails, it returns an [`EbpfMonitorError`] that the supervisor
21//! treats as "fall back to nflog" — never a fatal error.
22//!
23//! ## Why a separate type rather than another [`FlowEventListener`] impl
24//!
25//! The scaffold's [`FlowEventListener`] trait surfaces a single
26//! `drain_events` hook that the supervisor calls AFTER `child.wait()`
27//! returns, in the same `spawn_blocking` closure as the nft counter
28//! scrape. Ring-buffer-based eBPF flow monitors need to drain
29//! *continuously* (kernel ring buffers overflow if left unread) on a
30//! dedicated tokio task. The [`EbpfFlowMonitor::drain`] semantics
31//! (non-blocking pull from an mpsc channel) match that lifecycle far
32//! better than the trait's single-shot post-run hook. Both surfaces can
33//! coexist — the supervisor selects which one to use based on
34//! [`crate::per_flow::PerFlowBackend`].
35//!
36//! ## Honest scope under macOS
37//!
//! - [`EbpfFlowMonitor::start`] always returns
39//! [`EbpfMonitorError::Unsupported`] on `cfg(not(target_os = "linux"))`.
40//! - On Linux without the `ebpf-aya` Cargo feature, `start` returns
41//! [`EbpfMonitorError::FeatureDisabled`].
42//! - On Linux with the feature but no BPF object available
43//! (`CELLOS_EBPF_OBJECT_PATH` unset / file missing), `start` returns
44//! [`EbpfMonitorError::ObjectMissing`].
45//! - Real kernel attachment + ring-buffer drain are only exercised when
46//! all three conditions are met.
47//!
48//! The supervisor's call site (`per_flow.rs`) catches any
49//! [`EbpfMonitorError`], emits a single `tracing::warn`, and reverts to
50//! the existing nflog path so per-flow telemetry is never silently lost.
51
52#![allow(dead_code)] // scaffold + feature-gated implementation; consumers vary
53
54use cellos_core::{NetworkFlowDecision, NetworkFlowDecisionOutcome, NetworkFlowDirection};
55
56// ────────────────────────────────────────────────────────────────────────
57// Original FC-38 Phase 2 scaffold (preserved for downstream callers)
58// ────────────────────────────────────────────────────────────────────────
59
/// Operator-opt-in env flag for FC-38 Phase 2 eBPF/nflog real-time
/// per-flow events.
///
/// Set to `1` to enable. Default OFF: Phase 2 listener has kernel
/// requirements (CAP_BPF, kernel ≥ 5.8 for full BPF surface, or
/// CAP_NET_ADMIN for nflog) that Phase 1's post-run nft scrape does not.
/// Operators must explicitly opt in to acknowledge they have provisioned
/// the right capability surface.
///
/// Read by [`is_per_flow_ebpf_enabled`], which treats only the exact
/// value `"1"` as "enabled".
pub const PER_FLOW_EBPF_ENV: &str = "CELLOS_FIRECRACKER_PER_FLOW_EBPF";

/// Phase 2 reason code: an eBPF-attached program (likely
/// `cgroup_skb/egress` or `tc clsact`) observed an XDP-style drop on the
/// cell's egress.
///
/// NOTE(review): the ring-buffer decoder in this file stamps this code on
/// events whose decision is `Allow` (the classifier is described as
/// observability-only). Confirm whether a dedicated "observed" reason
/// code is wanted before consumers key on the word "drop".
pub const REASON_EBPF_XDP_DROP: &str = "ebpf_xdp_drop";

/// Phase 2 reason code: an `NFLOG` netlink consumer observed a packet
/// that matched a logging-enabled nft rule.
pub const REASON_NFLOG_MATCH: &str = "nflog_match";
78
79/// Returns `true` when the operator has explicitly opted into Phase 2
80/// eBPF/nflog real-time per-flow events via the
81/// [`PER_FLOW_EBPF_ENV`] env var.
82pub fn is_per_flow_ebpf_enabled() -> bool {
83 std::env::var(PER_FLOW_EBPF_ENV).as_deref() == Ok("1")
84}
85
/// One real-time observation a Phase 2 listener surfaces.
///
/// Backend-neutral: an eBPF `cgroup_skb` program populates the same
/// fields a `nflog` netlink consumer does, AND the new ring-buffer-based
/// [`EbpfFlowMonitor`] populates the same fields.
///
/// Converted into the wire-level `NetworkFlowDecision` payload by
/// [`flow_event_to_network_flow_decision`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct FlowEvent {
    /// Outbound vs inbound. Phase 2 starts on egress (matching Phase 1)
    /// and may add ingress later.
    pub direction: NetworkFlowDirection,
    /// Allow vs deny.
    pub decision: NetworkFlowDecisionOutcome,
    /// Reason code distinguishing the data source (e.g.
    /// [`REASON_EBPF_XDP_DROP`] or [`REASON_NFLOG_MATCH`]).
    pub reason_code: &'static str,
    /// Optional destination IP, rendered as a string.
    pub dst_addr: Option<String>,
    /// Optional destination port.
    pub dst_port: Option<u16>,
    /// Optional protocol (`udp` / `tcp` / `icmp`).
    pub protocol: Option<String>,
    /// Optional bytes observed for this event. When present, the
    /// translator also reports a packet count of 1 for the event.
    pub byte_count: Option<u64>,
}
109
/// Trait the supervisor calls to drain pending events from a Phase 2
/// listener.
///
/// Per this module's header docs the supervisor invokes
/// [`FlowEventListener::drain_events`] once after the child process has
/// exited; backends that need continuous draining use
/// [`EbpfFlowMonitor`] instead.
///
/// Implementors must be `Send + Sync` so the boxed listener can cross
/// thread boundaries.
pub trait FlowEventListener: Send + Sync {
    /// Drain events accumulated since the last call.
    ///
    /// Returns an empty `Vec` when nothing is pending.
    fn drain_events(&mut self) -> Vec<FlowEvent>;

    /// Identifier the supervisor stamps into the audit log.
    fn backend_name(&self) -> &'static str;
}
119
120/// Default Phase 2 listener: yields zero events, identifies as `"noop"`.
121#[derive(Debug, Default)]
122pub struct NoopFlowListener;
123
124impl FlowEventListener for NoopFlowListener {
125 fn drain_events(&mut self) -> Vec<FlowEvent> {
126 Vec::new()
127 }
128
129 fn backend_name(&self) -> &'static str {
130 "noop"
131 }
132}
133
134/// Build the default scaffold listener — always [`NoopFlowListener`] in
135/// the scaffolding pass.
136pub fn build_default_listener() -> Box<dyn FlowEventListener> {
137 Box::new(NoopFlowListener)
138}
139
140/// Translate a [`FlowEvent`] into a [`NetworkFlowDecision`] payload
141/// suitable for emitting on the existing
142/// `dev.cellos.events.cell.observability.v1.network_flow_decision` channel.
143#[allow(clippy::too_many_arguments)]
144pub fn flow_event_to_network_flow_decision(
145 event: &FlowEvent,
146 cell_id: &str,
147 run_id: &str,
148 decision_id: &str,
149 policy_digest: Option<&str>,
150 keyset_id: Option<&str>,
151 issuer_kid: Option<&str>,
152 correlation_id: Option<&str>,
153 observed_at: &str,
154) -> NetworkFlowDecision {
155 NetworkFlowDecision {
156 schema_version: "1.0.0".into(),
157 cell_id: cell_id.to_string(),
158 run_id: run_id.to_string(),
159 decision_id: decision_id.to_string(),
160 direction: event.direction,
161 decision: event.decision,
162 reason_code: event.reason_code.to_string(),
163 nft_rule_ref: None,
164 dst_addr: event.dst_addr.clone(),
165 dst_port: event.dst_port,
166 protocol: event.protocol.clone(),
167 packet_count: event.byte_count.map(|_| 1u64),
168 byte_count: event.byte_count,
169 policy_digest: policy_digest.map(|s| s.to_string()),
170 keyset_id: keyset_id.map(|s| s.to_string()),
171 issuer_kid: issuer_kid.map(|s| s.to_string()),
172 correlation_id: correlation_id.map(|s| s.to_string()),
173 observed_at: observed_at.to_string(),
174 }
175}
176
177// ────────────────────────────────────────────────────────────────────────
178// E7-4 — EbpfFlowMonitor (host-side aya loader)
179// ────────────────────────────────────────────────────────────────────────
180
/// Errors returned by [`EbpfFlowMonitor::start`].
///
/// All variants are non-fatal at the supervisor level: the per-flow
/// activation path (`per_flow::activate_per_flow_listener`) catches them,
/// logs a `tracing::warn`, and falls back to the nflog listener so the
/// cell still emits `network_flow_decision` events on the existing path.
#[derive(Debug, thiserror::Error)]
pub enum EbpfMonitorError {
    /// Host platform is not Linux. eBPF requires Linux kernel support.
    /// `host` carries `std::env::consts::OS` for the log line.
    #[error("eBPF flow monitor is Linux-only (host platform: {host})")]
    Unsupported { host: &'static str },

    /// Linux host but the crate was built without the `ebpf-aya` Cargo
    /// feature. The supervisor binary ships with the feature off by
    /// default so macOS / non-eBPF Linux builds don't pay for a kernel
    /// dependency they cannot use.
    #[error(
        "eBPF flow monitor requested but the `ebpf-aya` Cargo feature is \
         not enabled on this build"
    )]
    FeatureDisabled,

    /// The BPF object file referenced by `CELLOS_EBPF_OBJECT_PATH` (or
    /// the default path) was not present on disk. Real deployments
    /// pre-build `cellos-supervisor-ebpf` via `cargo bpf` and stage the
    /// resulting object alongside the supervisor binary; the env var
    /// lets operators override the location.
    ///
    /// Note: the loader maps *any* failure to read the file (including
    /// permission errors) to this variant.
    #[error("eBPF object file missing at {path}")]
    ObjectMissing { path: String },

    /// `setns(2)` into the cell's network namespace failed. Usually
    /// `EPERM` (missing `CAP_SYS_ADMIN`) or `ENOENT` (cell already torn
    /// down).
    ///
    /// Also returned when the supervisor's own `/proc/self/ns/net` cannot
    /// be opened, or when switching back to the original namespace fails.
    #[error("setns(CLONE_NEWNET) into cell netns failed: {source}")]
    Setns {
        #[source]
        source: std::io::Error,
    },

    /// Loading the BPF object via aya failed. Surfaces verifier
    /// rejections, license issues, missing program sections, etc.
    /// `message` is the rendered aya error.
    #[error("aya BPF object load failed: {message}")]
    LoadFailed { message: String },

    /// Attaching the loaded program to the tap interface via
    /// `tc clsact` failed. Usually `ENODEV` (tap iface missing) or
    /// `EEXIST` (clsact qdisc collision with an operator-provisioned
    /// qdisc on the same interface).
    #[error("tc clsact attach to {iface} failed: {message}")]
    AttachFailed { iface: String, message: String },

    /// Opening the ring-buffer map for draining failed (map missing from
    /// the object, or present but not a ring buffer).
    #[error("ring-buffer map open failed: {message}")]
    RingBufferOpenFailed { message: String },
}
236
/// Host-side eBPF flow monitor.
///
/// Owns the loaded aya BPF object (so it stays alive for the duration of
/// the cell run — dropping `aya::Ebpf` detaches all programs) plus the
/// receiver end of an mpsc channel into which the ring-buffer drainer
/// task pushes [`FlowEvent`]s.
///
/// Lifecycle:
///
/// 1. [`EbpfFlowMonitor::start`] enters the cell's netns, loads + attaches
///    the BPF program, spawns the drainer task, and returns to the original
///    netns.
/// 2. The supervisor periodically calls [`EbpfFlowMonitor::drain`] (or
///    consumes events directly via the channel) on its hot path.
/// 3. [`EbpfFlowMonitor::stop`] shuts the monitor down and drops the
///    aya object, which detaches the BPF program.
///
/// The struct is constructed only via `start`; `Default` is intentionally
/// not provided to prevent callers from skipping the netns + load
/// sequence.
pub struct EbpfFlowMonitor {
    /// Holds the loaded eBPF object on Linux with the aya feature
    /// enabled; the field does not exist in any other build
    /// configuration. The Linux+aya constructor always stores `Some`, so
    /// a `None` here at runtime is a programmer error, not a deployment
    /// scenario.
    #[cfg(all(target_os = "linux", feature = "ebpf-aya"))]
    _bpf: Option<aya::Ebpf>,

    /// Receiver end of the mpsc channel the drainer task pushes events
    /// onto. Construction always produces a paired sender (held by the
    /// drainer task) + receiver (held here).
    ///
    /// The drainer takes ownership of the ring-buffer map, so dropping
    /// `_bpf` alone does not end it; the drainer learns the monitor is
    /// gone through this channel being closed or dropped.
    rx: tokio::sync::mpsc::UnboundedReceiver<FlowEvent>,

    /// Optional handle to the drainer task. The drainer is created with
    /// `tokio::task::spawn_blocking`; tokio documents that `abort()` on
    /// such a handle only prevents a task that has not started yet and
    /// has no effect on one that is already running.
    drainer: Option<tokio::task::JoinHandle<()>>,

    /// Cached interface name for tracing breadcrumbs.
    iface: String,
}
280
281impl EbpfFlowMonitor {
282 /// Load and attach the BPF program inside the cell's netns. Falls
283 /// back to nflog (via the caller in `per_flow.rs`) if any step
284 /// fails by returning an [`EbpfMonitorError`].
285 ///
286 /// `netns_fd` MUST be an open file descriptor for the cell's
287 /// `/proc/<child_pid>/ns/net`. The caller retains ownership.
288 /// `tap_iface` is the supervisor-provisioned tap interface name
289 /// (e.g. `"vethC0"`).
290 #[allow(unused_variables)] // platform-dependent path
291 pub async fn start(
292 netns_fd: std::os::unix::io::RawFd,
293 tap_iface: &str,
294 ) -> Result<Self, EbpfMonitorError> {
295 #[cfg(all(target_os = "linux", feature = "ebpf-aya"))]
296 {
297 Self::start_linux_aya(netns_fd, tap_iface).await
298 }
299 #[cfg(all(target_os = "linux", not(feature = "ebpf-aya")))]
300 {
301 Err(EbpfMonitorError::FeatureDisabled)
302 }
303 #[cfg(not(target_os = "linux"))]
304 {
305 // tap_iface is intentionally unused on non-Linux — surface
306 // it through the error message so the operator's log line
307 // identifies which cell's monitor declined to start.
308 tracing::debug!(
309 target: "cellos.supervisor.ebpf_flow",
310 iface = %tap_iface,
311 "EbpfFlowMonitor::start on non-Linux host — falling back to nflog"
312 );
313 Err(EbpfMonitorError::Unsupported {
314 host: std::env::consts::OS,
315 })
316 }
317 }
318
319 /// Drain pending flow events into a `Vec` without blocking.
320 ///
321 /// Used by callers that want a synchronous batch pull instead of
322 /// `.recv().await` per event. Returns an empty `Vec` when no events
323 /// are buffered.
324 pub fn drain(&mut self) -> Vec<FlowEvent> {
325 let mut out = Vec::new();
326 while let Ok(ev) = self.rx.try_recv() {
327 out.push(ev);
328 }
329 out
330 }
331
332 /// Async pull of one event. Returns `None` when the drainer task
333 /// has exited (i.e. the channel sender side has been dropped).
334 pub async fn recv(&mut self) -> Option<FlowEvent> {
335 self.rx.recv().await
336 }
337
338 /// Stop the monitor and clean up. Aborts the drainer task; dropping
339 /// the held [`aya::Ebpf`] object detaches the BPF program.
340 pub async fn stop(mut self) {
341 if let Some(drainer) = self.drainer.take() {
342 drainer.abort();
343 // Best-effort join — we don't care about the result, only
344 // that the task has stopped scheduling.
345 let _ = drainer.await;
346 }
347 // `_bpf` (when present) drops here, releasing kernel resources.
348 }
349
350 // ────────────────────────────────────────────────────────────────
351 // Linux + aya implementation
352 // ────────────────────────────────────────────────────────────────
353
354 #[cfg(all(target_os = "linux", feature = "ebpf-aya"))]
355 async fn start_linux_aya(
356 netns_fd: std::os::unix::io::RawFd,
357 tap_iface: &str,
358 ) -> Result<Self, EbpfMonitorError> {
359 // The BPF object path is operator-controllable so deployments
360 // can stage the object wherever their packaging puts it. The
361 // default path matches the cellos-supervisor-ebpf crate's
362 // expected cargo-bpf output location.
363 let object_path = std::env::var("CELLOS_EBPF_OBJECT_PATH")
364 .ok()
365 .unwrap_or_else(|| "/usr/local/lib/cellos/cellos-supervisor-ebpf.o".to_string());
366 let object_bytes =
367 std::fs::read(&object_path).map_err(|_| EbpfMonitorError::ObjectMissing {
368 path: object_path.clone(),
369 })?;
370
371 // Save the supervisor's original netns so we can return to it
372 // after attaching the program. `setns` is per-thread on Linux,
373 // so we MUST restore before the tokio worker thread we're on
374 // services any other task.
375 let self_netns = std::fs::File::open("/proc/self/ns/net")
376 .map_err(|source| EbpfMonitorError::Setns { source })?;
377
378 // Enter the cell's netns.
379 use std::os::unix::io::AsRawFd;
380 let setns_rc = unsafe { libc::setns(netns_fd, libc::CLONE_NEWNET) };
381 if setns_rc != 0 {
382 return Err(EbpfMonitorError::Setns {
383 source: std::io::Error::last_os_error(),
384 });
385 }
386
387 // Load the BPF object inside the cell's netns. aya 0.13's
388 // `Ebpf::load` returns a `Result<Ebpf, EbpfError>` — surface
389 // its display string into our error type so the caller's
390 // tracing line carries the verifier message verbatim.
391 let load_result = aya::Ebpf::load(&object_bytes);
392
393 // Restore the supervisor's netns BEFORE we propagate any
394 // load error — leaving the worker thread in the cell's netns
395 // would break unrelated tokio tasks scheduled later.
396 let restore_rc = unsafe { libc::setns(self_netns.as_raw_fd(), libc::CLONE_NEWNET) };
397 if restore_rc != 0 {
398 // Restoration failure is a real problem (this thread is
399 // now stuck in the cell's netns). Surface as Setns rather
400 // than discarding — the supervisor will refuse to use
401 // this monitor and the caller falls back to nflog.
402 return Err(EbpfMonitorError::Setns {
403 source: std::io::Error::last_os_error(),
404 });
405 }
406
407 let mut bpf = load_result.map_err(|e| EbpfMonitorError::LoadFailed {
408 message: format!("{e}"),
409 })?;
410
411 // tc clsact attach. The program name is a contract with the
412 // ebpf crate's source; we don't hardcode it as a constant
413 // here because the ebpf crate ships its own constants module
414 // — when the ebpf crate lands the supervisor will import
415 // `cellos_supervisor_ebpf::PROGRAM_NAME` instead. For now we
416 // probe a small set of conventional names so a partial BPF
417 // crate doesn't force a supervisor recompile.
418 const CANDIDATE_PROGRAM_NAMES: &[&str] =
419 &["cellos_flow_egress", "tc_egress", "flow_classifier"];
420 let mut attached = false;
421 let mut last_attach_err: Option<String> = None;
422 for prog_name in CANDIDATE_PROGRAM_NAMES {
423 // aya 0.13: `program_mut` returns Option; tc programs need
424 // `SchedClassifier::try_from` + `.load()` + `.attach()`.
425 // We avoid hard-coding the call shape against an unloaded
426 // feature dep by recording the attempt; the actual probe
427 // and attach happens via the helper below so this file
428 // still compiles when aya's surface evolves.
429 match attach_tc_egress(&mut bpf, prog_name, tap_iface) {
430 Ok(()) => {
431 attached = true;
432 break;
433 }
434 Err(e) => {
435 last_attach_err = Some(e);
436 }
437 }
438 }
439 if !attached {
440 return Err(EbpfMonitorError::AttachFailed {
441 iface: tap_iface.to_string(),
442 message: last_attach_err
443 .unwrap_or_else(|| "no candidate program found".to_string()),
444 });
445 }
446
447 // Spawn the ring-buffer drainer. We pull the ring-buffer map
448 // by name (`"flow_events"`); when the ebpf crate lands it will
449 // export this constant.
450 let (tx, rx) = tokio::sync::mpsc::unbounded_channel::<FlowEvent>();
451 let drainer = spawn_ring_buffer_drainer(&mut bpf, tx)
452 .map_err(|e| EbpfMonitorError::RingBufferOpenFailed { message: e })?;
453
454 Ok(Self {
455 _bpf: Some(bpf),
456 rx,
457 drainer: Some(drainer),
458 iface: tap_iface.to_string(),
459 })
460 }
461}
462
463// ────────────────────────────────────────────────────────────────────
464// Linux+aya helpers — intentionally narrow so this file stays
465// compilable on macOS regardless of aya's API drift.
466// ────────────────────────────────────────────────────────────────
467
/// Wire the named `SchedClassifier` program from `bpf` onto `iface`'s tc
/// clsact egress hook.
///
/// The work happens in four ordered steps:
///
/// 1. Make sure a `clsact` qdisc exists on the interface. An "already
///    exists" failure is tolerated (an operator or an earlier cell may
///    have provisioned it); any other failure aborts.
/// 2. Look the program up by `program_name` and view it as a
///    `SchedClassifier`.
/// 3. Load the program. An "already loaded" failure is tolerated so a
///    retry does not need to track per-program state.
/// 4. Attach to `TcAttachType::Egress`. The returned link id is not kept
///    here; the program object inside `bpf` retains the link, so the
///    caller keeps the attachment alive by keeping `bpf` alive.
///
/// # Errors
///
/// Every failure is rendered into a `String` so the caller's
/// candidate-name loop can record it and move on without threading
/// aya's error types through the signature.
#[cfg(all(target_os = "linux", feature = "ebpf-aya"))]
fn attach_tc_egress(bpf: &mut aya::Ebpf, program_name: &str, iface: &str) -> Result<(), String> {
    use aya::programs::{tc, SchedClassifier, TcAttachType};

    // Step 1 — clsact qdisc. "Already exists" is detected by substring on
    // the rendered message; a miss merely turns a benign EEXIST into an
    // attach failure, which the supervisor handles by using nflog.
    match tc::qdisc_add_clsact(iface) {
        Ok(_) => {}
        Err(qdisc_err) => {
            let rendered = qdisc_err.to_string();
            let already_exists = rendered.contains("File exists") || rendered.contains("EEXIST");
            if !already_exists {
                return Err(format!("qdisc_add_clsact({iface}) failed: {rendered}"));
            }
        }
    }

    // Step 2 — locate the program and narrow it to a classifier. A
    // missing name is expected when probing candidates, so the message
    // is kept stable.
    let raw_program = match bpf.program_mut(program_name) {
        Some(found) => found,
        None => return Err(format!("program {program_name:?} not found in BPF object")),
    };
    let classifier: &mut SchedClassifier =
        raw_program
            .try_into()
            .map_err(|e: aya::programs::ProgramError| {
                format!("program {program_name:?} is not a SchedClassifier: {e}")
            })?;

    // Step 3 — load; tolerate "already loaded".
    match classifier.load() {
        Ok(_) => {}
        Err(load_err) => {
            let rendered = load_err.to_string();
            let already_loaded =
                rendered.contains("AlreadyLoaded") || rendered.contains("already loaded");
            if !already_loaded {
                return Err(format!("SchedClassifier::load failed: {rendered}"));
            }
        }
    }

    // Step 4 — attach to the egress hook; the link id is intentionally
    // discarded.
    match classifier.attach(iface, TcAttachType::Egress) {
        Ok(_link_id) => Ok(()),
        Err(attach_err) => Err(format!(
            "SchedClassifier::attach({iface}, egress) failed: {attach_err}"
        )),
    }
}
542
/// Wire layout of one BPF-side `FlowEvent` (must mirror
/// `crates/cellos-supervisor-ebpf/src/main.rs`).
///
/// Layout summary (40 bytes total, `#[repr(C)]` natural alignment):
/// - bytes 0..4    src_ip (u32, host order — BPF program converts from
///   big-endian before writing)
/// - bytes 4..8    dst_ip (u32, host order)
/// - bytes 8..10   src_port (u16, host order)
/// - bytes 10..12  dst_port (u16, host order)
/// - byte  12      proto (u8, IANA proto byte)
/// - bytes 13..16  _pad (3 bytes, ignored)
/// - bytes 16..24  pkt_count (u64)
/// - bytes 24..32  first_seen_ns (u64)
/// - byte  32      verdict (u8)
/// - bytes 33..40  _pad (7 bytes, ignored)
///
/// Total: 16 bytes of key + 24 bytes of entry = 40 bytes. This constant
/// pins the parser against accidental drift between the BPF crate's
/// `FlowEvent` and the host-side decoder: the live-path decoder rejects
/// any ring-buffer slot shorter than this.
#[cfg(all(target_os = "linux", feature = "ebpf-aya"))]
const BPF_FLOW_EVENT_SIZE: usize = 40;
563
/// Spawn the blocking task that drains the `FLOW_EVENTS` ring buffer.
///
/// We use `take_map` rather than `map_mut` so the resulting `RingBuf`
/// owns its `MapData` and can move into the `spawn_blocking` closure
/// without borrowing from `bpf`. The supervisor's `aya::Ebpf` retains
/// every OTHER map and the loaded program; only the ring buffer migrates
/// ownership.
///
/// The drainer loop:
/// 1. Calls `ring_buf.next()`. On `Some(item)`, parses the bytes via
///    `parse_ring_buf_event` and forwards the resulting `FlowEvent`
///    down `tx`. Slots that fail to parse are skipped. If the send fails
///    (receiver closed or dropped), the task exits.
/// 2. On `None`, exits if the channel has been closed; otherwise sleeps
///    100 µs and retries. The sleep bounds busy-poll overhead while
///    keeping new-flow latency well under 1 ms.
///
/// # Shutdown
///
/// The task runs on tokio's blocking pool, where `JoinHandle::abort()`
/// cannot interrupt a task that has already started. The closed-channel
/// check in the idle branch is therefore the task's only reliable exit
/// on a quiet ring buffer; without it the thread would poll forever
/// after the monitor is gone and hold up runtime shutdown.
///
/// # Errors
///
/// Returns a rendered message when the map is absent from the BPF object
/// or is not a ring buffer.
#[cfg(all(target_os = "linux", feature = "ebpf-aya"))]
fn spawn_ring_buffer_drainer(
    bpf: &mut aya::Ebpf,
    tx: tokio::sync::mpsc::UnboundedSender<FlowEvent>,
) -> Result<tokio::task::JoinHandle<()>, String> {
    use aya::maps::RingBuf;

    /// Pause between polls of an empty ring buffer.
    const IDLE_POLL_INTERVAL: std::time::Duration = std::time::Duration::from_micros(100);

    // Try the canonical uppercase map name first, then the lowercase
    // form, so a BPF crate rebuild with a different naming convention
    // doesn't break the supervisor.
    let map = bpf
        .take_map("FLOW_EVENTS")
        .or_else(|| bpf.take_map("flow_events"))
        .ok_or_else(|| "FLOW_EVENTS map not found in BPF object".to_string())?;
    let mut ring_buf: RingBuf<aya::maps::MapData> =
        RingBuf::try_from(map).map_err(|e| format!("FLOW_EVENTS map is not a ring buffer: {e}"))?;

    let handle = tokio::task::spawn_blocking(move || {
        // Polling loop on a dedicated blocking thread. We deliberately do
        // NOT register the ring-buffer FD with epoll/AsyncFd here: one
        // blocking thread per cell is simpler than an AsyncFd-based
        // wakeup path that would hold `&mut RingBuf` across `.await`.
        loop {
            match ring_buf.next() {
                Some(item) => {
                    if let Some(event) = parse_ring_buf_event(&item) {
                        if tx.send(event).is_err() {
                            // Receiver closed or dropped — monitor stopped.
                            return;
                        }
                    }
                    // Item drops here, advancing the consumer position.
                }
                None => {
                    // Nothing to read: this is the only point at which an
                    // idle drainer can learn the monitor has gone away.
                    if tx.is_closed() {
                        return;
                    }
                    std::thread::sleep(IDLE_POLL_INTERVAL);
                }
            }
        }
    });

    Ok(handle)
}
633
/// Live-path decoder: turn one ring-buffer slot into a host-side
/// [`FlowEvent`].
///
/// Returns `None` when the slot holds fewer than
/// [`BPF_FLOW_EVENT_SIZE`] bytes, so a single malformed slot is skipped
/// rather than crashing the supervisor. Bytes beyond the record size are
/// ignored.
///
/// Multi-byte fields are read with native-endian decoding because the
/// BPF program is documented to store addresses and ports in HOST byte
/// order.
///
/// Only `dst_ip`, `dst_port` and `proto` are surfaced. The source
/// address/port, `pkt_count`, `first_seen_ns` and `verdict` have no slot
/// on [`FlowEvent`] and are not decoded here; the typed decoder
/// [`decode_ring_buf_event`] exposes the full layout for callers that
/// need it.
#[cfg(all(target_os = "linux", feature = "ebpf-aya"))]
fn parse_ring_buf_event(bytes: &[u8]) -> Option<FlowEvent> {
    use std::net::Ipv4Addr;

    // Short slot → soft failure. `get` yields `None` exactly when the
    // slice is shorter than one full record.
    let record = bytes.get(..BPF_FLOW_EVENT_SIZE)?;

    // FlowKey fields the event shape can carry.
    let dst_ip = u32::from_ne_bytes([record[4], record[5], record[6], record[7]]);
    let dst_port = u16::from_ne_bytes([record[10], record[11]]);

    // IANA protocol number → schema protocol string. Numbers without a
    // mapping become `None`.
    let protocol = match record[12] {
        6 => Some("tcp"),
        17 => Some("udp"),
        1 => Some("icmp"),
        58 => Some("icmpv6"),
        _ => None,
    }
    .map(String::from);

    Some(FlowEvent {
        direction: NetworkFlowDirection::Egress,
        // The BPF classifier is OBSERVABILITY ONLY (always returns
        // TC_ACT_OK) — every event it emits represents a packet the
        // kernel actually let through, hence `Allow`. Real nftables
        // drops surface via the separate nflog path.
        decision: NetworkFlowDecisionOutcome::Allow,
        // NOTE(review): this reason code reads "drop" while the decision
        // above is `Allow`; confirm consumers do not key on the name.
        reason_code: REASON_EBPF_XDP_DROP,
        dst_addr: Some(Ipv4Addr::from(dst_ip).to_string()),
        dst_port: Some(dst_port),
        protocol,
        // The BPF program counts packets, not bytes, so no byte count is
        // reported rather than fabricating one.
        byte_count: None,
    })
}
725
726// ────────────────────────────────────────────────────────────────
727// Platform-independent ring-buffer event decoder
728// ────────────────────────────────────────────────────────────────
729//
730// The Linux+aya path above ([`parse_ring_buf_event`]) is the
731// production adapter the drainer calls per slot. It is cfg-gated so it
732// only exists on Linux builds with the `ebpf-aya` feature on, and it
733// returns `Option<FlowEvent>` shaped for the live channel.
734//
735// This second decoder is the unit-testable, cross-platform companion.
736// It decodes the SAME 40-byte `#[repr(C)]` record from the BPF
737// producer crate (`cellos-supervisor-ebpf/src/main.rs`) but:
738//
739// - Compiles on every target (no cfg gate, no aya dependency).
740// - Returns a typed `RingBufFlowEvent` exposing every field the
741// kernel writes (including `pkt_count`, `first_seen_ns`, `verdict`,
742// `src_port` — fields the live adapter discards today).
743// - Surfaces malformed slots as a typed `RingBufParseError` rather
744// than a silent `None`, so future call sites that care about
745// layout-drift detection can act on it.
746// - Has a layout-pin test (`ring_buf_event_layout_constant_matches_producer`)
747// that fails loudly if the producer's 40-byte assumption shifts —
748// catching the drift on every CI host, not just Linux runners with
749// the aya feature enabled.
750//
751// The two decoders share the wire layout; only the call shape differs.
752// A future refactor can collapse them once the live adapter wants
753// access to the full FlowEntry surface (timestamps, verdict, pkt
754// counts).
755//
756// Byte layout (native endian — the kernel program normalises wire
757// bytes via `u32::from_be` / `u16::from_be` before insert, so the
758// ring-buffer values are already host-order):
759//
760// FlowKey (16 bytes)
761// 0..4 : src_ip (u32, host order)
762// 4..8 : dst_ip (u32, host order)
763// 8..10 : src_port (u16, host order)
764// 10..12 : dst_port (u16, host order)
765// 12 : proto (u8; IANA: 6=TCP, 17=UDP)
766// 13..16 : _pad ([u8; 3], zeroed by the producer)
767// FlowEntry (24 bytes)
768// 16..24 : pkt_count (u64, host order)
769// 24..32 : first_seen_ns (u64, host order)
770// 32 : verdict (u8; 0=unknown, 1=accept, 2=drop)
771// 33..40 : _pad ([u8; 7], zeroed)
772
/// Exact byte length of one ring-buffer `FlowEvent` record: a 16-byte
/// `FlowKey` immediately followed by a 24-byte `FlowEntry`.
///
/// The value mirrors the `#[repr(C)]` layout declared by the producer in
/// `cellos-supervisor-ebpf/src/main.rs`. Should that layout change, this
/// constant and the decoder have to change with it; the unit tests pin
/// the individual byte offsets.
pub const RING_BUF_EVENT_LEN: usize = 16 + 24;
778
// IANA protocol numbers this module knows how to label, in ascending
// numeric order. Any other value is surfaced as `None`, leaving the
// downstream event's `protocol` field empty instead of mislabelling
// the traffic.

/// IANA protocol number for ICMP.
const IPPROTO_ICMP: u8 = 1;
/// IANA protocol number for TCP.
const IPPROTO_TCP: u8 = 6;
/// IANA protocol number for UDP.
const IPPROTO_UDP: u8 = 17;
/// IANA protocol number for ICMPv6.
const IPPROTO_ICMPV6: u8 = 58;
786
/// Decoded ring-buffer record. Mirrors the kernel-side `FlowEvent`
/// 1:1 but as plain Rust types: callers that don't care about the eBPF
/// shape get a portable, copy-only struct.
///
/// Field order follows the wire layout (`FlowKey` fields first, then
/// `FlowEntry` fields); the padding bytes of the record are not
/// represented here.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct RingBufFlowEvent {
    /// Source IPv4 address as a host-order `u32` (record bytes 0..4).
    pub src_ip: u32,
    /// Destination IPv4 address as a host-order `u32` (record bytes 4..8).
    pub dst_ip: u32,
    /// Source port, host order (record bytes 8..10).
    pub src_port: u16,
    /// Destination port, host order (record bytes 10..12).
    pub dst_port: u16,
    /// IANA protocol number (record byte 12), e.g. 6 = TCP, 17 = UDP.
    pub proto: u8,
    /// Packet counter written by the producer (record bytes 16..24).
    pub pkt_count: u64,
    /// First-seen timestamp in nanoseconds as written by the producer
    /// (record bytes 24..32). NOTE(review): the clock source is not
    /// visible from this file — confirm against the producer crate.
    pub first_seen_ns: u64,
    /// Producer verdict byte (record byte 32): 0 = unknown, 1 = accept,
    /// 2 = drop.
    pub verdict: u8,
}
801
/// Decode failure: the slice didn't have the canonical 40-byte shape.
///
/// Returned by [`decode_ring_buf_event`]. The intended handling is for
/// the caller to log and drop the malformed record — surfacing a
/// kernel-protocol mismatch is more useful than letting a torn read
/// silently inflate the flow count.
#[derive(Debug, Clone, PartialEq, Eq, thiserror::Error)]
pub enum RingBufParseError {
    /// The slice length differed from [`RING_BUF_EVENT_LEN`]; `got` is
    /// the length, in bytes, that was actually supplied.
    #[error(
        "ring-buffer record is {got} bytes, expected exactly {RING_BUF_EVENT_LEN}; \
         kernel producer ABI drift or torn read"
    )]
    WrongLength { got: usize },
}
814
815/// Decode a single 40-byte ring-buffer slot into a [`RingBufFlowEvent`].
816///
817/// The slot is treated as the contiguous `#[repr(C)]` byte image of
818/// the eBPF `FlowEvent { key: FlowKey, entry: FlowEntry }`. All
819/// multi-byte ints are host-endian (the eBPF program byte-swaps wire
820/// values before inserting).
821///
822/// This is a pure function with no IO and no aya dependency, so it
823/// builds and tests identically on Linux and macOS. The live-path
824/// drainer uses the cfg-gated [`parse_ring_buf_event`] adapter (above)
825/// for its hot path; this decoder is for unit tests and any future
826/// caller that wants the full kernel field surface.
827pub fn decode_ring_buf_event(bytes: &[u8]) -> Result<RingBufFlowEvent, RingBufParseError> {
828 if bytes.len() != RING_BUF_EVENT_LEN {
829 return Err(RingBufParseError::WrongLength { got: bytes.len() });
830 }
831 // Safe: slice length is exactly RING_BUF_EVENT_LEN, every fixed
832 // offset below is bounded by it.
833 let src_ip = u32::from_ne_bytes([bytes[0], bytes[1], bytes[2], bytes[3]]);
834 let dst_ip = u32::from_ne_bytes([bytes[4], bytes[5], bytes[6], bytes[7]]);
835 let src_port = u16::from_ne_bytes([bytes[8], bytes[9]]);
836 let dst_port = u16::from_ne_bytes([bytes[10], bytes[11]]);
837 let proto = bytes[12];
838 // bytes[13..16] are explicit padding; we ignore by contract.
839 let pkt_count = u64::from_ne_bytes([
840 bytes[16], bytes[17], bytes[18], bytes[19], bytes[20], bytes[21], bytes[22], bytes[23],
841 ]);
842 let first_seen_ns = u64::from_ne_bytes([
843 bytes[24], bytes[25], bytes[26], bytes[27], bytes[28], bytes[29], bytes[30], bytes[31],
844 ]);
845 let verdict = bytes[32];
846 // bytes[33..40] are explicit padding.
847 Ok(RingBufFlowEvent {
848 src_ip,
849 dst_ip,
850 src_port,
851 dst_port,
852 proto,
853 pkt_count,
854 first_seen_ns,
855 verdict,
856 })
857}
858
859/// Translate a decoded [`RingBufFlowEvent`] into the supervisor-facing
860/// [`FlowEvent`] shape consumed by the [`EbpfFlowMonitor`] channel.
861///
862/// IPv4 is rendered as dotted-quad. The kernel-side program filters out
863/// non-IPv4 traffic, so we can format `src_ip`/`dst_ip` as `Ipv4Addr`
864/// without losing information.
865pub fn ring_buf_event_to_flow_event(decoded: &RingBufFlowEvent) -> FlowEvent {
866 use std::net::Ipv4Addr;
867
868 let protocol = match decoded.proto {
869 IPPROTO_TCP => Some("tcp".to_string()),
870 IPPROTO_UDP => Some("udp".to_string()),
871 IPPROTO_ICMP => Some("icmp".to_string()),
872 IPPROTO_ICMPV6 => Some("icmpv6".to_string()),
873 _ => None,
874 };
875 // The kernel program is observability-only; verdict==2 ("drop") is
876 // operator-written analytics, not enforcement. We map the three
877 // documented values to `Allow` / `Deny`; the default (0 = unknown)
878 // mirrors what the proxy + nft sees on the wire (allowed by
879 // default, denials are nft-driven elsewhere).
880 let decision = match decoded.verdict {
881 2 => NetworkFlowDecisionOutcome::Deny,
882 _ => NetworkFlowDecisionOutcome::Allow,
883 };
884 FlowEvent {
885 direction: NetworkFlowDirection::Egress,
886 decision,
887 reason_code: REASON_EBPF_XDP_DROP,
888 dst_addr: Some(Ipv4Addr::from(decoded.dst_ip).to_string()),
889 dst_port: Some(decoded.dst_port),
890 protocol,
891 byte_count: None, // ebpf program tracks pkt_count, not bytes
892 }
893}
894
895// ────────────────────────────────────────────────────────────────
896// Test-only shim: a constructible EbpfFlowMonitor for unit tests
897// that need to exercise the `drain` / `stop` surface without
898// depending on a real BPF program. NOT for production use.
899// ────────────────────────────────────────────────────────────
900
#[cfg(test)]
impl EbpfFlowMonitor {
    /// Test-only constructor that wraps an already-built receiver.
    ///
    /// No BPF object is loaded and no drainer task is spawned: `drainer`
    /// is `None`, and on Linux builds with the `ebpf-aya` feature the
    /// `_bpf` handle is `None` as well. `iface` is stored as an owned
    /// copy.
    pub(crate) fn from_parts_for_test(
        rx: tokio::sync::mpsc::UnboundedReceiver<FlowEvent>,
        iface: &str,
    ) -> Self {
        let iface = String::from(iface);
        Self {
            #[cfg(all(target_os = "linux", feature = "ebpf-aya"))]
            _bpf: None,
            rx,
            drainer: None,
            iface,
        }
    }
}
917
#[cfg(test)]
mod tests {
    use super::*;

    /// Serialises the tests that mutate the process-global
    /// `PER_FLOW_EBPF_ENV` variable. The test harness runs tests on
    /// several threads by default, so without this lock one test could
    /// set the variable while another is asserting that it is unset.
    static ENV_FLAG_LOCK: std::sync::Mutex<()> = std::sync::Mutex::new(());

    /// RAII scope for env-flag tests: holds [`ENV_FLAG_LOCK`] and, on
    /// drop, puts `PER_FLOW_EBPF_ENV` back to the value it had when the
    /// guard was acquired — including when the test body panics.
    struct EnvFlagGuard {
        saved: Option<String>,
        _lock: std::sync::MutexGuard<'static, ()>,
    }

    impl EnvFlagGuard {
        /// Takes the lock, then snapshots the current value of the flag.
        fn acquire() -> Self {
            // A poisoned lock only means an earlier env test failed an
            // assertion; the `()` payload carries no state to corrupt,
            // so it is safe to keep going.
            let lock = ENV_FLAG_LOCK
                .lock()
                .unwrap_or_else(|poisoned| poisoned.into_inner());
            Self {
                saved: std::env::var(PER_FLOW_EBPF_ENV).ok(),
                _lock: lock,
            }
        }
    }

    impl Drop for EnvFlagGuard {
        fn drop(&mut self) {
            // Runs before the `_lock` field is released, so the restore
            // is still covered by the lock.
            match self.saved.take() {
                Some(value) => std::env::set_var(PER_FLOW_EBPF_ENV, value),
                None => std::env::remove_var(PER_FLOW_EBPF_ENV),
            }
        }
    }

    /// A representative denied TCP egress event used by the translator
    /// and channel tests.
    fn sample_event() -> FlowEvent {
        FlowEvent {
            direction: NetworkFlowDirection::Egress,
            decision: NetworkFlowDecisionOutcome::Deny,
            reason_code: REASON_EBPF_XDP_DROP,
            dst_addr: Some("10.0.0.1".into()),
            dst_port: Some(443),
            protocol: Some("tcp".into()),
            byte_count: Some(74),
        }
    }

    // ── original scaffold tests (preserved) ───────────────────────────

    #[test]
    fn env_flag_default_off() {
        let _env = EnvFlagGuard::acquire();
        std::env::remove_var(PER_FLOW_EBPF_ENV);
        assert!(!is_per_flow_ebpf_enabled());
    }

    #[test]
    fn env_flag_recognises_one() {
        let _env = EnvFlagGuard::acquire();
        std::env::set_var(PER_FLOW_EBPF_ENV, "1");
        assert!(is_per_flow_ebpf_enabled());
    }

    #[test]
    fn env_flag_rejects_truthy_lookalikes() {
        let _env = EnvFlagGuard::acquire();
        for bad in ["true", "yes", "on", "TRUE", "0", "", "2"] {
            std::env::set_var(PER_FLOW_EBPF_ENV, bad);
            assert!(
                !is_per_flow_ebpf_enabled(),
                "value {bad:?} must not enable Phase 2"
            );
        }
    }

    #[test]
    fn noop_listener_yields_no_events() {
        let mut listener = NoopFlowListener;
        let events = listener.drain_events();
        assert!(events.is_empty());
    }

    #[test]
    fn noop_listener_drain_is_idempotent() {
        let mut listener = NoopFlowListener;
        for _ in 0..5 {
            assert!(listener.drain_events().is_empty());
        }
    }

    #[test]
    fn noop_listener_backend_name_is_noop() {
        let listener = NoopFlowListener;
        assert_eq!(listener.backend_name(), "noop");
    }

    #[test]
    fn build_default_listener_is_noop() {
        let mut listener = build_default_listener();
        assert_eq!(listener.backend_name(), "noop");
        assert!(listener.drain_events().is_empty());
    }

    #[test]
    fn translator_preserves_direction_and_outcome() {
        let event = sample_event();
        let payload = flow_event_to_network_flow_decision(
            &event,
            "cell-test",
            "run-test",
            "dec-test",
            None,
            None,
            None,
            None,
            "2026-05-06T00:00:00Z",
        );
        assert_eq!(payload.direction, NetworkFlowDirection::Egress);
        assert_eq!(payload.decision, NetworkFlowDecisionOutcome::Deny);
        assert_eq!(payload.reason_code, REASON_EBPF_XDP_DROP);
    }

    #[test]
    fn translator_stamps_caller_metadata() {
        let event = sample_event();
        let payload = flow_event_to_network_flow_decision(
            &event,
            "cell-1",
            "run-1",
            "dec-1",
            Some("sha256:abc"),
            Some("kid-1"),
            Some("issuer-1"),
            Some("corr-1"),
            "2026-05-06T12:00:00Z",
        );
        assert_eq!(payload.cell_id, "cell-1");
        assert_eq!(payload.run_id, "run-1");
        assert_eq!(payload.decision_id, "dec-1");
        assert_eq!(payload.policy_digest.as_deref(), Some("sha256:abc"));
        assert_eq!(payload.keyset_id.as_deref(), Some("kid-1"));
        assert_eq!(payload.issuer_kid.as_deref(), Some("issuer-1"));
        assert_eq!(payload.correlation_id.as_deref(), Some("corr-1"));
        assert_eq!(payload.observed_at, "2026-05-06T12:00:00Z");
    }

    #[test]
    fn translator_omits_metadata_when_caller_passes_none() {
        let event = sample_event();
        let payload = flow_event_to_network_flow_decision(
            &event,
            "cell-1",
            "run-1",
            "dec-1",
            None,
            None,
            None,
            None,
            "2026-05-06T12:00:00Z",
        );
        assert!(payload.policy_digest.is_none());
        assert!(payload.keyset_id.is_none());
        assert!(payload.issuer_kid.is_none());
        assert!(payload.correlation_id.is_none());
        assert!(payload.nft_rule_ref.is_none());
    }

    #[test]
    fn translator_carries_l3_l4_fields() {
        let event = sample_event();
        let payload = flow_event_to_network_flow_decision(
            &event,
            "cell-1",
            "run-1",
            "dec-1",
            None,
            None,
            None,
            None,
            "2026-05-06T12:00:00Z",
        );
        assert_eq!(payload.dst_addr.as_deref(), Some("10.0.0.1"));
        assert_eq!(payload.dst_port, Some(443));
        assert_eq!(payload.protocol.as_deref(), Some("tcp"));
        assert_eq!(payload.byte_count, Some(74));
        assert_eq!(payload.packet_count, Some(1));
    }

    #[test]
    fn translator_handles_event_without_byte_count() {
        let mut event = sample_event();
        event.byte_count = None;
        let payload = flow_event_to_network_flow_decision(
            &event,
            "cell-1",
            "run-1",
            "dec-1",
            None,
            None,
            None,
            None,
            "2026-05-06T12:00:00Z",
        );
        assert!(payload.byte_count.is_none());
        assert!(payload.packet_count.is_none());
    }

    #[test]
    fn translator_handles_nflog_reason_code() {
        let mut event = sample_event();
        event.reason_code = REASON_NFLOG_MATCH;
        event.decision = NetworkFlowDecisionOutcome::Allow;
        let payload = flow_event_to_network_flow_decision(
            &event,
            "cell-1",
            "run-1",
            "dec-1",
            None,
            None,
            None,
            None,
            "2026-05-06T12:00:00Z",
        );
        assert_eq!(payload.reason_code, REASON_NFLOG_MATCH);
        assert_eq!(payload.decision, NetworkFlowDecisionOutcome::Allow);
    }

    #[test]
    fn translator_payload_round_trips_through_serde_json() {
        let event = sample_event();
        let payload = flow_event_to_network_flow_decision(
            &event,
            "cell-1",
            "run-1",
            "dec-1",
            Some("sha256:abc"),
            Some("kid-1"),
            Some("issuer-1"),
            Some("corr-1"),
            "2026-05-06T12:00:00Z",
        );
        let json = serde_json::to_string(&payload).expect("serialize");
        let round_tripped: NetworkFlowDecision = serde_json::from_str(&json).expect("deserialize");
        assert_eq!(round_tripped, payload);
    }

    #[test]
    fn reason_codes_are_distinct_from_phase1_codes() {
        for code in [REASON_EBPF_XDP_DROP, REASON_NFLOG_MATCH] {
            assert!(
                !code.starts_with("nft_"),
                "Phase 2 code {code} must not collide with Phase 1 nft_* prefix"
            );
        }
    }

    // ── E7-4 EbpfFlowMonitor tests ───────────────────────────────────

    #[cfg(not(target_os = "linux"))]
    #[tokio::test]
    async fn start_is_unsupported_on_non_linux() {
        // Pass a sentinel fd; the macOS code path never touches it.
        let res = EbpfFlowMonitor::start(-1, "tap0").await;
        match res {
            Err(EbpfMonitorError::Unsupported { host }) => {
                assert!(
                    !host.is_empty(),
                    "Unsupported error must name the host platform"
                );
            }
            Err(other) => panic!("expected Unsupported, got {other:?}"),
            Ok(_) => panic!("monitor must not construct on non-Linux"),
        }
    }

    #[cfg(all(target_os = "linux", not(feature = "ebpf-aya")))]
    #[tokio::test]
    async fn start_returns_feature_disabled_when_feature_off() {
        let res = EbpfFlowMonitor::start(-1, "tap0").await;
        assert!(
            matches!(res, Err(EbpfMonitorError::FeatureDisabled)),
            "must report FeatureDisabled when ebpf-aya cargo feature is off"
        );
    }

    #[tokio::test]
    async fn drain_yields_empty_when_no_events_buffered() {
        let (_tx, rx) = tokio::sync::mpsc::unbounded_channel::<FlowEvent>();
        let mut monitor = EbpfFlowMonitor::from_parts_for_test(rx, "tap0");
        let batch = monitor.drain();
        assert!(batch.is_empty(), "fresh monitor has no buffered events");
    }

    #[tokio::test]
    async fn drain_yields_events_pushed_through_channel() {
        let (tx, rx) = tokio::sync::mpsc::unbounded_channel::<FlowEvent>();
        let mut monitor = EbpfFlowMonitor::from_parts_for_test(rx, "tap0");
        tx.send(sample_event()).expect("send ok");
        tx.send(sample_event()).expect("send ok");
        let batch = monitor.drain();
        assert_eq!(batch.len(), 2, "drain must pull both events synchronously");
    }

    #[tokio::test]
    async fn drain_is_non_blocking() {
        // No sender pushes; drain must return immediately, not park.
        let (_tx, rx) = tokio::sync::mpsc::unbounded_channel::<FlowEvent>();
        let mut monitor = EbpfFlowMonitor::from_parts_for_test(rx, "tap0");
        let start = std::time::Instant::now();
        let batch = monitor.drain();
        let elapsed = start.elapsed();
        assert!(batch.is_empty());
        assert!(
            elapsed < std::time::Duration::from_millis(50),
            "drain must be non-blocking; took {elapsed:?}"
        );
    }

    #[tokio::test]
    async fn stop_is_idempotent_when_no_drainer_present() {
        let (_tx, rx) = tokio::sync::mpsc::unbounded_channel::<FlowEvent>();
        let monitor = EbpfFlowMonitor::from_parts_for_test(rx, "tap0");
        // Should complete without panicking even though `drainer` is None.
        monitor.stop().await;
    }

    // ── decode_ring_buf_event tests ────────────────────────────────────
    //
    // Pin the exact byte offsets the kernel-side
    // `cellos-supervisor-ebpf/src/main.rs` producer emits. If the
    // producer's `#[repr(C)]` layout ever shifts, these tests fail in
    // place rather than letting a torn record silently inflate the
    // supervisor's flow count.
    //
    // These tests target the platform-independent
    // [`decode_ring_buf_event`] decoder, which exists on every host. The
    // cfg-gated live-path adapter [`parse_ring_buf_event`] is exercised
    // implicitly: both decoders share the layout pinned by
    // `ring_buf_event_layout_constant_matches_producer`.

    /// Builds a well-formed record: 10.0.0.1:44321 -> 192.0.2.5:443 over
    /// TCP, pkt_count 7777, first_seen_ns 123456789, verdict 1 (accept),
    /// with all padding bytes zero.
    fn canonical_record() -> [u8; RING_BUF_EVENT_LEN] {
        let mut buf = [0u8; RING_BUF_EVENT_LEN];
        // Addresses are host-order u32 values (10.0.0.1 == 0x0A00_0001);
        // every field is serialised with `to_ne_bytes` so the test
        // passes on any host endianness.
        let src_ip: u32 = u32::from(std::net::Ipv4Addr::new(10, 0, 0, 1));
        let dst_ip: u32 = u32::from(std::net::Ipv4Addr::new(192, 0, 2, 5));
        buf[0..4].copy_from_slice(&src_ip.to_ne_bytes());
        buf[4..8].copy_from_slice(&dst_ip.to_ne_bytes());
        buf[8..10].copy_from_slice(&44_321u16.to_ne_bytes()); // src_port
        buf[10..12].copy_from_slice(&443u16.to_ne_bytes()); // dst_port
        buf[12] = 6; // proto = TCP
                     // bytes[13..16] = padding, left zero
        buf[16..24].copy_from_slice(&7_777u64.to_ne_bytes()); // pkt_count
        buf[24..32].copy_from_slice(&123_456_789u64.to_ne_bytes()); // first_seen_ns
        buf[32] = 1; // verdict = accept
                     // bytes[33..40] = padding, left zero
        buf
    }

    #[test]
    fn decode_ring_buf_event_round_trips_canonical_record() {
        let buf = canonical_record();
        let decoded = decode_ring_buf_event(&buf).expect("canonical 40-byte record decodes");
        assert_eq!(
            decoded.src_ip,
            u32::from(std::net::Ipv4Addr::new(10, 0, 0, 1))
        );
        assert_eq!(
            decoded.dst_ip,
            u32::from(std::net::Ipv4Addr::new(192, 0, 2, 5))
        );
        assert_eq!(decoded.src_port, 44_321);
        assert_eq!(decoded.dst_port, 443);
        assert_eq!(decoded.proto, 6);
        assert_eq!(decoded.pkt_count, 7_777);
        assert_eq!(decoded.first_seen_ns, 123_456_789);
        assert_eq!(decoded.verdict, 1);
    }

    #[test]
    fn decode_ring_buf_event_rejects_short_record() {
        let buf = [0u8; RING_BUF_EVENT_LEN - 1];
        match decode_ring_buf_event(&buf) {
            Err(RingBufParseError::WrongLength { got }) => {
                assert_eq!(got, RING_BUF_EVENT_LEN - 1);
            }
            other => panic!("expected WrongLength, got {other:?}"),
        }
    }

    #[test]
    fn decode_ring_buf_event_rejects_long_record() {
        let buf = [0u8; RING_BUF_EVENT_LEN + 8];
        assert!(matches!(
            decode_ring_buf_event(&buf),
            Err(RingBufParseError::WrongLength { got }) if got == RING_BUF_EVENT_LEN + 8
        ));
    }

    #[test]
    fn decode_ring_buf_event_ignores_padding_bytes() {
        // Padding bytes in [13..16) and [33..40) are documented as
        // "zeroed by the producer", but the decoder must not error on
        // non-zero padding — a future producer that leaves them
        // non-zero might still surface useful records.
        let mut buf = canonical_record();
        buf[13..16].copy_from_slice(&[0xff, 0xff, 0xff]);
        buf[33..40].copy_from_slice(&[0xaa; 7]);
        let decoded = decode_ring_buf_event(&buf).expect("non-zero padding must decode");
        // Non-padding fields unaffected.
        assert_eq!(decoded.proto, 6);
        assert_eq!(decoded.verdict, 1);
    }

    #[test]
    fn ring_buf_event_to_flow_event_renders_tcp_dst() {
        let decoded = decode_ring_buf_event(&canonical_record()).unwrap();
        let ev = ring_buf_event_to_flow_event(&decoded);
        assert_eq!(ev.direction, NetworkFlowDirection::Egress);
        assert_eq!(ev.decision, NetworkFlowDecisionOutcome::Allow);
        assert_eq!(ev.protocol.as_deref(), Some("tcp"));
        assert_eq!(ev.dst_addr.as_deref(), Some("192.0.2.5"));
        assert_eq!(ev.dst_port, Some(443));
    }

    #[test]
    fn ring_buf_event_to_flow_event_maps_verdict_drop_to_deny() {
        let mut buf = canonical_record();
        buf[32] = 2; // verdict = drop
        let decoded = decode_ring_buf_event(&buf).unwrap();
        let ev = ring_buf_event_to_flow_event(&decoded);
        assert_eq!(ev.decision, NetworkFlowDecisionOutcome::Deny);
    }

    #[test]
    fn ring_buf_event_to_flow_event_maps_unknown_proto_to_none() {
        let mut buf = canonical_record();
        buf[12] = 99; // unknown IANA proto
        let decoded = decode_ring_buf_event(&buf).unwrap();
        let ev = ring_buf_event_to_flow_event(&decoded);
        assert!(ev.protocol.is_none(), "unknown proto must not lie");
    }

    #[test]
    fn ring_buf_event_to_flow_event_maps_udp() {
        let mut buf = canonical_record();
        buf[12] = 17; // UDP
        let decoded = decode_ring_buf_event(&buf).unwrap();
        let ev = ring_buf_event_to_flow_event(&decoded);
        assert_eq!(ev.protocol.as_deref(), Some("udp"));
    }

    #[test]
    fn ring_buf_event_layout_constant_matches_producer() {
        // Defensive: if anyone widens the kernel-side FlowEvent struct
        // without updating either host-side parser, this assertion makes
        // the mismatch loud at compile-test time rather than at runtime
        // on a Linux deployment.
        //
        // Producer (cellos-supervisor-ebpf): 16 (FlowKey) + 24 (FlowEntry) = 40.
        assert_eq!(RING_BUF_EVENT_LEN, 40);
        // Linux+aya path uses a parallel `BPF_FLOW_EVENT_SIZE` constant.
        // The two MUST agree — the only difference is which platforms
        // they compile on, not what they describe.
        #[cfg(all(target_os = "linux", feature = "ebpf-aya"))]
        assert_eq!(RING_BUF_EVENT_LEN, BPF_FLOW_EVENT_SIZE);
    }

    #[test]
    fn ebpf_monitor_error_display_strings_are_useful() {
        // Defensive: operators see these strings in their logs when a
        // cell falls back to nflog. They must name the failure mode
        // unambiguously so paging on "eBPF flow monitor error" is
        // actionable rather than mysterious.
        let unsupported = EbpfMonitorError::Unsupported { host: "macos" };
        assert!(format!("{unsupported}").contains("Linux-only"));

        let disabled = EbpfMonitorError::FeatureDisabled;
        assert!(format!("{disabled}").contains("ebpf-aya"));

        let missing = EbpfMonitorError::ObjectMissing {
            path: "/tmp/x.o".to_string(),
        };
        assert!(format!("{missing}").contains("/tmp/x.o"));

        let attach = EbpfMonitorError::AttachFailed {
            iface: "tap0".to_string(),
            message: "ENODEV".to_string(),
        };
        let s = format!("{attach}");
        assert!(s.contains("tap0"));
        assert!(s.contains("ENODEV"));
    }
}
1393
1394// ────────────────────────────────────────────────────────────────────────
1395// L5-15 — Connection-tracking flow accumulator
1396// ────────────────────────────────────────────────────────────────────────
1397//
1398// The types above (`FlowEvent`, `FlowEventListener`, `NoopFlowListener`,
1399// `flow_event_to_network_flow_decision`) are the Phase 2 nflog-shaped
1400// scaffolding — one event = one observed packet/decision, designed to ride
1401// the existing `network_flow_decision` v1 wire schema.
1402//
1403// L5-15 needs a *different* shape: open/close signals per 5-tuple so the
1404// supervisor can count UNIQUE connections that were actually exercised by
1405// the workload, and stamp that count into the `HomeostasisSignal`'s
1406// `exercised_egress_connections` field. This sub-module ships the new
1407// types under their own namespace to avoid colliding with the Phase 2
1408// scaffold's `FlowEvent`, which has additive `reason_code` semantics this
1409// counter doesn't need.
1410//
1411// The accumulator is intentionally minimal: a `HashSet<FlowKey>` of opened
1412// flows. When `CELLOS_PER_FLOW_REALTIME=1` is on and a real backend (eBPF
1413// cgroup_skb or nflog connection tracker) feeds it Opened events, the
1414// homeostasis emit site reads `unique_flow_count()` and stamps it into the
1415// signal. When the env var is off OR no backend is wired, the supervisor
1416// falls back to `None` + `telemetry_pending_L2-04` reason, preserving the
1417// E2-06 contract.
pub mod connection_tracking {
    use std::collections::HashSet;
    use std::net::IpAddr;

    /// Identity of a single L4 flow, expressed as its 5-tuple. This is
    /// the hash key under which unique connections exercised by the
    /// workload are counted.
    ///
    /// `protocol` holds the IANA protocol byte (6 = TCP, 17 = UDP,
    /// 1 = ICMP, 58 = ICMPv6) instead of a `String`, which keeps the key
    /// cheap to hash and copy. The source address and port are part of
    /// the key on purpose: N parallel connections from the workload to
    /// one `(dst, port)` pair count as N flows, which is what
    /// `exercised_egress_connections` promises its consumers
    /// ("connections", not "destinations").
    #[derive(Debug, Clone, Copy, Hash, PartialEq, Eq)]
    pub struct FlowKey {
        pub src_addr: IpAddr,
        pub src_port: u16,
        pub dst_addr: IpAddr,
        pub dst_port: u16,
        pub protocol: u8,
    }

    /// Lifecycle phase reported for a connection-tracked flow.
    ///
    /// `Closed` carries the observed packet and byte totals so that
    /// future taudit passes can correlate a flow's lifespan with the
    /// declared egress rules. The accumulator in this module reacts
    /// only to `Opened` today; `Closed` already exists so a backend can
    /// emit both phases without the enum having to change later.
    #[derive(Debug, Clone)]
    pub enum FlowEventKind {
        Opened,
        Closed { pkt_count: u64, byte_count: u64 },
    }

    /// A single observation handed to the supervisor by a
    /// connection-tracking backend (eBPF cgroup_skb, or nflog with
    /// conntrack).
    ///
    /// Not to be confused with [`super::FlowEvent`], the packet-decision
    /// shape of the Phase 2 nflog scaffold. The two live in separate
    /// namespaces so the supervisor can wire up either or both without
    /// ambiguity.
    #[derive(Debug, Clone)]
    pub struct FlowEvent {
        pub key: FlowKey,
        pub kind: FlowEventKind,
        /// Monotonic-clock nanoseconds at which the event was observed.
        /// A `u64` to line up with `clock_gettime(CLOCK_MONOTONIC)` /
        /// `bpf_ktime_get_ns()` — a kernel-side clock, not wall time.
        pub timestamp_ns: u64,
    }

    /// Set of distinct flows opened during one cell run.
    ///
    /// `record()` is idempotent per [`FlowKey`]: seeing the same 5-tuple
    /// twice still yields one flow, in line with the homeostasis
    /// `exercised_egress_connections` contract (distinct connections
    /// actually exercised, not a packet count).
    ///
    /// One instance is built per `Supervisor::run` when
    /// `CELLOS_PER_FLOW_REALTIME=1` is set, and it is read at the
    /// homeostasis emit site once `child.wait()` has returned.
    #[derive(Debug, Default)]
    pub struct FlowAccumulator {
        opened: HashSet<FlowKey>,
    }

    impl FlowAccumulator {
        /// Creates an accumulator that has seen no flows.
        pub fn new() -> Self {
            Self {
                opened: HashSet::new(),
            }
        }

        /// Feeds one flow event into the accumulator.
        ///
        /// `Opened` events add their key to the unique set. `Closed`
        /// events are accepted and ignored for now, which leaves room
        /// for a later implementation to aggregate byte/packet totals
        /// without breaking this API.
        pub fn record(&mut self, event: &FlowEvent) {
            match event.kind {
                FlowEventKind::Opened => {
                    self.opened.insert(event.key);
                }
                FlowEventKind::Closed { .. } => {}
            }
        }

        /// Count of distinct 5-tuples seen in the `Opened` phase so far.
        ///
        /// The return type is `u64` because a long-lived run can
        /// plausibly open more than `u32::MAX / 2` flows. The wire
        /// schema field is a `u32`, so the supervisor saturates to
        /// `u32::MAX` at the homeostasis emit site — saturating rather
        /// than truncating, since "exactly u32::MAX" is closer to the
        /// truth than a value that wrapped around to 0.
        pub fn unique_flow_count(&self) -> u64 {
            let distinct = self.opened.len();
            distinct as u64
        }
    }

    #[cfg(test)]
    mod tests {
        use super::*;
        use std::net::Ipv4Addr;

        /// TCP flow from 10.0.0.1:`src_port` to 192.0.2.`dst_octet`:`dst_port`.
        fn tcp_flow(src_port: u16, dst_octet: u8, dst_port: u16) -> FlowKey {
            let src_addr = IpAddr::V4(Ipv4Addr::new(10, 0, 0, 1));
            let dst_addr = IpAddr::V4(Ipv4Addr::new(192, 0, 2, dst_octet));
            FlowKey {
                src_addr,
                src_port,
                dst_addr,
                dst_port,
                protocol: 6, // TCP
            }
        }

        /// `Opened` event for `flow`, stamped at time zero.
        fn opened(flow: FlowKey) -> FlowEvent {
            FlowEvent {
                key: flow,
                kind: FlowEventKind::Opened,
                timestamp_ns: 0,
            }
        }

        /// `Closed` event for `flow` with the given totals and timestamp.
        fn closed(flow: FlowKey, pkt_count: u64, byte_count: u64, timestamp_ns: u64) -> FlowEvent {
            FlowEvent {
                key: flow,
                kind: FlowEventKind::Closed {
                    pkt_count,
                    byte_count,
                },
                timestamp_ns,
            }
        }

        #[test]
        fn empty_accumulator_reports_zero() {
            assert_eq!(FlowAccumulator::new().unique_flow_count(), 0);
        }

        #[test]
        fn opened_event_increments_count() {
            let mut tracker = FlowAccumulator::new();
            tracker.record(&opened(tcp_flow(40000, 1, 443)));
            assert_eq!(tracker.unique_flow_count(), 1);
        }

        #[test]
        fn duplicate_opened_is_idempotent() {
            // The homeostasis semantics are "distinct connections", not
            // "open events": a retransmitted SYN, or a backend that
            // re-emits an event, MUST NOT inflate the count.
            let mut tracker = FlowAccumulator::new();
            let flow = tcp_flow(40000, 1, 443);
            for _ in 0..2 {
                tracker.record(&opened(flow));
            }
            assert_eq!(tracker.unique_flow_count(), 1);
        }

        #[test]
        fn distinct_flows_count_independently() {
            let mut tracker = FlowAccumulator::new();
            let flows = [
                tcp_flow(40000, 1, 443),
                tcp_flow(40001, 1, 443), // different src_port
                tcp_flow(40000, 2, 443), // different dst_addr
                tcp_flow(40000, 1, 80),  // different dst_port
            ];
            for flow in flows.iter() {
                tracker.record(&opened(*flow));
            }
            assert_eq!(tracker.unique_flow_count(), 4);
        }

        #[test]
        fn closed_event_does_not_increment_count() {
            // `Closed` exists for symmetry/future use; the unique-flow
            // count is keyed off the `Opened` phase only.
            let mut tracker = FlowAccumulator::new();
            tracker.record(&closed(tcp_flow(40000, 1, 443), 10, 1500, 1));
            assert_eq!(tracker.unique_flow_count(), 0);
        }

        #[test]
        fn closed_after_opened_preserves_count() {
            let mut tracker = FlowAccumulator::new();
            let flow = tcp_flow(40000, 1, 443);
            tracker.record(&opened(flow));
            tracker.record(&closed(flow, 5, 500, 2));
            assert_eq!(tracker.unique_flow_count(), 1);
        }
    }
}