Skip to main content

reddb_server/runtime/
audit_log.rs

1//! Structured audit log for SOC 2 / HIPAA / ISO 27001 deploys.
2//!
3//! Replaces the previous free-form `record(action, principal, target,
4//! result, details)` API with a stable JSON-Lines schema keyed on
5//! `event_id`, `ts`, `principal`, `tenant`, `action`, `resource`,
6//! `outcome`, `detail`, `remote_addr`, and `correlation_id` so external
7//! tooling (Splunk, Datadog HEC, ELK, BigQuery, Athena) can ingest the
8//! file without per-deploy regex.
9//!
10//! Operational properties:
11//!   * **Async-write**: emit sites push onto a bounded `std::sync::mpsc`
12//!     channel; a dedicated thread owns the file handle and flushes on
13//!     a periodic timer (default 250 ms) or per-event when
//!     `RED_AUDIT_FSYNC=every`. If the channel fills (or the writer
//!     thread is gone), the emit site falls back to a direct sync
//!     write — losing throughput is preferable to dropping audit lines.
17//!   * **Rotation**: when the active file exceeds `RED_AUDIT_MAX_BYTES`
18//!     (default 64 MiB) the writer renames it to
19//!     `.audit.log.<ms>.zst`, zstd-compresses it, and starts a fresh
20//!     active file. The repo already pulls `zstd`; we don't add a gzip
21//!     dependency.
//!   * **Hash chain (tamper-evidence)**: each event carries a
//!     `prev_hash` field — the sha256 of the previous JSON line. An
//!     auditor verifying the file recomputes the chain; editing any
//!     line breaks the link to the line after it, so hiding a change
//!     requires rewriting the `prev_hash` of every subsequent line.
//!     Does **not** defend against an attacker with `root + write`
//!     (they could rebuild the chain), but it does defend against
//!     accidental edits and most insider tampering.
29//!   * **SIEM streaming**: when `RED_AUDIT_STREAM_URL` is set every
30//!     line is also POSTed to that URL fire-and-forget.
31//!
32//! Pre-1.0: the file format breaks from the previous shape. Old
33//! `.audit.log` files are NOT readable by the new query endpoint.
34//! That's an accepted regression — operators upgrading should rotate
35//! the file before the deploy.
36
37use std::io::{BufWriter, Write};
38use std::path::{Path, PathBuf};
39use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
40use std::sync::{mpsc, Arc, Mutex};
41use std::thread::{self, JoinHandle};
42use std::time::{Duration, Instant};
43
44use crate::crypto::{os_random, sha256};
45use crate::json::{Map, Value as JsonValue};
46
47// ---------------------------------------------------------------------------
48// Schema
49// ---------------------------------------------------------------------------
50
/// Authentication pathway that produced an audit event's principal.
///
/// Deliberately independent of `crate::auth::AuthSource`: the audit trail
/// must also label `System`, `Anonymous`, `Session` and `ApiKey` activity,
/// which the runtime auth enum (Password / ClientCert / Oauth today) does
/// not model.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum AuditAuthSource {
    ApiKey,
    Session,
    Password,
    Oauth,
    ClientCert,
    Anonymous,
    System,
}

impl AuditAuthSource {
    /// Every variant in declaration order. `parse` scans this table, so the
    /// wire names are spelled out only once, in `as_str`.
    const ALL: [Self; 7] = [
        Self::ApiKey,
        Self::Session,
        Self::Password,
        Self::Oauth,
        Self::ClientCert,
        Self::Anonymous,
        Self::System,
    ];

    /// Stable snake_case name written to the JSONL `source` field.
    pub fn as_str(self) -> &'static str {
        let name = match self {
            Self::ApiKey => "api_key",
            Self::Session => "session",
            Self::Password => "password",
            Self::Oauth => "oauth",
            Self::ClientCert => "client_cert",
            Self::Anonymous => "anonymous",
            Self::System => "system",
        };
        name
    }

    /// Inverse of [`Self::as_str`]: exact, case-sensitive lookup of a wire
    /// name. Returns `None` for anything unrecognised.
    pub fn parse(s: &str) -> Option<Self> {
        Self::ALL
            .iter()
            .copied()
            .find(|candidate| candidate.as_str() == s)
    }
}
92
/// Result of an audited action, serialised into the JSONL `outcome` field.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Outcome {
    Success,
    Denied,
    Error,
}

impl Outcome {
    /// Accepted spellings: each canonical name plus its short alias.
    const ALIASES: [(&'static str, Self); 6] = [
        ("success", Self::Success),
        ("ok", Self::Success),
        ("denied", Self::Denied),
        ("deny", Self::Denied),
        ("error", Self::Error),
        ("err", Self::Error),
    ];

    /// Canonical lowercase name written to the `outcome` field.
    pub fn as_str(self) -> &'static str {
        match self {
            Self::Success => "success",
            Self::Denied => "denied",
            Self::Error => "error",
        }
    }

