use anyhow::Result;
use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::fs::{File, OpenOptions};
use std::io::{BufRead, BufReader, Write};
use std::path::PathBuf;
use std::sync::{Arc, Mutex};
use super::policy::PolicyDecision;
/// Buffer length at which `AuditLogger::log` flushes buffered events to the log file.
/// Both constructors in this file install it as `max_buffer_size`.
const DEFAULT_AUDIT_BUFFER_SIZE: usize = 100;
/// Category of an audit event.
///
/// Serialized with `snake_case` variant names. `AuditLogger::log` writes
/// `PolicyViolation`, `TrustChange`, `HumanIntervention` and `UserFeedback` events to the
/// log file immediately; all other variants go through the in-memory buffer first.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum AuditEventType {
/// Emitted by `AuditLogger::log_tool_execution`.
ToolExecution,
/// Not emitted by any helper in this file.
FileAccess,
/// Not emitted by any helper in this file.
NetworkRequest,
/// Not emitted by any helper in this file.
AgentSpawn,
/// Emitted by `AuditLogger::log_denied`.
PolicyViolation,
/// Emitted by `AuditLogger::log_trust_change`.
TrustChange,
/// Emitted by `AuditLogger::log_approval`.
HumanIntervention,
/// Not emitted by any helper in this file.
SessionStart,
/// Not emitted by any helper in this file.
SessionEnd,
/// Not emitted by any helper in this file.
ConfigChange,
/// Emitted by `AuditLogger::submit_feedback`.
UserFeedback,
}
/// Result recorded for an audited action.
///
/// Serialized with `snake_case` variant names. `AuditLogger::statistics` counts only
/// `Success`, `Denied` and `Failure`; the other variants contribute to the total alone.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum ActionOutcome {
/// Default outcome assigned by `AuditEvent::new`.
Success,
/// Set automatically by `AuditEvent::with_error`.
Failure,
/// Not assigned by any code in this file.
Partial,
/// Not assigned by any code in this file.
Timeout,
/// Not assigned by any code in this file.
Cancelled,
/// Assigned by `AuditLogger::log_denied`.
Denied,
/// Not assigned by any code in this file.
PendingApproval,
/// Assigned by `AuditLogger::log_approval` when `approved` is true.
Approved,
/// Assigned by `AuditLogger::log_approval` when `approved` is false.
Rejected,
}
/// Serializable record of a human approval decision.
///
/// NOTE(review): this type is not constructed or read anywhere in this file, so the
/// field descriptions below are inferred from names and types only; confirm with callers.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct HumanApproval {
/// Presumably the identity of the person who decided, if known.
pub approver: Option<String>,
/// Presumably when the decision was made (UTC).
pub timestamp: DateTime<Utc>,
/// Presumably the action that was submitted for approval.
pub action: String,
/// Optional free-text justification.
pub justification: Option<String>,
/// Whether the action was approved.
pub approved: bool,
}
/// A single audit log entry; one event is stored per line of the JSON Lines log file.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AuditEvent {
/// Unique identifier; `AuditEvent::new` generates a UUID v4 string.
pub id: String,
/// Creation time; `AuditEvent::new` uses the current UTC time.
pub timestamp: DateTime<Utc>,
/// Category of the event.
pub event_type: AuditEventType,
/// Agent the event is attributed to, if any.
pub agent_id: Option<String>,
/// Name of the action (for tool executions, the tool name). Empty until set.
pub action: String,
/// Optional target of the action (the tests in this file use file paths).
pub target: Option<String>,
/// Policy identifier, set by `with_policy` or copied from a `PolicyDecision`.
pub policy_id: Option<String>,
/// Reason text copied from `PolicyDecision::reason` by `with_decision`.
pub decision: Option<String>,
/// Trust level attached to the event; `log_trust_change` stores the new level here.
pub trust_level: Option<u8>,
/// Outcome of the action.
pub outcome: ActionOutcome,
/// Duration of the action in milliseconds, if measured.
pub duration_ms: Option<u64>,
/// Error message; setting it through `with_error` also marks the outcome as `Failure`.
pub error: Option<String>,
/// Free-form string key/value pairs. Helpers in this file use the keys `reason`,
/// `justification`, `old_level`, `run_id`, `polarity`, `feedback_id` and `correction`.
pub metadata: HashMap<String, String>,
}
impl AuditEvent {
    /// Creates an event of the given type with a fresh UUID v4 id and the current UTC
    /// time. The action is empty, the outcome is `Success`, and all optional fields
    /// and the metadata map start out empty.
    pub fn new(event_type: AuditEventType) -> Self {
        let id = uuid::Uuid::new_v4().to_string();
        let timestamp = Utc::now();
        Self {
            id,
            timestamp,
            event_type,
            agent_id: None,
            action: String::new(),
            target: None,
            policy_id: None,
            decision: None,
            trust_level: None,
            outcome: ActionOutcome::Success,
            duration_ms: None,
            error: None,
            metadata: HashMap::new(),
        }
    }

