Skip to main content

reddb_server/runtime/
audit_log.rs

1//! Structured audit log for SOC 2 / HIPAA / ISO 27001 deploys.
2//!
3//! Replaces the previous free-form `record(action, principal, target,
4//! result, details)` API with a stable JSON-Lines schema keyed on
5//! `event_id`, `ts`, `principal`, `tenant`, `action`, `resource`,
6//! `outcome`, `detail`, `remote_addr`, and `correlation_id` so external
7//! tooling (Splunk, Datadog HEC, ELK, BigQuery, Athena) can ingest the
8//! file without per-deploy regex.
9//!
10//! Operational properties:
11//!   * **Async-write**: emit sites push onto a bounded `std::sync::mpsc`
12//!     channel; a dedicated thread owns the file handle and flushes on
13//!     a periodic timer (default 250 ms) or per-event when
//!     `RED_AUDIT_FSYNC=every`. If the channel fills, the emit site
//!     falls back to a direct sync write — losing throughput is
//!     preferable to dropping audit lines.
17//!   * **Rotation**: when the active file exceeds `RED_AUDIT_MAX_BYTES`
18//!     (default 64 MiB) the writer renames it to
19//!     `.audit.log.<ms>.zst`, zstd-compresses it, and starts a fresh
20//!     active file. The repo already pulls `zstd`; we don't add a gzip
21//!     dependency.
22//!   * **Hash chain (tamper-evidence)**: each event carries a
23//!     `prev_hash` field — the sha256 of the previous JSON line. An
24//!     auditor verifying the file recomputes the chain; a single edit
25//!     anywhere in the file breaks every subsequent hash. Does **not**
26//!     defend against an attacker with `root + write` (they could
27//!     rebuild the chain), but it does defend against accidental edits
28//!     and most insider tampering.
29//!   * **SIEM streaming**: when `RED_AUDIT_STREAM_URL` is set every
30//!     line is also POSTed to that URL fire-and-forget.
31//!
32//! Pre-1.0: the file format breaks from the previous shape. Old
33//! `.audit.log` files are NOT readable by the new query endpoint.
34//! That's an accepted regression — operators upgrading should rotate
35//! the file before the deploy.
36
37use std::io::{BufWriter, Write};
38use std::path::{Path, PathBuf};
39use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
40use std::sync::{mpsc, Arc, Mutex};
41use std::thread::{self, JoinHandle};
42use std::time::{Duration, Instant};
43
44use crate::crypto::{os_random, sha256};
45use crate::json::{Map, Value as JsonValue};
46
47// ---------------------------------------------------------------------------
48// Schema
49// ---------------------------------------------------------------------------
50
/// Auth pathway that produced an event's principal.
///
/// Deliberately independent of `crate::auth::AuthSource`: the runtime auth
/// enum only covers Password / ClientCert / Oauth today, while the audit
/// trail also needs to name the System, Anonymous, Session and ApiKey lanes.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum AuditAuthSource {
    ApiKey,
    Session,
    Password,
    Oauth,
    ClientCert,
    Anonymous,
    System,
}

impl AuditAuthSource {
    /// Every variant, in declaration order. `parse` searches this list, so
    /// the wire spelling lives in exactly one place (`as_str`).
    const ALL: [AuditAuthSource; 7] = [
        AuditAuthSource::ApiKey,
        AuditAuthSource::Session,
        AuditAuthSource::Password,
        AuditAuthSource::Oauth,
        AuditAuthSource::ClientCert,
        AuditAuthSource::Anonymous,
        AuditAuthSource::System,
    ];

    /// Stable snake_case tag written to the `source` JSON field.
    pub fn as_str(self) -> &'static str {
        match self {
            AuditAuthSource::ApiKey => "api_key",
            AuditAuthSource::Session => "session",
            AuditAuthSource::Password => "password",
            AuditAuthSource::Oauth => "oauth",
            AuditAuthSource::ClientCert => "client_cert",
            AuditAuthSource::Anonymous => "anonymous",
            AuditAuthSource::System => "system",
        }
    }

    /// Inverse of [`Self::as_str`]: exact, case-sensitive match on the tag.
    /// Returns `None` for anything that is not one of the seven tags.
    pub fn parse(s: &str) -> Option<Self> {
        Self::ALL.iter().copied().find(|source| source.as_str() == s)
    }
}
92
/// Result of the audited action, written to the `outcome` JSON field.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Outcome {
    Success,
    Denied,
    Error,
}