    /// Parse a canonical name (`success` / `denied` / `error`) or its short
    /// alias (`ok` / `deny` / `err`). Case-sensitive; `None` otherwise.
    pub fn parse(s: &str) -> Option<Self> {
        Self::ALIASES
            .iter()
            .find(|(alias, _)| *alias == s)
            .map(|&(_, outcome)| outcome)
    }
}
119
120/// Structured audit event. Serialised as one JSONL row per call.
121#[derive(Debug, Clone)]
122pub struct AuditEvent {
123    pub ts: u128,
124    pub event_id: String,
125    pub principal: Option<String>,
126    pub source: AuditAuthSource,
127    pub tenant: Option<String>,
128    pub action: String,
129    pub resource: Option<String>,
130    pub outcome: Outcome,
131    pub detail: JsonValue,
132    pub remote_addr: Option<String>,
133    pub correlation_id: Option<String>,
134}
135
136impl AuditEvent {
137    /// Convenience builder. The caller fills the required fields and
138    /// chains the optional ones — keeps emit sites readable.
139    pub fn builder(action: impl Into<String>) -> AuditEventBuilder {
140        AuditEventBuilder {
141            inner: AuditEvent {
142                ts: crate::utils::now_unix_millis() as u128,
143                event_id: new_event_id(),
144                principal: None,
145                source: AuditAuthSource::System,
146                tenant: None,
147                action: action.into(),
148                resource: None,
149                outcome: Outcome::Success,
150                detail: JsonValue::Null,
151                remote_addr: None,
152                correlation_id: None,
153            },
154        }
155    }
156
157    /// Wrap a pre-structured-API call into the new schema. Used by the
158    /// back-compat `record(action, principal, target, result, details)`
159    /// path so existing emit sites keep compiling.
160    pub fn from_legacy(
161        action: &str,
162        principal: &str,
163        target: &str,
164        result: &str,
165        details: JsonValue,
166    ) -> Self {
167        let outcome = if result == "ok" {
168            Outcome::Success
169        } else if result.starts_with("err") {
170            Outcome::Error
171        } else if result.starts_with("denied") || result.starts_with("deny") {
172            Outcome::Denied
173        } else {
174            Outcome::Success
175        };
176        let mut detail = details;
177        if !result.is_empty() && result != "ok" {
178            // Preserve the human-readable result string for forensic
179            // continuity with pre-restructuring lines.
180            let mut obj = match detail {
181                JsonValue::Object(map) => map,
182                JsonValue::Null => Map::new(),
183                other => {
184                    let mut m = Map::new();
185                    m.insert("legacy".to_string(), other);
186                    m
187                }
188            };
189            obj.entry("result_text".to_string())
190                .or_insert(JsonValue::String(result.to_string()));
191            detail = JsonValue::Object(obj);
192        }
193        Self {
194            ts: crate::utils::now_unix_millis() as u128,
195            event_id: new_event_id(),
196            principal: if principal.is_empty() || principal == "system" {
197                None
198            } else {
199                Some(principal.to_string())
200            },
201            source: if principal == "system" {
202                AuditAuthSource::System
203            } else if principal.is_empty() {
204                AuditAuthSource::Anonymous
205            } else {
206                AuditAuthSource::Password
207            },
208            tenant: None,
209            action: action.to_string(),
210            resource: if target.is_empty() {
211                None
212            } else {
213                Some(target.to_string())
214            },
215            outcome,
216            detail,
217            remote_addr: None,
218            correlation_id: None,
219        }
220    }
221
222    /// Render to a single JSONL line, optionally seeded with a
223    /// `prev_hash` for the tamper-evident chain.
224    pub fn to_json_line(&self, prev_hash: Option<&str>) -> String {
225        let mut object = Map::new();
226        object.insert("ts".to_string(), JsonValue::Number(self.ts as f64));
227        object.insert(
228            "ts_iso".to_string(),
229            JsonValue::String(format_iso8601(self.ts as u64)),
230        );
231        object.insert(
232            "event_id".to_string(),
233            JsonValue::String(self.event_id.clone()),
234        );
235        if let Some(p) = &self.principal {
236            object.insert("principal".to_string(), JsonValue::String(p.clone()));
237        }
238        object.insert(
239            "source".to_string(),
240            JsonValue::String(self.source.as_str().to_string()),
241        );
242        if let Some(t) = &self.tenant {
243            object.insert("tenant".to_string(), JsonValue::String(t.clone()));
244        }
245        object.insert("action".to_string(), JsonValue::String(self.action.clone()));
246        if let Some(r) = &self.resource {
247            object.insert("resource".to_string(), JsonValue::String(r.clone()));
248        }
249        object.insert(
250            "outcome".to_string(),
251            JsonValue::String(self.outcome.as_str().to_string()),
252        );
253        if !matches!(self.detail, JsonValue::Null) {
254            object.insert("detail".to_string(), self.detail.clone());
255        }
256        if let Some(ip) = &self.remote_addr {
257            object.insert("remote_addr".to_string(), JsonValue::String(ip.clone()));
258        }
259        if let Some(cid) = &self.correlation_id {
260            object.insert("correlation_id".to_string(), JsonValue::String(cid.clone()));
261        }
262        if let Some(h) = prev_hash {
263            object.insert("prev_hash".to_string(), JsonValue::String(h.to_string()));
264        }
265        JsonValue::Object(object).to_string_compact()
266    }
267
268    /// Parse one JSONL line back into an `AuditEvent`. Returns `None`
269    /// for legacy lines (pre-restructuring) or malformed JSON; callers
270    /// in the query path use this to filter.
271    pub fn parse_line(line: &str) -> Option<Self> {
272        let v: JsonValue = crate::json::from_str(line).ok()?;
273        let action = v.get("action")?.as_str()?.to_string();
274        let outcome_s = v.get("outcome").and_then(|n| n.as_str()).unwrap_or("");
275        let outcome = Outcome::parse(outcome_s)?;
276        let ts = v.get("ts").and_then(|n| n.as_f64()).unwrap_or(0.0) as u128;
277        let event_id = v
278            .get("event_id")
279            .and_then(|n| n.as_str())
280            .unwrap_or("")
281            .to_string();
282        let source = v
283            .get("source")
284            .and_then(|n| n.as_str())
285            .and_then(AuditAuthSource::parse)
286            .unwrap_or(AuditAuthSource::System);
287        Some(Self {
288            ts,
289            event_id,
290            principal: v
291                .get("principal")
292                .and_then(|n| n.as_str())
293                .map(|s| s.to_string()),
294            source,
295            tenant: v
296                .get("tenant")
297                .and_then(|n| n.as_str())
298                .map(|s| s.to_string()),
299            action,
300            resource: v
301                .get("resource")
302                .and_then(|n| n.as_str())
303                .map(|s| s.to_string()),
304            outcome,
305            detail: v.get("detail").cloned().unwrap_or(JsonValue::Null),
306            remote_addr: v
307                .get("remote_addr")
308                .and_then(|n| n.as_str())
309                .map(|s| s.to_string()),
310            correlation_id: v
311                .get("correlation_id")
312                .and_then(|n| n.as_str())
313                .map(|s| s.to_string()),
314        })
315    }
316}
317
318/// Builder for `AuditEvent`. Generated by `AuditEvent::builder()`.
319pub struct AuditEventBuilder {
320    inner: AuditEvent,
321}
322
323impl AuditEventBuilder {
324    pub fn principal(mut self, principal: impl Into<String>) -> Self {
325        self.inner.principal = Some(principal.into());
326        self
327    }
328
329    pub fn principal_opt(mut self, principal: Option<String>) -> Self {
330        self.inner.principal = principal;
331        self
332    }
333
334    pub fn source(mut self, source: AuditAuthSource) -> Self {
335        self.inner.source = source;
336        self
337    }
338
339    pub fn tenant(mut self, tenant: impl Into<String>) -> Self {
340        self.inner.tenant = Some(tenant.into());
341        self
342    }
343
344    pub fn resource(mut self, resource: impl Into<String>) -> Self {
345        self.inner.resource = Some(resource.into());
346        self
347    }
348
349    pub fn outcome(mut self, outcome: Outcome) -> Self {
350        self.inner.outcome = outcome;
351        self
352    }
353
354    pub fn detail(mut self, detail: JsonValue) -> Self {
355        self.inner.detail = detail;
356        self
357    }
358
359    /// Add a single typed audit field to the detail object. Goes
360    /// through `AuditFieldEscaper` so caller-supplied bytes never
361    /// reach `format!` or string concatenation — see ADR 0010.
362    pub fn field(mut self, field: AuditField) -> Self {
363        let mut obj = match std::mem::replace(&mut self.inner.detail, JsonValue::Null) {
364            JsonValue::Object(map) => map,
365            JsonValue::Null => Map::new(),
366            other => {
367                // Preserve any preceding non-object detail under a
368                // reserved key rather than dropping it on the floor.
369                let mut m = Map::new();
370                m.insert("legacy_detail".to_string(), other);
371                m
372            }
373        };
374        obj.insert(field.name.to_string(), field.value.into_json_value());
375        self.inner.detail = JsonValue::Object(obj);
376        self
377    }
378
379    /// Bulk variant of [`Self::field`] for multi-field call sites.
380    pub fn fields(mut self, fields: impl IntoIterator<Item = AuditField>) -> Self {
381        for f in fields {
382            self = self.field(f);
383        }
384        self
385    }
386
387    pub fn remote_addr(mut self, addr: impl Into<String>) -> Self {
388        self.inner.remote_addr = Some(addr.into());
389        self
390    }
391
392    pub fn correlation_id(mut self, cid: impl Into<String>) -> Self {
393        self.inner.correlation_id = Some(cid.into());
394        self
395    }
396
397    pub fn build(self) -> AuditEvent {
398        self.inner
399    }
400}
401
402// ---------------------------------------------------------------------------
403// AuditFieldEscaper — typed-field guard (ADR 0010 / issue #177)
404// ---------------------------------------------------------------------------
405
/// Typed value accepted by the audit-field guard.
///
/// Values are consumed as typed data and converted into the JSON tree that
/// `AuditEvent::to_json_line` serialises — they are never interpolated into
/// a string, so they cannot smuggle structural bytes (quotes, CRLF, NUL,
/// `;`, nested JSON, control bytes 0x00..0x20) past the encoder. Escaping is
/// owned by the canonical JSON encoder
/// (`crate::serde_json::Value::escape_string`, RFC 8259 §7), which per
/// ADR 0010 emits `\u00XX` escapes for every byte below 0x20.
#[derive(Debug, Clone)]
pub enum AuditValue {
    /// UTF-8 text, emitted as a JSON string.
    String(String),
    /// Arbitrary bytes (binary keys, raw frame bytes). Emitted as a base64
    /// string so non-UTF-8 payloads never enter the JSON string channel.
    Bytes(Vec<u8>),
    /// Signed integer, emitted as a JSON number.
    Number(i64),
    /// Boolean flag.
    Bool(bool),
    /// Explicit absence, emitted as JSON `null`.
    Null,
}
426
427impl AuditValue {
428    fn into_json_value(self) -> JsonValue {
429        match self {
430            AuditValue::String(s) => JsonValue::String(s),
431            AuditValue::Bytes(bytes) => JsonValue::String(base64_encode(&bytes)),
432            AuditValue::Number(n) => JsonValue::Number(n as f64),
433            AuditValue::Bool(b) => JsonValue::Bool(b),
434            AuditValue::Null => JsonValue::Null,
435        }
436    }
437}
438
439impl From<String> for AuditValue {
440    fn from(value: String) -> Self {
441        AuditValue::String(value)
442    }
443}
444
445impl From<&str> for AuditValue {
446    fn from(value: &str) -> Self {
447        AuditValue::String(value.to_string())
448    }
449}
450
451impl From<&String> for AuditValue {
452    fn from(value: &String) -> Self {
453        AuditValue::String(value.clone())
454    }
455}
456
457impl From<Vec<u8>> for AuditValue {
458    fn from(value: Vec<u8>) -> Self {
459        AuditValue::Bytes(value)
460    }
461}
462
463impl From<&[u8]> for AuditValue {
464    fn from(value: &[u8]) -> Self {
465        AuditValue::Bytes(value.to_vec())
466    }
467}
468
469impl From<i64> for AuditValue {
470    fn from(value: i64) -> Self {
471        AuditValue::Number(value)
472    }
473}
474
475impl From<u64> for AuditValue {
476    fn from(value: u64) -> Self {
477        // Saturate at i64::MAX rather than wrapping; audit fields
478        // are observability data, not arithmetic inputs.
479        AuditValue::Number(if value > i64::MAX as u64 {
480            i64::MAX
481        } else {
482            value as i64
483        })
484    }
485}
486
487impl From<u32> for AuditValue {
488    fn from(value: u32) -> Self {
489        AuditValue::Number(value as i64)
490    }
491}
492
493impl From<usize> for AuditValue {
494    fn from(value: usize) -> Self {
495        AuditValue::Number(if value > i64::MAX as usize {
496            i64::MAX
497        } else {
498            value as i64
499        })
500    }
501}
502
503impl From<bool> for AuditValue {
504    fn from(value: bool) -> Self {
505        AuditValue::Bool(value)
506    }
507}
508
509impl<T: Into<AuditValue>> From<Option<T>> for AuditValue {
510    fn from(value: Option<T>) -> Self {
511        match value {
512            Some(v) => v.into(),
513            None => AuditValue::Null,
514        }
515    }
516}
517
518/// A single typed audit field (key + typed value). Construction is
519/// gated by [`AuditFieldEscaper::field`] so the typed value cannot
520/// be bypassed — there is no `pub` constructor for the field that
521/// accepts a free-form string value.
522#[derive(Debug, Clone)]
523pub struct AuditField {
524    name: &'static str,
525    value: AuditValue,
526}
527
528impl AuditField {
529    pub fn name(&self) -> &'static str {
530        self.name
531    }
532    pub fn value(&self) -> &AuditValue {
533        &self.value
534    }
535}
536
537/// Typed-field guard for audit emission (ADR 0010).
538///
539/// The guard owns the boundary's escape contract: caller-supplied
540/// bytes always round-trip through a typed `AuditValue`, never
541/// through `format!`, never through string concatenation. The
542/// canonical JSON encoder is `crate::serde_json::Value::escape_string`
543/// (RFC 8259 §7 compliant after F-01 hotfix #181) which the
544/// `AuditEvent::to_json_line` serializer uses for every string slot.
545///
546/// The other two encoder paths in the codebase
547/// (`utils::json::JsonValue::write_json` and
548/// `grpc::scan_json::write_json_string`) are correct after #181 but
549/// not the canonical owner of the audit boundary. Both are marked
550/// `#[deprecated]` on the audit path and remain in place for
551/// non-audit call sites pending follow-up retirement.
552pub struct AuditFieldEscaper;
553
554impl AuditFieldEscaper {
555    /// Construct a typed audit field. The `name` is `'static` so
556    /// the schema key set is fixed at compile time — only `value`
557    /// can carry attacker-influenced bytes, and `value` is a typed
558    /// enum, not a string sink.
559    pub fn field(name: &'static str, value: impl Into<AuditValue>) -> AuditField {
560        AuditField {
561            name,
562            value: value.into(),
563        }
564    }
565}
566
567// ---------------------------------------------------------------------------
568// Base64 (audit-only, no external dep)
569// ---------------------------------------------------------------------------
570
/// Encode `input` as standard, padded base64 (RFC 4648 §4).
///
/// Kept local to the audit module so this slice does not pull in the
/// `base64` crate. Each 3-byte group yields 4 characters; a trailing 1- or
/// 2-byte group is zero-extended and padded with `==` / `=`.
fn base64_encode(input: &[u8]) -> String {
    const ALPHABET: &[u8; 64] = b"ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/";
    // Bit offsets of the four sextets inside a 24-bit group, high to low.
    const SHIFTS: [u32; 4] = [18, 12, 6, 0];

    let mut encoded = String::with_capacity(input.len().div_ceil(3) * 4);
    for group in input.chunks(3) {
        // Pack up to three bytes big-endian into the low 24 bits.
        let packed = group
            .iter()
            .enumerate()
            .fold(0u32, |acc, (idx, &byte)| acc | (u32::from(byte) << (16 - 8 * idx as u32)));
        // `n` input bytes carry `n + 1` significant sextets; the rest pad.
        let significant = group.len() + 1;
        for (slot, &shift) in SHIFTS.iter().enumerate() {
            let ch = if slot < significant {
                ALPHABET[((packed >> shift) & 0x3f) as usize] as char
            } else {
                '='
            };
            encoded.push(ch);
        }
    }
    encoded
}
604
605// ---------------------------------------------------------------------------
606// Identifier
607// ---------------------------------------------------------------------------
608
609/// Crockford-base32 ULID-like ID: 10-char timestamp prefix + 16-char
610/// random suffix = 26 chars total. Ordered lexicographically by time.
611fn new_event_id() -> String {
612    const ALPHABET: &[u8; 32] = b"0123456789ABCDEFGHJKMNPQRSTVWXYZ";
613    let now_ms = crate::utils::now_unix_millis();
614    let mut rand_bytes = [0u8; 10];
615    let _ = os_random::fill_bytes(&mut rand_bytes);
616
617    let mut out = String::with_capacity(26);
618    // 48-bit timestamp -> 10 base32 chars (50 bits, top 2 are zero).
619    for i in (0..10).rev() {
620        let shift = (i as u32) * 5;
621        let idx = ((now_ms >> shift) & 0x1f) as usize;
622        out.push(ALPHABET[idx] as char);
623    }
624    // 80 random bits -> 16 base32 chars.
625    let mut acc: u128 = 0;
626    for &b in &rand_bytes {
627        acc = (acc << 8) | (b as u128);
628    }
629    for i in (0..16).rev() {
630        let shift = (i as u32) * 5;
631        let idx = ((acc >> shift) & 0x1f) as usize;
632        out.push(ALPHABET[idx] as char);
633    }
634    out
635}
636
637// ---------------------------------------------------------------------------
638// Fsync mode
639// ---------------------------------------------------------------------------
640
/// Durability policy for the audit writer, selected by `RED_AUDIT_FSYNC`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum FsyncMode {
    /// Buffered, fsync only on rotation + shutdown.
    Off,
    /// fsync after every event. Safe for HIPAA strong-durability.
    Every,
    /// fsync every N ms (default 250) — compliance-appropriate
    /// without tanking write throughput.
    Periodic,
}

