use aa_core::audit::Lineage;
use aa_core::storage::AuditEntry;
use aa_core::{AgentId, AuditEventType, SessionId};
use aa_proto::assembly::audit::v1::audit_event::Detail;
use aa_proto::assembly::audit::v1::AuditEvent;
use aa_proto::assembly::common::v1::ActionType;
use sha2::{Digest, Sha256};
use crate::pipeline::event::{EnrichedEvent, EventSource};
/// All-zero 32-byte value that `enriched_to_audit_entry` passes to
/// `AuditEntry::new_with_lineage` for every converted entry.
// NOTE(review): presumably the "previous hash" placeholder for an entry that is
// not yet chained — confirm against `AuditEntry::new_with_lineage`.
const GENESIS_HASH: [u8; 32] = [0u8; 32];
/// Converts a pipeline `EnrichedEvent` into an `AuditEntry`.
///
/// Every constructor argument is derived from `event` by one of the private
/// helpers in this module, except the hash argument, which is always
/// `GENESIS_HASH`.
pub fn enriched_to_audit_entry(event: &EnrichedEvent) -> AuditEntry {
    let inner = &event.inner;

    AuditEntry::new_with_lineage(
        event.sequence_number,
        timestamp_ns_for(event),
        event_type_for(inner),
        derive_agent_id(event),
        derive_session_id(event),
        build_payload(event),
        GENESIS_HASH,
        derive_lineage(inner),
    )
}
/// Picks the `AuditEventType` for a proto event.
///
/// A `Violation` detail takes precedence over the action type. Otherwise
/// `AgentSpawn` maps to `A2ACallIntercepted`, and every other action —
/// including an unspecified or unrecognised numeric value — maps to
/// `ToolCallIntercepted`.
fn event_type_for(proto: &AuditEvent) -> AuditEventType {
    let is_violation = matches!(&proto.detail, Some(Detail::Violation(_)));
    if is_violation {
        return AuditEventType::PolicyViolation;
    }

    // Unknown wire values are treated the same as an explicitly unspecified action.
    let action = ActionType::try_from(proto.action_type).unwrap_or(ActionType::ActionUnspecified);
    match action {
        ActionType::AgentSpawn => AuditEventType::A2ACallIntercepted,
        ActionType::ActionUnspecified
        | ActionType::ToolCall
        | ActionType::ToolResult
        | ActionType::LlmCall
        | ActionType::FileOperation
        | ActionType::NetworkCall
        | ActionType::ProcessExec => AuditEventType::ToolCallIntercepted,
    }
}
/// Derives the 16-byte `AgentId` for an event.
///
/// Resolution order:
/// 1. the proto agent id, when it parses as a UUID (its raw bytes are used);
/// 2. the proto agent id, when non-empty but not a UUID (hashed via `hash_to_16`);
/// 3. otherwise the enriched event's own `agent_id` string, hashed.
fn derive_agent_id(event: &EnrichedEvent) -> AgentId {
    // An absent proto identity and an empty id string are handled identically.
    let declared = event
        .inner
        .agent_id
        .as_ref()
        .map(|identity| identity.agent_id.as_str())
        .filter(|id| !id.is_empty());

    let bytes = match declared {
        Some(id) => match id.parse::<uuid::Uuid>() {
            Ok(parsed) => *parsed.as_bytes(),
            Err(_) => hash_to_16(id),
        },
        None => hash_to_16(&event.agent_id),
    };
    AgentId::from_bytes(bytes)
}
/// Derives the 16-byte `SessionId` for an event.
///
/// The proto `session_id` is used when non-empty, otherwise the proto
/// `event_id`. The chosen string contributes its UUID bytes when it parses as
/// a UUID and is hashed via `hash_to_16` when it does not.
fn derive_session_id(event: &EnrichedEvent) -> SessionId {
    let proto = &event.inner;
    let chosen: &str = if proto.session_id.is_empty() {
        &proto.event_id
    } else {
        &proto.session_id
    };

    let bytes = chosen
        .parse::<uuid::Uuid>()
        .map(|parsed| *parsed.as_bytes())
        .unwrap_or_else(|_| hash_to_16(chosen));
    SessionId::from_bytes(bytes)
}
/// Builds the `Lineage` for a proto event.
///
/// Empty strings and a zero depth are recorded as `None`. `team_id` prefers the
/// value on the proto agent identity and falls back to the event-level
/// `team_id`. Root and parent agent ids are kept only when they parse as UUIDs.
fn derive_lineage(proto: &AuditEvent) -> Lineage {
    /// Owned copy of `value`, or `None` when it is empty.
    fn non_empty(value: &str) -> Option<String> {
        if value.is_empty() {
            None
        } else {
            Some(value.to_owned())
        }
    }

    let identity = proto.agent_id.as_ref();
    let org_id = identity.and_then(|a| non_empty(&a.org_id));
    let team_id = identity
        .and_then(|a| non_empty(&a.team_id))
        .or_else(|| non_empty(&proto.team_id));
    let depth = if proto.depth == 0 {
        None
    } else {
        Some(proto.depth)
    };

    Lineage {
        org_id,
        team_id,
        delegation_reason: non_empty(&proto.delegation_reason),
        spawned_by_tool: non_empty(&proto.spawned_by_tool),
        depth,
        root_agent_id: parse_optional_agent_id(&proto.root_agent_id),
        parent_agent_id: parse_optional_agent_id(&proto.parent_agent_id),
    }
}
/// Parses `raw` as a UUID and wraps its bytes in an `AgentId`.
///
/// Returns `None` for any string that is not a valid UUID, including the
/// empty string.
fn parse_optional_agent_id(raw: &str) -> Option<AgentId> {
    let parsed = raw.parse::<uuid::Uuid>().ok()?;
    Some(AgentId::from_bytes(*parsed.as_bytes()))
}
/// Serialises the event into the JSON string stored as the entry payload.
///
/// The object carries `event_id`, `action_type` (the proto enum's string name,
/// with unrecognised values reported as the unspecified action), `source`,
/// `decision`, and `detail` (see `detail_summary`).
fn build_payload(event: &EnrichedEvent) -> String {
    let inner = &event.inner;
    let action_type =
        ActionType::try_from(inner.action_type).unwrap_or(ActionType::ActionUnspecified);

    let document = serde_json::json!({
        "event_id": inner.event_id,
        "action_type": action_type.as_str_name(),
        "source": source_label(&event.source),
        "decision": inner.decision,
        "detail": detail_summary(&inner.detail),
    });
    document.to_string()
}
fn detail_summary(detail: &Option<Detail>) -> serde_json::Value {
match detail {
Some(Detail::LlmCall(d)) => serde_json::json!({
"kind": "llm_call", "model": d.model, "provider": d.provider,
}),
Some(Detail::ToolCall(d)) => serde_json::json!({
"kind": "tool_call", "tool_name": d.tool_name, "tool_source": d.tool_source,
"succeeded": d.succeeded,
}),
Some(Detail::FileOp(d)) => serde_json::json!({
"kind": "file_op", "operation": d.operation, "path": d.path, "source": d.source,
}),
Some(Detail::Network(d)) => serde_json::json!({
"kind": "network_call", "host": d.host, "port": d.port, "protocol": d.protocol,
}),
Some(Detail::Process(d)) => serde_json::json!({
"kind": "process_exec", "command": d.command, "exit_code": d.exit_code,
}),
Some(Detail::Violation(d)) => serde_json::json!({
"kind": "policy_violation", "policy_rule": d.policy_rule,
"blocked_action": d.blocked_action, "reason": d.reason,
}),
Some(Detail::Approval(d)) => serde_json::json!({
"kind": "approval", "approval_id": d.approval_id, "approved": d.approved,
}),
None => serde_json::Value::Null,
}
}
/// Lower-case label written to the payload's `source` field for each
/// `EventSource` variant.
fn source_label(source: &EventSource) -> &'static str {
    match *source {
        EventSource::Proxy => "proxy",
        EventSource::EBpf => "ebpf",
        EventSource::Sdk => "sdk",
    }
}
/// Converts the event's `received_at_ms` to nanoseconds.
///
/// Negative inputs are clamped to zero and the multiplication saturates at
/// `u64::MAX` instead of overflowing.
fn timestamp_ns_for(event: &EnrichedEvent) -> u64 {
    const NANOS_PER_MILLI: u64 = 1_000_000;

    let millis = if event.received_at_ms < 0 {
        0
    } else {
        event.received_at_ms as u64
    };
    millis.saturating_mul(NANOS_PER_MILLI)
}
/// Returns the first 16 bytes of the SHA-256 digest of `s`'s UTF-8 bytes.
fn hash_to_16(s: &str) -> [u8; 16] {
    let mut hasher = Sha256::new();
    hasher.update(s.as_bytes());
    let full = hasher.finalize();

    // The digest is 32 bytes; zipping against the 16-byte buffer keeps the prefix.
    let mut prefix = [0u8; 16];
    for (dst, src) in prefix.iter_mut().zip(full.iter()) {
        *dst = *src;
    }
    prefix
}
#[cfg(test)]
mod tests {
    use super::*;
    use aa_proto::assembly::audit::v1::{
        ApprovalEvent, FileOpDetail, LlmCallDetail, NetworkCallDetail, PolicyViolation, ProcessExecDetail,
        ToolCallDetail,
    };
    use aa_proto::assembly::common::v1::AgentId as ProtoAgentId;

