#![allow(dead_code)]
use std::sync::atomic::AtomicBool;
use std::sync::Arc;
use cellos_core::{NetworkFlowDecision, NetworkFlowDecisionOutcome, NetworkFlowDirection};
/// Legacy opt-in switch: `build_activation_from_env` activates per-flow
/// listening when this variable is set to exactly `"1"`.
pub const ENV_PER_FLOW_EBPF: &str = "CELLOS_FIRECRACKER_PER_FLOW_EBPF";
/// Alias opt-in switch; like [`ENV_PER_FLOW_EBPF`] it must be exactly `"1"`.
pub const ENV_PER_FLOW_REALTIME: &str = "CELLOS_PER_FLOW_REALTIME";
/// Optional NFLOG group override. The value is trimmed and parsed as a `u16`;
/// an unset or unparsable value falls back to [`DEFAULT_NFLOG_GROUP`].
pub const ENV_PER_FLOW_NFLOG_GROUP: &str = "CELLOS_FIRECRACKER_PER_FLOW_NFLOG_GROUP";
/// Legacy backend selector; when set it takes precedence over
/// [`ENV_PER_FLOW_BACKEND_E7`].
pub const ENV_PER_FLOW_BACKEND: &str = "CELLOS_FIRECRACKER_PER_FLOW_BACKEND";
/// Canonical backend selector, consulted only when the legacy one is unset.
pub const ENV_PER_FLOW_BACKEND_E7: &str = "CELLOS_PER_FLOW_BACKEND";
/// NFLOG group used when no override is configured.
pub const DEFAULT_NFLOG_GROUP: u16 = 100;
/// Prefix stamped into every injected nft `log` statement; the listener only
/// handles NFLOG messages whose prefix starts with this string.
pub const LOG_PREFIX_BASE: &str = "cellos-flow";
/// Capture backend requested for per-flow attribution.
///
/// Chosen by `select_backend_from_env`: exactly `"ebpf"` selects [`Self::Ebpf`],
/// anything else (including unset) selects [`Self::Nflog`].
/// NOTE(review): the listener code in this file only implements NFLOG and does
/// not read this value — confirm where `Ebpf` is honoured.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum PerFlowBackend {
    /// nft `log group` statements consumed over NETLINK_NETFILTER (default).
    Nflog,
    /// eBPF-based capture.
    Ebpf,
}
/// Everything the per-flow listener needs to attribute and emit decisions for
/// one cell run.
pub struct PerFlowActivation {
    /// Cell identifier copied into every emitted decision.
    pub cell_id: String,
    /// Run identifier copied into every emitted decision.
    pub run_id: String,
    /// NFLOG group the listener binds to; it must match the `log group` used
    /// when augmenting the ruleset.
    pub nflog_group: u16,
    /// Requested capture backend.
    pub backend: PerFlowBackend,
    /// Provenance: digest of the enforced policy, copied into each decision.
    pub policy_digest: Option<String>,
    /// Provenance: keyset identifier, copied into each decision.
    pub keyset_id: Option<String>,
    /// Provenance: issuer key id, copied into each decision.
    pub issuer_kid: Option<String>,
}
/// Builds the per-flow listener activation from the process environment.
///
/// Returns `None` unless [`ENV_PER_FLOW_EBPF`] or [`ENV_PER_FLOW_REALTIME`] is
/// set to exactly `"1"`. Other truthy spellings such as `"true"` or `"yes"`
/// are deliberately rejected. The NFLOG group is read from
/// [`ENV_PER_FLOW_NFLOG_GROUP`] (trimmed, parsed as `u16`) and falls back to
/// [`DEFAULT_NFLOG_GROUP`]. The backend comes from [`select_backend_from_env`].
/// The provenance arguments are stored unchanged.
pub fn build_activation_from_env(
    cell_id: &str,
    run_id: &str,
    policy_digest: Option<String>,
    keyset_id: Option<String>,
    issuer_kid: Option<String>,
) -> Option<PerFlowActivation> {
    let is_exactly_one = |name: &str| matches!(std::env::var(name).as_deref(), Ok("1"));
    let opted_in = [ENV_PER_FLOW_EBPF, ENV_PER_FLOW_REALTIME]
        .iter()
        .any(|name| is_exactly_one(name));
    if !opted_in {
        return None;
    }

    let nflog_group = match std::env::var(ENV_PER_FLOW_NFLOG_GROUP) {
        Ok(raw) => raw.trim().parse::<u16>().unwrap_or(DEFAULT_NFLOG_GROUP),
        Err(_) => DEFAULT_NFLOG_GROUP,
    };

    Some(PerFlowActivation {
        cell_id: cell_id.to_owned(),
        run_id: run_id.to_owned(),
        nflog_group,
        backend: select_backend_from_env(),
        policy_digest,
        keyset_id,
        issuer_kid,
    })
}
/// Resolves the capture backend from the environment.
///
/// The legacy [`ENV_PER_FLOW_BACKEND`] wins whenever it is readable, even if
/// its value is unrecognised. [`ENV_PER_FLOW_BACKEND_E7`] is only a fallback.
/// Exactly `"ebpf"` selects [`PerFlowBackend::Ebpf`]; every other value,
/// including unset, selects [`PerFlowBackend::Nflog`].
pub fn select_backend_from_env() -> PerFlowBackend {
    let chosen = std::env::var(ENV_PER_FLOW_BACKEND)
        .or_else(|_| std::env::var(ENV_PER_FLOW_BACKEND_E7))
        .unwrap_or_default();
    if chosen == "ebpf" {
        PerFlowBackend::Ebpf
    } else {
        PerFlowBackend::Nflog
    }
}
/// Rewrites an nft ruleset so every `accept` / `drop` rule also emits an
/// NFLOG message to `group`, tagged with a `cellos-flow <verdict>` prefix.
///
/// The ruleset is processed line by line via `augment_line`. Lines are
/// re-joined with `\n`, and a trailing newline on the input is preserved.
pub fn augment_ruleset_with_log_actions(ruleset: &str, group: u16) -> String {
    let mut out = String::with_capacity(ruleset.len() + 64);
    for (idx, line) in ruleset.lines().enumerate() {
        if idx > 0 {
            out.push('\n');
        }
        out.push_str(&augment_line(line, group));
    }
    // `str::lines` swallows the final line terminator, so the join above never
    // includes it. Restore it unconditionally. Guarding on
    // `out.ends_with('\n')` would drop a trailing blank line (e.g. "a\n\n").
    if ruleset.ends_with('\n') {
        out.push('\n');
    }
    out
}
/// Augments a single nft ruleset line.
///
/// A line whose last token is `accept` or `drop` is rewritten to
/// `<rule> log group <group> prefix "cellos-flow <verdict>" <verdict>`.
/// These lines are returned unchanged:
/// - lines already containing `log group ` (keeps the rewrite idempotent);
/// - the loopback shortcut `oif "lo" accept`;
/// - structural lines (`table`, `chain`, `type filter hook`, `policy`, braces,
///   blank lines);
/// - anything whose last token is not a recognised verdict.
/// Leading indentation is preserved. Trailing whitespace is ignored when
/// locating the verdict.
fn augment_line(line: &str, group: u16) -> String {
    let body = line.trim_start();
    let indent = &line[..line.len() - body.len()];
    // Without this, trailing whitespace would leave the verdict in the rule
    // head and it would be emitted twice ("… drop  log group … drop").
    let body = body.trim_end();

    if body.contains("log group ") || body.starts_with("oif \"lo\" accept") {
        return line.to_string();
    }
    let structural = body.starts_with("policy ")
        || body.starts_with("type filter hook")
        || body.starts_with("chain ")
        || body.starts_with("table ")
        || body == "}"
        || body == "{"
        || body.is_empty();
    if structural {
        return line.to_string();
    }

    let verdict = match body.split_whitespace().last() {
        Some("accept") => "accept",
        Some("drop") => "drop",
        _ => return line.to_string(),
    };
    // `body` is end-trimmed and its last whitespace-separated token is exactly
    // `verdict`, so it ends with those ASCII bytes.
    let head = body[..body.len() - verdict.len()].trim_end();
    let log_stmt = format!("log group {group} prefix \"{LOG_PREFIX_BASE} {verdict}\" {verdict}");
    if head.is_empty() {
        format!("{indent}{log_stmt}")
    } else {
        format!("{indent}{head} {log_stmt}")
    }
}
/// The parts of one NFLOG packet message this module cares about.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct DecodedNflog {
    /// Contents of the `NFULA_PREFIX` attribute up to its first NUL byte,
    /// decoded lossily as UTF-8.
    pub prefix: String,
    /// Raw bytes of the `NFULA_PAYLOAD` attribute; empty when absent.
    pub payload: Vec<u8>,
}
/// Failure modes of `decode_nflog_datagram`.
#[derive(Debug, thiserror::Error)]
pub enum NflogDecodeError {
    /// The body is shorter than one attribute header (4 bytes); carries its length.
    #[error("nflog datagram too short ({0} bytes)")]
    TooShort(usize),
    /// No `NFULA_PREFIX` attribute was found.
    #[error("nflog datagram missing required attributes")]
    MissingAttrs,
}
// Mirrors of kernel UAPI values from <linux/netfilter/nfnetlink_log.h>,
// <linux/netfilter/nfnetlink.h> and <linux/netlink.h>.

