1#![allow(dead_code)]
47
48use std::sync::atomic::AtomicBool;
49use std::sync::Arc;
50
51use cellos_core::{NetworkFlowDecision, NetworkFlowDecisionOutcome, NetworkFlowDirection};
52
/// Opt-in variable: per-flow activation is enabled when this is exactly `"1"`
/// (see [`build_activation_from_env`]).
pub const ENV_PER_FLOW_EBPF: &str = "CELLOS_FIRECRACKER_PER_FLOW_EBPF";

/// Second opt-in variable: activation is also enabled when this is exactly
/// `"1"`, independently of [`ENV_PER_FLOW_EBPF`].
pub const ENV_PER_FLOW_REALTIME: &str = "CELLOS_PER_FLOW_REALTIME";

/// Overrides the NFLOG group. The value is trimmed and parsed as a `u16`;
/// [`DEFAULT_NFLOG_GROUP`] is used when it is unreadable or unparsable.
pub const ENV_PER_FLOW_NFLOG_GROUP: &str = "CELLOS_FIRECRACKER_PER_FLOW_NFLOG_GROUP";

/// Backend selector consulted first by [`select_backend_from_env`]. The exact
/// value `"ebpf"` selects [`PerFlowBackend::Ebpf`]; anything else selects NFLOG.
pub const ENV_PER_FLOW_BACKEND: &str = "CELLOS_FIRECRACKER_PER_FLOW_BACKEND";

/// Backend selector consulted only when [`ENV_PER_FLOW_BACKEND`] cannot be read.
pub const ENV_PER_FLOW_BACKEND_E7: &str = "CELLOS_PER_FLOW_BACKEND";

/// NFLOG group used when [`ENV_PER_FLOW_NFLOG_GROUP`] supplies no valid value.
pub const DEFAULT_NFLOG_GROUP: u16 = 100;

/// Leading part of the nftables log prefix. The verdict (`accept` / `drop`) is
/// appended after one space, and the listener only processes datagrams whose
/// prefix starts with this text.
pub const LOG_PREFIX_BASE: &str = "cellos-flow";
97
/// Observation backend chosen by [`select_backend_from_env`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum PerFlowBackend {
    /// Default backend; chosen whenever the selector value is not `"ebpf"`.
    Nflog,
    /// Chosen when the selector value is exactly `"ebpf"`.
    Ebpf,
}
107
/// Per-cell settings for per-flow observation, produced by
/// [`build_activation_from_env`] and read by the listener and [`build_decision`].
pub struct PerFlowActivation {
    /// Cell identifier, cloned into every decision built from this activation.
    pub cell_id: String,
    /// Run identifier, cloned into every decision built from this activation.
    pub run_id: String,
    /// NFLOG group the listener binds to and configures for packet copy.
    pub nflog_group: u16,
    /// Backend selected from the environment.
    pub backend: PerFlowBackend,
    /// Optional policy digest, forwarded unchanged into each decision.
    pub policy_digest: Option<String>,
    /// Optional keyset id, forwarded unchanged into each decision.
    pub keyset_id: Option<String>,
    /// Optional issuer key id, forwarded unchanged into each decision.
    pub issuer_kid: Option<String>,
}
132
133pub fn build_activation_from_env(
139 cell_id: &str,
140 run_id: &str,
141 policy_digest: Option<String>,
142 keyset_id: Option<String>,
143 issuer_kid: Option<String>,
144) -> Option<PerFlowActivation> {
145 let ebpf_on = std::env::var(ENV_PER_FLOW_EBPF).as_deref() == Ok("1");
151 let realtime_on = std::env::var(ENV_PER_FLOW_REALTIME).as_deref() == Ok("1");
152 if !ebpf_on && !realtime_on {
153 return None;
154 }
155 let nflog_group = std::env::var(ENV_PER_FLOW_NFLOG_GROUP)
156 .ok()
157 .and_then(|s| s.trim().parse::<u16>().ok())
158 .unwrap_or(DEFAULT_NFLOG_GROUP);
159 let backend = select_backend_from_env();
160 Some(PerFlowActivation {
161 cell_id: cell_id.to_string(),
162 run_id: run_id.to_string(),
163 nflog_group,
164 backend,
165 policy_digest,
166 keyset_id,
167 issuer_kid,
168 })
169}
170
171pub fn select_backend_from_env() -> PerFlowBackend {
193 let legacy = std::env::var(ENV_PER_FLOW_BACKEND).ok();
194 let canonical = std::env::var(ENV_PER_FLOW_BACKEND_E7).ok();
195 let resolved = legacy.as_deref().or(canonical.as_deref()).unwrap_or("");
196 match resolved {
197 "ebpf" => PerFlowBackend::Ebpf,
198 _ => PerFlowBackend::Nflog,
199 }
200}
201
202pub fn augment_ruleset_with_log_actions(ruleset: &str, group: u16) -> String {
221 let mut out = String::with_capacity(ruleset.len() + 64);
222 for (idx, line) in ruleset.lines().enumerate() {
223 if idx > 0 {
224 out.push('\n');
225 }
226 let augmented = augment_line(line, group);
227 out.push_str(&augmented);
228 }
229 if ruleset.ends_with('\n') && !out.ends_with('\n') {
232 out.push('\n');
233 }
234 out
235}
236
237fn augment_line(line: &str, group: u16) -> String {
238 let trimmed = line.trim_start();
239 let leading_ws_len = line.len() - trimmed.len();
240 if trimmed.contains("log group ") {
242 return line.to_string();
243 }
244 if trimmed.starts_with("oif \"lo\" accept") {
246 return line.to_string();
247 }
248 if trimmed.starts_with("policy ")
250 || trimmed.starts_with("type filter hook")
251 || trimmed.starts_with("chain ")
252 || trimmed.starts_with("table ")
253 || trimmed == "}"
254 || trimmed == "{"
255 || trimmed.is_empty()
256 {
257 return line.to_string();
258 }
259 let last_tok = trimmed.split_whitespace().last().unwrap_or("");
263 let verdict = match last_tok {
264 "accept" => "accept",
265 "drop" => "drop",
266 _ => return line.to_string(),
267 };
268 let prefix = &line[..leading_ws_len];
271 let body_without_verdict = trimmed
272 .rsplit_once(char::is_whitespace)
273 .map(|(head, _)| head)
274 .unwrap_or("");
275 format!(
276 "{prefix}{body_without_verdict} log group {group} prefix \"{LOG_PREFIX_BASE} {verdict}\" {verdict}"
277 )
278}
279
/// Attributes extracted from one NFLOG message by [`decode_nflog_datagram`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct DecodedNflog {
    /// `NFULA_PREFIX` contents up to (not including) the first NUL byte,
    /// decoded lossily as UTF-8.
    pub prefix: String,
    /// Raw bytes of the `NFULA_PAYLOAD` attribute; empty when it is absent.
    pub payload: Vec<u8>,
}
300
/// Errors returned by [`decode_nflog_datagram`].
#[derive(Debug, thiserror::Error)]
pub enum NflogDecodeError {
    /// The input is shorter than one attribute header (4 bytes); carries the
    /// actual input length.
    #[error("nflog datagram too short ({0} bytes)")]
    TooShort(usize),
    /// No `NFULA_PREFIX` attribute was found in the input.
    #[error("nflog datagram missing required attributes")]
    MissingAttrs,
}
309
// Values mirror the kernel UAPI header `linux/netfilter/nfnetlink_log.h`.

/// Packet attribute: packet header.
pub(crate) const NFULA_PACKET_HDR: u16 = 1;
/// Packet attribute: copied packet bytes.
pub(crate) const NFULA_PAYLOAD: u16 = 9;
/// Packet attribute: NUL-terminated log prefix.
pub(crate) const NFULA_PREFIX: u16 = 10;

