Skip to main content

sh_layer4/audit_logger/
storage.rs

1//! 审计日志存储后端
2//!
3//! 支持多种存储方式。
4
5use async_trait::async_trait;
6use chrono::{DateTime, Utc};
7use parking_lot::RwLock;
8use std::collections::VecDeque;
9use std::path::PathBuf;
10use std::sync::Arc;
11
12use super::entry::{AuditEntry, AuditFilter, ExportFormat};
13use anyhow::Result;
14
15/// 审计存储 trait
16#[async_trait]
17pub trait AuditStorage: Send + Sync {
18    /// 保存审计条目
19    async fn save(&self, entry: &AuditEntry) -> Result<()>;
20
21    /// 查询审计条目
22    async fn query(&self, filter: &AuditFilter) -> Result<Vec<AuditEntry>>;
23
24    /// 导出审计日志
25    async fn export(&self, format: ExportFormat, filter: &AuditFilter) -> Result<Vec<u8>>;
26
27    /// 清理过期日志
28    async fn cleanup(&self, before: DateTime<Utc>) -> Result<usize>;
29
30    /// 获取条目数量
31    async fn count(&self) -> Result<usize>;
32}
33
34/// 内存存储 (用于测试和小规模使用)
35pub struct MemoryStorage {
36    entries: Arc<RwLock<VecDeque<AuditEntry>>>,
37    max_entries: usize,
38}
39
40impl MemoryStorage {
41    pub fn new(max_entries: usize) -> Self {
42        Self {
43            entries: Arc::new(RwLock::new(VecDeque::with_capacity(max_entries))),
44            max_entries,
45        }
46    }
47
48    /// 获取所有条目
49    pub fn get_all(&self) -> Vec<AuditEntry> {
50        self.entries.read().iter().cloned().collect()
51    }
52}
53
54#[async_trait]
55impl AuditStorage for MemoryStorage {
56    async fn save(&self, entry: &AuditEntry) -> Result<()> {
57        let mut entries = self.entries.write();
58
59        // 如果超过容量,删除最旧的条目
60        if entries.len() >= self.max_entries {
61            entries.pop_front();
62        }
63
64        entries.push_back(entry.clone());
65        Ok(())
66    }
67
68    async fn query(&self, filter: &AuditFilter) -> Result<Vec<AuditEntry>> {
69        let entries = self.entries.read();
70
71        let mut results: Vec<AuditEntry> = entries
72            .iter()
73            .filter(|e| e.matches_filter(filter))
74            .cloned()
75            .collect();
76
77        // 应用限制
78        if let Some(limit) = filter.limit {
79            results = results.into_iter().take(limit).collect();
80        }
81
82        Ok(results)
83    }
84
85    async fn export(&self, format: ExportFormat, filter: &AuditFilter) -> Result<Vec<u8>> {
86        let entries = self.query(filter).await?;
87
88        match format {
89            ExportFormat::Json => {
90                let json = serde_json::to_string_pretty(&entries)?;
91                Ok(json.into_bytes())
92            }
93            ExportFormat::Csv => {
94                let mut csv = String::from("id,timestamp,user_id,action,resource_type,result\n");
95                for entry in entries {
96                    csv.push_str(&format!(
97                        "{},{},{},{},{},{}\n",
98                        entry.id,
99                        entry.timestamp.to_rfc3339(),
100                        entry.user_id,
101                        entry.action.as_str(),
102                        entry.resource_type,
103                        if entry.result.is_success() {
104                            "success"
105                        } else {
106                            "failure"
107                        }
108                    ));
109                }
110                Ok(csv.into_bytes())
111            }
112            ExportFormat::Syslog => {
113                let mut syslog = String::new();
114                for entry in entries {
115                    syslog.push_str(&format!(
116                        "{} AUDIT: user={} action={} resource={} result={}\n",
117                        entry.timestamp.to_rfc3339(),
118                        entry.user_id,
119                        entry.action.as_str(),
120                        entry.resource_type,
121                        if entry.result.is_success() {
122                            "SUCCESS"
123                        } else {
124                            "FAILURE"
125                        }
126                    ));
127                }
128                Ok(syslog.into_bytes())
129            }
130        }
131    }
132
133    async fn cleanup(&self, before: DateTime<Utc>) -> Result<usize> {
134        let mut entries = self.entries.write();
135        let original_len = entries.len();
136
137        entries.retain(|e| e.timestamp >= before);
138
139        Ok(original_len - entries.len())
140    }
141
142    async fn count(&self) -> Result<usize> {
143        Ok(self.entries.read().len())
144    }
145}
146
/// File-backed audit store that keeps entries as JSON Lines files under a
/// base directory.
pub struct FileStorage {
    /// Directory that holds the `audit-YYYY-MM-DD.jsonl` log files.
    base_path: PathBuf,
    /// Size threshold in bytes set by `new`; not read anywhere in this file,
    /// hence the `dead_code` allowance.
    #[allow(dead_code)]
    max_file_size: usize,
}
153
154impl FileStorage {
155    pub fn new(base_path: PathBuf) -> Self {
156        Self {
157            base_path,
158            max_file_size: 10 * 1024 * 1024, // 10MB
159        }
160    }
161
162    fn get_log_path(&self, date: DateTime<Utc>) -> PathBuf {
163        self.base_path
164            .join(format!("audit-{}.jsonl", date.format("%Y-%m-%d")))
165    }
166}
167
168#[async_trait]
169impl AuditStorage for FileStorage {
170    async fn save(&self, entry: &AuditEntry) -> Result<()> {
171        let path = self.get_log_path(entry.timestamp);
172
173        // 确保目录存在
174        if let Some(parent) = path.parent() {
175            tokio::fs::create_dir_all(parent).await?;
176        }
177
178        // 写入条目 (JSON Lines 格式)
179        let json = serde_json::to_string(entry)?;
180        let line = format!("{}\n", json);
181
182        tokio::fs::write(&path, line).await?;
183        Ok(())
184    }
185
186    async fn query(&self, filter: &AuditFilter) -> Result<Vec<AuditEntry>> {
187        // 简化实现:读取所有文件并过滤
188        let mut results = Vec::new();
189
190        // 列出所有日志文件
191        let mut entries = tokio::fs::read_dir(&self.base_path).await?;
192
193        while let Some(entry) = entries.next_entry().await? {
194            let path = entry.path();
195            if path.extension().is_some_and(|e| e == "jsonl") {
196                let content = tokio::fs::read_to_string(&path).await?;
197
198                for line in content.lines() {
199                    if let Ok(audit_entry) = serde_json::from_str::<AuditEntry>(line) {
200                        if audit_entry.matches_filter(filter) {
201                            results.push(audit_entry);
202                        }
203                    }
204                }
205            }
206        }
207
208        // 应用限制
209        if let Some(limit) = filter.limit {
210            results = results.into_iter().take(limit).collect();
211        }
212
213        Ok(results)
214    }
215
216    async fn export(&self, format: ExportFormat, filter: &AuditFilter) -> Result<Vec<u8>> {
217        let entries = self.query(filter).await?;
218
219        match format {
220            ExportFormat::Json => {
221                let json = serde_json::to_string_pretty(&entries)?;
222                Ok(json.into_bytes())
223            }
224            ExportFormat::Csv => {
225                let mut csv = String::from("id,timestamp,user_id,action,resource_type,result\n");
226                for entry in entries {
227                    csv.push_str(&format!(
228                        "{},{},{},{},{},{}\n",
229                        entry.id,
230                        entry.timestamp.to_rfc3339(),
231                        entry.user_id,
232                        entry.action.as_str(),
233                        entry.resource_type,
234                        if entry.result.is_success() {
235                            "success"
236                        } else {
237                            "failure"
238                        }
239                    ));
240                }
241                Ok(csv.into_bytes())
242            }
243            ExportFormat::Syslog => {
244                let mut syslog = String::new();
245                for entry in entries {
246                    syslog.push_str(&format!(
247                        "{} AUDIT: user={} action={} resource={} result={}\n",
248                        entry.timestamp.to_rfc3339(),
249                        entry.user_id,
250                        entry.action.as_str(),
251                        entry.resource_type,
252                        if entry.result.is_success() {
253                            "SUCCESS"
254                        } else {
255                            "FAILURE"
256                        }
257                    ));
258                }
259                Ok(syslog.into_bytes())
260            }
261        }
262    }
263
264    async fn cleanup(&self, before: DateTime<Utc>) -> Result<usize> {
265        let mut removed_count = 0;
266
267        let mut entries = tokio::fs::read_dir(&self.base_path).await?;
268
269        while let Some(entry) = entries.next_entry().await? {
270            let path = entry.path();
271
272            // 检查文件名中的日期
273            let filename = path.file_name().and_then(|n| n.to_str()).unwrap_or("");
274
275            if let Some(date_str) = filename
276                .strip_prefix("audit-")
277                .and_then(|s| s.strip_suffix(".jsonl"))
278            {
279                if let Ok(file_date) = chrono::NaiveDate::parse_from_str(date_str, "%Y-%m-%d") {
280                    let file_datetime = file_date.and_hms_opt(0, 0, 0).unwrap().and_utc();
281                    if file_datetime < before {
282                        tokio::fs::remove_file(&path).await?;
283                        removed_count += 1;
284                    }
285                }
286            }
287        }
288
289        Ok(removed_count)
290    }
291
292    async fn count(&self) -> Result<usize> {
293        let entries = self.query(&AuditFilter::default()).await?;
294        Ok(entries.len())
295    }
296}
297
#[cfg(test)]
mod tests {
    use super::super::entry::AuditAction;
    use super::*;
    use tempfile::tempdir;

