Skip to main content

reddb_server/runtime/
audit_log.rs

1//! Structured audit log for SOC 2 / HIPAA / ISO 27001 deploys.
2//!
3//! Replaces the previous free-form `record(action, principal, target,
4//! result, details)` API with a stable JSON-Lines schema keyed on
5//! `event_id`, `ts`, `principal`, `tenant`, `action`, `resource`,
6//! `outcome`, `detail`, `remote_addr`, and `correlation_id` so external
7//! tooling (Splunk, Datadog HEC, ELK, BigQuery, Athena) can ingest the
8//! file without per-deploy regex.
9//!
10//! Operational properties:
11//!   * **Async-write**: emit sites push onto a bounded `std::sync::mpsc`
12//!     channel; a dedicated thread owns the file handle and flushes on
13//!     a periodic timer (default 250 ms) or per-event when
14//!     `RED_AUDIT_FSYNC=every`. If the channel fills the emit site
15//!     falls back to a direct sync write — losing throughput is
16//!     preferable to dropping audit lines.
17//!   * **Rotation**: when the active file exceeds `RED_AUDIT_MAX_BYTES`
18//!     (default 64 MiB) the writer renames it to
19//!     `.audit.log.<ms>.zst`, zstd-compresses it, and starts a fresh
20//!     active file. The repo already pulls `zstd`; we don't add a gzip
21//!     dependency.
22//!   * **Hash chain (tamper-evidence)**: each event carries a
23//!     `prev_hash` field — the sha256 of the previous JSON line. An
24//!     auditor verifying the file recomputes the chain; a single edit
25//!     anywhere in the file breaks every subsequent hash. Does **not**
26//!     defend against an attacker with `root + write` (they could
27//!     rebuild the chain), but it does defend against accidental edits
28//!     and most insider tampering.
29//!   * **SIEM streaming**: when `RED_AUDIT_STREAM_URL` is set every
30//!     line is also POSTed to that URL fire-and-forget.
31//!
32//! Pre-1.0: the file format breaks from the previous shape. Old
33//! `.audit.log` files are NOT readable by the new query endpoint.
34//! That's an accepted regression — operators upgrading should rotate
35//! the file before the deploy.
36
37use std::io::{BufWriter, Write};
38use std::path::{Path, PathBuf};
39use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
40use std::sync::{mpsc, Arc, Mutex};
41use std::thread::{self, JoinHandle};
42use std::time::{Duration, Instant};
43
44use crate::crypto::{os_random, sha256};
45use crate::json::{Map, Value as JsonValue};
46
47// ---------------------------------------------------------------------------
48// Schema
49// ---------------------------------------------------------------------------
50
/// Authentication lane that produced the audited principal.
///
/// Kept separate from `crate::auth::AuthSource` so the audit trail can
/// record the System / Anonymous / Session / ApiKey lanes, which the
/// runtime auth enum (Password / ClientCert / Oauth today) does not
/// surface.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum AuditAuthSource {
    ApiKey,
    Session,
    Password,
    Oauth,
    ClientCert,
    Anonymous,
    System,
}

impl AuditAuthSource {
    /// Every variant in declaration order; the lookup table behind
    /// [`Self::parse`].
    const ALL: [Self; 7] = [
        Self::ApiKey,
        Self::Session,
        Self::Password,
        Self::Oauth,
        Self::ClientCert,
        Self::Anonymous,
        Self::System,
    ];

    /// Stable snake_case name used for the serialized `source` field.
    pub fn as_str(self) -> &'static str {
        match self {
            AuditAuthSource::ApiKey => "api_key",
            AuditAuthSource::Session => "session",
            AuditAuthSource::Password => "password",
            AuditAuthSource::Oauth => "oauth",
            AuditAuthSource::ClientCert => "client_cert",
            AuditAuthSource::Anonymous => "anonymous",
            AuditAuthSource::System => "system",
        }
    }

    /// Inverse of [`Self::as_str`]. Matching is exact (case-sensitive);
    /// any other input yields `None`.
    pub fn parse(s: &str) -> Option<Self> {
        Self::ALL
            .iter()
            .copied()
            .find(|candidate| candidate.as_str() == s)
    }
}
92
/// Result classification recorded for an audited action.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Outcome {
    Success,
    Denied,
    Error,
}