    /// Fixture event: fixed UUID event id, `received_at_ms` of 1700, sequence
    /// number 7, enriched agent string "test-agent", and no proto agent identity.
    fn enriched(action: ActionType, detail: Option<Detail>, source: EventSource) -> EnrichedEvent {
        let inner = AuditEvent {
            event_id: "550e8400-e29b-41d4-a716-446655440000".to_string(),
            action_type: action as i32,
            detail,
            ..AuditEvent::default()
        };
        EnrichedEvent {
            inner,
            received_at_ms: 1_700,
            source,
            agent_id: "test-agent".to_string(),
            connection_id: 1,
            sequence_number: 7,
        }
    }

    /// Builds the fixture event and runs it through the conversion under test.
    fn convert(action: ActionType, detail: Option<Detail>, source: EventSource) -> AuditEntry {
        let event = enriched(action, detail, source);
        enriched_to_audit_entry(&event)
    }

    /// Parses the entry's payload string back into a JSON value.
    fn payload_json(entry: &AuditEntry) -> serde_json::Value {
        let raw = entry.payload();
        serde_json::from_str(raw).expect("payload is valid JSON")
    }

    /// A tool call keeps its name and source label in the payload.
    #[test]
    fn tool_call_maps_to_tool_call_intercepted() {
        let tool = ToolCallDetail {
            tool_name: "web_search".to_string(),
            tool_source: "mcp".to_string(),
            succeeded: true,
            ..ToolCallDetail::default()
        };
        let entry = convert(ActionType::ToolCall, Some(Detail::ToolCall(tool)), EventSource::Sdk);

        assert_eq!(entry.event_type(), AuditEventType::ToolCallIntercepted);
        let payload = payload_json(&entry);
        assert_eq!(payload["action_type"], "TOOL_CALL");
        assert_eq!(payload["source"], "sdk");
        assert_eq!(payload["detail"]["kind"], "tool_call");
        assert_eq!(payload["detail"]["tool_name"], "web_search");
    }

