use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};
use std::sync::Arc;
use uuid::Uuid;
use crate::backend::{StorageBackend, StorageError, StorageExt};
use vex_core::{Hash, MerkleTree};
use vex_core::audit::{ActorType, AuditEvent, AuditEventType, HashParams};
use vex_hardware::api::AgentIdentity;
/// Per-tenant bookkeeping for the audit hash chain, persisted under the
/// tenant's `chain_state` key.
///
/// The `Default` value (no hash, sequence 0) is what `get_chain_state`
/// falls back to when nothing has been stored for a tenant yet.
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
struct ChainState {
/// Hash of the most recently logged event; `None` until the first event is logged.
last_hash: Option<Hash>,
/// Sequence number that the next logged event will receive.
sequence: u64,
}
/// Tenant-scoped audit log persisted through a [`StorageBackend`].
///
/// Every key written by the store starts with `prefix`, followed by
/// `tenant:<tenant_id>:`, so records of different tenants never share a key.
#[derive(Debug)]
pub struct AuditStore<B: StorageBackend + ?Sized> {
/// Shared handle to the storage backend used for all reads and writes.
backend: Arc<B>,
/// Namespace prepended to every storage key (set to `"audit:"` by `new`).
prefix: String,
}
impl<B: StorageBackend + ?Sized> AuditStore<B> {
/// Creates a store on top of `backend`, namespacing all keys under `audit:`.
pub fn new(backend: Arc<B>) -> Self {
    let prefix = String::from("audit:");
    Self { backend, prefix }
}
/// Storage key of a single event record: `<prefix>tenant:<tenant_id>:event:<id>`.
fn event_key(&self, tenant_id: &str, id: Uuid) -> String {
    let namespace = self.prefix.as_str();
    format!("{}tenant:{}:event:{}", namespace, tenant_id, id)
}
/// Storage key of the tenant's ordered list of event ids:
/// `<prefix>tenant:<tenant_id>:chain`.
fn chain_key(&self, tenant_id: &str) -> String {
    let namespace = self.prefix.as_str();
    format!("{}tenant:{}:chain", namespace, tenant_id)
}
/// Storage key of the witness-receipt index entry:
/// `<prefix>tenant:<tenant_id>:receipt:<receipt>`.
fn receipt_key(&self, tenant_id: &str, receipt: &str) -> String {
    let namespace = self.prefix.as_str();
    format!("{}tenant:{}:receipt:{}", namespace, tenant_id, receipt)
}
/// Storage key of the tenant's persisted `ChainState`:
/// `<prefix>tenant:<tenant_id>:chain_state`.
fn chain_state_key(&self, tenant_id: &str) -> String {
    let namespace = self.prefix.as_str();
    format!("{}tenant:{}:chain_state", namespace, tenant_id)
}
/// Storage key of the capsule-id index entry:
/// `<prefix>tenant:<tenant_id>:capsule:<capsule_id>`.
fn capsule_key(&self, tenant_id: &str, capsule_id: &str) -> String {
    let namespace = self.prefix.as_str();
    format!("{}tenant:{}:capsule:{}", namespace, tenant_id, capsule_id)
}
/// Loads the tenant's chain state, falling back to the default (empty chain,
/// sequence 0) when nothing has been stored yet.
///
/// # Errors
/// Propagates any error returned by the backend read.
async fn get_chain_state(&self, tenant_id: &str) -> Result<ChainState, StorageError> {
    let key = self.chain_state_key(tenant_id);
    let stored: Option<ChainState> = self.backend.get(&key).await?;
    Ok(stored.unwrap_or_default())
}
/// Persists `state` as the tenant's current chain state.
///
/// # Errors
/// Propagates any error returned by the backend write.
async fn set_chain_state(
    &self,
    tenant_id: &str,
    state: &ChainState,
) -> Result<(), StorageError> {
    let key = self.chain_state_key(tenant_id);
    self.backend.set(&key, state).await
}
#[allow(clippy::too_many_arguments)]
pub async fn log(
&self,
tenant_id: &str,
event_type: AuditEventType,
actor: ActorType,
agent_id: Option<Uuid>,
data: serde_json::Value,
identity: Option<&AgentIdentity>,
witness_receipt: Option<String>,
vep_blob: Option<Vec<u8>>,
) -> Result<AuditEvent, StorageError> {
let actor = actor.pseudonymize();
let mut chain_state = self.get_chain_state(tenant_id).await?;
let seq = chain_state.sequence;
let mut event = match &chain_state.last_hash {
Some(prev) => {
AuditEvent::chained(event_type, agent_id, data.clone(), prev.clone(), seq)
}
None => AuditEvent::new(event_type, agent_id, data.clone(), seq),
};
event.actor = actor;
event.vep_blob = vep_blob.clone();
if let Some(wr) = witness_receipt {
let capsule_data = data
.get("authority")
.cloned()
.unwrap_or(serde_json::json!({}));
event.evidence_capsule = Some(vex_core::audit::EvidenceCapsule {
capsule_id: capsule_data
.get("capsule_id")
.and_then(|v| v.as_str())
.unwrap_or("unknown")
.to_string(),
outcome: capsule_data
.get("outcome")
.and_then(|v| v.as_str())
.unwrap_or("ALLOW")
.to_string(),
reason_code: capsule_data
.get("reason_code")
.and_then(|v| v.as_str())
.unwrap_or("APPROVED_WITNESS")
.to_string(),
witness_receipt: wr,
nonce: capsule_data
.get("nonce")
.and_then(|v| v.as_u64())
.unwrap_or(0),
magpie_source: None,
gate_sensors: serde_json::Value::Null,
reproducibility_context: serde_json::Value::Null,
vep_blob: vep_blob.clone(),
});
}
event.hash = AuditEvent::compute_hash(HashParams {
event_type: &event.event_type,
timestamp: event.timestamp.timestamp(),
sequence_number: event.sequence_number,
data: &event.data,
actor: &event.actor,
rationale: &event.rationale,
policy_version: &event.policy_version,
data_provenance_hash: &event.data_provenance_hash,
human_review_required: event.human_review_required,
approval_count: event.approval_signatures.len(),
evidence_capsule: &event.evidence_capsule,
schema_version: &event.schema_version,
});
if let Some(id) = identity {
event
.sign_hardware(id)
.await
.map_err(|e| StorageError::Internal(format!("Hardware signing failed: {}", e)))?;
event.hash = AuditEvent::compute_hash(HashParams {
event_type: &event.event_type,
timestamp: event.timestamp.timestamp(),
sequence_number: event.sequence_number,
data: &event.data,
actor: &event.actor,
rationale: &event.rationale,
policy_version: &event.policy_version,
data_provenance_hash: &event.data_provenance_hash,
human_review_required: event.human_review_required,
approval_count: event.approval_signatures.len(),
evidence_capsule: &event.evidence_capsule,
schema_version: &event.schema_version,
});
}
if let Some(prev) = &event.previous_hash {
event.hash = AuditEvent::compute_chained_hash(&event.hash, prev, event.sequence_number);
}
self.backend
.set(&self.event_key(tenant_id, event.id), &event)
.await?;
let mut chain: Vec<Uuid> = self
.backend
.get(&self.chain_key(tenant_id))
.await?
.unwrap_or_default();
chain.push(event.id);
self.backend.set(&self.chain_key(tenant_id), &chain).await?;
chain_state.last_hash = Some(event.hash.clone());
chain_state.sequence += 1;
self.set_chain_state(tenant_id, &chain_state).await?;
if let Some(capsule) = &event.evidence_capsule {
self.backend
.set_value(
&self.receipt_key(tenant_id, &capsule.witness_receipt),
serde_json::Value::String(event.id.to_string()),
)
.await?;
self.backend
.set_value(
&self.capsule_key(tenant_id, &capsule.capsule_id),
serde_json::Value::String(event.id.to_string()),
)
.await?;
}
Ok(event)
}
/// Looks up the event that was logged with the given witness receipt.
///
/// Returns `Ok(None)` when the receipt has no index entry, when the entry is
/// not a JSON string holding a valid UUID, or when the referenced event
/// record does not exist.
///
/// # Errors
/// Propagates any backend read error.
pub async fn get_by_witness_receipt(
    &self,
    tenant_id: &str,
    witness_receipt: &str,
) -> Result<Option<AuditEvent>, StorageError> {
    let index_key = self.receipt_key(tenant_id, witness_receipt);
    let pointer: Option<serde_json::Value> = self.backend.get_value(&index_key).await?;
    let event_id = match pointer {
        Some(serde_json::Value::String(raw)) => Uuid::parse_str(&raw).ok(),
        _ => None,
    };
    match event_id {
        Some(id) => self.backend.get(&self.event_key(tenant_id, id)).await,
        None => Ok(None),
    }
}
/// Looks up the event whose evidence capsule carries the given capsule id.
///
/// Returns `Ok(None)` when the capsule id has no index entry, when the entry
/// is not a JSON string holding a valid UUID, or when the referenced event
/// record does not exist.
///
/// # Errors
/// Propagates any backend read error.
pub async fn get_by_capsule_id(
    &self,
    tenant_id: &str,
    capsule_id: &str,
) -> Result<Option<AuditEvent>, StorageError> {
    let index_key = self.capsule_key(tenant_id, capsule_id);
    let pointer: Option<serde_json::Value> = self.backend.get_value(&index_key).await?;
    let event_id = match pointer {
        Some(serde_json::Value::String(raw)) => Uuid::parse_str(&raw).ok(),
        _ => None,
    };
    match event_id {
        Some(id) => self.backend.get(&self.event_key(tenant_id, id)).await,
        None => Ok(None),
    }
}
/// Returns the VEP blob stored on the event indexed under `capsule_id`.
///
/// Yields `Ok(None)` when no such event is found or when the event carries no
/// blob.
///
/// # Errors
/// Propagates any backend read error from the underlying lookup.
pub async fn get_vep_by_capsule_id(
    &self,
    tenant_id: &str,
    capsule_id: &str,
) -> Result<Option<Vec<u8>>, StorageError> {
    let event = self.get_by_capsule_id(tenant_id, capsule_id).await?;
    // The event is owned and dropped right after, so take the blob out of it
    // instead of cloning a potentially large buffer.
    Ok(event.and_then(|mut event| event.vep_blob.take()))
}
/// Fetches a single event of `tenant_id` by its id; `Ok(None)` if no record
/// exists under that key.
///
/// # Errors
/// Propagates any backend read error.
pub async fn get(&self, tenant_id: &str, id: Uuid) -> Result<Option<AuditEvent>, StorageError> {
    let key = self.event_key(tenant_id, id);
    self.backend.get(&key).await
}
/// Loads every event of the tenant's chain, in the order their ids were
/// appended.
///
/// Ids whose event record cannot be found are skipped, so the result may be
/// shorter than the stored id list. A tenant without a stored chain yields an
/// empty vector.
///
/// # Errors
/// Propagates any backend read error.
pub async fn get_chain(&self, tenant_id: &str) -> Result<Vec<AuditEvent>, StorageError> {
    let chain_key = self.chain_key(tenant_id);
    let stored: Option<Vec<Uuid>> = self.backend.get(&chain_key).await?;
    let event_ids = stored.unwrap_or_default();

    let mut events = Vec::with_capacity(event_ids.len());
    for event_id in event_ids {
        match self.get(tenant_id, event_id).await? {
            Some(event) => events.push(event),
            None => continue,
        }
    }
    Ok(events)
}
/// Builds a Merkle tree over the tenant's chain, using one
/// `(event id, event hash)` leaf per event in chain order.
///
/// # Errors
/// Propagates any backend read error from loading the chain.
pub async fn build_merkle_tree(&self, tenant_id: &str) -> Result<MerkleTree, StorageError> {
    let events = self.get_chain(tenant_id).await?;
    let mut leaves: Vec<(String, Hash)> = Vec::with_capacity(events.len());
    for event in &events {
        leaves.push((event.id.to_string(), event.hash.clone()));
    }
    Ok(MerkleTree::from_leaves(leaves))
}
pub async fn verify_chain(&self, tenant_id: &str) -> Result<bool, StorageError> {
let events = self.get_chain(tenant_id).await?;
for (i, event) in events.iter().enumerate() {
if i == 0 {
if event.previous_hash.is_some() {
tracing::warn!("Chain integrity failed: first event has previous_hash");
return Ok(false);
}
} else {
match (&event.previous_hash, events.get(i - 1)) {
(Some(prev_hash), Some(prev_event)) => {
let expected = &prev_event.hash;
if prev_hash != expected {
tracing::warn!(
"Chain integrity failed at event {}: expected prev_hash {:?}, got {:?}",
event.id, expected.to_hex(), prev_hash.to_hex()
);
return Ok(false);
}
}
(None, _) => {
tracing::warn!(
"Chain integrity failed: event {} has no previous_hash",
event.id
);
return Ok(false);
}
(_, None) => {
tracing::warn!(
"Chain integrity failed: previous event not found for {}",
event.id
);
return Ok(false);
}
}
}
}
tracing::info!(
"Chain integrity verified for tenant {}: {} events",
tenant_id,
events.len()
);
Ok(true)
}
pub async fn export(&self, tenant_id: &str) -> Result<AuditExport, StorageError> {
let events = self.get_chain(tenant_id).await?;
let merkle_tree = self.build_merkle_tree(tenant_id).await?;
Ok(AuditExport {
events,
merkle_root: merkle_tree.root_hash().map(|h| h.to_string()),
exported_at: Utc::now(),
verified: self.verify_chain(tenant_id).await.unwrap_or(false),
})
}
}
/// Snapshot of a tenant's audit chain as produced by `AuditStore::export`,
/// with helpers to render it in several log-ingestion formats.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AuditExport {
/// The exported events, in chain order.
pub events: Vec<AuditEvent>,
/// String form of the Merkle root over the events, if the tree has a root.
pub merkle_root: Option<String>,
/// Time at which the export was built.
pub exported_at: DateTime<Utc>,
/// Whether the chain's link check passed when the export was built.
pub verified: bool,
}
impl AuditExport {
pub fn to_ocsf(&self) -> Vec<serde_json::Value> {
self.events
.iter()
.map(|event| {
serde_json::json!({
"class_uid": 2004, "class_name": "Detection Finding",
"category_uid": 2, "category_name": "Findings",
"severity_id": 1, "activity_id": 1, "activity_name": "Create",
"status_id": 1, "time": event.timestamp.timestamp(),
"timezone_offset": 0,
"finding_info": {
"uid": event.id.to_string(),
"title": format!("{:?}", event.event_type),
"desc": event.rationale.clone().unwrap_or_default(),
"created_time": event.timestamp.timestamp(),
},
"actor": {
"type_uid": match &event.actor {
ActorType::Bot(_) => 2,
ActorType::Human(_) => 1,
ActorType::System(_) => 0,
},
"type": match &event.actor {
ActorType::Bot(id) => format!("Bot:{}", id),
ActorType::Human(name) => format!("Human:{}", name),
ActorType::System(id) => format!("System:{}", id),
},
},
"unmapped": {
"vex_event_type": format!("{:?}", event.event_type),
"vex_hash": event.hash.to_hex(),
"vex_sequence": event.sequence_number,
"vex_policy_version": event.policy_version.clone(),
"vex_data_provenance": event.data_provenance_hash.as_ref().map(|h| h.to_hex()),
"vex_human_review_required": event.human_review_required,
"vex_merkle_root": self.merkle_root.clone(),
},
"metadata": {
"version": "1.7.0",
"product": {
"name": "VEX Protocol",
"vendor_name": "ProvnAI",
"version": env!("CARGO_PKG_VERSION"),
},
},
})
})
.collect()
}
pub fn to_splunk_hec(&self, index: &str, source: &str) -> Vec<serde_json::Value> {
self.events
.iter()
.map(|event| {
serde_json::json!({
"time": event.timestamp.timestamp_millis() as f64 / 1000.0,
"host": "vex-protocol",
"source": source,
"sourcetype": "vex:audit:json",
"index": index,
"event": {
"id": event.id.to_string(),
"type": format!("{:?}", event.event_type),
"timestamp": event.timestamp.to_rfc3339(),
"agent_id": event.agent_id.map(|id| id.to_string()),
"data": AuditEvent::sanitize_data(event.data.clone()),
"hash": event.hash.to_hex(),
"sequence": event.sequence_number,
"actor": match &event.actor {
ActorType::Bot(id) => serde_json::json!({"type": "bot", "id": id.to_string()}),
ActorType::Human(name) => serde_json::json!({"type": "human", "name": name}),
ActorType::System(id) => serde_json::json!({"type": "system", "id": id}),
},
"rationale": event.rationale.clone(),
"policy_version": event.policy_version.clone(),
"human_review_required": event.human_review_required,
},
"fields": {
"event_type": format!("{:?}", event.event_type),
"merkle_root": self.merkle_root.clone(),
"verified": self.verified,
},
})
})
.collect()
}
pub fn to_datadog(&self, service: &str, env: &str) -> Vec<serde_json::Value> {
self.events
.iter()
.map(|event| {
serde_json::json!({
"ddsource": "vex-protocol",
"ddtags": format!("env:{},service:{}", env, service),
"hostname": "vex-audit",
"service": service,
"status": "info",
"timestamp": event.timestamp.to_rfc3339(),
"message": format!(
"[{}] {} - seq:{} hash:{}",
format!("{:?}", event.event_type),
event.rationale.clone().unwrap_or_else(|| "No rationale".to_string()),
event.sequence_number,
&event.hash.to_hex()[..16]
),
"event": {
"id": event.id.to_string(),
"type": format!("{:?}", event.event_type),
"agent_id": event.agent_id.map(|id| id.to_string()),
"sequence": event.sequence_number,
"hash": event.hash.to_hex(),
},
"usr": match &event.actor {
ActorType::Human(name) => serde_json::json!({"name": name}),
ActorType::Bot(id) => serde_json::json!({"id": id.to_string(), "type": "bot"}),
ActorType::System(id) => serde_json::json!({"id": id, "type": "system"}),
},
"vex": {
"merkle_root": self.merkle_root.clone(),
"verified": self.verified,
"policy_version": event.policy_version.clone(),
"human_review_required": event.human_review_required,
"data_provenance_hash": event.data_provenance_hash.as_ref().map(|h| h.to_hex()),
},
})
})
.collect()
}
pub fn to_jsonl(&self) -> String {
self.events
.iter()
.filter_map(|e| serde_json::to_string(e).ok())
.collect::<Vec<_>>()
.join("\n")
}
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::backend::MemoryBackend;

