use std::collections::{BTreeMap, BTreeSet};
use std::time::Duration;
use async_nats::jetstream;
use cellos_core::CloudEventV1;
use cellos_projector::{decode_event, EventVerifierConfig};
use futures_util::StreamExt;
use serde_json::{json, Map, Value};
use tracing::{info, warn};
/// CloudEvents `type` of the compliance summary events this tool analyzes;
/// decoded events of any other type are not recorded.
const COMPLIANCE_EVENT_TYPE: &str = "dev.cellos.events.cell.compliance.v1.summary";
/// Monoculture threshold used when `CELLOS_AUDIT_MONOCULTURE_THRESHOLD` is unset
/// or does not parse as `usize`.
const DEFAULT_THRESHOLD: usize = 5;
/// Per-message wait in milliseconds used when `CELLOS_AUDIT_DRAIN_TIMEOUT_MS` is
/// unset or does not parse as `u64`.
const DEFAULT_DRAIN_TIMEOUT_MS: u64 = 2000;
/// Lowercase substrings that make `is_template_pattern` report `error_keyword`.
/// They are matched against the ASCII-lowercased justification, so entries must
/// stay lowercase.
const ERROR_KEYWORDS: &[&str] = &["error", "exception", "undefined", "null", "todo", "fixme"];
/// Usage text printed for `-h`/`--help` and appended to the unknown-argument
/// error. The USAGE line lists every flag `main` accepts; the flags are mutually
/// exclusive because only the first argument is inspected.
const HELP_TEXT: &str = "\
cellos-audit-justification — SEC-19 fleet DNS egress justification entropy audit
USAGE:
cellos-audit-justification [-h | --help] [-V | --version]
DESCRIPTION:
Replays compliance summary events from NATS JetStream and reports
monoculture / template patterns in dnsEgressJustification strings.
OUTPUT:
JSON report on stdout, human summary on stderr.
ENVIRONMENT:
CELLOS_NATS_URL (default: nats://localhost:4222)
CELLOS_NATS_STREAM (default: CELLOS_EVENTS)
CELLOS_EVENT_SUBJECT (default: cellos.events.>)
CELLOS_AUDIT_MONOCULTURE_THRESHOLD (default: 5)
CELLOS_AUDIT_DRAIN_TIMEOUT_MS (default: 2000)
";
/// Entry point: handles the single optional CLI flag, initializes tracing on
/// stderr, reads configuration from the environment, replays the JetStream
/// stream through a pull consumer, and prints the JSON report on stdout plus a
/// one-line summary on stderr.
///
/// # Errors
///
/// Returns an error when the event-verifier configuration is invalid, when the
/// NATS connection / stream lookup / stream info / consumer creation / message
/// subscription fails, or when the report cannot be serialized.
#[tokio::main]
async fn main() -> anyhow::Result<()> {
// Only the first argument is inspected; anything after it is ignored.
if let Some(arg) = std::env::args().nth(1) {
match arg.as_str() {
"-h" | "--help" => {
println!("{HELP_TEXT}");
return Ok(());
}
"--version" | "-V" => {
println!(
"{}",
cellos_projector::build_info::version_line(
"cellos-audit-justification",
env!("CARGO_PKG_VERSION"),
)
);
return Ok(());
}
other => {
// Unknown flag: print the usage text to stderr and exit with status 2.
eprintln!("unknown argument: {other}\n\n{HELP_TEXT}");
std::process::exit(2);
}
}
}
{
use tracing_subscriber::layer::SubscriberExt;
use tracing_subscriber::util::SubscriberInitExt;
use tracing_subscriber::Layer;
// Log lines are written to stderr so stdout carries only the JSON report.
let fmt_layer = tracing_subscriber::fmt::layer()
.with_writer(std::io::stderr)
.with_filter(cellos_core::observability::redacted_filter());
tracing_subscriber::registry()
.with(tracing_subscriber::EnvFilter::from_default_env())
.with(fmt_layer)
.init();
}
// Connection settings: each falls back to its default when the variable is unset.
let nats_url =
std::env::var("CELLOS_NATS_URL").unwrap_or_else(|_| "nats://localhost:4222".into());
let stream_name =
std::env::var("CELLOS_NATS_STREAM").unwrap_or_else(|_| "CELLOS_EVENTS".into());
let event_subject =
std::env::var("CELLOS_EVENT_SUBJECT").unwrap_or_else(|_| "cellos.events.>".into());
// Numeric settings: an unset OR unparsable value silently falls back to the default.
let threshold: usize = std::env::var("CELLOS_AUDIT_MONOCULTURE_THRESHOLD")
.ok()
.and_then(|v| v.parse().ok())
.unwrap_or(DEFAULT_THRESHOLD);
let drain_timeout_ms: u64 = std::env::var("CELLOS_AUDIT_DRAIN_TIMEOUT_MS")
.ok()
.and_then(|v| v.parse().ok())
.unwrap_or(DEFAULT_DRAIN_TIMEOUT_MS);
info!(
%nats_url, %stream_name, %event_subject, threshold, drain_timeout_ms,
"starting cellos-audit-justification"
);
// The verifier configuration is passed to `decode_event` for every payload below.
let verifier_cfg = EventVerifierConfig::from_env()
.map_err(|e| anyhow::anyhow!("event-verifier configuration: {e}"))?;
if verifier_cfg.has_keys() {
info!(
keys = verifier_cfg.verifying_keys.len(),
require_signed = verifier_cfg.require_signed,
"I5 per-event signing: verifier active"
);
}
let mut analyzer = Analyzer::new(threshold);
let conn = async_nats::connect(&nats_url)
.await
.map_err(|e| anyhow::anyhow!("NATS connect: {e}"))?;
let js = jetstream::new(conn);
let mut stream = js
.get_stream(&stream_name)
.await
.map_err(|e| anyhow::anyhow!("get JetStream stream {stream_name:?}: {e}"))?;
let stream_info = stream
.info()
.await
.map_err(|e| anyhow::anyhow!("stream info: {e}"))?;
// Message count captured once, before the replay starts; it is the loop's upper bound.
// NOTE(review): this presumably counts every message in the stream, while the
// consumer below is filtered by subject — if the filter excludes anything, the
// loop can only finish via the drain timeout. Confirm this is intended.
let total_messages = stream_info.state.messages;
info!(total_messages, "stream info fetched");
// Pull consumer that starts from the first message matching the subject filter.
let consumer = stream
.create_consumer(jetstream::consumer::pull::Config {
deliver_policy: jetstream::consumer::DeliverPolicy::All,
filter_subject: event_subject.clone(),
..Default::default()
})
.await
.map_err(|e| anyhow::anyhow!("create JetStream ephemeral consumer: {e}"))?;
let mut messages = consumer
.messages()
.await
.map_err(|e| anyhow::anyhow!("consumer messages stream: {e}"))?;
let drain_timeout = Duration::from_millis(drain_timeout_ms);
// `consumed` counts every delivered message; `scanned` counts only decoded
// compliance summary events.
let mut scanned: u64 = 0;
let mut consumed: u64 = 0;
while consumed < total_messages {
// The timeout applies to each individual wait for the next message.
let next = tokio::time::timeout(drain_timeout, messages.next()).await;
match next {
Err(_) => {
info!(consumed, total_messages, "drain timeout reached, stopping");
break;
}
Ok(None) => {
info!("consumer message stream ended");
break;
}
Ok(Some(Err(e))) => {
// A stream error ends the replay; the report is still produced from
// whatever was recorded so far.
warn!(error = %e, "consumer stream error");
break;
}
Ok(Some(Ok(msg))) => {
consumed += 1;
// The message is acked before decoding; an ack failure is only logged.
if let Err(e) = msg.ack().await {
warn!(error = %e, "ack failed");
}
match decode_event(&msg.payload, &verifier_cfg) {
Ok(event) => {
if event.ty == COMPLIANCE_EVENT_TYPE {
analyzer.record(&event);
scanned += 1;
}
}
// Payloads that fail to decode are skipped, not fatal.
Err(e) => warn!(error = %e, "skip undecodable/unverifiable payload"),
}
}
}
}
// Machine-readable report on stdout, human summary on stderr.
let report = analyzer.report(scanned);
println!("{}", serde_json::to_string_pretty(&report)?);
let summary = analyzer.summary_line(scanned);
eprintln!("{summary}");
Ok(())
}
/// Justification statistics accumulated for a single policy pack.
#[derive(Debug, Default)]
pub struct PackStats {
/// Maps each justification string to the set of cell ids that reported it.
/// Using a set means repeated events from the same cell count once.
pub by_justification: BTreeMap<String, BTreeSet<String>>,
}
/// Accumulates justification strings per policy pack and classifies them.
#[derive(Debug)]
pub struct Analyzer {
/// Number of distinct cells sharing one justification at which that
/// justification is flagged `monoculture` (compared with `>=`).
pub threshold: usize,
/// Per-pack statistics keyed by policy pack id.
pub packs: BTreeMap<String, PackStats>,
}
impl Analyzer {
pub fn new(threshold: usize) -> Self {
Self {
threshold,
packs: BTreeMap::new(),
}
}
pub fn record(&mut self, event: &CloudEventV1) {
let Some(data) = event.data.as_ref() else {
return;
};
let pack_id = data
.get("policyPackId")
.and_then(Value::as_str)
.unwrap_or("<unspecified>")
.to_string();
let cell_id = data
.get("cellId")
.and_then(Value::as_str)
.unwrap_or("<unknown>")
.to_string();
let pack = self.packs.entry(pack_id).or_default();
for justification in extract_justifications(data) {
pack.by_justification
.entry(justification)
.or_default()
.insert(cell_id.clone());
}
}
pub fn report(&self, scanned_events: u64) -> Value {
let mut packs_obj = Map::new();
for (pack_id, stats) in &self.packs {
let mut just_obj = Map::new();
let mut flagged_count: u64 = 0;
for (just, cells) in &stats.by_justification {
let cell_count = cells.len();
let mut flags: Vec<&'static str> = Vec::new();
if cell_count >= self.threshold {
flags.push("monoculture");
}
if let Some(_reason) = is_template_pattern(just) {
flags.push("template_pattern");
}
if !flags.is_empty() {
flagged_count += 1;
}
just_obj.insert(
just.clone(),
json!({
"cell_count": cell_count,
"flags": flags,
}),
);
}
packs_obj.insert(
pack_id.clone(),
json!({
"justification_strings": just_obj,
"flagged_count": flagged_count,
}),
);
}
json!({
"scanned_events": scanned_events,
"policy_packs": packs_obj,
})
}
pub fn summary_line(&self, scanned_events: u64) -> String {
let pack_count = self.packs.len();
let distinct_strings: usize = self.packs.values().map(|p| p.by_justification.len()).sum();
let flagged: usize = self
.packs
.values()
.flat_map(|p| p.by_justification.iter())
.filter(|(s, cells)| cells.len() >= self.threshold || is_template_pattern(s).is_some())
.count();
format!(
"scanned={} policy_packs={} distinct_justifications={} flagged={} threshold={}",
scanned_events, pack_count, distinct_strings, flagged, self.threshold
)
}
}
/// Collects every `dnsEgressJustification` string found in a compliance
/// summary payload.
///
/// The top-level field (if it is a string) comes first, followed by the field
/// of each element of the `egressRules` array in array order. Missing or
/// non-string values are skipped; duplicates are kept.
pub fn extract_justifications(data: &Value) -> Vec<String> {
    const FIELD: &str = "dnsEgressJustification";

    let top_level = data.get(FIELD).and_then(Value::as_str);
    let per_rule = data
        .get("egressRules")
        .and_then(Value::as_array)
        .into_iter()
        .flatten()
        .filter_map(|rule| rule.get(FIELD).and_then(Value::as_str));

    top_level
        .into_iter()
        .chain(per_rule)
        .map(str::to_owned)
        .collect()
}
/// Classifies a justification string that looks like an unfilled template.
///
/// The input is trimmed first. Returns, in priority order:
/// - `Some("empty")` when nothing remains after trimming;
/// - `Some("all_caps_variable")` when every character is an ASCII uppercase
///   letter or `_`;
/// - `Some("error_keyword")` when the ASCII-lowercased text contains any entry
///   of `ERROR_KEYWORDS` as a substring;
/// - `None` otherwise.
pub fn is_template_pattern(s: &str) -> Option<&'static str> {
    let text = s.trim();
    if text.is_empty() {
        return Some("empty");
    }

    // `text` is known to be non-empty here, so `all` cannot pass vacuously.
    let looks_like_variable = text.chars().all(|ch| matches!(ch, 'A'..='Z' | '_'));
    if looks_like_variable {
        return Some("all_caps_variable");
    }

    let folded = text.to_ascii_lowercase();
    ERROR_KEYWORDS
        .iter()
        .any(|keyword| folded.contains(*keyword))
        .then_some("error_keyword")
}
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;

    /// Wraps `data` in a compliance summary CloudEvent whose id derives from `cell_id`.
    fn summary_event(cell_id: &str, data: Value) -> CloudEventV1 {
        CloudEventV1 {
            specversion: "1.0".into(),
            id: format!("evt-{cell_id}"),
            source: "urn:test".into(),
            ty: COMPLIANCE_EVENT_TYPE.into(),
            datacontenttype: None,
            data: Some(data),
            time: None,
            traceparent: None,
        }
    }

    /// Event carrying one egress rule per justification (per-rule payload shape).
    fn compliance_event(cell_id: &str, pack_id: &str, justifications: &[&str]) -> CloudEventV1 {
        let rules: Vec<Value> = justifications
            .iter()
            .map(|text| {
                json!({
                    "host": "*",
                    "port": 53,
                    "dnsEgressJustification": text,
                })
            })
            .collect();
        let data = json!({
            "cellId": cell_id,
            "specId": format!("spec-{cell_id}"),
            "policyPackId": pack_id,
            "egressRules": rules,
        });
        summary_event(cell_id, data)
    }

    /// Event carrying a single top-level justification (inline payload shape).
    fn inline_event(cell_id: &str, pack_id: &str, justification: &str) -> CloudEventV1 {
        let data = json!({
            "cellId": cell_id,
            "specId": format!("spec-{cell_id}"),
            "policyPackId": pack_id,
            "dnsEgressJustification": justification,
        });
        summary_event(cell_id, data)
    }

    /// Looks up the report entry for one justification string of one pack.
    fn justification_entry<'a>(report: &'a Value, pack_id: &str, text: &str) -> &'a Value {
        &report["policy_packs"][pack_id]["justification_strings"][text]
    }

    /// Reads the `flags` array of a report entry as owned strings.
    fn flags_of(entry: &Value) -> Vec<String> {
        entry["flags"]
            .as_array()
            .unwrap()
            .iter()
            .map(|flag| flag.as_str().unwrap().to_string())
            .collect()
    }

    #[test]
    fn version_compiles() {
        let _ = cellos_projector::build_info::version_line(
            "cellos-audit-justification",
            env!("CARGO_PKG_VERSION"),
        );
    }

    #[test]
    fn extract_reads_top_level_dns_egress_justification() {
        let data = json!({
            "dnsEgressJustification": "package mirror access",
        });
        assert_eq!(
            extract_justifications(&data),
            vec!["package mirror access".to_string()]
        );
    }

    #[test]
    fn extract_reads_per_rule_dns_egress_justification() {
        let data = json!({
            "egressRules": [
                { "host": "*", "port": 53, "dnsEgressJustification": "mirror" },
                { "host": "registry", "port": 443 },
                { "host": "*", "port": 53, "dnsEgressJustification": "telemetry" },
            ]
        });
        let mut got = extract_justifications(&data);
        got.sort();
        assert_eq!(got, vec!["mirror".to_string(), "telemetry".to_string()]);
    }

    #[test]
    fn extract_returns_empty_when_no_justifications_present() {
        let data = json!({ "cellId": "c1" });
        assert!(extract_justifications(&data).is_empty());
    }

    #[test]
    fn template_flags_empty_and_whitespace() {
        for blank in ["", " ", "\t\n"] {
            assert_eq!(is_template_pattern(blank), Some("empty"));
        }
    }

    #[test]
    fn template_flags_all_caps_variable() {
        for variable in ["TODO", "FIXME", "PLACEHOLDER_VAR"] {
            assert_eq!(is_template_pattern(variable), Some("all_caps_variable"));
        }
    }

    #[test]
    fn template_flags_error_keywords_case_insensitively() {
        let samples = [
            "undefined behavior",
            "Some Error occurred",
            "returned NULL pointer",
            "uncaught Exception thrown",
        ];
        for sample in samples {
            assert_eq!(is_template_pattern(sample), Some("error_keyword"));
        }
    }

    #[test]
    fn template_does_not_flag_benign_descriptions() {
        let samples = [
            "Required for package mirror access",
            "Telemetry export to vendor SaaS",
            "Mixed Case Justification",
        ];
        for sample in samples {
            assert_eq!(is_template_pattern(sample), None);
        }
    }

    #[test]
    fn analyzer_flags_monoculture_at_threshold() {
        let shared = "Required for package mirror access";
        let mut analyzer = Analyzer::new(5);
        for i in 0..5 {
            let cell = format!("cell-{i}");
            analyzer.record(&compliance_event(&cell, "ci-runner-standard", &[shared]));
        }
        let report = analyzer.report(5);
        let entry = justification_entry(&report, "ci-runner-standard", shared);
        assert_eq!(entry["cell_count"], 5);
        assert!(flags_of(entry).contains(&"monoculture".to_string()));
        assert_eq!(
            report["policy_packs"]["ci-runner-standard"]["flagged_count"],
            1
        );
    }

    #[test]
    fn analyzer_does_not_flag_below_threshold() {
        let shared = "Required for package mirror access";
        let mut analyzer = Analyzer::new(5);
        for i in 0..4 {
            let cell = format!("cell-{i}");
            analyzer.record(&compliance_event(&cell, "ci-runner-standard", &[shared]));
        }
        let report = analyzer.report(4);
        let entry = justification_entry(&report, "ci-runner-standard", shared);
        assert_eq!(entry["cell_count"], 4);
        assert!(flags_of(entry).is_empty());
        assert_eq!(
            report["policy_packs"]["ci-runner-standard"]["flagged_count"],
            0
        );
    }

    #[test]
    fn analyzer_dedupes_repeated_cell_ids_in_count() {
        let shared = "shared template";
        let mut analyzer = Analyzer::new(5);
        for _ in 0..10 {
            analyzer.record(&compliance_event("cell-1", "pack-a", &[shared]));
        }
        let report = analyzer.report(10);
        let entry = justification_entry(&report, "pack-a", shared);
        assert_eq!(entry["cell_count"], 1);
    }

    #[test]
    fn analyzer_flags_template_strings_independent_of_count() {
        let mut analyzer = Analyzer::new(5);
        for i in 0..3 {
            let cell = format!("cell-{i}");
            analyzer.record(&compliance_event(&cell, "pack-a", &["TODO"]));
        }
        let report = analyzer.report(3);
        let entry = justification_entry(&report, "pack-a", "TODO");
        assert_eq!(entry["cell_count"], 3);
        let flags = flags_of(entry);
        assert!(flags.contains(&"template_pattern".to_string()));
        assert!(!flags.contains(&"monoculture".to_string()));
        assert_eq!(report["policy_packs"]["pack-a"]["flagged_count"], 1);
    }

    #[test]
    fn analyzer_passes_benign_unique_justifications() {
        let benign = [
            "Required for package mirror access at acme",
            "Telemetry to honeycomb.io for tracing",
            "Vault unseal nodes need DNS for raft peers",
        ];
        let mut analyzer = Analyzer::new(5);
        for (i, j) in benign.iter().enumerate() {
            let cell = format!("cell-{i}");
            analyzer.record(&compliance_event(&cell, "pack-z", &[j]));
        }
        let report = analyzer.report(3);
        assert_eq!(report["policy_packs"]["pack-z"]["flagged_count"], 0);
        for j in benign {
            let entry = justification_entry(&report, "pack-z", j);
            assert_eq!(entry["cell_count"], 1);
            assert!(entry["flags"].as_array().unwrap().is_empty());
        }
    }

    #[test]
    fn analyzer_handles_inline_top_level_shape() {
        let mut analyzer = Analyzer::new(2);
        analyzer.record(&inline_event("cell-1", "pack-x", "shared"));
        analyzer.record(&inline_event("cell-2", "pack-x", "shared"));
        let report = analyzer.report(2);
        let entry = justification_entry(&report, "pack-x", "shared");
        assert_eq!(entry["cell_count"], 2);
        assert!(flags_of(entry).contains(&"monoculture".to_string()));
    }

    #[test]
    fn summary_line_reports_totals() {
        let mut analyzer = Analyzer::new(5);
        analyzer.record(&compliance_event("cell-1", "pack-a", &["one"]));
        analyzer.record(&compliance_event("cell-2", "pack-a", &["two"]));
        analyzer.record(&compliance_event("cell-3", "pack-b", &["TODO"]));
        let line = analyzer.summary_line(3);
        assert!(line.contains("scanned=3"));
        assert!(line.contains("policy_packs=2"));
        assert!(line.contains("distinct_justifications=3"));
        assert!(line.contains("flagged=1"));
        assert!(line.contains("threshold=5"));
    }
}