Skip to main content

codetether_agent/audit/
mod.rs

1//! System-wide audit trail
2//!
3//! Every action taken by the server — API calls, tool executions, session
4//! operations, cognition decisions — is recorded in a tamper-evident append-only
5//! log.  Entries are held in a bounded in-memory ring buffer and optionally
6//! flushed to a JSONL file on disk.
7
8use chrono::{DateTime, Utc};
9use serde::{Deserialize, Serialize};
10use std::collections::VecDeque;
11use std::path::PathBuf;
12use std::sync::Arc;
13use tokio::sync::RwLock;
14use uuid::Uuid;
15
/// Maximum number of entries kept in memory before oldest are evicted.
///
/// Used as the capacity when no explicit limit is supplied (`Default` and
/// `from_env` fall back to it) and as the upper bound on the buffer space
/// preallocated by `AuditLog::new`.
const DEFAULT_MAX_ENTRIES: usize = 10_000;
18
19/// Categories of auditable actions.
20#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
21#[serde(rename_all = "snake_case")]
22pub enum AuditCategory {
23    /// HTTP API request/response
24    Api,
25    /// Tool invocation
26    ToolExecution,
27    /// Session lifecycle (create, load, prompt)
28    Session,
29    /// Cognition loop events
30    Cognition,
31    /// Swarm persona operations
32    Swarm,
33    /// Authentication events
34    Auth,
35    /// Kubernetes self-deployment actions
36    K8s,
37    /// Plugin sandbox events
38    Sandbox,
39    /// Configuration changes
40    Config,
41}
42
43/// Outcome of an audited action.
44#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
45#[serde(rename_all = "snake_case")]
46pub enum AuditOutcome {
47    Success,
48    Failure,
49    Denied,
50}
51
52/// A single entry in the audit log.
53#[derive(Debug, Clone, Serialize, Deserialize)]
54pub struct AuditEntry {
55    /// Unique entry ID.
56    pub id: String,
57    /// When the action occurred.
58    pub timestamp: DateTime<Utc>,
59    /// Category of the action.
60    pub category: AuditCategory,
61    /// Human-readable description (e.g. "POST /api/session").
62    pub action: String,
63    /// Authenticated principal, if any.
64    pub principal: Option<String>,
65    /// Outcome of the action.
66    pub outcome: AuditOutcome,
67    /// Additional structured details.
68    #[serde(default, skip_serializing_if = "Option::is_none")]
69    pub detail: Option<serde_json::Value>,
70    /// Duration of the action in milliseconds, if measured.
71    #[serde(default, skip_serializing_if = "Option::is_none")]
72    pub duration_ms: Option<u64>,
73    /// OKR ID if this action is part of an OKR-gated operation.
74    #[serde(default, skip_serializing_if = "Option::is_none")]
75    pub okr_id: Option<String>,
76    /// OKR run ID if this action is part of an OKR run.
77    #[serde(default, skip_serializing_if = "Option::is_none")]
78    pub okr_run_id: Option<String>,
79    /// Relay ID if this action is part of a relay execution.
80    #[serde(default, skip_serializing_if = "Option::is_none")]
81    pub relay_id: Option<String>,
82    /// Session ID for correlation.
83    #[serde(default, skip_serializing_if = "Option::is_none")]
84    pub session_id: Option<String>,
85}
86
87/// Thread-safe, append-only audit log.
88#[derive(Clone)]
89pub struct AuditLog {
90    entries: Arc<RwLock<VecDeque<AuditEntry>>>,
91    max_entries: usize,
92    /// Optional path for JSONL persistence.
93    sink_path: Option<PathBuf>,
94}
95
96impl std::fmt::Debug for AuditLog {
97    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
98        f.debug_struct("AuditLog")
99            .field("max_entries", &self.max_entries)
100            .field("sink_path", &self.sink_path)
101            .finish()
102    }
103}
104
105impl Default for AuditLog {
106    fn default() -> Self {
107        Self::new(DEFAULT_MAX_ENTRIES, None)
108    }
109}
110
111impl AuditLog {
112    /// Create a new audit log.
113    pub fn new(max_entries: usize, sink_path: Option<PathBuf>) -> Self {
114        Self {
115            entries: Arc::new(RwLock::new(VecDeque::with_capacity(
116                max_entries.min(DEFAULT_MAX_ENTRIES),
117            ))),
118            max_entries: max_entries.max(128),
119            sink_path,
120        }
121    }
122
123    /// Build from environment variables.
124    pub fn from_env() -> Self {
125        let max = std::env::var("CODETETHER_AUDIT_MAX_ENTRIES")
126            .ok()
127            .and_then(|v| v.parse().ok())
128            .unwrap_or(DEFAULT_MAX_ENTRIES);
129
130        let path = std::env::var("CODETETHER_AUDIT_LOG_PATH")
131            .ok()
132            .map(PathBuf::from);
133
134        Self::new(max, path)
135    }
136
137    /// Record an audit entry.
138    pub async fn record(&self, entry: AuditEntry) {
139        // Persist to disk first (best effort).
140        if let Some(ref path) = self.sink_path {
141            if let Ok(line) = serde_json::to_string(&entry) {
142                use tokio::io::AsyncWriteExt;
143                if let Ok(mut file) = tokio::fs::OpenOptions::new()
144                    .create(true)
145                    .append(true)
146                    .open(path)
147                    .await
148                {
149                    let _ = file.write_all(line.as_bytes()).await;
150                    let _ = file.write_all(b"\n").await;
151                }
152            }
153        }
154
155        // Log to tracing so structured log aggregators pick it up.
156        tracing::info!(
157            audit_id = %entry.id,
158            category = ?entry.category,
159            action = %entry.action,
160            outcome = ?entry.outcome,
161            principal = entry.principal.as_deref().unwrap_or("-"),
162            "audit"
163        );
164
165        let mut lock = self.entries.write().await;
166        lock.push_back(entry);
167        while lock.len() > self.max_entries {
168            lock.pop_front();
169        }
170    }
171
172    /// Convenience: record a simple action.
173    pub async fn log(
174        &self,
175        category: AuditCategory,
176        action: impl Into<String>,
177        outcome: AuditOutcome,
178        principal: Option<String>,
179        detail: Option<serde_json::Value>,
180    ) {
181        self.record(AuditEntry {
182            id: Uuid::new_v4().to_string(),
183            timestamp: Utc::now(),
184            category,
185            action: action.into(),
186            principal,
187            outcome,
188            detail,
189            duration_ms: None,
190            okr_id: None,
191            okr_run_id: None,
192            relay_id: None,
193            session_id: None,
194        })
195        .await;
196    }
197
198    /// Convenience: record an action with OKR/relay correlation.
199    pub async fn log_with_correlation(
200        &self,
201        category: AuditCategory,
202        action: impl Into<String>,
203        outcome: AuditOutcome,
204        principal: Option<String>,
205        detail: Option<serde_json::Value>,
206        okr_id: Option<String>,
207        okr_run_id: Option<String>,
208        relay_id: Option<String>,
209        session_id: Option<String>,
210    ) {
211        self.record(AuditEntry {
212            id: Uuid::new_v4().to_string(),
213            timestamp: Utc::now(),
214            category,
215            action: action.into(),
216            principal,
217            outcome,
218            detail,
219            duration_ms: None,
220            okr_id,
221            okr_run_id,
222            relay_id,
223            session_id,
224        })
225        .await;
226    }
227
228    /// Return recent entries (newest first), up to `limit`.
229    pub async fn recent(&self, limit: usize) -> Vec<AuditEntry> {
230        let lock = self.entries.read().await;
231        lock.iter().rev().take(limit).cloned().collect()
232    }
233
234    /// Return total entry count.
235    pub async fn count(&self) -> usize {
236        self.entries.read().await.len()
237    }
238
239    /// Filter entries by category.
240    pub async fn by_category(&self, category: AuditCategory, limit: usize) -> Vec<AuditEntry> {
241        let lock = self.entries.read().await;
242        lock.iter()
243            .rev()
244            .filter(|e| e.category == category)
245            .take(limit)
246            .cloned()
247            .collect()
248    }
249}
250
/// Global audit log singleton.
///
/// Starts out empty; it is filled in by `init_audit_log` and read through
/// `audit_log` / `try_audit_log`.
static AUDIT_LOG: tokio::sync::OnceCell<AuditLog> = tokio::sync::OnceCell::const_new();
253
254/// Initialize the global audit log.
255pub fn init_audit_log(log: AuditLog) -> Result<(), AuditLog> {
256    AUDIT_LOG.set(log).map_err(|_| AuditLog::default())
257}
258
259/// Get the global audit log (panics if not initialized).
260pub fn audit_log() -> &'static AuditLog {
261    AUDIT_LOG
262        .get()
263        .expect("Audit log not initialized — call init_audit_log() at startup")
264}
265
266/// Get the global audit log if initialized.
267pub fn try_audit_log() -> Option<&'static AuditLog> {
268    AUDIT_LOG.get()
269}
270
#[cfg(test)]
mod tests {
    use super::*;