impl Outcome {
    /// Canonical lowercase name used for the serialized `outcome` field.
    pub fn as_str(self) -> &'static str {
        match self {
            Outcome::Success => "success",
            Outcome::Denied => "denied",
            Outcome::Error => "error",
        }
    }

    /// Parses a canonical name or one of its short aliases (`ok`,
    /// `deny`, `err`). Matching is exact; anything else yields `None`.
    pub fn parse(s: &str) -> Option<Self> {
        match s {
            "success" | "ok" => Some(Outcome::Success),
            "denied" | "deny" => Some(Outcome::Denied),
            "error" | "err" => Some(Outcome::Error),
            _ => None,
        }
    }
}
119
/// Structured audit event. Serialised as one JSONL row per call.
///
/// `Option` fields that are `None`, and a `detail` that is JSON null,
/// are left out of the serialized line entirely rather than written
/// as `null`.
#[derive(Debug, Clone)]
pub struct AuditEvent {
    /// Event time as Unix milliseconds; serialized both as the numeric
    /// `ts` and as the formatted `ts_iso` string.
    pub ts: u128,
    /// Identifier generated by `new_event_id()` when the event is built.
    pub event_id: String,
    /// Identity that performed the action, when one is known.
    pub principal: Option<String>,
    /// Authentication lane the principal came in through.
    pub source: AuditAuthSource,
    /// Tenant the action applies to, if any.
    pub tenant: Option<String>,
    /// Name of the audited action; the only field a parsed line must carry
    /// besides a valid `outcome`.
    pub action: String,
    /// Object the action targeted, if any.
    pub resource: Option<String>,
    /// Whether the action succeeded, was denied, or errored.
    pub outcome: Outcome,
    /// Free-form structured payload for the event.
    pub detail: JsonValue,
    /// Remote address associated with the event, if any.
    pub remote_addr: Option<String>,
    /// Identifier used to correlate this event with related ones, if any.
    pub correlation_id: Option<String>,
}
135
136impl AuditEvent {
137    /// Convenience builder. The caller fills the required fields and
138    /// chains the optional ones — keeps emit sites readable.
139    pub fn builder(action: impl Into<String>) -> AuditEventBuilder {
140        AuditEventBuilder {
141            inner: AuditEvent {
142                ts: crate::utils::now_unix_millis() as u128,
143                event_id: new_event_id(),
144                principal: None,
145                source: AuditAuthSource::System,
146                tenant: None,
147                action: action.into(),
148                resource: None,
149                outcome: Outcome::Success,
150                detail: JsonValue::Null,
151                remote_addr: None,
152                correlation_id: None,
153            },
154        }
155    }
156
157    /// Wrap a pre-structured-API call into the new schema. Used by the
158    /// back-compat `record(action, principal, target, result, details)`
159    /// path so existing emit sites keep compiling.
160    pub fn from_legacy(
161        action: &str,
162        principal: &str,
163        target: &str,
164        result: &str,
165        details: JsonValue,
166    ) -> Self {
167        let outcome = if result == "ok" {
168            Outcome::Success
169        } else if result.starts_with("err") {
170            Outcome::Error
171        } else if result.starts_with("denied") || result.starts_with("deny") {
172            Outcome::Denied
173        } else {
174            Outcome::Success
175        };
176        let mut detail = details;
177        if !result.is_empty() && result != "ok" {
178            // Preserve the human-readable result string for forensic
179            // continuity with pre-restructuring lines.
180            let mut obj = match detail {
181                JsonValue::Object(map) => map,
182                JsonValue::Null => Map::new(),
183                other => {
184                    let mut m = Map::new();
185                    m.insert("legacy".to_string(), other);
186                    m
187                }
188            };
189            obj.entry("result_text".to_string())
190                .or_insert(JsonValue::String(result.to_string()));
191            detail = JsonValue::Object(obj);
192        }
193        Self {
194            ts: crate::utils::now_unix_millis() as u128,
195            event_id: new_event_id(),
196            principal: if principal.is_empty() || principal == "system" {
197                None
198            } else {
199                Some(principal.to_string())
200            },
201            source: if principal == "system" {
202                AuditAuthSource::System
203            } else if principal.is_empty() {
204                AuditAuthSource::Anonymous
205            } else {
206                AuditAuthSource::Password
207            },
208            tenant: None,
209            action: action.to_string(),
210            resource: if target.is_empty() {
211                None
212            } else {
213                Some(target.to_string())
214            },
215            outcome,
216            detail,
217            remote_addr: None,
218            correlation_id: None,
219        }
220    }
221
222    /// Render to a single JSONL line, optionally seeded with a
223    /// `prev_hash` for the tamper-evident chain.
224    pub fn to_json_line(&self, prev_hash: Option<&str>) -> String {
225        let mut object = Map::new();
226        object.insert("ts".to_string(), JsonValue::Number(self.ts as f64));
227        object.insert(
228            "ts_iso".to_string(),
229            JsonValue::String(format_iso8601(self.ts as u64)),
230        );
231        object.insert(
232            "event_id".to_string(),
233            JsonValue::String(self.event_id.clone()),
234        );
235        if let Some(p) = &self.principal {
236            object.insert("principal".to_string(), JsonValue::String(p.clone()));
237        }
238        object.insert(
239            "source".to_string(),
240            JsonValue::String(self.source.as_str().to_string()),
241        );
242        if let Some(t) = &self.tenant {
243            object.insert("tenant".to_string(), JsonValue::String(t.clone()));
244        }
245        object.insert("action".to_string(), JsonValue::String(self.action.clone()));
246        if let Some(r) = &self.resource {
247            object.insert("resource".to_string(), JsonValue::String(r.clone()));
248        }
249        object.insert(
250            "outcome".to_string(),
251            JsonValue::String(self.outcome.as_str().to_string()),
252        );
253        if !matches!(self.detail, JsonValue::Null) {
254            object.insert("detail".to_string(), self.detail.clone());
255        }
256        if let Some(ip) = &self.remote_addr {
257            object.insert("remote_addr".to_string(), JsonValue::String(ip.clone()));
258        }
259        if let Some(cid) = &self.correlation_id {
260            object.insert("correlation_id".to_string(), JsonValue::String(cid.clone()));
261        }
262        if let Some(h) = prev_hash {
263            object.insert("prev_hash".to_string(), JsonValue::String(h.to_string()));
264        }
265        JsonValue::Object(object).to_string_compact()
266    }
267
268    /// Parse one JSONL line back into an `AuditEvent`. Returns `None`
269    /// for legacy lines (pre-restructuring) or malformed JSON; callers
270    /// in the query path use this to filter.
271    pub fn parse_line(line: &str) -> Option<Self> {
272        let v: JsonValue = crate::json::from_str(line).ok()?;
273        let action = v.get("action")?.as_str()?.to_string();
274        let outcome_s = v.get("outcome").and_then(|n| n.as_str()).unwrap_or("");
275        let outcome = Outcome::parse(outcome_s)?;
276        let ts = v.get("ts").and_then(|n| n.as_f64()).unwrap_or(0.0) as u128;
277        let event_id = v
278            .get("event_id")
279            .and_then(|n| n.as_str())
280            .unwrap_or("")
281            .to_string();
282        let source = v
283            .get("source")
284            .and_then(|n| n.as_str())
285            .and_then(AuditAuthSource::parse)
286            .unwrap_or(AuditAuthSource::System);
287        Some(Self {
288            ts,
289            event_id,
290            principal: v
291                .get("principal")
292                .and_then(|n| n.as_str())
293                .map(|s| s.to_string()),
294            source,
295            tenant: v
296                .get("tenant")
297                .and_then(|n| n.as_str())
298                .map(|s| s.to_string()),
299            action,
300            resource: v
301                .get("resource")
302                .and_then(|n| n.as_str())
303                .map(|s| s.to_string()),
304            outcome,
305            detail: v.get("detail").cloned().unwrap_or(JsonValue::Null),
306            remote_addr: v
307                .get("remote_addr")
308                .and_then(|n| n.as_str())
309                .map(|s| s.to_string()),
310            correlation_id: v
311                .get("correlation_id")
312                .and_then(|n| n.as_str())
313                .map(|s| s.to_string()),
314        })
315    }
316}
317
318/// Builder for `AuditEvent`. Generated by `AuditEvent::builder()`.
319pub struct AuditEventBuilder {
320    inner: AuditEvent,
321}
322
323impl AuditEventBuilder {
324    pub fn principal(mut self, principal: impl Into<String>) -> Self {
325        self.inner.principal = Some(principal.into());
326        self
327    }
328
329    pub fn principal_opt(mut self, principal: Option<String>) -> Self {
330        self.inner.principal = principal;
331        self
332    }
333
334    pub fn source(mut self, source: AuditAuthSource) -> Self {
335        self.inner.source = source;
336        self
337    }
338
339    pub fn tenant(mut self, tenant: impl Into<String>) -> Self {
340        self.inner.tenant = Some(tenant.into());
341        self
342    }
343
344    pub fn resource(mut self, resource: impl Into<String>) -> Self {
345        self.inner.resource = Some(resource.into());
346        self
347    }
348
349    pub fn outcome(mut self, outcome: Outcome) -> Self {
350        self.inner.outcome = outcome;
351        self
352    }
353
354    pub fn detail(mut self, detail: JsonValue) -> Self {
355        self.inner.detail = detail;
356        self
357    }
358
359    /// Add a single typed audit field to the detail object. Goes
360    /// through `AuditFieldEscaper` so caller-supplied bytes never
361    /// reach `format!` or string concatenation — see ADR 0010.
362    pub fn field(mut self, field: AuditField) -> Self {
363        let mut obj = match std::mem::replace(&mut self.inner.detail, JsonValue::Null) {
364            JsonValue::Object(map) => map,
365            JsonValue::Null => Map::new(),
366            other => {
367                // Preserve any preceding non-object detail under a
368                // reserved key rather than dropping it on the floor.
369                let mut m = Map::new();
370                m.insert("legacy_detail".to_string(), other);
371                m
372            }
373        };
374        obj.insert(field.name.to_string(), field.value.into_json_value());
375        self.inner.detail = JsonValue::Object(obj);
376        self
377    }
378
379    /// Bulk variant of [`Self::field`] for multi-field call sites.
380    pub fn fields(mut self, fields: impl IntoIterator<Item = AuditField>) -> Self {
381        for f in fields {
382            self = self.field(f);
383        }
384        self
385    }
386
387    pub fn remote_addr(mut self, addr: impl Into<String>) -> Self {
388        self.inner.remote_addr = Some(addr.into());
389        self
390    }
391
392    pub fn correlation_id(mut self, cid: impl Into<String>) -> Self {
393        self.inner.correlation_id = Some(cid.into());
394        self
395    }
396
397    pub fn build(self) -> AuditEvent {
398        self.inner
399    }
400}
401
402// ---------------------------------------------------------------------------
403// AuditFieldEscaper — typed-field guard (ADR 0010 / issue #177)
404// ---------------------------------------------------------------------------
405
406/// Typed value variants accepted by the audit-field guard. The
407/// serializer (`to_json_line`) owns the framing; an `AuditValue`
408/// cannot smuggle structural bytes past the canonical encoder
409/// (`crate::serde_json::Value::escape_string`, RFC 8259 §7) because
410/// the variant is consumed as a typed value, not as an interpolated
411/// string. Adversarial corpora (CRLF, NUL, quote, semicolon,
412/// JSON-in-JSON, control bytes 0x00..0x20) survive the boundary
413/// because the encoder emits `\u00XX` escapes for every byte below
414/// 0x20.
415#[derive(Debug, Clone)]
416pub enum AuditValue {
417    String(String),
418    /// Arbitrary bytes — base64-encoded on emit so non-UTF-8 payloads
419    /// (binary keys, raw frame bytes) never enter the JSON string
420    /// channel.
421    Bytes(Vec<u8>),
422    Number(i64),
423    Bool(bool),
424    Null,
425}
426
427impl AuditValue {
428    fn into_json_value(self) -> JsonValue {
429        match self {
430            AuditValue::String(s) => JsonValue::String(s),
431            AuditValue::Bytes(bytes) => JsonValue::String(base64_encode(&bytes)),
432            AuditValue::Number(n) => JsonValue::Number(n as f64),
433            AuditValue::Bool(b) => JsonValue::Bool(b),
434            AuditValue::Null => JsonValue::Null,
435        }
436    }
437}
438
439impl From<String> for AuditValue {
440    fn from(value: String) -> Self {
441        AuditValue::String(value)
442    }
443}
444
445impl From<&str> for AuditValue {
446    fn from(value: &str) -> Self {
447        AuditValue::String(value.to_string())
448    }
449}
450
451impl From<&String> for AuditValue {
452    fn from(value: &String) -> Self {
453        AuditValue::String(value.clone())
454    }
455}
456
457impl From<Vec<u8>> for AuditValue {
458    fn from(value: Vec<u8>) -> Self {
459        AuditValue::Bytes(value)
460    }
461}
462
463impl From<&[u8]> for AuditValue {
464    fn from(value: &[u8]) -> Self {
465        AuditValue::Bytes(value.to_vec())
466    }
467}
468
469impl From<i64> for AuditValue {
470    fn from(value: i64) -> Self {
471        AuditValue::Number(value)
472    }
473}
474
475impl From<u64> for AuditValue {
476    fn from(value: u64) -> Self {
477        // Saturate at i64::MAX rather than wrapping; audit fields
478        // are observability data, not arithmetic inputs.
479        AuditValue::Number(if value > i64::MAX as u64 {
480            i64::MAX
481        } else {
482            value as i64
483        })
484    }
485}
486
487impl From<u32> for AuditValue {
488    fn from(value: u32) -> Self {
489        AuditValue::Number(value as i64)
490    }
491}
492
493impl From<usize> for AuditValue {
494    fn from(value: usize) -> Self {
495        AuditValue::Number(if value > i64::MAX as usize {
496            i64::MAX
497        } else {
498            value as i64
499        })
500    }
501}
502
503impl From<bool> for AuditValue {
504    fn from(value: bool) -> Self {
505        AuditValue::Bool(value)
506    }
507}
508
509impl<T: Into<AuditValue>> From<Option<T>> for AuditValue {
510    fn from(value: Option<T>) -> Self {
511        match value {
512            Some(v) => v.into(),
513            None => AuditValue::Null,
514        }
515    }
516}
517
/// A single typed audit field (key + typed value). Construction is
/// gated by [`AuditFieldEscaper::field`] so the typed value cannot
/// be bypassed — there is no `pub` constructor for the field that
/// accepts a free-form string value.
#[derive(Debug, Clone)]
pub struct AuditField {
    /// Key the value is stored under in the event's `detail` object.
    name: &'static str,
    /// Typed payload; converted to JSON when the field is added to a
    /// builder.
    value: AuditValue,
}

