Skip to main content

oxios_kernel/access_manager/
audit_sink.rs

1//! Unified audit sink — single destination for all security events.
2//!
3//! Eliminates the three-way split between `AccessManager.audit_log`,
4//! `RbacManager.audit_log`, and `AuditTrail`. All security events
5//! flow through `AuditSink` into the Merkle chain and JSONL file.
6
7use std::path::PathBuf;
8use std::sync::Arc;
9
10use chrono::{DateTime, Utc};
11use oxi_sdk::observability::{AuditAction, AuditTrail};
12use serde::{Deserialize, Serialize};
13
14// ─── Audit Event ────────────────────────────────────────────────────────────
15
16/// Unified security audit event.
17///
18/// Every security-relevant decision produces one of these variants.
19/// Serialized as JSONL for file persistence and ingested into the
20/// Merkle-chain `AuditTrail` for tamper-evidence.
21#[derive(Debug, Clone, Serialize, Deserialize)]
22#[serde(tag = "kind")]
23#[allow(missing_docs)]
24pub enum AuditEvent {
25    /// Tool access decision.
26    ToolAccess {
27        #[serde(with = "chrono::serde::ts_milliseconds")]
28        timestamp: DateTime<Utc>,
29        agent: String,
30        tool: String,
31        allowed: bool,
32        layer: Option<String>,
33        reason: Option<String>,
34    },
35    /// Path access decision.
36    PathAccess {
37        #[serde(with = "chrono::serde::ts_milliseconds")]
38        timestamp: DateTime<Utc>,
39        agent: String,
40        path: String,
41        mode: String,
42        allowed: bool,
43        layer: Option<String>,
44        reason: Option<String>,
45    },
46    /// Command execution decision.
47    ExecAccess {
48        #[serde(with = "chrono::serde::ts_milliseconds")]
49        timestamp: DateTime<Utc>,
50        agent: String,
51        binary: String,
52        allowed: bool,
53        layer: Option<String>,
54        reason: Option<String>,
55    },
56    /// RBAC authorization decision.
57    RbacDecision {
58        #[serde(with = "chrono::serde::ts_milliseconds")]
59        timestamp: DateTime<Utc>,
60        subject: String,
61        action: String,
62        resource: String,
63        allowed: bool,
64        reason: Option<String>,
65    },
66    /// Workspace sandbox violation.
67    SandboxViolation {
68        #[serde(with = "chrono::serde::ts_milliseconds")]
69        timestamp: DateTime<Utc>,
70        agent: String,
71        path: String,
72        workspace: String,
73    },
74    /// Human-in-the-loop approval event.
75    Approval {
76        #[serde(with = "chrono::serde::ts_milliseconds")]
77        timestamp: DateTime<Utc>,
78        approval_id: String,
79        subject: String,
80        action: String,
81        status: String,
82    },
83}
84
85impl AuditEvent {
86    /// Returns the agent/subject responsible for this event.
87    pub fn actor(&self) -> &str {
88        match self {
89            AuditEvent::ToolAccess { agent, .. } => agent,
90            AuditEvent::PathAccess { agent, .. } => agent,
91            AuditEvent::ExecAccess { agent, .. } => agent,
92            AuditEvent::RbacDecision { subject, .. } => subject,
93            AuditEvent::SandboxViolation { agent, .. } => agent,
94            AuditEvent::Approval { subject, .. } => subject,
95        }
96    }
97
98    /// Convert to an AuditAction for the Merkle-chain AuditTrail.
99    pub fn to_audit_action(&self) -> AuditAction {
100        match self {
101            AuditEvent::ToolAccess { tool, allowed, .. } => AuditAction::Other {
102                detail: format!("tool_access:{tool}:allowed={allowed}"),
103            },
104            AuditEvent::PathAccess {
105                path,
106                mode,
107                allowed,
108                ..
109            } => AuditAction::Other {
110                detail: format!("path_access:{path}:{mode}:allowed={allowed}"),
111            },
112            AuditEvent::ExecAccess {
113                binary, allowed, ..
114            } => AuditAction::Other {
115                detail: format!("exec_access:{binary}:allowed={allowed}"),
116            },
117            AuditEvent::RbacDecision {
118                subject,
119                action,
120                allowed,
121                ..
122            } => AuditAction::Other {
123                detail: format!("rbac:{subject}:{action}:allowed={allowed}"),
124            },
125            AuditEvent::SandboxViolation {
126                agent,
127                path,
128                workspace,
129                ..
130            } => AuditAction::Other {
131                detail: format!("sandbox_violation:{agent}:{path}:ws={workspace}"),
132            },
133            AuditEvent::Approval {
134                approval_id,
135                status,
136                ..
137            } => AuditAction::Other {
138                detail: format!("approval:{approval_id}:{status}"),
139            },
140        }
141    }
142
143    #[cfg(test)]
144    fn now() -> DateTime<Utc> {
145        Utc::now()
146    }
147}
148
149// ─── Audit Sink Trait ───────────────────────────────────────────────────────
150
/// Destination for all security audit events.
///
/// Implementations persist events to Merkle chain + file, or are no-ops for tests.
///
/// The `Send + Sync` supertraits allow a single sink to be shared between
/// threads. Because `record` takes `&self` and returns nothing, a sink that
/// keeps state needs interior mutability, and it must handle (or log) its own
/// failures — there is no channel for reporting them back to the caller.
pub trait AuditSink: Send + Sync {
    /// Record a security audit event.
    ///
    /// Takes ownership of `event`; no result is returned to the caller.
    fn record(&self, event: AuditEvent);
}
158
159// ─── Trail Audit Sink ───────────────────────────────────────────────────────
160
161/// Production audit sink: Merkle chain + async JSONL file writer.
162///
163/// Events are:
164/// 1. Appended to the `AuditTrail` (Merkle chain, tamper-evident)
165/// 2. Sent to a background file writer via bounded channel (JSONL)
166///
167/// If the channel is full, a warning is logged and the event is still
168/// recorded in the Merkle chain (just not persisted to file immediately).
169pub struct TrailAuditSink {
170    /// Merkle-chain audit trail — always succeeds (in-memory).
171    trail: Arc<AuditTrail>,
172    /// Bounded channel to background file writer.
173    file_tx: tokio::sync::mpsc::Sender<String>,
174}
175
176impl TrailAuditSink {
177    /// Create a new `TrailAuditSink`.
178    ///
179    /// Spawns a background tokio task that reads from the bounded channel
180    /// and appends JSONL entries to `audit_path`.
181    pub fn new(trail: Arc<AuditTrail>, audit_path: PathBuf) -> Self {
182        let (tx, mut rx) = tokio::sync::mpsc::channel::<String>(1000);
183
184        let path = audit_path.clone();
185        tokio::spawn(async move {
186            if let Ok(mut file) = tokio::fs::OpenOptions::new()
187                .create(true)
188                .append(true)
189                .open(&path)
190                .await
191            {
192                use tokio::io::AsyncWriteExt;
193                while let Some(line) = rx.recv().await {
194                    let _ = file.write_all(line.as_bytes()).await;
195                    let _ = file.write_all(b"\n").await;
196                }
197            }
198        });
199
200        Self { trail, file_tx: tx }
201    }
202}
203
204impl AuditSink for TrailAuditSink {
205    fn record(&self, event: AuditEvent) {
206        // 1. Merkle chain (always succeeds)
207        let actor = event.actor().to_string();
208        let action = event.to_audit_action();
209        self.trail.append(actor, action, "access_gate".into());
210
211        // 2. JSONL file (fire-and-forget, may drop if channel full)
212        if let Ok(line) = serde_json::to_string(&event) {
213            match self.file_tx.try_send(line) {
214                Ok(()) => {}
215                Err(tokio::sync::mpsc::error::TrySendError::Full(_)) => {
216                    tracing::warn!("Audit sink channel full — event still in Merkle chain");
217                }
218                Err(tokio::sync::mpsc::error::TrySendError::Closed(_)) => {
219                    tracing::warn!("Audit sink channel closed");
220                }
221            }
222        }
223    }
224}
225
226// ─── No-op Sink (tests) ────────────────────────────────────────────────────
227
/// Audit sink that throws every event away.
///
/// Only compiled for tests, where a sink is required but its output is not
/// under inspection.
#[cfg(test)]
pub struct NoOpAuditSink;