    /// Attributes the event to `agent_id`.
    pub fn with_agent(self, agent_id: &str) -> Self {
        Self {
            agent_id: Some(agent_id.to_owned()),
            ..self
        }
    }

    /// Sets the action name.
    pub fn with_action(self, action: &str) -> Self {
        Self {
            action: action.to_owned(),
            ..self
        }
    }

    /// Sets the target of the action.
    pub fn with_target(self, target: &str) -> Self {
        Self {
            target: Some(target.to_owned()),
            ..self
        }
    }

    /// Sets the policy identifier.
    pub fn with_policy(self, policy_id: &str) -> Self {
        Self {
            policy_id: Some(policy_id.to_owned()),
            ..self
        }
    }

    /// Copies the matched policy and the reason from a policy decision.
    ///
    /// Both `policy_id` and `decision` are overwritten, including with "unset" when
    /// the decision carries no value for them.
    pub fn with_decision(self, decision: &PolicyDecision) -> Self {
        Self {
            policy_id: decision.matched_policy.clone(),
            decision: decision.reason.clone(),
            ..self
        }
    }

    /// Sets the trust level.
    pub fn with_trust_level(self, level: u8) -> Self {
        Self {
            trust_level: Some(level),
            ..self
        }
    }

    /// Sets the outcome.
    pub fn with_outcome(self, outcome: ActionOutcome) -> Self {
        Self { outcome, ..self }
    }

    /// Sets the duration in milliseconds.
    pub fn with_duration(self, duration_ms: u64) -> Self {
        Self {
            duration_ms: Some(duration_ms),
            ..self
        }
    }

    /// Records an error message and forces the outcome to `Failure`.
    pub fn with_error(self, error: &str) -> Self {
        Self {
            error: Some(error.to_owned()),
            outcome: ActionOutcome::Failure,
            ..self
        }
    }

    /// Inserts a metadata entry, replacing any previous value stored under `key`.
    pub fn with_metadata(mut self, key: &str, value: &str) -> Self {
        let (owned_key, owned_value) = (key.to_owned(), value.to_owned());
        self.metadata.insert(owned_key, owned_value);
        self
    }
}
/// Exposes audit events to the telemetry anomaly detector.
impl brainwires_telemetry::anomaly::ObservedEvent for AuditEvent {
    /// Event time as whole seconds since the Unix epoch.
    fn timestamp_secs(&self) -> i64 {
        let recorded_at = &self.timestamp;
        recorded_at.timestamp()
    }

    /// Agent the event is attributed to, if any.
    fn agent_id(&self) -> Option<&str> {
        self.agent_id.as_deref()
    }

    /// Maps the audit event type onto the detector's category; only policy
    /// violations, tool executions and trust changes have a dedicated category.
    fn category(&self) -> brainwires_telemetry::anomaly::EventCategory {
        use brainwires_telemetry::anomaly::EventCategory as Category;

        match self.event_type {
            AuditEventType::ToolExecution => Category::ToolExecution,
            AuditEventType::TrustChange => Category::TrustChange,
            AuditEventType::PolicyViolation => Category::PolicyViolation,
            AuditEventType::FileAccess
            | AuditEventType::NetworkRequest
            | AuditEventType::AgentSpawn
            | AuditEventType::HumanIntervention
            | AuditEventType::SessionStart
            | AuditEventType::SessionEnd
            | AuditEventType::ConfigChange
            | AuditEventType::UserFeedback => Category::Other,
        }
    }