impl FsyncMode {
    /// Resolve the mode from `RED_AUDIT_FSYNC`.
    ///
    /// Matching is case-insensitive and ignores surrounding whitespace, so a
    /// value such as `"every "` copied from an env file is honoured instead
    /// of silently downgrading durability. `every` / `strong` / `on` select
    /// `Every`, `off` / `none` select `Off`; anything else — including an
    /// unset or non-UTF-8 variable — selects `Periodic`.
    fn from_env() -> Self {
        let raw = std::env::var("RED_AUDIT_FSYNC").unwrap_or_default();
        match raw.trim().to_ascii_lowercase().as_str() {
            "every" | "strong" | "on" => Self::Every,
            "off" | "none" => Self::Off,
            _ => Self::Periodic,
        }
    }
}
665
666// ---------------------------------------------------------------------------
667// Channel message
668// ---------------------------------------------------------------------------
669
670#[allow(clippy::large_enum_variant)]
671enum WriterMsg {
672    Event(AuditEvent),
673    Flush(mpsc::Sender<()>),
674    Shutdown,
675}
676
677// ---------------------------------------------------------------------------
678// AuditLogger
679// ---------------------------------------------------------------------------
680
681#[derive(Debug)]
682pub struct AuditLogger {
683    path: PathBuf,
684    tx: Mutex<Option<mpsc::SyncSender<WriterMsg>>>,
685    /// Direct-write fallback mutex. Used when the writer thread isn't
686    /// running (in-memory tests) or the channel is full and the emit
687    /// site picks the sync path so the event can't be dropped.
688    fallback_lock: Mutex<()>,
689    /// Snapshot of the most recent line's sha256 for tamper-evidence.
690    /// Shared between the writer thread and the direct-write fallback
691    /// so the chain stays consistent across both paths.
692    last_hash: Arc<Mutex<Option<String>>>,
693    max_bytes: u64,
694    fsync_mode: FsyncMode,
695    stream_url: Option<String>,
696    /// Background-write flag; tests use `wait_idle()` to synchronise.
697    writer_alive: Arc<AtomicBool>,
698    pending: Arc<AtomicU64>,
699    handle: Mutex<Option<JoinHandle<()>>>,
700}
701
702impl AuditLogger {
703    /// Place the audit log next to the primary `.rdb` file so backup
704    /// + restore flows can ship it together.
705    pub fn for_data_path(data_path: &Path) -> Self {
706        let parent = data_path.parent().unwrap_or_else(|| Path::new("."));
707        let path = parent.join(".audit.log");
708        Self::with_path(path)
709    }
710
711    /// Resolve a [`crate::storage::layout::LogDestination`] into a concrete
712    /// audit-log sink. `File(p)` writes to that exact path (Performance /
713    /// Max tier route here). `Stderr` and `Syslog` fall back to the legacy
714    /// `.audit.log` next to `fallback_data_path` — emit sites still get
715    /// durable storage while the dedicated stderr / syslog sinks are not
716    /// yet wired (ADR 0018 marks Syslog as a stub-and-warn lane).
717    pub fn for_destination(
718        dest: &crate::storage::layout::LogDestination,
719        fallback_data_path: &Path,
720    ) -> Self {
721        use crate::storage::layout::LogDestination;
722        match dest {
723            LogDestination::File(path) => Self::with_path(path.clone()),
724            LogDestination::Stderr => Self::for_data_path(fallback_data_path),
725            LogDestination::Syslog => {
726                tracing::warn!(
727                    target: "reddb::audit",
728                    "audit LogDestination::Syslog requested; sink not implemented, falling back to file next to data path"
729                );
730                Self::for_data_path(fallback_data_path)
731            }
732        }
733    }
734
735    /// Direct constructor primarily for tests that want a custom path.
736    pub fn with_path(path: PathBuf) -> Self {
737        let max_bytes = std::env::var("RED_AUDIT_MAX_BYTES")
738            .ok()
739            .and_then(|v| v.parse::<u64>().ok())
740            .unwrap_or(64 * 1024 * 1024);
741        let fsync_mode = FsyncMode::from_env();
742        let stream_url = std::env::var("RED_AUDIT_STREAM_URL")
743            .ok()
744            .filter(|s| !s.is_empty());
745        Self::with_settings(path, max_bytes, fsync_mode, stream_url)
746    }
747
748    /// Test/integration constructor with explicit settings — bypasses
749    /// env-var resolution so parallel tests don't race on `set_var`.
750    pub fn with_max_bytes(path: PathBuf, max_bytes: u64) -> Self {
751        Self::with_settings(path, max_bytes, FsyncMode::Periodic, None)
752    }
753
754    fn with_settings(
755        path: PathBuf,
756        max_bytes: u64,
757        fsync_mode: FsyncMode,
758        stream_url: Option<String>,
759    ) -> Self {
760        let writer_alive = Arc::new(AtomicBool::new(false));
761        let pending = Arc::new(AtomicU64::new(0));
762        // Seed the chain from the existing tail so a restart doesn't
763        // break tamper evidence.
764        let mut seed: Option<String> = None;
765        if let Ok(body) = std::fs::read_to_string(&path) {
766            if let Some(line) = body.lines().last() {
767                seed = Some(crate::utils::to_hex(&sha256::sha256(line.as_bytes())));
768            }
769        }
770        let last_hash = Arc::new(Mutex::new(seed));
771        let logger = Self {
772            path,
773            tx: Mutex::new(None),
774            fallback_lock: Mutex::new(()),
775            last_hash,
776            max_bytes,
777            fsync_mode,
778            stream_url,
779            writer_alive: Arc::clone(&writer_alive),
780            pending: Arc::clone(&pending),
781            handle: Mutex::new(None),
782        };
783        logger.start_writer_thread();
784        logger
785    }
786
787    pub fn path(&self) -> &Path {
788        &self.path
789    }
790
791    /// Active rotation threshold (bytes).
792    pub fn max_bytes(&self) -> u64 {
793        self.max_bytes
794    }
795
796    fn start_writer_thread(&self) {
797        let (tx, rx) = mpsc::sync_channel::<WriterMsg>(4096);
798        *self.tx.lock().unwrap_or_else(|e| e.into_inner()) = Some(tx);
799        let path = self.path.clone();
800        let max_bytes = self.max_bytes;
801        let fsync_mode = self.fsync_mode;
802        let stream_url = self.stream_url.clone();
803        let writer_alive = Arc::clone(&self.writer_alive);
804        let pending = Arc::clone(&self.pending);
805        let last_hash = Arc::clone(&self.last_hash);
806        writer_alive.store(true, Ordering::SeqCst);
807        let handle = thread::Builder::new()
808            .name("reddb-audit-writer".to_string())
809            .spawn(move || {
810                writer_loop(
811                    rx,
812                    path,
813                    max_bytes,
814                    fsync_mode,
815                    stream_url,
816                    writer_alive,
817                    pending,
818                    last_hash,
819                );
820            })
821            .expect("spawn audit writer thread");
822        *self.handle.lock().unwrap_or_else(|e| e.into_inner()) = Some(handle);
823    }
824
825    /// Latest tip-hash of the chain (for diagnostics + tests).
826    pub fn current_hash(&self) -> Option<String> {
827        self.last_hash
828            .lock()
829            .unwrap_or_else(|e| e.into_inner())
830            .clone()
831    }
832
833    /// Block until the writer thread has drained every pending event.
834    /// Tests use this to make assertions deterministic.
835    pub fn wait_idle(&self, timeout: Duration) -> bool {
836        let deadline = Instant::now() + timeout;
837        let tx_guard = self.tx.lock().unwrap_or_else(|e| e.into_inner());
838        if let Some(tx) = tx_guard.as_ref() {
839            let (back_tx, back_rx) = mpsc::channel();
840            if tx.send(WriterMsg::Flush(back_tx)).is_err() {
841                return false;
842            }
843            drop(tx_guard);
844            let remaining = deadline.saturating_duration_since(Instant::now());
845            return back_rx.recv_timeout(remaining).is_ok();
846        }
847        false
848    }
849
850    /// Back-compat record signature. Used by every existing emit site.
851    /// Wraps into the new schema and forwards to `record_event`.
852    pub fn record(
853        &self,
854        action: &str,
855        principal: &str,
856        target: &str,
857        result: &str,
858        details: JsonValue,
859    ) {
860        let event = AuditEvent::from_legacy(action, principal, target, result, details);
861        self.record_event(event);
862    }
863
864    /// Primary new entry point. Non-blocking when the channel has
865    /// capacity; falls back to a sync write when full or the writer
866    /// thread has shut down.
867    pub fn record_event(&self, event: AuditEvent) {
868        let tx_guard = self.tx.lock().unwrap_or_else(|e| e.into_inner());
869        let recovered_event: AuditEvent;
870        if let Some(tx) = tx_guard.as_ref() {
871            self.pending.fetch_add(1, Ordering::SeqCst);
872            match tx.try_send(WriterMsg::Event(event)) {
873                Ok(()) => return,
874                Err(mpsc::TrySendError::Full(msg)) => {
875                    self.pending.fetch_sub(1, Ordering::SeqCst);
876                    tracing::warn!(
877                        target: "reddb::audit",
878                        "audit channel saturated; falling back to sync write"
879                    );
880                    recovered_event = match msg {
881                        WriterMsg::Event(ev) => ev,
882                        _ => return,
883                    };
884                }
885                Err(mpsc::TrySendError::Disconnected(msg)) => {
886                    self.pending.fetch_sub(1, Ordering::SeqCst);
887                    recovered_event = match msg {
888                        WriterMsg::Event(ev) => ev,
889                        _ => return,
890                    };
891                }
892            }
893        } else {
894            recovered_event = event;
895        }
896        drop(tx_guard);
897        self.write_direct(recovered_event);
898    }
899
900    /// Fallback path: a direct, synchronous append. Used when the
901    /// background channel is full or the writer thread isn't running.
902    fn write_direct(&self, event: AuditEvent) {
903        let _g = self.fallback_lock.lock().unwrap_or_else(|e| e.into_inner());
904        let prev = self
905            .last_hash
906            .lock()
907            .unwrap_or_else(|e| e.into_inner())
908            .clone();
909        let line = event.to_json_line(prev.as_deref());
910        if let Err(err) = append_line_with_rotation(&self.path, &line, self.max_bytes) {
911            tracing::warn!(
912                target: "reddb::audit",
913                error = %err,
914                path = %self.path.display(),
915                "direct audit append failed"
916            );
917            return;
918        }
919        let new_hash = crate::utils::to_hex(&sha256::sha256(line.as_bytes()));
920        if let Ok(mut g) = self.last_hash.lock() {
921            *g = Some(new_hash);
922        }
923        if let Some(url) = &self.stream_url {
924            stream_post(url, &line);
925        }
926        tracing::info!(target: "reddb::audit", "{line}");
927    }
928}
929
930impl Drop for AuditLogger {
931    fn drop(&mut self) {
932        let mut tx_guard = self.tx.lock().unwrap_or_else(|e| e.into_inner());
933        if let Some(tx) = tx_guard.take() {
934            let _ = tx.send(WriterMsg::Shutdown);
935        }
936        drop(tx_guard);
937        if let Some(handle) = self.handle.lock().unwrap_or_else(|e| e.into_inner()).take() {
938            let _ = handle.join();
939        }
940    }
941}
942
943// ---------------------------------------------------------------------------
944// Writer thread
945// ---------------------------------------------------------------------------
946
947#[allow(clippy::too_many_arguments)]
948fn writer_loop(
949    rx: mpsc::Receiver<WriterMsg>,
950    path: PathBuf,
951    max_bytes: u64,
952    fsync_mode: FsyncMode,
953    stream_url: Option<String>,
954    writer_alive: Arc<AtomicBool>,
955    pending: Arc<AtomicU64>,
956    last_hash: Arc<Mutex<Option<String>>>,
957) {
958    if let Some(parent) = path.parent() {
959        if !parent.as_os_str().is_empty() {
960            let _ = std::fs::create_dir_all(parent);
961        }
962    }
963
964    let mut writer = match open_active(&path) {
965        Ok(w) => w,
966        Err(err) => {
967            tracing::error!(target: "reddb::audit", error = %err, "audit writer init failed");
968            writer_alive.store(false, Ordering::SeqCst);
969            return;
970        }
971    };
972    // Track size in-memory; BufWriter hides the on-disk size until
973    // flush, and we rotate on bytes-actually-written so a slow
974    // flush cadence doesn't run away.
975    let mut bytes_written: u64 = std::fs::metadata(&path).map(|m| m.len()).unwrap_or(0);
976
977    let periodic_interval = Duration::from_millis(250);
978    let mut last_flush = Instant::now();
979    let mut buffered_since_fsync: u64 = 0;
980
981    loop {
982        // Wake up periodically so the periodic-fsync mode can run even
983        // when no events arrive (compliance-driven).
984        let recv_timeout = match fsync_mode {
985            FsyncMode::Periodic => periodic_interval,
986            FsyncMode::Every | FsyncMode::Off => Duration::from_secs(1),
987        };
988        match rx.recv_timeout(recv_timeout) {
989            Ok(WriterMsg::Event(event)) => {
990                let prev = last_hash.lock().unwrap_or_else(|e| e.into_inner()).clone();
991                let line = event.to_json_line(prev.as_deref());
992
993                let line_bytes = line.len() as u64 + 1; // newline
994                if let Err(err) = write_line(&mut writer, &line) {
995                    tracing::warn!(
996                        target: "reddb::audit",
997                        error = %err,
998                        "audit write failed; reopening"
999                    );
1000                    if let Ok(w2) = open_active(&path) {
1001                        writer = w2;
1002                        let _ = write_line(&mut writer, &line);
1003                    }
1004                }
1005                bytes_written = bytes_written.saturating_add(line_bytes);
1006                let new_hash = crate::utils::to_hex(&sha256::sha256(line.as_bytes()));
1007                if let Ok(mut g) = last_hash.lock() {
1008                    *g = Some(new_hash);
1009                }
1010                if let Some(url) = &stream_url {
1011                    stream_post(url, &line);
1012                }
1013                tracing::info!(target: "reddb::audit", "{line}");
1014                pending.fetch_sub(1, Ordering::SeqCst);
1015                buffered_since_fsync += 1;
1016
1017                match fsync_mode {
1018                    FsyncMode::Every => {
1019                        let _ = writer.flush();
1020                        let _ = writer.get_ref().sync_data();
1021                        buffered_since_fsync = 0;
1022                    }
1023                    FsyncMode::Periodic => {
1024                        if last_flush.elapsed() >= periodic_interval {
1025                            let _ = writer.flush();
1026                            let _ = writer.get_ref().sync_data();
1027                            last_flush = Instant::now();
1028                            buffered_since_fsync = 0;
1029                        }
1030                    }
1031                    FsyncMode::Off => {}
1032                }
1033
1034                // Rotation check based on in-memory accounting; BufWriter
1035                // metadata can lag.
1036                if bytes_written >= max_bytes {
1037                    let _ = writer.flush();
1038                    let _ = writer.get_ref().sync_data();
1039                    if let Err(err) = rotate(&path) {
1040                        tracing::warn!(
1041                            target: "reddb::audit",
1042                            error = %err,
1043                            "audit rotation failed; continuing in-place"
1044                        );
1045                    }
1046                    match open_active(&path) {
1047                        Ok(w2) => writer = w2,
1048                        Err(err) => {
1049                            tracing::error!(
1050                                target: "reddb::audit",
1051                                error = %err,
1052                                "audit reopen failed after rotate"
1053                            );
1054                            break;
1055                        }
1056                    }
1057                    last_flush = Instant::now();
1058                    buffered_since_fsync = 0;
1059                    bytes_written = 0;
1060                }
1061            }
1062            Ok(WriterMsg::Flush(ack)) => {
1063                let _ = writer.flush();
1064                let _ = writer.get_ref().sync_data();
1065                last_flush = Instant::now();
1066                buffered_since_fsync = 0;
1067                // Acks are sent only after pending == 0; in this design
1068                // every event sent before Flush has already been
1069                // processed (channel is FIFO), so we can ack now.
1070                let _ = ack.send(());
1071            }
1072            Ok(WriterMsg::Shutdown) => {
1073                let _ = writer.flush();
1074                let _ = writer.get_ref().sync_data();
1075                break;
1076            }
1077            Err(mpsc::RecvTimeoutError::Timeout) => {
1078                if buffered_since_fsync > 0 {
1079                    let _ = writer.flush();
1080                    let _ = writer.get_ref().sync_data();
1081                    last_flush = Instant::now();
1082                    buffered_since_fsync = 0;
1083                }
1084            }
1085            Err(mpsc::RecvTimeoutError::Disconnected) => {
1086                let _ = writer.flush();
1087                let _ = writer.get_ref().sync_data();
1088                break;
1089            }
1090        }
1091    }
1092
1093    writer_alive.store(false, Ordering::SeqCst);
1094}
1095
/// Open (creating if necessary) the active audit file for appending, wrapped
/// in a `BufWriter`. Missing parent directories are created first. An empty
/// parent (a bare file name) is treated as the current directory and skipped.
fn open_active(path: &Path) -> std::io::Result<BufWriter<std::fs::File>> {
    match path.parent() {
        Some(dir) if !dir.as_os_str().is_empty() => std::fs::create_dir_all(dir)?,
        _ => {}
    }
    std::fs::OpenOptions::new()
        .append(true)
        .create(true)
        .open(path)
        .map(BufWriter::new)
}
1108
/// Buffer one JSONL record: `line` followed by a `\n` terminator. Nothing is
/// flushed here; the caller decides when to flush and sync.
fn write_line(writer: &mut BufWriter<std::fs::File>, line: &str) -> std::io::Result<()> {
    for chunk in [line.as_bytes(), b"\n".as_slice()] {
        writer.write_all(chunk)?;
    }
    Ok(())
}
1114
1115fn append_line_with_rotation(path: &Path, line: &str, max_bytes: u64) -> std::io::Result<()> {
1116    if let Some(parent) = path.parent() {
1117        if !parent.as_os_str().is_empty() {
1118            std::fs::create_dir_all(parent)?;
1119        }
1120    }
1121    let mut file = std::fs::OpenOptions::new()
1122        .create(true)
1123        .append(true)
1124        .open(path)?;
1125    file.write_all(line.as_bytes())?;
1126    file.write_all(b"\n")?;
1127    file.sync_data()?;
1128    drop(file);
1129    if let Ok(meta) = std::fs::metadata(path) {
1130        if meta.len() >= max_bytes {
1131            let _ = rotate(path);
1132        }
1133    }
1134    Ok(())
1135}
1136
1137/// Rename the active file to `<path>.<ms>.zst` and zstd-compress it
1138/// in-place. The compressed file replaces the renamed plaintext copy
1139/// so the on-disk artefact is `.audit.log.<ms>.zst`.
1140///
1141/// Rotation timestamp uses unix nanos so back-to-back rotations
1142/// under load (or in a tight test loop) don't collide on the same
1143/// filename.
1144fn rotate(active: &Path) -> std::io::Result<()> {
1145    let ts = crate::utils::now_unix_nanos();
1146    let stem = active
1147        .file_name()
1148        .and_then(|s| s.to_str())
1149        .unwrap_or(".audit.log");
1150    let parent = active.parent().unwrap_or_else(|| Path::new("."));
1151    let plain = parent.join(format!("{stem}.{ts}"));
1152    std::fs::rename(active, &plain)?;
1153    let raw = std::fs::read(&plain)?;
1154    let compressed = match zstd::bulk::compress(&raw, 3) {
1155        Ok(c) => c,
1156        Err(err) => {
1157            // Compression failed: leave the rotated file uncompressed
1158            // rather than lose audit data.
1159            tracing::warn!(
1160                target: "reddb::audit",
1161                error = %err,
1162                "audit rotation: zstd compress failed; leaving plaintext"
1163            );
1164            return Ok(());
1165        }
1166    };
1167    let zst = parent.join(format!("{stem}.{ts}.zst"));
1168    let mut out = std::fs::File::create(&zst)?;
1169    out.write_all(&compressed)?;
1170    out.sync_data()?;
1171    drop(out);
1172    let _ = std::fs::remove_file(&plain);
1173    Ok(())
1174}
1175
1176// ---------------------------------------------------------------------------
1177// SIEM streaming (fire-and-forget)
1178// ---------------------------------------------------------------------------
1179
1180fn stream_post(url: &str, line: &str) {
1181    let url = url.to_string();
1182    let line = line.to_string();
1183    // Spawn a one-shot thread; ureq builds a fresh agent per call.
1184    // Best-effort: one attempt, no retry — SIEM ingestion lag is
1185    // not the RedDB hot path's problem.
1186    let _ = thread::Builder::new()
1187        .name("reddb-audit-siem".to_string())
1188        .spawn(move || {
1189            let agent: ureq::Agent = ureq::Agent::config_builder()
1190                .timeout_connect(Some(Duration::from_secs(2)))
1191                .timeout_send_request(Some(Duration::from_secs(3)))
1192                .timeout_recv_response(Some(Duration::from_secs(3)))
1193                .http_status_as_error(false)
1194                .build()
1195                .into();
1196            let _ = agent
1197                .post(&url)
1198                .header("content-type", "application/x-ndjson")
1199                .send(line.as_bytes());
1200        });
1201}
1202
1203// ---------------------------------------------------------------------------
1204// ISO-8601 helper (kept from the previous implementation)
1205// ---------------------------------------------------------------------------
1206
1207fn format_iso8601(ms_since_epoch: u64) -> String {
1208    let secs = ms_since_epoch / 1000;
1209    let ms = ms_since_epoch % 1000;
1210    let days = secs / 86_400;
1211    let rem = secs % 86_400;
1212    let (y, mo, d) = civil_from_days(days as i64);
1213    let h = rem / 3600;
1214    let mi = (rem % 3600) / 60;
1215    let s = rem % 60;
1216    format!(
1217        "{:04}-{:02}-{:02}T{:02}:{:02}:{:02}.{:03}Z",
1218        y, mo, d, h, mi, s, ms
1219    )
1220}
1221
/// Convert a count of days since 1970-01-01 into a proleptic-Gregorian
/// `(year, month, day)` triple, with `month` in `1..=12` and `day` starting at 1.
///
/// This is Howard Hinnant's `civil_from_days`. It shifts the epoch to
/// 0000-03-01 so the leap day falls at the end of the computational year,
/// splits the timeline into 400-year eras of 146 097 days, then derives
/// year-of-era, day-of-year, and a March-based month index.
fn civil_from_days(z: i64) -> (i64, u32, u32) {
    let shifted = z + 719_468;
    // Floor division / modulo: eras before 0000-03-01 are negative.
    let era = shifted.div_euclid(146_097);
    let day_of_era = shifted.rem_euclid(146_097) as u64; // [0, 146096]
    let year_of_era =
        (day_of_era - day_of_era / 1460 + day_of_era / 36_524 - day_of_era / 146_096) / 365;
    let day_of_year = day_of_era - (365 * year_of_era + year_of_era / 4 - year_of_era / 100);
    let month_index = (5 * day_of_year + 2) / 153; // 0 = March, 11 = February
    let day = (day_of_year - (153 * month_index + 2) / 5 + 1) as u32;
    let month = (if month_index < 10 {
        month_index + 3
    } else {
        month_index - 9
    }) as u32;
    // January and February belong to the following civil year.
    let year = year_of_era as i64 + era * 400 + i64::from(month <= 2);
    (year, month, day)
}
1234
1235// ---------------------------------------------------------------------------
1236// Tests
1237// ---------------------------------------------------------------------------
1238
// Unit tests for the audit logger: file emission, hash chain, rotation,
// legacy outcome mapping, and the typed-field escaper (issue #177).
// Filesystem tests work in a fresh directory from `temp_data_path` and call
// `drain` before reading the file back.
#[cfg(test)]
mod tests {
    use super::*;

