Skip to main content

cortexai_audit/backends/
json_file.rs

1//! JSON file logger backend with rotation support.
2//!
3//! Structured JSON logging with configurable rotation policies.
4
5use crate::error::AuditError;
6use crate::traits::{AuditConfig, AuditLogger, AuditStats};
7use crate::types::AuditEvent;
8use async_trait::async_trait;
9use chrono::{DateTime, Timelike, Utc};
10use std::path::PathBuf;
11use std::sync::atomic::{AtomicU64, Ordering};
12use tokio::fs::{File, OpenOptions};
13use tokio::io::AsyncWriteExt;
14use tokio::sync::Mutex;
15
/// Rotation policy for log files.
///
/// Decides when the logger stops appending to the active file and starts a
/// new one. The policy is a small plain value, so it is `Copy` and can be
/// compared with `==`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RotationPolicy {
    /// Rotate once the active file has reached this many bytes.
    Size(u64),
    /// Rotate when the UTC calendar date differs from the date the active
    /// file was opened on.
    Daily,
    /// Rotate when the UTC date or hour differs from when the active file
    /// was opened.
    Hourly,
    /// Never rotate; keep appending to the same file.
    Never,
}
28
29impl Default for RotationPolicy {
30    fn default() -> Self {
31        Self::Size(10 * 1024 * 1024) // 10MB default
32    }
33}
34
35/// Configuration for log rotation.
36#[derive(Debug, Clone)]
37pub struct RotationConfig {
38    /// Rotation policy
39    pub policy: RotationPolicy,
40    /// Maximum number of rotated files to keep
41    pub max_files: usize,
42    /// Whether to compress rotated files
43    pub compress: bool,
44}
45
46impl Default for RotationConfig {
47    fn default() -> Self {
48        Self {
49            policy: RotationPolicy::default(),
50            max_files: 10,
51            compress: false,
52        }
53    }
54}
55
56impl RotationConfig {
57    /// Create a new rotation config.
58    pub fn new(policy: RotationPolicy) -> Self {
59        Self {
60            policy,
61            ..Default::default()
62        }
63    }
64
65    /// Set maximum files to keep.
66    pub fn with_max_files(mut self, max: usize) -> Self {
67        self.max_files = max;
68        self
69    }
70
71    /// Enable compression.
72    pub fn with_compression(mut self, compress: bool) -> Self {
73        self.compress = compress;
74        self
75    }
76}
77
78/// JSON file logger with rotation support.
79///
80/// Writes each audit event as a JSON line (JSONL format) with optional rotation.
81pub struct JsonFileLogger {
82    base_path: PathBuf,
83    current_file: Mutex<CurrentFile>,
84    config: AuditConfig,
85    rotation: RotationConfig,
86    stats: JsonLoggerStats,
87}
88
89struct CurrentFile {
90    file: Option<File>,
91    path: PathBuf,
92    bytes_written: u64,
93    created_at: DateTime<Utc>,
94}
95
/// Lock-free counters kept by a logger instance; all start at zero.
struct JsonLoggerStats {
    /// Events written to the log file successfully.
    total_events: AtomicU64,
    /// Events that could not be recorded.
    failed_events: AtomicU64,
    /// Bytes written by this instance across all files (pre-existing file
    /// content is not included).
    bytes_written: AtomicU64,
    /// Completed rotations. Not part of the `AuditStats` built by `stats()`.
    rotations: AtomicU64,
}
102
103impl JsonFileLogger {
104    /// Create a new JSON file logger.
105    pub async fn new(
106        base_path: impl Into<PathBuf>,
107        config: AuditConfig,
108        rotation: RotationConfig,
109    ) -> Result<Self, AuditError> {
110        let base_path = base_path.into();
111
112        // Ensure parent directory exists
113        if let Some(parent) = base_path.parent() {
114            tokio::fs::create_dir_all(parent).await?;
115        }
116
117        let current_path = Self::generate_filename(&base_path, Utc::now());
118        let file = OpenOptions::new()
119            .create(true)
120            .append(true)
121            .open(&current_path)
122            .await?;
123
124        let metadata = file.metadata().await?;
125
126        Ok(Self {
127            base_path,
128            current_file: Mutex::new(CurrentFile {
129                file: Some(file),
130                path: current_path,
131                bytes_written: metadata.len(),
132                created_at: Utc::now(),
133            }),
134            config,
135            rotation,
136            stats: JsonLoggerStats {
137                total_events: AtomicU64::new(0),
138                failed_events: AtomicU64::new(0),
139                bytes_written: AtomicU64::new(0),
140                rotations: AtomicU64::new(0),
141            },
142        })
143    }
144
145    /// Create with default configuration.
146    pub async fn with_path(path: impl Into<PathBuf>) -> Result<Self, AuditError> {
147        Self::new(path, AuditConfig::default(), RotationConfig::default()).await
148    }
149
150    /// Generate a filename with timestamp.
151    fn generate_filename(base: &std::path::Path, time: DateTime<Utc>) -> PathBuf {
152        let stem = base.file_stem().and_then(|s| s.to_str()).unwrap_or("audit");
153        let ext = base.extension().and_then(|s| s.to_str()).unwrap_or("jsonl");
154        let parent = base.parent().unwrap_or(std::path::Path::new("."));
155
156        parent.join(format!("{}-{}.{}", stem, time.format("%Y%m%d-%H%M%S"), ext))
157    }
158
159    /// Check if rotation is needed.
160    fn needs_rotation(&self, current: &CurrentFile, now: DateTime<Utc>) -> bool {
161        match self.rotation.policy {
162            RotationPolicy::Size(max_size) => current.bytes_written >= max_size,
163            RotationPolicy::Daily => current.created_at.date_naive() != now.date_naive(),
164            RotationPolicy::Hourly => {
165                current.created_at.date_naive() != now.date_naive()
166                    || current.created_at.hour() != now.hour()
167            }
168            RotationPolicy::Never => false,
169        }
170    }
171
172    /// Rotate the log file.
173    async fn rotate(&self, current: &mut CurrentFile) -> Result<(), AuditError> {
174        // Close current file
175        if let Some(mut file) = current.file.take() {
176            file.flush().await?;
177        }
178
179        // Clean up old files
180        self.cleanup_old_files().await?;
181
182        // Create new file
183        let now = Utc::now();
184        let new_path = Self::generate_filename(&self.base_path, now);
185        let new_file = OpenOptions::new()
186            .create(true)
187            .append(true)
188            .open(&new_path)
189            .await?;
190
191        current.file = Some(new_file);
192        current.path = new_path;
193        current.bytes_written = 0;
194        current.created_at = now;
195
196        self.stats.rotations.fetch_add(1, Ordering::Relaxed);
197
198        tracing::info!("Audit log rotated to {:?}", current.path);
199
200        Ok(())
201    }
202
203    /// Clean up old rotated files.
204    async fn cleanup_old_files(&self) -> Result<(), AuditError> {
205        let parent = self.base_path.parent().unwrap_or(std::path::Path::new("."));
206        let stem = self
207            .base_path
208            .file_stem()
209            .and_then(|s| s.to_str())
210            .unwrap_or("audit");
211
212        let mut entries: Vec<_> = Vec::new();
213        let mut dir = tokio::fs::read_dir(parent).await?;
214
215        while let Some(entry) = dir.next_entry().await? {
216            let path = entry.path();
217            if let Some(name) = path.file_name().and_then(|n| n.to_str()) {
218                if name.starts_with(stem) && name.contains('-') {
219                    if let Ok(metadata) = entry.metadata().await {
220                        entries.push((
221                            path,
222                            metadata
223                                .modified()
224                                .unwrap_or(std::time::SystemTime::UNIX_EPOCH),
225                        ));
226                    }
227                }
228            }
229        }
230
231        // Sort by modification time (oldest first)
232        entries.sort_by_key(|(_, time)| *time);
233
234        // Remove oldest files if we have too many
235        while entries.len() > self.rotation.max_files {
236            if let Some((path, _)) = entries.first() {
237                tracing::debug!("Removing old audit log: {:?}", path);
238                tokio::fs::remove_file(path).await?;
239                entries.remove(0);
240            }
241        }
242
243        Ok(())
244    }
245}
246
247#[async_trait]
248impl AuditLogger for JsonFileLogger {
249    async fn log(&self, event: AuditEvent) -> Result<(), AuditError> {
250        if !self.config.should_log(event.level) {
251            return Ok(());
252        }
253
254        let mut json = serde_json::to_string(&event)?;
255        json.push('\n');
256        let bytes = json.as_bytes();
257
258        let mut current = self.current_file.lock().await;
259
260        // Check for rotation
261        let now = Utc::now();
262        if self.needs_rotation(&current, now) {
263            self.rotate(&mut current).await?;
264        }
265
266        // Write event
267        if let Some(file) = current.file.as_mut() {
268            match file.write_all(bytes).await {
269                Ok(_) => {
270                    current.bytes_written += bytes.len() as u64;
271                    self.stats.total_events.fetch_add(1, Ordering::Relaxed);
272                    self.stats
273                        .bytes_written
274                        .fetch_add(bytes.len() as u64, Ordering::Relaxed);
275                    Ok(())
276                }
277                Err(e) => {
278                    self.stats.failed_events.fetch_add(1, Ordering::Relaxed);
279                    Err(AuditError::Io(e))
280                }
281            }
282        } else {
283            self.stats.failed_events.fetch_add(1, Ordering::Relaxed);
284            Err(AuditError::NotInitialized)
285        }
286    }
287
288    async fn flush(&self) -> Result<(), AuditError> {
289        let mut current = self.current_file.lock().await;
290        if let Some(file) = current.file.as_mut() {
291            file.flush().await?;
292        }
293        Ok(())
294    }
295
296    fn name(&self) -> &str {
297        "json_file"
298    }
299
300    async fn health_check(&self) -> Result<(), AuditError> {
301        let current = self.current_file.lock().await;
302        if current.file.is_some() {
303            Ok(())
304        } else {
305            Err(AuditError::NotInitialized)
306        }
307    }
308
309    async fn stats(&self) -> AuditStats {
310        AuditStats {
311            total_events: self.stats.total_events.load(Ordering::Relaxed),
312            failed_events: self.stats.failed_events.load(Ordering::Relaxed),
313            bytes_written: self.stats.bytes_written.load(Ordering::Relaxed),
314            ..Default::default()
315        }
316    }
317}
318
319impl std::fmt::Debug for JsonFileLogger {
320    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
321        f.debug_struct("JsonFileLogger")
322            .field("base_path", &self.base_path)
323            .field("config", &self.config)
324            .field("rotation", &self.rotation)
325            .finish()
326    }
327}
328
#[cfg(test)]
mod tests {
    use super::*;
    use crate::types::AuditContext;
    use tempfile::tempdir;