/// `enum nfulnl_attr_type`: packet header attribute.
pub(crate) const NFULA_PACKET_HDR: u16 = 1;
/// `enum nfulnl_attr_type`: copied packet bytes.
pub(crate) const NFULA_PAYLOAD: u16 = 9;
/// `enum nfulnl_attr_type`: NUL-terminated log prefix string.
pub(crate) const NFULA_PREFIX: u16 = 10;
/// `enum nfulnl_msg_config_cmds`: bind this socket to an NFLOG group.
pub(crate) const NFULNL_CFG_CMD_BIND: u8 = 1;
/// `enum nfulnl_msg_config_cmds`: NONE=0, BIND=1, UNBIND=2, PF_BIND=3, PF_UNBIND=4.
/// (Previously 4, which is actually PF_UNBIND.)
pub(crate) const NFULNL_CFG_CMD_PF_BIND: u8 = 3;
/// `enum nfulnl_msg_config_cmds`: unbind the logger from a protocol family.
pub(crate) const NFULNL_CFG_CMD_PF_UNBIND: u8 = 4;
/// Copy mode: include the packet bytes in each log message.
pub(crate) const NFULNL_COPY_PACKET: u8 = 2;
/// nfnetlink subsystem id for NFLOG (ULOG).
pub(crate) const NFNL_SUBSYS_ULOG: u8 = 4;
/// NFLOG message type: logged packet (kernel -> user).
pub(crate) const NFULNL_MSG_PACKET: u8 = 0;
/// NFLOG message type: configuration request (user -> kernel).
pub(crate) const NFULNL_MSG_CONFIG: u8 = 1;
/// `AF_NETLINK` address family.
pub(crate) const AF_NETLINK_LITERAL: i32 = 16;
/// `NETLINK_NETFILTER` netlink protocol.
pub(crate) const NETLINK_NETFILTER_LITERAL: i32 = 12;
/// Parses the netlink attributes of one NFULNL_MSG_PACKET body (the bytes
/// after `nlmsghdr` + `nfgenmsg`) and extracts the log prefix and payload.
///
/// A missing `NFULA_PAYLOAD` is not an error; the payload is then empty.
/// Parsing stops quietly at the first malformed attribute, keeping whatever
/// was decoded before it.
///
/// # Errors
/// * [`NflogDecodeError::TooShort`] if `body` cannot hold one attribute header.
/// * [`NflogDecodeError::MissingAttrs`] if no `NFULA_PREFIX` attribute is found.
pub fn decode_nflog_datagram(body: &[u8]) -> Result<DecodedNflog, NflogDecodeError> {
    // Every attribute starts with `u16 nla_len` + `u16 nla_type`.
    const NLA_HDRLEN: usize = 4;
    // The top two bits of `nla_type` are flags (NLA_F_NESTED,
    // NLA_F_NET_BYTEORDER), not part of the attribute number.
    const NLA_TYPE_MASK: u16 = 0x3fff;

    if body.len() < NLA_HDRLEN {
        return Err(NflogDecodeError::TooShort(body.len()));
    }
    let mut prefix: Option<String> = None;
    let mut payload: Option<Vec<u8>> = None;
    let mut cursor = 0usize;
    while let Some(hdr) = body.get(cursor..cursor + NLA_HDRLEN) {
        let nla_len = usize::from(u16::from_ne_bytes([hdr[0], hdr[1]]));
        let nla_type = u16::from_ne_bytes([hdr[2], hdr[3]]) & NLA_TYPE_MASK;
        // `nla_len` covers header + value; shorter or overrunning means malformed.
        if nla_len < NLA_HDRLEN || cursor + nla_len > body.len() {
            break;
        }
        let value = &body[cursor + NLA_HDRLEN..cursor + nla_len];
        match nla_type {
            NFULA_PREFIX => {
                // Drop the NUL terminator (and anything after it) if present.
                let text = value.split(|b| *b == 0).next().unwrap_or(value);
                prefix = Some(String::from_utf8_lossy(text).into_owned());
            }
            NFULA_PAYLOAD => payload = Some(value.to_vec()),
            _ => {}
        }
        // Attributes are padded to 4-byte boundaries (NLA_ALIGN).
        cursor += (nla_len + 3) & !3;
    }
    let prefix = prefix.ok_or(NflogDecodeError::MissingAttrs)?;
    Ok(DecodedNflog {
        prefix,
        payload: payload.unwrap_or_default(),
    })
}
/// Best-effort L3/L4 attribution decoded from a logged packet.
///
/// Every field is optional. A field stays `None` when the packet is too short,
/// has an unrecognised IP version, or uses a protocol the decoder does not parse.
#[derive(Debug, Default, Clone, PartialEq, Eq)]
pub struct FlowAttribution {
    /// Source address in `std::net` textual form.
    pub src_addr: Option<String>,
    /// Source port; only populated for TCP and UDP.
    pub src_port: Option<u16>,
    /// Destination address in `std::net` textual form.
    pub dst_addr: Option<String>,
    /// Destination port; only populated for TCP and UDP.
    pub dst_port: Option<u16>,
    /// Protocol name (`"tcp"`, `"udp"`, `"icmp"`, `"icmp6"`); `None` otherwise.
    pub protocol: Option<String>,
    /// Raw protocol number as reported by the IP header decoder.
    pub protocol_byte: Option<u8>,
}
/// Decodes addresses, protocol and ports from a raw IP packet.
///
/// The IP version is the high nibble of the first byte. IPv4 and IPv6 are
/// delegated to their decoders; empty input or any other version yields an
/// all-`None` [`FlowAttribution`].
pub fn decode_l3_l4_attribution(payload: &[u8]) -> FlowAttribution {
    match payload.first().map(|first| first >> 4) {
        Some(4) => decode_ipv4(payload),
        Some(6) => decode_ipv6(payload),
        _ => FlowAttribution::default(),
    }
}
/// Decodes an IPv4 packet into addresses, protocol and (for TCP/UDP) ports.
///
/// Returns an empty [`FlowAttribution`] when the buffer is shorter than the
/// minimum 20-byte header, or when the IHL field is invalid or exceeds the
/// buffer. Ports are read only from the first fragment (fragment offset 0).
/// Later fragments do not begin with an L4 header, so their leading payload
/// bytes would otherwise be misread as ports.
fn decode_ipv4(p: &[u8]) -> FlowAttribution {
    const MIN_HDR_LEN: usize = 20;
    if p.len() < MIN_HDR_LEN {
        return FlowAttribution::default();
    }
    // IHL: low nibble of byte 0, in 32-bit words.
    let ihl = usize::from(p[0] & 0x0f) * 4;
    if ihl < MIN_HDR_LEN || p.len() < ihl {
        return FlowAttribution::default();
    }
    let proto_byte = p[9];
    // Bytes 6-7 hold 3 flag bits followed by the 13-bit fragment offset.
    let first_fragment = (u16::from_be_bytes([p[6], p[7]]) & 0x1fff) == 0;
    let (protocol, carries_ports) = match proto_byte {
        6 => (Some("tcp"), true),
        17 => (Some("udp"), true),
        1 => (Some("icmp"), false),
        _ => (None, false),
    };
    let read_ports = carries_ports && first_fragment;
    FlowAttribution {
        src_addr: Some(std::net::Ipv4Addr::new(p[12], p[13], p[14], p[15]).to_string()),
        src_port: if read_ports { parse_src_port(p, ihl) } else { None },
        dst_addr: Some(std::net::Ipv4Addr::new(p[16], p[17], p[18], p[19]).to_string()),
        dst_port: if read_ports { parse_dst_port(p, ihl) } else { None },
        protocol: protocol.map(str::to_string),
        protocol_byte: Some(proto_byte),
    }
}
/// Decodes an IPv6 packet into addresses, upper-layer protocol and (for
/// TCP/UDP) ports.
///
/// Returns an empty [`FlowAttribution`] for buffers shorter than the 40-byte
/// fixed header. The decoder skips the Hop-by-Hop, Routing, Fragment,
/// Destination Options and Authentication extension headers, so protocol and
/// ports describe the real transport. If the chain is truncated, the last
/// header type reached is reported and no ports are read. Non-first fragments
/// yield no ports.
fn decode_ipv6(p: &[u8]) -> FlowAttribution {
    const FIXED_HDR_LEN: usize = 40;
    if p.len() < FIXED_HDR_LEN {
        return FlowAttribution::default();
    }
    let addr_at = |start: usize| {
        let mut octets = [0u8; 16];
        octets.copy_from_slice(&p[start..start + 16]);
        std::net::Ipv6Addr::from(octets).to_string()
    };
    // Byte 6 of the fixed header is the first Next Header value.
    let (next_header, l4_offset, first_fragment) = ipv6_upper_layer(p, p[6], FIXED_HDR_LEN);
    let (protocol, carries_ports) = match next_header {
        6 => (Some("tcp"), true),
        17 => (Some("udp"), true),
        58 => (Some("icmp6"), false),
        _ => (None, false),
    };
    let read_ports = carries_ports && first_fragment;
    FlowAttribution {
        src_addr: Some(addr_at(8)),
        src_port: if read_ports { parse_src_port(p, l4_offset) } else { None },
        dst_addr: Some(addr_at(24)),
        dst_port: if read_ports { parse_dst_port(p, l4_offset) } else { None },
        protocol: protocol.map(str::to_string),
        protocol_byte: Some(next_header),
    }
}