    /// Create a unique scratch directory under the OS temp dir, keyed by
    /// `tag`, the process id, and the current unix-nanos timestamp. Returns
    /// the path of a `data.rdb` file inside it; the file itself is not created.
    ///
    /// NOTE(review): these directories are never removed after the test.
    fn temp_data_path(tag: &str) -> PathBuf {
        let mut p = std::env::temp_dir();
        p.push(format!(
            "reddb-audit-{}-{}-{}",
            tag,
            std::process::id(),
            crate::utils::now_unix_nanos()
        ));
        std::fs::create_dir_all(&p).unwrap();
        p.push("data.rdb");
        p
    }

    /// Wait up to 2 s for the background writer to ack a flush; fails the
    /// test if it doesn't.
    fn drain(logger: &AuditLogger) {
        assert!(logger.wait_idle(Duration::from_secs(2)));
    }

    #[test]
    fn record_writes_one_line_per_call() {
        let data = temp_data_path("one-line");
        let logger = AuditLogger::for_data_path(&data);
        logger.record(
            "admin/readonly",
            "operator",
            "instance",
            "ok",
            JsonValue::Null,
        );
        drain(&logger);
        let body = std::fs::read_to_string(logger.path()).unwrap();
        let lines: Vec<&str> = body.lines().collect();
        assert_eq!(lines.len(), 1);
        assert!(lines[0].contains("\"action\":\"admin/readonly\""));
        // Legacy result "ok" is expected to map to outcome "success".
        assert!(lines[0].contains("\"outcome\":\"success\""));
    }