impl AuditField {
    /// Returns the field's key.
    pub fn name(&self) -> &'static str {
        self.name
    }
    /// Borrows the field's typed value.
    pub fn value(&self) -> &AuditValue {
        &self.value
    }
}
536
537/// Typed-field guard for audit emission (ADR 0010).
538///
539/// The guard owns the boundary's escape contract: caller-supplied
540/// bytes always round-trip through a typed `AuditValue`, never
541/// through `format!`, never through string concatenation. The
542/// canonical JSON encoder is `crate::serde_json::Value::escape_string`
543/// (RFC 8259 §7 compliant after F-01 hotfix #181) which the
544/// `AuditEvent::to_json_line` serializer uses for every string slot.
545///
546/// The other two encoder paths in the codebase
547/// (`utils::json::JsonValue::write_json` and
548/// `grpc::scan_json::write_json_string`) are correct after #181 but
549/// not the canonical owner of the audit boundary. Both are marked
550/// `#[deprecated]` on the audit path and remain in place for
551/// non-audit call sites pending follow-up retirement.
552pub struct AuditFieldEscaper;
553
554impl AuditFieldEscaper {
555    /// Construct a typed audit field. The `name` is `'static` so
556    /// the schema key set is fixed at compile time — only `value`
557    /// can carry attacker-influenced bytes, and `value` is a typed
558    /// enum, not a string sink.
559    pub fn field(name: &'static str, value: impl Into<AuditValue>) -> AuditField {
560        AuditField {
561            name,
562            value: value.into(),
563        }
564    }
565}
566
567// ---------------------------------------------------------------------------
568// Base64 (audit-only, no external dep)
569// ---------------------------------------------------------------------------
570
/// Standard, padded base64 (RFC 4648 §4) encoder. Audit-only, so this
/// slice does not need the `base64` crate.
///
/// Input is consumed three bytes at a time; each group yields four
/// output characters, and a short final group is padded with `=`.
fn base64_encode(input: &[u8]) -> String {
    const ALPHABET: &[u8; 64] = b"ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/";
    // Looks up the 6-bit digit found `shift` bits into a 24-bit group.
    let digit = |group: u32, shift: u32| ALPHABET[((group >> shift) & 0x3f) as usize] as char;

    let mut encoded = String::with_capacity(input.len().div_ceil(3) * 4);
    for chunk in input.chunks(3) {
        // Pack up to three bytes into the top of a 24-bit group;
        // positions missing from a short chunk stay zero.
        let group = chunk
            .iter()
            .enumerate()
            .fold(0u32, |acc, (pos, &byte)| {
                acc | (u32::from(byte) << (16 - 8 * pos as u32))
            });
        encoded.push(digit(group, 18));
        encoded.push(digit(group, 12));
        encoded.push(if chunk.len() > 1 { digit(group, 6) } else { '=' });
        encoded.push(if chunk.len() > 2 { digit(group, 0) } else { '=' });
    }
    encoded
}
604
605// ---------------------------------------------------------------------------
606// Identifier
607// ---------------------------------------------------------------------------
608
609/// Crockford-base32 ULID-like ID: 10-char timestamp prefix + 16-char
610/// random suffix = 26 chars total. Ordered lexicographically by time.
611fn new_event_id() -> String {
612    const ALPHABET: &[u8; 32] = b"0123456789ABCDEFGHJKMNPQRSTVWXYZ";
613    let now_ms = crate::utils::now_unix_millis();
614    let mut rand_bytes = [0u8; 10];
615    let _ = os_random::fill_bytes(&mut rand_bytes);
616
617    let mut out = String::with_capacity(26);
618    // 48-bit timestamp -> 10 base32 chars (50 bits, top 2 are zero).
619    for i in (0..10).rev() {
620        let shift = (i as u32) * 5;
621        let idx = ((now_ms >> shift) & 0x1f) as usize;
622        out.push(ALPHABET[idx] as char);
623    }
624    // 80 random bits -> 16 base32 chars.
625    let mut acc: u128 = 0;
626    for &b in &rand_bytes {
627        acc = (acc << 8) | (b as u128);
628    }
629    for i in (0..16).rev() {
630        let shift = (i as u32) * 5;
631        let idx = ((acc >> shift) & 0x1f) as usize;
632        out.push(ALPHABET[idx] as char);
633    }
634    out
635}
636
637// ---------------------------------------------------------------------------
638// Fsync mode
639// ---------------------------------------------------------------------------
640
/// Durability policy for the audit file.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum FsyncMode {
    /// Buffered, fsync only on rotation + shutdown.
    Off,
    /// fsync after every event. Safe for HIPAA strong-durability.
    Every,
    /// fsync every N ms (default 250): compliance-appropriate
    /// without tanking write throughput.
    Periodic,
}

