pub mod enforcement;
pub mod event;
pub mod metrics;
pub use event::{EnrichedEvent, EventSource, LayerDegradationInfo, PipelineEvent};
pub use metrics::PipelineMetrics;
use crate::approval::{ApprovalDecision as RuntimeApprovalDecision, ApprovalQueue, ApprovalRequest};
use crate::config::RuntimeConfig;
use crate::gateway_client::GatewayClient;
use crate::ipc::{IpcFrame, IpcResponse, ResponseRouter};
use crate::policy::PolicyRules;
use aa_proto::assembly::audit::v1::{audit_event::Detail, AuditEvent, PolicyViolation};
use aa_proto::assembly::common::v1::{ActionType, Decision};
use aa_proto::assembly::event::v1::ApprovalDecision as ProtoApprovalDecision;
use aa_proto::assembly::policy::v1::CheckActionResponse;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
use std::time::{Duration, SystemTime, UNIX_EPOCH};
use tokio::sync::{broadcast, mpsc, Mutex};
use tokio_util::sync::CancellationToken;
use uuid::Uuid;
/// Tuning knobs for the event pipeline task (`run`).
#[derive(Debug, Clone)]
pub struct PipelineConfig {
    /// Copied from `RuntimeConfig::pipeline_input_buffer`. Not read by `run`
    /// itself; presumably sizes the input mpsc channel — confirm with caller.
    pub input_buffer: usize,
    /// Number of buffered (non-violation) events that triggers an immediate flush.
    pub batch_size: usize,
    /// Period of the ticker that flushes a partially filled batch.
    pub flush_interval: Duration,
    /// Copied from `RuntimeConfig::pipeline_broadcast_capacity`. Not read by
    /// `run`; presumably the capacity of the output broadcast channel — confirm.
    pub broadcast_capacity: usize,
    /// Agent id stamped on every enriched event.
    pub agent_id: String,
    /// Settings for the runtime scanner applied to every reported event.
    pub enforcement: enforcement::EnforcementConfig,
}