    #[test]
    fn record_appends_across_calls() {
        let data = temp_data_path("append");
        let logger = AuditLogger::for_data_path(&data);
        logger.record("admin/drain", "op", "instance", "ok", JsonValue::Null);
        logger.record("admin/shutdown", "op", "instance", "ok", JsonValue::Null);
        drain(&logger);
        let lines = std::fs::read_to_string(logger.path()).unwrap();
        assert_eq!(lines.lines().count(), 2);
    }

    #[test]
    fn record_event_emits_full_schema() {
        let data = temp_data_path("schema");
        let logger = AuditLogger::for_data_path(&data);
        let mut detail = Map::new();
        detail.insert("ms".to_string(), JsonValue::Number(412.0));
        let ev = AuditEvent::builder("admin/shutdown")
            .principal("alice@acme")
            .source(AuditAuthSource::Session)
            .tenant("acme")
            .resource("instance")
            .outcome(Outcome::Success)
            .detail(JsonValue::Object(detail))
            .remote_addr("203.0.113.5")
            .correlation_id("req-42")
            .build();
        logger.record_event(ev);
        drain(&logger);
        let body = std::fs::read_to_string(logger.path()).unwrap();
        assert!(body.contains("\"action\":\"admin/shutdown\""));
        assert!(body.contains("\"principal\":\"alice@acme\""));
        assert!(body.contains("\"tenant\":\"acme\""));
        assert!(body.contains("\"source\":\"session\""));
        assert!(body.contains("\"correlation_id\":\"req-42\""));
        assert!(body.contains("\"remote_addr\":\"203.0.113.5\""));
        assert!(body.contains("\"event_id\":\""));
        // NOTE(review): with exactly one event written this disjunction is
        // always true, so it does not actually check `prev_hash`.
        assert!(body.contains("\"prev_hash\":") || body.lines().count() == 1);
    }

