1use async_trait::async_trait;
6use chrono::{DateTime, Utc};
7use parking_lot::RwLock;
8use std::collections::VecDeque;
9use std::path::PathBuf;
10use std::sync::Arc;
11
12use super::entry::{AuditEntry, AuditFilter, ExportFormat};
13use anyhow::Result;
14
15#[async_trait]
17pub trait AuditStorage: Send + Sync {
18 async fn save(&self, entry: &AuditEntry) -> Result<()>;
20
21 async fn query(&self, filter: &AuditFilter) -> Result<Vec<AuditEntry>>;
23
24 async fn export(&self, format: ExportFormat, filter: &AuditFilter) -> Result<Vec<u8>>;
26
27 async fn cleanup(&self, before: DateTime<Utc>) -> Result<usize>;
29
30 async fn count(&self) -> Result<usize>;
32}
33
34pub struct MemoryStorage {
36 entries: Arc<RwLock<VecDeque<AuditEntry>>>,
37 max_entries: usize,
38}
39
40impl MemoryStorage {
41 pub fn new(max_entries: usize) -> Self {
42 Self {
43 entries: Arc::new(RwLock::new(VecDeque::with_capacity(max_entries))),
44 max_entries,
45 }
46 }
47
48 pub fn get_all(&self) -> Vec<AuditEntry> {
50 self.entries.read().iter().cloned().collect()
51 }
52}
53
54#[async_trait]
55impl AuditStorage for MemoryStorage {
56 async fn save(&self, entry: &AuditEntry) -> Result<()> {
57 let mut entries = self.entries.write();
58
59 if entries.len() >= self.max_entries {
61 entries.pop_front();
62 }
63
64 entries.push_back(entry.clone());
65 Ok(())
66 }
67
68 async fn query(&self, filter: &AuditFilter) -> Result<Vec<AuditEntry>> {
69 let entries = self.entries.read();
70
71 let mut results: Vec<AuditEntry> = entries
72 .iter()
73 .filter(|e| e.matches_filter(filter))
74 .cloned()
75 .collect();
76
77 if let Some(limit) = filter.limit {
79 results = results.into_iter().take(limit).collect();
80 }
81
82 Ok(results)
83 }
84
85 async fn export(&self, format: ExportFormat, filter: &AuditFilter) -> Result<Vec<u8>> {
86 let entries = self.query(filter).await?;
87
88 match format {
89 ExportFormat::Json => {
90 let json = serde_json::to_string_pretty(&entries)?;
91 Ok(json.into_bytes())
92 }
93 ExportFormat::Csv => {
94 let mut csv = String::from("id,timestamp,user_id,action,resource_type,result\n");
95 for entry in entries {
96 csv.push_str(&format!(
97 "{},{},{},{},{},{}\n",
98 entry.id,
99 entry.timestamp.to_rfc3339(),
100 entry.user_id,
101 entry.action.as_str(),
102 entry.resource_type,
103 if entry.result.is_success() {
104 "success"
105 } else {
106 "failure"
107 }
108 ));
109 }
110 Ok(csv.into_bytes())
111 }
112 ExportFormat::Syslog => {
113 let mut syslog = String::new();
114 for entry in entries {
115 syslog.push_str(&format!(
116 "{} AUDIT: user={} action={} resource={} result={}\n",
117 entry.timestamp.to_rfc3339(),
118 entry.user_id,
119 entry.action.as_str(),
120 entry.resource_type,
121 if entry.result.is_success() {
122 "SUCCESS"
123 } else {
124 "FAILURE"
125 }
126 ));
127 }
128 Ok(syslog.into_bytes())
129 }
130 }
131 }
132
133 async fn cleanup(&self, before: DateTime<Utc>) -> Result<usize> {
134 let mut entries = self.entries.write();
135 let original_len = entries.len();
136
137 entries.retain(|e| e.timestamp >= before);
138
139 Ok(original_len - entries.len())
140 }
141
142 async fn count(&self) -> Result<usize> {
143 Ok(self.entries.read().len())
144 }
145}
146
/// Audit storage that writes entries to per-day JSON Lines files.
pub struct FileStorage {
    /// Directory containing the `audit-YYYY-MM-DD.jsonl` files.
    base_path: PathBuf,
    /// Size threshold in bytes, set to 10 MiB by `new`.
    // Nothing in this file reads the field yet, hence the lint allowance.
    #[allow(dead_code)]
    max_file_size: usize,
}
153
154impl FileStorage {
155 pub fn new(base_path: PathBuf) -> Self {
156 Self {
157 base_path,
158 max_file_size: 10 * 1024 * 1024, }
160 }
161
162 fn get_log_path(&self, date: DateTime<Utc>) -> PathBuf {
163 self.base_path
164 .join(format!("audit-{}.jsonl", date.format("%Y-%m-%d")))
165 }
166}
167
168#[async_trait]
169impl AuditStorage for FileStorage {
170 async fn save(&self, entry: &AuditEntry) -> Result<()> {
171 let path = self.get_log_path(entry.timestamp);
172
173 if let Some(parent) = path.parent() {
175 tokio::fs::create_dir_all(parent).await?;
176 }
177
178 let json = serde_json::to_string(entry)?;
180 let line = format!("{}\n", json);
181
182 tokio::fs::write(&path, line).await?;
183 Ok(())
184 }
185
186 async fn query(&self, filter: &AuditFilter) -> Result<Vec<AuditEntry>> {
187 let mut results = Vec::new();
189
190 let mut entries = tokio::fs::read_dir(&self.base_path).await?;
192
193 while let Some(entry) = entries.next_entry().await? {
194 let path = entry.path();
195 if path.extension().is_some_and(|e| e == "jsonl") {
196 let content = tokio::fs::read_to_string(&path).await?;
197
198 for line in content.lines() {
199 if let Ok(audit_entry) = serde_json::from_str::<AuditEntry>(line) {
200 if audit_entry.matches_filter(filter) {
201 results.push(audit_entry);
202 }
203 }
204 }
205 }
206 }
207
208 if let Some(limit) = filter.limit {
210 results = results.into_iter().take(limit).collect();
211 }
212
213 Ok(results)
214 }
215
216 async fn export(&self, format: ExportFormat, filter: &AuditFilter) -> Result<Vec<u8>> {
217 let entries = self.query(filter).await?;
218
219 match format {
220 ExportFormat::Json => {
221 let json = serde_json::to_string_pretty(&entries)?;
222 Ok(json.into_bytes())
223 }
224 ExportFormat::Csv => {
225 let mut csv = String::from("id,timestamp,user_id,action,resource_type,result\n");
226 for entry in entries {
227 csv.push_str(&format!(
228 "{},{},{},{},{},{}\n",
229 entry.id,
230 entry.timestamp.to_rfc3339(),
231 entry.user_id,
232 entry.action.as_str(),
233 entry.resource_type,
234 if entry.result.is_success() {
235 "success"
236 } else {
237 "failure"
238 }
239 ));
240 }
241 Ok(csv.into_bytes())
242 }
243 ExportFormat::Syslog => {
244 let mut syslog = String::new();
245 for entry in entries {
246 syslog.push_str(&format!(
247 "{} AUDIT: user={} action={} resource={} result={}\n",
248 entry.timestamp.to_rfc3339(),
249 entry.user_id,
250 entry.action.as_str(),
251 entry.resource_type,
252 if entry.result.is_success() {
253 "SUCCESS"
254 } else {
255 "FAILURE"
256 }
257 ));
258 }
259 Ok(syslog.into_bytes())
260 }
261 }
262 }
263
264 async fn cleanup(&self, before: DateTime<Utc>) -> Result<usize> {
265 let mut removed_count = 0;
266
267 let mut entries = tokio::fs::read_dir(&self.base_path).await?;
268
269 while let Some(entry) = entries.next_entry().await? {
270 let path = entry.path();
271
272 let filename = path.file_name().and_then(|n| n.to_str()).unwrap_or("");
274
275 if let Some(date_str) = filename
276 .strip_prefix("audit-")
277 .and_then(|s| s.strip_suffix(".jsonl"))
278 {
279 if let Ok(file_date) = chrono::NaiveDate::parse_from_str(date_str, "%Y-%m-%d") {
280 let file_datetime = file_date.and_hms_opt(0, 0, 0).unwrap().and_utc();
281 if file_datetime < before {
282 tokio::fs::remove_file(&path).await?;
283 removed_count += 1;
284 }
285 }
286 }
287 }
288
289 Ok(removed_count)
290 }
291
292 async fn count(&self) -> Result<usize> {
293 let entries = self.query(&AuditFilter::default()).await?;
294 Ok(entries.len())
295 }
296}
297
#[cfg(test)]
mod tests {
    use super::super::entry::AuditAction;
    use super::*;
    use tempfile::tempdir;