// `enum nfulnl_msg_config_cmds` is NONE = 0, BIND = 1, UNBIND = 2,
// PF_BIND = 3, PF_UNBIND = 4. The previous values (4 and 5) made the
// "PF_BIND" request carry the PF_UNBIND command.
/// Config command: bind this socket to a log group.
pub(crate) const NFULNL_CFG_CMD_BIND: u8 = 1;
/// Config command: bind nfnetlink_log as the logger for a protocol family.
pub(crate) const NFULNL_CFG_CMD_PF_BIND: u8 = 3;
/// Config command: unbind nfnetlink_log from a protocol family.
pub(crate) const NFULNL_CFG_CMD_PF_UNBIND: u8 = 4;
/// Copy mode: deliver packet bytes along with the metadata.
pub(crate) const NFULNL_COPY_PACKET: u8 = 2;

/// nfnetlink subsystem id for nfnetlink_log (high byte of `nlmsg_type`).
pub(crate) const NFNL_SUBSYS_ULOG: u8 = 4;
/// Message kind (low byte of `nlmsg_type`): logged packet, kernel to userspace.
pub(crate) const NFULNL_MSG_PACKET: u8 = 0;
/// Message kind (low byte of `nlmsg_type`): configuration, userspace to kernel.
pub(crate) const NFULNL_MSG_CONFIG: u8 = 1;

