1use crate::error::AuditError;
6use crate::traits::{AuditConfig, AuditLogger, AuditStats};
7use crate::types::AuditEvent;
8use async_trait::async_trait;
9use chrono::{DateTime, Timelike, Utc};
10use std::path::PathBuf;
11use std::sync::atomic::{AtomicU64, Ordering};
12use tokio::fs::{File, OpenOptions};
13use tokio::io::AsyncWriteExt;
14use tokio::sync::Mutex;
15
/// Strategy that decides when the active audit log file is closed and a new
/// one is started.
///
/// Policies are plain values and can be compared with `==`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum RotationPolicy {
    /// Rotate once the active file has reached at least this many bytes.
    Size(u64),
    /// Rotate when the UTC calendar date differs from the date the active
    /// file was started.
    Daily,
    /// Rotate when the UTC date or hour differs from when the active file
    /// was started.
    Hourly,
    /// Never rotate.
    Never,
}
28
29impl Default for RotationPolicy {
30 fn default() -> Self {
31 Self::Size(10 * 1024 * 1024) }
33}
34
35#[derive(Debug, Clone)]
37pub struct RotationConfig {
38 pub policy: RotationPolicy,
40 pub max_files: usize,
42 pub compress: bool,
44}
45
46impl Default for RotationConfig {
47 fn default() -> Self {
48 Self {
49 policy: RotationPolicy::default(),
50 max_files: 10,
51 compress: false,
52 }
53 }
54}
55
56impl RotationConfig {
57 pub fn new(policy: RotationPolicy) -> Self {
59 Self {
60 policy,
61 ..Default::default()
62 }
63 }
64
65 pub fn with_max_files(mut self, max: usize) -> Self {
67 self.max_files = max;
68 self
69 }
70
71 pub fn with_compression(mut self, compress: bool) -> Self {
73 self.compress = compress;
74 self
75 }
76}
77
78pub struct JsonFileLogger {
82 base_path: PathBuf,
83 current_file: Mutex<CurrentFile>,
84 config: AuditConfig,
85 rotation: RotationConfig,
86 stats: JsonLoggerStats,
87}
88
89struct CurrentFile {
90 file: Option<File>,
91 path: PathBuf,
92 bytes_written: u64,
93 created_at: DateTime<Utc>,
94}
95
/// Lock-free counters describing the logger's activity.
struct JsonLoggerStats {
    /// Events written successfully.
    total_events: AtomicU64,
    /// Events that could not be logged.
    failed_events: AtomicU64,
    /// Total bytes appended across all files.
    bytes_written: AtomicU64,
    /// Number of completed file rotations.
    rotations: AtomicU64,
}
102
103impl JsonFileLogger {
104 pub async fn new(
106 base_path: impl Into<PathBuf>,
107 config: AuditConfig,
108 rotation: RotationConfig,
109 ) -> Result<Self, AuditError> {
110 let base_path = base_path.into();
111
112 if let Some(parent) = base_path.parent() {
114 tokio::fs::create_dir_all(parent).await?;
115 }
116
117 let current_path = Self::generate_filename(&base_path, Utc::now());
118 let file = OpenOptions::new()
119 .create(true)
120 .append(true)
121 .open(¤t_path)
122 .await?;
123
124 let metadata = file.metadata().await?;
125
126 Ok(Self {
127 base_path,
128 current_file: Mutex::new(CurrentFile {
129 file: Some(file),
130 path: current_path,
131 bytes_written: metadata.len(),
132 created_at: Utc::now(),
133 }),
134 config,
135 rotation,
136 stats: JsonLoggerStats {
137 total_events: AtomicU64::new(0),
138 failed_events: AtomicU64::new(0),
139 bytes_written: AtomicU64::new(0),
140 rotations: AtomicU64::new(0),
141 },
142 })
143 }
144
145 pub async fn with_path(path: impl Into<PathBuf>) -> Result<Self, AuditError> {
147 Self::new(path, AuditConfig::default(), RotationConfig::default()).await
148 }
149
150 fn generate_filename(base: &std::path::Path, time: DateTime<Utc>) -> PathBuf {
152 let stem = base.file_stem().and_then(|s| s.to_str()).unwrap_or("audit");
153 let ext = base.extension().and_then(|s| s.to_str()).unwrap_or("jsonl");
154 let parent = base.parent().unwrap_or(std::path::Path::new("."));
155
156 parent.join(format!("{}-{}.{}", stem, time.format("%Y%m%d-%H%M%S"), ext))
157 }
158
159 fn needs_rotation(&self, current: &CurrentFile, now: DateTime<Utc>) -> bool {
161 match self.rotation.policy {
162 RotationPolicy::Size(max_size) => current.bytes_written >= max_size,
163 RotationPolicy::Daily => current.created_at.date_naive() != now.date_naive(),
164 RotationPolicy::Hourly => {
165 current.created_at.date_naive() != now.date_naive()
166 || current.created_at.hour() != now.hour()
167 }
168 RotationPolicy::Never => false,
169 }
170 }
171
172 async fn rotate(&self, current: &mut CurrentFile) -> Result<(), AuditError> {
174 if let Some(mut file) = current.file.take() {
176 file.flush().await?;
177 }
178
179 self.cleanup_old_files().await?;
181
182 let now = Utc::now();
184 let new_path = Self::generate_filename(&self.base_path, now);
185 let new_file = OpenOptions::new()
186 .create(true)
187 .append(true)
188 .open(&new_path)
189 .await?;
190
191 current.file = Some(new_file);
192 current.path = new_path;
193 current.bytes_written = 0;
194 current.created_at = now;
195
196 self.stats.rotations.fetch_add(1, Ordering::Relaxed);
197
198 tracing::info!("Audit log rotated to {:?}", current.path);
199
200 Ok(())
201 }
202
203 async fn cleanup_old_files(&self) -> Result<(), AuditError> {
205 let parent = self.base_path.parent().unwrap_or(std::path::Path::new("."));
206 let stem = self
207 .base_path
208 .file_stem()
209 .and_then(|s| s.to_str())
210 .unwrap_or("audit");
211
212 let mut entries: Vec<_> = Vec::new();
213 let mut dir = tokio::fs::read_dir(parent).await?;
214
215 while let Some(entry) = dir.next_entry().await? {
216 let path = entry.path();
217 if let Some(name) = path.file_name().and_then(|n| n.to_str()) {
218 if name.starts_with(stem) && name.contains('-') {
219 if let Ok(metadata) = entry.metadata().await {
220 entries.push((
221 path,
222 metadata
223 .modified()
224 .unwrap_or(std::time::SystemTime::UNIX_EPOCH),
225 ));
226 }
227 }
228 }
229 }
230
231 entries.sort_by_key(|(_, time)| *time);
233
234 while entries.len() > self.rotation.max_files {
236 if let Some((path, _)) = entries.first() {
237 tracing::debug!("Removing old audit log: {:?}", path);
238 tokio::fs::remove_file(path).await?;
239 entries.remove(0);
240 }
241 }
242
243 Ok(())
244 }
245}
246
247#[async_trait]
248impl AuditLogger for JsonFileLogger {
249 async fn log(&self, event: AuditEvent) -> Result<(), AuditError> {
250 if !self.config.should_log(event.level) {
251 return Ok(());
252 }
253
254 let mut json = serde_json::to_string(&event)?;
255 json.push('\n');
256 let bytes = json.as_bytes();
257
258 let mut current = self.current_file.lock().await;
259
260 let now = Utc::now();
262 if self.needs_rotation(¤t, now) {
263 self.rotate(&mut current).await?;
264 }
265
266 if let Some(file) = current.file.as_mut() {
268 match file.write_all(bytes).await {
269 Ok(_) => {
270 current.bytes_written += bytes.len() as u64;
271 self.stats.total_events.fetch_add(1, Ordering::Relaxed);
272 self.stats
273 .bytes_written
274 .fetch_add(bytes.len() as u64, Ordering::Relaxed);
275 Ok(())
276 }
277 Err(e) => {
278 self.stats.failed_events.fetch_add(1, Ordering::Relaxed);
279 Err(AuditError::Io(e))
280 }
281 }
282 } else {
283 self.stats.failed_events.fetch_add(1, Ordering::Relaxed);
284 Err(AuditError::NotInitialized)
285 }
286 }
287
288 async fn flush(&self) -> Result<(), AuditError> {
289 let mut current = self.current_file.lock().await;
290 if let Some(file) = current.file.as_mut() {
291 file.flush().await?;
292 }
293 Ok(())
294 }
295
296 fn name(&self) -> &str {
297 "json_file"
298 }
299
300 async fn health_check(&self) -> Result<(), AuditError> {
301 let current = self.current_file.lock().await;
302 if current.file.is_some() {
303 Ok(())
304 } else {
305 Err(AuditError::NotInitialized)
306 }
307 }
308
309 async fn stats(&self) -> AuditStats {
310 AuditStats {
311 total_events: self.stats.total_events.load(Ordering::Relaxed),
312 failed_events: self.stats.failed_events.load(Ordering::Relaxed),
313 bytes_written: self.stats.bytes_written.load(Ordering::Relaxed),
314 ..Default::default()
315 }
316 }
317}
318
319impl std::fmt::Debug for JsonFileLogger {
320 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
321 f.debug_struct("JsonFileLogger")
322 .field("base_path", &self.base_path)
323 .field("config", &self.config)
324 .field("rotation", &self.rotation)
325 .finish()
326 }
327}
328
#[cfg(test)]
mod tests {
    use super::*;
    use crate::types::AuditContext;
    use tempfile::tempdir;

