Skip to main content

reddb_server/telemetry/
operator_event.rs

1//! Operator-grade event bus for high-severity system conditions.
2//!
3//! # Operator / developer split
4//!
5//! RedDB telemetry has two audiences:
6//!
7//! - **Developer signal** (`tracing` spans at `DEBUG` / `INFO`): ephemeral,
8//!   high-volume, lives in `red.log` or stdout. Helps engineers trace request
9//!   flows and understand runtime behaviour during development.
10//!
11//! - **Operator-grade events** (this module): low-volume, high-severity
12//!   conditions that a production operator *must* see and act on.
//!   Persisted to the tamper-evident audit log first so they survive process
//!   crashes; a `tracing::warn!` breadcrumb lands in the normal log channel
//!   as a secondary copy. When no audit sink has been installed yet (see
//!   `install_global_audit_sink`), an `eprintln!` copy is written to stderr
//!   as well, so the event is never silently swallowed.
17//!
18//! `OperatorEvent::emit` always runs synchronously — it is intentionally
19//! *not* async so callers in crash paths, signal handlers, and `Drop` impls
20//! can call it without an async runtime.
21
22use std::sync::{Arc, OnceLock};
23
24use crate::runtime::audit_log::{
25    AuditAuthSource, AuditEvent, AuditField, AuditFieldEscaper, AuditLogger, Outcome,
26};
27
28// ---------------------------------------------------------------------------
29// Process-wide sink
30// ---------------------------------------------------------------------------
31//
32// The OperatorEvent enum is defined in `telemetry/` but the deepest
33// emit sites (storage layer, replication apply loop, signal handlers,
34// drop impls) cannot thread an `&AuditLogger` reference through their
35// call stacks without a sweeping refactor. To stay surgical (#205) we
36// expose a process-wide sink that the runtime registers at startup and
37// every emit site consults via `OperatorEvent::emit_global`.
38//
39// Trade-off: the sink is a `OnceLock<Arc<AuditLogger>>`, which means
40// emits that fire *before* the runtime registers the logger fall back
41// to `tracing::warn!` + `eprintln!` only — the audit copy is lost. The
42// tamper-evident audit copy is the primary record; the breadcrumb /
43// stderr fallbacks are the safety net the original `emit(&AuditLogger)`
44// shape already accepted, so the degradation is the same one already
45// documented in the module rustdoc.
46
47static GLOBAL_SINK: OnceLock<Arc<AuditLogger>> = OnceLock::new();
48
49/// Install the process-wide [`AuditLogger`] used by
50/// [`OperatorEvent::emit_global`]. Called once at runtime startup; a
51/// second call is a no-op (the first registration wins) so test
52/// harnesses that build multiple in-memory runtimes cannot stomp on
53/// each other's loggers — they fall back to tracing+eprintln.
54pub fn install_global_audit_sink(logger: Arc<AuditLogger>) {
55    let _ = GLOBAL_SINK.set(logger);
56}
57
58// ---------------------------------------------------------------------------
59// OperatorEvent
60// ---------------------------------------------------------------------------
61
/// High-severity system conditions that require operator attention.
///
/// Variant payloads are plain values: `String`, `u64`, and, for
/// [`OperatorEvent::DanglingAdminIntent`], the admin-intent id/op/phase
/// types. On emission, `decompose` turns each payload value into an audit
/// field via `AuditFieldEscaper::field`. That conversion is what keeps
/// adversarial bytes (CRLF, NUL, quotes, other control characters)
/// escape-safe at the audit boundary (ADR 0010). The human-readable summary
/// built alongside those fields interpolates the raw values.
///
/// The variant declaration order is mirrored by hand in
/// [`OperatorEvent::all_variant_names`] and [`OperatorEvent::variant_name`].
/// Keep all three in sync when adding, renaming or reordering variants.
#[derive(Debug)]
pub enum OperatorEvent {
    /// A replication stream to a follower/replica broke unexpectedly.
    ReplicationBroken { peer: String, reason: String },
    /// Replication state diverged: the follower's committed LSN or
    /// checksum disagrees with the leader.
    ///
    /// Only the two LSNs are carried; there is no dedicated checksum field.
    Divergence {
        peer: String,
        leader_lsn: u64,
        follower_lsn: u64,
    },
    /// The WAL fsync call failed. Data may be at risk on the current host.
    WalFsyncFailed { path: String, error: String },
    /// Available disk space fell below the configured critical threshold.
    DiskSpaceCritical {
        path: String,
        available_bytes: u64,
        threshold_bytes: u64,
    },
    /// An authentication bypass was detected (e.g. auth gate returned
    /// `allow` for a request that should have been rejected).
    AuthBypass {
        principal: String,
        resource: String,
        detail: String,
    },
    /// An admin capability was granted to a principal at runtime.
    AdminCapabilityGranted {
        granted_to: String,
        capability: String,
        granted_by: String,
    },
    /// Secret rotation failed; the current secret may be stale.
    SecretRotationFailed { secret_ref: String, error: String },
    /// A runtime configuration change was applied to a live instance.
    ///
    /// NOTE(review): `old_value` / `new_value` are recorded verbatim in
    /// both the audit fields and the summary; confirm callers never pass
    /// secret-bearing config values here.
    ConfigChanged {
        key: String,
        old_value: String,
        new_value: String,
        changed_by: String,
    },
    /// The server process failed to start cleanly.
    StartupFailed { phase: String, error: String },
    /// The server process was forced to shut down (e.g. OOM killer,
    /// SIGKILL, unrecoverable error).
    ShutdownForced { reason: String },
    /// On-disk schema metadata is corrupt or inconsistent.
    SchemaCorruption { collection: String, detail: String },
    /// A scheduled or triggered checkpoint failed to complete.
    CheckpointFailed { lsn: u64, error: String },
    /// An admin intent that was started but never reached a terminal phase
    /// (completed or aborted). Emitted by [`super::admin_intent_log::AdminIntentLog::scan_and_report`]
    /// at startup so operators can investigate interrupted operations.
    DanglingAdminIntent {
        id: crate::crypto::uuid::Uuid,
        op: crate::telemetry::admin_intent_log::IntentOp,
        /// Unix milliseconds when the intent was started.
        started_at_ms: u64,
        last_phase: crate::telemetry::admin_intent_log::IntentPhase,
    },
    /// A config-file change was detected but one or more changed fields
    /// require a full server restart to take effect. The change was NOT
    /// applied; the operator must restart to pick it up.
    ConfigChangeRequiresRestart { fields_changed: String },
    /// An ALTER TABLE on a collection with active event subscriptions
    /// added or removed columns. Downstream consumers may see a different
    /// payload shape starting at `lsn`.
    SubscriptionSchemaChange {
        collection: String,
        /// Comma-separated subscription names affected.
        subscription_names: String,
        /// Comma-separated columns added (empty string if none).
        fields_added: String,
        /// Comma-separated columns removed (empty string if none).
        fields_removed: String,
        lsn: u64,
    },
    /// An event could not be delivered to its target queue after all
    /// retry attempts, and was routed to the dead-letter queue instead.
    OutboxDlqActivated {
        queue: String,
        dlq: String,
        reason: String,
    },
}
152
153impl OperatorEvent {
154    /// All variant names as CamelCase strings, in declaration order.
155    pub fn all_variant_names() -> &'static [&'static str] {
156        &[
157            "ReplicationBroken",
158            "Divergence",
159            "WalFsyncFailed",
160            "DiskSpaceCritical",
161            "AuthBypass",
162            "AdminCapabilityGranted",
163            "SecretRotationFailed",
164            "ConfigChanged",
165            "StartupFailed",
166            "ShutdownForced",
167            "SchemaCorruption",
168            "CheckpointFailed",
169            "DanglingAdminIntent",
170            "ConfigChangeRequiresRestart",
171            "SubscriptionSchemaChange",
172            "OutboxDlqActivated",
173        ]
174    }
175
176    /// Return the CamelCase variant name for routing table lookup.
177    pub(super) fn variant_name(&self) -> &'static str {
178        match self {
179            Self::ReplicationBroken { .. } => "ReplicationBroken",
180            Self::Divergence { .. } => "Divergence",
181            Self::WalFsyncFailed { .. } => "WalFsyncFailed",
182            Self::DiskSpaceCritical { .. } => "DiskSpaceCritical",
183            Self::AuthBypass { .. } => "AuthBypass",
184            Self::AdminCapabilityGranted { .. } => "AdminCapabilityGranted",
185            Self::SecretRotationFailed { .. } => "SecretRotationFailed",
186            Self::ConfigChanged { .. } => "ConfigChanged",
187            Self::StartupFailed { .. } => "StartupFailed",
188            Self::ShutdownForced { .. } => "ShutdownForced",
189            Self::SchemaCorruption { .. } => "SchemaCorruption",
190            Self::CheckpointFailed { .. } => "CheckpointFailed",
191            Self::DanglingAdminIntent { .. } => "DanglingAdminIntent",
192            Self::ConfigChangeRequiresRestart { .. } => "ConfigChangeRequiresRestart",
193            Self::SubscriptionSchemaChange { .. } => "SubscriptionSchemaChange",
194            Self::OutboxDlqActivated { .. } => "OutboxDlqActivated",
195        }
196    }
197
198    /// Emit the event.
199    ///
200    /// Execution order (per issue #202):
201    /// 1. Persist to `audit` — tamper-evident, durable.
202    /// 2. `tracing::warn!` breadcrumb — lands in `red.log` / stderr.
203    /// 3. `eprintln!` fallback — fires only if the audit write fails,
204    ///    ensuring the event is never silently lost.
205    ///
206    /// Emit the event using the process-wide sink installed by the
207    /// runtime at startup. When no sink is installed (early boot,
208    /// tests without an audit logger), the tracing breadcrumb and
209    /// eprintln fallback still fire so the event is never silently
210    /// lost.
211    pub fn emit_global(self) {
212        match GLOBAL_SINK.get() {
213            Some(logger) => self.emit(logger.as_ref()),
214            None => {
215                let (_, _, summary) = self.decompose();
216                tracing::warn!(target: "reddb::operator", "{summary}");
217                eprintln!("[reddb::operator] (no audit sink) {summary}");
218            }
219        }
220    }
221
222    pub fn emit(self, audit: &AuditLogger) {
223        let (action, fields, summary) = self.decompose();
224
225        let ev = AuditEvent::builder(action)
226            .source(AuditAuthSource::System)
227            .outcome(Outcome::Error)
228            .fields(fields)
229            .build();
230
231        // 1. Audit log (primary, tamper-evident).
232        let audit_ok = {
233            // `record_event` is infallible from the caller's perspective
234            // (it falls back to sync write internally), so we treat it as
235            // always succeeding for the breadcrumb decision.
236            audit.record_event(ev);
237            true
238        };
239
240        // 2. tracing breadcrumb.
241        tracing::warn!(target: "reddb::operator", "{summary}");
242
243        // 3. eprintln fallback — guard against silent loss when audit
244        //    is unhealthy (e.g. disk full, writer thread dead).
245        if !audit_ok {
246            eprintln!("[reddb::operator] {summary}");
247        }
248    }
249
250    /// Decompose `self` into `(action, audit_fields, human_summary)`.
251    pub(super) fn decompose(self) -> (&'static str, Vec<AuditField>, String) {
252        match self {
253            Self::ReplicationBroken { peer, reason } => {
254                let summary = format!("replication broken: peer={peer} reason={reason}");
255                let fields = vec![
256                    AuditFieldEscaper::field("peer", peer),
257                    AuditFieldEscaper::field("reason", reason),
258                ];
259                ("operator/replication_broken", fields, summary)
260            }
261            Self::Divergence {
262                peer,
263                leader_lsn,
264                follower_lsn,
265            } => {
266                let summary = format!(
267                    "replication divergence: peer={peer} leader_lsn={leader_lsn} follower_lsn={follower_lsn}"
268                );
269                let fields = vec![
270                    AuditFieldEscaper::field("peer", peer),
271                    AuditFieldEscaper::field("leader_lsn", leader_lsn),
272                    AuditFieldEscaper::field("follower_lsn", follower_lsn),
273                ];
274                ("operator/divergence", fields, summary)
275            }
276            Self::WalFsyncFailed { path, error } => {
277                let summary = format!("WAL fsync failed: path={path} error={error}");
278                let fields = vec![
279                    AuditFieldEscaper::field("path", path),
280                    AuditFieldEscaper::field("error", error),
281                ];
282                ("operator/wal_fsync_failed", fields, summary)
283            }
284            Self::DiskSpaceCritical {
285                path,
286                available_bytes,
287                threshold_bytes,
288            } => {
289                let summary = format!(
290                    "disk space critical: path={path} available={available_bytes} threshold={threshold_bytes}"
291                );
292                let fields = vec![
293                    AuditFieldEscaper::field("path", path),
294                    AuditFieldEscaper::field("available_bytes", available_bytes),
295                    AuditFieldEscaper::field("threshold_bytes", threshold_bytes),
296                ];
297                ("operator/disk_space_critical", fields, summary)
298            }
299            Self::AuthBypass {
300                principal,
301                resource,
302                detail,
303            } => {
304                let summary =
305                    format!("auth bypass detected: principal={principal} resource={resource}");
306                let fields = vec![
307                    AuditFieldEscaper::field("principal", principal),
308                    AuditFieldEscaper::field("resource", resource),
309                    AuditFieldEscaper::field("detail", detail),
310                ];
311                ("operator/auth_bypass", fields, summary)
312            }
313            Self::AdminCapabilityGranted {
314                granted_to,
315                capability,
316                granted_by,
317            } => {
318                let summary = format!(
319                    "admin capability granted: to={granted_to} capability={capability} by={granted_by}"
320                );
321                let fields = vec![
322                    AuditFieldEscaper::field("granted_to", granted_to),
323                    AuditFieldEscaper::field("capability", capability),
324                    AuditFieldEscaper::field("granted_by", granted_by),
325                ];
326                ("operator/admin_capability_granted", fields, summary)
327            }
328            Self::SecretRotationFailed { secret_ref, error } => {
329                let summary = format!("secret rotation failed: ref={secret_ref} error={error}");
330                let fields = vec![
331                    AuditFieldEscaper::field("secret_ref", secret_ref),
332                    AuditFieldEscaper::field("error", error),
333                ];
334                ("operator/secret_rotation_failed", fields, summary)
335            }
336            Self::ConfigChanged {
337                key,
338                old_value,
339                new_value,
340                changed_by,
341            } => {
342                let summary = format!(
343                    "config changed: key={key} old={old_value} new={new_value} by={changed_by}"
344                );
345                let fields = vec![
346                    AuditFieldEscaper::field("key", key),
347                    AuditFieldEscaper::field("old_value", old_value),
348                    AuditFieldEscaper::field("new_value", new_value),
349                    AuditFieldEscaper::field("changed_by", changed_by),
350                ];
351                ("operator/config_changed", fields, summary)
352            }
353            Self::StartupFailed { phase, error } => {
354                let summary = format!("startup failed: phase={phase} error={error}");
355                let fields = vec![
356                    AuditFieldEscaper::field("phase", phase),
357                    AuditFieldEscaper::field("error", error),
358                ];
359                ("operator/startup_failed", fields, summary)
360            }
361            Self::ShutdownForced { reason } => {
362                let summary = format!("shutdown forced: reason={reason}");
363                let fields = vec![AuditFieldEscaper::field("reason", reason)];
364                ("operator/shutdown_forced", fields, summary)
365            }
366            Self::SchemaCorruption { collection, detail } => {
367                let summary = format!("schema corruption: collection={collection} detail={detail}");
368                let fields = vec![
369                    AuditFieldEscaper::field("collection", collection),
370                    AuditFieldEscaper::field("detail", detail),
371                ];
372                ("operator/schema_corruption", fields, summary)
373            }
374            Self::CheckpointFailed { lsn, error } => {
375                let summary = format!("checkpoint failed: lsn={lsn} error={error}");
376                let fields = vec![
377                    AuditFieldEscaper::field("lsn", lsn),
378                    AuditFieldEscaper::field("error", error),
379                ];
380                ("operator/checkpoint_failed", fields, summary)
381            }
382            Self::DanglingAdminIntent {
383                id,
384                op,
385                started_at_ms,
386                last_phase,
387            } => {
388                let summary = format!(
389                    "dangling admin intent: id={id} op={op} started_at_ms={started_at_ms} last_phase={last_phase}"
390                );
391                let fields = vec![
392                    AuditFieldEscaper::field("id", id.to_string()),
393                    AuditFieldEscaper::field("op", op.to_string()),
394                    AuditFieldEscaper::field("started_at_ms", started_at_ms),
395                    AuditFieldEscaper::field("last_phase", last_phase.to_string()),
396                ];
397                ("operator/dangling_admin_intent", fields, summary)
398            }
399            Self::ConfigChangeRequiresRestart { fields_changed } => {
400                let summary = format!("config change requires restart: fields={fields_changed}");
401                let fields = vec![AuditFieldEscaper::field("fields_changed", fields_changed)];
402                ("operator/config_change_requires_restart", fields, summary)
403            }
404            Self::SubscriptionSchemaChange {
405                collection,
406                subscription_names,
407                fields_added,
408                fields_removed,
409                lsn,
410            } => {
411                let summary = format!(
412                    "subscription schema change: collection={collection} subscriptions=[{subscription_names}] added=[{fields_added}] removed=[{fields_removed}] lsn={lsn}"
413                );
414                let fields = vec![
415                    AuditFieldEscaper::field("collection", collection),
416                    AuditFieldEscaper::field("subscription_names", subscription_names),
417                    AuditFieldEscaper::field("fields_added", fields_added),
418                    AuditFieldEscaper::field("fields_removed", fields_removed),
419                    AuditFieldEscaper::field("lsn", lsn),
420                ];
421                ("operator/subscription_schema_change", fields, summary)
422            }
423            Self::OutboxDlqActivated { queue, dlq, reason } => {
424                let summary =
425                    format!("outbox DLQ activated: queue={queue} dlq={dlq} reason={reason}");
426                let fields = vec![
427                    AuditFieldEscaper::field("queue", queue),
428                    AuditFieldEscaper::field("dlq", dlq),
429                    AuditFieldEscaper::field("reason", reason),
430                ];
431                ("operator/outbox_dlq_activated", fields, summary)
432            }
433        }
434    }
435}
436
437// ---------------------------------------------------------------------------
438// Tests
439// ---------------------------------------------------------------------------
440
#[cfg(test)]
mod tests {
    use std::time::Duration;