    #[test]
    fn hash_chain_links_every_event() {
        let data = temp_data_path("chain");
        let logger = AuditLogger::for_data_path(&data);
        for i in 0..5 {
            logger.record_event(
                AuditEvent::builder(format!("test/event/{i}"))
                    .principal("tester")
                    .build(),
            );
        }
        drain(&logger);
        let body = std::fs::read_to_string(logger.path()).unwrap();
        let lines: Vec<&str> = body.lines().collect();
        assert_eq!(lines.len(), 5);
        // Recompute the chain: each line's `prev_hash` must equal the hex
        // sha256 of the previous raw line (absent/non-string on the first).
        let mut prev: Option<String> = None;
        for (idx, line) in lines.iter().enumerate() {
            let parsed: JsonValue = crate::json::from_str(line).unwrap();
            let stored_prev = parsed
                .get("prev_hash")
                .and_then(|v| v.as_str())
                .map(|s| s.to_string());
            assert_eq!(stored_prev, prev, "line {idx} prev_hash mismatch");
            prev = Some(crate::utils::to_hex(&sha256::sha256(line.as_bytes())));
        }
    }

    #[test]
    fn legacy_record_back_compat_maps_outcomes() {
        let data = temp_data_path("legacy");
        let logger = AuditLogger::for_data_path(&data);
        logger.record(
            "admin/restore",
            "operator",
            "instance",
            "err: disk full",
            JsonValue::Null,
        );
        drain(&logger);
        let body = std::fs::read_to_string(logger.path()).unwrap();
        assert!(body.contains("\"outcome\":\"error\""));
        assert!(body.contains("\"result_text\":\"err: disk full\""));
    }

