reddb_server/runtime/
audit_query.rs1use std::path::Path;
14
15use crate::runtime::audit_log::{AuditEvent, Outcome};
16
17#[derive(Debug, Clone, Default)]
20pub struct AuditQuery {
21 pub since_ms: Option<u128>,
22 pub until_ms: Option<u128>,
23 pub principal: Option<String>,
24 pub tenant: Option<String>,
25 pub action_prefix: Option<String>,
28 pub outcome: Option<Outcome>,
29 pub limit: usize,
32}
33
34impl AuditQuery {
35 pub fn new() -> Self {
36 Self {
37 limit: 100,
38 ..Default::default()
39 }
40 }
41
42 fn matches(&self, ev: &AuditEvent) -> bool {
43 if let Some(since) = self.since_ms {
44 if ev.ts < since {
45 return false;
46 }
47 }
48 if let Some(until) = self.until_ms {
49 if ev.ts > until {
50 return false;
51 }
52 }
53 if let Some(principal) = &self.principal {
54 match &ev.principal {
55 Some(p) if p == principal => {}
56 _ => return false,
57 }
58 }
59 if let Some(tenant) = &self.tenant {
60 match &ev.tenant {
61 Some(t) if t == tenant => {}
62 _ => return false,
63 }
64 }
65 if let Some(prefix) = &self.action_prefix {
66 if !ev.action.starts_with(prefix) {
67 return false;
68 }
69 }
70 if let Some(outcome) = self.outcome {
71 if ev.outcome != outcome {
72 return false;
73 }
74 }
75 true
76 }
77}
78
79pub fn run_query(active_path: &Path, query: &AuditQuery) -> Vec<AuditEvent> {
85 let mut events: Vec<AuditEvent> = Vec::new();
86 let parent = active_path.parent().unwrap_or_else(|| Path::new("."));
87
88 let mut rotated: Vec<(u128, std::path::PathBuf)> = Vec::new();
89 if let Ok(rd) = std::fs::read_dir(parent) {
90 for entry in rd.flatten() {
91 let name = entry.file_name();
92 let Some(name_s) = name.to_str() else {
93 continue;
94 };
95 if let Some(ts) =
96 reddb_file::layout::parse_audit_log_rotated_timestamp(active_path, name_s)
97 {
98 rotated.push((ts, entry.path()));
99 }
100 }
101 }
102 rotated.sort_by_key(|(ts, _)| *ts);
103
104 for (_, path) in &rotated {
105 let bytes = match std::fs::read(path) {
106 Ok(b) => b,
107 Err(_) => continue,
108 };
109 let plain = if path
110 .extension()
111 .and_then(|e| e.to_str())
112 .map(|e| e == reddb_file::layout::AUDIT_LOG_ROTATED_COMPRESSED_EXTENSION)
113 .unwrap_or(false)
114 {
115 match zstd::bulk::decompress(&bytes, 256 * 1024 * 1024) {
116 Ok(p) => p,
117 Err(_) => continue,
118 }
119 } else {
120 bytes
121 };
122 ingest_buffer(&plain, query, &mut events);
123 }
124
125 if let Ok(active_bytes) = std::fs::read(active_path) {
126 ingest_buffer(&active_bytes, query, &mut events);
127 }
128
129 if events.len() > query.limit {
130 let take = query.limit;
131 let drop = events.len() - take;
132 events.drain(0..drop);
133 }
134 events
135}
136
137fn ingest_buffer(bytes: &[u8], query: &AuditQuery, out: &mut Vec<AuditEvent>) {
138 let Ok(text) = std::str::from_utf8(bytes) else {
139 return;
140 };
141 for line in text.lines() {
142 if line.is_empty() {
143 continue;
144 }
145 let Some(ev) = AuditEvent::parse_line(line) else {
146 continue;
147 };
148 if query.matches(&ev) {
149 out.push(ev);
150 }
151 }
152}
153
154pub fn events_to_json_array(events: &[AuditEvent]) -> crate::json::Value {
157 use crate::json::{Map, Value};
158 let mut arr: Vec<Value> = Vec::with_capacity(events.len());
159 for ev in events {
160 let line = ev.to_json_line(None);
161 if let Ok(v) = crate::json::from_str::<Value>(&line) {
162 arr.push(v);
163 }
164 }
165 let mut obj = Map::new();
166 obj.insert("count".to_string(), Value::Number(events.len() as f64));
167 obj.insert("events".to_string(), Value::Array(arr));
168 Value::Object(obj)
169}
170
171pub fn parse_time_arg(raw: &str) -> Option<u128> {
179 let trimmed = raw.trim();
180 if trimmed.is_empty() {
181 return None;
182 }
183 if let Ok(ms) = trimmed.parse::<u128>() {
184 return Some(ms);
185 }
186 parse_rfc3339_ms(trimmed)
187}
188
/// Parses an RFC 3339 UTC timestamp of the form `YYYY-MM-DDTHH:MM:SS[.fff…]Z`
/// into milliseconds since the Unix epoch.
///
/// Accepted input:
/// * uppercase `T` and `Z` only; numeric offsets are not supported;
/// * every field is exactly the stated number of ASCII digits (no sign);
/// * month `01..=12`, day valid for that month and year, hour `00..=23`,
///   minute `00..=59`, second `00..=60` (`60` is tolerated for leap seconds
///   and lands on the following second);
/// * an optional fraction of 1 to 9 digits, truncated to milliseconds.
///
/// Returns `None` for anything else, including trailing characters after the
/// seconds/fraction and instants before `1970-01-01T00:00:00Z`.
pub fn parse_rfc3339_ms(s: &str) -> Option<u128> {
    let bytes = s.as_bytes();
    // Shortest accepted form is `YYYY-MM-DDTHH:MM:SSZ` (20 bytes).
    if bytes.len() < 20 || bytes.last() != Some(&b'Z') {
        return None;
    }
    let separators_ok = bytes[4] == b'-'
        && bytes[7] == b'-'
        && bytes[10] == b'T'
        && bytes[13] == b':'
        && bytes[16] == b':';
    if !separators_ok {
        return None;
    }

    let year = i64::from(parse_ascii_digits(&bytes[0..4])?);
    let month = parse_ascii_digits(&bytes[5..7])?;
    let day = parse_ascii_digits(&bytes[8..10])?;
    let hour = parse_ascii_digits(&bytes[11..13])?;
    let minute = parse_ascii_digits(&bytes[14..16])?;
    let second = parse_ascii_digits(&bytes[17..19])?;

    // `days_in_month` also rejects an out-of-range month.
    let max_day = days_in_month(year, month)?;
    if day == 0 || day > max_day || hour > 23 || minute > 59 || second > 60 {
        return None;
    }

    // Whatever follows the seconds must be exactly `Z` or `.<digits>Z`.
    let millis = match &bytes[19..] {
        [b'Z'] => 0,
        [b'.', frac @ .., b'Z'] => parse_fraction_ms(frac)?,
        _ => return None,
    };

    let days = i128::from(days_from_civil(year, month, day));
    let secs = days * 86_400
        + i128::from(hour) * 3_600
        + i128::from(minute) * 60
        + i128::from(second);
    let ms = secs * 1_000 + i128::from(millis);
    if ms < 0 {
        return None;
    }
    Some(ms as u128)
}