    /// Target of the action, if any.
    fn target(&self) -> Option<&str> {
        self.target.as_deref()
    }
}
/// Filter describing which audit events to return.
///
/// Every field is optional; an unset field places no restriction on the result, and
/// all set fields must match (see `AuditQuery::matches`).
#[derive(Debug, Clone, Default)]
pub struct AuditQuery {
/// Exact agent id the event must carry.
pub agent_id: Option<String>,
/// Exact event type.
pub event_type: Option<AuditEventType>,
/// Substring that the event's action must contain.
pub action: Option<String>,
/// Exact outcome.
pub outcome: Option<ActionOutcome>,
/// Inclusive lower bound on the event timestamp.
pub since: Option<DateTime<Utc>>,
/// Inclusive upper bound on the event timestamp.
pub until: Option<DateTime<Utc>>,
/// Maximum number of results; applied by `AuditLogger::query` after sorting, not by
/// `matches`.
pub limit: Option<usize>,
}
impl AuditQuery {
pub fn new() -> Self {
Self::default()
}
pub fn for_agent(mut self, agent_id: &str) -> Self {
self.agent_id = Some(agent_id.to_string());
self
}
pub fn of_type(mut self, event_type: AuditEventType) -> Self {
self.event_type = Some(event_type);
self
}
pub fn with_action(mut self, action: &str) -> Self {
self.action = Some(action.to_string());
self
}
pub fn with_outcome(mut self, outcome: ActionOutcome) -> Self {
self.outcome = Some(outcome);
self
}
pub fn since(mut self, since: DateTime<Utc>) -> Self {
self.since = Some(since);
self
}
pub fn until(mut self, until: DateTime<Utc>) -> Self {
self.until = Some(until);
self
}
pub fn limit(mut self, limit: usize) -> Self {
self.limit = Some(limit);
self
}
pub fn matches(&self, event: &AuditEvent) -> bool {
if let Some(ref agent_id) = self.agent_id
&& event.agent_id.as_ref() != Some(agent_id)
{
return false;
}
if let Some(event_type) = self.event_type
&& event.event_type != event_type
{
return false;
}
if let Some(ref action) = self.action
&& !event.action.contains(action)
{
return false;
}
if let Some(outcome) = self.outcome
&& event.outcome != outcome
{
return false;
}
if let Some(since) = self.since
&& event.timestamp < since
{
return false;
}
if let Some(until) = self.until
&& event.timestamp > until
{
return false;
}
true
}
}
/// Audit logger that appends events to a JSON Lines file, buffering routine events in
/// memory and optionally feeding every event to an anomaly detector.
#[derive(Debug)]
pub struct AuditLogger {
/// File that events are appended to, one JSON object per line.
log_path: PathBuf,
/// Events accepted by `log` that have not been written to `log_path` yet.
buffer: Arc<Mutex<Vec<AuditEvent>>>,
/// Buffer length at which `log` flushes the buffer to the file.
max_buffer_size: usize,
/// When `false`, `log` returns immediately without recording anything.
enabled: bool,
/// Optional detector that observes each event passed to `log`.
anomaly_detector: Option<brainwires_telemetry::anomaly::AnomalyDetector>,
}
impl AuditLogger {
/// Creates a logger writing to `<home>/.brainwires/audit/audit.jsonl`, creating the
/// directory if it does not exist.
///
/// # Errors
/// Fails when the home directory cannot be determined or the audit directory cannot
/// be created.
#[cfg(feature = "native")]
pub fn new() -> Result<Self> {
    let Some(home) = dirs::home_dir() else {
        anyhow::bail!("Failed to get home directory");
    };
    let audit_dir = home.join(".brainwires").join("audit");
    std::fs::create_dir_all(&audit_dir)?;

    let logger = Self {
        log_path: audit_dir.join("audit.jsonl"),
        buffer: Arc::new(Mutex::new(Vec::new())),
        max_buffer_size: DEFAULT_AUDIT_BUFFER_SIZE,
        enabled: true,
        anomaly_detector: None,
    };
    Ok(logger)
}
/// Creates a logger writing to `path`, creating the parent directory when the path
/// has one. The log file itself is only created on the first write.
///
/// # Errors
/// Fails when the parent directory cannot be created.
#[cfg(feature = "native")]
pub fn with_path(path: PathBuf) -> Result<Self> {
    if let Some(parent_dir) = path.parent() {
        std::fs::create_dir_all(parent_dir)?;
    }

    let empty_buffer = Arc::new(Mutex::new(Vec::new()));
    let logger = Self {
        log_path: path,
        buffer: empty_buffer,
        max_buffer_size: DEFAULT_AUDIT_BUFFER_SIZE,
        enabled: true,
        anomaly_detector: None,
    };
    Ok(logger)
}
/// Enables anomaly detection, replacing any detector configured earlier with a new
/// one built from `config`.
pub fn with_anomaly_detection(
    mut self,
    config: brainwires_telemetry::anomaly::AnomalyConfig,
) -> Self {
    use brainwires_telemetry::anomaly::AnomalyDetector;

    let detector = AnomalyDetector::new(config);
    self.anomaly_detector = Some(detector);
    self
}
/// Drains the anomalies collected by the detector.
///
/// Returns `None` when anomaly detection is not enabled.
pub fn drain_anomalies(&self) -> Option<Vec<brainwires_telemetry::anomaly::AnomalyEvent>> {
    let detector = self.anomaly_detector.as_ref()?;
    Some(detector.drain_anomalies())
}
/// Number of anomalies the detector reports as pending; `0` when anomaly detection
/// is not enabled.
pub fn pending_anomaly_count(&self) -> usize {
    match &self.anomaly_detector {
        Some(detector) => detector.pending_count(),
        None => 0,
    }
}
/// Turns logging on or off. While disabled, `log` returns `Ok(())` without recording
/// the event or showing it to the anomaly detector.
pub fn set_enabled(&mut self, enabled: bool) {
self.enabled = enabled;
}
/// Records `event`.
///
/// Does nothing while the logger is disabled. Otherwise the anomaly detector (if
/// any) observes the event first. Policy violations, trust changes, human
/// interventions and user feedback are then appended to the log file immediately;
/// every other event is buffered, and the buffer is flushed once it holds
/// `max_buffer_size` events.
///
/// # Errors
/// Returns any serialization or I/O error raised while writing to the log file.
///
/// # Panics
/// Panics if the buffer lock is poisoned.
pub fn log(&self, event: AuditEvent) -> Result<()> {
    if !self.enabled {
        return Ok(());
    }

    if let Some(detector) = self.anomaly_detector.as_ref() {
        detector.observe(&event);
    }

    match event.event_type {
        // Important events bypass the buffer and are written straight away.
        AuditEventType::PolicyViolation
        | AuditEventType::TrustChange
        | AuditEventType::HumanIntervention
        | AuditEventType::UserFeedback => self.write_event(&event),
        _ => {
            let mut pending = self.buffer.lock().expect("audit log buffer lock poisoned");
            pending.push(event);
            if pending.len() < self.max_buffer_size {
                return Ok(());
            }
            self.flush_buffer_internal(&mut pending)
        }
    }
}
/// Logs a `ToolExecution` event with `tool_name` as the action.
///
/// The agent, target and duration are attached only when provided.
///
/// # Errors
/// Propagates any error returned by `log`.
pub fn log_tool_execution(
    &self,
    agent_id: Option<&str>,
    tool_name: &str,
    target: Option<&str>,
    outcome: ActionOutcome,
    duration_ms: Option<u64>,
) -> Result<()> {
    let event = AuditEvent::new(AuditEventType::ToolExecution)
        .with_action(tool_name)
        .with_outcome(outcome);
    let event = match agent_id {
        Some(agent) => event.with_agent(agent),
        None => event,
    };
    let event = match target {
        Some(path) => event.with_target(path),
        None => event,
    };
    let event = match duration_ms {
        Some(elapsed) => event.with_duration(elapsed),
        None => event,
    };
    self.log(event)
}
/// Logs a `PolicyViolation` event with outcome `Denied`, storing `reason` under the
/// `reason` metadata key.
///
/// The agent and target are attached only when provided.
///
/// # Errors
/// Propagates any error returned by `log`.
pub fn log_denied(
    &self,
    agent_id: Option<&str>,
    action: &str,
    target: Option<&str>,
    reason: &str,
) -> Result<()> {
    let denial = AuditEvent::new(AuditEventType::PolicyViolation)
        .with_action(action)
        .with_outcome(ActionOutcome::Denied)
        .with_metadata("reason", reason);
    let denial = match agent_id {
        Some(agent) => denial.with_agent(agent),
        None => denial,
    };
    let denial = match target {
        Some(path) => denial.with_target(path),
        None => denial,
    };
    self.log(denial)
}
/// Logs a `HumanIntervention` event whose outcome is `Approved` or `Rejected`
/// depending on `approved`.
///
/// A justification, when given, is stored under the `justification` metadata key.
///
/// # Errors
/// Propagates any error returned by `log`.
pub fn log_approval(
    &self,
    agent_id: Option<&str>,
    action: &str,
    approved: bool,
    justification: Option<&str>,
) -> Result<()> {
    let verdict = if approved {
        ActionOutcome::Approved
    } else {
        ActionOutcome::Rejected
    };
    let record = AuditEvent::new(AuditEventType::HumanIntervention)
        .with_action(action)
        .with_outcome(verdict);
    let record = match agent_id {
        Some(agent) => record.with_agent(agent),
        None => record,
    };
    let record = match justification {
        Some(text) => record.with_metadata("justification", text),
        None => record,
    };
    self.log(record)
}
/// Logs a `TrustChange` event for `agent_id` with the action `trust_change`.
///
/// The new level is stored in `trust_level`; the previous level and the reason go
/// into the `old_level` and `reason` metadata entries.
///
/// # Errors
/// Propagates any error returned by `log`.
pub fn log_trust_change(
    &self,
    agent_id: &str,
    old_level: u8,
    new_level: u8,
    reason: &str,
) -> Result<()> {
    let previous_level = old_level.to_string();
    let change = AuditEvent::new(AuditEventType::TrustChange)
        .with_agent(agent_id)
        .with_action("trust_change")
        .with_trust_level(new_level)
        .with_metadata("old_level", &previous_level)
        .with_metadata("reason", reason);
    self.log(change)
}
/// Records user feedback for a run and returns the resulting signal.
///
/// The feedback is stored as a `UserFeedback` audit event whose metadata holds
/// `run_id`, `polarity` (`thumbs_up` / `thumbs_down`), `feedback_id` and, when
/// supplied, `correction`.
///
/// # Errors
/// Propagates any error returned by `log`; no signal is returned in that case.
pub fn submit_feedback(
    &self,
    run_id: &str,
    polarity: FeedbackPolarity,
    correction: Option<&str>,
) -> Result<FeedbackSignal> {
    let signal = FeedbackSignal {
        id: uuid::Uuid::new_v4().to_string(),
        run_id: run_id.to_owned(),
        polarity,
        correction: correction.map(ToOwned::to_owned),
        submitted_at: Utc::now(),
    };

    let polarity_label = match polarity {
        FeedbackPolarity::ThumbsDown => "thumbs_down",
        FeedbackPolarity::ThumbsUp => "thumbs_up",
    };
    let base = AuditEvent::new(AuditEventType::UserFeedback)
        .with_action("user_feedback")
        .with_metadata("run_id", run_id)
        .with_metadata("polarity", polarity_label)
        .with_metadata("feedback_id", &signal.id)
        .with_outcome(ActionOutcome::Success);
    let event = match correction {
        Some(text) => base.with_metadata("correction", text),
        None => base,
    };

    self.log(event)?;
    Ok(signal)
}
/// Returns every feedback signal recorded for `run_id`, in the order produced by
/// `query` (newest first).
///
/// Feedback events lacking a `feedback_id`, or carrying a `polarity` other than
/// `thumbs_up` / `thumbs_down`, are skipped.
///
/// # Errors
/// Propagates any error returned by `query`.
pub fn get_feedback_for_run(&self, run_id: &str) -> Result<Vec<FeedbackSignal>> {
    let feedback_events =
        self.query(&AuditQuery::new().of_type(AuditEventType::UserFeedback))?;

    let mut signals = Vec::new();
    for event in feedback_events {
        let meta = &event.metadata;
        if meta.get("run_id").map(String::as_str) != Some(run_id) {
            continue;
        }
        let Some(feedback_id) = meta.get("feedback_id") else {
            continue;
        };
        let polarity = match meta.get("polarity").map(String::as_str) {
            Some("thumbs_up") => FeedbackPolarity::ThumbsUp,
            Some("thumbs_down") => FeedbackPolarity::ThumbsDown,
            _ => continue,
        };
        signals.push(FeedbackSignal {
            id: feedback_id.clone(),
            // Equal to the stored `run_id` entry, which was just compared above.
            run_id: run_id.to_owned(),
            polarity,
            correction: meta.get("correction").cloned(),
            submitted_at: event.timestamp,
        });
    }
    Ok(signals)
}
/// Writes all buffered events to the log file.
///
/// # Errors
/// Returns any error raised by `flush_buffer_internal`.
///
/// # Panics
/// Panics if the buffer lock is poisoned.
pub fn flush(&self) -> Result<()> {
// The lock is held for the whole write.
let mut buffer = self.buffer.lock().expect("audit log buffer lock poisoned");
self.flush_buffer_internal(&mut buffer)
}
/// Appends every event in `buffer` to the log file as JSON Lines, then empties the
/// buffer. An empty buffer is a no-op and does not touch the file.
///
/// The whole batch is serialized in memory and handed to the file in a single
/// `write_all` call, and the buffer is cleared only once that call has succeeded.
/// The previous `for event in buffer.drain(..)` loop dropped every remaining,
/// never-persisted event as soon as one serialization or write failed, because
/// dropping the drain iterator removes the rest of the range.
///
/// # Errors
/// Returns an error when an event cannot be serialized or the log file cannot be
/// opened or written. The buffer is left untouched in that case so a later flush can
/// retry; if the failed write was partial, the retry may duplicate some lines.
fn flush_buffer_internal(&self, buffer: &mut Vec<AuditEvent>) -> Result<()> {
    if buffer.is_empty() {
        return Ok(());
    }

    let mut batch = String::new();
    for event in buffer.iter() {
        batch.push_str(&serde_json::to_string(event)?);
        batch.push('\n');
    }

    let mut file = OpenOptions::new()
        .create(true)
        .append(true)
        .open(&self.log_path)?;
    // One write for the whole batch keeps lines from concurrent appenders from
    // being interleaved inside it and avoids one syscall pair per event.
    file.write_all(batch.as_bytes())?;

    buffer.clear();
    Ok(())
}
/// Appends a single event to the log file as one JSON line, bypassing the buffer.
///
/// The line, including its trailing newline, is built in memory and written with a
/// single `write_all` call. `log` calls this method without holding any lock, and
/// `writeln!` on an unbuffered file may emit the payload and the newline as separate
/// writes, which lets concurrent appenders interleave and corrupt a line.
///
/// # Errors
/// Returns an error when the event cannot be serialized or the log file cannot be
/// opened or written.
fn write_event(&self, event: &AuditEvent) -> Result<()> {
    let mut line = serde_json::to_string(event)?;
    line.push('\n');

    let mut file = OpenOptions::new()
        .create(true)
        .append(true)
        .open(&self.log_path)?;
    file.write_all(line.as_bytes())?;
    Ok(())
}
/// Returns the events matching `query`, newest first, truncated to `query.limit`
/// when a limit is set.
///
/// Both the in-memory buffer and the log file are searched. Empty lines and lines
/// that do not parse as an `AuditEvent` are skipped silently.
///
/// # Errors
/// Returns an error when the log file exists but cannot be opened or read.
///
/// # Panics
/// Panics if the buffer lock is poisoned.
pub fn query(&self, query: &AuditQuery) -> Result<Vec<AuditEvent>> {
    // Snapshot the buffered matches first; the lock is released before any file I/O.
    let mut matched: Vec<AuditEvent> = {
        let pending = self.buffer.lock().expect("audit log buffer lock poisoned");
        pending
            .iter()
            .filter(|candidate| query.matches(candidate))
            .cloned()
            .collect()
    };

    if self.log_path.exists() {
        let reader = BufReader::new(File::open(&self.log_path)?);
        for raw_line in reader.lines() {
            let raw_line = raw_line?;
            if raw_line.is_empty() {
                continue;
            }
            match serde_json::from_str::<AuditEvent>(&raw_line) {
                Ok(parsed) if query.matches(&parsed) => matched.push(parsed),
                // Unparseable or non-matching lines are ignored.
                _ => {}
            }
        }
    }

    matched.sort_by(|lhs, rhs| rhs.timestamp.cmp(&lhs.timestamp));
    if let Some(max_results) = query.limit {
        matched.truncate(max_results);
    }
    Ok(matched)
}
/// Returns up to `count` of the most recent events.
///
/// # Errors
/// Propagates any error returned by `query`.
pub fn recent(&self, count: usize) -> Result<Vec<AuditEvent>> {
    let newest_only = AuditQuery::new().limit(count);
    self.query(&newest_only)
}
/// Counts the events matching `query`; a limit set on the query caps the count.
///
/// # Errors
/// Propagates any error returned by `query`.
pub fn count(&self, query: &AuditQuery) -> Result<usize> {
    let matched = self.query(query)?;
    Ok(matched.len())
}
/// Exports the events matching `query` as a pretty-printed JSON array.
///
/// # Errors
/// Propagates errors from `query` and from JSON serialization.
pub fn export_json(&self, query: &AuditQuery) -> Result<String> {
    let matched = self.query(query)?;
    let rendered = serde_json::to_string_pretty(&matched)?;
    Ok(rendered)
}
/// Exports the events matching `query` as CSV with the header
/// `timestamp,event_type,agent_id,action,target,outcome,policy_id`.
///
/// Timestamps use RFC 3339; the event type and outcome use their `Debug` names
/// (e.g. `ToolExecution`); missing optional fields are written as empty cells.
/// Free-text fields (agent id, action, target, policy id) are quoted per RFC 4180
/// when they contain a comma, a double quote or a line break, so such values no
/// longer shift or split columns. Fields without those characters are written
/// exactly as before.
///
/// # Errors
/// Propagates any error returned by `query`.
pub fn export_csv(&self, query: &AuditQuery) -> Result<String> {
    /// Quotes `raw` for CSV when needed, doubling any embedded double quotes.
    fn csv_field(raw: &str) -> std::borrow::Cow<'_, str> {
        use std::borrow::Cow;

        let needs_quoting = raw.contains(|c: char| matches!(c, '"' | ',' | '\n' | '\r'));
        if needs_quoting {
            Cow::Owned(format!("\"{}\"", raw.replace('"', "\"\"")))
        } else {
            Cow::Borrowed(raw)
        }
    }