    /// A logger can be created in a fresh directory and reports its name.
    #[tokio::test]
    async fn test_json_logger_creation() {
        let dir = tempdir().unwrap();
        let path = dir.path().join("audit.jsonl");

        let logger = JsonFileLogger::with_path(&path).await.unwrap();
        assert_eq!(logger.name(), "json_file");
    }

    /// A logged event ends up in the active file as parseable JSON.
    #[tokio::test]
    async fn test_json_logger_write() {
        let dir = tempdir().unwrap();
        let path = dir.path().join("audit.jsonl");

        let logger = JsonFileLogger::with_path(&path).await.unwrap();

        let event =
            AuditEvent::tool_call("execute_code", serde_json::json!({"lang": "python"}), true)
                .with_context(AuditContext::new().with_agent_id("agent-1"));

        logger.log(event).await.unwrap();
        logger.flush().await.unwrap();

        // Read the generated file
        let current = logger.current_file.lock().await;
        let content = tokio::fs::read_to_string(&current.path).await.unwrap();

        // Verify it's valid JSON
        let parsed: AuditEvent = serde_json::from_str(content.trim()).unwrap();
        assert!(matches!(
            parsed.kind,
            crate::types::EventKind::ToolCall { .. }
        ));
    }

    /// Exceeding the size limit triggers at least one rotation and no event
    /// is lost along the way.
    #[tokio::test]
    async fn test_json_logger_rotation_by_size() {
        const EVENTS: u64 = 20;

        let dir = tempdir().unwrap();
        let path = dir.path().join("audit.jsonl");

        let rotation = RotationConfig::new(RotationPolicy::Size(500)); // Small size for testing
        let logger = JsonFileLogger::new(&path, AuditConfig::default(), rotation)
            .await
            .unwrap();

        // Write enough events to trigger rotation: the tool name and payload
        // alone are more than 60 bytes per line, so 20 lines pass 500 bytes.
        for i in 0..EVENTS {
            let event = AuditEvent::tool_call(
                format!("tool_with_long_name_{}", i),
                serde_json::json!({"data": "some test data that takes up space"}),
                true,
            );
            logger.log(event).await.unwrap();
        }
        logger.flush().await.unwrap();

        // Every event was recorded...
        let stats = logger.stats().await;
        assert_eq!(stats.total_events, EVENTS);
        assert_eq!(stats.failed_events, 0);

        // ...and rotation really happened.
        let rotations = logger
            .stats
            .rotations
            .load(std::sync::atomic::Ordering::Relaxed);
        assert!(rotations >= 1, "expected at least one rotation");

        // At least one log file exists in the directory
        let mut count = 0;
        let mut dir_entries = tokio::fs::read_dir(dir.path()).await.unwrap();
        while dir_entries.next_entry().await.unwrap().is_some() {
            count += 1;
        }
        assert!(count >= 1);
    }

    /// The builder methods set exactly the fields they are named after.
    #[tokio::test]
    async fn test_rotation_config() {
        let config = RotationConfig::new(RotationPolicy::Daily)
            .with_max_files(5)
            .with_compression(true);

        assert!(matches!(config.policy, RotationPolicy::Daily));
        assert_eq!(config.max_files, 5);
        assert!(config.compress);
    }
}