#[cfg(test)]
impl AuditSink for NoOpAuditSink {
    /// Accepts the event and does nothing with it.
    fn record(&self, _: AuditEvent) {
        // Intentionally empty: the event is dropped here.
    }
}
236
237/// Minimal audit sink that logs to tracing — used as default when no file sink is configured.
238pub struct TracingAuditSink;
239
240impl AuditSink for TracingAuditSink {
241    fn record(&self, event: AuditEvent) {
242        if let AuditEvent::ToolAccess {
243            agent,
244            tool,
245            allowed: false,
246            layer,
247            ..
248        } = &event
249        {
250            tracing::warn!(
251                agent = %agent,
252                tool = %tool,
253                layer = ?layer,
254                "Access denied (no persistent audit sink configured)"
255            );
256        }
257    }
258}
259
260// ─── Tests ──────────────────────────────────────────────────────────────────
261
#[cfg(test)]
mod tests {
    use super::*;

    /// Returns the `detail` string of the `AuditAction` that `event` maps to,
    /// panicking if the action is not `AuditAction::Other`.
    fn detail_of(event: &AuditEvent) -> String {
        match event.to_audit_action() {
            AuditAction::Other { detail } => detail,
            _ => panic!("expected AuditAction::Other"),
        }
    }

    #[test]
    fn test_tool_access_event() {
        let event = AuditEvent::ToolAccess {
            timestamp: AuditEvent::now(),
            agent: "test-agent".into(),
            tool: "exec".into(),
            allowed: true,
            layer: None,
            reason: None,
        };
        assert_eq!(event.actor(), "test-agent");
        assert_eq!(detail_of(&event), "tool_access:exec:allowed=true");
    }

