reddb_server/runtime/
audit_query.rs1use std::path::Path;
14
15use crate::runtime::audit_log::{AuditEvent, Outcome};
16
17#[derive(Debug, Clone, Default)]
20pub struct AuditQuery {
21 pub since_ms: Option<u128>,
22 pub until_ms: Option<u128>,
23 pub principal: Option<String>,
24 pub tenant: Option<String>,
25 pub action_prefix: Option<String>,
28 pub outcome: Option<Outcome>,
29 pub limit: usize,
32}
33
34impl AuditQuery {
35 pub fn new() -> Self {
36 Self {
37 limit: 100,
38 ..Default::default()
39 }
40 }
41
42 fn matches(&self, ev: &AuditEvent) -> bool {
43 if let Some(since) = self.since_ms {
44 if ev.ts < since {
45 return false;
46 }
47 }
48 if let Some(until) = self.until_ms {
49 if ev.ts > until {
50 return false;
51 }
52 }
53 if let Some(principal) = &self.principal {
54 match &ev.principal {
55 Some(p) if p == principal => {}
56 _ => return false,
57 }
58 }
59 if let Some(tenant) = &self.tenant {
60 match &ev.tenant {
61 Some(t) if t == tenant => {}
62 _ => return false,
63 }
64 }
65 if let Some(prefix) = &self.action_prefix {
66 if !ev.action.starts_with(prefix) {
67 return false;
68 }
69 }
70 if let Some(outcome) = self.outcome {
71 if ev.outcome != outcome {
72 return false;
73 }
74 }
75 true
76 }
77}
78
79pub fn run_query(active_path: &Path, query: &AuditQuery) -> Vec<AuditEvent> {
85 let mut events: Vec<AuditEvent> = Vec::new();
86 let parent = active_path.parent().unwrap_or_else(|| Path::new("."));
87 let stem = active_path
88 .file_name()
89 .and_then(|s| s.to_str())
90 .unwrap_or(".audit.log");
91
92 let mut rotated: Vec<(u128, std::path::PathBuf)> = Vec::new();
93 if let Ok(rd) = std::fs::read_dir(parent) {
94 for entry in rd.flatten() {
95 let name = entry.file_name();
96 let Some(name_s) = name.to_str() else {
97 continue;
98 };
99 if !name_s.starts_with(&format!("{stem}.")) {
100 continue;
101 }
102 let after = &name_s[stem.len() + 1..];
105 let ts_part = after.trim_end_matches(".zst");
106 if let Ok(ts) = ts_part.parse::<u128>() {
107 rotated.push((ts, entry.path()));
108 }
109 }
110 }
111 rotated.sort_by_key(|(ts, _)| *ts);
112
113 for (_, path) in &rotated {
114 let bytes = match std::fs::read(path) {
115 Ok(b) => b,
116 Err(_) => continue,
117 };
118 let plain = if path
119 .extension()
120 .and_then(|e| e.to_str())
121 .map(|e| e == "zst")
122 .unwrap_or(false)
123 {
124 match zstd::bulk::decompress(&bytes, 256 * 1024 * 1024) {
125 Ok(p) => p,
126 Err(_) => continue,
127 }
128 } else {
129 bytes
130 };
131 ingest_buffer(&plain, query, &mut events);
132 }
133
134 if let Ok(active_bytes) = std::fs::read(active_path) {
135 ingest_buffer(&active_bytes, query, &mut events);
136 }
137
138 if events.len() > query.limit {
139 let take = query.limit;
140 let drop = events.len() - take;
141 events.drain(0..drop);
142 }
143 events
144}
145
146fn ingest_buffer(bytes: &[u8], query: &AuditQuery, out: &mut Vec<AuditEvent>) {
147 let Ok(text) = std::str::from_utf8(bytes) else {
148 return;
149 };
150 for line in text.lines() {
151 if line.is_empty() {
152 continue;
153 }
154 let Some(ev) = AuditEvent::parse_line(line) else {
155 continue;
156 };
157 if query.matches(&ev) {
158 out.push(ev);
159 }
160 }
161}
162
163pub fn events_to_json_array(events: &[AuditEvent]) -> crate::json::Value {
166 use crate::json::{Map, Value};
167 let mut arr: Vec<Value> = Vec::with_capacity(events.len());
168 for ev in events {
169 let line = ev.to_json_line(None);
170 if let Ok(v) = crate::json::from_str::<Value>(&line) {
171 arr.push(v);
172 }
173 }
174 let mut obj = Map::new();
175 obj.insert("count".to_string(), Value::Number(events.len() as f64));
176 obj.insert("events".to_string(), Value::Array(arr));
177 Value::Object(obj)
178}
179
180pub fn parse_time_arg(raw: &str) -> Option<u128> {
188 let trimmed = raw.trim();
189 if trimmed.is_empty() {
190 return None;
191 }
192 if let Ok(ms) = trimmed.parse::<u128>() {
193 return Some(ms);
194 }
195 parse_rfc3339_ms(trimmed)
196}
197
198pub fn parse_rfc3339_ms(s: &str) -> Option<u128> {
202 let bytes = s.as_bytes();
203 if bytes.len() < 20 {
204 return None;
205 }
206 if !s.ends_with('Z') {
207 return None;
208 }
209 let year: i64 = s.get(0..4)?.parse().ok()?;
210 let month: u32 = s.get(5..7)?.parse().ok()?;
211 let day: u32 = s.get(8..10)?.parse().ok()?;
212 if &bytes[4..5] != b"-" || &bytes[7..8] != b"-" || &bytes[10..11] != b"T" {
213 return None;
214 }
215 let hour: u64 = s.get(11..13)?.parse().ok()?;
216 let minute: u64 = s.get(14..16)?.parse().ok()?;
217 let second: u64 = s.get(17..19)?.parse().ok()?;
218 if &bytes[13..14] != b":" || &bytes[16..17] != b":" {
219 return None;
220 }
221 let mut ms_extra: u64 = 0;
222 if bytes.len() > 20 {
223 if &bytes[19..20] == b"." {
225 let dot_end = s.len() - 1; let frac = s.get(20..dot_end)?;
228 if frac.len() > 9 || frac.is_empty() {
229 return None;
230 }
231 let mut digits = String::with_capacity(3);
233 for c in frac.chars().take(3) {
234 if !c.is_ascii_digit() {
235 return None;
236 }
237 digits.push(c);
238 }
239 while digits.len() < 3 {
240 digits.push('0');
241 }
242 ms_extra = digits.parse().ok()?;
243 } else if &bytes[19..20] != b"Z" {
244 return None;
245 }
246 }
247 let days = days_from_civil(year, month, day);
248 let secs =
249 (days as i128) * 86_400 + (hour as i128) * 3600 + (minute as i128) * 60 + second as i128;
250 let ms = secs * 1000 + ms_extra as i128;
251 if ms < 0 {
252 return None;
253 }
254 Some(ms as u128)
255}
256
257fn days_from_civil(y: i64, m: u32, d: u32) -> i64 {
260 let y = if m <= 2 { y - 1 } else { y };
261 let era = if y >= 0 { y } else { y - 399 } / 400;
262 let yoe = (y - era * 400) as u64;
263 let m = m as u64;
264 let d = d as u64;
265 let doy = (153 * (if m > 2 { m - 3 } else { m + 9 }) + 2) / 5 + d - 1;
266 let doe = yoe * 365 + yoe / 4 - yoe / 100 + doy;
267 era * 146_097 + (doe as i64) - 719_468
268}
269
270#[cfg(test)]
271mod tests {
272 use super::*;
273 use crate::runtime::audit_log::{AuditAuthSource, AuditEvent, AuditLogger, Outcome};
274 use std::path::PathBuf;
275 use std::time::Duration;
276
277 fn temp_path(tag: &str) -> PathBuf {
278 let mut p = std::env::temp_dir();
279 p.push(format!(
280 "reddb-audit-query-{}-{}-{}",
281 tag,
282 std::process::id(),
283 crate::utils::now_unix_nanos()
284 ));
285 std::fs::create_dir_all(&p).unwrap();
286 p.push("data.rdb");
287 p
288 }
289
290 #[test]
291 fn filters_by_principal_and_action_prefix() {
292 let data = temp_path("filter");
293 let logger = AuditLogger::for_data_path(&data);
294 for who in ["alice", "bob", "alice", "carol"] {
295 logger.record_event(
296 AuditEvent::builder("auth/login.ok")
297 .principal(who)
298 .source(AuditAuthSource::Password)
299 .build(),
300 );
301 }
302 logger.record_event(
303 AuditEvent::builder("admin/shutdown")
304 .principal("alice")
305 .source(AuditAuthSource::Session)
306 .outcome(Outcome::Success)
307 .build(),
308 );
309 assert!(logger.wait_idle(Duration::from_secs(2)));
310
311 let q = AuditQuery {
312 principal: Some("alice".to_string()),
313 action_prefix: Some("auth/".to_string()),
314 limit: 100,
315 ..Default::default()
316 };
317 let hits = run_query(logger.path(), &q);
318 assert_eq!(hits.len(), 2, "expected two alice/auth lines");
319 assert!(hits.iter().all(|e| e.principal.as_deref() == Some("alice")));
320 assert!(hits.iter().all(|e| e.action.starts_with("auth/")));
321 }
322
323 #[test]
324 fn parse_time_accepts_rfc3339_and_ms() {
325 assert_eq!(
326 parse_time_arg("2024-02-29T12:34:56.789Z"),
327 Some(1_709_210_096_789)
328 );
329 assert_eq!(parse_time_arg("1709210096789"), Some(1_709_210_096_789));
330 assert_eq!(parse_time_arg("not a time"), None);
331 }
332
333 #[test]
334 fn limit_caps_oldest_off() {
335 let data = temp_path("limit");
336 let logger = AuditLogger::for_data_path(&data);
337 for i in 0..10 {
338 logger.record_event(AuditEvent::builder(format!("test/n/{i}")).build());
339 }
340 assert!(logger.wait_idle(Duration::from_secs(2)));
341 let q = AuditQuery {
342 limit: 3,
343 ..Default::default()
344 };
345 let hits = run_query(logger.path(), &q);
346 assert_eq!(hits.len(), 3);
347 assert_eq!(hits[0].action, "test/n/7");
349 assert_eq!(hits[2].action, "test/n/9");
350 }
351}