impl Outcome {
    /// Canonical lowercase tag used when serialising.
    pub fn as_str(self) -> &'static str {
        match self {
            Outcome::Success => "success",
            Outcome::Denied => "denied",
            Outcome::Error => "error",
        }
    }

    /// Parse a tag, also accepting the short aliases `ok`, `deny` and `err`.
    /// Matching is exact and case-sensitive; anything else yields `None`.
    pub fn parse(s: &str) -> Option<Self> {
        match s {
            "success" | "ok" => Some(Outcome::Success),
            "denied" | "deny" => Some(Outcome::Denied),
            "error" | "err" => Some(Outcome::Error),
            _ => None,
        }
    }
}
119
/// Structured audit event. Serialised by `to_json_line` as one JSONL row
/// per call and read back by `parse_line`.
#[derive(Debug, Clone)]
pub struct AuditEvent {
    /// Event time in Unix milliseconds (set from `now_unix_millis` by
    /// `builder` / `from_legacy`; `parse_line` uses 0 when absent).
    pub ts: u128,
    /// Identifier from `new_event_id` for newly built events; `parse_line`
    /// yields an empty string when the field is missing.
    pub event_id: String,
    /// Principal the action is attributed to, if any. `from_legacy` leaves
    /// this `None` for the `"system"` and empty (anonymous) principals.
    pub principal: Option<String>,
    /// Auth pathway that produced the principal; the builder defaults to
    /// `System`.
    pub source: AuditAuthSource,
    /// Tenant scope, if any.
    pub tenant: Option<String>,
    /// Action name. Required: `parse_line` rejects rows without it.
    pub action: String,
    /// Target of the action, if any.
    pub resource: Option<String>,
    /// Outcome of the action; the builder defaults to `Success`.
    pub outcome: Outcome,
    /// Free-form structured detail. `Null` is omitted when serialised.
    pub detail: JsonValue,
    /// Client address as supplied by the emit site, if any.
    pub remote_addr: Option<String>,
    /// Correlation identifier supplied by the emit site, if any.
    pub correlation_id: Option<String>,
}
135
136impl AuditEvent {
137    /// Convenience builder. The caller fills the required fields and
138    /// chains the optional ones — keeps emit sites readable.
139    pub fn builder(action: impl Into<String>) -> AuditEventBuilder {
140        AuditEventBuilder {
141            inner: AuditEvent {
142                ts: crate::utils::now_unix_millis() as u128,
143                event_id: new_event_id(),
144                principal: None,
145                source: AuditAuthSource::System,
146                tenant: None,
147                action: action.into(),
148                resource: None,
149                outcome: Outcome::Success,
150                detail: JsonValue::Null,
151                remote_addr: None,
152                correlation_id: None,
153            },
154        }
155    }
156
157    /// Wrap a pre-structured-API call into the new schema. Used by the
158    /// back-compat `record(action, principal, target, result, details)`
159    /// path so existing emit sites keep compiling.
160    pub fn from_legacy(
161        action: &str,
162        principal: &str,
163        target: &str,
164        result: &str,
165        details: JsonValue,
166    ) -> Self {
167        let outcome = if result == "ok" {
168            Outcome::Success
169        } else if result.starts_with("err") {
170            Outcome::Error
171        } else if result.starts_with("denied") || result.starts_with("deny") {
172            Outcome::Denied
173        } else {
174            Outcome::Success
175        };
176        let mut detail = details;
177        if !result.is_empty() && result != "ok" {
178            // Preserve the human-readable result string for forensic
179            // continuity with pre-restructuring lines.
180            let mut obj = match detail {
181                JsonValue::Object(map) => map,
182                JsonValue::Null => Map::new(),
183                other => {
184                    let mut m = Map::new();
185                    m.insert("legacy".to_string(), other);
186                    m
187                }
188            };
189            obj.entry("result_text".to_string())
190                .or_insert(JsonValue::String(result.to_string()));
191            detail = JsonValue::Object(obj);
192        }
193        Self {
194            ts: crate::utils::now_unix_millis() as u128,
195            event_id: new_event_id(),
196            principal: if principal.is_empty() || principal == "system" {
197                None
198            } else {
199                Some(principal.to_string())
200            },
201            source: if principal == "system" {
202                AuditAuthSource::System
203            } else if principal.is_empty() {
204                AuditAuthSource::Anonymous
205            } else {
206                AuditAuthSource::Password
207            },
208            tenant: None,
209            action: action.to_string(),
210            resource: if target.is_empty() {
211                None
212            } else {
213                Some(target.to_string())
214            },
215            outcome,
216            detail,
217            remote_addr: None,
218            correlation_id: None,
219        }
220    }
221
222    /// Render to a single JSONL line, optionally seeded with a
223    /// `prev_hash` for the tamper-evident chain.
224    pub fn to_json_line(&self, prev_hash: Option<&str>) -> String {
225        let mut object = Map::new();
226        object.insert("ts".to_string(), JsonValue::Number(self.ts as f64));
227        object.insert(
228            "ts_iso".to_string(),
229            JsonValue::String(format_iso8601(self.ts as u64)),
230        );
231        object.insert(
232            "event_id".to_string(),
233            JsonValue::String(self.event_id.clone()),
234        );
235        if let Some(p) = &self.principal {
236            object.insert("principal".to_string(), JsonValue::String(p.clone()));
237        }
238        object.insert(
239            "source".to_string(),
240            JsonValue::String(self.source.as_str().to_string()),
241        );
242        if let Some(t) = &self.tenant {
243            object.insert("tenant".to_string(), JsonValue::String(t.clone()));
244        }
245        object.insert("action".to_string(), JsonValue::String(self.action.clone()));
246        if let Some(r) = &self.resource {
247            object.insert("resource".to_string(), JsonValue::String(r.clone()));
248        }
249        object.insert(
250            "outcome".to_string(),
251            JsonValue::String(self.outcome.as_str().to_string()),
252        );
253        if !matches!(self.detail, JsonValue::Null) {
254            object.insert("detail".to_string(), self.detail.clone());
255        }
256        if let Some(ip) = &self.remote_addr {
257            object.insert("remote_addr".to_string(), JsonValue::String(ip.clone()));
258        }
259        if let Some(cid) = &self.correlation_id {
260            object.insert("correlation_id".to_string(), JsonValue::String(cid.clone()));
261        }
262        if let Some(h) = prev_hash {
263            object.insert("prev_hash".to_string(), JsonValue::String(h.to_string()));
264        }
265        JsonValue::Object(object).to_string_compact()
266    }
267
268    /// Parse one JSONL line back into an `AuditEvent`. Returns `None`
269    /// for legacy lines (pre-restructuring) or malformed JSON; callers
270    /// in the query path use this to filter.
271    pub fn parse_line(line: &str) -> Option<Self> {
272        let v: JsonValue = crate::json::from_str(line).ok()?;
273        let action = v.get("action")?.as_str()?.to_string();
274        let outcome_s = v.get("outcome").and_then(|n| n.as_str()).unwrap_or("");
275        let outcome = Outcome::parse(outcome_s)?;
276        let ts = v.get("ts").and_then(|n| n.as_f64()).unwrap_or(0.0) as u128;
277        let event_id = v
278            .get("event_id")
279            .and_then(|n| n.as_str())
280            .unwrap_or("")
281            .to_string();
282        let source = v
283            .get("source")
284            .and_then(|n| n.as_str())
285            .and_then(AuditAuthSource::parse)
286            .unwrap_or(AuditAuthSource::System);
287        Some(Self {
288            ts,
289            event_id,
290            principal: v
291                .get("principal")
292                .and_then(|n| n.as_str())
293                .map(|s| s.to_string()),
294            source,
295            tenant: v
296                .get("tenant")
297                .and_then(|n| n.as_str())
298                .map(|s| s.to_string()),
299            action,
300            resource: v
301                .get("resource")
302                .and_then(|n| n.as_str())
303                .map(|s| s.to_string()),
304            outcome,
305            detail: v.get("detail").cloned().unwrap_or(JsonValue::Null),
306            remote_addr: v
307                .get("remote_addr")
308                .and_then(|n| n.as_str())
309                .map(|s| s.to_string()),
310            correlation_id: v
311                .get("correlation_id")
312                .and_then(|n| n.as_str())
313                .map(|s| s.to_string()),
314        })
315    }
316}
317
318/// Builder for `AuditEvent`. Generated by `AuditEvent::builder()`.
319pub struct AuditEventBuilder {
320    inner: AuditEvent,
321}
322
323impl AuditEventBuilder {
324    pub fn principal(mut self, principal: impl Into<String>) -> Self {
325        self.inner.principal = Some(principal.into());
326        self
327    }
328
329    pub fn principal_opt(mut self, principal: Option<String>) -> Self {
330        self.inner.principal = principal;
331        self
332    }
333
334    pub fn source(mut self, source: AuditAuthSource) -> Self {
335        self.inner.source = source;
336        self
337    }
338
339    pub fn tenant(mut self, tenant: impl Into<String>) -> Self {
340        self.inner.tenant = Some(tenant.into());
341        self
342    }
343
344    pub fn resource(mut self, resource: impl Into<String>) -> Self {
345        self.inner.resource = Some(resource.into());
346        self
347    }
348
349    pub fn outcome(mut self, outcome: Outcome) -> Self {
350        self.inner.outcome = outcome;
351        self
352    }
353
354    pub fn detail(mut self, detail: JsonValue) -> Self {
355        self.inner.detail = detail;
356        self
357    }
358
359    /// Add a single typed audit field to the detail object. Goes
360    /// through `AuditFieldEscaper` so caller-supplied bytes never
361    /// reach `format!` or string concatenation — see ADR 0010.
362    pub fn field(mut self, field: AuditField) -> Self {
363        let mut obj = match std::mem::replace(&mut self.inner.detail, JsonValue::Null) {
364            JsonValue::Object(map) => map,
365            JsonValue::Null => Map::new(),
366            other => {
367                // Preserve any preceding non-object detail under a
368                // reserved key rather than dropping it on the floor.
369                let mut m = Map::new();
370                m.insert("legacy_detail".to_string(), other);
371                m
372            }
373        };
374        obj.insert(field.name.to_string(), field.value.into_json_value());
375        self.inner.detail = JsonValue::Object(obj);
376        self
377    }
378
379    /// Bulk variant of [`Self::field`] for multi-field call sites.
380    pub fn fields(mut self, fields: impl IntoIterator<Item = AuditField>) -> Self {
381        for f in fields {
382            self = self.field(f);
383        }
384        self
385    }
386
387    pub fn remote_addr(mut self, addr: impl Into<String>) -> Self {
388        self.inner.remote_addr = Some(addr.into());
389        self
390    }
391
392    pub fn correlation_id(mut self, cid: impl Into<String>) -> Self {
393        self.inner.correlation_id = Some(cid.into());
394        self
395    }
396
397    pub fn build(self) -> AuditEvent {
398        self.inner
399    }
400}
401
402// ---------------------------------------------------------------------------
403// AuditFieldEscaper — typed-field guard (ADR 0010 / issue #177)
404// ---------------------------------------------------------------------------
405
406/// Typed value variants accepted by the audit-field guard. The
407/// serializer (`to_json_line`) owns the framing; an `AuditValue`
408/// cannot smuggle structural bytes past the canonical encoder
409/// (`crate::serde_json::Value::escape_string`, RFC 8259 §7) because
410/// the variant is consumed as a typed value, not as an interpolated
411/// string. Adversarial corpora (CRLF, NUL, quote, semicolon,
412/// JSON-in-JSON, control bytes 0x00..0x20) survive the boundary
413/// because the encoder emits `\u00XX` escapes for every byte below
414/// 0x20.
415#[derive(Debug, Clone)]
416pub enum AuditValue {
417    String(String),
418    /// Arbitrary bytes — base64-encoded on emit so non-UTF-8 payloads
419    /// (binary keys, raw frame bytes) never enter the JSON string
420    /// channel.
421    Bytes(Vec<u8>),
422    Number(i64),
423    Bool(bool),
424    Null,
425}
426
427impl AuditValue {
428    fn into_json_value(self) -> JsonValue {
429        match self {
430            AuditValue::String(s) => JsonValue::String(s),
431            AuditValue::Bytes(bytes) => JsonValue::String(base64_encode(&bytes)),
432            AuditValue::Number(n) => JsonValue::Number(n as f64),
433            AuditValue::Bool(b) => JsonValue::Bool(b),
434            AuditValue::Null => JsonValue::Null,
435        }
436    }
437}
438
439impl From<String> for AuditValue {
440    fn from(value: String) -> Self {
441        AuditValue::String(value)
442    }
443}
444
445impl From<&str> for AuditValue {
446    fn from(value: &str) -> Self {
447        AuditValue::String(value.to_string())
448    }
449}
450
451impl From<&String> for AuditValue {
452    fn from(value: &String) -> Self {
453        AuditValue::String(value.clone())
454    }
455}
456
457impl From<Vec<u8>> for AuditValue {
458    fn from(value: Vec<u8>) -> Self {
459        AuditValue::Bytes(value)
460    }
461}
462
463impl From<&[u8]> for AuditValue {
464    fn from(value: &[u8]) -> Self {
465        AuditValue::Bytes(value.to_vec())
466    }
467}
468
469impl From<i64> for AuditValue {
470    fn from(value: i64) -> Self {
471        AuditValue::Number(value)
472    }
473}
474
475impl From<u64> for AuditValue {
476    fn from(value: u64) -> Self {
477        // Saturate at i64::MAX rather than wrapping; audit fields
478        // are observability data, not arithmetic inputs.
479        AuditValue::Number(if value > i64::MAX as u64 {
480            i64::MAX
481        } else {
482            value as i64
483        })
484    }
485}
486
487impl From<u32> for AuditValue {
488    fn from(value: u32) -> Self {
489        AuditValue::Number(value as i64)
490    }
491}
492
493impl From<usize> for AuditValue {
494    fn from(value: usize) -> Self {
495        AuditValue::Number(if value > i64::MAX as usize {
496            i64::MAX
497        } else {
498            value as i64
499        })
500    }
501}
502
503impl From<bool> for AuditValue {
504    fn from(value: bool) -> Self {
505        AuditValue::Bool(value)
506    }
507}
508
509impl<T: Into<AuditValue>> From<Option<T>> for AuditValue {
510    fn from(value: Option<T>) -> Self {
511        match value {
512            Some(v) => v.into(),
513            None => AuditValue::Null,
514        }
515    }
516}
517
518/// A single typed audit field (key + typed value). Construction is
519/// gated by [`AuditFieldEscaper::field`] so the typed value cannot
520/// be bypassed — there is no `pub` constructor for the field that
521/// accepts a free-form string value.
522#[derive(Debug, Clone)]
523pub struct AuditField {
524    name: &'static str,
525    value: AuditValue,
526}
527
528impl AuditField {
529    pub fn name(&self) -> &'static str {
530        self.name
531    }
532    pub fn value(&self) -> &AuditValue {
533        &self.value
534    }
535}
536
537/// Typed-field guard for audit emission (ADR 0010).
538///
539/// The guard owns the boundary's escape contract: caller-supplied
540/// bytes always round-trip through a typed `AuditValue`, never
541/// through `format!`, never through string concatenation. The
542/// canonical JSON encoder is `crate::serde_json::Value::escape_string`
543/// (RFC 8259 §7 compliant after F-01 hotfix #181) which the
544/// `AuditEvent::to_json_line` serializer uses for every string slot.
545///
546/// The other two encoder paths in the codebase
547/// (`utils::json::JsonValue::write_json` and
548/// `grpc::scan_json::write_json_string`) are correct after #181 but
549/// not the canonical owner of the audit boundary. Both are marked
550/// `#[deprecated]` on the audit path and remain in place for
551/// non-audit call sites pending follow-up retirement.
552pub struct AuditFieldEscaper;
553
554impl AuditFieldEscaper {
555    /// Construct a typed audit field. The `name` is `'static` so
556    /// the schema key set is fixed at compile time — only `value`
557    /// can carry attacker-influenced bytes, and `value` is a typed
558    /// enum, not a string sink.
559    pub fn field(name: &'static str, value: impl Into<AuditValue>) -> AuditField {
560        AuditField {
561            name,
562            value: value.into(),
563        }
564    }
565}
566
567// ---------------------------------------------------------------------------
568// Base64 (audit-only, no external dep)
569// ---------------------------------------------------------------------------
570
/// Encode `input` as standard, padded base64 (RFC 4648 §4).
///
/// Kept local to the audit module so this slice does not import the
/// `base64` crate. Each 3-byte group becomes four alphabet characters; a
/// trailing group of one or two bytes yields two or three characters
/// followed by `==` or `=` respectively.
fn base64_encode(input: &[u8]) -> String {
    const ALPHABET: &[u8; 64] = b"ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/";
    let mut encoded = String::with_capacity(input.len().div_ceil(3) * 4);
    for group in input.chunks(3) {
        // Pack up to three bytes big-endian into the low 24 bits; bytes
        // missing from a short final group simply stay zero.
        let packed = group
            .iter()
            .zip([16u32, 8, 0].iter())
            .fold(0u32, |acc, (&byte, &shift)| acc | (u32::from(byte) << shift));
        // n input bytes carry 8n bits, i.e. n + 1 significant sextets.
        let significant = group.len() + 1;
        for (slot, &shift) in [18u32, 12, 6, 0].iter().enumerate() {
            if slot < significant {
                encoded.push(char::from(ALPHABET[((packed >> shift) & 0x3f) as usize]));
            } else {
                encoded.push('=');
            }
        }
    }
    encoded
}
604
605// ---------------------------------------------------------------------------
606// Identifier
607// ---------------------------------------------------------------------------
608
609/// Crockford-base32 ULID-like ID: 10-char timestamp prefix + 16-char
610/// random suffix = 26 chars total. Ordered lexicographically by time.
611fn new_event_id() -> String {
612    const ALPHABET: &[u8; 32] = b"0123456789ABCDEFGHJKMNPQRSTVWXYZ";
613    let now_ms = crate::utils::now_unix_millis();
614    let mut rand_bytes = [0u8; 10];
615    let _ = os_random::fill_bytes(&mut rand_bytes);
616
617    let mut out = String::with_capacity(26);
618    // 48-bit timestamp -> 10 base32 chars (50 bits, top 2 are zero).
619    for i in (0..10).rev() {
620        let shift = (i as u32) * 5;
621        let idx = ((now_ms >> shift) & 0x1f) as usize;
622        out.push(ALPHABET[idx] as char);
623    }
624    // 80 random bits -> 16 base32 chars.
625    let mut acc: u128 = 0;
626    for &b in &rand_bytes {
627        acc = (acc << 8) | (b as u128);
628    }
629    for i in (0..16).rev() {
630        let shift = (i as u32) * 5;
631        let idx = ((acc >> shift) & 0x1f) as usize;
632        out.push(ALPHABET[idx] as char);
633    }
634    out
635}
636
637// ---------------------------------------------------------------------------
638// Fsync mode
639// ---------------------------------------------------------------------------
640
/// Durability policy of the audit writer, selected by `RED_AUDIT_FSYNC`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum FsyncMode {
    /// Buffered, fsync only on rotation + shutdown.
    Off,
    /// fsync after every event. Safe for HIPAA strong-durability.
    Every,
    /// fsync every N ms (default 250) — compliance-appropriate
    /// without tanking write throughput.
    Periodic,
}