    #[test]
    fn test_path_access_event() {
        let event = AuditEvent::PathAccess {
            timestamp: AuditEvent::now(),
            agent: "test-agent".into(),
            path: "/tmp/x".into(),
            mode: "read".into(),
            allowed: false,
            layer: Some("policy".into()),
            reason: Some("outside workspace".into()),
        };
        assert_eq!(event.actor(), "test-agent");
        assert_eq!(detail_of(&event), "path_access:/tmp/x:read:allowed=false");
    }

    #[test]
    fn test_rbac_decision_event() {
        let event = AuditEvent::RbacDecision {
            timestamp: AuditEvent::now(),
            subject: "user:alice".into(),
            action: "UseTool(exec)".into(),
            resource: "exec".into(),
            allowed: false,
            reason: Some("role User does not allow".into()),
        };
        assert_eq!(event.actor(), "user:alice");
        assert_eq!(
            detail_of(&event),
            "rbac:user:alice:UseTool(exec):allowed=false"
        );
    }

    #[test]
    fn test_sandbox_violation_event() {
        let event = AuditEvent::SandboxViolation {
            timestamp: AuditEvent::now(),
            agent: "rogue-agent".into(),
            path: "/etc/passwd".into(),
            workspace: "project-alpha".into(),
        };
        assert_eq!(event.actor(), "rogue-agent");
        assert_eq!(
            detail_of(&event),
            "sandbox_violation:rogue-agent:/etc/passwd:ws=project-alpha"
        );
    }

    #[test]
    fn test_approval_event() {
        let event = AuditEvent::Approval {
            timestamp: AuditEvent::now(),
            approval_id: "ap-1".into(),
            subject: "user:bob".into(),
            action: "deploy".into(),
            status: "approved".into(),
        };
        // Approval events are attributed to their subject.
        assert_eq!(event.actor(), "user:bob");
        assert_eq!(detail_of(&event), "approval:ap-1:approved");
    }

    #[test]
    fn test_event_serialization_roundtrip() {
        let sent_at = AuditEvent::now();
        let event = AuditEvent::ExecAccess {
            timestamp: sent_at,
            agent: "test".into(),
            binary: "git".into(),
            allowed: true,
            layer: None,
            reason: None,
        };
        let json = serde_json::to_string(&event).unwrap();
        // The variant name must be carried by the internal "kind" tag.
        assert!(json.contains("\"kind\":\"ExecAccess\""));

        let deserialized: AuditEvent = serde_json::from_str(&json).unwrap();
        match deserialized {
            AuditEvent::ExecAccess {
                timestamp,
                agent,
                binary,
                allowed,
                layer,
                reason,
            } => {
                // Timestamps travel as epoch milliseconds, so compare at
                // millisecond precision rather than for exact equality.
                assert_eq!(timestamp.timestamp_millis(), sent_at.timestamp_millis());
                assert_eq!(agent, "test");
                assert_eq!(binary, "git");
                assert!(allowed);
                assert_eq!(layer, None);
                assert_eq!(reason, None);
            }
            other => panic!("expected ExecAccess, got {other:?}"),
        }
    }

    #[test]
    fn test_noop_sink() {
        let sink = NoOpAuditSink;
        sink.record(AuditEvent::ToolAccess {
            timestamp: AuditEvent::now(),
            agent: "test".into(),
            tool: "exec".into(),
            allowed: true,
            layer: None,
            reason: None,
        });
        // No panic = success
    }
}