impl FsyncMode {
    /// Reads the policy from `RED_AUDIT_FSYNC`, ignoring ASCII case.
    ///
    /// `every` / `strong` / `on` select [`Self::Every`]; `off` / `none`
    /// select [`Self::Off`]; an unset, unreadable, or unrecognised
    /// value falls back to [`Self::Periodic`].
    fn from_env() -> Self {
        let raw = std::env::var("RED_AUDIT_FSYNC").unwrap_or_default();
        let is_one_of =
            |names: &[&str]| names.iter().any(|name| raw.eq_ignore_ascii_case(name));

        if is_one_of(&["every", "strong", "on"]) {
            Self::Every
        } else if is_one_of(&["off", "none"]) {
            Self::Off
        } else {
            Self::Periodic
        }
    }
}
665
666// ---------------------------------------------------------------------------
667// Channel message
668// ---------------------------------------------------------------------------
669
/// Messages sent from emit sites to the background writer thread over
/// the bounded channel.
// NOTE(review): the lint is allowed, presumably because `Event` is far
// larger than the other variants and boxing it was not wanted; confirm.
#[allow(clippy::large_enum_variant)]
enum WriterMsg {
    /// An audit event to append to the log.
    Event(AuditEvent),
    /// Flush request. `wait_idle` sends this and then waits on the
    /// paired receiver for the writer's acknowledgement.
    Flush(mpsc::Sender<()>),
    /// Presumably asks the writer thread to stop; the handling code is
    /// not visible from this declaration.
    Shutdown,
}
676
677// ---------------------------------------------------------------------------
678// AuditLogger
679// ---------------------------------------------------------------------------
680
/// Handle to the audit log: owns the channel to the background writer
/// thread plus the state needed for the synchronous fallback path.
#[derive(Debug)]
pub struct AuditLogger {
    /// Location of the active audit file.
    path: PathBuf,
    /// Sending half of the bounded channel to the writer thread. When
    /// it is `None`, `record_event` writes directly instead.
    tx: Mutex<Option<mpsc::SyncSender<WriterMsg>>>,
    /// Direct-write fallback mutex. Used when the writer thread isn't
    /// running (in-memory tests) or the channel is full and the emit
    /// site picks the sync path so the event can't be dropped.
    fallback_lock: Mutex<()>,
    /// Snapshot of the most recent line's sha256 for tamper-evidence.
    /// Shared between the writer thread and the direct-write fallback
    /// so the chain stays consistent across both paths.
    last_hash: Arc<Mutex<Option<String>>>,
    /// Rotation threshold in bytes for the active file.
    max_bytes: u64,
    /// Durability policy handed to the writer thread.
    fsync_mode: FsyncMode,
    /// Optional streaming endpoint; when set, written lines are also
    /// passed to `stream_post`.
    stream_url: Option<String>,
    /// Background-write flag; tests use `wait_idle()` to synchronise.
    writer_alive: Arc<AtomicBool>,
    /// Counter incremented before an event is enqueued and decremented
    /// again when enqueueing fails; also shared with the writer thread.
    pending: Arc<AtomicU64>,
    /// Join handle of the writer thread once it has been spawned.
    handle: Mutex<Option<JoinHandle<()>>>,
}
701
702impl AuditLogger {
703    /// Place the audit log next to the primary `.rdb` file so backup
704    /// + restore flows can ship it together.
705    pub fn for_data_path(data_path: &Path) -> Self {
706        let parent = data_path.parent().unwrap_or_else(|| Path::new("."));
707        let path = parent.join(".audit.log");
708        Self::with_path(path)
709    }
710
711    /// Direct constructor primarily for tests that want a custom path.
712    pub fn with_path(path: PathBuf) -> Self {
713        let max_bytes = std::env::var("RED_AUDIT_MAX_BYTES")
714            .ok()
715            .and_then(|v| v.parse::<u64>().ok())
716            .unwrap_or(64 * 1024 * 1024);
717        let fsync_mode = FsyncMode::from_env();
718        let stream_url = std::env::var("RED_AUDIT_STREAM_URL")
719            .ok()
720            .filter(|s| !s.is_empty());
721        Self::with_settings(path, max_bytes, fsync_mode, stream_url)
722    }
723
    /// Test/integration constructor with explicit settings — bypasses
    /// env-var resolution so parallel tests don't race on `set_var`.
    ///
    /// Uses `FsyncMode::Periodic` and no streaming URL; only the path
    /// and rotation threshold are caller-controlled.
    pub fn with_max_bytes(path: PathBuf, max_bytes: u64) -> Self {
        Self::with_settings(path, max_bytes, FsyncMode::Periodic, None)
    }
729
730    fn with_settings(
731        path: PathBuf,
732        max_bytes: u64,
733        fsync_mode: FsyncMode,
734        stream_url: Option<String>,
735    ) -> Self {
736        let writer_alive = Arc::new(AtomicBool::new(false));
737        let pending = Arc::new(AtomicU64::new(0));
738        // Seed the chain from the existing tail so a restart doesn't
739        // break tamper evidence.
740        let mut seed: Option<String> = None;
741        if let Ok(body) = std::fs::read_to_string(&path) {
742            if let Some(line) = body.lines().last() {
743                seed = Some(crate::utils::to_hex(&sha256::sha256(line.as_bytes())));
744            }
745        }
746        let last_hash = Arc::new(Mutex::new(seed));
747        let logger = Self {
748            path,
749            tx: Mutex::new(None),
750            fallback_lock: Mutex::new(()),
751            last_hash,
752            max_bytes,
753            fsync_mode,
754            stream_url,
755            writer_alive: Arc::clone(&writer_alive),
756            pending: Arc::clone(&pending),
757            handle: Mutex::new(None),
758        };
759        logger.start_writer_thread();
760        logger
761    }
762
    /// Location of the active audit file.
    pub fn path(&self) -> &Path {
        &self.path
    }
766
    /// Active rotation threshold (bytes), as resolved when the logger
    /// was constructed.
    pub fn max_bytes(&self) -> u64 {
        self.max_bytes
    }
771
772    fn start_writer_thread(&self) {
773        let (tx, rx) = mpsc::sync_channel::<WriterMsg>(4096);
774        *self.tx.lock().unwrap_or_else(|e| e.into_inner()) = Some(tx);
775        let path = self.path.clone();
776        let max_bytes = self.max_bytes;
777        let fsync_mode = self.fsync_mode;
778        let stream_url = self.stream_url.clone();
779        let writer_alive = Arc::clone(&self.writer_alive);
780        let pending = Arc::clone(&self.pending);
781        let last_hash = Arc::clone(&self.last_hash);
782        writer_alive.store(true, Ordering::SeqCst);
783        let handle = thread::Builder::new()
784            .name("reddb-audit-writer".to_string())
785            .spawn(move || {
786                writer_loop(
787                    rx,
788                    path,
789                    max_bytes,
790                    fsync_mode,
791                    stream_url,
792                    writer_alive,
793                    pending,
794                    last_hash,
795                );
796            })
797            .expect("spawn audit writer thread");
798        *self.handle.lock().unwrap_or_else(|e| e.into_inner()) = Some(handle);
799    }
800
801    /// Latest tip-hash of the chain (for diagnostics + tests).
802    pub fn current_hash(&self) -> Option<String> {
803        self.last_hash
804            .lock()
805            .unwrap_or_else(|e| e.into_inner())
806            .clone()
807    }
808
809    /// Block until the writer thread has drained every pending event.
810    /// Tests use this to make assertions deterministic.
811    pub fn wait_idle(&self, timeout: Duration) -> bool {
812        let deadline = Instant::now() + timeout;
813        let tx_guard = self.tx.lock().unwrap_or_else(|e| e.into_inner());
814        if let Some(tx) = tx_guard.as_ref() {
815            let (back_tx, back_rx) = mpsc::channel();
816            if tx.send(WriterMsg::Flush(back_tx)).is_err() {
817                return false;
818            }
819            drop(tx_guard);
820            let remaining = deadline.saturating_duration_since(Instant::now());
821            return back_rx.recv_timeout(remaining).is_ok();
822        }
823        false
824    }
825
826    /// Back-compat record signature. Used by every existing emit site.
827    /// Wraps into the new schema and forwards to `record_event`.
828    pub fn record(
829        &self,
830        action: &str,
831        principal: &str,
832        target: &str,
833        result: &str,
834        details: JsonValue,
835    ) {
836        let event = AuditEvent::from_legacy(action, principal, target, result, details);
837        self.record_event(event);
838    }
839
840    /// Primary new entry point. Non-blocking when the channel has
841    /// capacity; falls back to a sync write when full or the writer
842    /// thread has shut down.
843    pub fn record_event(&self, event: AuditEvent) {
844        let tx_guard = self.tx.lock().unwrap_or_else(|e| e.into_inner());
845        let recovered_event: AuditEvent;
846        if let Some(tx) = tx_guard.as_ref() {
847            self.pending.fetch_add(1, Ordering::SeqCst);
848            match tx.try_send(WriterMsg::Event(event)) {
849                Ok(()) => return,
850                Err(mpsc::TrySendError::Full(msg)) => {
851                    self.pending.fetch_sub(1, Ordering::SeqCst);
852                    tracing::warn!(
853                        target: "reddb::audit",
854                        "audit channel saturated; falling back to sync write"
855                    );
856                    recovered_event = match msg {
857                        WriterMsg::Event(ev) => ev,
858                        _ => return,
859                    };
860                }
861                Err(mpsc::TrySendError::Disconnected(msg)) => {
862                    self.pending.fetch_sub(1, Ordering::SeqCst);
863                    recovered_event = match msg {
864                        WriterMsg::Event(ev) => ev,
865                        _ => return,
866                    };
867                }
868            }
869        } else {
870            recovered_event = event;
871        }
872        drop(tx_guard);
873        self.write_direct(recovered_event);
874    }
875
876    /// Fallback path: a direct, synchronous append. Used when the
877    /// background channel is full or the writer thread isn't running.
878    fn write_direct(&self, event: AuditEvent) {
879        let _g = self.fallback_lock.lock().unwrap_or_else(|e| e.into_inner());
880        let prev = self
881            .last_hash
882            .lock()
883            .unwrap_or_else(|e| e.into_inner())
884            .clone();
885        let line = event.to_json_line(prev.as_deref());
886        if let Err(err) = append_line_with_rotation(&self.path, &line, self.max_bytes) {
887            tracing::warn!(
888                target: "reddb::audit",
889                error = %err,
890                path = %self.path.display(),
891                "direct audit append failed"
892            );
893            return;
894        }
895        let new_hash = crate::utils::to_hex(&sha256::sha256(line.as_bytes()));
896        if let Ok(mut g) = self.last_hash.lock() {
897            *g = Some(new_hash);
898        }
899        if let Some(url) = &self.stream_url {
900            stream_post(url, &line);
901        }
902        tracing::info!(target: "reddb::audit", "{line}");
903    }
904}
905
906impl Drop for AuditLogger {
907    fn drop(&mut self) {
908        let mut tx_guard = self.tx.lock().unwrap_or_else(|e| e.into_inner());
909        if let Some(tx) = tx_guard.take() {
910            let _ = tx.send(WriterMsg::Shutdown);
911        }
912        drop(tx_guard);
913        if let Some(handle) = self.handle.lock().unwrap_or_else(|e| e.into_inner()).take() {
914            let _ = handle.join();
915        }
916    }
917}
918
919// ---------------------------------------------------------------------------
920// Writer thread
921// ---------------------------------------------------------------------------
922
923#[allow(clippy::too_many_arguments)]
924fn writer_loop(
925    rx: mpsc::Receiver<WriterMsg>,
926    path: PathBuf,
927    max_bytes: u64,
928    fsync_mode: FsyncMode,
929    stream_url: Option<String>,
930    writer_alive: Arc<AtomicBool>,
931    pending: Arc<AtomicU64>,
932    last_hash: Arc<Mutex<Option<String>>>,
933) {
934    if let Some(parent) = path.parent() {
935        if !parent.as_os_str().is_empty() {
936            let _ = std::fs::create_dir_all(parent);
937        }
938    }
939
940    let mut writer = match open_active(&path) {
941        Ok(w) => w,
942        Err(err) => {
943            tracing::error!(target: "reddb::audit", error = %err, "audit writer init failed");
944            writer_alive.store(false, Ordering::SeqCst);
945            return;
946        }
947    };
948    // Track size in-memory; BufWriter hides the on-disk size until
949    // flush, and we rotate on bytes-actually-written so a slow
950    // flush cadence doesn't run away.
951    let mut bytes_written: u64 = std::fs::metadata(&path).map(|m| m.len()).unwrap_or(0);
952
953    let periodic_interval = Duration::from_millis(250);
954    let mut last_flush = Instant::now();
955    let mut buffered_since_fsync: u64 = 0;
956
957    loop {
958        // Wake up periodically so the periodic-fsync mode can run even
959        // when no events arrive (compliance-driven).
960        let recv_timeout = match fsync_mode {
961            FsyncMode::Periodic => periodic_interval,
962            FsyncMode::Every | FsyncMode::Off => Duration::from_secs(1),
963        };
964        match rx.recv_timeout(recv_timeout) {
965            Ok(WriterMsg::Event(event)) => {
966                let prev = last_hash.lock().unwrap_or_else(|e| e.into_inner()).clone();
967                let line = event.to_json_line(prev.as_deref());
968
969                let line_bytes = line.len() as u64 + 1; // newline
970                if let Err(err) = write_line(&mut writer, &line) {
971                    tracing::warn!(
972                        target: "reddb::audit",
973                        error = %err,
974                        "audit write failed; reopening"
975                    );
976                    if let Ok(w2) = open_active(&path) {
977                        writer = w2;
978                        let _ = write_line(&mut writer, &line);
979                    }
980                }
981                bytes_written = bytes_written.saturating_add(line_bytes);
982                let new_hash = crate::utils::to_hex(&sha256::sha256(line.as_bytes()));
983                if let Ok(mut g) = last_hash.lock() {
984                    *g = Some(new_hash);
985                }
986                if let Some(url) = &stream_url {
987                    stream_post(url, &line);
988                }
989                tracing::info!(target: "reddb::audit", "{line}");
990                pending.fetch_sub(1, Ordering::SeqCst);
991                buffered_since_fsync += 1;
992
993                match fsync_mode {
994                    FsyncMode::Every => {
995                        let _ = writer.flush();
996                        let _ = writer.get_ref().sync_data();
997                        buffered_since_fsync = 0;
998                    }
999                    FsyncMode::Periodic => {
1000                        if last_flush.elapsed() >= periodic_interval {
1001                            let _ = writer.flush();
1002                            let _ = writer.get_ref().sync_data();
1003                            last_flush = Instant::now();
1004                            buffered_since_fsync = 0;
1005                        }
1006                    }
1007                    FsyncMode::Off => {}
1008                }
1009
1010                // Rotation check based on in-memory accounting; BufWriter
1011                // metadata can lag.
1012                if bytes_written >= max_bytes {
1013                    let _ = writer.flush();
1014                    let _ = writer.get_ref().sync_data();
1015                    if let Err(err) = rotate(&path) {
1016                        tracing::warn!(
1017                            target: "reddb::audit",
1018                            error = %err,
1019                            "audit rotation failed; continuing in-place"
1020                        );
1021                    }
1022                    match open_active(&path) {
1023                        Ok(w2) => writer = w2,
1024                        Err(err) => {
1025                            tracing::error!(
1026                                target: "reddb::audit",
1027                                error = %err,
1028                                "audit reopen failed after rotate"
1029                            );
1030                            break;
1031                        }
1032                    }
1033                    last_flush = Instant::now();
1034                    buffered_since_fsync = 0;
1035                    bytes_written = 0;
1036                }
1037            }
1038            Ok(WriterMsg::Flush(ack)) => {
1039                let _ = writer.flush();
1040                let _ = writer.get_ref().sync_data();
1041                last_flush = Instant::now();
1042                buffered_since_fsync = 0;
1043                // Acks are sent only after pending == 0; in this design
1044                // every event sent before Flush has already been
1045                // processed (channel is FIFO), so we can ack now.
1046                let _ = ack.send(());
1047            }
1048            Ok(WriterMsg::Shutdown) => {
1049                let _ = writer.flush();
1050                let _ = writer.get_ref().sync_data();
1051                break;
1052            }
1053            Err(mpsc::RecvTimeoutError::Timeout) => {
1054                if buffered_since_fsync > 0 {
1055                    let _ = writer.flush();
1056                    let _ = writer.get_ref().sync_data();
1057                    last_flush = Instant::now();
1058                    buffered_since_fsync = 0;
1059                }
1060            }
1061            Err(mpsc::RecvTimeoutError::Disconnected) => {
1062                let _ = writer.flush();
1063                let _ = writer.get_ref().sync_data();
1064                break;
1065            }
1066        }
1067    }
1068
1069    writer_alive.store(false, Ordering::SeqCst);
1070}
1071
/// Opens `path` in append mode, creating the file and any missing parent
/// directories first, and wraps the handle in a `BufWriter`.
///
/// # Errors
/// Returns the I/O error from creating the parent directory or from
/// opening the file.
fn open_active(path: &Path) -> std::io::Result<BufWriter<std::fs::File>> {
    // A bare file name has an empty parent; there is nothing to create.
    match path.parent() {
        Some(dir) if !dir.as_os_str().is_empty() => std::fs::create_dir_all(dir)?,
        _ => {}
    }
    let mut options = std::fs::OpenOptions::new();
    options.append(true).create(true);
    options.open(path).map(BufWriter::new)
}
1084
/// Writes `line` and then a single `\n` into the buffered writer.
/// Nothing is flushed here; the first failing write is returned.
fn write_line(writer: &mut BufWriter<std::fs::File>, line: &str) -> std::io::Result<()> {
    writer
        .write_all(line.as_bytes())
        .and_then(|()| writer.write_all(b"\n"))
}
1090
1091fn append_line_with_rotation(path: &Path, line: &str, max_bytes: u64) -> std::io::Result<()> {
1092    if let Some(parent) = path.parent() {
1093        if !parent.as_os_str().is_empty() {
1094            std::fs::create_dir_all(parent)?;
1095        }
1096    }
1097    let mut file = std::fs::OpenOptions::new()
1098        .create(true)
1099        .append(true)
1100        .open(path)?;
1101    file.write_all(line.as_bytes())?;
1102    file.write_all(b"\n")?;
1103    file.sync_data()?;
1104    drop(file);
1105    if let Ok(meta) = std::fs::metadata(path) {
1106        if meta.len() >= max_bytes {
1107            let _ = rotate(path);
1108        }
1109    }
1110    Ok(())
1111}
1112
1113/// Rename the active file to `<path>.<ms>.zst` and zstd-compress it
1114/// in-place. The compressed file replaces the renamed plaintext copy
1115/// so the on-disk artefact is `.audit.log.<ms>.zst`.
1116///
1117/// Rotation timestamp uses unix nanos so back-to-back rotations
1118/// under load (or in a tight test loop) don't collide on the same
1119/// filename.
1120fn rotate(active: &Path) -> std::io::Result<()> {
1121    let ts = crate::utils::now_unix_nanos();
1122    let stem = active
1123        .file_name()
1124        .and_then(|s| s.to_str())
1125        .unwrap_or(".audit.log");
1126    let parent = active.parent().unwrap_or_else(|| Path::new("."));
1127    let plain = parent.join(format!("{stem}.{ts}"));
1128    std::fs::rename(active, &plain)?;
1129    let raw = std::fs::read(&plain)?;
1130    let compressed = match zstd::bulk::compress(&raw, 3) {
1131        Ok(c) => c,
1132        Err(err) => {
1133            // Compression failed: leave the rotated file uncompressed
1134            // rather than lose audit data.
1135            tracing::warn!(
1136                target: "reddb::audit",
1137                error = %err,
1138                "audit rotation: zstd compress failed; leaving plaintext"
1139            );
1140            return Ok(());
1141        }
1142    };
1143    let zst = parent.join(format!("{stem}.{ts}.zst"));
1144    let mut out = std::fs::File::create(&zst)?;
1145    out.write_all(&compressed)?;
1146    out.sync_data()?;
1147    drop(out);
1148    let _ = std::fs::remove_file(&plain);
1149    Ok(())
1150}
1151
1152// ---------------------------------------------------------------------------
1153// SIEM streaming (fire-and-forget)
1154// ---------------------------------------------------------------------------
1155
1156fn stream_post(url: &str, line: &str) {
1157    let url = url.to_string();
1158    let line = line.to_string();
1159    // Spawn a one-shot thread; ureq builds a fresh agent per call.
1160    // Best-effort: one attempt, no retry — SIEM ingestion lag is
1161    // not the RedDB hot path's problem.
1162    let _ = thread::Builder::new()
1163        .name("reddb-audit-siem".to_string())
1164        .spawn(move || {
1165            let agent: ureq::Agent = ureq::Agent::config_builder()
1166                .timeout_connect(Some(Duration::from_secs(2)))
1167                .timeout_send_request(Some(Duration::from_secs(3)))
1168                .timeout_recv_response(Some(Duration::from_secs(3)))
1169                .http_status_as_error(false)
1170                .build()
1171                .into();
1172            let _ = agent
1173                .post(&url)
1174                .header("content-type", "application/x-ndjson")
1175                .send(line.as_bytes());
1176        });
1177}
1178
1179// ---------------------------------------------------------------------------
1180// ISO-8601 helper (kept from the previous implementation)
1181// ---------------------------------------------------------------------------
1182
1183fn format_iso8601(ms_since_epoch: u64) -> String {
1184    let secs = ms_since_epoch / 1000;
1185    let ms = ms_since_epoch % 1000;
1186    let days = secs / 86_400;
1187    let rem = secs % 86_400;
1188    let (y, mo, d) = civil_from_days(days as i64);
1189    let h = rem / 3600;
1190    let mi = (rem % 3600) / 60;
1191    let s = rem % 60;
1192    format!(
1193        "{:04}-{:02}-{:02}T{:02}:{:02}:{:02}.{:03}Z",
1194        y, mo, d, h, mi, s, ms
1195    )
1196}
1197
/// Converts a day count relative to 1970-01-01 (day 0) into a
/// `(year, month, day)` triple. Negative counts are accepted.
///
/// Internally the count is shifted so that eras of 146 097 days start on
/// 1 March, which puts the leap day at the end of the shifted year.
fn civil_from_days(z: i64) -> (i64, u32, u32) {
    const DAYS_PER_ERA: i64 = 146_097;

    let shifted = z + 719_468;
    // Floor division by the era length using truncating `/`.
    let floor_numerator = if shifted >= 0 {
        shifted
    } else {
        shifted - (DAYS_PER_ERA - 1)
    };
    let era = floor_numerator / DAYS_PER_ERA;
    let day_of_era = (shifted - era * DAYS_PER_ERA) as u64;
    let year_of_era =
        (day_of_era - day_of_era / 1460 + day_of_era / 36_524 - day_of_era / 146_096) / 365;
    let day_of_year = day_of_era - (365 * year_of_era + year_of_era / 4 - year_of_era / 100);
    // Month index counted from March (0 = March, 11 = February).
    let shifted_month = (5 * day_of_year + 2) / 153;
    let day = (day_of_year - (153 * shifted_month + 2) / 5 + 1) as u32;
    let month = (if shifted_month < 10 {
        shifted_month + 3
    } else {
        shifted_month - 9
    }) as u32;
    let march_based_year = year_of_era as i64 + era * 400;
    // January and February belong to the following calendar year.
    let year = if month <= 2 {
        march_based_year + 1
    } else {
        march_based_year
    };
    (year, month, day)
}
1210
1211// ---------------------------------------------------------------------------
1212// Tests
1213// ---------------------------------------------------------------------------
1214
#[cfg(test)]
mod tests {
    use super::*;

