Skip to main content

reddb_server/runtime/
audit_query.rs

1//! Audit log query / replay helpers.
2//!
3//! Backs the `GET /admin/audit` endpoint. Reads the active
4//! `.audit.log` plus rotated siblings (`.audit.log.<ms>.zst`),
5//! parses each line into [`AuditEvent`], and applies the request
6//! filters in memory. The audit volume on a typical RedDB deploy is
7//! orders of magnitude smaller than the data plane (admin actions,
8//! auth events, lease transitions) so a linear scan over the rotated
9//! tail is acceptable. If the volume ever justifies it, a real index
10//! lives one refactor away — slot a sled / parquet sidecar in here
11//! without touching the public surface.
12
13use std::path::Path;
14
15use crate::runtime::audit_log::{AuditEvent, Outcome};
16
17/// Query filters. All fields are optional; an empty `Query` returns
18/// the entire window up to `limit`.
19#[derive(Debug, Clone, Default)]
20pub struct AuditQuery {
21    pub since_ms: Option<u128>,
22    pub until_ms: Option<u128>,
23    pub principal: Option<String>,
24    pub tenant: Option<String>,
25    /// Prefix match on `action` (e.g. `auth/`, `admin/`,
26    /// `lease/acquire`). Empty string matches everything.
27    pub action_prefix: Option<String>,
28    pub outcome: Option<Outcome>,
29    /// Hard cap on the number of returned events. Server should clamp
30    /// this to a sensible maximum (e.g. 1000) before passing through.
31    pub limit: usize,
32}
33
34impl AuditQuery {
35    pub fn new() -> Self {
36        Self {
37            limit: 100,
38            ..Default::default()
39        }
40    }
41
42    fn matches(&self, ev: &AuditEvent) -> bool {
43        if let Some(since) = self.since_ms {
44            if ev.ts < since {
45                return false;
46            }
47        }
48        if let Some(until) = self.until_ms {
49            if ev.ts > until {
50                return false;
51            }
52        }
53        if let Some(principal) = &self.principal {
54            match &ev.principal {
55                Some(p) if p == principal => {}
56                _ => return false,
57            }
58        }
59        if let Some(tenant) = &self.tenant {
60            match &ev.tenant {
61                Some(t) if t == tenant => {}
62                _ => return false,
63            }
64        }
65        if let Some(prefix) = &self.action_prefix {
66            if !ev.action.starts_with(prefix) {
67                return false;
68            }
69        }
70        if let Some(outcome) = self.outcome {
71            if ev.outcome != outcome {
72                return false;
73            }
74        }
75        true
76    }
77}
78
79/// Run `query` against the audit log rooted at `active_path` (the
80/// current `.audit.log`). Walks the active file plus every sibling
81/// rotated archive (`.audit.log.<ms>.zst`), oldest-first by filename.
82/// Returns the matching events in chronological order, capped at
83/// `query.limit`.
84pub fn run_query(active_path: &Path, query: &AuditQuery) -> Vec<AuditEvent> {
85    let mut events: Vec<AuditEvent> = Vec::new();
86    let parent = active_path.parent().unwrap_or_else(|| Path::new("."));
87    let stem = active_path
88        .file_name()
89        .and_then(|s| s.to_str())
90        .unwrap_or(".audit.log");
91
92    let mut rotated: Vec<(u128, std::path::PathBuf)> = Vec::new();
93    if let Ok(rd) = std::fs::read_dir(parent) {
94        for entry in rd.flatten() {
95            let name = entry.file_name();
96            let Some(name_s) = name.to_str() else {
97                continue;
98            };
99            if !name_s.starts_with(&format!("{stem}.")) {
100                continue;
101            }
102            // Two recognized shapes: `<stem>.<ms>` (uncompressed
103            // fallback) and `<stem>.<ms>.zst`.
104            let after = &name_s[stem.len() + 1..];
105            let ts_part = after.trim_end_matches(".zst");
106            if let Ok(ts) = ts_part.parse::<u128>() {
107                rotated.push((ts, entry.path()));
108            }
109        }
110    }
111    rotated.sort_by_key(|(ts, _)| *ts);
112
113    for (_, path) in &rotated {
114        let bytes = match std::fs::read(path) {
115            Ok(b) => b,
116            Err(_) => continue,
117        };
118        let plain = if path
119            .extension()
120            .and_then(|e| e.to_str())
121            .map(|e| e == "zst")
122            .unwrap_or(false)
123        {
124            match zstd::bulk::decompress(&bytes, 256 * 1024 * 1024) {
125                Ok(p) => p,
126                Err(_) => continue,
127            }
128        } else {
129            bytes
130        };
131        ingest_buffer(&plain, query, &mut events);
132    }
133
134    if let Ok(active_bytes) = std::fs::read(active_path) {
135        ingest_buffer(&active_bytes, query, &mut events);
136    }
137
138    if events.len() > query.limit {
139        let take = query.limit;
140        let drop = events.len() - take;
141        events.drain(0..drop);
142    }
143    events
144}
145
146fn ingest_buffer(bytes: &[u8], query: &AuditQuery, out: &mut Vec<AuditEvent>) {
147    let Ok(text) = std::str::from_utf8(bytes) else {
148        return;
149    };
150    for line in text.lines() {
151        if line.is_empty() {
152            continue;
153        }
154        let Some(ev) = AuditEvent::parse_line(line) else {
155            continue;
156        };
157        if query.matches(&ev) {
158            out.push(ev);
159        }
160    }
161}
162
163/// Render a list of events as a JSON array (returned by the HTTP
164/// query handler). Stable field set so dashboards stay locked.
165pub fn events_to_json_array(events: &[AuditEvent]) -> crate::json::Value {
166    use crate::json::{Map, Value};
167    let mut arr: Vec<Value> = Vec::with_capacity(events.len());
168    for ev in events {
169        let line = ev.to_json_line(None);
170        if let Ok(v) = crate::json::from_str::<Value>(&line) {
171            arr.push(v);
172        }
173    }
174    let mut obj = Map::new();
175    obj.insert("count".to_string(), Value::Number(events.len() as f64));
176    obj.insert("events".to_string(), Value::Array(arr));
177    Value::Object(obj)
178}
179
180// ---------------------------------------------------------------------------
181// Param parsing
182// ---------------------------------------------------------------------------
183
184/// Parse RFC-3339 with second precision OR an integer ms epoch. The
185/// query endpoint accepts either form per the spec; we keep the
186/// parser tiny so we don't pull `chrono`.
187pub fn parse_time_arg(raw: &str) -> Option<u128> {
188    let trimmed = raw.trim();
189    if trimmed.is_empty() {
190        return None;
191    }
192    if let Ok(ms) = trimmed.parse::<u128>() {
193        return Some(ms);
194    }
195    parse_rfc3339_ms(trimmed)
196}
197
/// Tiny RFC 3339 -> ms parser. Accepts `YYYY-MM-DDTHH:MM:SSZ` and
/// `YYYY-MM-DDTHH:MM:SS.fffZ` with one to nine fractional digits
/// (digits past milliseconds are truncated). Rejects anything with a
/// non-Z offset — the audit log writes UTC and we want callers to
/// pass UTC too.
///
/// Returns `None` when the input is malformed, when any field is out
/// of range (month 1-12, a day that exists in that month, hour 0-23,
/// minute 0-59, second 0-60 to admit leap seconds), when anything
/// other than a fraction sits between the seconds and the final `Z`,
/// or when the instant lies before the Unix epoch.
pub fn parse_rfc3339_ms(s: &str) -> Option<u128> {
    let bytes = s.as_bytes();
    // Shortest accepted form is `YYYY-MM-DDTHH:MM:SSZ` (20 bytes).
    if bytes.len() < 20 || bytes[bytes.len() - 1] != b'Z' {
        return None;
    }
    let separators_ok = bytes[4] == b'-'
        && bytes[7] == b'-'
        && bytes[10] == b'T'
        && bytes[13] == b':'
        && bytes[16] == b':';
    if !separators_ok {
        return None;
    }

    // Digit-only parsing: `str::parse` would also accept a leading
    // sign such as `+1`, which is not valid in a timestamp field.
    let year = i64::from(parse_ascii_digits(&bytes[0..4])?);
    let month = parse_ascii_digits(&bytes[5..7])?;
    let day = parse_ascii_digits(&bytes[8..10])?;
    let hour = parse_ascii_digits(&bytes[11..13])?;
    let minute = parse_ascii_digits(&bytes[14..16])?;
    let second = parse_ascii_digits(&bytes[17..19])?;

    if !(1..=12).contains(&month) || day == 0 || day > days_in_month(year, month) {
        return None;
    }
    if hour > 23 || minute > 59 || second > 60 {
        return None;
    }

    // Everything between the seconds field and the trailing `Z`.
    let ms_extra = match &bytes[19..bytes.len() - 1] {
        [] => 0,
        [b'.', frac @ ..] => {
            let well_formed =
                !frac.is_empty() && frac.len() <= 9 && frac.iter().all(u8::is_ascii_digit);
            if !well_formed {
                return None;
            }
            // Keep at most three digits, then right-pad to milliseconds
            // (`.5` -> 500, `.12` -> 120).
            let kept = &frac[..frac.len().min(3)];
            let mut ms = parse_ascii_digits(kept)?;
            for _ in kept.len()..3 {
                ms *= 10;
            }
            ms
        }
        // Any other suffix (e.g. a second `Z`) is garbage.
        _ => return None,
    };

    let days = i128::from(days_from_civil(year, month, day));
    let secs =
        days * 86_400 + i128::from(hour) * 3_600 + i128::from(minute) * 60 + i128::from(second);
    let ms = secs * 1_000 + i128::from(ms_extra);
    // Pre-epoch instants cannot be expressed as an unsigned ms count.
    if ms < 0 {
        return None;
    }
    Some(ms as u128)
}