    /// Store backed by a fresh in-memory backend.
    fn memory_store() -> AuditStore<MemoryBackend> {
        AuditStore::new(Arc::new(MemoryBackend::new()))
    }

    /// Logs one event as the `test` system actor with no agent, identity or blob.
    async fn log_as_system(
        store: &AuditStore<MemoryBackend>,
        tenant: &str,
        event_type: AuditEventType,
        data: serde_json::Value,
        witness_receipt: Option<String>,
    ) -> AuditEvent {
        store
            .log(
                tenant,
                event_type,
                ActorType::System("test".to_string()),
                None,
                data,
                None,
                witness_receipt,
                None,
            )
            .await
            .unwrap()
    }

    /// Events logged for one tenant must not show up in another tenant's chain.
    #[tokio::test]
    async fn test_audit_store_isolation() {
        let store = memory_store();
        let (t1, t2) = ("tenant-1", "tenant-2");

        log_as_system(
            &store,
            t1,
            AuditEventType::AgentCreated,
            serde_json::json!({}),
            None,
        )
        .await;
        log_as_system(
            &store,
            t2,
            AuditEventType::AgentExecuted,
            serde_json::json!({}),
            None,
        )
        .await;

        let chain1 = store.get_chain(t1).await.unwrap();
        let chain2 = store.get_chain(t2).await.unwrap();
        assert_eq!(chain1.len(), 1);
        assert_eq!(chain2.len(), 1);
        assert_ne!(chain1[0].id, chain2[0].id);

        let tree1 = store.build_merkle_tree(t1).await.unwrap();
        let tree2 = store.build_merkle_tree(t2).await.unwrap();
        let root1 = tree1.root_hash().cloned();
        let root2 = tree2.root_hash().cloned();
        assert_ne!(root1, root2);
    }

    /// An event logged with a witness receipt can be found again by that receipt.
    #[tokio::test]
    async fn test_audit_witness_receipt_lookup() {
        let store = memory_store();
        let tenant = "tenant-test";
        let receipt = "witness-receipt-abc-123";

        let payload = serde_json::json!({
            "authority": {
                "capsule_id": "capsule-1",
                "outcome": "ALLOW",
                "reason_code": "APPROVED",
                "nonce": 42
            }
        });
        let event = log_as_system(
            &store,
            tenant,
            AuditEventType::GateDecision,
            payload,
            Some(receipt.to_string()),
        )
        .await;

        let found = store.get_by_witness_receipt(tenant, receipt).await.unwrap();
        assert!(found.is_some());
        assert_eq!(found.unwrap().id, event.id);

        let not_found = store
            .get_by_witness_receipt(tenant, "non-existent")
            .await
            .unwrap();
        assert!(not_found.is_none());
    }
}