impl FsyncMode {
    /// Resolve the mode from the `RED_AUDIT_FSYNC` environment variable.
    /// Unset or non-Unicode values behave like an empty string (`Periodic`).
    fn from_env() -> Self {
        let raw = std::env::var("RED_AUDIT_FSYNC").unwrap_or_default();
        Self::from_setting(&raw)
    }

    /// Interpret a raw setting. Surrounding whitespace is ignored and
    /// matching is ASCII case-insensitive, so a stray space or newline from a
    /// `.env` file cannot silently downgrade `every` to `Periodic`.
    /// `every` / `strong` / `on` → `Every`; `off` / `none` → `Off`; anything
    /// else → `Periodic`.
    fn from_setting(raw: &str) -> Self {
        match raw.trim().to_ascii_lowercase().as_str() {
            "every" | "strong" | "on" => Self::Every,
            "off" | "none" => Self::Off,
            _ => Self::Periodic,
        }
    }
}
665
666// ---------------------------------------------------------------------------
667// Channel message
668// ---------------------------------------------------------------------------
669
/// Message consumed by the background writer thread (`writer_loop`).
// `Event` holds a full `AuditEvent` inline, so it is much larger than the
// other variants; the size lint is allowed deliberately — presumably so each
// event is handed over without an extra `Box` allocation.
#[allow(clippy::large_enum_variant)]
enum WriterMsg {
    /// One event to append; queued by `AuditLogger::record_event`.
    Event(AuditEvent),
    /// Barrier queued by `AuditLogger::wait_idle`, which then waits on the
    /// paired receiver. NOTE(review): the reply comes from `writer_loop`
    /// (outside this view) — presumably once earlier events are written.
    Flush(mpsc::Sender<()>),
    /// Asks the writer thread to stop. NOTE(review): the code that sends
    /// this variant is outside this view.
    Shutdown,
}
676
677// ---------------------------------------------------------------------------
678// AuditLogger
679// ---------------------------------------------------------------------------
680
/// Append-only JSONL audit logger.
///
/// Events are handed to a dedicated writer thread over a bounded channel;
/// when the channel is full or no writer is available, `record_event` writes
/// them synchronously instead.
#[derive(Debug)]
pub struct AuditLogger {
    /// Active audit-log file.
    path: PathBuf,
    /// Sender half of the writer channel. `None` when no writer thread is
    /// accepting messages (e.g. before `start_writer_thread` has run), in
    /// which case `record_event` uses the direct-write path.
    tx: Mutex<Option<mpsc::SyncSender<WriterMsg>>>,
    /// Direct-write fallback mutex. Used when the writer thread isn't
    /// running (in-memory tests) or the channel is full and the emit
    /// site picks the sync path so the event can't be dropped.
    fallback_lock: Mutex<()>,
    /// Hex sha256 of the most recent line, i.e. the tip of the
    /// tamper-evident chain. Shared between the writer thread and the
    /// direct-write fallback so the chain stays consistent across both paths.
    last_hash: Arc<Mutex<Option<String>>>,
    /// Rotation threshold in bytes (`RED_AUDIT_MAX_BYTES`, default 64 MiB).
    max_bytes: u64,
    /// Durability policy handed to the writer thread.
    fsync_mode: FsyncMode,
    /// Optional SIEM endpoint (`RED_AUDIT_STREAM_URL`) handed to the writer.
    stream_url: Option<String>,
    /// Set when the writer thread is started; a clone is shared with
    /// `writer_loop`. NOTE(review): presumably cleared there on exit.
    writer_alive: Arc<AtomicBool>,
    /// Events queued but not yet accounted for: `record_event` increments it
    /// before each `try_send` and decrements it again if the send fails.
    /// NOTE(review): `writer_loop` holds a clone, presumably decrementing
    /// after each write.
    pending: Arc<AtomicU64>,
    /// Join handle of the writer thread, if one was spawned.
    handle: Mutex<Option<JoinHandle<()>>>,
}
701
702impl AuditLogger {
703    /// Place the audit log next to the primary `.rdb` file so backup
704    /// + restore flows can ship it together.
705    pub fn for_data_path(data_path: &Path) -> Self {
706        Self::with_path(reddb_file::layout::legacy_audit_log_path(data_path))
707    }
708
709    /// Resolve a [`crate::storage::layout::LogDestination`] into a concrete
710    /// audit-log sink. `File(p)` writes to that exact path (Performance /
711    /// Max tier route here). `Stderr` and `Syslog` fall back to the legacy
712    /// `.audit.log` next to `fallback_data_path` — emit sites still get
713    /// durable storage while the dedicated stderr / syslog sinks are not
714    /// yet wired (ADR 0018 marks Syslog as a stub-and-warn lane).
715    pub fn for_destination(
716        dest: &crate::storage::layout::LogDestination,
717        fallback_data_path: &Path,
718    ) -> Self {
719        use crate::storage::layout::LogDestination;
720        match dest {
721            LogDestination::File(path) => Self::with_path(path.clone()),
722            LogDestination::Stderr => Self::for_data_path(fallback_data_path),
723            LogDestination::Syslog => {
724                tracing::warn!(
725                    target: "reddb::audit",
726                    "audit LogDestination::Syslog requested; sink not implemented, falling back to file next to data path"
727                );
728                Self::for_data_path(fallback_data_path)
729            }
730        }
731    }
732
733    /// Direct constructor primarily for tests that want a custom path.
734    pub fn with_path(path: PathBuf) -> Self {
735        let max_bytes = std::env::var("RED_AUDIT_MAX_BYTES")
736            .ok()
737            .and_then(|v| v.parse::<u64>().ok())
738            .unwrap_or(64 * 1024 * 1024);
739        let fsync_mode = FsyncMode::from_env();
740        let stream_url = std::env::var("RED_AUDIT_STREAM_URL")
741            .ok()
742            .filter(|s| !s.is_empty());
743        Self::with_settings(path, max_bytes, fsync_mode, stream_url)
744    }
745
746    /// Test/integration constructor with explicit settings — bypasses
747    /// env-var resolution so parallel tests don't race on `set_var`.
748    pub fn with_max_bytes(path: PathBuf, max_bytes: u64) -> Self {
749        Self::with_settings(path, max_bytes, FsyncMode::Periodic, None)
750    }
751
752    fn with_settings(
753        path: PathBuf,
754        max_bytes: u64,
755        fsync_mode: FsyncMode,
756        stream_url: Option<String>,
757    ) -> Self {
758        let writer_alive = Arc::new(AtomicBool::new(false));
759        let pending = Arc::new(AtomicU64::new(0));
760        // Seed the chain from the existing tail so a restart doesn't
761        // break tamper evidence.
762        let mut seed: Option<String> = None;
763        if let Ok(body) = std::fs::read_to_string(&path) {
764            if let Some(line) = body.lines().last() {
765                seed = Some(crate::utils::to_hex(&sha256::sha256(line.as_bytes())));
766            }
767        }
768        let last_hash = Arc::new(Mutex::new(seed));
769        let logger = Self {
770            path,
771            tx: Mutex::new(None),
772            fallback_lock: Mutex::new(()),
773            last_hash,
774            max_bytes,
775            fsync_mode,
776            stream_url,
777            writer_alive: Arc::clone(&writer_alive),
778            pending: Arc::clone(&pending),
779            handle: Mutex::new(None),
780        };
781        logger.start_writer_thread();
782        logger
783    }
784
785    pub fn path(&self) -> &Path {
786        &self.path
787    }
788
789    /// Active rotation threshold (bytes).
790    pub fn max_bytes(&self) -> u64 {
791        self.max_bytes
792    }
793
794    fn start_writer_thread(&self) {
795        let (tx, rx) = mpsc::sync_channel::<WriterMsg>(4096);
796        *self.tx.lock().unwrap_or_else(|e| e.into_inner()) = Some(tx);
797        let path = self.path.clone();
798        let max_bytes = self.max_bytes;
799        let fsync_mode = self.fsync_mode;
800        let stream_url = self.stream_url.clone();
801        let writer_alive = Arc::clone(&self.writer_alive);
802        let pending = Arc::clone(&self.pending);
803        let last_hash = Arc::clone(&self.last_hash);
804        writer_alive.store(true, Ordering::SeqCst);
805        let handle = thread::Builder::new()
806            .name("reddb-audit-writer".to_string())
807            .spawn(move || {
808                writer_loop(
809                    rx,
810                    path,
811                    max_bytes,
812                    fsync_mode,
813                    stream_url,
814                    writer_alive,
815                    pending,
816                    last_hash,
817                );
818            })
819            .expect("spawn audit writer thread");
820        *self.handle.lock().unwrap_or_else(|e| e.into_inner()) = Some(handle);
821    }
822
823    /// Latest tip-hash of the chain (for diagnostics + tests).
824    pub fn current_hash(&self) -> Option<String> {
825        self.last_hash
826            .lock()
827            .unwrap_or_else(|e| e.into_inner())
828            .clone()
829    }
830
831    /// Block until the writer thread has drained every pending event.
832    /// Tests use this to make assertions deterministic.
833    pub fn wait_idle(&self, timeout: Duration) -> bool {
834        let deadline = Instant::now() + timeout;
835        let tx_guard = self.tx.lock().unwrap_or_else(|e| e.into_inner());
836        if let Some(tx) = tx_guard.as_ref() {
837            let (back_tx, back_rx) = mpsc::channel();
838            if tx.send(WriterMsg::Flush(back_tx)).is_err() {
839                return false;
840            }
841            drop(tx_guard);
842            let remaining = deadline.saturating_duration_since(Instant::now());
843            return back_rx.recv_timeout(remaining).is_ok();
844        }
845        false
846    }
847
848    /// Back-compat record signature. Used by every existing emit site.
849    /// Wraps into the new schema and forwards to `record_event`.
850    pub fn record(
851        &self,
852        action: &str,
853        principal: &str,
854        target: &str,
855        result: &str,
856        details: JsonValue,
857    ) {
858        let event = AuditEvent::from_legacy(action, principal, target, result, details);
859        self.record_event(event);
860    }
861
862    /// Primary new entry point. Non-blocking when the channel has
863    /// capacity; falls back to a sync write when full or the writer
864    /// thread has shut down.
865    pub fn record_event(&self, event: AuditEvent) {
866        let tx_guard = self.tx.lock().unwrap_or_else(|e| e.into_inner());
867        let recovered_event: AuditEvent;
868        if let Some(tx) = tx_guard.as_ref() {
869            self.pending.fetch_add(1, Ordering::SeqCst);
870            match tx.try_send(WriterMsg::Event(event)) {
871                Ok(()) => return,
872                Err(mpsc::TrySendError::Full(msg)) => {
873                    self.pending.fetch_sub(1, Ordering::SeqCst);
874                    tracing::warn!(
875                        target: "reddb::audit",
876                        "audit channel saturated; falling back to sync write"
877                    );
878                    recovered_event = match msg {
879                        WriterMsg::Event(ev) => ev,
880                        _ => return,
881                    };
882                }
883                Err(mpsc::TrySendError::Disconnected(msg)) => {
884                    self.pending.fetch_sub(1, Ordering::SeqCst);
885                    recovered_event = match msg {
886                        WriterMsg::Event(ev) => ev,
887                        _ => return,
888                    };
889                }
890            }
891        } else {
892            recovered_event = event;
893        }
894        drop(tx_guard);
895        self.write_direct(recovered_event);
896    }
897
898    /// Fallback path: a direct, synchronous append. Used when the
899    /// background channel is full or the writer thread isn't running.
900    fn write_direct(&self, event: AuditEvent) {
901        let _g = self.fallback_lock.lock().unwrap_or_else(|e| e.into_inner());
902        let prev = self
903            .last_hash
904            .lock()
905            .unwrap_or_else(|e| e.into_inner())
906            .clone();
907        let line = event.to_json_line(prev.as_deref());
908        if let Err(err) = append_line_with_rotation(&self.path, &line, self.max_bytes) {
909            tracing::warn!(
910                target: "reddb::audit",
911                error = %err,
912                path = %self.path.display(),
913                "direct audit append failed"
914            );
915            return;
916        }
917        let new_hash = crate::utils::to_hex(&sha256::sha256(line.as_bytes()));
918        if let Ok(mut g) = self.last_hash.lock() {
919            *g = Some(new_hash);
920        }
921        if let Some(url) = &self.stream_url {
922            stream_post(url, &line);
923        }
924        tracing::info!(target: "reddb::audit", "{line}");
925    }
926}
927
928impl Drop for AuditLogger {
929    fn drop(&mut self) {
930        let mut tx_guard = self.tx.lock().unwrap_or_else(|e| e.into_inner());
931        if let Some(tx) = tx_guard.take() {
932            let _ = tx.send(WriterMsg::Shutdown);
933        }
934        drop(tx_guard);
935        if let Some(handle) = self.handle.lock().unwrap_or_else(|e| e.into_inner()).take() {
936            let _ = handle.join();
937        }
938    }
939}
940
941// ---------------------------------------------------------------------------
942// Writer thread
943// ---------------------------------------------------------------------------
944
945#[allow(clippy::too_many_arguments)]
946fn writer_loop(
947    rx: mpsc::Receiver<WriterMsg>,
948    path: PathBuf,
949    max_bytes: u64,
950    fsync_mode: FsyncMode,
951    stream_url: Option<String>,
952    writer_alive: Arc<AtomicBool>,
953    pending: Arc<AtomicU64>,
954    last_hash: Arc<Mutex<Option<String>>>,
955) {
956    if let Some(parent) = path.parent() {
957        if !parent.as_os_str().is_empty() {
958            let _ = std::fs::create_dir_all(parent);
959        }
960    }
961
962    let mut writer = match open_active(&path) {
963        Ok(w) => w,
964        Err(err) => {
965            tracing::error!(target: "reddb::audit", error = %err, "audit writer init failed");
966            writer_alive.store(false, Ordering::SeqCst);
967            return;
968        }
969    };
970    // Track size in-memory; BufWriter hides the on-disk size until
971    // flush, and we rotate on bytes-actually-written so a slow
972    // flush cadence doesn't run away.
973    let mut bytes_written: u64 = std::fs::metadata(&path).map(|m| m.len()).unwrap_or(0);
974
975    let periodic_interval = Duration::from_millis(250);
976    let mut last_flush = Instant::now();
977    let mut buffered_since_fsync: u64 = 0;
978
979    loop {
980        // Wake up periodically so the periodic-fsync mode can run even
981        // when no events arrive (compliance-driven).
982        let recv_timeout = match fsync_mode {
983            FsyncMode::Periodic => periodic_interval,
984            FsyncMode::Every | FsyncMode::Off => Duration::from_secs(1),
985        };
986        match rx.recv_timeout(recv_timeout) {
987            Ok(WriterMsg::Event(event)) => {
988                let prev = last_hash.lock().unwrap_or_else(|e| e.into_inner()).clone();
989                let line = event.to_json_line(prev.as_deref());
990
991                let line_bytes = line.len() as u64 + 1; // newline
992                if let Err(err) = write_line(&mut writer, &line) {
993                    tracing::warn!(
994                        target: "reddb::audit",
995                        error = %err,
996                        "audit write failed; reopening"
997                    );
998                    if let Ok(w2) = open_active(&path) {
999                        writer = w2;
1000                        let _ = write_line(&mut writer, &line);
1001                    }
1002                }
1003                bytes_written = bytes_written.saturating_add(line_bytes);
1004                let new_hash = crate::utils::to_hex(&sha256::sha256(line.as_bytes()));
1005                if let Ok(mut g) = last_hash.lock() {
1006                    *g = Some(new_hash);
1007                }
1008                if let Some(url) = &stream_url {
1009                    stream_post(url, &line);
1010                }
1011                tracing::info!(target: "reddb::audit", "{line}");
1012                pending.fetch_sub(1, Ordering::SeqCst);
1013                buffered_since_fsync += 1;
1014
1015                match fsync_mode {
1016                    FsyncMode::Every => {
1017                        let _ = writer.flush();
1018                        let _ = writer.get_ref().sync_data();
1019                        buffered_since_fsync = 0;
1020                    }
1021                    FsyncMode::Periodic => {
1022                        if last_flush.elapsed() >= periodic_interval {
1023                            let _ = writer.flush();
1024                            let _ = writer.get_ref().sync_data();
1025                            last_flush = Instant::now();
1026                            buffered_since_fsync = 0;
1027                        }
1028                    }
1029                    FsyncMode::Off => {}
1030                }
1031
1032                // Rotation check based on in-memory accounting; BufWriter
1033                // metadata can lag.
1034                if bytes_written >= max_bytes {
1035                    let _ = writer.flush();
1036                    let _ = writer.get_ref().sync_data();
1037                    if let Err(err) = rotate(&path) {
1038                        tracing::warn!(
1039                            target: "reddb::audit",
1040                            error = %err,
1041                            "audit rotation failed; continuing in-place"
1042                        );
1043                    }
1044                    match open_active(&path) {
1045                        Ok(w2) => writer = w2,
1046                        Err(err) => {
1047                            tracing::error!(
1048                                target: "reddb::audit",
1049                                error = %err,
1050                                "audit reopen failed after rotate"
1051                            );
1052                            break;
1053                        }
1054                    }
1055                    last_flush = Instant::now();
1056                    buffered_since_fsync = 0;
1057                    bytes_written = 0;
1058                }
1059            }
1060            Ok(WriterMsg::Flush(ack)) => {
1061                let _ = writer.flush();
1062                let _ = writer.get_ref().sync_data();
1063                last_flush = Instant::now();
1064                buffered_since_fsync = 0;
1065                // Acks are sent only after pending == 0; in this design
1066                // every event sent before Flush has already been
1067                // processed (channel is FIFO), so we can ack now.
1068                let _ = ack.send(());
1069            }
1070            Ok(WriterMsg::Shutdown) => {
1071                let _ = writer.flush();
1072                let _ = writer.get_ref().sync_data();
1073                break;
1074            }
1075            Err(mpsc::RecvTimeoutError::Timeout) => {
1076                if buffered_since_fsync > 0 {
1077                    let _ = writer.flush();
1078                    let _ = writer.get_ref().sync_data();
1079                    last_flush = Instant::now();
1080                    buffered_since_fsync = 0;
1081                }
1082            }
1083            Err(mpsc::RecvTimeoutError::Disconnected) => {
1084                let _ = writer.flush();
1085                let _ = writer.get_ref().sync_data();
1086                break;
1087            }
1088        }
1089    }
1090
1091    writer_alive.store(false, Ordering::SeqCst);
1092}
1093
/// Open the active audit file for appending, wrapped in a `BufWriter`.
///
/// Any missing parent directories are created first, and the file itself
/// is created if absent. An empty parent (a bare file name) is skipped.
fn open_active(path: &Path) -> std::io::Result<BufWriter<std::fs::File>> {
    match path.parent() {
        Some(dir) if !dir.as_os_str().is_empty() => std::fs::create_dir_all(dir)?,
        _ => {}
    }
    std::fs::OpenOptions::new()
        .append(true)
        .create(true)
        .open(path)
        .map(BufWriter::new)
}
1106
/// Write `line` followed by a single `\n` into the buffered writer.
///
/// Nothing is flushed here; the caller decides when bytes reach the OS.
/// Stops at, and returns, the first write error.
fn write_line(writer: &mut BufWriter<std::fs::File>, line: &str) -> std::io::Result<()> {
    writer
        .write_all(line.as_bytes())
        .and_then(|()| writer.write_all(b"\n"))
}
1112
1113fn append_line_with_rotation(path: &Path, line: &str, max_bytes: u64) -> std::io::Result<()> {
1114    if let Some(parent) = path.parent() {
1115        if !parent.as_os_str().is_empty() {
1116            std::fs::create_dir_all(parent)?;
1117        }
1118    }
1119    let mut file = std::fs::OpenOptions::new()
1120        .create(true)
1121        .append(true)
1122        .open(path)?;
1123    file.write_all(line.as_bytes())?;
1124    file.write_all(b"\n")?;
1125    file.sync_data()?;
1126    drop(file);
1127    if let Ok(meta) = std::fs::metadata(path) {
1128        if meta.len() >= max_bytes {
1129            let _ = rotate(path);
1130        }
1131    }
1132    Ok(())
1133}
1134
1135/// Rename the active file to `<path>.<ms>.zst` and zstd-compress it
1136/// in-place. The compressed file replaces the renamed plaintext copy
1137/// so the on-disk artefact is `.audit.log.<ms>.zst`.
1138///
1139/// Rotation timestamp uses unix nanos so back-to-back rotations
1140/// under load (or in a tight test loop) don't collide on the same
1141/// filename.
1142fn rotate(active: &Path) -> std::io::Result<()> {
1143    let ts = crate::utils::now_unix_nanos();
1144    let rotation = reddb_file::rotate_audit_log(active, ts)?;
1145    if let Some(err) = rotation.compression_error {
1146        tracing::warn!(
1147            target: "reddb::audit",
1148            error = %err,
1149            "audit rotation: zstd compress failed; leaving plaintext"
1150        );
1151    }
1152    Ok(())
1153}
1154
1155// ---------------------------------------------------------------------------
1156// SIEM streaming (fire-and-forget)
1157// ---------------------------------------------------------------------------
1158
1159fn stream_post(url: &str, line: &str) {
1160    let url = url.to_string();
1161    let line = line.to_string();
1162    // Spawn a one-shot thread; ureq builds a fresh agent per call.
1163    // Best-effort: one attempt, no retry — SIEM ingestion lag is
1164    // not the RedDB hot path's problem.
1165    let _ = thread::Builder::new()
1166        .name("reddb-audit-siem".to_string())
1167        .spawn(move || {
1168            let agent: ureq::Agent = ureq::Agent::config_builder()
1169                .timeout_connect(Some(Duration::from_secs(2)))
1170                .timeout_send_request(Some(Duration::from_secs(3)))
1171                .timeout_recv_response(Some(Duration::from_secs(3)))
1172                .http_status_as_error(false)
1173                .build()
1174                .into();
1175            let _ = agent
1176                .post(&url)
1177                .header("content-type", "application/x-ndjson")
1178                .send(line.as_bytes());
1179        });
1180}
1181
1182// ---------------------------------------------------------------------------
1183// ISO-8601 helper (kept from the previous implementation)
1184// ---------------------------------------------------------------------------
1185
1186fn format_iso8601(ms_since_epoch: u64) -> String {
1187    let secs = ms_since_epoch / 1000;
1188    let ms = ms_since_epoch % 1000;
1189    let days = secs / 86_400;
1190    let rem = secs % 86_400;
1191    let (y, mo, d) = civil_from_days(days as i64);
1192    let h = rem / 3600;
1193    let mi = (rem % 3600) / 60;
1194    let s = rem % 60;
1195    format!(
1196        "{:04}-{:02}-{:02}T{:02}:{:02}:{:02}.{:03}Z",
1197        y, mo, d, h, mi, s, ms
1198    )
1199}
1200
/// Convert a day count relative to 1970-01-01 into a proleptic-Gregorian
/// `(year, month, day)` triple. Month and day are 1-based.
///
/// This is Howard Hinnant's `civil_from_days`. The epoch is shifted to
/// 0000-03-01 so that the leap day falls at the end of the computational
/// year. Dates are then decomposed into 400-year eras of 146_097 days.
fn civil_from_days(days_since_epoch: i64) -> (i64, u32, u32) {
    let shifted = days_since_epoch + 719_468;
    // Floor division keeps dates before 0000-03-01 in the correct era.
    let era = shifted.div_euclid(146_097);
    let day_of_era = (shifted - era * 146_097) as u64; // 0..=146_096
    let year_of_era =
        (day_of_era - day_of_era / 1460 + day_of_era / 36_524 - day_of_era / 146_096) / 365;
    let day_of_year = day_of_era - (365 * year_of_era + year_of_era / 4 - year_of_era / 100);
    let month_index = (5 * day_of_year + 2) / 153; // 0 = March ... 11 = February
    let day = (day_of_year - (153 * month_index + 2) / 5 + 1) as u32;
    let month = (if month_index < 10 {
        month_index + 3
    } else {
        month_index - 9
    }) as u32;
    // January and February belong to the following civil year.
    let year = year_of_era as i64 + era * 400 + i64::from(month <= 2);
    (year, month, day)
}
1213
1214// ---------------------------------------------------------------------------
1215// Tests
1216// ---------------------------------------------------------------------------
1217
#[cfg(test)]
mod tests {
    // Unit tests for the audit logger. Logger-level tests write into a fresh
    // directory under the OS temp dir and call `drain` before reading the
    // file, so the background writer has flushed what was enqueued.
    use super::*;