    use super::*;
    use crate::runtime::audit_log::AuditLogger;

    /// Build an `AuditLogger` writing to a fresh, uniquely named temp dir.
    fn make_logger() -> (AuditLogger, std::path::PathBuf) {
        let mut dir = std::env::temp_dir();
        dir.push(format!(
            "reddb-op-event-{}-{}",
            std::process::id(),
            crate::utils::now_unix_nanos()
        ));
        std::fs::create_dir_all(&dir).unwrap();
        let path = dir.join(".audit.log");
        let logger = AuditLogger::with_path(path.clone());
        (logger, path)
    }

    /// Block until the logger has flushed pending writes (2 s timeout).
    fn drain(logger: &AuditLogger) {
        assert!(
            logger.wait_idle(Duration::from_secs(2)),
            "audit logger drain timed out"
        );
    }

    /// Parse the last line of the audit file as JSON.
    fn read_last_line(path: &std::path::Path) -> crate::json::Value {
        let body = std::fs::read_to_string(path).unwrap();
        let line = body.lines().last().expect("at least one audit line");
        crate::json::from_str(line).expect("valid JSON")
    }

    // ------------------------------------------------------------------
    // One test per variant — verifies action string + a representative field.
    // (`DanglingAdminIntent` needs admin-intent-log types and is not
    // covered here.)
    // ------------------------------------------------------------------