    /// Creates a fresh, uniquely named directory under the system temp
    /// dir and returns the path of a (not yet existing) `data.rdb` in it.
    fn temp_data_path(tag: &str) -> PathBuf {
        let dir_name = format!(
            "reddb-audit-{}-{}-{}",
            tag,
            std::process::id(),
            crate::utils::now_unix_nanos()
        );
        let dir = std::env::temp_dir().join(dir_name);
        std::fs::create_dir_all(&dir).unwrap();
        dir.join("data.rdb")
    }

    /// Blocks until the writer thread has acknowledged a flush; fails the
    /// test if that takes longer than two seconds.
    fn drain(logger: &AuditLogger) {
        assert!(logger.wait_idle(Duration::from_secs(2)));
    }

    /// Reads the logger's active file into a string.
    fn read_log(logger: &AuditLogger) -> String {
        std::fs::read_to_string(logger.path()).unwrap()
    }

    #[test]
    fn record_writes_one_line_per_call() {
        let data_file = temp_data_path("one-line");
        let logger = AuditLogger::for_data_path(&data_file);
        logger.record(
            "admin/readonly",
            "operator",
            "instance",
            "ok",
            JsonValue::Null,
        );
        drain(&logger);

        let contents = read_log(&logger);
        let rows: Vec<&str> = contents.lines().collect();
        assert_eq!(rows.len(), 1);
        let only_row = rows[0];
        assert!(only_row.contains("\"action\":\"admin/readonly\""));
        assert!(only_row.contains("\"outcome\":\"success\""));
    }