    /// An LLM call is classified as intercepted and exposes the model name.
    #[test]
    fn llm_call_maps_to_tool_call_intercepted_with_model() {
        let llm = LlmCallDetail {
            model: "gpt-4o".to_string(),
            provider: "openai".to_string(),
            ..LlmCallDetail::default()
        };
        let entry = convert(ActionType::LlmCall, Some(Detail::LlmCall(llm)), EventSource::Sdk);

        assert_eq!(entry.event_type(), AuditEventType::ToolCallIntercepted);
        let payload = payload_json(&entry);
        assert_eq!(payload["action_type"], "LLM_CALL");
        assert_eq!(payload["detail"]["model"], "gpt-4o");
    }

    /// A file operation is classified as intercepted and exposes the path.
    #[test]
    fn file_operation_maps_to_tool_call_intercepted_with_path() {
        let file_op = FileOpDetail {
            operation: "read".to_string(),
            path: "/etc/passwd".to_string(),
            source: "ebpf".to_string(),
            ..FileOpDetail::default()
        };
        let entry = convert(
            ActionType::FileOperation,
            Some(Detail::FileOp(file_op)),
            EventSource::EBpf,
        );

        assert_eq!(entry.event_type(), AuditEventType::ToolCallIntercepted);
        let payload = payload_json(&entry);
        assert_eq!(payload["action_type"], "FILE_OPERATION");
        assert_eq!(payload["source"], "ebpf");
        assert_eq!(payload["detail"]["path"], "/etc/passwd");
    }

    /// A network call exposes host and port in the payload detail.
    #[test]
    fn network_call_maps_with_host_and_port() {
        let network = NetworkCallDetail {
            host: "api.example.com".to_string(),
            port: 443,
            protocol: "https".to_string(),
            ..NetworkCallDetail::default()
        };
        let entry = convert(
            ActionType::NetworkCall,
            Some(Detail::Network(network)),
            EventSource::Proxy,
        );

        assert_eq!(entry.event_type(), AuditEventType::ToolCallIntercepted);
        let payload = payload_json(&entry);
        assert_eq!(payload["action_type"], "NETWORK_CALL");
        assert_eq!(payload["source"], "proxy");
        assert_eq!(payload["detail"]["host"], "api.example.com");
        assert_eq!(payload["detail"]["port"], 443);
    }

    /// A process exec exposes the command in the payload detail.
    #[test]
    fn process_exec_maps_with_command() {
        let process = ProcessExecDetail {
            command: "/bin/sh".to_string(),
            exit_code: 0,
            ..ProcessExecDetail::default()
        };
        let entry = convert(
            ActionType::ProcessExec,
            Some(Detail::Process(process)),
            EventSource::EBpf,
        );

        assert_eq!(entry.event_type(), AuditEventType::ToolCallIntercepted);
        let payload = payload_json(&entry);
        assert_eq!(payload["action_type"], "PROCESS_EXEC");
        assert_eq!(payload["detail"]["command"], "/bin/sh");
    }