    #[test]
    fn replication_broken_emits() {
        let (logger, path) = make_logger();
        OperatorEvent::ReplicationBroken {
            peer: "replica-1".into(),
            reason: "TCP reset".into(),
        }
        .emit(&logger);
        drain(&logger);
        let v = read_last_line(&path);
        assert_eq!(
            v.get("action").and_then(|x| x.as_str()),
            Some("operator/replication_broken")
        );
        let peer = v
            .get("detail")
            .and_then(|d| d.get("peer"))
            .and_then(|x| x.as_str());
        assert_eq!(peer, Some("replica-1"));
    }

    #[test]
    fn divergence_emits() {
        let (logger, path) = make_logger();
        OperatorEvent::Divergence {
            peer: "replica-2".into(),
            leader_lsn: 1000,
            follower_lsn: 999,
        }
        .emit(&logger);
        drain(&logger);
        let v = read_last_line(&path);
        assert_eq!(
            v.get("action").and_then(|x| x.as_str()),
            Some("operator/divergence")
        );
        let lsn = v
            .get("detail")
            .and_then(|d| d.get("leader_lsn"))
            .and_then(|x| x.as_i64());
        assert_eq!(lsn, Some(1000));
    }

    #[test]
    fn wal_fsync_failed_emits() {
        let (logger, path) = make_logger();
        OperatorEvent::WalFsyncFailed {
            path: "/data/wal".into(),
            error: "EIO".into(),
        }
        .emit(&logger);
        drain(&logger);
        let v = read_last_line(&path);
        assert_eq!(
            v.get("action").and_then(|x| x.as_str()),
            Some("operator/wal_fsync_failed")
        );
        let err = v
            .get("detail")
            .and_then(|d| d.get("error"))
            .and_then(|x| x.as_str());
        assert_eq!(err, Some("EIO"));
    }