/// Parse a non-empty run of ASCII digits into a number; `None` if the
/// slice is empty, contains any other byte, or overflows `u32`.
fn parse_ascii_digits(digits: &[u8]) -> Option<u32> {
    if digits.is_empty() {
        return None;
    }
    digits.iter().try_fold(0u32, |acc, &b| {
        if b.is_ascii_digit() {
            acc.checked_mul(10)?.checked_add(u32::from(b - b'0'))
        } else {
            None
        }
    })
}

/// Number of days in `month` of `year` in the proleptic Gregorian
/// calendar; `0` for a month outside 1-12.
fn days_in_month(year: i64, month: u32) -> u32 {
    let leap = year % 4 == 0 && (year % 100 != 0 || year % 400 == 0);
    match month {
        1 | 3 | 5 | 7 | 8 | 10 | 12 => 31,
        4 | 6 | 9 | 11 => 30,
        2 if leap => 29,
        2 => 28,
        _ => 0,
    }
}

/// Howard Hinnant's days_from_civil algorithm: days between
/// 1970-01-01 and the given proleptic-Gregorian date (negative for
/// earlier dates). Mirrors the inverse `civil_from_days` already in
/// `audit_log.rs`.
///
/// The result is only meaningful for `m` in 1-12 and `d >= 1`. All
/// arithmetic is signed so out-of-range input cannot underflow and
/// panic.
fn days_from_civil(y: i64, m: u32, d: u32) -> i64 {
    // Treat January and February as months 11 and 12 of the previous
    // year so the leap day falls at the end of the shifted year.
    let y = if m <= 2 { y - 1 } else { y };
    let era = if y >= 0 { y } else { y - 399 } / 400;
    let yoe = y - era * 400; // year of era, [0, 399]
    let m = i64::from(m);
    let d = i64::from(d);
    let shifted_month = if m > 2 { m - 3 } else { m + 9 }; // March = 0
    let doy = (153 * shifted_month + 2) / 5 + d - 1; // day of shifted year
    let doe = yoe * 365 + yoe / 4 - yoe / 100 + doy; // day of era
    era * 146_097 + doe - 719_468
}
269
#[cfg(test)]
mod tests {
    use super::*;
    use crate::runtime::audit_log::{AuditAuthSource, AuditEvent, AuditLogger, Outcome};
    use std::path::PathBuf;
    use std::time::Duration;

