Skip to main content

reddb_server/telemetry/
operator_event.rs

//! Operator-grade event bus for high-severity system conditions.
//!
//! # Operator / developer split
//!
//! RedDB telemetry has two audiences:
//!
//! - **Developer signal** (`tracing` spans at `DEBUG` / `INFO`): ephemeral,
//!   high-volume, lives in `red.log` or stdout. Helps engineers trace request
//!   flows and understand runtime behaviour during development.
//!
//! - **Operator-grade events** (this module): low-volume, high-severity
//!   conditions that a production operator *must* see and act on.
//!   Persisted to the tamper-evident audit log first so they survive process
//!   crashes; a `tracing::warn!` breadcrumb lands in the normal log channel
//!   as a secondary copy; an `eprintln!` fallback ensures the event is never
//!   silently swallowed even if both of those sinks fail.
//!
//! `OperatorEvent::emit` always runs synchronously — it is intentionally
//! *not* async so callers in crash paths, signal handlers, and `Drop` impls
//! can call it without an async runtime.
21
22use std::sync::{Arc, OnceLock};
23
24use crate::runtime::audit_log::{
25    AuditAuthSource, AuditEvent, AuditField, AuditFieldEscaper, AuditLogger, Outcome,
26};
27
// ---------------------------------------------------------------------------
// Process-wide sink
// ---------------------------------------------------------------------------
//
// The OperatorEvent enum is defined in `telemetry/` but the deepest
// emit sites (storage layer, replication apply loop, signal handlers,
// drop impls) cannot thread an `&AuditLogger` reference through their
// call stacks without a sweeping refactor. To stay surgical (#205) we
// expose a process-wide sink that the runtime registers at startup and
// every emit site consults via `OperatorEvent::emit_global`.
//
// Trade-off: the sink is a `OnceLock<Arc<AuditLogger>>`, which means
// emits that fire *before* the runtime registers the logger fall back
// to `tracing::warn!` + `eprintln!` only — the audit copy is lost. The
// tamper-evident audit copy is the primary record; the breadcrumb /
// stderr fallbacks are the safety net the original `emit(&AuditLogger)`
// shape already accepted, so the degradation is the same one already
// documented in the module rustdoc.
46
47static GLOBAL_SINK: OnceLock<Arc<AuditLogger>> = OnceLock::new();
48
49/// Install the process-wide [`AuditLogger`] used by
50/// [`OperatorEvent::emit_global`]. Called once at runtime startup; a
51/// second call is a no-op (the first registration wins) so test
52/// harnesses that build multiple in-memory runtimes cannot stomp on
53/// each other's loggers — they fall back to tracing+eprintln.
54pub fn install_global_audit_sink(logger: Arc<AuditLogger>) {
55    let _ = GLOBAL_SINK.set(logger);
56}
57
58// ---------------------------------------------------------------------------
59// OperatorEvent
60// ---------------------------------------------------------------------------
61
62/// High-severity system conditions that require operator attention.
63///
64/// Every variant carries typed [`crate::runtime::audit_log::AuditValue`]
65/// fields so adversarial bytes (CRLF, NUL, quote, non-UTF-8) are
66/// escape-safe at the audit boundary (ADR 0010).
67#[derive(Debug)]
68pub enum OperatorEvent {
69    /// A replication stream to a follower/replica broke unexpectedly.
70    ReplicationBroken { peer: String, reason: String },
71    /// Replication state diverged: the follower's committed LSN or
72    /// checksum disagrees with the leader.
73    Divergence {
74        peer: String,
75        leader_lsn: u64,
76        follower_lsn: u64,
77    },
78    /// The WAL fsync call failed. Data may be at risk on the current host.
79    WalFsyncFailed { path: String, error: String },
80    /// Available disk space fell below the configured critical threshold.
81    DiskSpaceCritical {
82        path: String,
83        available_bytes: u64,
84        threshold_bytes: u64,
85    },
86    /// An authentication bypass was detected (e.g. auth gate returned
87    /// `allow` for a request that should have been rejected).
88    AuthBypass {
89        principal: String,
90        resource: String,
91        detail: String,
92    },
93    /// An admin capability was granted to a principal at runtime.
94    AdminCapabilityGranted {
95        granted_to: String,
96        capability: String,
97        granted_by: String,
98    },
99    /// Secret rotation failed; the current secret may be stale.
100    SecretRotationFailed { secret_ref: String, error: String },
101    /// A runtime configuration change was applied to a live instance.
102    ConfigChanged {
103        key: String,
104        old_value: String,
105        new_value: String,
106        changed_by: String,
107    },
108    /// The server process failed to start cleanly.
109    StartupFailed { phase: String, error: String },
110    /// The server process was forced to shut down (e.g. OOM killer,
111    /// SIGKILL, unrecoverable error).
112    ShutdownForced { reason: String },
113    /// On-disk schema metadata is corrupt or inconsistent.
114    SchemaCorruption { collection: String, detail: String },
115    /// A scheduled or triggered checkpoint failed to complete.
116    CheckpointFailed { lsn: u64, error: String },
117    /// An admin intent that was started but never reached a terminal phase
118    /// (completed or aborted). Emitted by [`super::admin_intent_log::AdminIntentLog::scan_and_report`]
119    /// at startup so operators can investigate interrupted operations.
120    DanglingAdminIntent {
121        id: crate::crypto::uuid::Uuid,
122        op: crate::telemetry::admin_intent_log::IntentOp,
123        /// Unix milliseconds when the intent was started.
124        started_at_ms: u64,
125        last_phase: crate::telemetry::admin_intent_log::IntentPhase,
126    },
127    /// A config-file change was detected but one or more changed fields
128    /// require a full server restart to take effect. The change was NOT
129    /// applied; the operator must restart to pick it up.
130    ConfigChangeRequiresRestart { fields_changed: String },
131    /// An ALTER TABLE on a collection with active event subscriptions
132    /// added or removed columns. Downstream consumers may see a different
133    /// payload shape starting at `lsn`.
134    SubscriptionSchemaChange {
135        collection: String,
136        /// Comma-separated subscription names affected.
137        subscription_names: String,
138        /// Comma-separated columns added (empty string if none).
139        fields_added: String,
140        /// Comma-separated columns removed (empty string if none).
141        fields_removed: String,
142        lsn: u64,
143    },
144    /// An event could not be delivered to its target queue after all
145    /// retry attempts, and was routed to the dead-letter queue instead.
146    OutboxDlqActivated {
147        queue: String,
148        dlq: String,
149        reason: String,
150    },
151    /// A queue message exhausted `max_attempts` and was promoted to its
152    /// DLQ target. Forensic: data has left the normal flow and operators
153    /// must be able to trace it later. Slice 10 of issue #527.
154    QueueDlqPromoted {
155        queue: String,
156        group: String,
157        dlq: String,
158        message_id: u64,
159        attempts: u32,
160        reason: String,
161    },
162}
163
164impl OperatorEvent {
165    /// All variant names as CamelCase strings, in declaration order.
166    pub fn all_variant_names() -> &'static [&'static str] {
167        &[
168            "ReplicationBroken",
169            "Divergence",
170            "WalFsyncFailed",
171            "DiskSpaceCritical",
172            "AuthBypass",
173            "AdminCapabilityGranted",
174            "SecretRotationFailed",
175            "ConfigChanged",
176            "StartupFailed",
177            "ShutdownForced",
178            "SchemaCorruption",
179            "CheckpointFailed",
180            "DanglingAdminIntent",
181            "ConfigChangeRequiresRestart",
182            "SubscriptionSchemaChange",
183            "OutboxDlqActivated",
184            "QueueDlqPromoted",
185        ]
186    }
187
188    /// Return the CamelCase variant name for routing table lookup.
189    pub(super) fn variant_name(&self) -> &'static str {
190        match self {
191            Self::ReplicationBroken { .. } => "ReplicationBroken",
192            Self::Divergence { .. } => "Divergence",
193            Self::WalFsyncFailed { .. } => "WalFsyncFailed",
194            Self::DiskSpaceCritical { .. } => "DiskSpaceCritical",
195            Self::AuthBypass { .. } => "AuthBypass",
196            Self::AdminCapabilityGranted { .. } => "AdminCapabilityGranted",
197            Self::SecretRotationFailed { .. } => "SecretRotationFailed",
198            Self::ConfigChanged { .. } => "ConfigChanged",
199            Self::StartupFailed { .. } => "StartupFailed",
200            Self::ShutdownForced { .. } => "ShutdownForced",
201            Self::SchemaCorruption { .. } => "SchemaCorruption",
202            Self::CheckpointFailed { .. } => "CheckpointFailed",
203            Self::DanglingAdminIntent { .. } => "DanglingAdminIntent",
204            Self::ConfigChangeRequiresRestart { .. } => "ConfigChangeRequiresRestart",
205            Self::SubscriptionSchemaChange { .. } => "SubscriptionSchemaChange",
206            Self::OutboxDlqActivated { .. } => "OutboxDlqActivated",
207            Self::QueueDlqPromoted { .. } => "QueueDlqPromoted",
208        }
209    }
210
211    /// Emit the event.
212    ///
213    /// Execution order (per issue #202):
214    /// 1. Persist to `audit` — tamper-evident, durable.
215    /// 2. `tracing::warn!` breadcrumb — lands in `red.log` / stderr.
216    /// 3. `eprintln!` fallback — fires only if the audit write fails,
217    ///    ensuring the event is never silently lost.
218    ///
219    /// Emit the event using the process-wide sink installed by the
220    /// runtime at startup. When no sink is installed (early boot,
221    /// tests without an audit logger), the tracing breadcrumb and
222    /// eprintln fallback still fire so the event is never silently
223    /// lost.
224    pub fn emit_global(self) {
225        match GLOBAL_SINK.get() {
226            Some(logger) => self.emit(logger.as_ref()),
227            None => {
228                let (_, _, summary) = self.decompose();
229                tracing::warn!(target: "reddb::operator", "{summary}");
230                eprintln!("[reddb::operator] (no audit sink) {summary}");
231            }
232        }
233    }
234
235    pub fn emit(self, audit: &AuditLogger) {
236        let (action, fields, summary) = self.decompose();
237
238        let ev = AuditEvent::builder(action)
239            .source(AuditAuthSource::System)
240            .outcome(Outcome::Error)
241            .fields(fields)
242            .build();
243
244        // 1. Audit log (primary, tamper-evident).
245        let audit_ok = {
246            // `record_event` is infallible from the caller's perspective
247            // (it falls back to sync write internally), so we treat it as
248            // always succeeding for the breadcrumb decision.
249            audit.record_event(ev);
250            true
251        };
252
253        // 2. tracing breadcrumb.
254        tracing::warn!(target: "reddb::operator", "{summary}");
255
256        // 3. eprintln fallback — guard against silent loss when audit
257        //    is unhealthy (e.g. disk full, writer thread dead).
258        if !audit_ok {
259            eprintln!("[reddb::operator] {summary}");
260        }
261    }
262
263    /// Decompose `self` into `(action, audit_fields, human_summary)`.
264    pub(super) fn decompose(self) -> (&'static str, Vec<AuditField>, String) {
265        match self {
266            Self::ReplicationBroken { peer, reason } => {
267                let summary = format!("replication broken: peer={peer} reason={reason}");
268                let fields = vec![
269                    AuditFieldEscaper::field("peer", peer),
270                    AuditFieldEscaper::field("reason", reason),
271                ];
272                ("operator/replication_broken", fields, summary)
273            }
274            Self::Divergence {
275                peer,
276                leader_lsn,
277                follower_lsn,
278            } => {
279                let summary = format!(
280                    "replication divergence: peer={peer} leader_lsn={leader_lsn} follower_lsn={follower_lsn}"
281                );
282                let fields = vec![
283                    AuditFieldEscaper::field("peer", peer),
284                    AuditFieldEscaper::field("leader_lsn", leader_lsn),
285                    AuditFieldEscaper::field("follower_lsn", follower_lsn),
286                ];
287                ("operator/divergence", fields, summary)
288            }
289            Self::WalFsyncFailed { path, error } => {
290                let summary = format!("WAL fsync failed: path={path} error={error}");
291                let fields = vec![
292                    AuditFieldEscaper::field("path", path),
293                    AuditFieldEscaper::field("error", error),
294                ];
295                ("operator/wal_fsync_failed", fields, summary)
296            }
297            Self::DiskSpaceCritical {
298                path,
299                available_bytes,
300                threshold_bytes,
301            } => {
302                let summary = format!(
303                    "disk space critical: path={path} available={available_bytes} threshold={threshold_bytes}"
304                );
305                let fields = vec![
306                    AuditFieldEscaper::field("path", path),
307                    AuditFieldEscaper::field("available_bytes", available_bytes),
308                    AuditFieldEscaper::field("threshold_bytes", threshold_bytes),
309                ];
310                ("operator/disk_space_critical", fields, summary)
311            }
312            Self::AuthBypass {
313                principal,
314                resource,
315                detail,
316            } => {
317                let summary =
318                    format!("auth bypass detected: principal={principal} resource={resource}");
319                let fields = vec![
320                    AuditFieldEscaper::field("principal", principal),
321                    AuditFieldEscaper::field("resource", resource),
322                    AuditFieldEscaper::field("detail", detail),
323                ];
324                ("operator/auth_bypass", fields, summary)
325            }
326            Self::AdminCapabilityGranted {
327                granted_to,
328                capability,
329                granted_by,
330            } => {
331                let summary = format!(
332                    "admin capability granted: to={granted_to} capability={capability} by={granted_by}"
333                );
334                let fields = vec![
335                    AuditFieldEscaper::field("granted_to", granted_to),
336                    AuditFieldEscaper::field("capability", capability),
337                    AuditFieldEscaper::field("granted_by", granted_by),
338                ];
339                ("operator/admin_capability_granted", fields, summary)
340            }
341            Self::SecretRotationFailed { secret_ref, error } => {
342                let summary = format!("secret rotation failed: ref={secret_ref} error={error}");
343                let fields = vec![
344                    AuditFieldEscaper::field("secret_ref", secret_ref),
345                    AuditFieldEscaper::field("error", error),
346                ];
347                ("operator/secret_rotation_failed", fields, summary)
348            }
349            Self::ConfigChanged {
350                key,
351                old_value,
352                new_value,
353                changed_by,
354            } => {
355                let summary = format!(
356                    "config changed: key={key} old={old_value} new={new_value} by={changed_by}"
357                );
358                let fields = vec![
359                    AuditFieldEscaper::field("key", key),
360                    AuditFieldEscaper::field("old_value", old_value),
361                    AuditFieldEscaper::field("new_value", new_value),
362                    AuditFieldEscaper::field("changed_by", changed_by),
363                ];
364                ("operator/config_changed", fields, summary)
365            }
366            Self::StartupFailed { phase, error } => {
367                let summary = format!("startup failed: phase={phase} error={error}");
368                let fields = vec![
369                    AuditFieldEscaper::field("phase", phase),
370                    AuditFieldEscaper::field("error", error),
371                ];
372                ("operator/startup_failed", fields, summary)
373            }
374            Self::ShutdownForced { reason } => {
375                let summary = format!("shutdown forced: reason={reason}");
376                let fields = vec![AuditFieldEscaper::field("reason", reason)];
377                ("operator/shutdown_forced", fields, summary)
378            }
379            Self::SchemaCorruption { collection, detail } => {
380                let summary = format!("schema corruption: collection={collection} detail={detail}");
381                let fields = vec![
382                    AuditFieldEscaper::field("collection", collection),
383                    AuditFieldEscaper::field("detail", detail),
384                ];
385                ("operator/schema_corruption", fields, summary)
386            }
387            Self::CheckpointFailed { lsn, error } => {
388                let summary = format!("checkpoint failed: lsn={lsn} error={error}");
389                let fields = vec![
390                    AuditFieldEscaper::field("lsn", lsn),
391                    AuditFieldEscaper::field("error", error),
392                ];
393                ("operator/checkpoint_failed", fields, summary)
394            }
395            Self::DanglingAdminIntent {
396                id,
397                op,
398                started_at_ms,
399                last_phase,
400            } => {
401                let summary = format!(
402                    "dangling admin intent: id={id} op={op} started_at_ms={started_at_ms} last_phase={last_phase}"
403                );
404                let fields = vec![
405                    AuditFieldEscaper::field("id", id.to_string()),
406                    AuditFieldEscaper::field("op", op.to_string()),
407                    AuditFieldEscaper::field("started_at_ms", started_at_ms),
408                    AuditFieldEscaper::field("last_phase", last_phase.to_string()),
409                ];
410                ("operator/dangling_admin_intent", fields, summary)
411            }
412            Self::ConfigChangeRequiresRestart { fields_changed } => {
413                let summary = format!("config change requires restart: fields={fields_changed}");
414                let fields = vec![AuditFieldEscaper::field("fields_changed", fields_changed)];
415                ("operator/config_change_requires_restart", fields, summary)
416            }
417            Self::SubscriptionSchemaChange {
418                collection,
419                subscription_names,
420                fields_added,
421                fields_removed,
422                lsn,
423            } => {
424                let summary = format!(
425                    "subscription schema change: collection={collection} subscriptions=[{subscription_names}] added=[{fields_added}] removed=[{fields_removed}] lsn={lsn}"
426                );
427                let fields = vec![
428                    AuditFieldEscaper::field("collection", collection),
429                    AuditFieldEscaper::field("subscription_names", subscription_names),
430                    AuditFieldEscaper::field("fields_added", fields_added),
431                    AuditFieldEscaper::field("fields_removed", fields_removed),
432                    AuditFieldEscaper::field("lsn", lsn),
433                ];
434                ("operator/subscription_schema_change", fields, summary)
435            }
436            Self::OutboxDlqActivated { queue, dlq, reason } => {
437                let summary =
438                    format!("outbox DLQ activated: queue={queue} dlq={dlq} reason={reason}");
439                let fields = vec![
440                    AuditFieldEscaper::field("queue", queue),
441                    AuditFieldEscaper::field("dlq", dlq),
442                    AuditFieldEscaper::field("reason", reason),
443                ];
444                ("operator/outbox_dlq_activated", fields, summary)
445            }
446            Self::QueueDlqPromoted {
447                queue,
448                group,
449                dlq,
450                message_id,
451                attempts,
452                reason,
453            } => {
454                let summary = format!(
455                    "queue DLQ promoted: queue={queue} group={group} dlq={dlq} message_id={message_id} attempts={attempts} reason={reason}"
456                );
457                let fields = vec![
458                    AuditFieldEscaper::field("queue", queue),
459                    AuditFieldEscaper::field("group", group),
460                    AuditFieldEscaper::field("dlq", dlq),
461                    AuditFieldEscaper::field("message_id", message_id),
462                    AuditFieldEscaper::field("attempts", attempts as u64),
463                    AuditFieldEscaper::field("reason", reason),
464                ];
465                ("operator/queue_dlq_promoted", fields, summary)
466            }
467        }
468    }
469}
470
471// ---------------------------------------------------------------------------
472// Tests
473// ---------------------------------------------------------------------------
474
#[cfg(test)]
mod tests {
    use std::collections::HashSet;
    use std::time::Duration;