    #[test]
    fn disk_space_critical_emits() {
        let (logger, path) = make_logger();
        OperatorEvent::DiskSpaceCritical {
            path: "/data".into(),
            available_bytes: 1024,
            threshold_bytes: 10240,
        }
        .emit(&logger);
        drain(&logger);
        let v = read_last_line(&path);
        assert_eq!(
            v.get("action").and_then(|x| x.as_str()),
            Some("operator/disk_space_critical")
        );
        let avail = v
            .get("detail")
            .and_then(|d| d.get("available_bytes"))
            .and_then(|x| x.as_i64());
        assert_eq!(avail, Some(1024));
    }

    #[test]
    fn auth_bypass_emits() {
        let (logger, path) = make_logger();
        OperatorEvent::AuthBypass {
            principal: "alice".into(),
            resource: "/admin/drop".into(),
            detail: "scope check skipped".into(),
        }
        .emit(&logger);
        drain(&logger);
        let v = read_last_line(&path);
        assert_eq!(
            v.get("action").and_then(|x| x.as_str()),
            Some("operator/auth_bypass")
        );
        let res = v
            .get("detail")
            .and_then(|d| d.get("resource"))
            .and_then(|x| x.as_str());
        assert_eq!(res, Some("/admin/drop"));
    }

    #[test]
    fn admin_capability_granted_emits() {
        let (logger, path) = make_logger();
        OperatorEvent::AdminCapabilityGranted {
            granted_to: "bob".into(),
            capability: "ADMIN_WRITE".into(),
            granted_by: "root".into(),
        }
        .emit(&logger);
        drain(&logger);
        let v = read_last_line(&path);
        assert_eq!(
            v.get("action").and_then(|x| x.as_str()),
            Some("operator/admin_capability_granted")
        );
        let cap = v
            .get("detail")
            .and_then(|d| d.get("capability"))
            .and_then(|x| x.as_str());
        assert_eq!(cap, Some("ADMIN_WRITE"));
    }