    /// A recorded entry is counted and comes back with the values it was
    /// logged with.
    #[tokio::test]
    async fn audit_log_records_and_retrieves() {
        let log = AuditLog::new(100, None);
        log.log(
            AuditCategory::Api,
            "GET /health",
            AuditOutcome::Success,
            None,
            None,
        )
        .await;

        assert_eq!(log.count().await, 1);
        let entries = log.recent(10).await;
        assert_eq!(entries.len(), 1);
        assert_eq!(entries[0].action, "GET /health");
        assert_eq!(entries[0].category, AuditCategory::Api);
        assert_eq!(entries[0].outcome, AuditOutcome::Success);
    }

    /// Once the buffer is full, the oldest entries are dropped and the
    /// newest are kept.
    #[tokio::test]
    async fn audit_log_evicts_oldest() {
        let log = AuditLog::new(128, None);
        for i in 0..200 {
            log.log(
                AuditCategory::Api,
                format!("req-{}", i),
                AuditOutcome::Success,
                None,
                None,
            )
            .await;
        }

        // Exactly the capacity is retained, not merely "at most".
        assert_eq!(log.count().await, 128);

        // `recent` is newest-first: the last request logged leads, and the
        // oldest survivor is request 200 - 128 = 72.
        let entries = log.recent(128).await;
        assert_eq!(
            entries.first().map(|entry| entry.action.as_str()),
            Some("req-199")
        );
        assert_eq!(
            entries.last().map(|entry| entry.action.as_str()),
            Some("req-72")
        );
    }
}