    #[test]
    fn iso8601_formats_known_epoch() {
        // Leap day, to exercise the Feb 29 path of `civil_from_days`.
        assert_eq!(
            format_iso8601(1_709_210_096_789),
            "2024-02-29T12:34:56.789Z"
        );
    }

    #[test]
    fn rotation_at_threshold() {
        // 30 padded events against a 1 KiB limit must trigger at least one
        // rotation into a `.audit.log.<ts>.zst` sibling.
        let data = temp_data_path("rotate");
        let parent = data.parent().unwrap().to_path_buf();
        let logger = AuditLogger::with_max_bytes(parent.join(".audit.log"), 1024);
        for i in 0..30 {
            logger.record_event(
                AuditEvent::builder(format!("test/rotate/{i}"))
                    .principal("rotator")
                    .detail(JsonValue::String(
                        "lorem ipsum dolor sit amet consectetur padding padding padding"
                            .to_string(),
                    ))
                    .build(),
            );
        }
        drain(&logger);
        let parent = logger.path().parent().unwrap();
        let rotated: Vec<_> = std::fs::read_dir(parent)
            .unwrap()
            .filter_map(|e| e.ok())
            .filter(|e| {
                e.file_name()
                    .to_str()
                    .map(|n| n.starts_with(".audit.log.") && n.ends_with(".zst"))
                    .unwrap_or(false)
            })
            .collect();
        assert!(
            !rotated.is_empty(),
            "expected at least one rotated .zst file"
        );
    }

    #[test]
    fn parse_line_round_trips() {
        let ev = AuditEvent::builder("auth/login.ok")
            .principal("alice")
            .source(AuditAuthSource::Password)
            .tenant("acme")
            .outcome(Outcome::Success)
            .build();
        let line = ev.to_json_line(None);
        let parsed = AuditEvent::parse_line(&line).expect("round-trip parse");
        assert_eq!(parsed.action, "auth/login.ok");
        assert_eq!(parsed.principal.as_deref(), Some("alice"));
        assert_eq!(parsed.tenant.as_deref(), Some("acme"));
        assert_eq!(parsed.outcome, Outcome::Success);
        assert_eq!(parsed.source, AuditAuthSource::Password);
    }

    #[test]
    fn event_id_is_lexicographically_sortable_by_time() {
        let a = new_event_id();
        std::thread::sleep(Duration::from_millis(2));
        let b = new_event_id();
        assert!(a < b, "event_id ordering broken: {a} >= {b}");
    }

    // -------------------------------------------------------------------
    // AuditFieldEscaper / AuditValue tests (issue #177)
    // -------------------------------------------------------------------

    #[test]
    fn audit_field_escaper_typed_string() {
        let f = AuditFieldEscaper::field("collection", "users");
        assert_eq!(f.name(), "collection");
        match f.value() {
            AuditValue::String(s) => assert_eq!(s, "users"),
            other => panic!("expected String, got {:?}", other),
        }
    }