    /// A logger can be created in a fresh directory and reports its name.
    #[tokio::test]
    async fn test_json_logger_creation() {
        let dir = tempdir().unwrap();
        let path = dir.path().join("audit.jsonl");

        let logger = JsonFileLogger::with_path(&path).await.unwrap();
        assert_eq!(logger.name(), "json_file");
    }

    /// A logged event can be read back from the active file as valid JSON.
    #[tokio::test]
    async fn test_json_logger_write() {
        let dir = tempdir().unwrap();
        let path = dir.path().join("audit.jsonl");

        let logger = JsonFileLogger::with_path(&path).await.unwrap();

        let event =
            AuditEvent::tool_call("execute_code", serde_json::json!({"lang": "python"}), true)
                .with_context(AuditContext::new().with_agent_id("agent-1"));

        logger.log(event).await.unwrap();
        logger.flush().await.unwrap();

        // The active file has a timestamped name, so look it up on the logger.
        let current = logger.current_file.lock().await;
        let content = tokio::fs::read_to_string(&current.path).await.unwrap();

        let parsed: AuditEvent = serde_json::from_str(content.trim()).unwrap();
        assert!(matches!(
            parsed.kind,
            crate::types::EventKind::ToolCall { .. }
        ));
    }

    /// Logging past a small size limit keeps working and leaves log files on disk.
    #[tokio::test]
    async fn test_json_logger_rotation_by_size() {
        let dir = tempdir().unwrap();
        let path = dir.path().join("audit.jsonl");

        let rotation = RotationConfig::new(RotationPolicy::Size(500));
        let logger = JsonFileLogger::new(&path, AuditConfig::default(), rotation)
            .await
            .unwrap();

        for i in 0..20 {
            let event = AuditEvent::tool_call(
                format!("tool_with_long_name_{}", i),
                serde_json::json!({"data": "some test data that takes up space"}),
                true,
            );
            logger.log(event).await.unwrap();
        }
        logger.flush().await.unwrap();

        let stats = logger.stats().await;
        assert!(stats.total_events > 0);

        let mut count = 0;
        let mut dir_entries = tokio::fs::read_dir(dir.path()).await.unwrap();
        while dir_entries.next_entry().await.unwrap().is_some() {
            count += 1;
        }
        assert!(count >= 1);
    }

    /// The builder methods set exactly the fields they name.
    #[tokio::test]
    async fn test_rotation_config() {
        let config = RotationConfig::new(RotationPolicy::Daily)
            .with_max_files(5)
            .with_compression(true);

        assert!(matches!(config.policy, RotationPolicy::Daily));
        assert_eq!(config.max_files, 5);
        assert!(config.compress);
    }
}