    /// Build `<tmp>/reddb-audit-<tag>-<pid>-<nanos>/data.rdb` and create the
    /// directory (not the file). The pid + nanos suffix keeps concurrently
    /// running tests apart.
    /// NOTE: the directory is never removed after the test.
    fn temp_data_path(tag: &str) -> PathBuf {
        let mut p = std::env::temp_dir();
        p.push(format!(
            "reddb-audit-{}-{}-{}",
            tag,
            std::process::id(),
            crate::utils::now_unix_nanos()
        ));
        std::fs::create_dir_all(&p).unwrap();
        p.push("data.rdb");
        p
    }

    /// Wait up to 2 s for the writer thread to flush and ack; fail the test
    /// if it does not.
    fn drain(logger: &AuditLogger) {
        assert!(logger.wait_idle(Duration::from_secs(2)));
    }

    #[test]
    fn record_writes_one_line_per_call() {
        let data = temp_data_path("one-line");
        let logger = AuditLogger::for_data_path(&data);
        logger.record(
            "admin/readonly",
            "operator",
            "instance",
            "ok",
            JsonValue::Null,
        );
        drain(&logger);
        let body = std::fs::read_to_string(logger.path()).unwrap();
        let lines: Vec<&str> = body.lines().collect();
        assert_eq!(lines.len(), 1);
        assert!(lines[0].contains("\"action\":\"admin/readonly\""));
        // The legacy "ok" result maps to the "success" outcome.
        assert!(lines[0].contains("\"outcome\":\"success\""));
    }