    /// Create a fresh scratch directory (unique per tag, process and
    /// nanosecond timestamp) and return the path of a `data.rdb`
    /// inside it for the logger to derive its audit path from.
    fn temp_path(tag: &str) -> PathBuf {
        let dir = std::env::temp_dir().join(format!(
            "reddb-audit-query-{}-{}-{}",
            tag,
            std::process::id(),
            crate::utils::now_unix_nanos()
        ));
        std::fs::create_dir_all(&dir).unwrap();
        dir.join("data.rdb")
    }

    #[test]
    fn filters_by_principal_and_action_prefix() {
        let data_path = temp_path("filter");
        let logger = AuditLogger::for_data_path(&data_path);

        // Four logins, two of them by alice.
        for who in ["alice", "bob", "alice", "carol"] {
            let login = AuditEvent::builder("auth/login.ok")
                .principal(who)
                .source(AuditAuthSource::Password)
                .build();
            logger.record_event(login);
        }
        // One alice event outside the `auth/` prefix.
        let shutdown = AuditEvent::builder("admin/shutdown")
            .principal("alice")
            .source(AuditAuthSource::Session)
            .outcome(Outcome::Success)
            .build();
        logger.record_event(shutdown);
        assert!(logger.wait_idle(Duration::from_secs(2)));

        let query = AuditQuery {
            principal: Some("alice".to_string()),
            action_prefix: Some("auth/".to_string()),
            limit: 100,
            ..Default::default()
        };
        let hits = run_query(logger.path(), &query);
        assert_eq!(hits.len(), 2, "expected two alice/auth lines");
        assert!(hits.iter().all(|e| e.principal.as_deref() == Some("alice")));
        assert!(hits.iter().all(|e| e.action.starts_with("auth/")));
    }

    #[test]
    fn parse_time_accepts_rfc3339_and_ms() {
        let expected = Some(1_709_210_096_789);
        assert_eq!(parse_time_arg("2024-02-29T12:34:56.789Z"), expected);
        assert_eq!(parse_time_arg("1709210096789"), expected);
        assert_eq!(parse_time_arg("not a time"), None);
    }

    #[test]
    fn limit_caps_oldest_off() {
        let data_path = temp_path("limit");
        let logger = AuditLogger::for_data_path(&data_path);
        for i in 0..10 {
            logger.record_event(AuditEvent::builder(format!("test/n/{i}")).build());
        }
        assert!(logger.wait_idle(Duration::from_secs(2)));

        let query = AuditQuery {
            limit: 3,
            ..Default::default()
        };
        let hits = run_query(logger.path(), &query);
        assert_eq!(hits.len(), 3);
        // The three newest events survive, still in chronological order.
        assert_eq!(hits[0].action, "test/n/7");
        assert_eq!(hits[2].action, "test/n/9");
    }
}