Skip to main content

adk_auth/
audit.rs

//! Enterprise audit logging for access control and platform operations.
//!
//! Provides structured audit events with multi-tenant context, extensible event
//! types, multiple sink backends, batch logging, cryptographic chaining, and
//! query/retention APIs.
//!
//! # Enterprise Features
//!
//! - **Multi-tenant context**: `workspace_id`, `tenant_id`, `request_id` fields
//! - **Extensible event types**: 15 built-in types + `Custom(String)` for platform extensions
//! - **Multiple outcomes**: 9 outcome variants covering the full lifecycle
//! - **Batch logging**: `log_batch()` for high-throughput scenarios
//! - **Cryptographic chaining**: Optional SHA-256 hash chain for append-only integrity
//! - **Query interface**: `AuditFilter` for searching audit logs from the UI
//! - **Retention API**: `purge_before()` for compliance-driven data lifecycle
//! - **Multiple sinks**: File (JSONL), PostgreSQL, OpenTelemetry export
17
18use chrono::{DateTime, Utc};
19use serde::{Deserialize, Serialize};
20use sha2::{Digest, Sha256};
21use std::fs::{File, OpenOptions};
22use std::io::{BufWriter, Write};
23use std::path::PathBuf;
24use std::sync::Mutex;
25
/// Type of audit event.
///
/// Covers access control, resource lifecycle, configuration changes, and
/// platform-specific operations. Use `Custom(String)` for platform extensions
/// without forking the crate.
///
/// Variants are (de)serialized under their `snake_case` names, e.g.
/// `ToolAccess` is written as `"tool_access"`.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "snake_case")]
pub enum AuditEventType {
    /// Tool access attempt.
    ToolAccess,
    /// Agent access attempt.
    AgentAccess,
    /// Permission check.
    PermissionCheck,
    /// Authentication event (login, token refresh, logout).
    Authentication,
    /// Authorization decision (role/policy evaluation).
    Authorization,
    /// Resource created (agent, workspace, project).
    ResourceCreated,
    /// Resource updated.
    ResourceUpdated,
    /// Resource deleted.
    ResourceDeleted,
    /// Configuration changed (settings, feature flags).
    ConfigChanged,
    /// Secret accessed (read from vault).
    SecretAccessed,
    /// Secret rotated (key rotation event).
    SecretRotated,
    /// Payment executed (billing event).
    PaymentExecuted,
    /// Policy evaluated (guardrail, rate limit).
    PolicyEvaluated,
    /// Session started.
    SessionStarted,
    /// Session ended.
    SessionEnded,
    /// Platform-specific custom event type; the payload is the
    /// platform-chosen event name.
    Custom(String),
}
67
/// Outcome of an audit event.
///
/// Covers the full lifecycle of access decisions and resource operations.
/// Variants are (de)serialized under their `snake_case` names, e.g.
/// `Allowed` is written as `"allowed"`.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "snake_case")]
pub enum AuditOutcome {
    /// Access was allowed.
    Allowed,
    /// Access was denied.
    Denied,
    /// An error occurred during the operation.
    Error,
    /// Resource was created successfully.
    Created,
    /// Resource was updated successfully.
    Updated,
    /// Resource was deleted successfully.
    Deleted,
    /// Operation was blocked by a policy or guardrail.
    Blocked,
    /// Operation was paused (awaiting approval).
    Paused,
    /// Operation was escalated to a higher authority.
    Escalated,
}
93
/// An audit event with enterprise multi-tenant context.
///
/// All fields beyond `timestamp`, `user`, `event_type`, `resource`, and `outcome`
/// are optional for backward compatibility. Enterprise platforms populate the
/// additional context fields for multi-tenancy, tracing, and compliance.
///
/// Optional fields that are `None` are left out of the serialized JSON
/// entirely rather than being written as `null`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AuditEvent {
    /// Timestamp of the event.
    pub timestamp: DateTime<Utc>,
    /// User ID (email, subject, or system identifier).
    pub user: String,
    /// Session ID (if available).
    #[serde(skip_serializing_if = "Option::is_none")]
    pub session_id: Option<String>,
    /// Type of event.
    pub event_type: AuditEventType,
    /// Resource being accessed (tool name, agent name, or descriptive path).
    pub resource: String,
    /// Outcome of the operation.
    pub outcome: AuditOutcome,
    /// Additional metadata (arbitrary JSON).
    #[serde(skip_serializing_if = "Option::is_none")]
    pub metadata: Option<serde_json::Value>,

    // ── Enterprise context fields ───────────────────────────────
    /// Workspace ID for multi-tenant scoping.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub workspace_id: Option<String>,
    /// Tenant ID for multi-tenant scoping (higher level than workspace).
    #[serde(skip_serializing_if = "Option::is_none")]
    pub tenant_id: Option<String>,
    /// Request ID for distributed tracing correlation.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub request_id: Option<String>,
    /// Client IP address.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub ip_address: Option<String>,
    /// Resource UUID (distinct from the human-readable `resource` name).
    #[serde(skip_serializing_if = "Option::is_none")]
    pub resource_id: Option<String>,
    /// Action verb (e.g., "read", "write", "delete", "execute").
    #[serde(skip_serializing_if = "Option::is_none")]
    pub action: Option<String>,
    /// Hex-encoded SHA-256 hash of the previous event's JSON (for
    /// cryptographic chaining). `None` when the event is not chained.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub prev_hash: Option<String>,
}
141
142impl AuditEvent {
143    /// Create a new audit event with the given type, user, resource, and outcome.
144    pub fn new(
145        event_type: AuditEventType,
146        user: impl Into<String>,
147        resource: impl Into<String>,
148        outcome: AuditOutcome,
149    ) -> Self {
150        Self {
151            timestamp: Utc::now(),
152            user: user.into(),
153            session_id: None,
154            event_type,
155            resource: resource.into(),
156            outcome,
157            metadata: None,
158            workspace_id: None,
159            tenant_id: None,
160            request_id: None,
161            ip_address: None,
162            resource_id: None,
163            action: None,
164            prev_hash: None,
165        }
166    }
167
168    /// Create a new tool access event.
169    pub fn tool_access(user: &str, tool_name: &str, outcome: AuditOutcome) -> Self {
170        Self::new(AuditEventType::ToolAccess, user, tool_name, outcome)
171    }
172
173    /// Create a new agent access event.
174    pub fn agent_access(user: &str, agent_name: &str, outcome: AuditOutcome) -> Self {
175        Self::new(AuditEventType::AgentAccess, user, agent_name, outcome)
176    }
177
178    /// Create an authentication event.
179    pub fn authentication(user: &str, outcome: AuditOutcome) -> Self {
180        Self::new(AuditEventType::Authentication, user, "auth", outcome)
181    }
182
183    /// Create a resource lifecycle event.
184    pub fn resource_event(
185        event_type: AuditEventType,
186        user: &str,
187        resource: &str,
188        resource_id: &str,
189        outcome: AuditOutcome,
190    ) -> Self {
191        Self::new(event_type, user, resource, outcome).with_resource_id(resource_id)
192    }
193
194    /// Create a secret access event.
195    pub fn secret_accessed(user: &str, secret_name: &str, outcome: AuditOutcome) -> Self {
196        Self::new(AuditEventType::SecretAccessed, user, secret_name, outcome)
197    }
198
199    /// Create a configuration change event.
200    pub fn config_changed(user: &str, config_key: &str, outcome: AuditOutcome) -> Self {
201        Self::new(AuditEventType::ConfigChanged, user, config_key, outcome)
202    }
203
204    /// Create a custom event type for platform extensions.
205    pub fn custom(event_type: &str, user: &str, resource: &str, outcome: AuditOutcome) -> Self {
206        Self::new(AuditEventType::Custom(event_type.to_string()), user, resource, outcome)
207    }
208
209    /// Set the session ID.
210    pub fn with_session(mut self, session_id: impl Into<String>) -> Self {
211        self.session_id = Some(session_id.into());
212        self
213    }
214
215    /// Set metadata.
216    pub fn with_metadata(mut self, metadata: serde_json::Value) -> Self {
217        self.metadata = Some(metadata);
218        self
219    }
220
221    /// Set workspace ID for multi-tenant scoping.
222    pub fn with_workspace(mut self, workspace_id: impl Into<String>) -> Self {
223        self.workspace_id = Some(workspace_id.into());
224        self
225    }
226
227    /// Set tenant ID for multi-tenant scoping.
228    pub fn with_tenant(mut self, tenant_id: impl Into<String>) -> Self {
229        self.tenant_id = Some(tenant_id.into());
230        self
231    }
232
233    /// Set request ID for distributed tracing.
234    pub fn with_request_id(mut self, request_id: impl Into<String>) -> Self {
235        self.request_id = Some(request_id.into());
236        self
237    }
238
239    /// Set client IP address.
240    pub fn with_ip_address(mut self, ip: impl Into<String>) -> Self {
241        self.ip_address = Some(ip.into());
242        self
243    }
244
245    /// Set resource UUID.
246    pub fn with_resource_id(mut self, id: impl Into<String>) -> Self {
247        self.resource_id = Some(id.into());
248        self
249    }
250
251    /// Set action verb.
252    pub fn with_action(mut self, action: impl Into<String>) -> Self {
253        self.action = Some(action.into());
254        self
255    }
256
257    /// Compute and set the cryptographic hash chain link.
258    ///
259    /// The hash is SHA-256 of the JSON-serialized previous event.
260    /// Call this before logging to maintain an append-only chain.
261    pub fn with_prev_hash(mut self, prev_event_json: &str) -> Self {
262        let mut hasher = Sha256::new();
263        hasher.update(prev_event_json.as_bytes());
264        self.prev_hash = Some(hex::encode(hasher.finalize()));
265        self
266    }
267
268    /// Serialize this event to JSON (for hash chaining).
269    pub fn to_json(&self) -> Result<String, serde_json::Error> {
270        serde_json::to_string(self)
271    }
272}
273
/// Filter for querying audit events.
///
/// All fields are optional — only non-None fields are used as filter criteria.
/// Multiple fields are combined with AND logic.
///
/// `limit` and `offset` are pagination hints: [`AuditFilter::matches`] does
/// not look at them, so the sink running the query is responsible for
/// applying them to the matching events.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct AuditFilter {
    /// Filter by user ID (exact match).
    #[serde(skip_serializing_if = "Option::is_none")]
    pub user: Option<String>,
    /// Filter by workspace ID (exact match; events without a workspace never match).
    #[serde(skip_serializing_if = "Option::is_none")]
    pub workspace_id: Option<String>,
    /// Filter by tenant ID (exact match; events without a tenant never match).
    #[serde(skip_serializing_if = "Option::is_none")]
    pub tenant_id: Option<String>,
    /// Filter by event type.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub event_type: Option<AuditEventType>,
    /// Filter by outcome.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub outcome: Option<AuditOutcome>,
    /// Filter by resource name (case-sensitive substring match).
    #[serde(skip_serializing_if = "Option::is_none")]
    pub resource: Option<String>,
    /// Filter by resource UUID (exact match; events without one never match).
    #[serde(skip_serializing_if = "Option::is_none")]
    pub resource_id: Option<String>,
    /// Events strictly after this timestamp (the bound itself is excluded).
    #[serde(skip_serializing_if = "Option::is_none")]
    pub after: Option<DateTime<Utc>>,
    /// Events strictly before this timestamp (the bound itself is excluded).
    #[serde(skip_serializing_if = "Option::is_none")]
    pub before: Option<DateTime<Utc>>,
    /// Maximum number of results to return.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub limit: Option<usize>,
    /// Offset for pagination (number of matching events to skip).
    #[serde(skip_serializing_if = "Option::is_none")]
    pub offset: Option<usize>,
}
314
315impl AuditFilter {
316    /// Check if an event matches this filter.
317    pub fn matches(&self, event: &AuditEvent) -> bool {
318        if let Some(ref user) = self.user
319            && &event.user != user
320        {
321            return false;
322        }
323        if let Some(ref ws) = self.workspace_id
324            && event.workspace_id.as_ref() != Some(ws)
325        {
326            return false;
327        }
328        if let Some(ref tid) = self.tenant_id
329            && event.tenant_id.as_ref() != Some(tid)
330        {
331            return false;
332        }
333        if let Some(ref et) = self.event_type
334            && &event.event_type != et
335        {
336            return false;
337        }
338        if let Some(ref oc) = self.outcome
339            && &event.outcome != oc
340        {
341            return false;
342        }
343        if let Some(ref res) = self.resource
344            && !event.resource.contains(res.as_str())
345        {
346            return false;
347        }
348        if let Some(ref rid) = self.resource_id
349            && event.resource_id.as_ref() != Some(rid)
350        {
351            return false;
352        }
353        if let Some(after) = self.after
354            && event.timestamp <= after
355        {
356            return false;
357        }
358        if let Some(before) = self.before
359            && event.timestamp >= before
360        {
361            return false;
362        }
363        true
364    }
365}
366
/// Trait for audit sinks.
///
/// Implementations persist audit events to a storage backend. The trait supports
/// single-event logging, batch logging for high throughput, querying for UI
/// display, and retention management for compliance.
///
/// Only [`AuditSink::log`] is required; every other method has a default
/// that sinks may override.
#[async_trait::async_trait]
pub trait AuditSink: Send + Sync {
    /// Log a single audit event.
    async fn log(&self, event: AuditEvent) -> Result<(), crate::AuthError>;