/// Parses a non-empty run of ASCII digits; any other byte (sign, space,
/// non-ASCII) yields `None`. Callers pass at most four digits, so the
/// accumulator cannot overflow.
fn parse_ascii_digits(field: &[u8]) -> Option<u32> {
    if field.is_empty() {
        return None;
    }
    field.iter().try_fold(0u32, |acc, b| {
        b.is_ascii_digit().then(|| acc * 10 + u32::from(b - b'0'))
    })
}

/// Converts the digits after the decimal point into milliseconds, truncating
/// past the third digit. Requires 1 to 9 digits, all ASCII.
fn parse_fraction_ms(frac: &[u8]) -> Option<u32> {
    if frac.is_empty() || frac.len() > 9 || !frac.iter().all(u8::is_ascii_digit) {
        return None;
    }
    // Missing positions count as zero, so ".7" is 700 ms.
    let ms = (0..3).fold(0u32, |acc, i| {
        acc * 10 + frac.get(i).map_or(0, |d| u32::from(d - b'0'))
    });
    Some(ms)
}

/// Number of days in `month` of `year` in the proleptic Gregorian calendar,
/// or `None` when `month` is outside `1..=12`.
fn days_in_month(year: i64, month: u32) -> Option<u32> {
    let is_leap = year % 4 == 0 && (year % 100 != 0 || year % 400 == 0);
    match month {
        1 | 3 | 5 | 7 | 8 | 10 | 12 => Some(31),
        4 | 6 | 9 | 11 => Some(30),
        2 if is_leap => Some(29),
        2 => Some(28),
        _ => None,
    }
}