    use super::*;
    use crate::runtime::audit_log::AuditLogger;

    /// Build an [`AuditLogger`] that writes to `.audit.log` inside a
    /// freshly created directory under the system temp dir. The
    /// directory name combines the process id with a nanosecond
    /// timestamp so tests do not share a log file.
    ///
    /// Returns the logger together with the path of its log file.
    fn make_logger() -> (AuditLogger, std::path::PathBuf) {
        let mut dir = std::env::temp_dir();
        dir.push(format!(
            "reddb-op-event-{}-{}",
            std::process::id(),
            crate::utils::now_unix_nanos()
        ));
        std::fs::create_dir_all(&dir).unwrap();
        let path = dir.join(".audit.log");
        let logger = AuditLogger::with_path(path.clone());
        (logger, path)
    }

    /// Wait (up to two seconds) for the logger to report idle, so the
    /// emitted row can be read back from the file.
    fn drain(logger: &AuditLogger) {
        assert!(
            logger.wait_idle(Duration::from_secs(2)),
            "audit logger drain timed out"
        );
    }

    /// Parse the last line of the audit log at `path` as JSON.
    fn read_last_line(path: &std::path::Path) -> crate::json::Value {
        let body = std::fs::read_to_string(path).unwrap();
        let line = body.lines().last().expect("at least one audit line");
        crate::json::from_str(line).expect("valid JSON")
    }