    /// Log a batch of audit events.
    ///
    /// The default implementation logs events one at a time, in order, and
    /// stops at the first error. It is therefore NOT atomic: events logged
    /// before the failing one stay logged. Override for backends that
    /// support atomic or high-throughput batch inserts.
    async fn log_batch(&self, events: Vec<AuditEvent>) -> Result<(), crate::AuthError> {
        for event in events {
            self.log(event).await?;
        }
        Ok(())
    }

    /// Query audit events matching the given filter.
    ///
    /// Default implementation returns an empty vec (not all sinks support queries).
    /// Override for queryable backends (PostgreSQL, in-memory).
    async fn query(&self, _filter: &AuditFilter) -> Result<Vec<AuditEvent>, crate::AuthError> {
        Ok(Vec::new())
    }

    /// Purge events older than the given cutoff timestamp.
    ///
    /// Returns the number of events purged. Default returns 0 (not all sinks
    /// support retention management).
    async fn purge_before(&self, _cutoff: DateTime<Utc>) -> Result<u64, crate::AuthError> {
        Ok(0)
    }

    /// Flush any buffered events to the underlying storage.
    ///
    /// Default is a no-op. Override for buffered sinks.
    async fn flush(&self) -> Result<(), crate::AuthError> {
        Ok(())
    }
}
411
/// File-based audit sink that writes JSONL (one JSON line per event).
///
/// Supports optional cryptographic chaining — when enabled, each event includes
/// the SHA-256 hash of the previous event's JSON representation, creating an
/// append-only tamper-evident log.
pub struct FileAuditSink {
    /// Buffered handle to the log file, opened in append mode.
    writer: Mutex<BufWriter<File>>,
    /// Location of the log file, as passed to the constructor.
    path: PathBuf,
    /// JSON line of the most recent event, i.e. the input for the next
    /// event's `prev_hash`. `None` until a previous event is known; always
    /// `None` when chaining is disabled.
    last_event_json: Mutex<Option<String>>,
    /// Whether to enable cryptographic hash chaining.
    chain_enabled: bool,
}