    #[test]
    fn secret_rotation_failed_emits() {
        let (logger, path) = make_logger();
        OperatorEvent::SecretRotationFailed {
            secret_ref: "jwt-signing-key".into(),
            error: "HSM unreachable".into(),
        }
        .emit(&logger);
        drain(&logger);
        let v = read_last_line(&path);
        assert_eq!(
            v.get("action").and_then(|x| x.as_str()),
            Some("operator/secret_rotation_failed")
        );
        let r = v
            .get("detail")
            .and_then(|d| d.get("secret_ref"))
            .and_then(|x| x.as_str());
        assert_eq!(r, Some("jwt-signing-key"));
    }

    #[test]
    fn config_changed_emits() {
        let (logger, path) = make_logger();
        OperatorEvent::ConfigChanged {
            key: "max_connections".into(),
            old_value: "100".into(),
            new_value: "200".into(),
            changed_by: "ops-bot".into(),
        }
        .emit(&logger);
        drain(&logger);
        let v = read_last_line(&path);
        assert_eq!(
            v.get("action").and_then(|x| x.as_str()),
            Some("operator/config_changed")
        );
        let nv = v
            .get("detail")
            .and_then(|d| d.get("new_value"))
            .and_then(|x| x.as_str());
        assert_eq!(nv, Some("200"));
    }