    let events = self.query(query)?;
    let mut csv =
        String::from("timestamp,event_type,agent_id,action,target,outcome,policy_id\n");
    for event in &events {
        csv.push_str(&format!(
            "{},{:?},{},{},{},{:?},{}\n",
            event.timestamp.to_rfc3339(),
            event.event_type,
            csv_field(event.agent_id.as_deref().unwrap_or("")),
            csv_field(&event.action),
            csv_field(event.target.as_deref().unwrap_or("")),
            event.outcome,
            csv_field(event.policy_id.as_deref().unwrap_or("")),
        ));
    }
    Ok(csv)
}
pub fn statistics(&self, since: Option<DateTime<Utc>>) -> Result<AuditStatistics> {
let query = if let Some(since) = since {
AuditQuery::new().since(since)
} else {
AuditQuery::new()
};
let events = self.query(&query)?;
let mut stats = AuditStatistics {
total_events: events.len(),
..Default::default()
};
for event in &events {
match event.event_type {
AuditEventType::ToolExecution => stats.tool_executions += 1,
AuditEventType::PolicyViolation => stats.policy_violations += 1,
AuditEventType::HumanIntervention => stats.human_interventions += 1,
_ => {}
}
match event.outcome {
ActionOutcome::Success => stats.successful_actions += 1,
ActionOutcome::Denied => stats.denied_actions += 1,
ActionOutcome::Failure => stats.failed_actions += 1,
_ => {}
}
}
Ok(stats)
}
}
impl Drop for AuditLogger {
    /// Best-effort flush of the buffered events when the logger goes away.
    ///
    /// The buffer lock is recovered if it is poisoned rather than unwrapped.
    /// Previously `drop` called `flush()`, which panics on a poisoned lock; a panic
    /// inside `drop` while the thread is already unwinding aborts the process.
    /// Flush errors are deliberately ignored because `drop` has no way to report them.
    fn drop(&mut self) {
        let mut pending = self
            .buffer
            .lock()
            .unwrap_or_else(std::sync::PoisonError::into_inner);
        let _ = self.flush_buffer_internal(&mut pending);
    }
}
/// Aggregate counters produced by `AuditLogger::statistics`.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct AuditStatistics {
/// Number of events considered.
pub total_events: usize,
/// Events of type `ToolExecution`.
pub tool_executions: usize,
/// Events of type `PolicyViolation`.
pub policy_violations: usize,
/// Events of type `HumanIntervention`.
pub human_interventions: usize,
/// Events whose outcome is `Success`.
pub successful_actions: usize,
/// Events whose outcome is `Denied`.
pub denied_actions: usize,
/// Events whose outcome is `Failure`.
pub failed_actions: usize,
}
/// Direction of a piece of user feedback.
///
/// Serialized in `snake_case`; `AuditLogger::submit_feedback` stores the same strings
/// (`thumbs_up` / `thumbs_down`) in the event metadata.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum FeedbackPolarity {
/// Positive feedback.
ThumbsUp,
/// Negative feedback.
ThumbsDown,
}
/// User feedback attached to a run, as returned by `AuditLogger::submit_feedback` and
/// `AuditLogger::get_feedback_for_run`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct FeedbackSignal {
/// Unique id of the feedback; `submit_feedback` generates a UUID v4 string.
pub id: String,
/// Identifier of the run the feedback refers to.
pub run_id: String,
/// Whether the feedback was positive or negative.
pub polarity: FeedbackPolarity,
/// Optional correction text supplied with the feedback.
pub correction: Option<String>,
/// Submission time (UTC). Signals rebuilt by `get_feedback_for_run` use the
/// timestamp of the stored audit event.
pub submitted_at: DateTime<Utc>,
}
// NOTE(review): every test builds its logger through `AuditLogger::with_path`, which
// only exists with the `native` feature; confirm that feature is enabled for test builds.
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::TempDir;

    /// Builds a logger writing into a fresh temporary directory. The `TempDir` is
    /// returned so the directory outlives the logger for the duration of the test.
    fn create_test_logger() -> (AuditLogger, TempDir) {
        let temp_dir = TempDir::new().unwrap();
        let log_path = temp_dir.path().join("audit.jsonl");
        let logger = AuditLogger::with_path(log_path).unwrap();
        (logger, temp_dir)
    }

    /// A buffered event is readable back after an explicit flush.
    #[test]
    fn test_log_event() {
        let (logger, _temp) = create_test_logger();
        let event = AuditEvent::new(AuditEventType::ToolExecution)
            .with_agent("agent-123")
            .with_action("write_file")
            .with_target("/src/main.rs")
            .with_outcome(ActionOutcome::Success);
        logger.log(event).unwrap();
        logger.flush().unwrap();
        let events = logger.recent(10).unwrap();
        assert_eq!(events.len(), 1);
        assert_eq!(events[0].action, "write_file");
    }

    /// Agent, outcome and event-type filters each select the expected events.
    #[test]
    fn test_query_events() {
        let (logger, _temp) = create_test_logger();
        logger
            .log(
                AuditEvent::new(AuditEventType::ToolExecution)
                    .with_agent("agent-1")
                    .with_action("read_file")
                    .with_outcome(ActionOutcome::Success),
            )
            .unwrap();
        logger
            .log(
                AuditEvent::new(AuditEventType::ToolExecution)
                    .with_agent("agent-2")
                    .with_action("write_file")
                    .with_outcome(ActionOutcome::Denied),
            )
            .unwrap();
        logger
            .log(
                AuditEvent::new(AuditEventType::PolicyViolation)
                    .with_agent("agent-1")
                    .with_action("delete_file")
                    .with_outcome(ActionOutcome::Denied),
            )
            .unwrap();
        logger.flush().unwrap();
        let query = AuditQuery::new().for_agent("agent-1");
        let events = logger.query(&query).unwrap();
        assert_eq!(events.len(), 2);
        let query = AuditQuery::new().with_outcome(ActionOutcome::Denied);
        let events = logger.query(&query).unwrap();
        assert_eq!(events.len(), 2);
        let query = AuditQuery::new().of_type(AuditEventType::PolicyViolation);
        let events = logger.query(&query).unwrap();
        assert_eq!(events.len(), 1);
    }

    /// `log_denied` records a policy violation with the `Denied` outcome.
    #[test]
    fn test_log_denied() {
        let (logger, _temp) = create_test_logger();
        logger
            .log_denied(
                Some("agent-123"),
                "write_file",
                Some("/.env"),
                "Protected file",
            )
            .unwrap();
        logger.flush().unwrap();
        let events = logger.recent(10).unwrap();
        assert_eq!(events.len(), 1);
        assert_eq!(events[0].event_type, AuditEventType::PolicyViolation);
        assert_eq!(events[0].outcome, ActionOutcome::Denied);
    }

    /// Statistics count events per type and per outcome.
    #[test]
    fn test_statistics() {
        let (logger, _temp) = create_test_logger();
        logger
            .log(
                AuditEvent::new(AuditEventType::ToolExecution).with_outcome(ActionOutcome::Success),
            )
            .unwrap();
        logger
            .log(
                AuditEvent::new(AuditEventType::ToolExecution).with_outcome(ActionOutcome::Success),
            )
            .unwrap();
        logger
            .log(
                AuditEvent::new(AuditEventType::PolicyViolation)
                    .with_outcome(ActionOutcome::Denied),
            )
            .unwrap();
        logger.flush().unwrap();
        let stats = logger.statistics(None).unwrap();
        assert_eq!(stats.total_events, 3);
        assert_eq!(stats.tool_executions, 2);
        assert_eq!(stats.policy_violations, 1);
        assert_eq!(stats.successful_actions, 2);
        assert_eq!(stats.denied_actions, 1);
    }

    /// The CSV export contains the header and the logged values.
    #[test]
    fn test_export_csv() {
        let (logger, _temp) = create_test_logger();
        logger
            .log(
                AuditEvent::new(AuditEventType::ToolExecution)
                    .with_agent("agent-1")
                    .with_action("read_file")
                    .with_outcome(ActionOutcome::Success),
            )
            .unwrap();
        logger.flush().unwrap();
        let csv = logger.export_csv(&AuditQuery::new()).unwrap();
        assert!(csv.contains("timestamp,event_type,agent_id,action"));
        assert!(csv.contains("read_file"));
        assert!(csv.contains("agent-1"));
    }

    /// Positive feedback round-trips through the log.
    #[test]
    fn test_submit_feedback_thumbs_up() {
        let (logger, _temp) = create_test_logger();
        let signal = logger
            .submit_feedback("run-001", FeedbackPolarity::ThumbsUp, None)
            .unwrap();
        assert_eq!(signal.run_id, "run-001");
        assert_eq!(signal.polarity, FeedbackPolarity::ThumbsUp);
        assert!(signal.correction.is_none());
        let feedback = logger.get_feedback_for_run("run-001").unwrap();
        assert_eq!(feedback.len(), 1);
        assert_eq!(feedback[0].polarity, FeedbackPolarity::ThumbsUp);
    }

    /// A correction supplied with the feedback is stored and returned.
    #[test]
    fn test_submit_feedback_with_correction() {
        let (logger, _temp) = create_test_logger();
        let signal = logger
            .submit_feedback(
                "run-002",
                FeedbackPolarity::ThumbsDown,
                Some("Wrong answer"),
            )
            .unwrap();
        assert_eq!(signal.correction.as_deref(), Some("Wrong answer"));
        let feedback = logger.get_feedback_for_run("run-002").unwrap();
        assert_eq!(feedback.len(), 1);
        assert_eq!(feedback[0].correction.as_deref(), Some("Wrong answer"));
    }

    /// Feedback for one run is not returned for another.
    #[test]
    fn test_feedback_isolated_per_run() {
        let (logger, _temp) = create_test_logger();
        logger
            .submit_feedback("run-A", FeedbackPolarity::ThumbsUp, None)
            .unwrap();
        logger
            .submit_feedback("run-B", FeedbackPolarity::ThumbsDown, None)
            .unwrap();
        let a = logger.get_feedback_for_run("run-A").unwrap();
        let b = logger.get_feedback_for_run("run-B").unwrap();
        assert_eq!(a.len(), 1);
        assert_eq!(b.len(), 1);
        assert_eq!(a[0].polarity, FeedbackPolarity::ThumbsUp);
        assert_eq!(b[0].polarity, FeedbackPolarity::ThumbsDown);
    }

    /// An unknown run id yields no feedback.
    #[test]
    fn test_feedback_no_results_for_unknown_run() {
        let (logger, _temp) = create_test_logger();
        let feedback = logger.get_feedback_for_run("nonexistent-run").unwrap();
        assert!(feedback.is_empty());
    }

    /// Feedback is stored as a `UserFeedback` event carrying the run id in its metadata.
    #[test]
    fn test_feedback_event_type_stored_correctly() {
        let (logger, _temp) = create_test_logger();
        logger
            .submit_feedback("run-X", FeedbackPolarity::ThumbsUp, None)
            .unwrap();
        let events = logger
            .query(&AuditQuery::new().of_type(AuditEventType::UserFeedback))
            .unwrap();
        assert_eq!(events.len(), 1);
        assert_eq!(events[0].event_type, AuditEventType::UserFeedback);
        assert_eq!(events[0].metadata.get("run_id").unwrap(), "run-X");
    }

    /// Two violations by the same agent with `violation_threshold: 2` yield exactly
    /// one anomaly, which `drain_anomalies` then removes.
    #[test]
    fn test_logger_with_anomaly_detection_violation() {
        use brainwires_telemetry::anomaly::AnomalyConfig;

        // Reuse the shared helper instead of duplicating the temp-dir setup.
        let (logger, _temp) = create_test_logger();
        let logger = logger.with_anomaly_detection(AnomalyConfig {
            violation_threshold: 2,
            ..Default::default()
        });
        logger
            .log_denied(Some("agent-x"), "write_file", None, "test")
            .unwrap();
        logger
            .log_denied(Some("agent-x"), "write_file", None, "test")
            .unwrap();
        assert_eq!(logger.pending_anomaly_count(), 1);
        let anomalies = logger.drain_anomalies().unwrap();
        assert_eq!(anomalies.len(), 1);
        assert_eq!(logger.pending_anomaly_count(), 0);
    }

    /// Without a detector there are no anomalies to drain or count.
    #[test]
    fn test_logger_without_anomaly_detection_returns_none() {
        let (logger, _temp) = create_test_logger();
        assert!(logger.drain_anomalies().is_none());
        assert_eq!(logger.pending_anomaly_count(), 0);
    }
}