impl FileAuditSink {
    /// Create a new file audit sink without hash chaining.
    ///
    /// The file is created if it does not exist and is always appended to.
    ///
    /// # Errors
    ///
    /// Returns the I/O error from opening `path`.
    pub fn new(path: impl Into<PathBuf>) -> Result<Self, std::io::Error> {
        Self::open(path.into(), false)
    }

    /// Create a new file audit sink with cryptographic hash chaining enabled.
    ///
    /// Each event will include a `prev_hash` field containing the SHA-256 hash
    /// of the previous event's JSON, creating a tamper-evident append-only log.
    ///
    /// If the file already contains events, the chain is resumed from its
    /// last non-blank line, so reopening a log (for example after a restart)
    /// does not leave an unlinked event in the middle of the file. Finding
    /// that line reads the existing file once, front to back.
    ///
    /// # Errors
    ///
    /// Returns the I/O error from opening or reading `path`.
    pub fn with_chaining(path: impl Into<PathBuf>) -> Result<Self, std::io::Error> {
        Self::open(path.into(), true)
    }

    /// Get the path to the audit log file.
    pub fn path(&self) -> &PathBuf {
        &self.path
    }

    /// Shared constructor body: open `path` for appending and, when chaining
    /// is on, seed the chain head from what is already in the file.
    fn open(path: PathBuf, chain_enabled: bool) -> Result<Self, std::io::Error> {
        // Open (and create) the append handle first so the read below never
        // fails just because the log does not exist yet.
        let file = OpenOptions::new().create(true).append(true).open(&path)?;
        let chain_head = if chain_enabled { Self::last_nonblank_line(&path)? } else { None };
        Ok(Self {
            writer: Mutex::new(BufWriter::new(file)),
            path,
            last_event_json: Mutex::new(chain_head),
            chain_enabled,
        })
    }