/// Days between `1970-01-01` and the given proleptic Gregorian date (negative
/// for earlier dates).
///
/// Expects `m` in `1..=12` and `d` valid for that month. Internally the year
/// is shifted to start in March so the leap day falls at the end of the year.
fn days_from_civil(y: i64, m: u32, d: u32) -> i64 {
    let y = if m <= 2 { y - 1 } else { y };
    // 400-year eras; Euclidean division keeps negative years correct.
    let era = y.div_euclid(400);
    let year_of_era = y.rem_euclid(400) as u64;
    let shifted_month = u64::from(if m > 2 { m - 3 } else { m + 9 });
    let day_of_year = (153 * shifted_month + 2) / 5 + u64::from(d) - 1;
    let day_of_era = year_of_era * 365 + year_of_era / 4 - year_of_era / 100 + day_of_year;
    // 719_468 is the day count from 0000-03-01 to 1970-01-01.
    era * 146_097 + (day_of_era as i64) - 719_468
}
260
#[cfg(test)]
mod tests {
    use super::*;
    use crate::runtime::audit_log::{AuditAuthSource, AuditEvent, AuditLogger, Outcome};
    use std::path::PathBuf;
    use std::time::Duration;

    /// Creates a fresh directory under the system temp dir (named from `tag`,
    /// the process id, and the current time in nanoseconds) and returns the
    /// path of a `data.rdb` file inside it. The file itself is not created.
    fn temp_path(tag: &str) -> PathBuf {
        let dir_name = format!(
            "reddb-audit-query-{}-{}-{}",
            tag,
            std::process::id(),
            crate::utils::now_unix_nanos()
        );
        let dir = std::env::temp_dir().join(dir_name);
        std::fs::create_dir_all(&dir).unwrap();
        dir.join("data.rdb")
    }

    /// Principal and action-prefix filters combine: only alice's `auth/`
    /// events are returned.
    #[test]
    fn filters_by_principal_and_action_prefix() {
        const LOGIN_USERS: [&str; 4] = ["alice", "bob", "alice", "carol"];

        let data_file = temp_path("filter");
        let logger = AuditLogger::for_data_path(&data_file);
        for who in LOGIN_USERS {
            let login = AuditEvent::builder("auth/login.ok")
                .principal(who)
                .source(AuditAuthSource::Password)
                .build();
            logger.record_event(login);
        }
        let shutdown = AuditEvent::builder("admin/shutdown")
            .principal("alice")
            .source(AuditAuthSource::Session)
            .outcome(Outcome::Success)
            .build();
        logger.record_event(shutdown);
        assert!(logger.wait_idle(Duration::from_secs(2)));

        // `new()` already carries the limit of 100 used by this scenario.
        let mut query = AuditQuery::new();
        query.principal = Some("alice".to_string());
        query.action_prefix = Some("auth/".to_string());

        let hits = run_query(logger.path(), &query);
        assert_eq!(hits.len(), 2, "expected two alice/auth lines");
        for hit in &hits {
            assert!(hit.principal.as_deref() == Some("alice"));
            assert!(hit.action.starts_with("auth/"));
        }
    }

    /// Both the RFC 3339 form and the raw integer form are accepted.
    #[test]
    fn parse_time_accepts_rfc3339_and_ms() {
        let expected: Option<u128> = Some(1_709_210_096_789);
        assert_eq!(parse_time_arg("2024-02-29T12:34:56.789Z"), expected);
        assert_eq!(parse_time_arg("1709210096789"), expected);
        assert_eq!(parse_time_arg("not a time"), None);
    }

    /// With ten events and `limit: 3`, the three most recent survive.
    #[test]
    fn limit_caps_oldest_off() {
        let data_file = temp_path("limit");
        let logger = AuditLogger::for_data_path(&data_file);
        (0..10).for_each(|i| {
            logger.record_event(AuditEvent::builder(format!("test/n/{i}")).build());
        });
        assert!(logger.wait_idle(Duration::from_secs(2)));

        let query = AuditQuery {
            limit: 3,
            ..Default::default()
        };
        let hits = run_query(logger.path(), &query);
        assert_eq!(hits.len(), 3);
        assert_eq!(hits[0].action, "test/n/7");
        assert_eq!(hits[2].action, "test/n/9");
    }
}