    /// Agent spawns are the one action mapped to the A2A event type.
    #[test]
    fn agent_spawn_maps_to_a2a_call_intercepted() {
        let entry = convert(ActionType::AgentSpawn, None, EventSource::Sdk);

        assert_eq!(entry.event_type(), AuditEventType::A2ACallIntercepted);
        assert_eq!(payload_json(&entry)["action_type"], "AGENT_SPAWN");
    }

    /// A violation detail wins even when the action type says tool call.
    #[test]
    fn policy_violation_detail_overrides_action_type() {
        let violation = PolicyViolation {
            policy_rule: "no-egress".to_string(),
            blocked_action: "network".to_string(),
            reason: "blocked host".to_string(),
            ..PolicyViolation::default()
        };
        let entry = convert(
            ActionType::ToolCall,
            Some(Detail::Violation(violation)),
            EventSource::Proxy,
        );

        assert_eq!(entry.event_type(), AuditEventType::PolicyViolation);
        let payload = payload_json(&entry);
        assert_eq!(payload["detail"]["kind"], "policy_violation");
        assert_eq!(payload["detail"]["policy_rule"], "no-egress");
    }

    /// Approval details are summarised with their kind and outcome.
    #[test]
    fn approval_detail_summarised() {
        let approval = ApprovalEvent {
            approval_id: "a-1".to_string(),
            approved: true,
            ..ApprovalEvent::default()
        };
        let entry = convert(ActionType::ToolCall, Some(Detail::Approval(approval)), EventSource::Sdk);

        let payload = payload_json(&entry);
        assert_eq!(payload["detail"]["kind"], "approval");
        assert_eq!(payload["detail"]["approved"], true);
    }

    /// Unspecified actions still produce an entry rather than being skipped.
    #[test]
    fn unspecified_action_falls_back_to_intercepted_not_dropped() {
        let entry = convert(ActionType::ActionUnspecified, None, EventSource::Sdk);

        assert_eq!(entry.event_type(), AuditEventType::ToolCallIntercepted);
    }

    /// Sequence number is copied through; milliseconds become nanoseconds.
    #[test]
    fn sequence_number_and_timestamp_are_carried() {
        let entry = convert(ActionType::ToolCall, None, EventSource::Sdk);

        assert_eq!(entry.seq(), 7);
        assert_eq!(entry.timestamp_ns(), 1_700_000_000);
    }

    /// A negative receive time yields a zero timestamp.
    #[test]
    fn negative_timestamp_clamps_to_zero() {
        let mut event = enriched(ActionType::ToolCall, None, EventSource::Sdk);
        event.received_at_ms = -5;

        let entry = enriched_to_audit_entry(&event);

        assert_eq!(entry.timestamp_ns(), 0);
    }

    /// A UUID-shaped proto agent id is used byte-for-byte.
    #[test]
    fn agent_id_uuid_parsed_from_proto_when_present() {
        let expected = uuid::Uuid::new_v4();
        let mut event = enriched(ActionType::ToolCall, None, EventSource::Sdk);
        event.inner.agent_id = Some(ProtoAgentId {
            agent_id: expected.to_string(),
            ..ProtoAgentId::default()
        });

        let entry = enriched_to_audit_entry(&event);

        assert_eq!(entry.agent_id().as_bytes(), expected.as_bytes());
    }

    /// Without a proto agent identity the enriched agent string is hashed.
    #[test]
    fn agent_id_falls_back_to_hash_of_enriched_agent_string() {
        let entry = convert(ActionType::ToolCall, None, EventSource::Sdk);

        assert_eq!(entry.agent_id().as_bytes(), &hash_to_16("test-agent"));
    }

    /// Org and team from the proto agent identity reach the entry and its subject.
    #[test]
    fn lineage_org_and_team_drive_tenant() {
        let mut event = enriched(ActionType::ToolCall, None, EventSource::Sdk);
        event.inner.agent_id = Some(ProtoAgentId {
            org_id: "acme".to_string(),
            team_id: "payments".to_string(),
            agent_id: uuid::Uuid::new_v4().to_string(),
        });

        let entry = enriched_to_audit_entry(&event);

        assert_eq!(entry.org_id(), Some("acme"));
        assert_eq!(entry.team_id(), Some("payments"));
        let subject = super::super::subject_for(&entry);
        assert!(subject.starts_with("assembly.audit.acme."));
    }

    /// A freshly converted entry passes its own integrity check.
    #[test]
    fn entry_integrity_holds() {
        let entry = convert(ActionType::ToolCall, None, EventSource::Sdk);

        assert!(entry.verify_integrity());
    }
}