    #[test]
    fn audit_field_escaper_bytes_emit_base64() {
        let bytes = vec![0xDEu8, 0xAD, 0xBE, 0xEF];
        let f = AuditFieldEscaper::field("blob", bytes);
        let ev = AuditEvent::builder("test/bytes").field(f).build();
        let line = ev.to_json_line(None);
        // base64 of 0xDEADBEEF is "3q2+7w=="
        assert!(
            line.contains("\"blob\":\"3q2+7w==\""),
            "expected base64 emission: {line}"
        );
    }

    #[test]
    fn audit_field_escaper_number_bool_null() {
        let ev = AuditEvent::builder("test/types")
            .field(AuditFieldEscaper::field("count", 42i64))
            .field(AuditFieldEscaper::field("ok", true))
            .field(AuditFieldEscaper::field("missing", AuditValue::Null))
            .build();
        let line = ev.to_json_line(None);
        assert!(line.contains("\"count\":42"));
        assert!(line.contains("\"ok\":true"));
        assert!(line.contains("\"missing\":null"));
    }

    #[test]
    fn audit_field_escaper_adversarial_corpus_preserves_structure() {
        // The full F-01 / F-02 adversarial corpus: CRLF, NUL,
        // quote, semicolon, JSON-in-JSON, control bytes 0x00..0x20.
        // Every payload must encode to a single JSONL row that
        // round-trips through the in-house parser back to the
        // original bytes.
        let cases: &[(&str, &str)] = &[
            ("crlf", "line1\r\nline2"),
            ("nul", "before\0after"),
            ("quote", "she said \"hi\""),
            ("semicolon", "a;b;c"),
            ("json_in_json", r#"{"injected":"yes"}"#),
            ("low_ctrl", "\x01\x02\x03\x07\x1f"),
            ("backslash", "C:\\path\\file"),
            ("mixed", "name=\"x\"\n\\path\t\x01end"),
            ("empty", ""),
            // Note: legal Unicode payloads round-trip through the
            // RFC 8259 encoder fine; the in-house parser
            // (`utils::json`) used by `crate::json::from_str` has a
            // pre-existing byte-oriented bug for multi-byte UTF-8
            // sequences, which is orthogonal to the F-01 / F-02
            // boundary-smuggling threat model. ASCII-only here.
        ];
        let mut survivors = 0usize;
        for (label, payload) in cases {
            let f = AuditFieldEscaper::field("user_input", *payload);
            let ev = AuditEvent::builder(format!("test/adv/{label}"))
                .principal("attacker")
                .field(f)
                .build();
            let line = ev.to_json_line(None);
            // 1. exactly one row, no embedded newline.
            assert!(
                !line.contains('\n'),
                "{label}: embedded newline in JSONL row: {line:?}"
            );
            // 2. parse back via the canonical decoder and read user_input.
            let parsed: JsonValue = crate::json::from_str(&line)
                .unwrap_or_else(|err| panic!("{label}: line did not parse: {err} :: {line:?}"));
            let detail = parsed.get("detail").expect("detail present");
            let recovered = detail.get("user_input").and_then(|v| v.as_str()).unwrap();
            assert_eq!(
                recovered, *payload,
                "{label}: round-trip mismatch: {recovered:?} != {payload:?}"
            );
            survivors += 1;
        }
        assert_eq!(
            survivors,
            cases.len(),
            "adversarial corpus survival rate: {survivors}/{}",
            cases.len()
        );
    }

    #[test]
    fn audit_emission_emits_one_line_per_call_through_guard() {
        let data = temp_data_path("guard-emission");
        let logger = AuditLogger::for_data_path(&data);
        // Smuggle attempt: NUL + CRLF + JSON-injection in collection name.
        let attacker = "users\";DROP\r\n{\"x\":1}\0";
        logger.record_event(
            AuditEvent::builder("admin/scan")
                .principal("evil")
                .field(AuditFieldEscaper::field("collection", attacker))
                .build(),
        );
        drain(&logger);
        let body = std::fs::read_to_string(logger.path()).unwrap();
        let lines: Vec<&str> = body.lines().collect();
        assert_eq!(lines.len(), 1, "guard must emit exactly one JSONL row");
        // The smuggled "{...}" cannot have escaped the JSON string.
        let parsed: JsonValue = crate::json::from_str(lines[0]).unwrap();
        let recovered = parsed
            .get("detail")
            .and_then(|d| d.get("collection"))
            .and_then(|v| v.as_str())
            .unwrap();
        assert_eq!(recovered, attacker);
    }

    #[test]
    fn audit_field_escaper_no_format_macro_in_value_path() {
        // Compile-time guarantee: AuditField construction goes
        // through AuditValue, not through `Display`. This test is
        // a documentation anchor — if a contributor adds a `Display`
        // impl that bypasses the typed value, the property tests
        // below still catch the smuggling, but this test makes the
        // intent explicit.
        //
        // Compiles: there is no `format!` path and no `to_string()` of
        // attacker bytes; the only entry is `Into<AuditValue>`.
        let _ = AuditFieldEscaper::field("name", "value");
    }

    #[test]
    fn audit_field_escaper_chains_via_builder_fields() {
        let ev = AuditEvent::builder("test/multi")
            .fields([
                AuditFieldEscaper::field("a", "x"),
                AuditFieldEscaper::field("b", 7i64),
                AuditFieldEscaper::field("c", true),
            ])
            .build();
        let line = ev.to_json_line(None);
        let parsed: JsonValue = crate::json::from_str(&line).unwrap();
        let d = parsed.get("detail").unwrap();
        assert_eq!(d.get("a").and_then(|v| v.as_str()), Some("x"));
        assert_eq!(d.get("b").and_then(|v| v.as_i64()), Some(7));
        assert_eq!(d.get("c").and_then(|v| v.as_bool()), Some(true));
    }

    proptest::proptest! {
        /// Random user-supplied strings must always round-trip through
        /// the typed-field guard exactly. No silent drops, no smuggling.
        ///
        /// ASCII-only here: the encoder is RFC 8259 §7 compliant for
        /// arbitrary Unicode, but the in-house decoder shared by
        /// `crate::json::from_str` (`utils::json::parse_string`) is
        /// byte-oriented for non-escape characters and does not
        /// recompose multi-byte UTF-8 sequences. The boundary-smuggling
        /// threat model is about control bytes (0x00..0x20) and
        /// structural bytes (`"`, `\`, `\r`, `\n`), not legal Unicode,
        /// so this restriction does not narrow the security claim.
        #[test]
        fn prop_audit_field_round_trips_arbitrary_strings(
            payload in proptest::string::string_regex("[\\x00-\\x7f]{0,128}").unwrap()
        ) {
            let f = AuditFieldEscaper::field("p", payload.as_str());
            let ev = AuditEvent::builder("prop/test").field(f).build();
            let line = ev.to_json_line(None);
            // Single-line invariant.
            proptest::prop_assert!(!line.contains('\n'));
            let parsed: JsonValue = crate::json::from_str(&line)
                .expect("emission must always parse");
            let recovered = parsed
                .get("detail")
                .and_then(|d| d.get("p"))
                .and_then(|v| v.as_str())
                .unwrap();
            proptest::prop_assert_eq!(recovered, payload.as_str());
        }

        /// Random byte sequences via base64 round-trip — non-UTF-8
        /// payloads must never enter the JSON string channel raw.
        #[test]
        fn prop_audit_field_bytes_base64_round_trip(
            bytes in proptest::collection::vec(proptest::bits::u8::ANY, 0..64)
        ) {
            let f = AuditFieldEscaper::field("b", bytes.clone());
            let ev = AuditEvent::builder("prop/bytes").field(f).build();
            let line = ev.to_json_line(None);
            proptest::prop_assert!(!line.contains('\n'));
            let parsed: JsonValue = crate::json::from_str(&line).unwrap();
            let recovered_b64 = parsed
                .get("detail")
                .and_then(|d| d.get("b"))
                .and_then(|v| v.as_str())
                .unwrap()
                .to_string();
            proptest::prop_assert_eq!(recovered_b64, base64_encode(&bytes));
        }
    }
}