    #[test]
    fn record_appends_across_calls() {
        let data = temp_data_path("append");
        let logger = AuditLogger::for_data_path(&data);
        logger.record("admin/drain", "op", "instance", "ok", JsonValue::Null);
        logger.record("admin/shutdown", "op", "instance", "ok", JsonValue::Null);
        drain(&logger);
        let lines = std::fs::read_to_string(logger.path()).unwrap();
        assert_eq!(lines.lines().count(), 2);
    }

    #[test]
    fn record_event_emits_full_schema() {
        let data = temp_data_path("schema");
        let logger = AuditLogger::for_data_path(&data);
        let mut detail = Map::new();
        detail.insert("ms".to_string(), JsonValue::Number(412.0));
        let ev = AuditEvent::builder("admin/shutdown")
            .principal("alice@acme")
            .source(AuditAuthSource::Session)
            .tenant("acme")
            .resource("instance")
            .outcome(Outcome::Success)
            .detail(JsonValue::Object(detail))
            .remote_addr("203.0.113.5")
            .correlation_id("req-42")
            .build();
        logger.record_event(ev);
        drain(&logger);
        let body = std::fs::read_to_string(logger.path()).unwrap();
        assert!(body.contains("\"action\":\"admin/shutdown\""));
        assert!(body.contains("\"principal\":\"alice@acme\""));
        assert!(body.contains("\"tenant\":\"acme\""));
        assert!(body.contains("\"source\":\"session\""));
        assert!(body.contains("\"correlation_id\":\"req-42\""));
        assert!(body.contains("\"remote_addr\":\"203.0.113.5\""));
        assert!(body.contains("\"event_id\":\""));
        // NOTE(review): with exactly one line written, the right-hand side is
        // always true, so this does not actually check `prev_hash`.
        assert!(body.contains("\"prev_hash\":") || body.lines().count() == 1);
    }

