#![allow(dead_code)]
use cellos_core::{NetworkFlowDecision, NetworkFlowDecisionOutcome, NetworkFlowDirection};
pub const PER_FLOW_EBPF_ENV: &str = "CELLOS_FIRECRACKER_PER_FLOW_EBPF";
pub const REASON_EBPF_XDP_DROP: &str = "ebpf_xdp_drop";
pub const REASON_NFLOG_MATCH: &str = "nflog_match";
pub fn is_per_flow_ebpf_enabled() -> bool {
std::env::var(PER_FLOW_EBPF_ENV).as_deref() == Ok("1")
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct FlowEvent {
pub direction: NetworkFlowDirection,
pub decision: NetworkFlowDecisionOutcome,
pub reason_code: &'static str,
pub dst_addr: Option<String>,
pub dst_port: Option<u16>,
pub protocol: Option<String>,
pub byte_count: Option<u64>,
}
pub trait FlowEventListener: Send + Sync {
fn drain_events(&mut self) -> Vec<FlowEvent>;
fn backend_name(&self) -> &'static str;
}
#[derive(Debug, Default)]
pub struct NoopFlowListener;
impl FlowEventListener for NoopFlowListener {
fn drain_events(&mut self) -> Vec<FlowEvent> {
Vec::new()
}
fn backend_name(&self) -> &'static str {
"noop"
}
}
pub fn build_default_listener() -> Box<dyn FlowEventListener> {
Box::new(NoopFlowListener)
}
#[allow(clippy::too_many_arguments)]
pub fn flow_event_to_network_flow_decision(
event: &FlowEvent,
cell_id: &str,
run_id: &str,
decision_id: &str,
policy_digest: Option<&str>,
keyset_id: Option<&str>,
issuer_kid: Option<&str>,
correlation_id: Option<&str>,
observed_at: &str,
) -> NetworkFlowDecision {
NetworkFlowDecision {
schema_version: "1.0.0".into(),
cell_id: cell_id.to_string(),
run_id: run_id.to_string(),
decision_id: decision_id.to_string(),
direction: event.direction,
decision: event.decision,
reason_code: event.reason_code.to_string(),
nft_rule_ref: None,
dst_addr: event.dst_addr.clone(),
dst_port: event.dst_port,
protocol: event.protocol.clone(),
packet_count: event.byte_count.map(|_| 1u64),
byte_count: event.byte_count,
policy_digest: policy_digest.map(|s| s.to_string()),
keyset_id: keyset_id.map(|s| s.to_string()),
issuer_kid: issuer_kid.map(|s| s.to_string()),
correlation_id: correlation_id.map(|s| s.to_string()),
observed_at: observed_at.to_string(),
}
}
#[derive(Debug, thiserror::Error)]
pub enum EbpfMonitorError {
#[error("eBPF flow monitor is Linux-only (host platform: {host})")]
Unsupported { host: &'static str },
#[error(
"eBPF flow monitor requested but the `ebpf-aya` Cargo feature is \
not enabled on this build"
)]
FeatureDisabled,
#[error("eBPF object file missing at {path}")]
ObjectMissing { path: String },
#[error("setns(CLONE_NEWNET) into cell netns failed: {source}")]
Setns {
#[source]
source: std::io::Error,
},
#[error("aya BPF object load failed: {message}")]
LoadFailed { message: String },
#[error("tc clsact attach to {iface} failed: {message}")]
AttachFailed { iface: String, message: String },
#[error("ring-buffer map open failed: {message}")]
RingBufferOpenFailed { message: String },
}
pub struct EbpfFlowMonitor {
#[cfg(all(target_os = "linux", feature = "ebpf-aya"))]
_bpf: Option<aya::Ebpf>,
rx: tokio::sync::mpsc::UnboundedReceiver<FlowEvent>,
drainer: Option<tokio::task::JoinHandle<()>>,
iface: String,
}
impl EbpfFlowMonitor {
#[allow(unused_variables)] pub async fn start(
netns_fd: std::os::unix::io::RawFd,
tap_iface: &str,
) -> Result<Self, EbpfMonitorError> {
#[cfg(all(target_os = "linux", feature = "ebpf-aya"))]
{
Self::start_linux_aya(netns_fd, tap_iface).await
}
#[cfg(all(target_os = "linux", not(feature = "ebpf-aya")))]
{
Err(EbpfMonitorError::FeatureDisabled)
}
#[cfg(not(target_os = "linux"))]
{
tracing::debug!(
target: "cellos.supervisor.ebpf_flow",
iface = %tap_iface,
"EbpfFlowMonitor::start on non-Linux host — falling back to nflog"
);
Err(EbpfMonitorError::Unsupported {
host: std::env::consts::OS,
})
}
}
pub fn drain(&mut self) -> Vec<FlowEvent> {
let mut out = Vec::new();
while let Ok(ev) = self.rx.try_recv() {
out.push(ev);
}
out
}
pub async fn recv(&mut self) -> Option<FlowEvent> {
self.rx.recv().await
}
pub async fn stop(mut self) {
if let Some(drainer) = self.drainer.take() {
drainer.abort();
let _ = drainer.await;
}
}
#[cfg(all(target_os = "linux", feature = "ebpf-aya"))]
async fn start_linux_aya(
netns_fd: std::os::unix::io::RawFd,
tap_iface: &str,
) -> Result<Self, EbpfMonitorError> {
let object_path = std::env::var("CELLOS_EBPF_OBJECT_PATH")
.ok()
.unwrap_or_else(|| "/usr/local/lib/cellos/cellos-supervisor-ebpf.o".to_string());
let object_bytes =
std::fs::read(&object_path).map_err(|_| EbpfMonitorError::ObjectMissing {
path: object_path.clone(),
})?;
let self_netns = std::fs::File::open("/proc/self/ns/net")
.map_err(|source| EbpfMonitorError::Setns { source })?;
use std::os::unix::io::AsRawFd;
let setns_rc = unsafe { libc::setns(netns_fd, libc::CLONE_NEWNET) };
if setns_rc != 0 {
return Err(EbpfMonitorError::Setns {
source: std::io::Error::last_os_error(),
});
}
let load_result = aya::Ebpf::load(&object_bytes);
let restore_rc = unsafe { libc::setns(self_netns.as_raw_fd(), libc::CLONE_NEWNET) };
if restore_rc != 0 {
return Err(EbpfMonitorError::Setns {
source: std::io::Error::last_os_error(),
});
}
let mut bpf = load_result.map_err(|e| EbpfMonitorError::LoadFailed {
message: format!("{e}"),
})?;
const CANDIDATE_PROGRAM_NAMES: &[&str] =
&["cellos_flow_egress", "tc_egress", "flow_classifier"];
let mut attached = false;
let mut last_attach_err: Option<String> = None;
for prog_name in CANDIDATE_PROGRAM_NAMES {
match attach_tc_egress(&mut bpf, prog_name, tap_iface) {
Ok(()) => {
attached = true;
break;
}
Err(e) => {
last_attach_err = Some(e);
}
}
}
if !attached {
return Err(EbpfMonitorError::AttachFailed {
iface: tap_iface.to_string(),
message: last_attach_err
.unwrap_or_else(|| "no candidate program found".to_string()),
});
}
let (tx, rx) = tokio::sync::mpsc::unbounded_channel::<FlowEvent>();
let drainer = spawn_ring_buffer_drainer(&mut bpf, tx)
.map_err(|e| EbpfMonitorError::RingBufferOpenFailed { message: e })?;
Ok(Self {
_bpf: Some(bpf),
rx,
drainer: Some(drainer),
iface: tap_iface.to_string(),
})
}
}
#[cfg(all(target_os = "linux", feature = "ebpf-aya"))]
fn attach_tc_egress(bpf: &mut aya::Ebpf, program_name: &str, iface: &str) -> Result<(), String> {
use aya::programs::{tc, SchedClassifier, TcAttachType};
if let Err(e) = tc::qdisc_add_clsact(iface) {
let msg = format!("{e}");
if !msg.contains("File exists") && !msg.contains("EEXIST") {
return Err(format!("qdisc_add_clsact({iface}) failed: {msg}"));
}
}
let program_handle = bpf
.program_mut(program_name)
.ok_or_else(|| format!("program {program_name:?} not found in BPF object"))?;
let program: &mut SchedClassifier =
program_handle
.try_into()
.map_err(|e: aya::programs::ProgramError| {
format!("program {program_name:?} is not a SchedClassifier: {e}")
})?;
if let Err(e) = program.load() {
let msg = format!("{e}");
if !msg.contains("AlreadyLoaded") && !msg.contains("already loaded") {
return Err(format!("SchedClassifier::load failed: {msg}"));
}
}
program
.attach(iface, TcAttachType::Egress)
.map_err(|e| format!("SchedClassifier::attach({iface}, egress) failed: {e}"))?;
Ok(())
}
#[cfg(all(target_os = "linux", feature = "ebpf-aya"))]
const BPF_FLOW_EVENT_SIZE: usize = 40;
#[cfg(all(target_os = "linux", feature = "ebpf-aya"))]
fn spawn_ring_buffer_drainer(
bpf: &mut aya::Ebpf,
tx: tokio::sync::mpsc::UnboundedSender<FlowEvent>,
) -> Result<tokio::task::JoinHandle<()>, String> {
use aya::maps::RingBuf;
let map = bpf
.take_map("FLOW_EVENTS")
.or_else(|| bpf.take_map("flow_events"))
.ok_or_else(|| "FLOW_EVENTS map not found in BPF object".to_string())?;
let mut ring_buf: RingBuf<aya::maps::MapData> =
RingBuf::try_from(map).map_err(|e| format!("FLOW_EVENTS map is not a ring buffer: {e}"))?;
let handle = tokio::task::spawn_blocking(move || {
loop {
match ring_buf.next() {
Some(item) => {
if let Some(event) = parse_ring_buf_event(&item) {
if tx.send(event).is_err() {
return;
}
}
}
None => {
std::thread::sleep(std::time::Duration::from_micros(100));
}
}
}
});
Ok(handle)
}
#[cfg(all(target_os = "linux", feature = "ebpf-aya"))]
fn parse_ring_buf_event(bytes: &[u8]) -> Option<FlowEvent> {
use std::net::Ipv4Addr;
if bytes.len() < BPF_FLOW_EVENT_SIZE {
return None;
}
let src_ip = u32::from_ne_bytes([bytes[0], bytes[1], bytes[2], bytes[3]]);
let dst_ip = u32::from_ne_bytes([bytes[4], bytes[5], bytes[6], bytes[7]]);
let _src_port = u16::from_ne_bytes([bytes[8], bytes[9]]);
let dst_port = u16::from_ne_bytes([bytes[10], bytes[11]]);
let proto = bytes[12];
let _pkt_count = u64::from_ne_bytes([
bytes[16], bytes[17], bytes[18], bytes[19], bytes[20], bytes[21], bytes[22], bytes[23],
]);
let protocol = match proto {
6 => Some("tcp".to_string()),
17 => Some("udp".to_string()),
1 => Some("icmp".to_string()),
58 => Some("icmpv6".to_string()),
_ => None,
};
let _src_addr = Ipv4Addr::from(src_ip); let dst_addr = Ipv4Addr::from(dst_ip);
Some(FlowEvent {
direction: NetworkFlowDirection::Egress,
decision: NetworkFlowDecisionOutcome::Allow,
reason_code: REASON_EBPF_XDP_DROP,
dst_addr: Some(dst_addr.to_string()),
dst_port: Some(dst_port),
protocol,
byte_count: None,
})
}
pub const RING_BUF_EVENT_LEN: usize = 40;
const IPPROTO_TCP: u8 = 6;
const IPPROTO_UDP: u8 = 17;
const IPPROTO_ICMP: u8 = 1;
const IPPROTO_ICMPV6: u8 = 58;
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct RingBufFlowEvent {
pub src_ip: u32,
pub dst_ip: u32,
pub src_port: u16,
pub dst_port: u16,
pub proto: u8,
pub pkt_count: u64,
pub first_seen_ns: u64,
pub verdict: u8,
}
#[derive(Debug, Clone, PartialEq, Eq, thiserror::Error)]
pub enum RingBufParseError {
#[error(
"ring-buffer record is {got} bytes, expected exactly {RING_BUF_EVENT_LEN}; \
kernel producer ABI drift or torn read"
)]
WrongLength { got: usize },
}
pub fn decode_ring_buf_event(bytes: &[u8]) -> Result<RingBufFlowEvent, RingBufParseError> {
if bytes.len() != RING_BUF_EVENT_LEN {
return Err(RingBufParseError::WrongLength { got: bytes.len() });
}
let src_ip = u32::from_ne_bytes([bytes[0], bytes[1], bytes[2], bytes[3]]);
let dst_ip = u32::from_ne_bytes([bytes[4], bytes[5], bytes[6], bytes[7]]);
let src_port = u16::from_ne_bytes([bytes[8], bytes[9]]);
let dst_port = u16::from_ne_bytes([bytes[10], bytes[11]]);
let proto = bytes[12];
let pkt_count = u64::from_ne_bytes([
bytes[16], bytes[17], bytes[18], bytes[19], bytes[20], bytes[21], bytes[22], bytes[23],
]);
let first_seen_ns = u64::from_ne_bytes([
bytes[24], bytes[25], bytes[26], bytes[27], bytes[28], bytes[29], bytes[30], bytes[31],
]);
let verdict = bytes[32];
Ok(RingBufFlowEvent {
src_ip,
dst_ip,
src_port,
dst_port,
proto,
pkt_count,
first_seen_ns,
verdict,
})
}
pub fn ring_buf_event_to_flow_event(decoded: &RingBufFlowEvent) -> FlowEvent {
use std::net::Ipv4Addr;
let protocol = match decoded.proto {
IPPROTO_TCP => Some("tcp".to_string()),
IPPROTO_UDP => Some("udp".to_string()),
IPPROTO_ICMP => Some("icmp".to_string()),
IPPROTO_ICMPV6 => Some("icmpv6".to_string()),
_ => None,
};
let decision = match decoded.verdict {
2 => NetworkFlowDecisionOutcome::Deny,
_ => NetworkFlowDecisionOutcome::Allow,
};
FlowEvent {
direction: NetworkFlowDirection::Egress,
decision,
reason_code: REASON_EBPF_XDP_DROP,
dst_addr: Some(Ipv4Addr::from(decoded.dst_ip).to_string()),
dst_port: Some(decoded.dst_port),
protocol,
byte_count: None, }
}
#[cfg(test)]
impl EbpfFlowMonitor {
pub(crate) fn from_parts_for_test(
rx: tokio::sync::mpsc::UnboundedReceiver<FlowEvent>,
iface: &str,
) -> Self {
Self {
#[cfg(all(target_os = "linux", feature = "ebpf-aya"))]
_bpf: None,
rx,
drainer: None,
iface: iface.to_string(),
}
}
}
#[cfg(test)]
mod tests {
use super::*;
fn sample_event() -> FlowEvent {
FlowEvent {
direction: NetworkFlowDirection::Egress,
decision: NetworkFlowDecisionOutcome::Deny,
reason_code: REASON_EBPF_XDP_DROP,
dst_addr: Some("10.0.0.1".into()),
dst_port: Some(443),
protocol: Some("tcp".into()),
byte_count: Some(74),
}
}
#[test]
fn env_flag_default_off() {
let prev = std::env::var(PER_FLOW_EBPF_ENV).ok();
std::env::remove_var(PER_FLOW_EBPF_ENV);
assert!(!is_per_flow_ebpf_enabled());
if let Some(v) = prev {
std::env::set_var(PER_FLOW_EBPF_ENV, v);
}
}
#[test]
fn env_flag_recognises_one() {
let prev = std::env::var(PER_FLOW_EBPF_ENV).ok();
std::env::set_var(PER_FLOW_EBPF_ENV, "1");
assert!(is_per_flow_ebpf_enabled());
match prev {
Some(v) => std::env::set_var(PER_FLOW_EBPF_ENV, v),
None => std::env::remove_var(PER_FLOW_EBPF_ENV),
}
}
#[test]
fn env_flag_rejects_truthy_lookalikes() {
let prev = std::env::var(PER_FLOW_EBPF_ENV).ok();
for bad in ["true", "yes", "on", "TRUE", "0", "", "2"] {
std::env::set_var(PER_FLOW_EBPF_ENV, bad);
assert!(
!is_per_flow_ebpf_enabled(),
"value {bad:?} must not enable Phase 2"
);
}
match prev {
Some(v) => std::env::set_var(PER_FLOW_EBPF_ENV, v),
None => std::env::remove_var(PER_FLOW_EBPF_ENV),
}
}
#[test]
fn noop_listener_yields_no_events() {
let mut listener = NoopFlowListener;
let events = listener.drain_events();
assert!(events.is_empty());
}
#[test]
fn noop_listener_drain_is_idempotent() {
let mut listener = NoopFlowListener;
for _ in 0..5 {
assert!(listener.drain_events().is_empty());
}
}
#[test]
fn noop_listener_backend_name_is_noop() {
let listener = NoopFlowListener;
assert_eq!(listener.backend_name(), "noop");
}
#[test]
fn build_default_listener_is_noop() {
let mut listener = build_default_listener();
assert_eq!(listener.backend_name(), "noop");
assert!(listener.drain_events().is_empty());
}
#[test]
fn translator_preserves_direction_and_outcome() {
let event = sample_event();
let payload = flow_event_to_network_flow_decision(
&event,
"cell-test",
"run-test",
"dec-test",
None,
None,
None,
None,
"2026-05-06T00:00:00Z",
);
assert_eq!(payload.direction, NetworkFlowDirection::Egress);
assert_eq!(payload.decision, NetworkFlowDecisionOutcome::Deny);
assert_eq!(payload.reason_code, REASON_EBPF_XDP_DROP);
}
#[test]
fn translator_stamps_caller_metadata() {
let event = sample_event();
let payload = flow_event_to_network_flow_decision(
&event,
"cell-1",
"run-1",
"dec-1",
Some("sha256:abc"),
Some("kid-1"),
Some("issuer-1"),
Some("corr-1"),
"2026-05-06T12:00:00Z",
);
assert_eq!(payload.cell_id, "cell-1");
assert_eq!(payload.run_id, "run-1");
assert_eq!(payload.decision_id, "dec-1");
assert_eq!(payload.policy_digest.as_deref(), Some("sha256:abc"));
assert_eq!(payload.keyset_id.as_deref(), Some("kid-1"));
assert_eq!(payload.issuer_kid.as_deref(), Some("issuer-1"));
assert_eq!(payload.correlation_id.as_deref(), Some("corr-1"));
assert_eq!(payload.observed_at, "2026-05-06T12:00:00Z");
}
#[test]
fn translator_omits_metadata_when_caller_passes_none() {
let event = sample_event();
let payload = flow_event_to_network_flow_decision(
&event,
"cell-1",
"run-1",
"dec-1",
None,
None,
None,
None,
"2026-05-06T12:00:00Z",
);
assert!(payload.policy_digest.is_none());
assert!(payload.keyset_id.is_none());
assert!(payload.issuer_kid.is_none());
assert!(payload.correlation_id.is_none());
assert!(payload.nft_rule_ref.is_none());
}
#[test]
fn translator_carries_l3_l4_fields() {
let event = sample_event();
let payload = flow_event_to_network_flow_decision(
&event,
"cell-1",
"run-1",
"dec-1",
None,
None,
None,
None,
"2026-05-06T12:00:00Z",
);
assert_eq!(payload.dst_addr.as_deref(), Some("10.0.0.1"));
assert_eq!(payload.dst_port, Some(443));
assert_eq!(payload.protocol.as_deref(), Some("tcp"));
assert_eq!(payload.byte_count, Some(74));
assert_eq!(payload.packet_count, Some(1));
}
#[test]
fn translator_handles_event_without_byte_count() {
let mut event = sample_event();
event.byte_count = None;
let payload = flow_event_to_network_flow_decision(
&event,
"cell-1",
"run-1",
"dec-1",
None,
None,
None,
None,
"2026-05-06T12:00:00Z",
);
assert!(payload.byte_count.is_none());
assert!(payload.packet_count.is_none());
}
#[test]
fn translator_handles_nflog_reason_code() {
let mut event = sample_event();
event.reason_code = REASON_NFLOG_MATCH;
event.decision = NetworkFlowDecisionOutcome::Allow;
let payload = flow_event_to_network_flow_decision(
&event,
"cell-1",
"run-1",
"dec-1",
None,
None,
None,
None,
"2026-05-06T12:00:00Z",
);
assert_eq!(payload.reason_code, REASON_NFLOG_MATCH);
assert_eq!(payload.decision, NetworkFlowDecisionOutcome::Allow);
}
#[test]
fn translator_payload_round_trips_through_serde_json() {
let event = sample_event();
let payload = flow_event_to_network_flow_decision(
&event,
"cell-1",
"run-1",
"dec-1",
Some("sha256:abc"),
Some("kid-1"),
Some("issuer-1"),
Some("corr-1"),
"2026-05-06T12:00:00Z",
);
let json = serde_json::to_string(&payload).expect("serialize");
let round_tripped: NetworkFlowDecision = serde_json::from_str(&json).expect("deserialize");
assert_eq!(round_tripped, payload);
}
#[test]
fn reason_codes_are_distinct_from_phase1_codes() {
for code in [REASON_EBPF_XDP_DROP, REASON_NFLOG_MATCH] {
assert!(
!code.starts_with("nft_"),
"Phase 2 code {code} must not collide with Phase 1 nft_* prefix"
);
}
}
#[cfg(not(target_os = "linux"))]
#[tokio::test]
async fn start_is_unsupported_on_non_linux() {
let res = EbpfFlowMonitor::start(-1, "tap0").await;
match res {
Err(EbpfMonitorError::Unsupported { host }) => {
assert!(
!host.is_empty(),
"Unsupported error must name the host platform"
);
}
Err(other) => panic!("expected Unsupported, got {other:?}"),
Ok(_) => panic!("monitor must not construct on non-Linux"),
}
}
#[cfg(all(target_os = "linux", not(feature = "ebpf-aya")))]
#[tokio::test]
async fn start_returns_feature_disabled_when_feature_off() {
let res = EbpfFlowMonitor::start(-1, "tap0").await;
assert!(
matches!(res, Err(EbpfMonitorError::FeatureDisabled)),
"must report FeatureDisabled when ebpf-aya cargo feature is off"
);
}
#[tokio::test]
async fn drain_yields_empty_when_no_events_buffered() {
let (_tx, rx) = tokio::sync::mpsc::unbounded_channel::<FlowEvent>();
let mut monitor = EbpfFlowMonitor::from_parts_for_test(rx, "tap0");
let batch = monitor.drain();
assert!(batch.is_empty(), "fresh monitor has no buffered events");
}
#[tokio::test]
async fn drain_yields_events_pushed_through_channel() {
let (tx, rx) = tokio::sync::mpsc::unbounded_channel::<FlowEvent>();
let mut monitor = EbpfFlowMonitor::from_parts_for_test(rx, "tap0");
tx.send(sample_event()).expect("send ok");
tx.send(sample_event()).expect("send ok");
let batch = monitor.drain();
assert_eq!(batch.len(), 2, "drain must pull both events synchronously");
}
#[tokio::test]
async fn drain_is_non_blocking() {
let (_tx, rx) = tokio::sync::mpsc::unbounded_channel::<FlowEvent>();
let mut monitor = EbpfFlowMonitor::from_parts_for_test(rx, "tap0");
let start = std::time::Instant::now();
let batch = monitor.drain();
let elapsed = start.elapsed();
assert!(batch.is_empty());
assert!(
elapsed < std::time::Duration::from_millis(50),
"drain must be non-blocking; took {elapsed:?}"
);
}
#[tokio::test]
async fn stop_is_idempotent_when_no_drainer_present() {
let (_tx, rx) = tokio::sync::mpsc::unbounded_channel::<FlowEvent>();
let monitor = EbpfFlowMonitor::from_parts_for_test(rx, "tap0");
monitor.stop().await;
}
fn canonical_record() -> [u8; RING_BUF_EVENT_LEN] {
let mut buf = [0u8; RING_BUF_EVENT_LEN];
let src_ip: u32 = u32::from(std::net::Ipv4Addr::new(10, 0, 0, 1));
let dst_ip: u32 = u32::from(std::net::Ipv4Addr::new(192, 0, 2, 5));
buf[0..4].copy_from_slice(&src_ip.to_ne_bytes());
buf[4..8].copy_from_slice(&dst_ip.to_ne_bytes());
buf[8..10].copy_from_slice(&44_321u16.to_ne_bytes()); buf[10..12].copy_from_slice(&443u16.to_ne_bytes()); buf[12] = 6; buf[16..24].copy_from_slice(&7_777u64.to_ne_bytes()); buf[24..32].copy_from_slice(&123_456_789u64.to_ne_bytes()); buf[32] = 1; buf
}
#[test]
fn decode_ring_buf_event_round_trips_canonical_record() {
let buf = canonical_record();
let decoded = decode_ring_buf_event(&buf).expect("canonical 40-byte record decodes");
assert_eq!(
decoded.src_ip,
u32::from(std::net::Ipv4Addr::new(10, 0, 0, 1))
);
assert_eq!(
decoded.dst_ip,
u32::from(std::net::Ipv4Addr::new(192, 0, 2, 5))
);
assert_eq!(decoded.src_port, 44_321);
assert_eq!(decoded.dst_port, 443);
assert_eq!(decoded.proto, 6);
assert_eq!(decoded.pkt_count, 7_777);
assert_eq!(decoded.first_seen_ns, 123_456_789);
assert_eq!(decoded.verdict, 1);
}
#[test]
fn decode_ring_buf_event_rejects_short_record() {
let buf = [0u8; RING_BUF_EVENT_LEN - 1];
match decode_ring_buf_event(&buf) {
Err(RingBufParseError::WrongLength { got }) => {
assert_eq!(got, RING_BUF_EVENT_LEN - 1);
}
other => panic!("expected WrongLength, got {other:?}"),
}
}
#[test]
fn decode_ring_buf_event_rejects_long_record() {
let buf = [0u8; RING_BUF_EVENT_LEN + 8];
assert!(matches!(
decode_ring_buf_event(&buf),
Err(RingBufParseError::WrongLength { got }) if got == RING_BUF_EVENT_LEN + 8
));
}
#[test]
fn decode_ring_buf_event_ignores_padding_bytes() {
let mut buf = canonical_record();
buf[13..16].copy_from_slice(&[0xff, 0xff, 0xff]);
buf[33..40].copy_from_slice(&[0xaa; 7]);
let decoded = decode_ring_buf_event(&buf).expect("non-zero padding must decode");
assert_eq!(decoded.proto, 6);
assert_eq!(decoded.verdict, 1);
}
#[test]
fn ring_buf_event_to_flow_event_renders_tcp_dst() {
let decoded = decode_ring_buf_event(&canonical_record()).unwrap();
let ev = ring_buf_event_to_flow_event(&decoded);
assert_eq!(ev.direction, NetworkFlowDirection::Egress);
assert_eq!(ev.decision, NetworkFlowDecisionOutcome::Allow);
assert_eq!(ev.protocol.as_deref(), Some("tcp"));
assert_eq!(ev.dst_addr.as_deref(), Some("192.0.2.5"));
assert_eq!(ev.dst_port, Some(443));
}
#[test]
fn ring_buf_event_to_flow_event_maps_verdict_drop_to_deny() {
let mut buf = canonical_record();
buf[32] = 2; let decoded = decode_ring_buf_event(&buf).unwrap();
let ev = ring_buf_event_to_flow_event(&decoded);
assert_eq!(ev.decision, NetworkFlowDecisionOutcome::Deny);
}
#[test]
fn ring_buf_event_to_flow_event_maps_unknown_proto_to_none() {
let mut buf = canonical_record();
buf[12] = 99; let decoded = decode_ring_buf_event(&buf).unwrap();
let ev = ring_buf_event_to_flow_event(&decoded);
assert!(ev.protocol.is_none(), "unknown proto must not lie");
}
#[test]
fn ring_buf_event_to_flow_event_maps_udp() {
let mut buf = canonical_record();
buf[12] = 17; let decoded = decode_ring_buf_event(&buf).unwrap();
let ev = ring_buf_event_to_flow_event(&decoded);
assert_eq!(ev.protocol.as_deref(), Some("udp"));
}
#[test]
fn ring_buf_event_layout_constant_matches_producer() {
assert_eq!(RING_BUF_EVENT_LEN, 40);
#[cfg(all(target_os = "linux", feature = "ebpf-aya"))]
assert_eq!(RING_BUF_EVENT_LEN, BPF_FLOW_EVENT_SIZE);
}
#[test]
fn ebpf_monitor_error_display_strings_are_useful() {
let unsupported = EbpfMonitorError::Unsupported { host: "macos" };
assert!(format!("{unsupported}").contains("Linux-only"));
let disabled = EbpfMonitorError::FeatureDisabled;
assert!(format!("{disabled}").contains("ebpf-aya"));
let missing = EbpfMonitorError::ObjectMissing {
path: "/tmp/x.o".to_string(),
};
assert!(format!("{missing}").contains("/tmp/x.o"));
let attach = EbpfMonitorError::AttachFailed {
iface: "tap0".to_string(),
message: "ENODEV".to_string(),
};
let s = format!("{attach}");
assert!(s.contains("tap0"));
assert!(s.contains("ENODEV"));
}
}
pub mod connection_tracking {
use std::collections::HashSet;
use std::net::IpAddr;
#[derive(Debug, Clone, Copy, Hash, PartialEq, Eq)]
pub struct FlowKey {
pub src_addr: IpAddr,
pub src_port: u16,
pub dst_addr: IpAddr,
pub dst_port: u16,
pub protocol: u8,
}
#[derive(Debug, Clone)]
pub enum FlowEventKind {
Opened,
Closed { pkt_count: u64, byte_count: u64 },
}
#[derive(Debug, Clone)]
pub struct FlowEvent {
pub key: FlowKey,
pub kind: FlowEventKind,
pub timestamp_ns: u64,
}
#[derive(Debug, Default)]
pub struct FlowAccumulator {
opened: HashSet<FlowKey>,
}
impl FlowAccumulator {
pub fn new() -> Self {
Self::default()
}
pub fn record(&mut self, event: &FlowEvent) {
if matches!(event.kind, FlowEventKind::Opened) {
self.opened.insert(event.key);
}
}
pub fn unique_flow_count(&self) -> u64 {
self.opened.len() as u64
}
}
#[cfg(test)]
mod tests {
use super::*;
use std::net::Ipv4Addr;
fn key(src_port: u16, dst_octet: u8, dst_port: u16) -> FlowKey {
FlowKey {
src_addr: IpAddr::V4(Ipv4Addr::new(10, 0, 0, 1)),
src_port,
dst_addr: IpAddr::V4(Ipv4Addr::new(192, 0, 2, dst_octet)),
dst_port,
protocol: 6, }
}
fn opened_event(k: FlowKey) -> FlowEvent {
FlowEvent {
key: k,
kind: FlowEventKind::Opened,
timestamp_ns: 0,
}
}
#[test]
fn empty_accumulator_reports_zero() {
let acc = FlowAccumulator::new();
assert_eq!(acc.unique_flow_count(), 0);
}
#[test]
fn opened_event_increments_count() {
let mut acc = FlowAccumulator::new();
acc.record(&opened_event(key(40000, 1, 443)));
assert_eq!(acc.unique_flow_count(), 1);
}
#[test]
fn duplicate_opened_is_idempotent() {
let mut acc = FlowAccumulator::new();
let k = key(40000, 1, 443);
acc.record(&opened_event(k));
acc.record(&opened_event(k));
assert_eq!(acc.unique_flow_count(), 1);
}
#[test]
fn distinct_flows_count_independently() {
let mut acc = FlowAccumulator::new();
acc.record(&opened_event(key(40000, 1, 443)));
acc.record(&opened_event(key(40001, 1, 443))); acc.record(&opened_event(key(40000, 2, 443))); acc.record(&opened_event(key(40000, 1, 80))); assert_eq!(acc.unique_flow_count(), 4);
}
#[test]
fn closed_event_does_not_increment_count() {
let mut acc = FlowAccumulator::new();
let k = key(40000, 1, 443);
acc.record(&FlowEvent {
key: k,
kind: FlowEventKind::Closed {
pkt_count: 10,
byte_count: 1500,
},
timestamp_ns: 1,
});
assert_eq!(acc.unique_flow_count(), 0);
}
#[test]
fn closed_after_opened_preserves_count() {
let mut acc = FlowAccumulator::new();
let k = key(40000, 1, 443);
acc.record(&opened_event(k));
acc.record(&FlowEvent {
key: k,
kind: FlowEventKind::Closed {
pkt_count: 5,
byte_count: 500,
},
timestamp_ns: 2,
});
assert_eq!(acc.unique_flow_count(), 1);
}
}
}