use std::sync::atomic::AtomicU64;
use std::sync::Arc;
use std::time::Duration;
use aa_proto::assembly::audit::v1::{audit_event::Detail, AuditEvent, ToolCallDetail};
use aa_proto::assembly::common::v1::ActionType;
use aa_runtime::approval::ApprovalQueue;
use aa_runtime::ipc::{new_response_router, IpcFrame};
use aa_runtime::pipeline::enforcement::{EnforcementConfig, OVERSIZED_MARKER};
use aa_runtime::pipeline::{run, PipelineConfig, PipelineEvent, PipelineMetrics};
use aa_runtime::policy::{PolicyRule, PolicyRules};
use tokio::sync::{broadcast, mpsc};
use tokio_util::sync::CancellationToken;
const SECRET: &str = "AKIAIOSFODNN7EXAMPLE";
fn verify_config(batch_size: usize) -> PipelineConfig {
PipelineConfig {
input_buffer: 1_024,
batch_size,
flush_interval: Duration::from_millis(10_000),
broadcast_capacity: 1_024,
agent_id: "verify-agent".to_string(),
enforcement: aa_runtime::pipeline::enforcement::EnforcementConfig::default(),
}
}
fn tool_call_with_secret() -> AuditEvent {
AuditEvent {
action_type: ActionType::ToolCall as i32,
detail: Some(Detail::ToolCall(ToolCallDetail {
args_json: format!(r#"{{"api_key": "{SECRET}"}}"#).into_bytes(),
..Default::default()
})),
..Default::default()
}
}
fn assert_redacted(event: PipelineEvent) {
let PipelineEvent::Audit(enriched) = event else {
panic!("expected a PipelineEvent::Audit");
};
let Some(Detail::ToolCall(tc)) = enriched.inner.detail else {
panic!("expected ToolCall detail");
};
let body = String::from_utf8(tc.args_json).expect("redacted text is utf-8");
assert!(!body.contains(SECRET), "raw secret must never leave the runtime");
assert!(body.contains("[REDACTED:"), "redaction marker present");
}
async fn drive_one(policy: PolicyRules) -> PipelineEvent {
let (tx, rx) = mpsc::channel::<(u64, IpcFrame)>(64);
let (broadcast_tx, mut broadcast_rx) = broadcast::channel::<PipelineEvent>(64);
let metrics = Arc::new(PipelineMetrics::default());
let token = CancellationToken::new();
tokio::spawn(run(
rx,
broadcast_tx,
verify_config(1),
metrics,
token.clone(),
Arc::new(policy),
new_response_router(),
ApprovalQueue::new(),
None,
Arc::new(AtomicU64::new(0)),
));
tx.send((0, IpcFrame::EventReport(tool_call_with_secret())))
.await
.expect("send event");
let event = tokio::time::timeout(Duration::from_millis(500), broadcast_rx.recv())
.await
.expect("timed out waiting for forwarded event")
.expect("broadcast error");
token.cancel();
event
}
#[tokio::test]
async fn gate_redacts_on_batch_path() {
let event = drive_one(PolicyRules::default()).await;
assert_redacted(event);
}
#[tokio::test]
async fn gate_redacts_on_violation_path() {
let policy = PolicyRules {
rules: vec![PolicyRule {
name: "block-tools".to_string(),
blocked_actions: vec!["TOOL_CALL".to_string()],
..Default::default()
}],
};
let event = drive_one(policy).await;
assert_redacted(event);
}
#[tokio::test]
async fn configured_size_cap_redacts_oversized_field_through_run() {
let (tx, rx) = mpsc::channel::<(u64, IpcFrame)>(64);
let (broadcast_tx, mut broadcast_rx) = broadcast::channel::<PipelineEvent>(64);
let metrics = Arc::new(PipelineMetrics::default());
let token = CancellationToken::new();
let mut config = verify_config(1);
config.enforcement = EnforcementConfig {
max_field_bytes: 16,
..Default::default()
};
tokio::spawn(run(
rx,
broadcast_tx,
config,
metrics,
token.clone(),
Arc::new(PolicyRules::default()),
new_response_router(),
ApprovalQueue::new(),
None,
Arc::new(AtomicU64::new(0)),
));
tx.send((0, IpcFrame::EventReport(tool_call_with_secret())))
.await
.expect("send event");
let event = tokio::time::timeout(Duration::from_millis(500), broadcast_rx.recv())
.await
.expect("timed out waiting for forwarded event")
.expect("broadcast error");
token.cancel();
let PipelineEvent::Audit(enriched) = event else {
panic!("expected a PipelineEvent::Audit");
};
let Some(Detail::ToolCall(tc)) = enriched.inner.detail else {
panic!("expected ToolCall detail");
};
let body = String::from_utf8(tc.args_json).expect("marker is utf-8");
assert_eq!(body, OVERSIZED_MARKER, "oversized field is redacted whole");
assert!(!body.contains(SECRET), "raw secret must never leave the runtime");
}