    /// Drop the logger and remove the temp directory created by
    /// [`make_logger`]. Best effort: a failed removal must not fail the
    /// test, it only leaves a stray directory behind.
    fn discard(logger: AuditLogger, path: &std::path::Path) {
        drop(logger);
        if let Some(dir) = path.parent() {
            let _ = std::fs::remove_dir_all(dir);
        }
    }

    /// Emit `event` into a fresh logger, wait for the write, and return
    /// the resulting audit row. The temp directory is cleaned up before
    /// returning.
    fn emit_and_read(event: OperatorEvent) -> crate::json::Value {
        let (logger, path) = make_logger();
        event.emit(&logger);
        drain(&logger);
        let row = read_last_line(&path);
        discard(logger, &path);
        row
    }

    /// Assert the row's top-level `action` string.
    #[track_caller]
    fn assert_action(row: &crate::json::Value, expected: &str) {
        assert_eq!(
            row.get("action").and_then(|x| x.as_str()),
            Some(expected)
        );
    }

    /// Assert that `detail.<key>` is the string `expected`.
    #[track_caller]
    fn assert_detail_str(row: &crate::json::Value, key: &str, expected: &str) {
        let got = row
            .get("detail")
            .and_then(|d| d.get(key))
            .and_then(|x| x.as_str());
        assert_eq!(got, Some(expected), "detail.{key}");
    }