    #[test]
    fn hash_chain_links_every_event() {
        let data = temp_data_path("chain");
        let logger = AuditLogger::for_data_path(&data);
        for i in 0..5 {
            logger.record_event(
                AuditEvent::builder(format!("test/event/{i}"))
                    .principal("tester")
                    .build(),
            );
        }
        drain(&logger);
        let body = std::fs::read_to_string(logger.path()).unwrap();
        let lines: Vec<&str> = body.lines().collect();
        assert_eq!(lines.len(), 5);
        // Recompute the chain. Each line's prev_hash must equal the hex
        // sha256 of the preceding raw line, and be absent for the first.
        let mut prev: Option<String> = None;
        for (idx, line) in lines.iter().enumerate() {
            let parsed: JsonValue = crate::json::from_str(line).unwrap();
            let stored_prev = parsed
                .get("prev_hash")
                .and_then(|v| v.as_str())
                .map(|s| s.to_string());
            assert_eq!(stored_prev, prev, "line {idx} prev_hash mismatch");
            prev = Some(crate::utils::to_hex(&sha256::sha256(line.as_bytes())));
        }
    }

    #[test]
    fn legacy_record_back_compat_maps_outcomes() {
        let data = temp_data_path("legacy");
        let logger = AuditLogger::for_data_path(&data);
        logger.record(
            "admin/restore",
            "operator",
            "instance",
            "err: disk full",
            JsonValue::Null,
        );
        drain(&logger);
        let body = std::fs::read_to_string(logger.path()).unwrap();
        // An "err: ..." legacy result maps to the "error" outcome, and the
        // raw text is preserved in `result_text`.
        assert!(body.contains("\"outcome\":\"error\""));
        assert!(body.contains("\"result_text\":\"err: disk full\""));
    }