    #[test]
    fn startup_failed_emits() {
        let (logger, path) = make_logger();
        OperatorEvent::StartupFailed {
            phase: "wal_recovery".into(),
            error: "corrupt frame".into(),
        }
        .emit(&logger);
        drain(&logger);
        let v = read_last_line(&path);
        assert_eq!(
            v.get("action").and_then(|x| x.as_str()),
            Some("operator/startup_failed")
        );
        let phase = v
            .get("detail")
            .and_then(|d| d.get("phase"))
            .and_then(|x| x.as_str());
        assert_eq!(phase, Some("wal_recovery"));
    }

    #[test]
    fn shutdown_forced_emits() {
        let (logger, path) = make_logger();
        OperatorEvent::ShutdownForced {
            reason: "OOM".into(),
        }
        .emit(&logger);
        drain(&logger);
        let v = read_last_line(&path);
        assert_eq!(
            v.get("action").and_then(|x| x.as_str()),
            Some("operator/shutdown_forced")
        );
        let r = v
            .get("detail")
            .and_then(|d| d.get("reason"))
            .and_then(|x| x.as_str());
        assert_eq!(r, Some("OOM"));
    }

    #[test]
    fn schema_corruption_emits() {
        let (logger, path) = make_logger();
        OperatorEvent::SchemaCorruption {
            collection: "users".into(),
            detail: "unknown type tag 0xFF".into(),
        }
        .emit(&logger);
        drain(&logger);
        let v = read_last_line(&path);
        assert_eq!(
            v.get("action").and_then(|x| x.as_str()),
            Some("operator/schema_corruption")
        );
        let coll = v
            .get("detail")
            .and_then(|d| d.get("collection"))
            .and_then(|x| x.as_str());
        assert_eq!(coll, Some("users"));
    }