/// Numeric value of `AF_NETLINK`.
pub(crate) const AF_NETLINK_LITERAL: i32 = 16;
/// Numeric value of the `NETLINK_NETFILTER` protocol.
pub(crate) const NETLINK_NETFILTER_LITERAL: i32 = 12;
336
337pub fn decode_nflog_datagram(body: &[u8]) -> Result<DecodedNflog, NflogDecodeError> {
348 if body.len() < 4 {
349 return Err(NflogDecodeError::TooShort(body.len()));
350 }
351 let mut prefix: Option<String> = None;
352 let mut payload: Option<Vec<u8>> = None;
353
354 let mut cursor = 0usize;
355 while cursor + 4 <= body.len() {
356 let nla_len = u16::from_ne_bytes([body[cursor], body[cursor + 1]]) as usize;
357 let nla_type = u16::from_ne_bytes([body[cursor + 2], body[cursor + 3]]);
358 if nla_len < 4 || cursor + nla_len > body.len() {
359 break;
360 }
361 let value_start = cursor + 4;
362 let value_end = cursor + nla_len;
363 let value = &body[value_start..value_end];
364 match nla_type {
365 NFULA_PREFIX => {
366 let s = value
368 .iter()
369 .position(|b| *b == 0)
370 .map(|n| &value[..n])
371 .unwrap_or(value);
372 prefix = Some(String::from_utf8_lossy(s).into_owned());
373 }
374 NFULA_PAYLOAD => {
375 payload = Some(value.to_vec());
376 }
377 _ => {}
378 }
379 cursor += (nla_len + 3) & !3;
381 }
382 let prefix = prefix.ok_or(NflogDecodeError::MissingAttrs)?;
383 Ok(DecodedNflog {
384 prefix,
385 payload: payload.unwrap_or_default(),
386 })
387}
388
/// L3/L4 facts decoded from a logged packet by [`decode_l3_l4_attribution`].
/// Every field is `None` when it could not be determined.
#[derive(Debug, Default, Clone, PartialEq, Eq)]
pub struct FlowAttribution {
    /// Source IP address in textual form.
    pub src_addr: Option<String>,
    /// Source port (only decoded for TCP and UDP).
    pub src_port: Option<u16>,
    /// Destination IP address in textual form.
    pub dst_addr: Option<String>,
    /// Destination port (only decoded for TCP and UDP).
    pub dst_port: Option<u16>,
    /// Protocol name: `"tcp"`, `"udp"`, `"icmp"` or `"icmp6"`; `None` for
    /// any other protocol number.
    pub protocol: Option<String>,
    /// Raw IPv4 protocol byte or IPv6 next-header byte.
    pub protocol_byte: Option<u8>,
}
415
416pub fn decode_l3_l4_attribution(payload: &[u8]) -> FlowAttribution {
429 if payload.is_empty() {
430 return FlowAttribution::default();
431 }
432 let version = payload[0] >> 4;
433 match version {
434 4 => decode_ipv4(payload),
435 6 => decode_ipv6(payload),
436 _ => FlowAttribution::default(),
437 }
438}
439
440fn decode_ipv4(p: &[u8]) -> FlowAttribution {
441 if p.len() < 20 {
442 return FlowAttribution::default();
443 }
444 let ihl = (p[0] & 0x0f) as usize * 4;
445 if ihl < 20 || p.len() < ihl {
446 return FlowAttribution::default();
447 }
448 let proto_byte = p[9];
449 let src = std::net::Ipv4Addr::new(p[12], p[13], p[14], p[15]).to_string();
450 let dst = std::net::Ipv4Addr::new(p[16], p[17], p[18], p[19]).to_string();
451 let mut attr = FlowAttribution {
452 src_addr: Some(src),
453 src_port: None,
454 dst_addr: Some(dst),
455 dst_port: None,
456 protocol: None,
457 protocol_byte: Some(proto_byte),
458 };
459 match proto_byte {
460 6 => {
461 attr.protocol = Some("tcp".to_string());
462 attr.src_port = parse_src_port(p, ihl);
463 attr.dst_port = parse_dst_port(p, ihl);
464 }
465 17 => {
466 attr.protocol = Some("udp".to_string());
467 attr.src_port = parse_src_port(p, ihl);
468 attr.dst_port = parse_dst_port(p, ihl);
469 }
470 1 => attr.protocol = Some("icmp".to_string()),
471 _ => {}
472 }
473 attr
474}
475
476fn decode_ipv6(p: &[u8]) -> FlowAttribution {
477 if p.len() < 40 {
478 return FlowAttribution::default();
479 }
480 let next_header = p[6];
481 let mut src_octets = [0u8; 16];
482 src_octets.copy_from_slice(&p[8..24]);
483 let src = std::net::Ipv6Addr::from(src_octets).to_string();
484 let mut dst_octets = [0u8; 16];
485 dst_octets.copy_from_slice(&p[24..40]);
486 let dst = std::net::Ipv6Addr::from(dst_octets).to_string();
487 let mut attr = FlowAttribution {
488 src_addr: Some(src),
489 src_port: None,
490 dst_addr: Some(dst),
491 dst_port: None,
492 protocol: None,
493 protocol_byte: Some(next_header),
494 };
495 match next_header {
496 6 => {
497 attr.protocol = Some("tcp".to_string());
498 attr.src_port = parse_src_port(p, 40);
499 attr.dst_port = parse_dst_port(p, 40);
500 }
501 17 => {
502 attr.protocol = Some("udp".to_string());
503 attr.src_port = parse_src_port(p, 40);
504 attr.dst_port = parse_dst_port(p, 40);
505 }
506 58 => attr.protocol = Some("icmp6".to_string()),
507 _ => {}
508 }
509 attr
510}
511
/// Reads the big-endian source port from the first two bytes at `l4_offset`.
///
/// Returns `None` unless at least four bytes (both port fields) are available
/// at `l4_offset`.
fn parse_src_port(p: &[u8], l4_offset: usize) -> Option<u16> {
    let port_pair = p.get(l4_offset..l4_offset + 4)?;
    Some(u16::from_be_bytes([port_pair[0], port_pair[1]]))
}
518
/// Reads the big-endian destination port from bytes 2..4 at `l4_offset`.
///
/// Returns `None` unless at least four bytes (both port fields) are available
/// at `l4_offset`.
fn parse_dst_port(p: &[u8], l4_offset: usize) -> Option<u16> {
    let port_pair = p.get(l4_offset..l4_offset + 4)?;
    Some(u16::from_be_bytes([port_pair[2], port_pair[3]]))
}
525
526pub fn flow_key_from_attribution(
536 attr: &FlowAttribution,
537) -> Option<crate::ebpf_flow::connection_tracking::FlowKey> {
538 use crate::ebpf_flow::connection_tracking::FlowKey;
539 let src_addr: std::net::IpAddr = attr.src_addr.as_deref()?.parse().ok()?;
540 let dst_addr: std::net::IpAddr = attr.dst_addr.as_deref()?.parse().ok()?;
541 let src_port = attr.src_port?;
542 let dst_port = attr.dst_port?;
543 let protocol = attr.protocol_byte?;
544 Some(FlowKey {
545 src_addr,
546 src_port,
547 dst_addr,
548 dst_port,
549 protocol,
550 })
551}
552
553pub fn record_opened_flow(
561 accumulator: &std::sync::Arc<
562 std::sync::Mutex<crate::ebpf_flow::connection_tracking::FlowAccumulator>,
563 >,
564 key: crate::ebpf_flow::connection_tracking::FlowKey,
565) {
566 use crate::ebpf_flow::connection_tracking::{FlowEvent, FlowEventKind};
567 let event = FlowEvent {
568 key,
569 kind: FlowEventKind::Opened,
570 timestamp_ns: 0,
571 };
572 match accumulator.lock() {
573 Ok(mut guard) => guard.record(&event),
574 Err(poisoned) => {
575 poisoned.into_inner().record(&event);
580 }
581 }
582}
583
584pub fn build_decision(
599 activation: &PerFlowActivation,
600 prefix: &str,
601 payload: &[u8],
602 observed_at: &str,
603) -> NetworkFlowDecision {
604 let attribution = decode_l3_l4_attribution(payload);
605 let (decision, reason_code) = if prefix.contains("accept") {
606 (NetworkFlowDecisionOutcome::Allow, "nft_log_accept")
607 } else if prefix.contains("drop") {
608 (NetworkFlowDecisionOutcome::Deny, "nft_log_drop")
609 } else {
610 (NetworkFlowDecisionOutcome::Deny, "nft_log_unknown")
613 };
614 NetworkFlowDecision {
615 schema_version: "1.0.0".to_string(),
616 cell_id: activation.cell_id.clone(),
617 run_id: activation.run_id.clone(),
618 decision_id: uuid::Uuid::new_v4().to_string(),
619 direction: NetworkFlowDirection::Egress,
620 decision,
621 reason_code: reason_code.to_string(),
622 nft_rule_ref: None,
623 dst_addr: attribution.dst_addr,
624 dst_port: attribution.dst_port,
625 protocol: attribution.protocol,
626 packet_count: None,
627 byte_count: None,
628 policy_digest: activation.policy_digest.clone(),
629 keyset_id: activation.keyset_id.clone(),
630 issuer_kid: activation.issuer_kid.clone(),
631 correlation_id: None,
632 observed_at: observed_at.to_string(),
633 }
634}
635
/// Handle returned by [`spawn_per_flow_listener_in_netns`].
pub struct PerFlowListenerHandle {
    /// Flag polled by the listener loop; the loop exits once it reads `true`.
    pub shutdown: Arc<AtomicBool>,
    /// Listener thread (Linux only). `join` takes it, leaving `None` behind.
    #[cfg(target_os = "linux")]
    pub thread: Option<std::thread::JoinHandle<PerFlowListenerStats>>,
}
650
/// Counters accumulated by the listener loop and returned when it exits.
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq)]
pub struct PerFlowListenerStats {
    /// Number of successful `recv()` calls processed.
    pub datagrams_total: u64,
    /// Decoded messages whose prefix starts with [`LOG_PREFIX_BASE`].
    pub datagrams_matched: u64,
    /// Packet messages that [`decode_nflog_datagram`] rejected.
    pub datagrams_decode_failed: u64,
    /// Emit tasks handed to the runtime (counted at spawn time, not on sink
    /// success).
    pub events_emitted: u64,
}
664
665impl PerFlowListenerHandle {
666 #[cfg(target_os = "linux")]
668 pub fn join(&mut self) -> Option<PerFlowListenerStats> {
669 let handle = self.thread.take()?;
670 match handle.join() {
671 Ok(s) => Some(s),
672 Err(_) => {
673 tracing::warn!(
674 target: "cellos.supervisor.per_flow",
675 "per-flow listener thread panicked on join"
676 );
677 None
678 }
679 }
680 }
681
682 #[cfg(not(target_os = "linux"))]
684 pub fn join(&mut self) -> Option<PerFlowListenerStats> {
685 None
686 }
687}
688
/// Receive timeout in milliseconds, installed via `SO_RCVTIMEO` so the
/// listener loop wakes up regularly to re-check the shutdown flag.
#[cfg(target_os = "linux")]
const LISTENER_RECV_TIMEOUT_MS: i64 = 100;
693
694#[cfg(target_os = "linux")]
704pub fn spawn_per_flow_listener_in_netns(
705 child_pid: u32,
706 activation: PerFlowActivation,
707 sink: Arc<dyn cellos_core::ports::EventSink>,
708 shutdown: Arc<AtomicBool>,
709 accumulator: Option<
710 Arc<std::sync::Mutex<crate::ebpf_flow::connection_tracking::FlowAccumulator>>,
711 >,
712) -> std::io::Result<PerFlowListenerHandle> {
713 use std::fs::File;
714 use std::os::unix::io::AsRawFd;
715
716 let netns_path = format!("/proc/{child_pid}/ns/net");
717 let netns_file = File::open(&netns_path)
718 .map_err(|e| std::io::Error::new(e.kind(), format!("open netns at {netns_path}: {e}")))?;
719
720 let runtime_handle = tokio::runtime::Handle::try_current().ok();
721 let shutdown_for_thread = shutdown.clone();
722
723 let thread = std::thread::Builder::new()
724 .name(format!("cellos-per-flow-{child_pid}"))
725 .spawn(move || {
726 let setns_rc = unsafe { libc::setns(netns_file.as_raw_fd(), libc::CLONE_NEWNET) };
732 if setns_rc != 0 {
733 let err = std::io::Error::last_os_error();
734 tracing::warn!(
735 target: "cellos.supervisor.per_flow",
736 error = %err,
737 child_pid = child_pid,
738 "setns(CLONE_NEWNET) failed — per-flow listener bailing"
739 );
740 return PerFlowListenerStats::default();
741 }
742 run_listener_loop(
743 activation,
744 sink,
745 shutdown_for_thread,
746 runtime_handle,
747 accumulator,
748 )
749 })?;
750
751 Ok(PerFlowListenerHandle {
752 shutdown,
753 thread: Some(thread),
754 })
755}
756
/// Non-Linux stand-in: no listener is started. All arguments except
/// `shutdown` are ignored, and the returned handle only carries that flag.
#[cfg(not(target_os = "linux"))]
pub fn spawn_per_flow_listener_in_netns(
    _child_pid: u32,
    _activation: PerFlowActivation,
    _sink: Arc<dyn cellos_core::ports::EventSink>,
    shutdown: Arc<AtomicBool>,
    _accumulator: Option<
        Arc<std::sync::Mutex<crate::ebpf_flow::connection_tracking::FlowAccumulator>>,
    >,
) -> std::io::Result<PerFlowListenerHandle> {
    Ok(PerFlowListenerHandle { shutdown })
}
771
772#[cfg(target_os = "linux")]
773fn run_listener_loop(
774 activation: PerFlowActivation,
775 sink: Arc<dyn cellos_core::ports::EventSink>,
776 shutdown: Arc<AtomicBool>,
777 runtime_handle: Option<tokio::runtime::Handle>,
778 accumulator: Option<
779 Arc<std::sync::Mutex<crate::ebpf_flow::connection_tracking::FlowAccumulator>>,
780 >,
781) -> PerFlowListenerStats {
782 use std::os::unix::io::FromRawFd;
783 use std::os::unix::io::OwnedFd;
784
785 let mut stats = PerFlowListenerStats::default();
786
787 let sock_fd =
789 unsafe { libc::socket(libc::AF_NETLINK, libc::SOCK_RAW, NETLINK_NETFILTER_LITERAL) };
790 if sock_fd < 0 {
791 let err = std::io::Error::last_os_error();
792 tracing::warn!(
793 target: "cellos.supervisor.per_flow",
794 error = %err,
795 "socket(AF_NETLINK, NETLINK_NETFILTER) failed — per-flow listener bailing"
796 );
797 return stats;
798 }
799 let _sock_guard = unsafe { OwnedFd::from_raw_fd(sock_fd) };
804
805 let mut sa: libc::sockaddr_nl = unsafe { std::mem::zeroed() };
807 sa.nl_family = libc::AF_NETLINK as u16;
808 sa.nl_pid = 0;
809 sa.nl_groups = 0;
810 let bind_rc = unsafe {
811 libc::bind(
812 sock_fd,
813 &sa as *const _ as *const libc::sockaddr,
814 std::mem::size_of::<libc::sockaddr_nl>() as u32,
815 )
816 };
817 if bind_rc != 0 {
818 let err = std::io::Error::last_os_error();
819 tracing::warn!(
820 target: "cellos.supervisor.per_flow",
821 error = %err,
822 "bind() on netlink socket failed — per-flow listener bailing"
823 );
824 return stats;
825 }
826
827 let tv = libc::timeval {
830 tv_sec: 0,
831 tv_usec: (LISTENER_RECV_TIMEOUT_MS * 1000) as libc::suseconds_t,
832 };
833 let _ = unsafe {
834 libc::setsockopt(
835 sock_fd,
836 libc::SOL_SOCKET,
837 libc::SO_RCVTIMEO,
838 &tv as *const _ as *const libc::c_void,
839 std::mem::size_of::<libc::timeval>() as u32,
840 )
841 };
842
843 if let Err(e) = send_nflog_cfg_pf_bind(sock_fd, libc::AF_INET as u16) {
845 tracing::warn!(target: "cellos.supervisor.per_flow", error=%e, "PF_BIND v4 failed");
846 return stats;
847 }
848 if let Err(e) = send_nflog_cfg_pf_bind(sock_fd, libc::AF_INET6 as u16) {
849 tracing::warn!(target: "cellos.supervisor.per_flow", error=%e, "PF_BIND v6 failed");
850 return stats;
851 }
852 if let Err(e) = send_nflog_cfg_bind_group(sock_fd, activation.nflog_group) {
853 tracing::warn!(target: "cellos.supervisor.per_flow", error=%e, "BIND group failed");
854 return stats;
855 }
856 if let Err(e) = send_nflog_cfg_copy_packet(sock_fd, activation.nflog_group) {
857 tracing::warn!(target: "cellos.supervisor.per_flow", error=%e, "COPY_PACKET failed");
858 return stats;
859 }
860
861 let mut buf = vec![0u8; 65536];
863 use std::sync::atomic::Ordering;
864 while !shutdown.load(Ordering::SeqCst) {
865 let n = unsafe { libc::recv(sock_fd, buf.as_mut_ptr() as *mut libc::c_void, buf.len(), 0) };
866 if n < 0 {
867 let err = std::io::Error::last_os_error();
868 if matches!(err.raw_os_error(), Some(libc::EAGAIN)) {
871 continue;
872 }
873 tracing::debug!(
874 target: "cellos.supervisor.per_flow",
875 error = %err,
876 "recv() error — exiting listener loop"
877 );
878 break;
879 }
880 let received = &buf[..n as usize];
881 stats.datagrams_total += 1;
882 let mut offset = 0usize;
883 while offset < received.len() {
884 if received.len() - offset < 16 {
886 break;
887 }
888 let nlmsg_len = u32::from_ne_bytes([
889 received[offset],
890 received[offset + 1],
891 received[offset + 2],
892 received[offset + 3],
893 ]) as usize;
894 let nlmsg_type = u16::from_ne_bytes([received[offset + 4], received[offset + 5]]);
895 if nlmsg_len < 16 || offset + nlmsg_len > received.len() {
896 break;
897 }
898 let subsys = (nlmsg_type >> 8) as u8;
899 let msg_kind = (nlmsg_type & 0xff) as u8;
900 let body_start = offset + 16 + 4;
902 if subsys == NFNL_SUBSYS_ULOG
903 && msg_kind == NFULNL_MSG_PACKET
904 && body_start <= offset + nlmsg_len
905 {
906 let body = &received[body_start..offset + nlmsg_len];
907 match decode_nflog_datagram(body) {
908 Ok(decoded) => {
909 if decoded.prefix.starts_with(LOG_PREFIX_BASE) {
910 stats.datagrams_matched += 1;
911 if decoded.prefix.contains("accept") {
920 if let Some(acc) = accumulator.as_ref() {
921 let attribution = decode_l3_l4_attribution(&decoded.payload);
922 if let Some(key) = flow_key_from_attribution(&attribution) {
923 record_opened_flow(acc, key);
924 }
925 }
926 }
927 let now = chrono::Utc::now().to_rfc3339();
928 let decision = build_decision(
929 &activation,
930 &decoded.prefix,
931 &decoded.payload,
932 &now,
933 );
934 match cloud_event_v1_network_flow_decision(
935 "cellos-supervisor",
936 &now,
937 &decision,
938 ) {
939 Ok(event) => {
940 if let Some(rt) = runtime_handle.as_ref() {
941 let sink = sink.clone();
942 rt.spawn(async move {
943 if let Err(e) = sink.emit(&event).await {
944 tracing::warn!(
945 target: "cellos.supervisor.per_flow",
946 error = %e,
947 "sink emit failed for network_flow_decision event"
948 );
949 }
950 });
951 stats.events_emitted += 1;
952 }
953 }
954 Err(e) => {
955 tracing::debug!(
956 target: "cellos.supervisor.per_flow",
957 error = %e,
958 "network_flow_decision envelope build failed"
959 );
960 }
961 }
962 }
963 }
964 Err(_) => {
965 stats.datagrams_decode_failed += 1;
966 }
967 }
968 }
969 offset += (nlmsg_len + 3) & !3;
971 }
972 }
973 stats
974}
975
976#[cfg(target_os = "linux")]
977fn send_nflog_cfg_pf_bind(sock_fd: i32, pf: u16) -> std::io::Result<()> {
978 let payload = build_nfnl_cfg_msg(0, NFULNL_CFG_CMD_PF_BIND, pf);
980 send_netlink(sock_fd, &payload)
981}
982
983#[cfg(target_os = "linux")]
984fn send_nflog_cfg_bind_group(sock_fd: i32, group: u16) -> std::io::Result<()> {
985 let payload = build_nfnl_cfg_msg(group, NFULNL_CFG_CMD_BIND, 0);
986 send_netlink(sock_fd, &payload)
987}
988
989#[cfg(target_os = "linux")]
990fn send_nflog_cfg_copy_packet(sock_fd: i32, group: u16) -> std::io::Result<()> {
991 let mut buf = Vec::new();
993 let attr_type: u16 = 2; let nla_len: u16 = 4 + 4 + 4; buf.extend_from_slice(&nla_len.to_ne_bytes());
997 buf.extend_from_slice(&attr_type.to_ne_bytes());
998 buf.extend_from_slice(&0u32.to_be_bytes()); buf.push(NFULNL_COPY_PACKET);
1000 buf.extend_from_slice(&[0u8; 3]); let envelope = build_nfnl_cfg_envelope(group, &buf);
1002 send_netlink(sock_fd, &envelope)
1003}
1004
1005#[cfg(target_os = "linux")]
1007fn build_nfnl_cfg_msg(group: u16, cmd: u8, pf: u16) -> Vec<u8> {
1008 let mut attr = Vec::new();
1010 let nla_len: u16 = 4 + 1 + 3; attr.extend_from_slice(&nla_len.to_ne_bytes());
1012 attr.extend_from_slice(&1u16.to_ne_bytes()); attr.push(cmd);
1014 attr.extend_from_slice(&[0u8; 3]); let _ = pf; build_nfnl_cfg_envelope(group, &attr)
1017}
1018
1019#[cfg(target_os = "linux")]
1021fn build_nfnl_cfg_envelope(group: u16, attrs: &[u8]) -> Vec<u8> {
1022 let total_len: u32 = 16 + 4 + attrs.len() as u32;
1023 let mut out = Vec::with_capacity(total_len as usize);
1024 out.extend_from_slice(&total_len.to_ne_bytes());
1026 let nlmsg_type: u16 = ((NFNL_SUBSYS_ULOG as u16) << 8) | NFULNL_MSG_CONFIG as u16;
1028 out.extend_from_slice(&nlmsg_type.to_ne_bytes());
1029 let flags: u16 = (libc::NLM_F_REQUEST | libc::NLM_F_ACK) as u16;
1031 out.extend_from_slice(&flags.to_ne_bytes());
1032 out.extend_from_slice(&0u32.to_ne_bytes());
1034 out.extend_from_slice(&0u32.to_ne_bytes());
1035 out.push(0); out.push(0); out.extend_from_slice(&group.to_be_bytes()); out.extend_from_slice(attrs);
1040 out
1041}
1042
1043#[cfg(target_os = "linux")]
1044fn send_netlink(sock_fd: i32, payload: &[u8]) -> std::io::Result<()> {
1045 let mut sa: libc::sockaddr_nl = unsafe { std::mem::zeroed() };
1046 sa.nl_family = libc::AF_NETLINK as u16;
1047 let n = unsafe {
1048 libc::sendto(
1049 sock_fd,
1050 payload.as_ptr() as *const libc::c_void,
1051 payload.len(),
1052 0,
1053 &sa as *const _ as *const libc::sockaddr,
1054 std::mem::size_of::<libc::sockaddr_nl>() as u32,
1055 )
1056 };
1057 if n < 0 {
1058 return Err(std::io::Error::last_os_error());
1059 }
1060 Ok(())
1061}
1062
1063#[cfg(test)]
1068mod tests {
1069 use super::*;
1070 use std::sync::Mutex;
1071
    /// Serialises the tests in this module that mutate process environment
    /// variables, since the environment is shared across test threads.
    static ENV_LOCK: Mutex<()> = Mutex::new(());
1083
    /// Fixed activation used by tests that build decisions.
    fn sample_activation() -> PerFlowActivation {
        PerFlowActivation {
            cell_id: "cell-test".to_string(),
            run_id: "run-uuid".to_string(),
            nflog_group: 100,
            backend: PerFlowBackend::Nflog,
            policy_digest: Some("sha256:deadbeef".to_string()),
            keyset_id: Some("keyset-1".to_string()),
            issuer_kid: Some("kid-1".to_string()),
        }
    }
1095
    /// With neither opt-in variable set, no activation is produced.
    #[test]
    fn build_activation_returns_none_when_env_unset() {
        // Hold the env lock for the whole test; tolerate poisoning from a
        // previously failed test.
        let _g = ENV_LOCK.lock().unwrap_or_else(|p| p.into_inner());
        // Remember the ambient values so they can be restored afterwards.
        let prev_ebpf = std::env::var(ENV_PER_FLOW_EBPF).ok();
        let prev_rt = std::env::var(ENV_PER_FLOW_REALTIME).ok();
        unsafe {
            std::env::remove_var(ENV_PER_FLOW_EBPF);
            std::env::remove_var(ENV_PER_FLOW_REALTIME);
        }
        let act = build_activation_from_env("c", "r", None, None, None);
        assert!(act.is_none());
        // Restore the previous environment.
        unsafe {
            match prev_ebpf {
                Some(v) => std::env::set_var(ENV_PER_FLOW_EBPF, v),
                None => std::env::remove_var(ENV_PER_FLOW_EBPF),
            }
            match prev_rt {
                Some(v) => std::env::set_var(ENV_PER_FLOW_REALTIME, v),
                None => std::env::remove_var(ENV_PER_FLOW_REALTIME),
            }
        }
    }
1128
    /// With both selector variables unset, the backend is NFLOG.
    #[test]
    fn select_backend_defaults_to_nflog() {
        let _g = ENV_LOCK.lock().unwrap_or_else(|p| p.into_inner());
        let prev_legacy = std::env::var(ENV_PER_FLOW_BACKEND).ok();
        let prev_canonical = std::env::var(ENV_PER_FLOW_BACKEND_E7).ok();
        unsafe {
            std::env::remove_var(ENV_PER_FLOW_BACKEND);
            std::env::remove_var(ENV_PER_FLOW_BACKEND_E7);
        }
        assert_eq!(select_backend_from_env(), PerFlowBackend::Nflog);
        // Both variables were removed above, so only previously-set values
        // need to be written back.
        unsafe {
            if let Some(v) = prev_legacy {
                std::env::set_var(ENV_PER_FLOW_BACKEND, v);
            }
            if let Some(v) = prev_canonical {
                std::env::set_var(ENV_PER_FLOW_BACKEND_E7, v);
            }
        }
    }
1148
    /// The first-consulted selector set to `"ebpf"` selects the eBPF backend.
    #[test]
    fn select_backend_ebpf_returns_ebpf_variant() {
        let _g = ENV_LOCK.lock().unwrap_or_else(|p| p.into_inner());
        let prev_legacy = std::env::var(ENV_PER_FLOW_BACKEND).ok();
        let prev_canonical = std::env::var(ENV_PER_FLOW_BACKEND_E7).ok();
        unsafe {
            std::env::remove_var(ENV_PER_FLOW_BACKEND_E7);
            std::env::set_var(ENV_PER_FLOW_BACKEND, "ebpf");
        }
        assert_eq!(select_backend_from_env(), PerFlowBackend::Ebpf);
        // Restore the previous environment.
        unsafe {
            match prev_legacy {
                Some(v) => std::env::set_var(ENV_PER_FLOW_BACKEND, v),
                None => std::env::remove_var(ENV_PER_FLOW_BACKEND),
            }
            match prev_canonical {
                Some(v) => std::env::set_var(ENV_PER_FLOW_BACKEND_E7, v),
                None => std::env::remove_var(ENV_PER_FLOW_BACKEND_E7),
            }
        }
    }
1176
    /// The second selector is honoured when the first one is unset.
    #[test]
    fn select_backend_honours_e7_canonical_env_name() {
        let _g = ENV_LOCK.lock().unwrap_or_else(|p| p.into_inner());
        let prev_legacy = std::env::var(ENV_PER_FLOW_BACKEND).ok();
        let prev_canonical = std::env::var(ENV_PER_FLOW_BACKEND_E7).ok();
        unsafe {
            std::env::remove_var(ENV_PER_FLOW_BACKEND);
            std::env::set_var(ENV_PER_FLOW_BACKEND_E7, "ebpf");
        }
        assert_eq!(select_backend_from_env(), PerFlowBackend::Ebpf);
        // Restore the previous environment.
        unsafe {
            match prev_legacy {
                Some(v) => std::env::set_var(ENV_PER_FLOW_BACKEND, v),
                None => std::env::remove_var(ENV_PER_FLOW_BACKEND),
            }
            match prev_canonical {
                Some(v) => std::env::set_var(ENV_PER_FLOW_BACKEND_E7, v),
                None => std::env::remove_var(ENV_PER_FLOW_BACKEND_E7),
            }
        }
    }
1201
    /// When both selectors are set, `ENV_PER_FLOW_BACKEND` takes precedence.
    #[test]
    fn select_backend_legacy_name_wins_over_canonical_when_both_set() {
        let _g = ENV_LOCK.lock().unwrap_or_else(|p| p.into_inner());
        let prev_legacy = std::env::var(ENV_PER_FLOW_BACKEND).ok();
        let prev_canonical = std::env::var(ENV_PER_FLOW_BACKEND_E7).ok();
        unsafe {
            std::env::set_var(ENV_PER_FLOW_BACKEND, "nflog");
            std::env::set_var(ENV_PER_FLOW_BACKEND_E7, "ebpf");
        }
        assert_eq!(
            select_backend_from_env(),
            PerFlowBackend::Nflog,
            "legacy var must win when both set"
        );
        // Restore the previous environment.
        unsafe {
            match prev_legacy {
                Some(v) => std::env::set_var(ENV_PER_FLOW_BACKEND, v),
                None => std::env::remove_var(ENV_PER_FLOW_BACKEND),
            }
            match prev_canonical {
                Some(v) => std::env::set_var(ENV_PER_FLOW_BACKEND_E7, v),
                None => std::env::remove_var(ENV_PER_FLOW_BACKEND_E7),
            }
        }
    }
1228
    /// An unrecognised selector value falls back to NFLOG.
    #[test]
    fn select_backend_unknown_value_defaults_to_nflog() {
        let _g = ENV_LOCK.lock().unwrap_or_else(|p| p.into_inner());
        let prev_legacy = std::env::var(ENV_PER_FLOW_BACKEND).ok();
        let prev_canonical = std::env::var(ENV_PER_FLOW_BACKEND_E7).ok();
        unsafe {
            std::env::remove_var(ENV_PER_FLOW_BACKEND_E7);
            // Any value other than "ebpf" counts as unknown.
            std::env::set_var(ENV_PER_FLOW_BACKEND, "xdp");
        }
        assert_eq!(select_backend_from_env(), PerFlowBackend::Nflog);
        // Restore the previous environment.
        unsafe {
            match prev_legacy {
                Some(v) => std::env::set_var(ENV_PER_FLOW_BACKEND, v),
                None => std::env::remove_var(ENV_PER_FLOW_BACKEND),
            }
            match prev_canonical {
                Some(v) => std::env::set_var(ENV_PER_FLOW_BACKEND_E7, v),
                None => std::env::remove_var(ENV_PER_FLOW_BACKEND_E7),
            }
        }
    }
1250
    /// A rule ending in `accept` gains a log statement and keeps its verdict last.
    #[test]
    fn augment_rewrites_accept_verdict() {
        let input = " ip daddr 10.0.0.1 tcp dport 443 accept";
        let out = augment_ruleset_with_log_actions(input, 100);
        assert!(out.contains("log group 100 prefix \"cellos-flow accept\""));
        assert!(out.ends_with("accept"));
    }
1259
    /// A rule ending in `drop` gains a log statement using the requested group.
    #[test]
    fn augment_rewrites_drop_verdict() {
        let input = " udp dport 53 drop";
        let out = augment_ruleset_with_log_actions(input, 200);
        assert!(out.contains("log group 200 prefix \"cellos-flow drop\""));
        assert!(out.ends_with("drop"));
    }
1267
    /// The loopback accept shortcut is left untouched.
    #[test]
    fn augment_skips_loopback_shortcut() {
        let input = " oif \"lo\" accept";
        let out = augment_ruleset_with_log_actions(input, 100);
        assert_eq!(out, input);
    }
1274
    /// Chain hook / policy declarations are not rules and are left untouched.
    #[test]
    fn augment_skips_policy_lines() {
        let input = " type filter hook output priority 0; policy drop;";
        let out = augment_ruleset_with_log_actions(input, 100);
        assert_eq!(out, input);
    }
1281
    /// Augmenting an already-augmented ruleset changes nothing.
    #[test]
    fn augment_is_idempotent() {
        let input = " ip daddr 10.0.0.1 tcp dport 443 accept";
        let once = augment_ruleset_with_log_actions(input, 100);
        let twice = augment_ruleset_with_log_actions(&once, 100);
        assert_eq!(once, twice);
    }
1289
    /// Table/chain scaffolding survives; only the real `drop` rule is rewritten.
    #[test]
    fn augment_preserves_chain_structure() {
        let input = "table inet cellos_test {\n chain output {\n type filter hook output priority 0; policy drop;\n oif \"lo\" accept\n udp dport 53 drop\n }\n}";
        let out = augment_ruleset_with_log_actions(input, 100);
        assert!(out.contains("table inet cellos_test {"));
        assert!(out.contains("chain output {"));
        assert!(out.contains("policy drop;"));
        assert!(out.contains("oif \"lo\" accept"));
        assert!(out.contains("log group 100 prefix \"cellos-flow drop\" drop"));
        assert!(out.ends_with("}"));
    }
1301
    /// IPv4 + TCP: destination address, port and protocol name are decoded.
    #[test]
    fn decode_l4_ipv4_tcp_extracts_dst() {
        let mut p = vec![0u8; 40];
        // Version 4, IHL 5 (20-byte header).
        p[0] = 0x45;
        // Protocol 6 = TCP.
        p[9] = 6;
        // Destination address 10.0.0.1.
        p[16] = 10;
        p[17] = 0;
        p[18] = 0;
        p[19] = 1;
        // Destination port 0x01bb = 443 (bytes 2..4 of the L4 header).
        p[22] = 0x01;
        p[23] = 0xbb;
        let attr = decode_l3_l4_attribution(&p);
        assert_eq!(attr.dst_addr.as_deref(), Some("10.0.0.1"));
        assert_eq!(attr.dst_port, Some(443));
        assert_eq!(attr.protocol.as_deref(), Some("tcp"));
    }
1321
    /// IPv4 + UDP: destination address, port and protocol name are decoded.
    #[test]
    fn decode_l4_ipv4_udp_extracts_dst() {
        let mut p = vec![0u8; 40];
        p[0] = 0x45;
        // Protocol 17 = UDP.
        p[9] = 17;
        // Destination address 1.1.1.1.
        p[16] = 1;
        p[17] = 1;
        p[18] = 1;
        p[19] = 1;
        // Destination port 0x0035 = 53.
        p[22] = 0x00;
        p[23] = 0x35;
        let attr = decode_l3_l4_attribution(&p);
        assert_eq!(attr.dst_addr.as_deref(), Some("1.1.1.1"));
        assert_eq!(attr.dst_port, Some(53));
        assert_eq!(attr.protocol.as_deref(), Some("udp"));
    }
1338
    /// IPv4 + ICMP: the protocol is named but no port is reported.
    #[test]
    fn decode_l4_ipv4_icmp() {
        let mut p = vec![0u8; 28];
        p[0] = 0x45;
        // Protocol 1 = ICMP.
        p[9] = 1;
        // Destination address 8.8.8.8.
        p[16] = 8;
        p[17] = 8;
        p[18] = 8;
        p[19] = 8;
        let attr = decode_l3_l4_attribution(&p);
        assert_eq!(attr.dst_addr.as_deref(), Some("8.8.8.8"));
        assert_eq!(attr.protocol.as_deref(), Some("icmp"));
        assert_eq!(attr.dst_port, None);
    }
1353
    /// IPv6 + TCP: destination address, port and protocol name are decoded.
    #[test]
    fn decode_l4_ipv6_tcp_extracts_dst() {
        let mut p = vec![0u8; 60];
        // Version 6.
        p[0] = 0x60;
        // Next header 6 = TCP.
        p[6] = 6;
        // Destination address 2001:db8::1 (bytes 24..40).
        p[24] = 0x20;
        p[25] = 0x01;
        p[26] = 0x0d;
        p[27] = 0xb8;
        p[39] = 0x01;
        // Destination port 0x01bb = 443 (bytes 2..4 after the 40-byte header).
        p[42] = 0x01;
        p[43] = 0xbb;
        let attr = decode_l3_l4_attribution(&p);
        assert_eq!(attr.dst_addr.as_deref(), Some("2001:db8::1"));
        assert_eq!(attr.dst_port, Some(443));
        assert_eq!(attr.protocol.as_deref(), Some("tcp"));
    }
1372
    /// IPv6 + ICMPv6: the protocol is named but no port is reported.
    #[test]
    fn decode_l4_ipv6_icmp6() {
        let mut p = vec![0u8; 48];
        p[0] = 0x60;
        // Next header 58 = ICMPv6.
        p[6] = 58;
        let attr = decode_l3_l4_attribution(&p);
        assert_eq!(attr.protocol.as_deref(), Some("icmp6"));
        assert_eq!(attr.dst_port, None);
    }
1382
    /// An empty payload decodes to an all-`None` attribution without panicking.
    #[test]
    fn decode_l4_handles_empty_payload() {
        let attr = decode_l3_l4_attribution(&[]);
        assert!(attr.dst_addr.is_none());
        assert!(attr.protocol.is_none());
    }
1389
    /// A prefix attribute followed by a payload attribute is decoded, with
    /// the prefix's NUL terminator stripped.
    #[test]
    fn decode_nflog_extracts_prefix_and_payload() {
        let mut buf = Vec::new();
        // First attribute: NUL-terminated prefix.
        let prefix_str = b"cellos-flow accept\0";
        let nla_len: u16 = 4 + prefix_str.len() as u16;
        buf.extend_from_slice(&nla_len.to_ne_bytes());
        buf.extend_from_slice(&NFULA_PREFIX.to_ne_bytes());
        buf.extend_from_slice(prefix_str);
        // Pad to the 4-byte attribute alignment.
        while buf.len() % 4 != 0 {
            buf.push(0);
        }
        // Second attribute: opaque payload bytes.
        let pkt = b"abcdef";
        let nla_len2: u16 = 4 + pkt.len() as u16;
        buf.extend_from_slice(&nla_len2.to_ne_bytes());
        buf.extend_from_slice(&NFULA_PAYLOAD.to_ne_bytes());
        buf.extend_from_slice(pkt);
        while buf.len() % 4 != 0 {
            buf.push(0);
        }
        let decoded = decode_nflog_datagram(&buf).expect("decode ok");
        assert_eq!(decoded.prefix, "cellos-flow accept");
        assert_eq!(decoded.payload, pkt);
    }
1418
/// End-to-end check: a "drop" datagram wrapping an IPv4/TCP packet is
/// decoded and turned into a deny decision that carries the packet's
/// destination, port and protocol plus the activation's cell and run ids.
#[test]
fn decode_nflog_round_trip_with_packet_attribution() {
    /// Appends one attribute: native-endian u16 length (header included),
    /// the already-encoded type bytes, the value, then zero padding up to
    /// the next 4-byte boundary.
    fn append_attr(out: &mut Vec<u8>, kind: &[u8], value: &[u8]) {
        let nla_len: u16 = 4 + value.len() as u16;
        out.extend_from_slice(&nla_len.to_ne_bytes());
        out.extend_from_slice(kind);
        out.extend_from_slice(value);
        // Round the buffer length up to a multiple of four.
        let aligned = (out.len() + 3) & !3;
        out.resize(aligned, 0);
    }

    // IPv4 header (20 bytes) + TCP header (20 bytes), all zero except the
    // fields the decoder is asserted on below.
    let mut ip_packet = vec![0u8; 40];
    ip_packet[0] = 0x45; // version 4, IHL 5
    ip_packet[9] = 6; // protocol = TCP
    ip_packet[16..20].copy_from_slice(&[192, 0, 2, 1]); // destination address
    ip_packet[22..24].copy_from_slice(&443u16.to_be_bytes()); // TCP destination port

    let mut datagram = Vec::new();
    append_attr(
        &mut datagram,
        &NFULA_PREFIX.to_ne_bytes(),
        b"cellos-flow drop\0",
    );
    append_attr(&mut datagram, &NFULA_PAYLOAD.to_ne_bytes(), &ip_packet);

    let decoded = decode_nflog_datagram(&datagram).expect("decode ok");
    assert_eq!(decoded.prefix, "cellos-flow drop");

    let activation = sample_activation();
    let decision = build_decision(
        &activation,
        &decoded.prefix,
        &decoded.payload,
        "2026-01-01T00:00:00Z",
    );

    // Verdict derived from the log prefix.
    assert_eq!(decision.decision, NetworkFlowDecisionOutcome::Deny);
    assert_eq!(decision.reason_code, "nft_log_drop");
    // Attribution derived from the packet bytes.
    assert_eq!(decision.dst_addr.as_deref(), Some("192.0.2.1"));
    assert_eq!(decision.dst_port, Some(443));
    assert_eq!(decision.protocol.as_deref(), Some("tcp"));
    // Identity copied from the activation.
    assert_eq!(decision.cell_id, "cell-test");
    assert_eq!(decision.run_id, "run-uuid");
}
1466
/// A two-byte buffer must be rejected with `NflogDecodeError::TooShort`.
#[test]
fn decode_nflog_errors_on_short_input() {
    let truncated = [0u8; 2];

    let outcome = decode_nflog_datagram(&truncated);

    let err = outcome.expect_err("must error");
    assert!(matches!(err, NflogDecodeError::TooShort(_)));
}
1472
/// A datagram that contains only a payload attribute (no prefix attribute)
/// must be rejected with `NflogDecodeError::MissingAttrs`.
#[test]
fn decode_nflog_errors_on_missing_prefix() {
    let body = [0u8; 8];
    let nla_len: u16 = 4 + body.len() as u16;

    // Single attribute: length, type, value. 4 + 8 bytes is already 4-byte
    // aligned, so no padding is appended.
    let datagram = [
        &nla_len.to_ne_bytes()[..],
        &NFULA_PAYLOAD.to_ne_bytes()[..],
        &body[..],
    ]
    .concat();

    let err = decode_nflog_datagram(&datagram).expect_err("must error");
    assert!(matches!(err, NflogDecodeError::MissingAttrs));
}
1485
/// The "cellos-flow accept" prefix must map to an allow decision with the
/// `nft_log_accept` reason code, even when no packet bytes are supplied.
#[test]
fn build_decision_for_accept_yields_allow() {
    let activation = sample_activation();

    let verdict = build_decision(&activation, "cellos-flow accept", &[], "now");

    assert_eq!(verdict.decision, NetworkFlowDecisionOutcome::Allow);
    assert_eq!(verdict.reason_code, "nft_log_accept");
}
1495
/// The "cellos-flow drop" prefix must map to a deny decision with the
/// `nft_log_drop` reason code, even when no packet bytes are supplied.
#[test]
fn build_decision_for_drop_yields_deny() {
    let activation = sample_activation();

    let verdict = build_decision(&activation, "cellos-flow drop", &[], "now");

    assert_eq!(verdict.decision, NetworkFlowDecisionOutcome::Deny);
    assert_eq!(verdict.reason_code, "nft_log_drop");
}
1504
/// A prefix with an unrecognised verdict word must fall back to a deny
/// decision tagged `nft_log_unknown`.
#[test]
fn build_decision_unknown_prefix_falls_back_to_deny() {
    let activation = sample_activation();
    let no_payload: Vec<u8> = Vec::new();

    let verdict = build_decision(&activation, "cellos-flow weirdo", &no_payload, "now");

    assert_eq!(verdict.decision, NetworkFlowDecisionOutcome::Deny);
    assert_eq!(verdict.reason_code, "nft_log_unknown");
}
1512
1513 fn save_realtime_env() -> (Option<String>, Option<String>) {
1523 let prev_ebpf = std::env::var(ENV_PER_FLOW_EBPF).ok();
1524 let prev_rt = std::env::var(ENV_PER_FLOW_REALTIME).ok();
1525 unsafe {
1528 std::env::remove_var(ENV_PER_FLOW_EBPF);
1529 std::env::remove_var(ENV_PER_FLOW_REALTIME);
1530 }
1531 (prev_ebpf, prev_rt)
1532 }
1533
1534 fn restore_realtime_env(prev: (Option<String>, Option<String>)) {
1535 unsafe {
1536 match prev.0 {
1537 Some(v) => std::env::set_var(ENV_PER_FLOW_EBPF, v),
1538 None => std::env::remove_var(ENV_PER_FLOW_EBPF),
1539 }
1540 match prev.1 {
1541 Some(v) => std::env::set_var(ENV_PER_FLOW_REALTIME, v),
1542 None => std::env::remove_var(ENV_PER_FLOW_REALTIME),
1543 }
1544 }
1545 }
1546
/// With both opt-in variables removed, `build_activation_from_env` must
/// return `None`.
#[test]
fn build_activation_off_when_neither_env_var_set() {
    let _g = ENV_LOCK.lock().unwrap_or_else(|p| p.into_inner());
    let prev = save_realtime_env();

    let act = build_activation_from_env("c", "r", None, None, None);

    // Restore before asserting: a failed assertion panics, and restoring
    // afterwards would be skipped, losing the saved values for every test
    // that runs later in this process.
    restore_realtime_env(prev);

    assert!(act.is_none(), "no env vars → no activation");
}
1555
/// Setting only the legacy `ENV_PER_FLOW_EBPF` variable to `"1"` must still
/// produce an activation.
#[test]
fn build_activation_on_via_legacy_ebpf_env() {
    let _g = ENV_LOCK.lock().unwrap_or_else(|p| p.into_inner());
    let prev = save_realtime_env();
    // SAFETY: the ENV_LOCK guard held in `_g` serialises this mutation with
    // the other tests that take the same lock.
    unsafe {
        std::env::set_var(ENV_PER_FLOW_EBPF, "1");
    }

    let act = build_activation_from_env("cell-x", "run-x", None, None, None);

    // Restore before asserting: a failed assertion panics, and restoring
    // afterwards would be skipped, leaving the opt-in variable set to "1"
    // for every test that runs later in this process.
    restore_realtime_env(prev);

    assert!(act.is_some(), "legacy _EBPF=1 must still opt in");
}
1567
/// Setting only `ENV_PER_FLOW_REALTIME` to `"1"` must produce an activation
/// that carries the supplied cell/run ids and, when no group override is
/// present, `DEFAULT_NFLOG_GROUP`.
#[test]
fn build_activation_on_via_realtime_alias() {
    let _g = ENV_LOCK.lock().unwrap_or_else(|p| p.into_inner());
    let prev = save_realtime_env();
    // `build_activation_from_env` also reads the group override, so it must
    // be cleared here; otherwise the default-group assertion below depends
    // on whatever the ambient environment happens to contain.
    let prev_group = std::env::var(ENV_PER_FLOW_NFLOG_GROUP).ok();
    // SAFETY: the ENV_LOCK guard held in `_g` serialises this mutation with
    // the other tests that take the same lock.
    unsafe {
        std::env::remove_var(ENV_PER_FLOW_NFLOG_GROUP);
        std::env::set_var(ENV_PER_FLOW_REALTIME, "1");
    }

    let act = build_activation_from_env("cell-x", "run-x", None, None, None);

    // Restore everything before asserting: a failed assertion panics, and
    // restoring afterwards would be skipped, leaking this test's
    // environment into every test that runs later in this process.
    // SAFETY: still under the ENV_LOCK guard held in `_g`.
    unsafe {
        match prev_group {
            Some(v) => std::env::set_var(ENV_PER_FLOW_NFLOG_GROUP, v),
            None => std::env::remove_var(ENV_PER_FLOW_NFLOG_GROUP),
        }
    }
    restore_realtime_env(prev);

    assert!(act.is_some(), "CELLOS_PER_FLOW_REALTIME=1 must opt in");
    let act = act.unwrap();
    assert_eq!(act.cell_id, "cell-x");
    assert_eq!(act.run_id, "run-x");
    assert_eq!(act.nflog_group, DEFAULT_NFLOG_GROUP);
}
1583
/// Only the exact value `"1"` may enable per-flow realtime; every other
/// "truthy-looking" value (and the empty string) must leave it disabled.
#[test]
fn build_activation_rejects_truthy_lookalikes_for_realtime() {
    let _g = ENV_LOCK.lock().unwrap_or_else(|p| p.into_inner());
    let prev = save_realtime_env();

    // Record the first offending value instead of asserting inside the
    // loop: a panic here would skip the restore below and leave the
    // variable set to the lookalike value for later tests.
    let mut offender: Option<&str> = None;
    for bad in ["true", "yes", "on", "TRUE", "0", "", "2"] {
        // SAFETY: the ENV_LOCK guard held in `_g` serialises this mutation
        // with the other tests that take the same lock.
        unsafe {
            std::env::set_var(ENV_PER_FLOW_REALTIME, bad);
        }
        if build_activation_from_env("c", "r", None, None, None).is_some() {
            offender = Some(bad);
            break;
        }
    }

    restore_realtime_env(prev);

    if let Some(bad) = offender {
        panic!("value {bad:?} must not enable per-flow realtime");
    }
}
1600
/// Serialises an "accept" decision to JSON and checks that the object
/// exposes every expected camelCase key and the expected values for
/// `direction`, `decision` and `reasonCode`.
#[test]
fn flow_event_payload_serialises_as_cloud_event_schema_compatible_json() {
    const REQUIRED_KEYS: [&str; 7] = [
        "cellId",
        "runId",
        "decisionId",
        "direction",
        "decision",
        "reasonCode",
        "observedAt",
    ];

    let activation = sample_activation();
    let decision = build_decision(
        &activation,
        "cellos-flow accept",
        &[],
        "2026-05-16T00:00:00Z",
    );

    let json = serde_json::to_value(&decision).expect("serialise");
    let obj = json.as_object().expect("object");

    // Presence check first, so a missing key is reported with the full
    // payload rather than as an indexing panic below.
    for required in REQUIRED_KEYS.iter().copied() {
        assert!(
            obj.contains_key(required),
            "missing required field {required}; payload={json}"
        );
    }

    for (key, expected) in [
        ("direction", "egress"),
        ("decision", "allow"),
        ("reasonCode", "nft_log_accept"),
    ] {
        assert_eq!(obj[key], expected);
    }
}
1635
/// Builds a 40-byte buffer laid out as an IPv4 header (20 bytes) followed
/// by a TCP header (20 bytes), zero everywhere except:
///
/// * byte 0 — `0x45` (version 4, IHL 5)
/// * byte 9 — `6` (protocol = TCP)
/// * bytes 12..16 — `src`
/// * bytes 16..20 — `dst`
/// * bytes 20..22 — `src_port`, big-endian
/// * bytes 22..24 — `dst_port`, big-endian
fn ipv4_tcp_payload(src: [u8; 4], src_port: u16, dst: [u8; 4], dst_port: u16) -> Vec<u8> {
    let mut packet = Vec::with_capacity(40);
    packet.push(0x45); // byte 0
    packet.extend_from_slice(&[0u8; 8]); // bytes 1..9 left zero
    packet.push(6); // byte 9
    packet.extend_from_slice(&[0u8; 2]); // bytes 10..12 left zero
    packet.extend_from_slice(&src); // bytes 12..16
    packet.extend_from_slice(&dst); // bytes 16..20
    packet.extend_from_slice(&src_port.to_be_bytes()); // bytes 20..22
    packet.extend_from_slice(&dst_port.to_be_bytes()); // bytes 22..24
    packet.resize(40, 0); // remaining bytes left zero
    packet
}
1663
/// One decoded IPv4/TCP packet, turned into a flow key and recorded, must
/// leave the accumulator reporting exactly one unique flow.
#[test]
fn nflog_populates_flow_accumulator() {
    use crate::ebpf_flow::connection_tracking::FlowAccumulator;
    use std::sync::{Arc, Mutex};

    let packet = ipv4_tcp_payload([10, 0, 0, 5], 40000, [192, 0, 2, 1], 443);
    let attribution = decode_l3_l4_attribution(&packet);
    let key = flow_key_from_attribution(&attribution).expect("5-tuple fully populated");

    let accumulator = Arc::new(Mutex::new(FlowAccumulator::new()));
    record_opened_flow(&accumulator, key);

    // Read the count inside its own scope so the guard is released before
    // the assertion runs.
    let count = {
        let guard = accumulator.lock().expect("acquire lock");
        guard.unique_flow_count()
    };
    assert_eq!(
        count, 1,
        "single nflog-derived 5-tuple must yield unique_flow_count() == 1"
    );
}
1687
/// Recording the flow key derived from the same packet twice must still
/// leave the accumulator reporting exactly one unique flow.
#[test]
fn duplicate_nflog_entries_are_deduplicated() {
    use crate::ebpf_flow::connection_tracking::FlowAccumulator;
    use std::sync::{Arc, Mutex};

    let packet = ipv4_tcp_payload([10, 0, 0, 5], 40000, [192, 0, 2, 1], 443);

    // Decode the identical bytes twice, as if two datagrams had arrived
    // for the same connection.
    let first = decode_l3_l4_attribution(&packet);
    let first_key = flow_key_from_attribution(&first).expect("attribution a");
    let second = decode_l3_l4_attribution(&packet);
    let second_key = flow_key_from_attribution(&second).expect("attribution b");

    let accumulator = Arc::new(Mutex::new(FlowAccumulator::new()));
    record_opened_flow(&accumulator, first_key);
    record_opened_flow(&accumulator, second_key);

    // Read the count inside its own scope so the guard is released before
    // the assertion runs.
    let count = {
        let guard = accumulator.lock().expect("acquire lock");
        guard.unique_flow_count()
    };
    assert_eq!(
        count, 1,
        "two nflog datagrams for the same 5-tuple must dedup to one connection"
    );
}
1721}