    #[test]
    fn iso8601_formats_known_epoch() {
        // Leap day, to exercise the February / leap-year path.
        assert_eq!(
            format_iso8601(1_709_210_096_789),
            "2024-02-29T12:34:56.789Z"
        );
    }

    #[test]
    fn rotation_at_threshold() {
        let data = temp_data_path("rotate");
        let parent = data.parent().unwrap().to_path_buf();
        // A 1 KiB cap plus 30 padded events should force at least one rotation.
        let logger = AuditLogger::with_max_bytes(parent.join(".audit.log"), 1024);
        for i in 0..30 {
            logger.record_event(
                AuditEvent::builder(format!("test/rotate/{i}"))
                    .principal("rotator")
                    .detail(JsonValue::String(
                        "lorem ipsum dolor sit amet consectetur padding padding padding"
                            .to_string(),
                    ))
                    .build(),
            );
        }
        drain(&logger);
        let parent = logger.path().parent().unwrap();
        let rotated: Vec<_> = std::fs::read_dir(parent)
            .unwrap()
            .filter_map(|e| e.ok())
            .filter(|e| {
                e.file_name()
                    .to_str()
                    .map(|n| n.starts_with(".audit.log.") && n.ends_with(".zst"))
                    .unwrap_or(false)
            })
            .collect();
        assert!(
            !rotated.is_empty(),
            "expected at least one rotated .zst file"
        );
    }

    #[test]
    fn parse_line_round_trips() {
        let ev = AuditEvent::builder("auth/login.ok")
            .principal("alice")
            .source(AuditAuthSource::Password)
            .tenant("acme")
            .outcome(Outcome::Success)
            .build();
        let line = ev.to_json_line(None);
        let parsed = AuditEvent::parse_line(&line).expect("round-trip parse");
        assert_eq!(parsed.action, "auth/login.ok");
        assert_eq!(parsed.principal.as_deref(), Some("alice"));
        assert_eq!(parsed.tenant.as_deref(), Some("acme"));
        assert_eq!(parsed.outcome, Outcome::Success);
        assert_eq!(parsed.source, AuditAuthSource::Password);
    }

    #[test]
    fn event_id_is_lexicographically_sortable_by_time() {
        // The sleep puts the two ids in different milliseconds.
        let a = new_event_id();
        std::thread::sleep(Duration::from_millis(2));
        let b = new_event_id();
        assert!(a < b, "event_id ordering broken: {a} >= {b}");
    }

    // -------------------------------------------------------------------
    // AuditFieldEscaper / AuditValue tests (issue #177)
    // -------------------------------------------------------------------

    #[test]
    fn audit_field_escaper_typed_string() {
        let f = AuditFieldEscaper::field("collection", "users");
        assert_eq!(f.name(), "collection");
        match f.value() {
            AuditValue::String(s) => assert_eq!(s, "users"),
            other => panic!("expected String, got {:?}", other),
        }
    }