    #[test]
    fn checkpoint_failed_emits() {
        let (logger, path) = make_logger();
        OperatorEvent::CheckpointFailed {
            lsn: 42_000,
            error: "write stall".into(),
        }
        .emit(&logger);
        drain(&logger);
        let v = read_last_line(&path);
        assert_eq!(
            v.get("action").and_then(|x| x.as_str()),
            Some("operator/checkpoint_failed")
        );
        let lsn = v
            .get("detail")
            .and_then(|d| d.get("lsn"))
            .and_then(|x| x.as_i64());
        assert_eq!(lsn, Some(42_000));
    }

    #[test]
    fn config_change_requires_restart_emits() {
        let (logger, path) = make_logger();
        OperatorEvent::ConfigChangeRequiresRestart {
            fields_changed: "listen_addr,data_dir".into(),
        }
        .emit(&logger);
        drain(&logger);
        let v = read_last_line(&path);
        assert_eq!(
            v.get("action").and_then(|x| x.as_str()),
            Some("operator/config_change_requires_restart")
        );
        let changed = v
            .get("detail")
            .and_then(|d| d.get("fields_changed"))
            .and_then(|x| x.as_str());
        assert_eq!(changed, Some("listen_addr,data_dir"));
    }

    #[test]
    fn subscription_schema_change_emits() {
        let (logger, path) = make_logger();
        OperatorEvent::SubscriptionSchemaChange {
            collection: "orders".into(),
            subscription_names: "billing,analytics".into(),
            fields_added: "discount".into(),
            fields_removed: String::new(),
            lsn: 7_777,
        }
        .emit(&logger);
        drain(&logger);
        let v = read_last_line(&path);
        assert_eq!(
            v.get("action").and_then(|x| x.as_str()),
            Some("operator/subscription_schema_change")
        );
        let lsn = v
            .get("detail")
            .and_then(|d| d.get("lsn"))
            .and_then(|x| x.as_i64());
        assert_eq!(lsn, Some(7_777));
    }

    #[test]
    fn outbox_dlq_activated_emits() {
        let (logger, path) = make_logger();
        OperatorEvent::OutboxDlqActivated {
            queue: "events".into(),
            dlq: "events.dlq".into(),
            reason: "max retries exceeded".into(),
        }
        .emit(&logger);
        drain(&logger);
        let v = read_last_line(&path);
        assert_eq!(
            v.get("action").and_then(|x| x.as_str()),
            Some("operator/outbox_dlq_activated")
        );
        let dlq = v
            .get("detail")
            .and_then(|d| d.get("dlq"))
            .and_then(|x| x.as_str());
        assert_eq!(dlq, Some("events.dlq"));
    }

    // ------------------------------------------------------------------
    // Name table stays in sync with the enum
    // ------------------------------------------------------------------

    #[test]
    fn all_variant_names_are_unique() {
        let names = OperatorEvent::all_variant_names();
        let unique: std::collections::HashSet<_> = names.iter().collect();
        assert_eq!(unique.len(), names.len(), "duplicate variant name");
    }