/// Follows the IPv6 extension-header chain that starts at `offset` with header
/// type `next_header`.
///
/// Returns `(header_type, offset, first_fragment)`, where `header_type` is the
/// first type that is not a skippable extension (or the last one reached if
/// the chain is truncated). The loop always terminates: each step advances by
/// at least 8 bytes and never past `p.len()`.
fn ipv6_upper_layer(p: &[u8], mut next_header: u8, mut offset: usize) -> (u8, usize, bool) {
    loop {
        let hdr_len = match next_header {
            // Hop-by-Hop (0), Routing (43), Destination Options (60):
            // Hdr Ext Len counts 8-octet units beyond the first 8 octets.
            0 | 43 | 60 => match p.get(offset + 1) {
                Some(&units) => (usize::from(units) + 1) * 8,
                None => break,
            },
            // Fragment (44): fixed 8 octets. Bytes 2-3 hold offset << 3 | flags.
            44 => {
                let Some(frag) = p.get(offset..offset + 8) else {
                    break;
                };
                if u16::from_be_bytes([frag[2], frag[3]]) >> 3 != 0 {
                    // Non-first fragment: protocol known, but no L4 header follows.
                    return (frag[0], offset + 8, false);
                }
                8
            }
            // Authentication Header (51): Payload Len is in 4-octet units, minus 2.
            51 => match p.get(offset + 1) {
                Some(&units) => (usize::from(units) + 2) * 4,
                None => break,
            },
            _ => break,
        };
        if offset + hdr_len > p.len() {
            break;
        }
        // Every extension header starts with its own Next Header byte.
        next_header = p[offset];
        offset += hdr_len;
    }
    (next_header, offset, true)
}
/// Reads the big-endian source port at the start of a TCP/UDP header.
///
/// `l4_offset` is the byte offset of the L4 header in `p`. The full 4-byte
/// port pair must be present; otherwise the result is `None`.
fn parse_src_port(p: &[u8], l4_offset: usize) -> Option<u16> {
    let ports = p.get(l4_offset..l4_offset + 4)?;
    Some(u16::from_be_bytes([ports[0], ports[1]]))
}
/// Reads the big-endian destination port (bytes 2-3) of a TCP/UDP header.
///
/// `l4_offset` is the byte offset of the L4 header in `p`. The full 4-byte
/// port pair must be present; otherwise the result is `None`.
fn parse_dst_port(p: &[u8], l4_offset: usize) -> Option<u16> {
    let ports = p.get(l4_offset..l4_offset + 4)?;
    Some(u16::from_be_bytes([ports[2], ports[3]]))
}
/// Converts an attribution into a connection-tracking 5-tuple.
///
/// Returns `None` unless both addresses parse as IP addresses and both ports
/// plus the raw protocol number are present. Port-less protocols such as ICMP
/// therefore never produce a key.
pub fn flow_key_from_attribution(
    attr: &FlowAttribution,
) -> Option<crate::ebpf_flow::connection_tracking::FlowKey> {
    use crate::ebpf_flow::connection_tracking::FlowKey;
    let parse_ip =
        |raw: &Option<String>| -> Option<std::net::IpAddr> { raw.as_deref()?.parse().ok() };
    Some(FlowKey {
        src_addr: parse_ip(&attr.src_addr)?,
        src_port: attr.src_port?,
        dst_addr: parse_ip(&attr.dst_addr)?,
        dst_port: attr.dst_port?,
        protocol: attr.protocol_byte?,
    })
}
/// Records an `Opened` event for `key` in the shared flow accumulator.
///
/// The event timestamp is fixed at `0`. A poisoned mutex is recovered rather
/// than propagated, so one panicked holder does not stop flow accounting.
pub fn record_opened_flow(
    accumulator: &std::sync::Arc<
        std::sync::Mutex<crate::ebpf_flow::connection_tracking::FlowAccumulator>,
    >,
    key: crate::ebpf_flow::connection_tracking::FlowKey,
) {
    use crate::ebpf_flow::connection_tracking::{FlowEvent, FlowEventKind};
    let opened = FlowEvent {
        key,
        kind: FlowEventKind::Opened,
        timestamp_ns: 0,
    };
    let mut guard = accumulator
        .lock()
        .unwrap_or_else(std::sync::PoisonError::into_inner);
    guard.record(&opened);
}
pub fn build_decision(
activation: &PerFlowActivation,
prefix: &str,
payload: &[u8],
observed_at: &str,
) -> NetworkFlowDecision {
let attribution = decode_l3_l4_attribution(payload);
let (decision, reason_code) = if prefix.contains("accept") {
(NetworkFlowDecisionOutcome::Allow, "nft_log_accept")
} else if prefix.contains("drop") {
(NetworkFlowDecisionOutcome::Deny, "nft_log_drop")
} else {
(NetworkFlowDecisionOutcome::Deny, "nft_log_unknown")
};
NetworkFlowDecision {
schema_version: "1.0.0".to_string(),
cell_id: activation.cell_id.clone(),
run_id: activation.run_id.clone(),
decision_id: uuid::Uuid::new_v4().to_string(),
direction: NetworkFlowDirection::Egress,
decision,
reason_code: reason_code.to_string(),
nft_rule_ref: None,
dst_addr: attribution.dst_addr,
dst_port: attribution.dst_port,
protocol: attribution.protocol,
packet_count: None,
byte_count: None,
policy_digest: activation.policy_digest.clone(),
keyset_id: activation.keyset_id.clone(),
issuer_kid: activation.issuer_kid.clone(),
correlation_id: None,
observed_at: observed_at.to_string(),
}
}
/// Handle to a running (or, off Linux, never-started) per-flow listener.
pub struct PerFlowListenerHandle {
    /// Stop flag polled by the listener loop between `recv()` calls; set it
    /// before calling `join`.
    pub shutdown: Arc<AtomicBool>,
    /// Listener thread; `None` once it has been joined.
    #[cfg(target_os = "linux")]
    pub thread: Option<std::thread::JoinHandle<PerFlowListenerStats>>,
}
/// Counters returned by the listener thread when it exits.
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq)]
pub struct PerFlowListenerStats {
    /// Successful `recv()` calls (one datagram may hold several netlink messages).
    pub datagrams_total: u64,
    /// NFLOG packet messages whose prefix starts with `LOG_PREFIX_BASE`.
    pub datagrams_matched: u64,
    /// NFLOG packet messages that `decode_nflog_datagram` rejected.
    pub datagrams_decode_failed: u64,
    /// Decision events handed to the tokio runtime for emission. They are
    /// counted when spawned, not when the sink confirms delivery.
    pub events_emitted: u64,
}
impl PerFlowListenerHandle {
    /// Waits for the listener thread and returns its final counters.
    ///
    /// Returns `None` if the thread was already joined or if it panicked (the
    /// panic is logged). This method does not raise `shutdown` itself; callers
    /// should set it first, otherwise this blocks until the thread exits.
    #[cfg(target_os = "linux")]
    pub fn join(&mut self) -> Option<PerFlowListenerStats> {
        let worker = self.thread.take()?;
        worker
            .join()
            .map_err(|_panic_payload| {
                tracing::warn!(
                    target: "cellos.supervisor.per_flow",
                    "per-flow listener thread panicked on join"
                );
            })
            .ok()
    }