    /// Assert that `detail.<key>` is the integer `expected`.
    #[track_caller]
    fn assert_detail_i64(row: &crate::json::Value, key: &str, expected: i64) {
        let got = row
            .get("detail")
            .and_then(|d| d.get(key))
            .and_then(|x| x.as_i64());
        assert_eq!(got, Some(expected), "detail.{key}");
    }

    // ------------------------------------------------------------------
    // One test per variant — verifies action string + a representative field
    //
    // `DanglingAdminIntent` has no test here: its `id`, `op` and
    // `last_phase` fields use crate types declared in other modules.
    // ------------------------------------------------------------------

    #[test]
    fn replication_broken_emits() {
        let row = emit_and_read(OperatorEvent::ReplicationBroken {
            peer: "replica-1".into(),
            reason: "TCP reset".into(),
        });
        assert_action(&row, "operator/replication_broken");
        assert_detail_str(&row, "peer", "replica-1");
    }

    #[test]
    fn divergence_emits() {
        let row = emit_and_read(OperatorEvent::Divergence {
            peer: "replica-2".into(),
            leader_lsn: 1000,
            follower_lsn: 999,
        });
        assert_action(&row, "operator/divergence");
        assert_detail_i64(&row, "leader_lsn", 1000);
    }

    #[test]
    fn wal_fsync_failed_emits() {
        let row = emit_and_read(OperatorEvent::WalFsyncFailed {
            path: "/data/wal".into(),
            error: "EIO".into(),
        });
        assert_action(&row, "operator/wal_fsync_failed");
        assert_detail_str(&row, "error", "EIO");
    }