    #[test]
    fn variant_name_is_listed_in_all_variant_names() {
        let events = [
            OperatorEvent::ReplicationBroken {
                peer: "p".into(),
                reason: "r".into(),
            },
            OperatorEvent::Divergence {
                peer: "p".into(),
                leader_lsn: 2,
                follower_lsn: 1,
            },
            OperatorEvent::WalFsyncFailed {
                path: "p".into(),
                error: "e".into(),
            },
            OperatorEvent::DiskSpaceCritical {
                path: "p".into(),
                available_bytes: 1,
                threshold_bytes: 2,
            },
            OperatorEvent::AuthBypass {
                principal: "p".into(),
                resource: "r".into(),
                detail: "d".into(),
            },
            OperatorEvent::AdminCapabilityGranted {
                granted_to: "t".into(),
                capability: "c".into(),
                granted_by: "b".into(),
            },
            OperatorEvent::SecretRotationFailed {
                secret_ref: "s".into(),
                error: "e".into(),
            },
            OperatorEvent::ConfigChanged {
                key: "k".into(),
                old_value: "o".into(),
                new_value: "n".into(),
                changed_by: "b".into(),
            },
            OperatorEvent::StartupFailed {
                phase: "p".into(),
                error: "e".into(),
            },
            OperatorEvent::ShutdownForced { reason: "r".into() },
            OperatorEvent::SchemaCorruption {
                collection: "c".into(),
                detail: "d".into(),
            },
            OperatorEvent::CheckpointFailed {
                lsn: 1,
                error: "e".into(),
            },
            OperatorEvent::ConfigChangeRequiresRestart {
                fields_changed: "f".into(),
            },
            OperatorEvent::SubscriptionSchemaChange {
                collection: "c".into(),
                subscription_names: "s".into(),
                fields_added: String::new(),
                fields_removed: String::new(),
                lsn: 1,
            },
            OperatorEvent::OutboxDlqActivated {
                queue: "q".into(),
                dlq: "d".into(),
                reason: "r".into(),
            },
        ];
        let names = OperatorEvent::all_variant_names();
        for ev in &events {
            assert!(
                names.contains(&ev.variant_name()),
                "{} missing from all_variant_names",
                ev.variant_name()
            );
        }
    }

    // ------------------------------------------------------------------
    // Adversarial corpus: CRLF / NUL / quote / non-UTF-8-ish in fields
    // ------------------------------------------------------------------

    #[test]
    fn adversarial_fields_are_escape_safe() {
        let payloads: &[(&str, &str)] = &[
            ("crlf", "line1\r\nline2"),
            ("nul", "before\0after"),
            ("quote", r#"she said "hi""#),
            ("json_inject", r#"{"injected":true}"#),
            ("low_ctrl", "\x01\x02\x07\x1f"),
            ("backslash", "C:\\path\\file"),
            ("mixed", "name=\"x\"\n\\path\t\x01end"),
        ];

        for (label, payload) in payloads {
            let (logger, path) = make_logger();
            OperatorEvent::SchemaCorruption {
                collection: payload.to_string(),
                detail: payload.to_string(),
            }
            .emit(&logger);
            drain(&logger);

            let body = std::fs::read_to_string(&path).unwrap();

            // JSON (RFC 8259) requires U+0000..U+001F to be escaped inside
            // strings, so the only raw C0 control character allowed in the
            // JSONL file is the `\n` row terminator. (Checking a line from
            // `body.lines()` for '\n' could never fail: `lines()` strips it.)
            assert!(
                !body.chars().any(|c| c < ' ' && c != '\n'),
                "{label}: raw control character in audit log"
            );

            // A raw newline inside the payload would split the row, leaving
            // the last line as an unparseable fragment.
            let line = body.lines().last().unwrap_or("");
            let v: crate::json::Value = crate::json::from_str(line)
                .unwrap_or_else(|e| panic!("{label}: audit line not valid JSON: {e}\n{line:?}"));
            let recovered = v
                .get("detail")
                .and_then(|d| d.get("collection"))
                .and_then(|x| x.as_str())
                .unwrap_or("");
            assert_eq!(recovered, *payload, "{label}: round-trip mismatch");
        }
    }

    // ------------------------------------------------------------------
    // Outcome is always Error; source is always System
    // ------------------------------------------------------------------

    #[test]
    fn emit_sets_outcome_error_and_source_system() {
        let (logger, path) = make_logger();
        OperatorEvent::ShutdownForced {
            reason: "test".into(),
        }
        .emit(&logger);
        drain(&logger);
        let v = read_last_line(&path);
        assert_eq!(v.get("outcome").and_then(|x| x.as_str()), Some("error"));
        assert_eq!(v.get("source").and_then(|x| x.as_str()), Some("system"));
    }
}