    /// Non-Linux builds never spawn a listener thread, so there is nothing to join.
    #[cfg(not(target_os = "linux"))]
    pub fn join(&mut self) -> Option<PerFlowListenerStats> {
        None
    }
}
/// Receive timeout (applied via `SO_RCVTIMEO`) for the listener socket, in
/// milliseconds. It bounds how long the loop can go without re-checking the
/// shutdown flag.
#[cfg(target_os = "linux")]
const LISTENER_RECV_TIMEOUT_MS: i64 = 100;
/// Spawns the per-flow NFLOG listener inside the network namespace of `child_pid`.
///
/// `/proc/<pid>/ns/net` is opened on the calling thread, so a missing process
/// surfaces as an immediate `Err`. A dedicated thread named
/// `cellos-per-flow-<pid>` is then started. That thread switches itself into
/// the namespace with `setns(CLONE_NEWNET)` and runs `run_listener_loop`. If
/// `setns` fails, the thread logs a warning and exits with default (all-zero)
/// stats. The current tokio runtime handle, if any, is captured so decision
/// events can be emitted asynchronously from the listener thread.
///
/// # Errors
/// Opening the netns file or spawning the thread can fail with `std::io::Error`.
#[cfg(target_os = "linux")]
pub fn spawn_per_flow_listener_in_netns(
    child_pid: u32,
    activation: PerFlowActivation,
    sink: Arc<dyn cellos_core::ports::EventSink>,
    shutdown: Arc<AtomicBool>,
    accumulator: Option<
        Arc<std::sync::Mutex<crate::ebpf_flow::connection_tracking::FlowAccumulator>>,
    >,
) -> std::io::Result<PerFlowListenerHandle> {
    use std::fs::File;
    use std::os::unix::io::AsRawFd;
    let netns_path = format!("/proc/{child_pid}/ns/net");
    let netns_file = File::open(&netns_path)
        .map_err(|e| std::io::Error::new(e.kind(), format!("open netns at {netns_path}: {e}")))?;
    // `None` when called outside a tokio runtime; events are then not emitted.
    let runtime_handle = tokio::runtime::Handle::try_current().ok();
    let shutdown_for_thread = shutdown.clone();
    let thread = std::thread::Builder::new()
        .name(format!("cellos-per-flow-{child_pid}"))
        .spawn(move || {
            // SAFETY: `netns_file` is owned by this closure, so the descriptor is
            // open for the duration of the call. setns(2) re-associates only the
            // calling thread, i.e. this dedicated listener thread.
            let setns_rc = unsafe { libc::setns(netns_file.as_raw_fd(), libc::CLONE_NEWNET) };
            if setns_rc != 0 {
                let err = std::io::Error::last_os_error();
                tracing::warn!(
                    target: "cellos.supervisor.per_flow",
                    error = %err,
                    child_pid = child_pid,
                    "setns(CLONE_NEWNET) failed — per-flow listener bailing"
                );
                return PerFlowListenerStats::default();
            }
            run_listener_loop(
                activation,
                sink,
                shutdown_for_thread,
                runtime_handle,
                accumulator,
            )
        })?;
    Ok(PerFlowListenerHandle {
        shutdown,
        thread: Some(thread),
    })
}
/// Non-Linux stub: no listener is started. The returned handle only carries
/// `shutdown`, and its `join` returns `None`. All other arguments are ignored.
#[cfg(not(target_os = "linux"))]
pub fn spawn_per_flow_listener_in_netns(
    _child_pid: u32,
    _activation: PerFlowActivation,
    _sink: Arc<dyn cellos_core::ports::EventSink>,
    shutdown: Arc<AtomicBool>,
    _accumulator: Option<
        Arc<std::sync::Mutex<crate::ebpf_flow::connection_tracking::FlowAccumulator>>,
    >,
) -> std::io::Result<PerFlowListenerHandle> {
    Ok(PerFlowListenerHandle { shutdown })
}
#[cfg(target_os = "linux")]
fn run_listener_loop(
activation: PerFlowActivation,
sink: Arc<dyn cellos_core::ports::EventSink>,
shutdown: Arc<AtomicBool>,
runtime_handle: Option<tokio::runtime::Handle>,
accumulator: Option<
Arc<std::sync::Mutex<crate::ebpf_flow::connection_tracking::FlowAccumulator>>,
>,
) -> PerFlowListenerStats {
use std::os::unix::io::FromRawFd;
use std::os::unix::io::OwnedFd;
let mut stats = PerFlowListenerStats::default();
let sock_fd =
unsafe { libc::socket(libc::AF_NETLINK, libc::SOCK_RAW, NETLINK_NETFILTER_LITERAL) };
if sock_fd < 0 {
let err = std::io::Error::last_os_error();
tracing::warn!(
target: "cellos.supervisor.per_flow",
error = %err,
"socket(AF_NETLINK, NETLINK_NETFILTER) failed — per-flow listener bailing"
);
return stats;
}
let _sock_guard = unsafe { OwnedFd::from_raw_fd(sock_fd) };
let mut sa: libc::sockaddr_nl = unsafe { std::mem::zeroed() };
sa.nl_family = libc::AF_NETLINK as u16;
sa.nl_pid = 0;
sa.nl_groups = 0;
let bind_rc = unsafe {
libc::bind(
sock_fd,
&sa as *const _ as *const libc::sockaddr,
std::mem::size_of::<libc::sockaddr_nl>() as u32,
)
};
if bind_rc != 0 {
let err = std::io::Error::last_os_error();
tracing::warn!(
target: "cellos.supervisor.per_flow",
error = %err,
"bind() on netlink socket failed — per-flow listener bailing"
);
return stats;
}
let tv = libc::timeval {
tv_sec: 0,
tv_usec: (LISTENER_RECV_TIMEOUT_MS * 1000) as libc::suseconds_t,
};
let _ = unsafe {
libc::setsockopt(
sock_fd,
libc::SOL_SOCKET,
libc::SO_RCVTIMEO,
&tv as *const _ as *const libc::c_void,
std::mem::size_of::<libc::timeval>() as u32,
)
};
if let Err(e) = send_nflog_cfg_pf_bind(sock_fd, libc::AF_INET as u16) {
tracing::warn!(target: "cellos.supervisor.per_flow", error=%e, "PF_BIND v4 failed");
return stats;
}
if let Err(e) = send_nflog_cfg_pf_bind(sock_fd, libc::AF_INET6 as u16) {
tracing::warn!(target: "cellos.supervisor.per_flow", error=%e, "PF_BIND v6 failed");
return stats;
}
if let Err(e) = send_nflog_cfg_bind_group(sock_fd, activation.nflog_group) {
tracing::warn!(target: "cellos.supervisor.per_flow", error=%e, "BIND group failed");
return stats;
}
if let Err(e) = send_nflog_cfg_copy_packet(sock_fd, activation.nflog_group) {
tracing::warn!(target: "cellos.supervisor.per_flow", error=%e, "COPY_PACKET failed");
return stats;
}
let mut buf = vec![0u8; 65536];
use std::sync::atomic::Ordering;
while !shutdown.load(Ordering::SeqCst) {
let n = unsafe { libc::recv(sock_fd, buf.as_mut_ptr() as *mut libc::c_void, buf.len(), 0) };
if n < 0 {
let err = std::io::Error::last_os_error();
if matches!(err.raw_os_error(), Some(libc::EAGAIN)) {
continue;
}
tracing::debug!(
target: "cellos.supervisor.per_flow",
error = %err,
"recv() error — exiting listener loop"
);
break;
}
let received = &buf[..n as usize];
stats.datagrams_total += 1;
let mut offset = 0usize;
while offset < received.len() {
if received.len() - offset < 16 {
break;
}
let nlmsg_len = u32::from_ne_bytes([
received[offset],
received[offset + 1],
received[offset + 2],
received[offset + 3],
]) as usize;
let nlmsg_type = u16::from_ne_bytes([received[offset + 4], received[offset + 5]]);
if nlmsg_len < 16 || offset + nlmsg_len > received.len() {
break;
}
let subsys = (nlmsg_type >> 8) as u8;
let msg_kind = (nlmsg_type & 0xff) as u8;
let body_start = offset + 16 + 4;
if subsys == NFNL_SUBSYS_ULOG
&& msg_kind == NFULNL_MSG_PACKET
&& body_start <= offset + nlmsg_len
{
let body = &received[body_start..offset + nlmsg_len];
match decode_nflog_datagram(body) {
Ok(decoded) => {
if decoded.prefix.starts_with(LOG_PREFIX_BASE) {
stats.datagrams_matched += 1;
if decoded.prefix.contains("accept") {
if let Some(acc) = accumulator.as_ref() {
let attribution = decode_l3_l4_attribution(&decoded.payload);
if let Some(key) = flow_key_from_attribution(&attribution) {
record_opened_flow(acc, key);
}
}
}
let now = chrono::Utc::now().to_rfc3339();
let decision = build_decision(
&activation,
&decoded.prefix,
&decoded.payload,
&now,
);
match cloud_event_v1_network_flow_decision(
"cellos-supervisor",
&now,
&decision,
) {
Ok(event) => {
if let Some(rt) = runtime_handle.as_ref() {
let sink = sink.clone();
rt.spawn(async move {
if let Err(e) = sink.emit(&event).await {
tracing::warn!(
target: "cellos.supervisor.per_flow",
error = %e,
"sink emit failed for network_flow_decision event"
);
}
});
stats.events_emitted += 1;
}
}
Err(e) => {
tracing::debug!(
target: "cellos.supervisor.per_flow",
error = %e,
"network_flow_decision envelope build failed"
);
}
}
}
}
Err(_) => {
stats.datagrams_decode_failed += 1;
}
}
}
offset += (nlmsg_len + 3) & !3;
}
}
stats
}
/// Sends the `NFULNL_CFG_CMD_PF_BIND` config command for protocol family `pf`
/// (group 0) over `sock_fd`.
#[cfg(target_os = "linux")]
fn send_nflog_cfg_pf_bind(sock_fd: i32, pf: u16) -> std::io::Result<()> {
    send_netlink(sock_fd, &build_nfnl_cfg_msg(0, NFULNL_CFG_CMD_PF_BIND, pf))
}
/// Sends the `NFULNL_CFG_CMD_BIND` config command for NFLOG `group` over
/// `sock_fd` (protocol family 0).
#[cfg(target_os = "linux")]
fn send_nflog_cfg_bind_group(sock_fd: i32, group: u16) -> std::io::Result<()> {
    let request = build_nfnl_cfg_msg(group, NFULNL_CFG_CMD_BIND, 0);
    send_netlink(sock_fd, &request)
}
/// Switches NFLOG `group` to copy-packet mode, so log messages carry the
/// packet bytes in `NFULA_PAYLOAD`.
///
/// Sends a single `NFULA_CFG_MODE` attribute with `copy_range = 0`, which the
/// kernel treats as "copy up to its maximum range", and
/// `copy_mode = NFULNL_COPY_PACKET`.
#[cfg(target_os = "linux")]
fn send_nflog_cfg_copy_packet(sock_fd: i32, group: u16) -> std::io::Result<()> {
    // `enum nfulnl_attr_config`: NFULA_CFG_MODE.
    const NFULA_CFG_MODE: u16 = 2;
    // Packed `struct nfulnl_msg_config_mode`: __be32 copy_range, u8 copy_mode, u8 _pad.
    const MODE_PAYLOAD_LEN: u16 = 4 + 1 + 1;
    // nla_len covers header + payload but not the trailing alignment padding.
    let nla_len: u16 = 4 + MODE_PAYLOAD_LEN;

    let mut attr = Vec::with_capacity(12);
    attr.extend_from_slice(&nla_len.to_ne_bytes());
    attr.extend_from_slice(&NFULA_CFG_MODE.to_ne_bytes());
    attr.extend_from_slice(&0u32.to_be_bytes()); // copy_range
    attr.push(NFULNL_COPY_PACKET); // copy_mode
    attr.push(0); // _pad
    // Pad the attribute out to the 4-byte NLA alignment.
    while attr.len() % 4 != 0 {
        attr.push(0);
    }
    send_netlink(sock_fd, &build_nfnl_cfg_envelope(group, &attr))
}
/// Builds an NFULNL_MSG_CONFIG request carrying one `NFULA_CFG_CMD` attribute.
///
/// `group` becomes `nfgenmsg.res_id` (written by the envelope). `pf` is stored
/// in `nfgenmsg.nfgen_family`, which is where the kernel reads the protocol
/// family for `PF_BIND` / `PF_UNBIND`. Pass `pf = 0` (AF_UNSPEC) for
/// group-level commands.
#[cfg(target_os = "linux")]
fn build_nfnl_cfg_msg(group: u16, cmd: u8, pf: u16) -> Vec<u8> {
    // `enum nfulnl_attr_config`: NFULA_CFG_CMD.
    const NFULA_CFG_CMD: u16 = 1;
    // `nfgenmsg.nfgen_family` immediately follows the 16-byte `nlmsghdr`.
    const NFGEN_FAMILY_OFFSET: usize = 16;

    // `struct nfulnl_msg_config_cmd { u8 command; }`: nla_len = 4 + 1, padded to 8.
    let nla_len: u16 = 4 + 1;
    let mut attr = Vec::with_capacity(8);
    attr.extend_from_slice(&nla_len.to_ne_bytes());
    attr.extend_from_slice(&NFULA_CFG_CMD.to_ne_bytes());
    attr.push(cmd);
    attr.extend_from_slice(&[0u8; 3]);

    let mut msg = build_nfnl_cfg_envelope(group, &attr);
    // Protocol families fit in one byte (AF_INET = 2, AF_INET6 = 10); an
    // out-of-range value falls back to AF_UNSPEC instead of being truncated.
    msg[NFGEN_FAMILY_OFFSET] = u8::try_from(pf).unwrap_or(0);
    msg
}
/// Wraps `attrs` in an `nlmsghdr` + `nfgenmsg` for an NFLOG config request.
///
/// Layout, native-endian unless noted:
/// - `nlmsg_len: u32` (whole message);
/// - `nlmsg_type: u16` (`NFNL_SUBSYS_ULOG << 8 | NFULNL_MSG_CONFIG`);
/// - `nlmsg_flags: u16` (`NLM_F_REQUEST | NLM_F_ACK`);
/// - `nlmsg_seq: u32 = 0`, `nlmsg_pid: u32 = 0`;
/// - `nfgenmsg { family: 0, version: 0, res_id: group (big-endian) }`;
/// - then `attrs` verbatim.
#[cfg(target_os = "linux")]
fn build_nfnl_cfg_envelope(group: u16, attrs: &[u8]) -> Vec<u8> {
    const NLMSG_HDRLEN: u32 = 16;
    const NFGENMSG_LEN: u32 = 4;
    let total_len: u32 = NLMSG_HDRLEN + NFGENMSG_LEN + attrs.len() as u32;
    let msg_type: u16 = ((NFNL_SUBSYS_ULOG as u16) << 8) | NFULNL_MSG_CONFIG as u16;
    let flags: u16 = (libc::NLM_F_REQUEST | libc::NLM_F_ACK) as u16;

    let mut msg = Vec::with_capacity(total_len as usize);
    for chunk in [
        &total_len.to_ne_bytes()[..],
        &msg_type.to_ne_bytes()[..],
        &flags.to_ne_bytes()[..],
        &0u32.to_ne_bytes()[..], // nlmsg_seq
        &0u32.to_ne_bytes()[..], // nlmsg_pid
        &[0u8, 0u8][..],         // nfgen_family, version
        &group.to_be_bytes()[..], // res_id
        attrs,
    ] {
        msg.extend_from_slice(chunk);
    }
    msg
}
/// Sends one netlink message to the kernel (port id 0) on `sock_fd`.
///
/// # Errors
/// Returns the OS error if `sendto(2)` fails. Kernel-side ACK/error replies
/// are not read here.
#[cfg(target_os = "linux")]
fn send_netlink(sock_fd: i32, payload: &[u8]) -> std::io::Result<()> {
    // SAFETY: all-zero bytes are a valid `sockaddr_nl`; nl_pid = 0 addresses the kernel.
    let mut kernel_addr: libc::sockaddr_nl = unsafe { std::mem::zeroed() };
    kernel_addr.nl_family = libc::AF_NETLINK as u16;
    let addr_len = std::mem::size_of::<libc::sockaddr_nl>() as u32;
    // SAFETY: the pointer/length pair describes the live `payload` slice, and
    // `kernel_addr` is a valid `sockaddr_nl` of `addr_len` bytes.
    let sent = unsafe {
        libc::sendto(
            sock_fd,
            payload.as_ptr() as *const libc::c_void,
            payload.len(),
            0,
            &kernel_addr as *const libc::sockaddr_nl as *const libc::sockaddr,
            addr_len,
        )
    };
    if sent >= 0 {
        Ok(())
    } else {
        Err(std::io::Error::last_os_error())
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::{Mutex, MutexGuard};

    /// Serialises every test that reads or mutates process environment
    /// variables: the environment is process-global and tests run in parallel.
    static ENV_LOCK: Mutex<()> = Mutex::new(());

    /// Variables read directly by `build_activation_from_env`. The NFLOG group
    /// is included so an ambient override cannot break default-group assertions.
    const ACTIVATION_ENV_VARS: [&str; 3] = [
        ENV_PER_FLOW_EBPF,
        ENV_PER_FLOW_REALTIME,
        ENV_PER_FLOW_NFLOG_GROUP,
    ];

    /// Variables read by `select_backend_from_env`.
    const BACKEND_ENV_VARS: [&str; 2] = [ENV_PER_FLOW_BACKEND, ENV_PER_FLOW_BACKEND_E7];

    /// Acquires `ENV_LOCK`, recovering from poisoning. The guarded value is
    /// `()`, so a panicked holder cannot leave it inconsistent.
    fn lock_env() -> MutexGuard<'static, ()> {
        ENV_LOCK.lock().unwrap_or_else(|p| p.into_inner())
    }

    /// Sets `name` to `value`, or removes it when `value` is `None`.
    fn set_env(name: &str, value: Option<&str>) {
        // SAFETY: every caller in this module holds ENV_LOCK, so these tests
        // never mutate the environment concurrently with each other.
        unsafe {
            match value {
                Some(v) => std::env::set_var(name, v),
                None => std::env::remove_var(name),
            }
        }
    }

    /// Snapshot of environment variables that is restored on drop, even when
    /// an assertion fails, so one test cannot leak settings into another.
    /// Declare it *after* the `lock_env()` guard so restoration runs under the lock.
    struct EnvSnapshot(Vec<(&'static str, Option<String>)>);

    impl EnvSnapshot {
        /// Records the current values of `names`, then unsets all of them.
        fn take_and_clear(names: &[&'static str]) -> Self {
            let saved = names
                .iter()
                .map(|&name| (name, std::env::var(name).ok()))
                .collect();
            for &name in names {
                set_env(name, None);
            }
            Self(saved)
        }
    }

    impl Drop for EnvSnapshot {
        fn drop(&mut self) {
            for (name, value) in self.0.drain(..) {
                set_env(name, value.as_deref());
            }
        }
    }

    fn sample_activation() -> PerFlowActivation {
        PerFlowActivation {
            cell_id: "cell-test".to_string(),
            run_id: "run-uuid".to_string(),
            nflog_group: 100,
            backend: PerFlowBackend::Nflog,
            policy_digest: Some("sha256:deadbeef".to_string()),
            keyset_id: Some("keyset-1".to_string()),
            issuer_kid: Some("kid-1".to_string()),
        }
    }

    /// Appends one netlink attribute (length, type, value, 4-byte padding) to `buf`.
    fn push_nla(buf: &mut Vec<u8>, nla_type: u16, value: &[u8]) {
        let nla_len = u16::try_from(4 + value.len()).expect("attribute fits in u16");
        buf.extend_from_slice(&nla_len.to_ne_bytes());
        buf.extend_from_slice(&nla_type.to_ne_bytes());
        buf.extend_from_slice(value);
        while buf.len() % 4 != 0 {
            buf.push(0);
        }
    }

    /// Minimal 40-byte IPv4 + TCP packet with the given 5-tuple endpoints.
    fn ipv4_tcp_payload(src: [u8; 4], src_port: u16, dst: [u8; 4], dst_port: u16) -> Vec<u8> {
        let mut p = vec![0u8; 40];
        p[0] = 0x45;
        p[9] = 6;
        p[12..16].copy_from_slice(&src);
        p[16..20].copy_from_slice(&dst);
        p[20..22].copy_from_slice(&src_port.to_be_bytes());
        p[22..24].copy_from_slice(&dst_port.to_be_bytes());
        p
    }

    #[test]
    fn build_activation_returns_none_when_env_unset() {
        let _g = lock_env();
        let _env = EnvSnapshot::take_and_clear(&ACTIVATION_ENV_VARS);
        let act = build_activation_from_env("c", "r", None, None, None);
        assert!(act.is_none());
    }

    #[test]
    fn select_backend_defaults_to_nflog() {
        let _g = lock_env();
        let _env = EnvSnapshot::take_and_clear(&BACKEND_ENV_VARS);
        assert_eq!(select_backend_from_env(), PerFlowBackend::Nflog);
    }

    #[test]
    fn select_backend_ebpf_returns_ebpf_variant() {
        let _g = lock_env();
        let _env = EnvSnapshot::take_and_clear(&BACKEND_ENV_VARS);
        set_env(ENV_PER_FLOW_BACKEND, Some("ebpf"));
        assert_eq!(select_backend_from_env(), PerFlowBackend::Ebpf);
    }

    #[test]
    fn select_backend_honours_e7_canonical_env_name() {
        let _g = lock_env();
        let _env = EnvSnapshot::take_and_clear(&BACKEND_ENV_VARS);
        set_env(ENV_PER_FLOW_BACKEND_E7, Some("ebpf"));
        assert_eq!(select_backend_from_env(), PerFlowBackend::Ebpf);
    }

    #[test]
    fn select_backend_legacy_name_wins_over_canonical_when_both_set() {
        let _g = lock_env();
        let _env = EnvSnapshot::take_and_clear(&BACKEND_ENV_VARS);
        set_env(ENV_PER_FLOW_BACKEND, Some("nflog"));
        set_env(ENV_PER_FLOW_BACKEND_E7, Some("ebpf"));
        assert_eq!(
            select_backend_from_env(),
            PerFlowBackend::Nflog,
            "legacy var must win when both set"
        );
    }

    #[test]
    fn select_backend_unknown_value_defaults_to_nflog() {
        let _g = lock_env();
        let _env = EnvSnapshot::take_and_clear(&BACKEND_ENV_VARS);
        set_env(ENV_PER_FLOW_BACKEND, Some("xdp"));
        assert_eq!(select_backend_from_env(), PerFlowBackend::Nflog);
    }

    #[test]
    fn augment_rewrites_accept_verdict() {
        let input = " ip daddr 10.0.0.1 tcp dport 443 accept";
        let out = augment_ruleset_with_log_actions(input, 100);
        assert!(out.contains("log group 100 prefix \"cellos-flow accept\""));
        assert!(out.ends_with("accept"));
    }

    #[test]
    fn augment_rewrites_drop_verdict() {
        let input = " udp dport 53 drop";
        let out = augment_ruleset_with_log_actions(input, 200);
        assert!(out.contains("log group 200 prefix \"cellos-flow drop\""));
        assert!(out.ends_with("drop"));
    }

    #[test]
    fn augment_skips_loopback_shortcut() {
        let input = " oif \"lo\" accept";
        let out = augment_ruleset_with_log_actions(input, 100);
        assert_eq!(out, input);
    }

    #[test]
    fn augment_skips_policy_lines() {
        let input = " type filter hook output priority 0; policy drop;";
        let out = augment_ruleset_with_log_actions(input, 100);
        assert_eq!(out, input);
    }

    #[test]
    fn augment_is_idempotent() {
        let input = " ip daddr 10.0.0.1 tcp dport 443 accept";
        let once = augment_ruleset_with_log_actions(input, 100);
        let twice = augment_ruleset_with_log_actions(&once, 100);
        assert_eq!(once, twice);
    }

    #[test]
    fn augment_preserves_chain_structure() {
        let input = "table inet cellos_test {\n chain output {\n type filter hook output priority 0; policy drop;\n oif \"lo\" accept\n udp dport 53 drop\n }\n}";
        let out = augment_ruleset_with_log_actions(input, 100);
        assert!(out.contains("table inet cellos_test {"));
        assert!(out.contains("chain output {"));
        assert!(out.contains("policy drop;"));
        assert!(out.contains("oif \"lo\" accept"));
        assert!(out.contains("log group 100 prefix \"cellos-flow drop\" drop"));
        assert!(out.ends_with("}"));
    }

    #[test]
    fn decode_l4_ipv4_tcp_extracts_dst() {
        let mut p = vec![0u8; 40];
        p[0] = 0x45;
        p[9] = 6;
        p[16] = 10;
        p[17] = 0;
        p[18] = 0;
        p[19] = 1;
        p[22] = 0x01;
        p[23] = 0xbb;
        let attr = decode_l3_l4_attribution(&p);
        assert_eq!(attr.dst_addr.as_deref(), Some("10.0.0.1"));
        assert_eq!(attr.dst_port, Some(443));
        assert_eq!(attr.protocol.as_deref(), Some("tcp"));
    }

    #[test]
    fn decode_l4_ipv4_udp_extracts_dst() {
        let mut p = vec![0u8; 40];
        p[0] = 0x45;
        p[9] = 17;
        p[16] = 1;
        p[17] = 1;
        p[18] = 1;
        p[19] = 1;
        p[22] = 0x00;
        p[23] = 0x35;
        let attr = decode_l3_l4_attribution(&p);
        assert_eq!(attr.dst_addr.as_deref(), Some("1.1.1.1"));
        assert_eq!(attr.dst_port, Some(53));
        assert_eq!(attr.protocol.as_deref(), Some("udp"));
    }

    #[test]
    fn decode_l4_ipv4_icmp() {
        let mut p = vec![0u8; 28];
        p[0] = 0x45;
        p[9] = 1;
        p[16] = 8;
        p[17] = 8;
        p[18] = 8;
        p[19] = 8;
        let attr = decode_l3_l4_attribution(&p);
        assert_eq!(attr.dst_addr.as_deref(), Some("8.8.8.8"));
        assert_eq!(attr.protocol.as_deref(), Some("icmp"));
        assert_eq!(attr.dst_port, None);
    }

    #[test]
    fn decode_l4_ipv6_tcp_extracts_dst() {
        let mut p = vec![0u8; 60];
        p[0] = 0x60;
        p[6] = 6;
        p[24] = 0x20;
        p[25] = 0x01;
        p[26] = 0x0d;
        p[27] = 0xb8;
        p[39] = 0x01;
        p[42] = 0x01;
        p[43] = 0xbb;
        let attr = decode_l3_l4_attribution(&p);
        assert_eq!(attr.dst_addr.as_deref(), Some("2001:db8::1"));
        assert_eq!(attr.dst_port, Some(443));
        assert_eq!(attr.protocol.as_deref(), Some("tcp"));
    }

    #[test]
    fn decode_l4_ipv6_icmp6() {
        let mut p = vec![0u8; 48];
        p[0] = 0x60;
        p[6] = 58;
        let attr = decode_l3_l4_attribution(&p);
        assert_eq!(attr.protocol.as_deref(), Some("icmp6"));
        assert_eq!(attr.dst_port, None);
    }

    #[test]
    fn decode_l4_handles_empty_payload() {
        let attr = decode_l3_l4_attribution(&[]);
        assert!(attr.dst_addr.is_none());
        assert!(attr.protocol.is_none());
    }

    #[test]
    fn decode_nflog_extracts_prefix_and_payload() {
        let mut buf = Vec::new();
        push_nla(&mut buf, NFULA_PREFIX, b"cellos-flow accept\0");
        let pkt = b"abcdef";
        push_nla(&mut buf, NFULA_PAYLOAD, pkt);
        let decoded = decode_nflog_datagram(&buf).expect("decode ok");
        assert_eq!(decoded.prefix, "cellos-flow accept");
        assert_eq!(decoded.payload, pkt);
    }

    #[test]
    fn decode_nflog_round_trip_with_packet_attribution() {
        let mut packet = vec![0u8; 40];
        packet[0] = 0x45;
        packet[9] = 6;
        packet[16] = 192;
        packet[17] = 0;
        packet[18] = 2;
        packet[19] = 1;
        packet[22] = 0x01;
        packet[23] = 0xbb;
        let mut buf = Vec::new();
        push_nla(&mut buf, NFULA_PREFIX, b"cellos-flow drop\0");
        push_nla(&mut buf, NFULA_PAYLOAD, &packet);
        let decoded = decode_nflog_datagram(&buf).expect("decode ok");
        assert_eq!(decoded.prefix, "cellos-flow drop");
        let activation = sample_activation();
        let decision = build_decision(
            &activation,
            &decoded.prefix,
            &decoded.payload,
            "2026-01-01T00:00:00Z",
        );
        assert_eq!(decision.decision, NetworkFlowDecisionOutcome::Deny);
        assert_eq!(decision.reason_code, "nft_log_drop");
        assert_eq!(decision.dst_addr.as_deref(), Some("192.0.2.1"));
        assert_eq!(decision.dst_port, Some(443));
        assert_eq!(decision.protocol.as_deref(), Some("tcp"));
        assert_eq!(decision.cell_id, "cell-test");
        assert_eq!(decision.run_id, "run-uuid");
    }

    #[test]
    fn decode_nflog_errors_on_short_input() {
        let err = decode_nflog_datagram(&[0u8; 2]).expect_err("must error");
        assert!(matches!(err, NflogDecodeError::TooShort(_)));
    }

    #[test]
    fn decode_nflog_errors_on_missing_prefix() {
        let mut buf = Vec::new();
        push_nla(&mut buf, NFULA_PAYLOAD, &[0u8; 8]);
        let err = decode_nflog_datagram(&buf).expect_err("must error");
        assert!(matches!(err, NflogDecodeError::MissingAttrs));
    }

    #[test]
    fn build_decision_for_accept_yields_allow() {
        let activation = sample_activation();
        let d = build_decision(&activation, "cellos-flow accept", &[], "now");
        assert_eq!(d.decision, NetworkFlowDecisionOutcome::Allow);
        assert_eq!(d.reason_code, "nft_log_accept");
    }

    #[test]
    fn build_decision_for_drop_yields_deny() {
        let activation = sample_activation();
        let d = build_decision(&activation, "cellos-flow drop", &[], "now");
        assert_eq!(d.decision, NetworkFlowDecisionOutcome::Deny);
        assert_eq!(d.reason_code, "nft_log_drop");
    }

    #[test]
    fn build_decision_unknown_prefix_falls_back_to_deny() {
        let activation = sample_activation();
        let d = build_decision(&activation, "cellos-flow weirdo", &[], "now");
        assert_eq!(d.decision, NetworkFlowDecisionOutcome::Deny);
        assert_eq!(d.reason_code, "nft_log_unknown");
    }

    #[test]
    fn build_activation_off_when_neither_env_var_set() {
        let _g = lock_env();
        let _env = EnvSnapshot::take_and_clear(&ACTIVATION_ENV_VARS);
        let act = build_activation_from_env("c", "r", None, None, None);
        assert!(act.is_none(), "no env vars → no activation");
    }

    #[test]
    fn build_activation_on_via_legacy_ebpf_env() {
        let _g = lock_env();
        let _env = EnvSnapshot::take_and_clear(&ACTIVATION_ENV_VARS);
        set_env(ENV_PER_FLOW_EBPF, Some("1"));
        let act = build_activation_from_env("cell-x", "run-x", None, None, None);
        assert!(act.is_some(), "legacy _EBPF=1 must still opt in");
    }

    #[test]
    fn build_activation_on_via_realtime_alias() {
        let _g = lock_env();
        let _env = EnvSnapshot::take_and_clear(&ACTIVATION_ENV_VARS);
        set_env(ENV_PER_FLOW_REALTIME, Some("1"));
        let act = build_activation_from_env("cell-x", "run-x", None, None, None)
            .expect("CELLOS_PER_FLOW_REALTIME=1 must opt in");
        assert_eq!(act.cell_id, "cell-x");
        assert_eq!(act.run_id, "run-x");
        assert_eq!(act.nflog_group, DEFAULT_NFLOG_GROUP);
    }

    #[test]
    fn build_activation_rejects_truthy_lookalikes_for_realtime() {
        let _g = lock_env();
        let _env = EnvSnapshot::take_and_clear(&ACTIVATION_ENV_VARS);
        for bad in ["true", "yes", "on", "TRUE", "0", "", "2"] {
            set_env(ENV_PER_FLOW_REALTIME, Some(bad));
            let act = build_activation_from_env("c", "r", None, None, None);
            assert!(
                act.is_none(),
                "value {bad:?} must not enable per-flow realtime"
            );
        }
    }

    #[test]
    fn flow_event_payload_serialises_as_cloud_event_schema_compatible_json() {
        let activation = sample_activation();
        let decision = build_decision(
            &activation,
            "cellos-flow accept",
            &[],
            "2026-05-16T00:00:00Z",
        );
        let json = serde_json::to_value(&decision).expect("serialise");
        let obj = json.as_object().expect("object");
        for required in [
            "cellId",
            "runId",
            "decisionId",
            "direction",
            "decision",
            "reasonCode",
            "observedAt",
        ] {
            assert!(
                obj.contains_key(required),
                "missing required field {required}; payload={json}"
            );
        }
        assert_eq!(obj["direction"], "egress");
        assert_eq!(obj["decision"], "allow");
        assert_eq!(obj["reasonCode"], "nft_log_accept");
    }

    #[test]
    fn nflog_populates_flow_accumulator() {
        let payload = ipv4_tcp_payload([10, 0, 0, 5], 40000, [192, 0, 2, 1], 443);
        let attribution = decode_l3_l4_attribution(&payload);
        let key = flow_key_from_attribution(&attribution).expect("5-tuple fully populated");
        let accumulator = std::sync::Arc::new(std::sync::Mutex::new(
            crate::ebpf_flow::connection_tracking::FlowAccumulator::new(),
        ));
        record_opened_flow(&accumulator, key);
        let count = accumulator
            .lock()
            .expect("acquire lock")
            .unique_flow_count();
        assert_eq!(
            count, 1,
            "single nflog-derived 5-tuple must yield unique_flow_count() == 1"
        );
    }

    #[test]
    fn duplicate_nflog_entries_are_deduplicated() {
        let payload = ipv4_tcp_payload([10, 0, 0, 5], 40000, [192, 0, 2, 1], 443);
        let attribution_a = decode_l3_l4_attribution(&payload);
        let key_a = flow_key_from_attribution(&attribution_a).expect("attribution a");
        let attribution_b = decode_l3_l4_attribution(&payload);
        let key_b = flow_key_from_attribution(&attribution_b).expect("attribution b");
        let accumulator = std::sync::Arc::new(std::sync::Mutex::new(
            crate::ebpf_flow::connection_tracking::FlowAccumulator::new(),
        ));
        record_opened_flow(&accumulator, key_a);
        record_opened_flow(&accumulator, key_b);
        let count = accumulator
            .lock()
            .expect("acquire lock")
            .unique_flow_count();
        assert_eq!(
            count, 1,
            "two nflog datagrams for the same 5-tuple must dedup to one connection"
        );
    }
}