    #[test]
    fn disk_space_critical_emits() {
        let row = emit_and_read(OperatorEvent::DiskSpaceCritical {
            path: "/data".into(),
            available_bytes: 1024,
            threshold_bytes: 10240,
        });
        assert_action(&row, "operator/disk_space_critical");
        assert_detail_i64(&row, "available_bytes", 1024);
    }

    #[test]
    fn auth_bypass_emits() {
        let row = emit_and_read(OperatorEvent::AuthBypass {
            principal: "alice".into(),
            resource: "/admin/drop".into(),
            detail: "scope check skipped".into(),
        });
        assert_action(&row, "operator/auth_bypass");
        assert_detail_str(&row, "resource", "/admin/drop");
    }

    #[test]
    fn admin_capability_granted_emits() {
        let row = emit_and_read(OperatorEvent::AdminCapabilityGranted {
            granted_to: "bob".into(),
            capability: "ADMIN_WRITE".into(),
            granted_by: "root".into(),
        });
        assert_action(&row, "operator/admin_capability_granted");
        assert_detail_str(&row, "capability", "ADMIN_WRITE");
    }

    #[test]
    fn secret_rotation_failed_emits() {
        let row = emit_and_read(OperatorEvent::SecretRotationFailed {
            secret_ref: "jwt-signing-key".into(),
            error: "HSM unreachable".into(),
        });
        assert_action(&row, "operator/secret_rotation_failed");
        assert_detail_str(&row, "secret_ref", "jwt-signing-key");
    }