impl PipelineConfig {
    /// Builds the pipeline configuration from the daemon-wide [`RuntimeConfig`].
    ///
    /// The flush interval is converted from milliseconds to a [`Duration`];
    /// every other field is copied (or cloned) as-is.
    pub fn from_runtime_config(c: &RuntimeConfig) -> Self {
        let flush_interval = Duration::from_millis(c.pipeline_flush_interval_ms);
        let agent_id = c.agent_id.clone();
        let enforcement = enforcement::EnforcementConfig::from_runtime_config(c);
        Self {
            input_buffer: c.pipeline_input_buffer,
            batch_size: c.pipeline_batch_size,
            flush_interval,
            broadcast_capacity: c.pipeline_broadcast_capacity,
            agent_id,
            enforcement,
        }
    }
}
/// Runs the event pipeline until `token` is cancelled or the input channel closes.
///
/// Each `EventReport` frame is enriched (agent id, receive time, sequence
/// number) and passed through the runtime scanner. It then takes one of two paths:
/// - If it is a policy violation (an explicit `Violation` detail, or an action
///   listed in some rule's `blocked_actions`), a `ViolationAlert` is pushed to
///   the originating connection and the event is broadcast immediately.
/// - Otherwise it is appended to a batch. The batch is broadcast once it holds
///   `config.batch_size` events or when the flush ticker fires.
///
/// `PolicyQuery` frames are answered by `handle_policy_query`; all other
/// frames are ignored. Any pending batch is flushed before returning.
#[allow(clippy::too_many_arguments)]
pub async fn run(
    mut rx: mpsc::Receiver<(u64, IpcFrame)>,
    broadcast_tx: broadcast::Sender<PipelineEvent>,
    config: PipelineConfig,
    metrics: Arc<PipelineMetrics>,
    token: CancellationToken,
    policy: Arc<PolicyRules>,
    response_router: ResponseRouter,
    approval_queue: Arc<ApprovalQueue>,
    gateway_client: Option<Arc<Mutex<GatewayClient>>>,
    seq: Arc<AtomicU64>,
) {
    let mut batch: Vec<EnrichedEvent> = Vec::with_capacity(config.batch_size);
    // `tokio::time::interval` panics on a zero period, so a configured flush
    // interval of 0 ms is clamped to 1 ms instead of crashing the task.
    let flush_interval = config.flush_interval.max(Duration::from_millis(1));
    let mut ticker = tokio::time::interval(flush_interval);
    ticker.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
    let scanner = enforcement::RuntimeScanner::with_config(config.enforcement.clone());
    loop {
        tokio::select! {
            biased;
            _ = token.cancelled() => {
                if !batch.is_empty() {
                    flush(&mut batch, &broadcast_tx, &metrics);
                }
                break;
            }
            received = rx.recv() => {
                // `None` means every sender has been dropped. No more input can
                // arrive, so flush what is buffered and stop rather than idling
                // on the ticker until an external cancellation.
                let Some((connection_id, frame)) = received else {
                    if !batch.is_empty() {
                        flush(&mut batch, &broadcast_tx, &metrics);
                    }
                    tracing::info!("pipeline input channel closed");
                    break;
                };
                match frame {
                    IpcFrame::EventReport(event) => {
                        let mut enriched = enrich(event, &config.agent_id, connection_id, &seq);
                        scanner.enforce(&mut enriched);
                        tracing::debug!(sequence_number = enriched.sequence_number, connection_id, "event enriched");
                        metrics.record_processed(1);
                        ::metrics::counter!("aa_events_received_total").increment(1);
                        if is_policy_violation(&enriched, &policy) {
                            ::metrics::counter!("aa_policy_violations_total").increment(1);
                            // NOTE: awaits the connection's bounded response
                            // channel; a client that stops reading back-pressures
                            // the whole pipeline.
                            push_violation_alert(&enriched, &response_router).await;
                            let _ = broadcast_tx.send(PipelineEvent::Audit(Box::new(enriched)));
                            // Bypassed events are emitted too; keep the counter
                            // consistent with `flush`, which counts batched ones.
                            ::metrics::counter!("aa_events_emitted_total").increment(1);
                        } else {
                            batch.push(enriched);
                            if batch.len() >= config.batch_size {
                                flush(&mut batch, &broadcast_tx, &metrics);
                            }
                        }
                    }
                    IpcFrame::PolicyQuery(req) => {
                        handle_policy_query(
                            connection_id,
                            req,
                            &policy,
                            &approval_queue,
                            &response_router,
                            &gateway_client,
                            &broadcast_tx,
                            &seq,
                        )
                        .await;
                    }
                    _ => {}
                }
            }
            _ = ticker.tick() => {
                if !batch.is_empty() {
                    flush(&mut batch, &broadcast_tx, &metrics);
                }
            }
        }
    }
    tracing::info!("pipeline task stopped");
}
fn enrich(event: AuditEvent, agent_id: &str, connection_id: u64, seq: &AtomicU64) -> EnrichedEvent {
use std::time::{SystemTime, UNIX_EPOCH};
let received_at_ms = SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap_or(Duration::ZERO)
.as_millis() as i64;
let sequence_number = seq.fetch_add(1, Ordering::Relaxed);
EnrichedEvent {
inner: event,
received_at_ms,
source: EventSource::Sdk,
agent_id: agent_id.to_string(),
connection_id,
sequence_number,
}
}
/// Decides whether an enriched event must skip batching and be broadcast at once.
///
/// Returns `true` in two cases:
/// - the event already carries a `Violation` detail;
/// - the proto name of its action type appears in some rule's
///   `blocked_actions`. The first such rule is logged at WARN.
///
/// An unknown action type is matched as the empty string.
fn is_policy_violation(event: &EnrichedEvent, policy: &PolicyRules) -> bool {
    if let Some(Detail::Violation(_)) = &event.inner.detail {
        return true;
    }
    let action_str = match ActionType::try_from(event.inner.action_type) {
        Ok(action) => action.as_str_name(),
        Err(_) => "",
    };
    let Some(rule) = policy
        .rules
        .iter()
        .find(|candidate| candidate.blocked_actions.iter().any(|ba| ba == action_str))
    else {
        return false;
    };
    tracing::warn!(
        rule = %rule.name,
        action = %action_str,
        "policy rule matched — event bypassing batch"
    );
    true
}
/// Returns a clone of the event's `PolicyViolation` detail, or `None` if it has none.
fn extract_violation(event: &EnrichedEvent) -> Option<PolicyViolation> {
    if let Some(Detail::Violation(violation)) = event.inner.detail.as_ref() {
        Some(violation.clone())
    } else {
        None
    }
}
/// Pushes a `ViolationAlert` for `event` to the SDK connection that reported it.
///
/// Does nothing when the event has no `Violation` detail or when its
/// connection is not registered in `router`. The router read lock is
/// released before awaiting the send. A closed connection is logged at DEBUG.
async fn push_violation_alert(event: &EnrichedEvent, router: &crate::ipc::ResponseRouter) {
    let violation = match extract_violation(event) {
        Some(v) => v,
        None => return,
    };
    let connection_id = event.connection_id;
    let Some(tx) = router.read().await.get(&connection_id).cloned() else {
        return;
    };
    let delivery = tx.send(IpcResponse::ViolationAlert(violation)).await;
    if delivery.is_err() {
        tracing::debug!(
            connection_id,
            "ViolationAlert dropped — connection already closed"
        );
    }
}
/// Delivers `response` to the connection registered under `connection_id`.
///
/// Silently does nothing if the connection is not in `router`. The router
/// read lock is released before awaiting the send. A closed connection is
/// logged at DEBUG.
async fn send_ipc_response(connection_id: u64, response: IpcResponse, router: &ResponseRouter) {
    let Some(tx) = router.read().await.get(&connection_id).cloned() else {
        return;
    };
    let closed = tx.send(response).await.is_err();
    if closed {
        tracing::debug!(connection_id, "IpcResponse dropped — connection already closed");
    }
}
#[allow(clippy::too_many_arguments)]
async fn handle_policy_query(
connection_id: u64,
req: aa_proto::assembly::policy::v1::CheckActionRequest,
policy: &PolicyRules,
approval_queue: &Arc<ApprovalQueue>,
response_router: &ResponseRouter,
gateway_client: &Option<Arc<Mutex<GatewayClient>>>,
broadcast_tx: &broadcast::Sender<PipelineEvent>,
sequence_counter: &AtomicU64,
) {
if let Some(client) = gateway_client {
let mut guard = client.lock().await;
match guard.check_action(req.clone()).await {
Ok(resp) => {
tracing::debug!(connection_id, decision = resp.decision, "gateway responded");
if resp.decision == aa_proto::assembly::common::v1::Decision::Deny as i32 {
emit_gateway_violation(&req, &resp, connection_id, broadcast_tx, sequence_counter);
}
send_ipc_response(connection_id, IpcResponse::PolicyResponse(resp), response_router).await;
return;
}
Err(e) => {
tracing::warn!(
connection_id,
error = %e,
"gateway call failed — falling back to local policy evaluation"
);
}
}
}
let action_str = ActionType::try_from(req.action_type)
.map(|a| a.as_str_name())
.unwrap_or("");
let agent_id_str = req
.agent_id
.as_ref()
.map(|a| a.agent_id.as_str())
.unwrap_or("")
.to_string();
for rule in &policy.rules {
if rule.requires_approval_actions.iter().any(|a| a == action_str) {
let request_id = Uuid::new_v4();
let submitted_at = SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap_or(Duration::ZERO)
.as_secs();
let approval_req = ApprovalRequest {
request_id,
agent_id: agent_id_str.clone(),
action: action_str.to_string(),
condition_triggered: rule.name.clone(),
submitted_at,
timeout_secs: rule.approval_timeout_secs as u64,
fallback: aa_core::PolicyResult::Deny {
reason: "approval timed out".to_string(),
},
team_id: None,
timeout_override_secs: None,
escalation_role_override: None,
};
let (rid, fut) = approval_queue.submit(approval_req);
send_ipc_response(
connection_id,
IpcResponse::PolicyResponse(CheckActionResponse {
decision: Decision::Pending as i32,
approval_id: rid.to_string(),
policy_rule: rule.name.clone(),
..Default::default()
}),
response_router,
)
.await;
let router = Arc::clone(response_router);
tokio::spawn(async move {
if let Ok(decision) = fut.await {
let (approved, decided_by, reason) = match decision {
RuntimeApprovalDecision::Approved { by, reason } => (true, by, reason.unwrap_or_default()),
RuntimeApprovalDecision::Rejected { by, reason } => (false, by, reason),
RuntimeApprovalDecision::TimedOut { .. } => {
(false, "timeout".to_string(), "approval timed out".to_string())
}
};
let decided_at_unix_ms = SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap_or(Duration::ZERO)
.as_millis() as i64;
let proto = ProtoApprovalDecision {
approval_id: rid.to_string(),
approved,
decided_by,
reason,
decided_at_unix_ms,
};
send_ipc_response(connection_id, IpcResponse::ApprovalDecision(proto), &router).await;
}
});
return;
}
}
for rule in &policy.rules {
if rule.blocked_actions.iter().any(|ba| ba == action_str) {
send_ipc_response(
connection_id,
IpcResponse::PolicyResponse(CheckActionResponse {
decision: Decision::Deny as i32,
reason: format!("blocked by rule: {}", rule.name),
policy_rule: rule.name.clone(),
..Default::default()
}),
response_router,
)
.await;
return;
}
}
send_ipc_response(
connection_id,
IpcResponse::PolicyResponse(CheckActionResponse {
decision: Decision::Allow as i32,
..Default::default()
}),
response_router,
)
.await;
}
/// Broadcasts a synthetic `PolicyViolation` audit event for a gateway `Deny`.
///
/// The event takes its action type, trace id and span id from `req`, and its
/// decision from `resp`. Its violation detail carries:
/// - the response's rule and reason;
/// - the decision latency, converted from microseconds to whole milliseconds
///   (truncating);
/// - the action's proto name, or `"ACTION_UNSPECIFIED"` if the value is unknown.
///
/// The event is enriched with the request's agent id (empty if absent) and
/// `connection_id`, which consumes one sequence number, then sent on
/// `broadcast_tx`. Lack of subscribers is logged at TRACE.
fn emit_gateway_violation(
    req: &aa_proto::assembly::policy::v1::CheckActionRequest,
    resp: &aa_proto::assembly::policy::v1::CheckActionResponse,
    connection_id: u64,
    broadcast_tx: &broadcast::Sender<PipelineEvent>,
    sequence_counter: &AtomicU64,
) {
    use aa_proto::assembly::audit::v1::{audit_event::Detail, AuditEvent, PolicyViolation};
    use aa_proto::assembly::common::v1::ActionType;
    let blocked_action = match ActionType::try_from(req.action_type) {
        Ok(action) => action.as_str_name().to_string(),
        Err(_) => "ACTION_UNSPECIFIED".to_string(),
    };
    let agent_id = match &req.agent_id {
        Some(id) => id.agent_id.clone(),
        None => String::new(),
    };
    let violation = PolicyViolation {
        policy_rule: resp.policy_rule.clone(),
        blocked_action,
        reason: resp.reason.clone(),
        latency_ms: resp.decision_latency_us / 1_000,
    };
    let event = AuditEvent {
        action_type: req.action_type,
        decision: resp.decision,
        trace_id: req.trace_id.clone(),
        span_id: req.span_id.clone(),
        detail: Some(Detail::Violation(violation)),
        ..AuditEvent::default()
    };
    let enriched = enrich(event, &agent_id, connection_id, sequence_counter);
    let no_subscribers = broadcast_tx
        .send(PipelineEvent::Audit(Box::new(enriched)))
        .is_err();
    if no_subscribers {
        tracing::trace!("dropped synthetic violation event; no subscribers");
    }
}
/// Broadcasts every buffered event in arrival order and empties `batch`.
///
/// `drain` keeps the batch's capacity for reuse. The batch size is recorded
/// in the `aa_events_emitted_total` counter and in `metrics`, even when it is
/// zero. Send errors (no live subscribers) are ignored, so those events are
/// simply dropped.
fn flush(batch: &mut Vec<EnrichedEvent>, broadcast_tx: &broadcast::Sender<PipelineEvent>, metrics: &PipelineMetrics) {
    let emitted = batch.len() as u64;
    batch
        .drain(..)
        .map(|event| PipelineEvent::Audit(Box::new(event)))
        .for_each(|pipeline_event| {
            let _ = broadcast_tx.send(pipeline_event);
        });
    ::metrics::counter!("aa_events_emitted_total").increment(emitted);
    metrics.record_batch_size(emitted);
}
#[cfg(test)]
mod tests {
use super::*;
use crate::policy::PolicyRules;
use aa_proto::assembly::audit::v1::{audit_event::Detail, AuditEvent, PolicyViolation};
fn unwrap_audit(event: PipelineEvent) -> EnrichedEvent {
match event {
PipelineEvent::Audit(e) => *e,
other => panic!("expected PipelineEvent::Audit, got {other:?}"),
}
}
fn make_audit_event() -> AuditEvent {
AuditEvent::default()
}
fn make_policy_violation_event() -> AuditEvent {
AuditEvent {
detail: Some(Detail::Violation(PolicyViolation {
policy_rule: "test-rule".to_string(),
blocked_action: "test-action".to_string(),
reason: "test-reason".to_string(),
latency_ms: 0,
})),
..Default::default()
}
}
#[test]
fn emit_gateway_violation_on_deny_pushes_structured_audit_event() {
use aa_proto::assembly::common::v1::{ActionType, AgentId as ProtoAgentId, Decision};
use aa_proto::assembly::policy::v1::{CheckActionRequest, CheckActionResponse};
let (tx, mut rx) = broadcast::channel::<PipelineEvent>(4);
let seq = AtomicU64::new(0);
let req = CheckActionRequest {
agent_id: Some(ProtoAgentId {
agent_id: "support-agent".into(),
..Default::default()
}),
credential_token: String::new(),
trace_id: "trace-1".into(),
span_id: "span-1".into(),
action_type: ActionType::FileOperation as i32,
context: Default::default(),
caller_agent_id: None,
};
let resp = CheckActionResponse {
decision: Decision::Deny as i32,
reason: "blocked by policy".into(),
policy_rule: "no-secrets".into(),
approval_id: String::new(),
redact: None,
decision_latency_us: 12_000,
};
emit_gateway_violation(&req, &resp, 99, &tx, &seq);
let event = rx.try_recv().expect("expected one event on the channel");
let enriched = unwrap_audit(event);
assert_eq!(enriched.agent_id, "support-agent");
assert_eq!(enriched.connection_id, 99);
match enriched.inner.detail {
Some(Detail::Violation(v)) => {
assert_eq!(v.policy_rule, "no-secrets");
assert_eq!(v.blocked_action, "FILE_OPERATION");
assert_eq!(v.reason, "blocked by policy");
assert_eq!(v.latency_ms, 12);
}
other => panic!("expected Violation detail, got {other:?}"),
}
}
#[test]
fn emit_gateway_violation_sub_millisecond_decision_floors_to_zero() {
use aa_proto::assembly::common::v1::{ActionType, AgentId as ProtoAgentId, Decision};
use aa_proto::assembly::policy::v1::{CheckActionRequest, CheckActionResponse};
let (tx, mut rx) = broadcast::channel::<PipelineEvent>(4);
let seq = AtomicU64::new(0);
let req = CheckActionRequest {
agent_id: Some(ProtoAgentId {
agent_id: "fast-agent".into(),
..Default::default()
}),
credential_token: String::new(),
trace_id: String::new(),
span_id: String::new(),
action_type: ActionType::ToolCall as i32,
context: Default::default(),
caller_agent_id: None,
};
let resp = CheckActionResponse {
decision: Decision::Deny as i32,
reason: "fast-block".into(),
policy_rule: "rule".into(),
approval_id: String::new(),
redact: None,
decision_latency_us: 500,
};
emit_gateway_violation(&req, &resp, 1, &tx, &seq);
let event = rx.try_recv().expect("expected one event on the channel");
match unwrap_audit(event).inner.detail {
Some(Detail::Violation(v)) => assert_eq!(v.latency_ms, 0),
other => panic!("expected Violation detail, got {other:?}"),
}
}
#[test]
fn enrich_sets_agent_id() {
let event = make_audit_event();
let seq = AtomicU64::new(0);
let enriched = enrich(event, "my-agent", 0, &seq);
assert_eq!(enriched.agent_id, "my-agent");
}
#[test]
fn enrich_sets_received_at_ms_positive() {
let event = make_audit_event();
let seq = AtomicU64::new(0);
let enriched = enrich(event, "agent", 0, &seq);
assert!(enriched.received_at_ms > 0);
}
#[test]
fn enrich_sets_source_to_sdk() {
let event = make_audit_event();
let seq = AtomicU64::new(0);
let enriched = enrich(event, "agent", 0, &seq);
assert_eq!(enriched.source, EventSource::Sdk);
}
#[test]
fn is_policy_violation_true_for_violation_detail() {
let event = make_policy_violation_event();
let seq = AtomicU64::new(0);
let enriched = enrich(event, "agent", 0, &seq);
assert!(is_policy_violation(&enriched, &PolicyRules::default()));
}
#[test]
fn is_policy_violation_false_for_normal_event() {
let event = make_audit_event(); let seq = AtomicU64::new(0);
let enriched = enrich(event, "agent", 0, &seq);
assert!(!is_policy_violation(&enriched, &PolicyRules::default()));
}
#[test]
fn flush_empty_batch_does_nothing() {
let (tx, _rx) = broadcast::channel::<PipelineEvent>(16);
let metrics = PipelineMetrics::default();
let mut batch: Vec<EnrichedEvent> = vec![];
flush(&mut batch, &tx, &metrics);
assert_eq!(metrics.last_batch_size(), 0);
assert_eq!(metrics.processed(), 0);
}
#[test]
fn flush_broadcasts_all_events_and_records_batch_size() {
let (tx, mut rx) = broadcast::channel::<PipelineEvent>(16);
let metrics = PipelineMetrics::default();
let seq = AtomicU64::new(0);
let mut batch = vec![
enrich(make_audit_event(), "a", 0, &seq),
enrich(make_audit_event(), "b", 0, &seq),
];
flush(&mut batch, &tx, &metrics);
assert!(batch.is_empty());
assert_eq!(metrics.last_batch_size(), 2);
assert!(rx.try_recv().is_ok());
assert!(rx.try_recv().is_ok());
}
#[test]
fn from_runtime_config_copies_all_fields() {
let runtime_config = RuntimeConfig {
agent_id: "test-agent".to_string(),
worker_threads: 0,
shutdown_timeout_secs: 30,
ipc_max_connections: 64,
pipeline_input_buffer: 5_000,
pipeline_batch_size: 50,
pipeline_flush_interval_ms: 200,
pipeline_broadcast_capacity: 512,
metrics_addr: "0.0.0.0:8080".to_string(),
policy_path: None,
gateway_endpoint: None,
correlation_window_ms: 5_000,
correlation_interval_ms: 1_000,
nats_config_path: None,
audit_buffer_path: std::path::PathBuf::from("/tmp/aa-audit-buffer-test.db"),
enforcement_max_field_bytes: enforcement::DEFAULT_MAX_FIELD_BYTES,
};
let pipeline_config = PipelineConfig::from_runtime_config(&runtime_config);
assert_eq!(pipeline_config.input_buffer, runtime_config.pipeline_input_buffer);
assert_eq!(pipeline_config.batch_size, runtime_config.pipeline_batch_size);
assert_eq!(
pipeline_config.flush_interval,
Duration::from_millis(runtime_config.pipeline_flush_interval_ms)
);
assert_eq!(
pipeline_config.broadcast_capacity,
runtime_config.pipeline_broadcast_capacity
);
assert_eq!(pipeline_config.agent_id, runtime_config.agent_id);
}
#[test]
fn pipeline_config_is_clone() {
let pipeline_config = PipelineConfig {
input_buffer: 5_000,
batch_size: 50,
flush_interval: Duration::from_millis(200),
broadcast_capacity: 512,
agent_id: "test-agent".to_string(),
enforcement: enforcement::EnforcementConfig::default(),
};
let cloned = pipeline_config.clone();
assert_eq!(cloned.agent_id, pipeline_config.agent_id);
}
fn test_config(batch_size: usize, flush_interval_ms: u64) -> PipelineConfig {
PipelineConfig {
input_buffer: 1_024,
batch_size,
flush_interval: Duration::from_millis(flush_interval_ms),
broadcast_capacity: 1_024,
agent_id: "test-agent".to_string(),
enforcement: enforcement::EnforcementConfig::default(),
}
}
fn normal_event() -> AuditEvent {
AuditEvent::default()
}
fn violation_event() -> AuditEvent {
AuditEvent {
detail: Some(Detail::Violation(PolicyViolation {
policy_rule: "rule".to_string(),
blocked_action: "action".to_string(),
reason: "reason".to_string(),
latency_ms: 0,
})),
..Default::default()
}
}
#[tokio::test]
async fn batch_flushes_on_size_threshold() {
let config = test_config(3, 10_000); let (tx, rx) = mpsc::channel::<(u64, IpcFrame)>(64);
let (broadcast_tx, mut broadcast_rx) = broadcast::channel::<PipelineEvent>(64);
let metrics = Arc::new(PipelineMetrics::default());
let token = CancellationToken::new();
tokio::spawn(run(
rx,
broadcast_tx,
config,
metrics.clone(),
token.clone(),
Arc::new(PolicyRules::default()),
crate::ipc::new_response_router(),
crate::approval::ApprovalQueue::new(),
None,
Arc::new(AtomicU64::new(0)),
));
for _ in 0..3 {
tx.send((0, IpcFrame::EventReport(normal_event()))).await.unwrap();
}
for _ in 0..3 {
tokio::time::timeout(Duration::from_millis(500), broadcast_rx.recv())
.await
.expect("timed out waiting for event")
.expect("broadcast error");
}
assert_eq!(metrics.processed(), 3);
token.cancel();
}
#[tokio::test]
async fn batch_flushes_on_interval() {
let config = test_config(100, 50); let (tx, rx) = mpsc::channel::<(u64, IpcFrame)>(64);
let (broadcast_tx, mut broadcast_rx) = broadcast::channel::<PipelineEvent>(64);
let metrics = Arc::new(PipelineMetrics::default());
let token = CancellationToken::new();
tokio::spawn(run(
rx,
broadcast_tx,
config,
metrics.clone(),
token.clone(),
Arc::new(PolicyRules::default()),
crate::ipc::new_response_router(),
crate::approval::ApprovalQueue::new(),
None,
Arc::new(AtomicU64::new(0)),
));
for _ in 0..5 {
tx.send((0, IpcFrame::EventReport(normal_event()))).await.unwrap();
}
for _ in 0..5 {
tokio::time::timeout(Duration::from_millis(500), broadcast_rx.recv())
.await
.expect("timed out waiting for event from interval flush")
.expect("broadcast error");
}
assert_eq!(metrics.processed(), 5);
token.cancel();
}
#[tokio::test]
async fn policy_violation_bypasses_batch() {
let config = test_config(100, 10_000);
let (tx, rx) = mpsc::channel::<(u64, IpcFrame)>(64);
let (broadcast_tx, mut broadcast_rx) = broadcast::channel::<PipelineEvent>(64);
let metrics = Arc::new(PipelineMetrics::default());
let token = CancellationToken::new();
tokio::spawn(run(
rx,
broadcast_tx,
config,
metrics.clone(),
token.clone(),
Arc::new(PolicyRules::default()),
crate::ipc::new_response_router(),
crate::approval::ApprovalQueue::new(),
None,
Arc::new(AtomicU64::new(0)),
));
tx.send((0, IpcFrame::EventReport(violation_event()))).await.unwrap();
let event = unwrap_audit(
tokio::time::timeout(Duration::from_millis(200), broadcast_rx.recv())
.await
.expect("violation event should arrive immediately, before any flush interval")
.expect("broadcast error"),
);
assert!(matches!(event.inner.detail, Some(Detail::Violation(_))));
assert_eq!(metrics.processed(), 1);
token.cancel();
}
#[tokio::test]
async fn cancellation_flushes_pending_batch() {
let config = test_config(100, 10_000); let (tx, rx) = mpsc::channel::<(u64, IpcFrame)>(64);
let (broadcast_tx, mut broadcast_rx) = broadcast::channel::<PipelineEvent>(64);
let metrics = Arc::new(PipelineMetrics::default());
let token = CancellationToken::new();
let handle = tokio::spawn(run(
rx,
broadcast_tx,
config,
metrics.clone(),
token.clone(),
Arc::new(PolicyRules::default()),
crate::ipc::new_response_router(),
crate::approval::ApprovalQueue::new(),
None,
Arc::new(AtomicU64::new(0)),
));
for _ in 0..5 {
tx.send((0, IpcFrame::EventReport(normal_event()))).await.unwrap();
}
let deadline = std::time::Instant::now() + Duration::from_millis(200);
loop {
if metrics.processed() == 5 {
break;
}
assert!(
std::time::Instant::now() < deadline,
"events were not processed within 200ms"
);
tokio::task::yield_now().await;
}
token.cancel();
tokio::time::timeout(Duration::from_millis(500), handle)
.await
.expect("pipeline did not stop after cancellation")
.expect("pipeline task panicked");
let mut received = 0;
while broadcast_rx.try_recv().is_ok() {
received += 1;
}
assert_eq!(received, 5, "expected 5 events flushed on cancellation");
}
#[tokio::test]
async fn non_event_frames_ignored() {
let config = test_config(100, 50);
let (tx, rx) = mpsc::channel::<(u64, IpcFrame)>(64);
let (broadcast_tx, _broadcast_rx) = broadcast::channel::<PipelineEvent>(64);
let metrics = Arc::new(PipelineMetrics::default());
let token = CancellationToken::new();
tokio::spawn(run(
rx,
broadcast_tx,
config,
metrics.clone(),
token.clone(),
Arc::new(PolicyRules::default()),
crate::ipc::new_response_router(),
crate::approval::ApprovalQueue::new(),
None,
Arc::new(AtomicU64::new(0)),
));
tx.send((0, IpcFrame::Heartbeat)).await.unwrap();
tokio::time::sleep(Duration::from_millis(20)).await;
assert_eq!(metrics.processed(), 0);
token.cancel();
}
#[tokio::test]
async fn rule_match_bypasses_batch() {
use crate::policy::{PolicyRule, PolicyRules};
use aa_proto::assembly::common::v1::ActionType;
let policy = std::sync::Arc::new(PolicyRules {
rules: vec![PolicyRule {
name: "block-files".to_string(),
blocked_actions: vec![ActionType::FileOperation.as_str_name().to_string()],
..Default::default()
}],
});
let config = test_config(100, 10_000);
let (tx, rx) = mpsc::channel::<(u64, IpcFrame)>(64);
let (broadcast_tx, mut broadcast_rx) = broadcast::channel::<PipelineEvent>(64);
let metrics = Arc::new(PipelineMetrics::default());
let token = CancellationToken::new();
tokio::spawn(run(
rx,
broadcast_tx,
config,
metrics.clone(),
token.clone(),
policy,
crate::ipc::new_response_router(),
crate::approval::ApprovalQueue::new(),
None,
Arc::new(AtomicU64::new(0)),
));
let event = AuditEvent {
action_type: ActionType::FileOperation as i32,
..Default::default()
};
tx.send((0, IpcFrame::EventReport(event))).await.unwrap();
let received = unwrap_audit(
tokio::time::timeout(Duration::from_millis(200), broadcast_rx.recv())
.await
.expect("rule-matched event should bypass batch and arrive immediately")
.expect("broadcast error"),
);
assert_eq!(received.source, EventSource::Sdk);
assert_eq!(metrics.processed(), 1);
token.cancel();
}
#[tokio::test(start_paused = true)]
async fn non_matching_action_stays_in_batch() {
use crate::policy::{PolicyRule, PolicyRules};
use aa_proto::assembly::common::v1::ActionType;
let policy = std::sync::Arc::new(PolicyRules {
rules: vec![PolicyRule {
name: "block-files".to_string(),
blocked_actions: vec![ActionType::FileOperation.as_str_name().to_string()],
..Default::default()
}],
});
let config = test_config(100, 10_000);
let (tx, rx) = mpsc::channel::<(u64, IpcFrame)>(64);
let (broadcast_tx, mut broadcast_rx) = broadcast::channel::<PipelineEvent>(64);
let metrics = Arc::new(PipelineMetrics::default());
let token = CancellationToken::new();
tokio::spawn(run(
rx,
broadcast_tx,
config,
metrics.clone(),
token.clone(),
policy,
crate::ipc::new_response_router(),
crate::approval::ApprovalQueue::new(),
None,
Arc::new(AtomicU64::new(0)),
));
tokio::task::yield_now().await;
let event = AuditEvent {
action_type: ActionType::ToolCall as i32,
..Default::default()
};
tx.send((0, IpcFrame::EventReport(event))).await.unwrap();
while metrics.processed() == 0 {
tokio::task::yield_now().await;
}
tokio::time::advance(Duration::from_millis(100)).await;
assert!(
broadcast_rx.try_recv().is_err(),
"non-matching event should stay in batch, not arrive before flush interval"
);
token.cancel();
}
#[tokio::test]
async fn sequence_numbers_are_consecutive_within_a_batch() {
let config = test_config(3, 10_000);
let (tx, rx) = mpsc::channel::<(u64, IpcFrame)>(64);
let (broadcast_tx, mut broadcast_rx) = broadcast::channel::<PipelineEvent>(64);
let metrics = Arc::new(PipelineMetrics::default());
let token = CancellationToken::new();
tokio::spawn(run(
rx,
broadcast_tx,
config,
metrics.clone(),
token.clone(),
Arc::new(PolicyRules::default()),
crate::ipc::new_response_router(),
crate::approval::ApprovalQueue::new(),
None,
Arc::new(AtomicU64::new(0)),
));
for _ in 0..3 {
tx.send((0, IpcFrame::EventReport(normal_event()))).await.unwrap();
}
let mut seq_numbers = Vec::new();
for _ in 0..3 {
let event = unwrap_audit(
tokio::time::timeout(Duration::from_millis(500), broadcast_rx.recv())
.await
.expect("timed out waiting for event")
.expect("broadcast error"),
);
seq_numbers.push(event.sequence_number);
}
assert_eq!(
seq_numbers,
vec![0, 1, 2],
"expected consecutive sequence numbers 0, 1, 2"
);
token.cancel();
}
#[tokio::test]
async fn sequence_numbers_are_monotonic_across_batches() {
let config = test_config(2, 10_000); let (tx, rx) = mpsc::channel::<(u64, IpcFrame)>(64);
let (broadcast_tx, mut broadcast_rx) = broadcast::channel::<PipelineEvent>(64);
let metrics = Arc::new(PipelineMetrics::default());
let token = CancellationToken::new();
tokio::spawn(run(
rx,
broadcast_tx,
config,
metrics.clone(),
token.clone(),
Arc::new(PolicyRules::default()),
crate::ipc::new_response_router(),
crate::approval::ApprovalQueue::new(),
None,
Arc::new(AtomicU64::new(0)),
));
for _ in 0..2 {
tx.send((0, IpcFrame::EventReport(normal_event()))).await.unwrap();
}
let first_batch: Vec<u64> = {
let mut v = Vec::new();
for _ in 0..2 {
let e = unwrap_audit(
tokio::time::timeout(Duration::from_millis(500), broadcast_rx.recv())
.await
.expect("timed out waiting for first batch")
.expect("broadcast error"),
);
v.push(e.sequence_number);
}
v
};
for _ in 0..2 {
tx.send((0, IpcFrame::EventReport(normal_event()))).await.unwrap();
}
let second_batch: Vec<u64> = {
let mut v = Vec::new();
for _ in 0..2 {
let e = unwrap_audit(
tokio::time::timeout(Duration::from_millis(500), broadcast_rx.recv())
.await
.expect("timed out waiting for second batch")
.expect("broadcast error"),
);
v.push(e.sequence_number);
}
v
};
assert_eq!(first_batch, vec![0, 1]);
assert_eq!(
second_batch,
vec![2, 3],
"sequence counter must not reset between batches"
);
token.cancel();
}
#[tokio::test]
#[ignore]
async fn pipeline_load_benchmark() {
const EVENT_COUNT: u64 = 100_000;
let config = test_config(100, 10);
let (tx, rx) = mpsc::channel::<(u64, IpcFrame)>(10_000);
let (broadcast_tx, mut broadcast_rx) = broadcast::channel::<PipelineEvent>(10_000);
let metrics = Arc::new(PipelineMetrics::default());
let token = CancellationToken::new();
tokio::spawn(run(
rx,
broadcast_tx,
config,
metrics.clone(),
token.clone(),
Arc::new(PolicyRules::default()),
crate::ipc::new_response_router(),
crate::approval::ApprovalQueue::new(),
None,
Arc::new(AtomicU64::new(0)),
));
tokio::spawn(async move { while broadcast_rx.recv().await.is_ok() {} });
let start = std::time::Instant::now();
for _ in 0..EVENT_COUNT {
tx.send((0, IpcFrame::EventReport(normal_event()))).await.unwrap();
}
let deadline = std::time::Instant::now() + Duration::from_secs(10);
loop {
if metrics.processed() >= EVENT_COUNT {
break;
}
if std::time::Instant::now() > deadline {
panic!(
"load benchmark timeout: only {} / {} events processed",
metrics.processed(),
EVENT_COUNT
);
}
tokio::time::sleep(Duration::from_millis(10)).await;
}
let elapsed = start.elapsed();
println!(
"pipeline_load_benchmark: {} events in {:?} ({:.0} events/sec)",
EVENT_COUNT,
elapsed,
EVENT_COUNT as f64 / elapsed.as_secs_f64()
);
assert!(elapsed.as_secs() < 5, "100k events took more than 5s: {:?}", elapsed);
token.cancel();
}
fn make_router_with_receiver() -> (ResponseRouter, tokio::sync::mpsc::Receiver<IpcResponse>) {
let router = crate::ipc::new_response_router();
let (tx, rx) = tokio::sync::mpsc::channel::<IpcResponse>(16);
router.try_write().unwrap().insert(0, tx);
(router, rx)
}
fn policy_query_frame(action_type: aa_proto::assembly::common::v1::ActionType) -> IpcFrame {
IpcFrame::PolicyQuery(aa_proto::assembly::policy::v1::CheckActionRequest {
action_type: action_type as i32,
..Default::default()
})
}
#[tokio::test]
async fn policy_query_no_rules_responds_allow() {
use aa_proto::assembly::common::v1::{ActionType, Decision};
let config = test_config(100, 10_000);
let (tx, rx) = mpsc::channel::<(u64, IpcFrame)>(64);
let (broadcast_tx, _rx) = broadcast::channel::<PipelineEvent>(64);
let metrics = Arc::new(PipelineMetrics::default());
let token = CancellationToken::new();
let (router, mut resp_rx) = make_router_with_receiver();
let approval_queue = crate::approval::ApprovalQueue::new();
tokio::spawn(run(
rx,
broadcast_tx,
config,
metrics,
token.clone(),
Arc::new(PolicyRules::default()),
router,
approval_queue,
None,
Arc::new(AtomicU64::new(0)),
));
tx.send((0, policy_query_frame(ActionType::ToolCall))).await.unwrap();
let resp = tokio::time::timeout(Duration::from_millis(200), resp_rx.recv())
.await
.expect("response timed out")
.expect("channel closed");
if let IpcResponse::PolicyResponse(r) = resp {
assert_eq!(r.decision, Decision::Allow as i32);
} else {
panic!("expected PolicyResponse, got {resp:?}");
}
token.cancel();
}
#[tokio::test]
async fn policy_query_blocked_action_responds_deny() {
use crate::policy::{PolicyRule, PolicyRules};
use aa_proto::assembly::common::v1::{ActionType, Decision};
let policy = Arc::new(PolicyRules {
rules: vec![PolicyRule {
name: "block-tool".to_string(),
blocked_actions: vec![ActionType::ToolCall.as_str_name().to_string()],
..Default::default()
}],
});
let config = test_config(100, 10_000);
let (tx, rx) = mpsc::channel::<(u64, IpcFrame)>(64);
let (broadcast_tx, _rx) = broadcast::channel::<PipelineEvent>(64);
let metrics = Arc::new(PipelineMetrics::default());
let token = CancellationToken::new();
let (router, mut resp_rx) = make_router_with_receiver();
let approval_queue = crate::approval::ApprovalQueue::new();
tokio::spawn(run(
rx,
broadcast_tx,
config,
metrics,
token.clone(),
policy,
router,
approval_queue,
None,
Arc::new(AtomicU64::new(0)),
));
tx.send((0, policy_query_frame(ActionType::ToolCall))).await.unwrap();
let resp = tokio::time::timeout(Duration::from_millis(200), resp_rx.recv())
.await
.expect("response timed out")
.expect("channel closed");
if let IpcResponse::PolicyResponse(r) = resp {
assert_eq!(r.decision, Decision::Deny as i32);
} else {
panic!("expected PolicyResponse, got {resp:?}");
}
token.cancel();
}
/// A policy query for an action listed in `requires_approval_actions` must be
/// answered with PENDING (carrying a non-empty `approval_id`) and must leave
/// exactly one request in the approval queue.
#[tokio::test]
async fn policy_query_requires_approval_responds_pending_and_adds_to_queue() {
    use crate::policy::{PolicyRule, PolicyRules};
    use aa_proto::assembly::common::v1::{ActionType, Decision};
    let policy = Arc::new(PolicyRules {
        rules: vec![PolicyRule {
            name: "approve-tool".to_string(),
            requires_approval_actions: vec![ActionType::ToolCall.as_str_name().to_string()],
            approval_timeout_secs: 60,
            ..Default::default()
        }],
    });
    let config = test_config(100, 10_000);
    let (tx, rx) = mpsc::channel::<(u64, IpcFrame)>(64);
    let (broadcast_tx, _rx) = broadcast::channel::<PipelineEvent>(64);
    let metrics = Arc::new(PipelineMetrics::default());
    let token = CancellationToken::new();
    let (router, mut resp_rx) = make_router_with_receiver();
    let approval_queue = crate::approval::ApprovalQueue::new();
    // Second handle so the test can inspect the queue after `run` takes ownership.
    let queue_ref = Arc::clone(&approval_queue);
    tokio::spawn(run(
        rx,
        broadcast_tx,
        config,
        metrics,
        token.clone(),
        policy,
        router,
        approval_queue,
        None,
        Arc::new(AtomicU64::new(0)),
    ));
    tx.send((0, policy_query_frame(ActionType::ToolCall))).await.unwrap();
    let resp = tokio::time::timeout(Duration::from_millis(200), resp_rx.recv())
        .await
        .expect("response timed out")
        .expect("channel closed");
    if let IpcResponse::PolicyResponse(r) = resp {
        assert_eq!(r.decision, Decision::Pending as i32);
        assert!(!r.approval_id.is_empty(), "approval_id should be set");
    } else {
        panic!("expected PolicyResponse(PENDING), got {resp:?}");
    }
    // The queue entry may not be visible the instant the response arrives. Poll
    // with a deadline instead of relying on a single fixed sleep, which races the
    // pipeline task and can flake on a loaded CI runner. If the deadline passes,
    // the assertion below reports the actual length.
    let _ = tokio::time::timeout(Duration::from_millis(200), async {
        while queue_ref.list().is_empty() {
            tokio::time::sleep(Duration::from_millis(5)).await;
        }
    })
    .await;
    assert_eq!(queue_ref.list().len(), 1);
    token.cancel();
}
/// After a PENDING policy response, approving the queued request must push an
/// `IpcResponse::ApprovalDecision` carrying the approver and the same approval id.
#[tokio::test]
async fn policy_query_pending_resolution_pushes_approval_decision() {
    use crate::approval::ApprovalDecision as RuntimeApprovalDecision;
    use crate::policy::{PolicyRule, PolicyRules};
    use aa_proto::assembly::common::v1::ActionType;

    // Single rule that routes TOOL_CALL through human approval (60 s timeout).
    let approval_rule = PolicyRule {
        name: "approve-tool".to_string(),
        requires_approval_actions: vec![ActionType::ToolCall.as_str_name().to_string()],
        approval_timeout_secs: 60,
        ..Default::default()
    };
    let policy = Arc::new(PolicyRules {
        rules: vec![approval_rule],
    });

    let (frame_tx, frame_rx) = mpsc::channel::<(u64, IpcFrame)>(64);
    // Bound to a name (not `_`) so the broadcast receiver is not dropped immediately.
    let (broadcast_tx, _broadcast_rx) = broadcast::channel::<PipelineEvent>(64);
    let token = CancellationToken::new();
    let (router, mut response_rx) = make_router_with_receiver();
    let approval_queue = crate::approval::ApprovalQueue::new();
    // Second handle so the test can resolve the request after `run` takes ownership.
    let queue_handle = Arc::clone(&approval_queue);

    tokio::spawn(run(
        frame_rx,
        broadcast_tx,
        test_config(100, 10_000),
        Arc::new(PipelineMetrics::default()),
        token.clone(),
        policy,
        router,
        approval_queue,
        None,
        Arc::new(AtomicU64::new(0)),
    ));

    frame_tx
        .send((0, policy_query_frame(ActionType::ToolCall)))
        .await
        .unwrap();

    // Step 1: the initial answer is PENDING and carries the approval id.
    let pending_resp = tokio::time::timeout(Duration::from_millis(200), response_rx.recv())
        .await
        .expect("response timed out")
        .expect("channel closed");
    let approval_id = match pending_resp {
        IpcResponse::PolicyResponse(r) => {
            uuid::Uuid::parse_str(&r.approval_id).expect("invalid UUID in approval_id")
        }
        other => panic!("expected PolicyResponse(PENDING), got {other:?}"),
    };

    // Step 2: an operator approves the request.
    let verdict = RuntimeApprovalDecision::Approved {
        by: "test-operator".to_string(),
        reason: Some("looks safe".to_string()),
    };
    queue_handle
        .decide(approval_id, verdict)
        .expect("decide should succeed");

    // Step 3: the decision is pushed back to the client.
    let decision_resp = tokio::time::timeout(Duration::from_millis(200), response_rx.recv())
        .await
        .expect("ApprovalDecision push timed out")
        .expect("channel closed");
    let pushed = match decision_resp {
        IpcResponse::ApprovalDecision(p) => p,
        other => panic!("expected IpcResponse::ApprovalDecision, got {other:?}"),
    };
    assert!(pushed.approved);
    assert_eq!(pushed.decided_by, "test-operator");
    assert_eq!(pushed.approval_id, approval_id.to_string());

    token.cancel();
}
/// Secret-shaped fixture embedded in tool-call arguments by `tool_call_with_secret`.
/// The redaction tests assert this exact string never appears in a broadcast event.
/// NOTE(review): the value has the shape of an AWS access key id, presumably chosen
/// so the runtime's secret detector matches it. Confirm against the redaction rules.
const GATE_SECRET: &str = "AKIAIOSFODNN7EXAMPLE";
/// Builds a TOOL_CALL `AuditEvent` whose `args_json` is the JSON object
/// `{"api_key": "<GATE_SECRET>"}`. All other fields are left at their defaults.
fn tool_call_with_secret() -> AuditEvent {
    use aa_proto::assembly::audit::v1::ToolCallDetail;
    let args_json = format!(r#"{{"api_key": "{GATE_SECRET}"}}"#).into_bytes();
    let tool_call = ToolCallDetail {
        args_json,
        ..Default::default()
    };
    AuditEvent {
        action_type: ActionType::ToolCall as i32,
        detail: Some(Detail::ToolCall(tool_call)),
        ..Default::default()
    }
}
/// Panics unless `event` is an audit event whose tool-call arguments omit
/// `GATE_SECRET` and contain a `[REDACTED:` marker.
fn assert_args_redacted(event: PipelineEvent) {
    let enriched = unwrap_audit(event);
    let tool_call = match enriched.inner.detail {
        Some(Detail::ToolCall(tc)) => tc,
        _ => panic!("expected ToolCall detail"),
    };
    let text = String::from_utf8(tool_call.args_json).expect("redacted text is utf-8");
    let leaked = text.contains(GATE_SECRET);
    assert!(!leaked, "raw secret must not leave the runtime");
    let marked = text.contains("[REDACTED:");
    assert!(marked, "redaction marker present");
}
/// A secret in tool-call arguments must be redacted before the event is
/// broadcast through the normal batching path.
#[tokio::test]
async fn secret_is_redacted_on_batch_path() {
    // NOTE(review): assumes the first `test_config` argument is the batch size.
    // If so, 1 flushes each event without waiting for the long flush interval.
    // Confirm against `test_config`.
    let config = test_config(1, 10_000);
    let (tx, rx) = mpsc::channel::<(u64, IpcFrame)>(64);
    let (broadcast_tx, mut broadcast_rx) = broadcast::channel::<PipelineEvent>(64);
    let metrics = Arc::new(PipelineMetrics::default());
    let token = CancellationToken::new();
    tokio::spawn(run(
        rx,
        broadcast_tx,
        config,
        metrics.clone(),
        token.clone(),
        Arc::new(PolicyRules::default()),
        crate::ipc::new_response_router(),
        crate::approval::ApprovalQueue::new(),
        None,
        Arc::new(AtomicU64::new(0)),
    ));
    tx.send((0, IpcFrame::EventReport(tool_call_with_secret())))
        .await
        .unwrap();
    let event = tokio::time::timeout(Duration::from_millis(500), broadcast_rx.recv())
        .await
        .expect("timed out waiting for batched event")
        .expect("broadcast error");
    assert_args_redacted(event);
    token.cancel();
}
#[tokio::test]
async fn secret_is_redacted_on_violation_path() {
use crate::policy::{PolicyRule, PolicyRules};
let config = test_config(100, 10_000);
let (tx, rx) = mpsc::channel::<(u64, IpcFrame)>(64);
let (broadcast_tx, mut broadcast_rx) = broadcast::channel::<PipelineEvent>(64);
let metrics = Arc::new(PipelineMetrics::default());
let token = CancellationToken::new();
let policy = PolicyRules {
rules: vec![PolicyRule {
name: "block-tools".to_string(),
blocked_actions: vec!["TOOL_CALL".to_string()],
..Default::default()
}],
};
tokio::spawn(run(
rx,
broadcast_tx,
config,
metrics.clone(),
token.clone(),
Arc::new(policy),
crate::ipc::new_response_router(),
crate::approval::ApprovalQueue::new(),
None,
Arc::new(AtomicU64::new(0)),
));
tx.send((0, IpcFrame::EventReport(tool_call_with_secret())))
.await
.unwrap();
let event = tokio::time::timeout(Duration::from_millis(500), broadcast_rx.recv())
.await
.expect("timed out waiting for violation event")
.expect("broadcast error");
assert_args_redacted(event);
token.cancel();
}
}