    /// A saved entry is returned by an unfiltered query.
    #[tokio::test]
    async fn test_memory_storage() {
        let storage = MemoryStorage::new(100);
        let entry = AuditEntry::new("user1", AuditAction::Read, "doc");

        storage.save(&entry).await.unwrap();

        let found = storage.query(&AuditFilter::default()).await.unwrap();
        assert_eq!(found.len(), 1);
    }

    /// Saving more entries than the capacity keeps the count at the capacity.
    #[tokio::test]
    async fn test_memory_storage_limit() {
        let storage = MemoryStorage::new(2);

        for user in ["user1", "user2", "user3"] {
            let entry = AuditEntry::new(user, AuditAction::Read, "doc");
            storage.save(&entry).await.unwrap();
        }

        let count = storage.count().await.unwrap();
        assert_eq!(count, 2);
    }

    /// The JSON export contains the user id of the saved entry.
    #[tokio::test]
    async fn test_export_json() {
        let storage = MemoryStorage::new(100);
        let entry = AuditEntry::new("user1", AuditAction::Login, "session");
        storage.save(&entry).await.unwrap();

        let bytes = storage
            .export(ExportFormat::Json, &AuditFilter::default())
            .await
            .unwrap();

        let json_str = String::from_utf8(bytes).unwrap();
        assert!(json_str.contains("user1"));
    }

    /// An entry saved to disk can be read back through `query`.
    #[tokio::test]
    async fn test_file_storage() {
        let dir = tempdir().unwrap();
        let storage = FileStorage::new(dir.path().to_path_buf());
        let entry = AuditEntry::new("user1", AuditAction::Read, "doc");

        storage.save(&entry).await.unwrap();

        let found = storage.query(&AuditFilter::default()).await.unwrap();
        assert!(!found.is_empty());
    }
}