    #[test]
    fn config_changed_emits() {
        let row = emit_and_read(OperatorEvent::ConfigChanged {
            key: "max_connections".into(),
            old_value: "100".into(),
            new_value: "200".into(),
            changed_by: "ops-bot".into(),
        });
        assert_action(&row, "operator/config_changed");
        assert_detail_str(&row, "new_value", "200");
    }

    #[test]
    fn startup_failed_emits() {
        let row = emit_and_read(OperatorEvent::StartupFailed {
            phase: "wal_recovery".into(),
            error: "corrupt frame".into(),
        });
        assert_action(&row, "operator/startup_failed");
        assert_detail_str(&row, "phase", "wal_recovery");
    }

    #[test]
    fn shutdown_forced_emits() {
        let row = emit_and_read(OperatorEvent::ShutdownForced {
            reason: "OOM".into(),
        });
        assert_action(&row, "operator/shutdown_forced");
        assert_detail_str(&row, "reason", "OOM");
    }

    #[test]
    fn schema_corruption_emits() {
        let row = emit_and_read(OperatorEvent::SchemaCorruption {
            collection: "users".into(),
            detail: "unknown type tag 0xFF".into(),
        });
        assert_action(&row, "operator/schema_corruption");
        assert_detail_str(&row, "collection", "users");
    }