    #[test]
    fn record_appends_across_calls() {
        let data_file = temp_data_path("append");
        let logger = AuditLogger::for_data_path(&data_file);
        for action in ["admin/drain", "admin/shutdown"] {
            logger.record(action, "op", "instance", "ok", JsonValue::Null);
        }
        drain(&logger);
        assert_eq!(read_log(&logger).lines().count(), 2);
    }

    #[test]
    fn record_event_emits_full_schema() {
        let data_file = temp_data_path("schema");
        let logger = AuditLogger::for_data_path(&data_file);

        let mut extra = Map::new();
        extra.insert("ms".to_string(), JsonValue::Number(412.0));
        let event = AuditEvent::builder("admin/shutdown")
            .principal("alice@acme")
            .source(AuditAuthSource::Session)
            .tenant("acme")
            .resource("instance")
            .outcome(Outcome::Success)
            .detail(JsonValue::Object(extra))
            .remote_addr("203.0.113.5")
            .correlation_id("req-42")
            .build();
        logger.record_event(event);
        drain(&logger);

        let contents = read_log(&logger);
        assert!(contents.contains("\"action\":\"admin/shutdown\""));
        assert!(contents.contains("\"principal\":\"alice@acme\""));
        assert!(contents.contains("\"tenant\":\"acme\""));
        assert!(contents.contains("\"source\":\"session\""));
        assert!(contents.contains("\"correlation_id\":\"req-42\""));
        assert!(contents.contains("\"remote_addr\":\"203.0.113.5\""));
        assert!(contents.contains("\"event_id\":\""));
        assert!(contents.contains("\"prev_hash\":") || contents.lines().count() == 1);
    }

