use std::time::Duration;
use tokio_util::sync::CancellationToken;
use tokio_util::task::TaskTracker;
use crate::config::RuntimeConfig;
use crate::lifecycle::wait_for_shutdown_signal;
fn load_policy(policy_path: &Option<std::path::PathBuf>) -> std::sync::Arc<crate::policy::PolicyRules> {
let rules = match policy_path {
None => {
tracing::info!("policy enforcement disabled (AA_POLICY_PATH set to empty)");
crate::policy::PolicyRules::default()
}
Some(path) => match crate::policy::load_policy(path) {
Ok(rules) => {
tracing::info!(
path = %path.display(),
rule_count = rules.rules.len(),
"policy loaded"
);
rules
}
Err(e) => {
tracing::error!(error = %e, path = %path.display(), "failed to parse policy file — aborting");
std::process::exit(1);
}
},
};
std::sync::Arc::new(rules)
}
fn spawn_proxy(
tracker: &TaskTracker,
broadcast_tx: &tokio::sync::broadcast::Sender<crate::pipeline::PipelineEvent>,
active_layers: crate::layer::LayerSet,
degraded_layers: &mut Vec<String>,
) {
let proxy_bin = match which::which("aa-proxy") {
Ok(path) => path,
Err(e) => {
tracing::warn!(error = %e, "aa-proxy binary not found — degrading proxy layer");
emit_proxy_degradation(broadcast_tx, active_layers, format!("binary not found: {e}"));
degraded_layers.push("proxy".to_string());
return;
}
};
let proxy_broadcast_tx = broadcast_tx.clone();
let proxy_bin_display = proxy_bin.display().to_string();
tracker.spawn(async move {
let result = tokio::process::Command::new(&proxy_bin)
.kill_on_drop(true)
.status()
.await;
match result {
Ok(status) if status.success() => {
tracing::info!("proxy subsystem exited normally");
}
Ok(status) => {
let reason = format!("proxy exited with {status}");
tracing::warn!(%reason, "proxy subsystem failed");
emit_proxy_degradation(&proxy_broadcast_tx, active_layers, reason);
}
Err(e) => {
let reason = format!("failed to spawn proxy: {e}");
tracing::warn!(%reason, "proxy subsystem failed");
emit_proxy_degradation(&proxy_broadcast_tx, active_layers, reason);
}
}
});
tracing::info!(binary = %proxy_bin_display, "proxy subsystem task spawned");
}
fn emit_ebpf_degradation(
broadcast_tx: &tokio::sync::broadcast::Sender<crate::pipeline::PipelineEvent>,
sub_layer: &str,
reason: String,
) {
let info = crate::pipeline::LayerDegradationInfo {
layer: sub_layer.to_string(),
reason,
remaining_layers: Vec::new(),
};
let _ = broadcast_tx.send(crate::pipeline::PipelineEvent::LayerDegradation(info));
}
#[cfg(target_os = "linux")]
fn spawn_ebpf_tls(
tracker: &tokio_util::task::TaskTracker,
broadcast_tx: &tokio::sync::broadcast::Sender<crate::pipeline::PipelineEvent>,
degraded_layers: &mut Vec<String>,
) {
let mut bpf = match aa_ebpf::EbpfLoader::load() {
Ok(b) => b,
Err(e) => {
let reason = format!("TLS BPF load failed: {e}");
tracing::warn!(%reason, "degrading ebpf/tls sub-layer");
emit_ebpf_degradation(broadcast_tx, "ebpf/tls", reason);
degraded_layers.push("ebpf/tls".to_string());
return;
}
};
let pid = std::process::id() as i32;
if let Err(e) = aa_ebpf::uprobe::UprobeManager::attach(&mut bpf, Some(pid)) {
let reason = format!("TLS uprobe attach failed: {e}");
tracing::warn!(%reason, "degrading ebpf/tls sub-layer");
emit_ebpf_degradation(broadcast_tx, "ebpf/tls", reason);
degraded_layers.push("ebpf/tls".to_string());
return;
}
let mut reader = match aa_ebpf::ringbuf::RingBufReader::new(bpf) {
Ok(r) => r,
Err(e) => {
let reason = format!("ring buffer init failed: {e}");
tracing::warn!(%reason, "degrading ebpf/tls sub-layer");
emit_ebpf_degradation(broadcast_tx, "ebpf/tls", reason);
degraded_layers.push("ebpf/tls".to_string());
return;
}
};
let tls_broadcast_tx = broadcast_tx.clone();
tracker.spawn(async move {
loop {
match reader.next().await {
Ok(Some(event)) => {
tracing::debug!(?event, "TLS ring buffer event");
let _ = &tls_broadcast_tx; }
Ok(None) => {
tracing::info!("TLS ring buffer closed");
break;
}
Err(e) => {
tracing::warn!(error = %e, "TLS ring buffer read error");
emit_ebpf_degradation(&tls_broadcast_tx, "ebpf/tls", format!("ring buffer error: {e}"));
break;
}
}
}
});
tracing::info!("ebpf/tls sub-layer task spawned");
}
#[cfg(not(target_os = "linux"))]
fn spawn_ebpf_tls(
_tracker: &tokio_util::task::TaskTracker,
broadcast_tx: &tokio::sync::broadcast::Sender<crate::pipeline::PipelineEvent>,
degraded_layers: &mut Vec<String>,
) {
emit_ebpf_degradation(
broadcast_tx,
"ebpf/tls",
"eBPF not supported on this platform".to_string(),
);
degraded_layers.push("ebpf/tls".to_string());
}
#[cfg(target_os = "linux")]
fn spawn_ebpf_file_io(
tracker: &tokio_util::task::TaskTracker,
broadcast_tx: &tokio::sync::broadcast::Sender<crate::pipeline::PipelineEvent>,
seq: &std::sync::Arc<std::sync::atomic::AtomicU64>,
agent_id: &str,
degraded_layers: &mut Vec<String>,
) {
let pid = std::process::id();
let mut loader = aa_ebpf::FileIoLoader::new(pid);
if let Err(e) = loader.load() {
let reason = format!("file I/O BPF load failed: {e}");
tracing::warn!(%reason, "degrading ebpf/file_io sub-layer");
emit_ebpf_degradation(broadcast_tx, "ebpf/file_io", reason);
degraded_layers.push("ebpf/file_io".to_string());
return;
}
if let Err(e) = loader.attach_kprobes() {
let reason = format!("file I/O kprobe attach failed: {e}");
tracing::warn!(%reason, "degrading ebpf/file_io sub-layer");
emit_ebpf_degradation(broadcast_tx, "ebpf/file_io", reason);
degraded_layers.push("ebpf/file_io".to_string());
return;
}
let mut rx = match loader.start_event_reader() {
Ok(r) => r,
Err(e) => {
let reason = format!("file I/O event reader failed: {e}");
tracing::warn!(%reason, "degrading ebpf/file_io sub-layer");
emit_ebpf_degradation(broadcast_tx, "ebpf/file_io", reason);
degraded_layers.push("ebpf/file_io".to_string());
return;
}
};
let fio_broadcast_tx = broadcast_tx.clone();
let fio_seq = std::sync::Arc::clone(seq);
let fio_agent_id = agent_id.to_string();
tracker.spawn(async move {
let _loader = loader;
while let Some(event) = rx.recv().await {
let audit = crate::ebpf_bridge::file_io_to_audit(&event);
let enriched = crate::ebpf_bridge::enrich_ebpf(audit, &fio_agent_id, &fio_seq);
let _ = fio_broadcast_tx.send(crate::pipeline::PipelineEvent::Audit(Box::new(enriched)));
}
tracing::info!("ebpf/file_io event reader closed");
});
tracing::info!("ebpf/file_io sub-layer task spawned");
}
#[cfg(not(target_os = "linux"))]
fn spawn_ebpf_file_io(
_tracker: &tokio_util::task::TaskTracker,
broadcast_tx: &tokio::sync::broadcast::Sender<crate::pipeline::PipelineEvent>,
_seq: &std::sync::Arc<std::sync::atomic::AtomicU64>,
_agent_id: &str,
degraded_layers: &mut Vec<String>,
) {
emit_ebpf_degradation(
broadcast_tx,
"ebpf/file_io",
"eBPF not supported on this platform".to_string(),
);
degraded_layers.push("ebpf/file_io".to_string());
}
#[cfg(target_os = "linux")]
fn spawn_ebpf_exec_tracepoints(
tracker: &tokio_util::task::TaskTracker,
broadcast_tx: &tokio::sync::broadcast::Sender<crate::pipeline::PipelineEvent>,
token: &tokio_util::sync::CancellationToken,
degraded_layers: &mut Vec<String>,
) {
let pid = std::process::id();
let mut loader = aa_ebpf::ExecLoader::new(pid);
if let Err(e) = loader.load() {
let reason = format!("exec BPF load failed: {e}");
tracing::warn!(%reason, "degrading ebpf/exec sub-layer");
emit_ebpf_degradation(broadcast_tx, "ebpf/exec", reason);
degraded_layers.push("ebpf/exec".to_string());
return;
}
if let Err(e) = loader.attach_tracepoints() {
let reason = format!("exec tracepoint attach failed: {e}");
tracing::warn!(%reason, "degrading ebpf/exec sub-layer");
emit_ebpf_degradation(broadcast_tx, "ebpf/exec", reason);
degraded_layers.push("ebpf/exec".to_string());
return;
}
let exec_token = token.clone();
tracker.spawn(async move {
let _loader = loader;
exec_token.cancelled().await;
tracing::info!("ebpf/exec sub-layer shutting down");
});
tracing::info!("ebpf/exec sub-layer task spawned");
}
#[cfg(not(target_os = "linux"))]
fn spawn_ebpf_exec_tracepoints(
_tracker: &tokio_util::task::TaskTracker,
broadcast_tx: &tokio::sync::broadcast::Sender<crate::pipeline::PipelineEvent>,
_token: &tokio_util::sync::CancellationToken,
degraded_layers: &mut Vec<String>,
) {
emit_ebpf_degradation(
broadcast_tx,
"ebpf/exec",
"eBPF not supported on this platform".to_string(),
);
degraded_layers.push("ebpf/exec".to_string());
}
fn emit_proxy_degradation(
broadcast_tx: &tokio::sync::broadcast::Sender<crate::pipeline::PipelineEvent>,
active_layers: crate::layer::LayerSet,
reason: String,
) {
let remaining = active_layers
.difference(crate::layer::LayerSet::PROXY)
.names()
.iter()
.map(|s| (*s).to_string())
.collect();
let info = crate::pipeline::LayerDegradationInfo {
layer: "proxy".to_string(),
reason,
remaining_layers: remaining,
};
let _ = broadcast_tx.send(crate::pipeline::PipelineEvent::LayerDegradation(info));
}
const AUDIT_CHANNEL_CAPACITY: usize = 8_192;
const AUDIT_BUFFER_CAPACITY: usize = 100_000;
const AUDIT_FLUSH_INTERVAL: Duration = Duration::from_secs(5);
async fn build_audit_publisher(
config: &RuntimeConfig,
) -> Option<std::sync::Arc<crate::audit_publisher::AuditPublisher>> {
let path = config.nats_config_path.as_ref()?;
let toml = match std::fs::read_to_string(path) {
Ok(toml) => toml,
Err(err) => {
tracing::warn!(error = %err, path = %path.display(), "audit publisher disabled — cannot read NATS config");
return None;
}
};
let nats_config = match crate::audit_publisher::NatsConfig::from_toml_str(&toml) {
Ok(cfg) => cfg,
Err(err) => {
tracing::warn!(error = %err, "audit publisher disabled — invalid [gateway.nats]");
return None;
}
};
let buffer = match aa_storage_sqlite_buffer::EventBuffer::new(&config.audit_buffer_path, AUDIT_BUFFER_CAPACITY) {
Ok(buffer) => std::sync::Arc::new(buffer),
Err(err) => {
tracing::warn!(error = %err, path = %config.audit_buffer_path.display(), "audit publisher disabled — cannot open buffer");
return None;
}
};
let sink = match crate::audit_publisher::NatsAuditSink::connect(&nats_config).await {
Ok(sink) => std::sync::Arc::new(sink) as std::sync::Arc<dyn aa_core::storage::AuditSink>,
Err(err) => {
tracing::warn!(error = %err, url = %nats_config.url, "audit publisher disabled — initial NATS connect failed");
return None;
}
};
tracing::info!(url = %nats_config.url, "audit publisher enabled");
Some(std::sync::Arc::new(crate::audit_publisher::AuditPublisher::new(
sink, buffer,
)))
}
fn spawn_audit_drain(
tracker: &TaskTracker,
mut rx: tokio::sync::mpsc::Receiver<aa_core::storage::AuditEntry>,
publisher: std::sync::Arc<crate::audit_publisher::AuditPublisher>,
token: CancellationToken,
) {
tracker.spawn(async move {
loop {
tokio::select! {
_ = token.cancelled() => {
while let Ok(entry) = rx.try_recv() {
publisher.publish(entry).await;
}
break;
}
maybe = rx.recv() => match maybe {
Some(entry) => publisher.publish(entry).await,
None => break,
},
}
}
tracing::info!("audit drain task exiting");
});
}
pub async fn run(config: RuntimeConfig) {
let prometheus_handle = metrics_exporter_prometheus::PrometheusBuilder::new()
.install_recorder()
.expect("failed to install Prometheus recorder");
metrics::counter!("aa_events_received_total").increment(0);
metrics::counter!("aa_events_emitted_total").increment(0);
metrics::counter!("aa_policy_violations_total").increment(0);
metrics::counter!("aa_policy_evaluations_total").increment(0); metrics::gauge!("aa_active_connections").set(0.0);
metrics::gauge!("aa_channel_utilization_ratio").set(0.0);
let (ready_tx, ready_rx) = tokio::sync::watch::channel(false);
tracing::info!("aa-runtime starting");
let tracker = TaskTracker::new();
let token = CancellationToken::new();
tracing::info!("structured concurrency primitives initialised");
let policy = load_policy(&config.policy_path);
let active_layers = crate::layer::LayerDetector::detect();
tracing::info!(layers = %active_layers, "active interception layers");
let mut degraded_layers: Vec<String> = Vec::new();
if !active_layers.contains(crate::layer::LayerSet::EBPF) {
tracing::warn!(
remaining = %active_layers,
"eBPF layer unavailable — requires Linux >= 5.8, BTF, and CAP_BPF"
);
degraded_layers.push("ebpf".to_string());
}
if !active_layers.contains(crate::layer::LayerSet::PROXY) {
tracing::warn!(
remaining = %active_layers,
"proxy layer unavailable — aa-proxy binary not found in PATH"
);
degraded_layers.push("proxy".to_string());
}
let pipeline_config = crate::pipeline::PipelineConfig::from_runtime_config(&config);
let (inbound_tx, inbound_rx) =
tokio::sync::mpsc::channel::<(u64, crate::ipc::IpcFrame)>(pipeline_config.input_buffer);
let (broadcast_tx, correlation_rx) =
tokio::sync::broadcast::channel::<crate::pipeline::PipelineEvent>(pipeline_config.broadcast_capacity);
if active_layers.contains(crate::layer::LayerSet::PROXY) {
spawn_proxy(&tracker, &broadcast_tx, active_layers, &mut degraded_layers);
}
let pipeline_metrics = std::sync::Arc::new(crate::pipeline::PipelineMetrics::default());
let active_connections = std::sync::Arc::new(std::sync::atomic::AtomicI64::new(0));
let response_router = crate::ipc::new_response_router();
let audit_publisher = build_audit_publisher(&config).await;
let audit_flush_loop = audit_publisher
.as_ref()
.map(|publisher| std::sync::Arc::clone(publisher).spawn_reconnect_flush_loop(AUDIT_FLUSH_INTERVAL));
let approval_queue = match &audit_publisher {
Some(publisher) => {
let (audit_tx, audit_rx) = tokio::sync::mpsc::channel(AUDIT_CHANNEL_CAPACITY);
spawn_audit_drain(&tracker, audit_rx, std::sync::Arc::clone(publisher), token.clone());
crate::approval::ApprovalQueue::with_audit(audit_tx, [0u8; 32])
}
None => crate::approval::ApprovalQueue::new(),
};
let inbound_tx_health = inbound_tx.clone();
let ipc_config = crate::ipc::server::IpcServerConfig::from_runtime_config(&config);
match crate::ipc::server::IpcServer::bind(ipc_config) {
Ok(ipc_server) => {
let _ = ready_tx.send(true);
let ipc_tracker = tracker.clone();
let ipc_token = token.clone();
let ipc_active_connections = std::sync::Arc::clone(&active_connections);
let ipc_router = std::sync::Arc::clone(&response_router);
tracker.spawn(async move {
ipc_server
.run(ipc_tracker, ipc_token, inbound_tx, ipc_active_connections, ipc_router)
.await;
});
tracing::info!("IPC server task spawned");
}
Err(e) => {
tracing::error!(error = %e, "failed to bind IPC socket — continuing without IPC");
}
}
let seq = std::sync::Arc::new(std::sync::atomic::AtomicU64::new(0));
if active_layers.contains(crate::layer::LayerSet::EBPF) {
spawn_ebpf_tls(&tracker, &broadcast_tx, &mut degraded_layers);
spawn_ebpf_file_io(&tracker, &broadcast_tx, &seq, &config.agent_id, &mut degraded_layers);
spawn_ebpf_exec_tracepoints(&tracker, &broadcast_tx, &token, &mut degraded_layers);
}
{
let pipeline_token = token.clone();
let pm = pipeline_metrics.clone();
let pipeline_policy = std::sync::Arc::clone(&policy);
let pipeline_router = std::sync::Arc::clone(&response_router);
let pipeline_approval_queue = std::sync::Arc::clone(&approval_queue);
let pipeline_seq = std::sync::Arc::clone(&seq);
tracker.spawn(async move {
crate::pipeline::run(
inbound_rx,
broadcast_tx,
pipeline_config,
pm,
pipeline_token,
pipeline_policy,
pipeline_router,
pipeline_approval_queue,
None,
pipeline_seq,
)
.await;
});
tracing::info!("pipeline task spawned");
}
{
let corr_config = crate::correlation::CorrelationConfig::from_runtime_config(&config);
let corr_interval = Duration::from_millis(corr_config.eviction_interval_ms);
let mut engine = crate::correlation::CorrelationEngine::new(corr_config);
let corr_token = token.clone();
let mut corr_rx = correlation_rx;
tracker.spawn(async move {
let mut ticker = tokio::time::interval(corr_interval);
loop {
tokio::select! {
_ = corr_token.cancelled() => {
tracing::info!("correlation subscriber shutting down");
break;
}
_ = ticker.tick() => {
let now_ms = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap_or_default()
.as_millis() as u64;
let outcomes = engine.correlate();
for outcome in &outcomes {
match outcome {
crate::correlation::CorrelationOutcome::Matched(c) => {
tracing::info!(
intent = %c.intent_event_id,
action = %c.action_event_id,
strength = c.correlation_strength,
delta_ms = c.time_delta_ms,
"correlation matched"
);
}
crate::correlation::CorrelationOutcome::UnexpectedAction { action_event_id } => {
tracing::warn!(
action = %action_event_id,
"unexpected action — no matching intent"
);
}
crate::correlation::CorrelationOutcome::IntentWithoutAction { intent_event_id } => {
tracing::info!(
intent = %intent_event_id,
"intent without observed action"
);
}
}
}
engine.evict(now_ms);
}
result = corr_rx.recv() => {
match result {
Ok(crate::pipeline::PipelineEvent::Audit(enriched)) => {
if let Some(corr_event) = crate::correlation::try_from_enriched(&enriched) {
engine.ingest(corr_event);
}
}
Ok(crate::pipeline::PipelineEvent::LayerDegradation(_)) => {
}
Err(tokio::sync::broadcast::error::RecvError::Lagged(n)) => {
tracing::warn!(dropped = n, "correlation subscriber lagged — events lost");
}
Err(tokio::sync::broadcast::error::RecvError::Closed) => {
tracing::info!("broadcast channel closed — correlation subscriber exiting");
break;
}
}
}
}
}
});
tracing::info!("correlation subscriber task spawned");
}
{
let health_state = crate::health::HealthState {
start_time: std::time::Instant::now(),
pipeline_metrics: std::sync::Arc::clone(&pipeline_metrics),
ready_rx,
prometheus_handle,
active_connections: std::sync::Arc::clone(&active_connections),
inbound_tx: inbound_tx_health,
active_layers,
degraded_layers,
};
let addr: std::net::SocketAddr = config
.metrics_addr
.parse()
.expect("invalid AA_METRICS_ADDR — must be a valid socket address");
let health_token = token.clone();
tracker.spawn(async move {
match tokio::net::TcpListener::bind(addr).await {
Ok(listener) => {
tracing::info!(%addr, "health server bound");
axum::serve(listener, crate::health::router(health_state))
.with_graceful_shutdown(async move { health_token.cancelled().await })
.await
.ok();
}
Err(e) => {
tracing::error!(error = %e, %addr, "failed to bind health server");
}
}
});
tracing::info!(%addr, "health server task spawned");
}
wait_for_shutdown_signal().await;
token.cancel();
tracing::info!("cancellation token fired — draining tasks");
tracker.close();
let timeout = Duration::from_secs(config.shutdown_timeout_secs);
if tokio::time::timeout(timeout, tracker.wait()).await.is_err() {
tracing::error!(
timeout_secs = config.shutdown_timeout_secs,
"shutdown timeout exceeded — forcing exit"
);
} else {
tracing::info!("all tasks completed cleanly");
}
if let Some(handle) = audit_flush_loop {
handle.abort();
}
if let Some(publisher) = &audit_publisher {
match publisher.flush_pending().await {
Ok(n) if n > 0 => tracing::info!(flushed = n, "flushed buffered audit events on shutdown"),
Ok(_) => {}
Err(err) => tracing::warn!(error = %err, "failed to flush audit buffer on shutdown"),
}
}
tracing::info!("aa-runtime stopped");
}
#[cfg(test)]
mod tests {
use std::time::Duration;
use tokio_util::sync::CancellationToken;
use tokio_util::task::TaskTracker;
#[test]
fn load_policy_none_returns_empty_rules() {
let policy = super::load_policy(&None);
assert!(policy.rules.is_empty());
}
#[test]
fn load_policy_some_loads_rules_from_file() {
use std::io::Write;
let mut tmp = tempfile::NamedTempFile::new().expect("tempfile");
writeln!(tmp, "[[rules]]").unwrap();
writeln!(tmp, r#"name = "test-rule""#).unwrap();
writeln!(tmp, r#"blocked_actions = ["FILE_OPERATION"]"#).unwrap();
tmp.flush().unwrap();
let policy = super::load_policy(&Some(tmp.path().to_path_buf()));
assert_eq!(policy.rules.len(), 1);
assert_eq!(policy.rules[0].name, "test-rule");
}
#[tokio::test]
async fn graceful_shutdown_drains_all_tasks() {
const TASK_COUNT: usize = 10;
const TIMEOUT: Duration = Duration::from_secs(5);
let tracker = TaskTracker::new();
let token = CancellationToken::new();
for i in 0..TASK_COUNT {
let child_token = token.clone();
tracker.spawn(async move {
loop {
tokio::select! {
_ = child_token.cancelled() => {
break;
}
_ = tokio::time::sleep(Duration::from_millis(10)) => {
}
}
}
tracing::debug!(task = i, "task completed cleanly");
});
}
token.cancel();
tracker.close();
tokio::time::timeout(TIMEOUT, tracker.wait())
.await
.expect("tasks did not complete within timeout");
}
#[tokio::test]
async fn shutdown_timeout_fires_when_tasks_hang() {
let tracker = TaskTracker::new();
let token = CancellationToken::new();
tracker.spawn(async move {
let _token = token; tokio::time::sleep(Duration::from_secs(3600)).await;
});
tracker.close();
let result = tokio::time::timeout(Duration::from_millis(100), tracker.wait()).await;
assert!(result.is_err(), "expected timeout but tasks completed");
}
#[tokio::test]
async fn correlation_subscriber_ingests_and_correlates() {
use crate::correlation::{CorrelationConfig, CorrelationEngine};
use crate::pipeline::event::{EnrichedEvent, EventSource};
use aa_proto::assembly::audit::v1::AuditEvent;
use aa_proto::assembly::common::v1::ActionType;
let config = CorrelationConfig {
window_ms: 500,
max_window_size: 100,
eviction_interval_ms: 50,
};
let mut engine = CorrelationEngine::new(config);
let intent_enriched = EnrichedEvent {
inner: AuditEvent {
event_id: "550e8400-e29b-41d4-a716-446655440001".to_string(),
action_type: ActionType::ToolCall as i32,
..AuditEvent::default()
},
received_at_ms: 1000,
source: EventSource::Sdk,
agent_id: "test".to_string(),
connection_id: 1,
sequence_number: 0,
};
let action_enriched = EnrichedEvent {
inner: AuditEvent {
event_id: "550e8400-e29b-41d4-a716-446655440002".to_string(),
action_type: ActionType::FileOperation as i32,
..AuditEvent::default()
},
received_at_ms: 1050,
source: EventSource::EBpf,
agent_id: "test".to_string(),
connection_id: 1,
sequence_number: 1,
};
let intent_event = crate::correlation::try_from_enriched(&intent_enriched);
assert!(intent_event.is_some(), "SDK/TOOL_CALL should produce Intent");
engine.ingest(intent_event.unwrap());
let action_event = crate::correlation::try_from_enriched(&action_enriched);
assert!(action_event.is_some(), "eBPF/FILE_OPERATION should produce Action");
engine.ingest(action_event.unwrap());
let outcomes = engine.correlate();
assert!(!outcomes.is_empty(), "expected at least one correlation outcome");
}
#[test]
fn emit_proxy_degradation_sends_event() {
let (tx, mut rx) = tokio::sync::broadcast::channel::<crate::pipeline::PipelineEvent>(16);
let active_layers = crate::layer::LayerSet::PROXY | crate::layer::LayerSet::SDK;
super::emit_proxy_degradation(&tx, active_layers, "test reason".to_string());
let event = rx.try_recv().unwrap();
match event {
crate::pipeline::PipelineEvent::LayerDegradation(info) => {
assert_eq!(info.layer, "proxy");
assert_eq!(info.reason, "test reason");
assert_eq!(info.remaining_layers, vec!["sdk"]);
}
_ => panic!("expected LayerDegradation event"),
}
}
#[test]
fn spawn_proxy_binary_not_found_emits_degradation() {
let orig_path = std::env::var("PATH").unwrap_or_default();
std::env::set_var("PATH", "");
let tracker = TaskTracker::new();
let (tx, mut rx) = tokio::sync::broadcast::channel::<crate::pipeline::PipelineEvent>(16);
let active_layers = crate::layer::LayerSet::PROXY | crate::layer::LayerSet::SDK;
let mut degraded = Vec::new();
super::spawn_proxy(&tracker, &tx, active_layers, &mut degraded);
std::env::set_var("PATH", &orig_path);
assert!(degraded.contains(&"proxy".to_string()));
let event = rx.try_recv().unwrap();
match event {
crate::pipeline::PipelineEvent::LayerDegradation(info) => {
assert_eq!(info.layer, "proxy");
assert!(info.reason.contains("not found"));
assert_eq!(info.remaining_layers, vec!["sdk"]);
}
_ => panic!("expected LayerDegradation event"),
}
}
#[tokio::test]
async fn spawn_proxy_failing_binary_emits_degradation() {
let tmp = tempfile::tempdir().unwrap();
#[cfg(unix)]
{
let link_path = tmp.path().join("aa-proxy");
std::os::unix::fs::symlink("/usr/bin/false", &link_path).unwrap();
}
#[cfg(not(unix))]
{
return;
}
let orig_path = std::env::var("PATH").unwrap_or_default();
std::env::set_var("PATH", format!("{}:{orig_path}", tmp.path().display()));
let tracker = TaskTracker::new();
let (tx, mut rx) = tokio::sync::broadcast::channel::<crate::pipeline::PipelineEvent>(16);
let active_layers = crate::layer::LayerSet::PROXY | crate::layer::LayerSet::SDK;
let mut degraded = Vec::new();
super::spawn_proxy(&tracker, &tx, active_layers, &mut degraded);
assert!(!degraded.contains(&"proxy".to_string()));
tracker.close();
tokio::time::timeout(Duration::from_secs(5), tracker.wait())
.await
.expect("proxy task did not exit within timeout");
std::env::set_var("PATH", &orig_path);
let event = rx.try_recv().unwrap();
match event {
crate::pipeline::PipelineEvent::LayerDegradation(info) => {
assert_eq!(info.layer, "proxy");
assert!(info.reason.contains("proxy exited"));
assert_eq!(info.remaining_layers, vec!["sdk"]);
}
_ => panic!("expected LayerDegradation event"),
}
}
#[tokio::test]
async fn broadcast_channel_delivers_to_correlation() {
use crate::pipeline::event::{EnrichedEvent, EventSource, PipelineEvent};
use aa_proto::assembly::audit::v1::AuditEvent;
use aa_proto::assembly::common::v1::ActionType;
let (tx, mut rx) = tokio::sync::broadcast::channel::<PipelineEvent>(16);
let enriched = EnrichedEvent {
inner: AuditEvent {
event_id: "550e8400-e29b-41d4-a716-446655440003".to_string(),
action_type: ActionType::ToolCall as i32,
..AuditEvent::default()
},
received_at_ms: 2000,
source: EventSource::Sdk,
agent_id: "test".to_string(),
connection_id: 1,
sequence_number: 0,
};
tx.send(PipelineEvent::Audit(Box::new(enriched))).unwrap();
let received = rx.recv().await.unwrap();
match received {
PipelineEvent::Audit(e) => {
let corr = crate::correlation::try_from_enriched(&e);
assert!(corr.is_some());
}
_ => panic!("expected Audit event"),
}
}
#[test]
fn spawn_ebpf_tls_degrades_on_non_linux() {
let tracker = tokio_util::task::TaskTracker::new();
let (tx, mut rx) = tokio::sync::broadcast::channel::<crate::pipeline::PipelineEvent>(16);
let mut degraded = Vec::new();
super::spawn_ebpf_tls(&tracker, &tx, &mut degraded);
#[cfg(not(target_os = "linux"))]
{
assert!(degraded.contains(&"ebpf/tls".to_string()));
let event = rx.try_recv().unwrap();
match event {
crate::pipeline::PipelineEvent::LayerDegradation(info) => {
assert_eq!(info.layer, "ebpf/tls");
}
_ => panic!("expected LayerDegradation event"),
}
}
#[cfg(target_os = "linux")]
{
assert!(degraded.contains(&"ebpf/tls".to_string()));
let event = rx.try_recv().unwrap();
match event {
crate::pipeline::PipelineEvent::LayerDegradation(info) => {
assert_eq!(info.layer, "ebpf/tls");
}
_ => panic!("expected LayerDegradation event"),
}
}
}
#[test]
fn spawn_ebpf_file_io_degrades_on_non_linux() {
let tracker = tokio_util::task::TaskTracker::new();
let (tx, mut rx) = tokio::sync::broadcast::channel::<crate::pipeline::PipelineEvent>(16);
let seq = std::sync::Arc::new(std::sync::atomic::AtomicU64::new(0));
let mut degraded = Vec::new();
super::spawn_ebpf_file_io(&tracker, &tx, &seq, "test-agent", &mut degraded);
#[cfg(not(target_os = "linux"))]
{
assert!(degraded.contains(&"ebpf/file_io".to_string()));
let event = rx.try_recv().unwrap();
match event {
crate::pipeline::PipelineEvent::LayerDegradation(info) => {
assert_eq!(info.layer, "ebpf/file_io");
}
_ => panic!("expected LayerDegradation event"),
}
}
#[cfg(target_os = "linux")]
{
assert!(degraded.contains(&"ebpf/file_io".to_string()));
let event = rx.try_recv().unwrap();
match event {
crate::pipeline::PipelineEvent::LayerDegradation(info) => {
assert_eq!(info.layer, "ebpf/file_io");
}
_ => panic!("expected LayerDegradation event"),
}
}
}
#[test]
fn spawn_ebpf_exec_tracepoints_degrades_on_non_linux() {
let tracker = tokio_util::task::TaskTracker::new();
let (tx, mut rx) = tokio::sync::broadcast::channel::<crate::pipeline::PipelineEvent>(16);
let token = tokio_util::sync::CancellationToken::new();
let mut degraded = Vec::new();
super::spawn_ebpf_exec_tracepoints(&tracker, &tx, &token, &mut degraded);
#[cfg(not(target_os = "linux"))]
{
assert!(degraded.contains(&"ebpf/exec".to_string()));
let event = rx.try_recv().unwrap();
match event {
crate::pipeline::PipelineEvent::LayerDegradation(info) => {
assert_eq!(info.layer, "ebpf/exec");
}
_ => panic!("expected LayerDegradation event"),
}
}
#[cfg(target_os = "linux")]
{
assert!(degraded.contains(&"ebpf/exec".to_string()));
let event = rx.try_recv().unwrap();
match event {
crate::pipeline::PipelineEvent::LayerDegradation(info) => {
assert_eq!(info.layer, "ebpf/exec");
}
_ => panic!("expected LayerDegradation event"),
}
}
}
fn audit_test_config(nats_config_path: Option<std::path::PathBuf>) -> crate::config::RuntimeConfig {
crate::config::RuntimeConfig {
agent_id: "audit-test".to_string(),
worker_threads: 0,
shutdown_timeout_secs: 30,
ipc_max_connections: 64,
pipeline_input_buffer: 10_000,
pipeline_batch_size: 100,
pipeline_flush_interval_ms: 100,
pipeline_broadcast_capacity: 1_024,
metrics_addr: "0.0.0.0:8080".to_string(),
policy_path: None,
gateway_endpoint: None,
correlation_window_ms: 5_000,
correlation_interval_ms: 1_000,
nats_config_path,
audit_buffer_path: std::env::temp_dir().join("aa-audit-buffer-audit-test.db"),
enforcement_max_field_bytes: crate::pipeline::enforcement::DEFAULT_MAX_FIELD_BYTES,
}
}
#[tokio::test]
async fn build_audit_publisher_disabled_when_unconfigured_or_unreadable() {
assert!(super::build_audit_publisher(&audit_test_config(None)).await.is_none());
let missing = std::env::temp_dir().join("aa-nonexistent-nats-config-xyz.toml");
assert!(super::build_audit_publisher(&audit_test_config(Some(missing)))
.await
.is_none());
}
}
#[cfg(all(test, target_os = "linux", feature = "integration-test"))]
mod layer_integration {
use std::time::Duration;
use crate::pipeline::PipelineEvent;
async fn collect_events(
rx: &mut tokio::sync::broadcast::Receiver<PipelineEvent>,
timeout: Duration,
) -> Vec<PipelineEvent> {
let mut events = Vec::new();
let deadline = tokio::time::Instant::now() + timeout;
loop {
let remaining = deadline.saturating_duration_since(tokio::time::Instant::now());
if remaining.is_zero() {
break;
}
match tokio::time::timeout(remaining, rx.recv()).await {
Ok(Ok(event)) => events.push(event),
Ok(Err(_)) => break, Err(_) => break, }
}
events
}
#[tokio::test]
async fn both_layers_emit_events_on_shared_channel() {
use std::sync::atomic::AtomicU64;
use std::sync::Arc;
use crate::pipeline::EventSource;
let tracker = tokio_util::task::TaskTracker::new();
let (tx, mut rx) = tokio::sync::broadcast::channel::<PipelineEvent>(64);
let seq = Arc::new(AtomicU64::new(0));
let token = tokio_util::sync::CancellationToken::new();
let mut degraded = Vec::new();
super::spawn_ebpf_file_io(&tracker, &tx, &seq, "integration-test", &mut degraded);
if degraded.contains(&"ebpf/file_io".to_string()) {
eprintln!(
"SKIPPING both_layers_emit_events: eBPF file_io degraded \
(probably missing root/CAP_BPF)"
);
return;
}
super::spawn_ebpf_exec_tracepoints(&tracker, &tx, &token, &mut degraded);
tokio::time::sleep(Duration::from_secs(1)).await;
let trigger_path = "/tmp/aa-integration-test-trigger";
for _ in 0..5 {
std::fs::write(trigger_path, b"integration-test").expect("write trigger file");
let _ = std::fs::read_to_string(trigger_path);
tokio::time::sleep(Duration::from_millis(200)).await;
}
let _ = std::fs::remove_file(trigger_path);
let events = collect_events(&mut rx, Duration::from_secs(3)).await;
let ebpf_events: Vec<_> = events
.iter()
.filter(|e| matches!(e, PipelineEvent::Audit(ref a) if a.source == EventSource::EBpf))
.collect();
assert!(
!ebpf_events.is_empty(),
"expected at least one eBPF audit event, got none. \
Total events: {}, types: {:?}",
events.len(),
events
.iter()
.map(|e| match e {
PipelineEvent::Audit(a) => format!("Audit({:?})", a.source),
PipelineEvent::LayerDegradation(info) => {
format!("Degradation({})", info.layer)
}
})
.collect::<Vec<_>>()
);
let mut seq_numbers: Vec<u64> = events
.iter()
.filter_map(|e| match e {
PipelineEvent::Audit(a) => Some(a.sequence_number),
_ => None,
})
.collect();
let original = seq_numbers.clone();
seq_numbers.sort();
seq_numbers.dedup();
assert_eq!(
seq_numbers, original,
"sequence numbers should be unique and monotonically increasing"
);
token.cancel();
tracker.close();
let _ = tokio::time::timeout(Duration::from_secs(1), tracker.wait()).await;
}
#[tokio::test]
async fn all_layers_run_independently() {
let tracker = tokio_util::task::TaskTracker::new();
let (tx, mut rx) = tokio::sync::broadcast::channel::<PipelineEvent>(64);
let seq = std::sync::Arc::new(std::sync::atomic::AtomicU64::new(0));
let token = tokio_util::sync::CancellationToken::new();
let mut degraded = Vec::new();
super::spawn_ebpf_tls(&tracker, &tx, &mut degraded);
super::spawn_ebpf_file_io(&tracker, &tx, &seq, "test-agent", &mut degraded);
super::spawn_ebpf_exec_tracepoints(&tracker, &tx, &token, &mut degraded);
super::spawn_proxy(
&tracker,
&tx,
crate::layer::LayerSet::EBPF | crate::layer::LayerSet::PROXY,
&mut degraded,
);
let events = collect_events(&mut rx, Duration::from_secs(2)).await;
let degradation_layers: Vec<String> = events
.iter()
.filter_map(|e| match e {
PipelineEvent::LayerDegradation(info) => Some(info.layer.clone()),
_ => None,
})
.collect();
for layer in °raded {
assert!(
degradation_layers.contains(layer),
"layer '{layer}' is in degraded list but has no LayerDegradation event. \
degraded: {degraded:?}, events: {degradation_layers:?}"
);
}
for layer in °radation_layers {
assert!(
degraded.contains(layer),
"LayerDegradation event for '{layer}' but layer not in degraded list. \
degraded: {degraded:?}, events: {degradation_layers:?}"
);
}
let all_layers = ["ebpf/tls", "ebpf/file_io", "ebpf/exec", "proxy"];
let spawned_or_degraded: Vec<&str> = all_layers
.iter()
.filter(|l| degraded.contains(&l.to_string()) || !degraded.contains(&l.to_string()))
.copied()
.collect();
assert_eq!(
spawned_or_degraded.len(),
4,
"expected all 4 layers to have been attempted"
);
token.cancel();
tracker.close();
let _ = tokio::time::timeout(Duration::from_secs(1), tracker.wait()).await;
}
#[tokio::test]
async fn proxy_degradation_does_not_block_ebpf() {
use std::sync::atomic::AtomicU64;
use std::sync::Arc;
let tracker = tokio_util::task::TaskTracker::new();
let (tx, mut rx) = tokio::sync::broadcast::channel::<PipelineEvent>(64);
let seq = Arc::new(AtomicU64::new(0));
let token = tokio_util::sync::CancellationToken::new();
let mut degraded = Vec::new();
super::spawn_proxy(
&tracker,
&tx,
crate::layer::LayerSet::EBPF | crate::layer::LayerSet::PROXY,
&mut degraded,
);
assert!(
degraded.contains(&"proxy".to_string()),
"expected proxy in degraded list (aa-proxy not on PATH): {degraded:?}"
);
super::spawn_ebpf_file_io(&tracker, &tx, &seq, "integration-test", &mut degraded);
let events = collect_events(&mut rx, Duration::from_secs(2)).await;
let has_proxy_degradation = events.iter().any(|e| {
matches!(
e,
PipelineEvent::LayerDegradation(info) if info.layer == "proxy"
)
});
assert!(has_proxy_degradation, "expected LayerDegradation for proxy layer");
if degraded.contains(&"ebpf/file_io".to_string()) {
let has_ebpf_degradation = events.iter().any(|e| {
matches!(
e,
PipelineEvent::LayerDegradation(info) if info.layer == "ebpf/file_io"
)
});
assert!(
has_ebpf_degradation,
"ebpf/file_io degraded but no LayerDegradation event"
);
}
token.cancel();
tracker.close();
let _ = tokio::time::timeout(Duration::from_secs(1), tracker.wait()).await;
}
}