    #[test]
    fn checkpoint_failed_emits() {
        let row = emit_and_read(OperatorEvent::CheckpointFailed {
            lsn: 42_000,
            error: "write stall".into(),
        });
        assert_action(&row, "operator/checkpoint_failed");
        assert_detail_i64(&row, "lsn", 42_000);
    }

    #[test]
    fn config_change_requires_restart_emits() {
        let row = emit_and_read(OperatorEvent::ConfigChangeRequiresRestart {
            fields_changed: "listen_addr,tls_cert".into(),
        });
        assert_action(&row, "operator/config_change_requires_restart");
        assert_detail_str(&row, "fields_changed", "listen_addr,tls_cert");
    }

    #[test]
    fn subscription_schema_change_emits() {
        let row = emit_and_read(OperatorEvent::SubscriptionSchemaChange {
            collection: "orders".into(),
            subscription_names: "billing,shipping".into(),
            fields_added: "currency".into(),
            fields_removed: String::new(),
            lsn: 77,
        });
        assert_action(&row, "operator/subscription_schema_change");
        assert_detail_str(&row, "fields_added", "currency");
        assert_detail_i64(&row, "lsn", 77);
    }

    #[test]
    fn outbox_dlq_activated_emits() {
        let row = emit_and_read(OperatorEvent::OutboxDlqActivated {
            queue: "events".into(),
            dlq: "events-dlq".into(),
            reason: "retries exhausted".into(),
        });
        assert_action(&row, "operator/outbox_dlq_activated");
        assert_detail_str(&row, "dlq", "events-dlq");
    }