    /// A saved entry is returned by an unfiltered query.
    #[tokio::test]
    async fn test_memory_storage() {
        let store = MemoryStorage::new(100);

        let record = AuditEntry::new("user1", AuditAction::Read, "doc");
        store.save(&record).await.unwrap();

        let found = store.query(&AuditFilter::default()).await.unwrap();
        assert_eq!(found.len(), 1);
    }

    /// Saving more entries than the capacity keeps the count at the capacity.
    #[tokio::test]
    async fn test_memory_storage_limit() {
        let store = MemoryStorage::new(2);

        for user in ["user1", "user2", "user3"] {
            let record = AuditEntry::new(user, AuditAction::Read, "doc");
            store.save(&record).await.unwrap();
        }

        let stored = store.count().await.unwrap();
        assert_eq!(stored, 2);
    }

    /// The JSON export contains the user id of the saved entry.
    #[tokio::test]
    async fn test_export_json() {
        let store = MemoryStorage::new(100);
        let record = AuditEntry::new("user1", AuditAction::Login, "session");
        store.save(&record).await.unwrap();

        let bytes = store
            .export(ExportFormat::Json, &AuditFilter::default())
            .await
            .unwrap();
        let text = String::from_utf8(bytes).unwrap();
        assert!(text.contains("user1"));
    }

    /// An entry saved to a temporary directory can be queried back.
    #[tokio::test]
    async fn test_file_storage() {
        let tmp = tempdir().unwrap();
        let store = FileStorage::new(tmp.path().to_path_buf());

        let record = AuditEntry::new("user1", AuditAction::Read, "doc");
        store.save(&record).await.unwrap();

        let found = store.query(&AuditFilter::default()).await.unwrap();
        assert!(!found.is_empty());
    }
}
363}