    #[test]
    fn audit_field_escaper_bytes_emit_base64() {
        let bytes = vec![0xDEu8, 0xAD, 0xBE, 0xEF];
        let f = AuditFieldEscaper::field("blob", bytes);
        let ev = AuditEvent::builder("test/bytes").field(f).build();
        let line = ev.to_json_line(None);
        // base64 of 0xDEADBEEF is "3q2+7w=="
        assert!(
            line.contains("\"blob\":\"3q2+7w==\""),
            "expected base64 emission: {line}"
        );
    }

    #[test]
    fn audit_field_escaper_number_bool_null() {
        let ev = AuditEvent::builder("test/types")
            .field(AuditFieldEscaper::field("count", 42i64))
            .field(AuditFieldEscaper::field("ok", true))
            .field(AuditFieldEscaper::field("missing", AuditValue::Null))
            .build();
        let line = ev.to_json_line(None);
        // Typed values are emitted as bare JSON literals, not quoted strings.
        assert!(line.contains("\"count\":42"));
        assert!(line.contains("\"ok\":true"));
        assert!(line.contains("\"missing\":null"));
    }

    #[test]
    fn audit_field_escaper_adversarial_corpus_preserves_structure() {
        // The full F-01 / F-02 adversarial corpus: CRLF, NUL,
        // quote, semicolon, JSON-in-JSON, control bytes 0x00..0x20.
        // Every payload must encode to a single JSONL row that
        // round-trips through the in-house parser back to the
        // original bytes.
        let cases: &[(&str, &str)] = &[
            ("crlf", "line1\r\nline2"),
            ("nul", "before\0after"),
            ("quote", "she said \"hi\""),
            ("semicolon", "a;b;c"),
            ("json_in_json", r#"{"injected":"yes"}"#),
            ("low_ctrl", "\x01\x02\x03\x07\x1f"),
            ("backslash", "C:\\path\\file"),
            ("mixed", "name=\"x\"\n\\path\t\x01end"),
            ("empty", ""),
            // Note: legal Unicode payloads round-trip through the
            // RFC 8259 encoder fine; the in-house parser
            // (`utils::json`) used by `crate::json::from_str` has a
            // pre-existing byte-oriented bug for multi-byte UTF-8
            // sequences, which is orthogonal to the F-01 / F-02
            // boundary-smuggling threat model. ASCII-only here.
        ];
        let mut survivors = 0usize;
        for (label, payload) in cases {
            let f = AuditFieldEscaper::field("user_input", *payload);
            let ev = AuditEvent::builder(format!("test/adv/{label}"))
                .principal("attacker")
                .field(f)
                .build();
            let line = ev.to_json_line(None);
            // 1. exactly one row, no embedded newline.
            assert!(
                !line.contains('\n'),
                "{label}: embedded newline in JSONL row: {line:?}"
            );
            // 2. parse back via the canonical decoder and read user_input.
            let parsed: JsonValue = crate::json::from_str(&line)
                .unwrap_or_else(|err| panic!("{label}: line did not parse: {err} :: {line:?}"));
            let detail = parsed.get("detail").expect("detail present");
            let recovered = detail.get("user_input").and_then(|v| v.as_str()).unwrap();
            assert_eq!(
                recovered, *payload,
                "{label}: round-trip mismatch: {recovered:?} != {payload:?}"
            );
            survivors += 1;
        }
        assert_eq!(
            survivors,
            cases.len(),
            "adversarial corpus survival rate: {survivors}/{}",
            cases.len()
        );
    }

    #[test]
    fn audit_emission_emits_one_line_per_call_through_guard() {
        let data = temp_data_path("guard-emission");
        let logger = AuditLogger::for_data_path(&data);
        // Smuggle attempt: NUL + CRLF + JSON-injection in collection name.
        let attacker = "users\";DROP\r\n{\"x\":1}\0";
        logger.record_event(
            AuditEvent::builder("admin/scan")
                .principal("evil")
                .field(AuditFieldEscaper::field("collection", attacker))
                .build(),
        );
        drain(&logger);
        let body = std::fs::read_to_string(logger.path()).unwrap();
        let lines: Vec<&str> = body.lines().collect();
        assert_eq!(lines.len(), 1, "guard must emit exactly one JSONL row");
        // The smuggled "{...}" cannot have escaped the JSON string.
        let parsed: JsonValue = crate::json::from_str(lines[0]).unwrap();
        let recovered = parsed
            .get("detail")
            .and_then(|d| d.get("collection"))
            .and_then(|v| v.as_str())
            .unwrap();
        assert_eq!(recovered, attacker);
    }

    #[test]
    fn audit_field_escaper_no_format_macro_in_value_path() {
        // Compile-time guarantee: AuditField construction goes
        // through AuditValue, not through `Display`. This test is
        // a documentation anchor — if a contributor adds a `Display`
        // impl that bypasses the typed value, the property tests
        // below still catch the smuggling, but this test makes the
        // intent explicit.
        let _ = AuditFieldEscaper::field("name", "value"); // compiles
        // No `format!` path, no `to_string()` of attacker bytes:
        // the only entry is `Into<AuditValue>`.
    }

    #[test]
    fn audit_field_escaper_chains_via_builder_fields() {
        // `.fields([...])` adds several typed fields at once into `detail`.
        let ev = AuditEvent::builder("test/multi")
            .fields([
                AuditFieldEscaper::field("a", "x"),
                AuditFieldEscaper::field("b", 7i64),
                AuditFieldEscaper::field("c", true),
            ])
            .build();
        let line = ev.to_json_line(None);
        let parsed: JsonValue = crate::json::from_str(&line).unwrap();
        let d = parsed.get("detail").unwrap();
        assert_eq!(d.get("a").and_then(|v| v.as_str()), Some("x"));
        assert_eq!(d.get("b").and_then(|v| v.as_i64()), Some(7));
        assert_eq!(d.get("c").and_then(|v| v.as_bool()), Some(true));
    }

    proptest::proptest! {
        /// Random user-supplied strings must always round-trip through
        /// the typed-field guard exactly. No silent drops, no smuggling.
        ///
        /// ASCII-only here: the encoder is RFC 8259 §7 compliant for
        /// arbitrary Unicode, but the in-house decoder shared by
        /// `crate::json::from_str` (`utils::json::parse_string`) is
        /// byte-oriented for non-escape characters and does not
        /// recompose multi-byte UTF-8 sequences. The boundary-smuggling
        /// threat model is about control bytes (0x00..0x20) and
        /// structural bytes (`"`, `\`, `\r`, `\n`), not legal Unicode,
        /// so this restriction does not narrow the security claim.
        #[test]
        fn prop_audit_field_round_trips_arbitrary_strings(
            payload in proptest::string::string_regex("[\\x00-\\x7f]{0,128}").unwrap()
        ) {
            let f = AuditFieldEscaper::field("p", payload.as_str());
            let ev = AuditEvent::builder("prop/test").field(f).build();
            let line = ev.to_json_line(None);
            // Single-line invariant.
            proptest::prop_assert!(!line.contains('\n'));
            let parsed: JsonValue = crate::json::from_str(&line)
                .expect("emission must always parse");
            let recovered = parsed
                .get("detail")
                .and_then(|d| d.get("p"))
                .and_then(|v| v.as_str())
                .unwrap();
            proptest::prop_assert_eq!(recovered, payload.as_str());
        }

        /// Random byte sequences via base64 round-trip — non-UTF-8
        /// payloads must never enter the JSON string channel raw.
        #[test]
        fn prop_audit_field_bytes_base64_round_trip(
            bytes in proptest::collection::vec(proptest::bits::u8::ANY, 0..64)
        ) {
            let f = AuditFieldEscaper::field("b", bytes.clone());
            let ev = AuditEvent::builder("prop/bytes").field(f).build();
            let line = ev.to_json_line(None);
            proptest::prop_assert!(!line.contains('\n'));
            let parsed: JsonValue = crate::json::from_str(&line).unwrap();
            let recovered_b64 = parsed
                .get("detail")
                .and_then(|d| d.get("b"))
                .and_then(|v| v.as_str())
                .unwrap()
                .to_string();
            proptest::prop_assert_eq!(recovered_b64, base64_encode(&bytes));
        }
    }
}