    #[test]
    fn queue_dlq_promoted_emits() {
        let row = emit_and_read(OperatorEvent::QueueDlqPromoted {
            queue: "jobs".into(),
            group: "workers".into(),
            dlq: "jobs-dlq".into(),
            message_id: 9001,
            attempts: 5,
            reason: "handler panicked".into(),
        });
        assert_action(&row, "operator/queue_dlq_promoted");
        assert_detail_i64(&row, "message_id", 9001);
        assert_detail_i64(&row, "attempts", 5);
    }

    // ------------------------------------------------------------------
    // Variant-name tables stay consistent with each other
    // ------------------------------------------------------------------

    #[test]
    fn variant_names_are_unique_and_listed() {
        let names = OperatorEvent::all_variant_names();
        let unique: HashSet<&str> = names.iter().copied().collect();
        assert_eq!(unique.len(), names.len(), "duplicate variant name");

        let samples = [
            OperatorEvent::ShutdownForced { reason: "x".into() },
            OperatorEvent::CheckpointFailed {
                lsn: 1,
                error: "x".into(),
            },
            OperatorEvent::ConfigChangeRequiresRestart {
                fields_changed: "x".into(),
            },
            OperatorEvent::OutboxDlqActivated {
                queue: "q".into(),
                dlq: "d".into(),
                reason: "r".into(),
            },
        ];
        for ev in &samples {
            assert!(
                names.contains(&ev.variant_name()),
                "{ev:?} missing from all_variant_names"
            );
        }
    }

    // ------------------------------------------------------------------
    // Adversarial corpus: CRLF / NUL / quote / non-UTF-8-ish in fields
    // ------------------------------------------------------------------

    #[test]
    fn adversarial_fields_are_escape_safe() {
        let payloads: &[(&str, &str)] = &[
            ("crlf", "line1\r\nline2"),
            ("nul", "before\0after"),
            ("quote", r#"she said "hi""#),
            ("json_inject", r#"{"injected":true}"#),
            ("low_ctrl", "\x01\x02\x07\x1f"),
            ("backslash", "C:\\path\\file"),
            ("mixed", "name=\"x\"\n\\path\t\x01end"),
        ];

        for (label, payload) in payloads {
            let (logger, path) = make_logger();
            OperatorEvent::SchemaCorruption {
                collection: payload.to_string(),
                detail: payload.to_string(),
            }
            .emit(&logger);
            drain(&logger);

            let body = std::fs::read_to_string(&path).unwrap();
            let line = body.lines().last().unwrap_or("");

            // `lines()` has already split on newlines, so looking for
            // '\n' in `line` could never fail. JSON strings must escape
            // U+0000..=U+001F, so a well-formed row carries no raw
            // control character at all; a payload that leaked through
            // unescaped would either show up here or break the parse
            // below.
            assert!(
                !line.chars().any(|c| c < '\u{20}'),
                "{label}: raw control character in JSONL row: {line:?}"
            );

            let v: crate::json::Value = crate::json::from_str(line)
                .unwrap_or_else(|e| panic!("{label}: audit line not valid JSON: {e}\n{line:?}"));
            let recovered = v
                .get("detail")
                .and_then(|d| d.get("collection"))
                .and_then(|x| x.as_str())
                .unwrap_or("");
            assert_eq!(recovered, *payload, "{label}: round-trip mismatch");

            discard(logger, &path);
        }
    }

    // ------------------------------------------------------------------
    // Outcome is always Error; source is always System
    // ------------------------------------------------------------------

    #[test]
    fn emit_sets_outcome_error_and_source_system() {
        let row = emit_and_read(OperatorEvent::ShutdownForced {
            reason: "test".into(),
        });
        assert_eq!(row.get("outcome").and_then(|x| x.as_str()), Some("error"));
        assert_eq!(row.get("source").and_then(|x| x.as_str()), Some("system"));
    }
}