    #[test]
    fn hash_chain_links_every_event() {
        let data_file = temp_data_path("chain");
        let logger = AuditLogger::for_data_path(&data_file);
        (0..5).for_each(|i| {
            let event = AuditEvent::builder(format!("test/event/{i}"))
                .principal("tester")
                .build();
            logger.record_event(event);
        });
        drain(&logger);

        let contents = read_log(&logger);
        assert_eq!(contents.lines().count(), 5);

        // Walk the file top to bottom: each row must carry the sha256 of
        // the row before it (and the first row none at all).
        let mut expected_prev: Option<String> = None;
        for (idx, row) in contents.lines().enumerate() {
            let json: JsonValue = crate::json::from_str(row).unwrap();
            let recorded_prev = json
                .get("prev_hash")
                .and_then(|v| v.as_str())
                .map(String::from);
            assert_eq!(recorded_prev, expected_prev, "line {idx} prev_hash mismatch");
            expected_prev = Some(crate::utils::to_hex(&sha256::sha256(row.as_bytes())));
        }
    }

    #[test]
    fn legacy_record_back_compat_maps_outcomes() {
        let data_file = temp_data_path("legacy");
        let logger = AuditLogger::for_data_path(&data_file);
        logger.record(
            "admin/restore",
            "operator",
            "instance",
            "err: disk full",
            JsonValue::Null,
        );
        drain(&logger);

        let contents = read_log(&logger);
        assert!(contents.contains("\"outcome\":\"error\""));
        assert!(contents.contains("\"result_text\":\"err: disk full\""));
    }

    #[test]
    fn iso8601_formats_known_epoch() {
        let formatted = format_iso8601(1_709_210_096_789);
        assert_eq!(formatted, "2024-02-29T12:34:56.789Z");
    }

    #[test]
    fn rotation_at_threshold() {
        let data_file = temp_data_path("rotate");
        let log_dir = data_file.parent().unwrap().to_path_buf();
        // A 1 KiB cap is small enough for the events below to exceed it.
        let logger = AuditLogger::with_max_bytes(log_dir.join(".audit.log"), 1024);
        for i in 0..30 {
            let filler = JsonValue::String(
                "lorem ipsum dolor sit amet consectetur padding padding padding".to_string(),
            );
            let event = AuditEvent::builder(format!("test/rotate/{i}"))
                .principal("rotator")
                .detail(filler)
                .build();
            logger.record_event(event);
        }
        drain(&logger);

        let scan_dir = logger.path().parent().unwrap();
        let found_rotated = std::fs::read_dir(scan_dir)
            .unwrap()
            .filter_map(Result::ok)
            .any(|entry| {
                let name = entry.file_name();
                match name.to_str() {
                    Some(n) => n.starts_with(".audit.log.") && n.ends_with(".zst"),
                    None => false,
                }
            });
        assert!(found_rotated, "expected at least one rotated .zst file");
    }

    #[test]
    fn parse_line_round_trips() {
        let original = AuditEvent::builder("auth/login.ok")
            .principal("alice")
            .source(AuditAuthSource::Password)
            .tenant("acme")
            .outcome(Outcome::Success)
            .build();
        let serialized = original.to_json_line(None);
        let decoded = AuditEvent::parse_line(&serialized).expect("round-trip parse");
        assert_eq!(decoded.action, "auth/login.ok");
        assert_eq!(decoded.principal.as_deref(), Some("alice"));
        assert_eq!(decoded.tenant.as_deref(), Some("acme"));
        assert_eq!(decoded.outcome, Outcome::Success);
        assert_eq!(decoded.source, AuditAuthSource::Password);
    }

    #[test]
    fn event_id_is_lexicographically_sortable_by_time() {
        let a = new_event_id();
        // Leave a gap between the two ids before comparing them.
        std::thread::sleep(Duration::from_millis(2));
        let b = new_event_id();
        assert!(a < b, "event_id ordering broken: {a} >= {b}");
    }

    // -------------------------------------------------------------------
    // AuditFieldEscaper / AuditValue tests (issue #177)
    // -------------------------------------------------------------------

    #[test]
    fn audit_field_escaper_typed_string() {
        let field = AuditFieldEscaper::field("collection", "users");
        assert_eq!(field.name(), "collection");
        match field.value() {
            AuditValue::String(s) => assert_eq!(s, "users"),
            other => panic!("expected String, got {:?}", other),
        }
    }