    /// Return the last non-blank line of the file at `path`, without its line
    /// terminator, or `None` if the file has no such line.
    fn last_nonblank_line(path: &std::path::Path) -> Result<Option<String>, std::io::Error> {
        let reader = std::io::BufReader::new(File::open(path)?);
        let mut last = None;
        // Streaming keeps memory bounded by the longest line, not the file.
        for line in std::io::BufRead::lines(reader) {
            let line = line?;
            if !line.trim().is_empty() {
                last = Some(line);
            }
        }
        Ok(last)
    }
}
451
452#[async_trait::async_trait]
453impl AuditSink for FileAuditSink {
454    async fn log(&self, mut event: AuditEvent) -> Result<(), crate::AuthError> {
455        // Apply hash chaining if enabled
456        if self.chain_enabled {
457            let mut last = self.last_event_json.lock().unwrap_or_else(|p| p.into_inner());
458            if let Some(ref prev_json) = *last {
459                let mut hasher = Sha256::new();
460                hasher.update(prev_json.as_bytes());
461                event.prev_hash = Some(hex::encode(hasher.finalize()));
462            }
463            let line = serde_json::to_string(&event)
464                .map_err(|e| crate::AuthError::AuditError(e.to_string()))?;
465            *last = Some(line.clone());
466
467            let mut writer = self.writer.lock().unwrap_or_else(|poisoned| {
468                tracing::warn!(path = %self.path.display(), "audit writer lock poisoned, recovering");
469                poisoned.into_inner()
470            });
471            writeln!(writer, "{line}")?;
472            writer.flush()?;
473        } else {
474            let line = serde_json::to_string(&event)
475                .map_err(|e| crate::AuthError::AuditError(e.to_string()))?;
476
477            let mut writer = self.writer.lock().unwrap_or_else(|poisoned| {
478                tracing::warn!(path = %self.path.display(), "audit writer lock poisoned, recovering");
479                poisoned.into_inner()
480            });
481            writeln!(writer, "{line}")?;
482            writer.flush()?;
483        }
484
485        Ok(())
486    }
487}
488
489/// In-memory audit sink for testing and development.
490///
491/// Stores all events in a `Vec` behind a `RwLock`. Supports querying and
492/// purging. Not suitable for production — use `FileAuditSink` or a database
493/// sink instead.
494pub struct InMemoryAuditSink {
495    events: tokio::sync::RwLock<Vec<AuditEvent>>,
496}
497
498impl InMemoryAuditSink {
499    /// Create a new in-memory audit sink.
500    pub fn new() -> Self {
501        Self { events: tokio::sync::RwLock::new(Vec::new()) }
502    }
503
504    /// Get the number of stored events.
505    pub async fn len(&self) -> usize {
506        self.events.read().await.len()
507    }
508
509    /// Check if the sink is empty.
510    pub async fn is_empty(&self) -> bool {
511        self.events.read().await.is_empty()
512    }
513}
514
515impl Default for InMemoryAuditSink {
516    fn default() -> Self {
517        Self::new()
518    }
519}
520
521#[async_trait::async_trait]
522impl AuditSink for InMemoryAuditSink {
523    async fn log(&self, event: AuditEvent) -> Result<(), crate::AuthError> {
524        self.events.write().await.push(event);
525        Ok(())
526    }
527
528    async fn log_batch(&self, events: Vec<AuditEvent>) -> Result<(), crate::AuthError> {
529        self.events.write().await.extend(events);
530        Ok(())
531    }
532
533    async fn query(&self, filter: &AuditFilter) -> Result<Vec<AuditEvent>, crate::AuthError> {
534        let events = self.events.read().await;
535        let mut results: Vec<AuditEvent> =
536            events.iter().filter(|e| filter.matches(e)).cloned().collect();
537
538        // Apply offset
539        if let Some(offset) = filter.offset {
540            if offset < results.len() {
541                results = results[offset..].to_vec();
542            } else {
543                results.clear();
544            }
545        }
546
547        // Apply limit
548        if let Some(limit) = filter.limit {
549            results.truncate(limit);
550        }
551
552        Ok(results)
553    }
554
555    async fn purge_before(&self, cutoff: DateTime<Utc>) -> Result<u64, crate::AuthError> {
556        let mut events = self.events.write().await;
557        let before_len = events.len();
558        events.retain(|e| e.timestamp >= cutoff);
559        Ok((before_len - events.len()) as u64)
560    }
561}
562
/// Unit tests for event construction and serialization, filter matching,
/// and the in-memory sink.
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_audit_event_serialization() {
        let event = AuditEvent::tool_access("alice", "search", AuditOutcome::Allowed);
        let json = serde_json::to_string(&event).unwrap();
        assert!(json.contains("\"user\":\"alice\""));
        assert!(json.contains("\"resource\":\"search\""));
        assert!(json.contains("\"outcome\":\"allowed\""));
    }

    #[test]
    fn test_audit_event_with_session() {
        let event = AuditEvent::tool_access("bob", "exec", AuditOutcome::Denied)
            .with_session("session-123");
        assert_eq!(event.session_id, Some("session-123".to_string()));
    }

    #[test]
    fn test_enterprise_context_fields() {
        let event = AuditEvent::tool_access("alice", "search", AuditOutcome::Allowed)
            .with_workspace("ws_123")
            .with_tenant("tenant_abc")
            .with_request_id("req_456")
            .with_ip_address("192.168.1.1")
            .with_resource_id("550e8400-e29b-41d4-a716-446655440000")
            .with_action("execute");

        assert_eq!(event.workspace_id, Some("ws_123".to_string()));
        assert_eq!(event.tenant_id, Some("tenant_abc".to_string()));
        assert_eq!(event.request_id, Some("req_456".to_string()));
        assert_eq!(event.ip_address, Some("192.168.1.1".to_string()));
        assert_eq!(event.resource_id, Some("550e8400-e29b-41d4-a716-446655440000".to_string()));
        assert_eq!(event.action, Some("execute".to_string()));

        // Verify JSON serialization includes enterprise fields
        let json = serde_json::to_string(&event).unwrap();
        assert!(json.contains("\"workspace_id\":\"ws_123\""));
        assert!(json.contains("\"tenant_id\":\"tenant_abc\""));
        assert!(json.contains("\"request_id\":\"req_456\""));
    }

    #[test]
    fn test_custom_event_type() {
        let event =
            AuditEvent::custom("deployment_triggered", "ci-bot", "agent-v2", AuditOutcome::Created);
        let json = serde_json::to_string(&event).unwrap();
        assert!(json.contains("deployment_triggered"));
        assert!(json.contains("\"outcome\":\"created\""));
    }

    #[test]
    fn test_new_outcomes() {
        for outcome in [
            AuditOutcome::Created,
            AuditOutcome::Updated,
            AuditOutcome::Deleted,
            AuditOutcome::Blocked,
            AuditOutcome::Paused,
            AuditOutcome::Escalated,
        ] {
            let event =
                AuditEvent::new(AuditEventType::ResourceCreated, "user", "res", outcome.clone());
            let json = serde_json::to_string(&event).unwrap();
            let deserialized: AuditEvent = serde_json::from_str(&json).unwrap();
            assert_eq!(deserialized.outcome, outcome);
        }
    }

    #[test]
    fn test_new_event_types() {
        let types = vec![
            AuditEventType::Authentication,
            AuditEventType::Authorization,
            AuditEventType::ResourceCreated,
            AuditEventType::ResourceUpdated,
            AuditEventType::ResourceDeleted,
            AuditEventType::ConfigChanged,
            AuditEventType::SecretAccessed,
            AuditEventType::SecretRotated,
            AuditEventType::PaymentExecuted,
            AuditEventType::PolicyEvaluated,
            AuditEventType::SessionStarted,
            AuditEventType::SessionEnded,
            AuditEventType::Custom("my_event".to_string()),
        ];

        for event_type in types {
            let event =
                AuditEvent::new(event_type.clone(), "user", "resource", AuditOutcome::Allowed);
            let json = serde_json::to_string(&event).unwrap();
            let deserialized: AuditEvent = serde_json::from_str(&json).unwrap();
            assert_eq!(deserialized.event_type, event_type);
        }
    }

    #[test]
    fn test_hash_chaining() {
        let event1 = AuditEvent::tool_access("alice", "search", AuditOutcome::Allowed);
        let json1 = event1.to_json().unwrap();

        let event2 =
            AuditEvent::tool_access("bob", "exec", AuditOutcome::Denied).with_prev_hash(&json1);

        assert!(event2.prev_hash.is_some());
        // SHA-256 hex is 64 chars
        assert_eq!(event2.prev_hash.as_ref().unwrap().len(), 64);

        // The link must be the digest of exactly the previous event's JSON,
        // not merely something of the right length.
        let expected = hex::encode(Sha256::digest(json1.as_bytes()));
        assert_eq!(event2.prev_hash.as_deref(), Some(expected.as_str()));
    }

    #[test]
    fn test_audit_filter_matches() {
        let event = AuditEvent::tool_access("alice", "search", AuditOutcome::Allowed)
            .with_workspace("ws_1");

        let filter = AuditFilter { user: Some("alice".to_string()), ..Default::default() };
        assert!(filter.matches(&event));

        let filter = AuditFilter { user: Some("bob".to_string()), ..Default::default() };
        assert!(!filter.matches(&event));

        let filter = AuditFilter { workspace_id: Some("ws_1".to_string()), ..Default::default() };
        assert!(filter.matches(&event));

        let filter = AuditFilter { workspace_id: Some("ws_2".to_string()), ..Default::default() };
        assert!(!filter.matches(&event));
    }

    #[test]
    fn test_audit_filter_time_bounds_are_exclusive() {
        let event = AuditEvent::tool_access("alice", "search", AuditOutcome::Allowed);
        let ts = event.timestamp;
        let one_second = chrono::Duration::seconds(1);

        // `after` excludes the bound itself.
        let filter = AuditFilter { after: Some(ts), ..Default::default() };
        assert!(!filter.matches(&event));
        let filter = AuditFilter { after: Some(ts - one_second), ..Default::default() };
        assert!(filter.matches(&event));

        // `before` excludes the bound itself.
        let filter = AuditFilter { before: Some(ts), ..Default::default() };
        assert!(!filter.matches(&event));
        let filter = AuditFilter { before: Some(ts + one_second), ..Default::default() };
        assert!(filter.matches(&event));
    }

    #[test]
    fn test_backward_compatible_serialization() {
        // Old-style event without enterprise fields should still deserialize
        let old_json = r#"{"timestamp":"2026-01-01T00:00:00Z","user":"alice","event_type":"tool_access","resource":"search","outcome":"allowed"}"#;
        let event: AuditEvent = serde_json::from_str(old_json).unwrap();
        assert_eq!(event.user, "alice");
        assert_eq!(event.workspace_id, None);
        assert_eq!(event.tenant_id, None);
    }

    #[tokio::test]
    async fn test_in_memory_sink_query() {
        let sink = InMemoryAuditSink::new();

        sink.log(
            AuditEvent::tool_access("alice", "search", AuditOutcome::Allowed)
                .with_workspace("ws_1"),
        )
        .await
        .unwrap();
        sink.log(
            AuditEvent::tool_access("bob", "exec", AuditOutcome::Denied).with_workspace("ws_1"),
        )
        .await
        .unwrap();
        sink.log(
            AuditEvent::tool_access("alice", "deploy", AuditOutcome::Allowed)
                .with_workspace("ws_2"),
        )
        .await
        .unwrap();

        // Query by user
        let results = sink
            .query(&AuditFilter { user: Some("alice".to_string()), ..Default::default() })
            .await
            .unwrap();
        assert_eq!(results.len(), 2);

        // Query by workspace
        let results = sink
            .query(&AuditFilter { workspace_id: Some("ws_1".to_string()), ..Default::default() })
            .await
            .unwrap();
        assert_eq!(results.len(), 2);

        // Query by outcome
        let results = sink
            .query(&AuditFilter { outcome: Some(AuditOutcome::Denied), ..Default::default() })
            .await
            .unwrap();
        assert_eq!(results.len(), 1);
        assert_eq!(results[0].user, "bob");
    }

    #[tokio::test]
    async fn test_in_memory_sink_pagination() {
        let sink = InMemoryAuditSink::new();
        for name in ["a", "b", "c", "d"] {
            sink.log(AuditEvent::tool_access("alice", name, AuditOutcome::Allowed)).await.unwrap();
        }

        // Offset is applied before limit, in insertion order.
        let page = sink
            .query(&AuditFilter { offset: Some(1), limit: Some(2), ..Default::default() })
            .await
            .unwrap();
        let resources: Vec<&str> = page.iter().map(|e| e.resource.as_str()).collect();
        assert_eq!(resources, vec!["b", "c"]);

        // Offset without limit returns the whole tail.
        let page =
            sink.query(&AuditFilter { offset: Some(3), ..Default::default() }).await.unwrap();
        assert_eq!(page.len(), 1);
        assert_eq!(page[0].resource, "d");

        // Offset at or past the end yields nothing rather than failing.
        let page =
            sink.query(&AuditFilter { offset: Some(4), ..Default::default() }).await.unwrap();
        assert!(page.is_empty());
        let page =
            sink.query(&AuditFilter { offset: Some(100), ..Default::default() }).await.unwrap();
        assert!(page.is_empty());

        // A zero limit yields nothing.
        let page = sink.query(&AuditFilter { limit: Some(0), ..Default::default() }).await.unwrap();
        assert!(page.is_empty());
    }

    #[tokio::test]
    async fn test_in_memory_sink_purge() {
        let sink = InMemoryAuditSink::new();

        sink.log(AuditEvent::tool_access("alice", "search", AuditOutcome::Allowed)).await.unwrap();
        sink.log(AuditEvent::tool_access("bob", "exec", AuditOutcome::Denied)).await.unwrap();

        assert_eq!(sink.len().await, 2);

        // Purge everything before now+1s (should purge all)
        let cutoff = Utc::now() + chrono::Duration::seconds(1);
        let purged = sink.purge_before(cutoff).await.unwrap();
        assert_eq!(purged, 2);
        assert!(sink.is_empty().await);
    }

    #[tokio::test]
    async fn test_in_memory_sink_batch() {
        let sink = InMemoryAuditSink::new();

        let events = vec![
            AuditEvent::tool_access("alice", "a", AuditOutcome::Allowed),
            AuditEvent::tool_access("bob", "b", AuditOutcome::Denied),
            AuditEvent::tool_access("carol", "c", AuditOutcome::Allowed),
        ];

        sink.log_batch(events).await.unwrap();
        assert_eq!(sink.len().await, 3);
    }
}