    #[test]
    fn audit_field_escaper_bytes_emit_base64() {
        let raw = vec![0xDEu8, 0xAD, 0xBE, 0xEF];
        let line = AuditEvent::builder("test/bytes")
            .field(AuditFieldEscaper::field("blob", raw))
            .build()
            .to_json_line(None);
        // 0xDEADBEEF encodes to "3q2+7w==" in base64.
        assert!(
            line.contains("\"blob\":\"3q2+7w==\""),
            "expected base64 emission: {line}"
        );
    }

    #[test]
    fn audit_field_escaper_number_bool_null() {
        let serialized = AuditEvent::builder("test/types")
            .field(AuditFieldEscaper::field("count", 42i64))
            .field(AuditFieldEscaper::field("ok", true))
            .field(AuditFieldEscaper::field("missing", AuditValue::Null))
            .build()
            .to_json_line(None);
        assert!(serialized.contains("\"count\":42"));
        assert!(serialized.contains("\"ok\":true"));
        assert!(serialized.contains("\"missing\":null"));
    }

    #[test]
    fn audit_field_escaper_adversarial_corpus_preserves_structure() {
        // F-01 / F-02 adversarial corpus: CRLF, NUL, quote, semicolon,
        // JSON-in-JSON and control bytes 0x00..0x20. Each payload has to
        // serialise to exactly one JSONL row and come back unchanged
        // through the in-house parser.
        let cases: &[(&str, &str)] = &[
            ("crlf", "line1\r\nline2"),
            ("nul", "before\0after"),
            ("quote", "she said \"hi\""),
            ("semicolon", "a;b;c"),
            ("json_in_json", r#"{"injected":"yes"}"#),
            ("low_ctrl", "\x01\x02\x03\x07\x1f"),
            ("backslash", "C:\\path\\file"),
            ("mixed", "name=\"x\"\n\\path\t\x01end"),
            ("empty", ""),
            // ASCII only: the RFC 8259 encoder handles legal Unicode,
            // but the in-house parser (`utils::json`) behind
            // `crate::json::from_str` has a pre-existing byte-oriented
            // bug with multi-byte UTF-8 sequences. That is orthogonal to
            // the F-01 / F-02 boundary-smuggling threat model.
        ];
        let mut survivors = 0usize;
        for &(label, payload) in cases {
            let line = AuditEvent::builder(format!("test/adv/{label}"))
                .principal("attacker")
                .field(AuditFieldEscaper::field("user_input", payload))
                .build()
                .to_json_line(None);
            // 1. One row only: no raw newline may survive encoding.
            assert!(
                !line.contains('\n'),
                "{label}: embedded newline in JSONL row: {line:?}"
            );
            // 2. Decode with the canonical parser and pull user_input out.
            let json: JsonValue = crate::json::from_str(&line)
                .unwrap_or_else(|err| panic!("{label}: line did not parse: {err} :: {line:?}"));
            let detail = json.get("detail").expect("detail present");
            let recovered = detail.get("user_input").and_then(|v| v.as_str()).unwrap();
            assert_eq!(
                recovered, payload,
                "{label}: round-trip mismatch: {recovered:?} != {payload:?}"
            );
            survivors += 1;
        }
        assert_eq!(
            survivors,
            cases.len(),
            "adversarial corpus survival rate: {survivors}/{}",
            cases.len()
        );
    }

    #[test]
    fn audit_emission_emits_one_line_per_call_through_guard() {
        let data_file = temp_data_path("guard-emission");
        let logger = AuditLogger::for_data_path(&data_file);
        // Smuggling attempt in the collection name: JSON injection,
        // CRLF and a trailing NUL.
        let attacker = "users\";DROP\r\n{\"x\":1}\0";
        let event = AuditEvent::builder("admin/scan")
            .principal("evil")
            .field(AuditFieldEscaper::field("collection", attacker))
            .build();
        logger.record_event(event);
        drain(&logger);

        let contents = read_log(&logger);
        let rows: Vec<&str> = contents.lines().collect();
        assert_eq!(rows.len(), 1, "guard must emit exactly one JSONL row");
        // The injected "{...}" must still be inside the JSON string.
        let json: JsonValue = crate::json::from_str(rows[0]).unwrap();
        let recovered = json
            .get("detail")
            .and_then(|d| d.get("collection"))
            .and_then(|v| v.as_str())
            .unwrap();
        assert_eq!(recovered, attacker);
    }

    #[test]
    fn audit_field_escaper_no_format_macro_in_value_path() {
        // Documentation anchor: AuditField construction goes through
        // `Into<AuditValue>`, not through `Display`, so there is no
        // `format!` or `to_string()` of attacker bytes on this path. If
        // a contributor adds a `Display` impl that bypasses the typed
        // value, the property tests below still catch the smuggling;
        // this test only makes the intent explicit. It passes by
        // compiling.
        let _ = AuditFieldEscaper::field("name", "value");
    }

    #[test]
    fn audit_field_escaper_chains_via_builder_fields() {
        let batch = [
            AuditFieldEscaper::field("a", "x"),
            AuditFieldEscaper::field("b", 7i64),
            AuditFieldEscaper::field("c", true),
        ];
        let serialized = AuditEvent::builder("test/multi")
            .fields(batch)
            .build()
            .to_json_line(None);
        let json: JsonValue = crate::json::from_str(&serialized).unwrap();
        let detail = json.get("detail").unwrap();
        assert_eq!(detail.get("a").and_then(|v| v.as_str()), Some("x"));
        assert_eq!(detail.get("b").and_then(|v| v.as_i64()), Some(7));
        assert_eq!(detail.get("c").and_then(|v| v.as_bool()), Some(true));
    }

    proptest::proptest! {
        /// Arbitrary user-supplied strings have to survive the
        /// typed-field guard byte-for-byte: nothing dropped, nothing
        /// smuggled.
        ///
        /// Restricted to ASCII. The encoder follows RFC 8259 §7 for any
        /// Unicode, but the in-house decoder behind
        /// `crate::json::from_str` (`utils::json::parse_string`) is
        /// byte-oriented for non-escape characters and does not
        /// reassemble multi-byte UTF-8 sequences. The threat model here
        /// is control bytes (0x00..0x20) and structural bytes (`"`,
        /// `\`, `\r`, `\n`), not legal Unicode, so the restriction does
        /// not weaken the security claim.
        #[test]
        fn prop_audit_field_round_trips_arbitrary_strings(
            payload in proptest::string::string_regex("[\\x00-\\x7f]{0,128}").unwrap()
        ) {
            let serialized = AuditEvent::builder("prop/test")
                .field(AuditFieldEscaper::field("p", payload.as_str()))
                .build()
                .to_json_line(None);
            // Must stay on a single line.
            proptest::prop_assert!(!serialized.contains('\n'));
            let json: JsonValue = crate::json::from_str(&serialized)
                .expect("emission must always parse");
            let recovered = json
                .get("detail")
                .and_then(|d| d.get("p"))
                .and_then(|v| v.as_str())
                .unwrap();
            proptest::prop_assert_eq!(recovered, payload.as_str());
        }

        /// Arbitrary byte sequences are emitted as base64, so a
        /// non-UTF-8 payload never reaches the JSON string channel raw.
        #[test]
        fn prop_audit_field_bytes_base64_round_trip(
            bytes in proptest::collection::vec(proptest::bits::u8::ANY, 0..64)
        ) {
            let serialized = AuditEvent::builder("prop/bytes")
                .field(AuditFieldEscaper::field("b", bytes.clone()))
                .build()
                .to_json_line(None);
            proptest::prop_assert!(!serialized.contains('\n'));
            let json: JsonValue = crate::json::from_str(&serialized).unwrap();
            let recovered_b64 = json
                .get("detail")
                .and_then(|d| d.get("b"))
                .and_then(|v| v.as_str())
                .unwrap()
                .to_string();
            proptest::prop_assert_eq!(recovered_b64, base64_encode(&bytes));
        }
    }
}