Skip to main content

reddb_server/telemetry/
operator_event.rs

1//! Operator-grade event bus for high-severity system conditions.
2//!
3//! # Operator / developer split
4//!
5//! RedDB telemetry has two audiences:
6//!
7//! - **Developer signal** (`tracing` spans at `DEBUG` / `INFO`): ephemeral,
8//!   high-volume, lives in `red.log` or stdout. Helps engineers trace request
9//!   flows and understand runtime behaviour during development.
10//!
//! - **Operator-grade events** (this module): low-volume, high-severity
//!   conditions that a production operator *must* see and act on.
//!   Persisted to the tamper-evident audit log first so they survive process
//!   crashes; a `tracing::warn!` breadcrumb lands in the normal log channel
//!   as a secondary copy. When no audit sink is installed,
//!   `OperatorEvent::emit_global` additionally writes the event to stderr
//!   via `eprintln!` so it still reaches the operator.
17//!
18//! `OperatorEvent::emit` always runs synchronously — it is intentionally
19//! *not* async so callers in crash paths, signal handlers, and `Drop` impls
20//! can call it without an async runtime.
21
22use std::sync::{Arc, OnceLock};
23
24use crate::runtime::audit_log::{
25    AuditAuthSource, AuditEvent, AuditField, AuditFieldEscaper, AuditLogger, Outcome,
26};
27
28// ---------------------------------------------------------------------------
29// Process-wide sink
30// ---------------------------------------------------------------------------
31//
32// The OperatorEvent enum is defined in `telemetry/` but the deepest
33// emit sites (storage layer, replication apply loop, signal handlers,
34// drop impls) cannot thread an `&AuditLogger` reference through their
35// call stacks without a sweeping refactor. To stay surgical (#205) we
36// expose a process-wide sink that the runtime registers at startup and
37// every emit site consults via `OperatorEvent::emit_global`.
38//
39// Trade-off: the sink is a `OnceLock<Arc<AuditLogger>>`, which means
40// emits that fire *before* the runtime registers the logger fall back
41// to `tracing::warn!` + `eprintln!` only — the audit copy is lost. The
42// tamper-evident audit copy is the primary record; the breadcrumb /
43// stderr fallbacks are the safety net the original `emit(&AuditLogger)`
44// shape already accepted, so the degradation is the same one already
45// documented in the module rustdoc.
46
/// Process-wide audit sink read by [`OperatorEvent::emit_global`].
///
/// Empty until [`install_global_audit_sink`] stores a logger in it; a
/// `OnceLock` can be set at most once, so the first stored logger stays for
/// the life of the process.
static GLOBAL_SINK: OnceLock<Arc<AuditLogger>> = OnceLock::new();
48
49/// Install the process-wide [`AuditLogger`] used by
50/// [`OperatorEvent::emit_global`]. Called once at runtime startup; a
51/// second call is a no-op (the first registration wins) so test
52/// harnesses that build multiple in-memory runtimes cannot stomp on
53/// each other's loggers — they fall back to tracing+eprintln.
54pub fn install_global_audit_sink(logger: Arc<AuditLogger>) {
55    let _ = GLOBAL_SINK.set(logger);
56}
57
58// ---------------------------------------------------------------------------
59// OperatorEvent
60// ---------------------------------------------------------------------------
61
62/// High-severity system conditions that require operator attention.
63///
64/// Every variant carries typed [`crate::runtime::audit_log::AuditValue`]
65/// fields so adversarial bytes (CRLF, NUL, quote, non-UTF-8) are
66/// escape-safe at the audit boundary (ADR 0010).
67#[derive(Debug)]
68pub enum OperatorEvent {
69    /// A replication stream to a follower/replica broke unexpectedly.
70    ReplicationBroken { peer: String, reason: String },
71    /// Replication state diverged: the follower's committed LSN or
72    /// checksum disagrees with the leader.
73    Divergence {
74        peer: String,
75        leader_lsn: u64,
76        follower_lsn: u64,
77    },
78    /// The WAL fsync call failed. Data may be at risk on the current host.
79    WalFsyncFailed { path: String, error: String },
80    /// Available disk space fell below the configured critical threshold.
81    DiskSpaceCritical {
82        path: String,
83        available_bytes: u64,
84        threshold_bytes: u64,
85    },
86    /// An authentication bypass was detected (e.g. auth gate returned
87    /// `allow` for a request that should have been rejected).
88    AuthBypass {
89        principal: String,
90        resource: String,
91        detail: String,
92    },
93    /// An admin capability was granted to a principal at runtime.
94    AdminCapabilityGranted {
95        granted_to: String,
96        capability: String,
97        granted_by: String,
98    },
99    /// Secret rotation failed; the current secret may be stale.
100    SecretRotationFailed { secret_ref: String, error: String },
101    /// A runtime configuration change was applied to a live instance.
102    ConfigChanged {
103        key: String,
104        old_value: String,
105        new_value: String,
106        changed_by: String,
107    },
108    /// The server process failed to start cleanly.
109    StartupFailed { phase: String, error: String },
110    /// The server process was forced to shut down (e.g. OOM killer,
111    /// SIGKILL, unrecoverable error).
112    ShutdownForced { reason: String },
113    /// On-disk schema metadata is corrupt or inconsistent.
114    SchemaCorruption { collection: String, detail: String },
115    /// A scheduled or triggered checkpoint failed to complete.
116    CheckpointFailed { lsn: u64, error: String },
117    /// An admin intent that was started but never reached a terminal phase
118    /// (completed or aborted). Emitted by [`super::admin_intent_log::AdminIntentLog::scan_and_report`]
119    /// at startup so operators can investigate interrupted operations.
120    DanglingAdminIntent {
121        id: crate::crypto::uuid::Uuid,
122        op: crate::telemetry::admin_intent_log::IntentOp,
123        /// Unix milliseconds when the intent was started.
124        started_at_ms: u64,
125        last_phase: crate::telemetry::admin_intent_log::IntentPhase,
126    },
127    /// A config-file change was detected but one or more changed fields
128    /// require a full server restart to take effect. The change was NOT
129    /// applied; the operator must restart to pick it up.
130    ConfigChangeRequiresRestart { fields_changed: String },
131    /// An ALTER TABLE on a collection with active event subscriptions
132    /// added or removed columns. Downstream consumers may see a different
133    /// payload shape starting at `lsn`.
134    SubscriptionSchemaChange {
135        collection: String,
136        /// Comma-separated subscription names affected.
137        subscription_names: String,
138        /// Comma-separated columns added (empty string if none).
139        fields_added: String,
140        /// Comma-separated columns removed (empty string if none).
141        fields_removed: String,
142        lsn: u64,
143    },
144    /// An event could not be delivered to its target queue after all
145    /// retry attempts, and was routed to the dead-letter queue instead.
146    OutboxDlqActivated {
147        queue: String,
148        dlq: String,
149        reason: String,
150    },
151    /// A queue message exhausted `max_attempts` and was promoted to its
152    /// DLQ target. Forensic: data has left the normal flow and operators
153    /// must be able to trace it later. Slice 10 of issue #527.
154    QueueDlqPromoted {
155        queue: String,
156        group: String,
157        dlq: String,
158        message_id: u64,
159        attempts: u32,
160        reason: String,
161    },
162    /// A deposed primary auto-rolled-back its divergent tail to the common
163    /// point on rejoin (issue #840, ADR 0030). The tail is, by definition,
164    /// non-committed (above the commit watermark), so removing it from the
165    /// live timeline is correct — but the discarded writes are **always**
166    /// persisted to a rollback file so they remain auditable and
167    /// reconcilable. Rollback is never silent. The `common_point_lsn` is the
168    /// recover-to-LSN target; everything in `(common_point_lsn,
169    /// tail_to_lsn]` was removed from the live timeline and saved to
170    /// `rollback_file`. `commit_watermark` is the durable floor that was
171    /// never crossed.
172    DeposedPrimaryRollback {
173        /// LSN the deposed primary recovered to — the common point with the
174        /// new primary, the recover-to-LSN target.
175        common_point_lsn: u64,
176        /// Highest LSN in the discarded divergent tail (inclusive).
177        tail_to_lsn: u64,
178        /// Number of LSNs removed from the live timeline.
179        tail_lsns: u64,
180        /// The commit watermark — the durable floor. The invariant holds:
181        /// `common_point_lsn >= commit_watermark`, so nothing at or below
182        /// the watermark was rolled back.
183        commit_watermark: u64,
184        /// Path/handle of the rollback file the discarded tail was saved to.
185        rollback_file: String,
186        /// The new primary the node rejoins as a replica of.
187        new_primary_addr: String,
188        /// The term the node now follows.
189        new_term: u64,
190    },
191}
192
193impl OperatorEvent {
194    /// All variant names as CamelCase strings, in declaration order.
195    pub fn all_variant_names() -> &'static [&'static str] {
196        &[
197            "ReplicationBroken",
198            "Divergence",
199            "WalFsyncFailed",
200            "DiskSpaceCritical",
201            "AuthBypass",
202            "AdminCapabilityGranted",
203            "SecretRotationFailed",
204            "ConfigChanged",
205            "StartupFailed",
206            "ShutdownForced",
207            "SchemaCorruption",
208            "CheckpointFailed",
209            "DanglingAdminIntent",
210            "ConfigChangeRequiresRestart",
211            "SubscriptionSchemaChange",
212            "OutboxDlqActivated",
213            "QueueDlqPromoted",
214            "DeposedPrimaryRollback",
215        ]
216    }
217
218    /// Return the CamelCase variant name for routing table lookup.
219    pub(super) fn variant_name(&self) -> &'static str {
220        match self {
221            Self::ReplicationBroken { .. } => "ReplicationBroken",
222            Self::Divergence { .. } => "Divergence",
223            Self::WalFsyncFailed { .. } => "WalFsyncFailed",
224            Self::DiskSpaceCritical { .. } => "DiskSpaceCritical",
225            Self::AuthBypass { .. } => "AuthBypass",
226            Self::AdminCapabilityGranted { .. } => "AdminCapabilityGranted",
227            Self::SecretRotationFailed { .. } => "SecretRotationFailed",
228            Self::ConfigChanged { .. } => "ConfigChanged",
229            Self::StartupFailed { .. } => "StartupFailed",
230            Self::ShutdownForced { .. } => "ShutdownForced",
231            Self::SchemaCorruption { .. } => "SchemaCorruption",
232            Self::CheckpointFailed { .. } => "CheckpointFailed",
233            Self::DanglingAdminIntent { .. } => "DanglingAdminIntent",
234            Self::ConfigChangeRequiresRestart { .. } => "ConfigChangeRequiresRestart",
235            Self::SubscriptionSchemaChange { .. } => "SubscriptionSchemaChange",
236            Self::OutboxDlqActivated { .. } => "OutboxDlqActivated",
237            Self::QueueDlqPromoted { .. } => "QueueDlqPromoted",
238            Self::DeposedPrimaryRollback { .. } => "DeposedPrimaryRollback",
239        }
240    }
241
242    /// Emit the event.
243    ///
244    /// Execution order (per issue #202):
245    /// 1. Persist to `audit` — tamper-evident, durable.
246    /// 2. `tracing::warn!` breadcrumb — lands in `red.log` / stderr.
247    /// 3. `eprintln!` fallback — fires only if the audit write fails,
248    ///    ensuring the event is never silently lost.
249    ///
250    /// Emit the event using the process-wide sink installed by the
251    /// runtime at startup. When no sink is installed (early boot,
252    /// tests without an audit logger), the tracing breadcrumb and
253    /// eprintln fallback still fire so the event is never silently
254    /// lost.
255    pub fn emit_global(self) {
256        match GLOBAL_SINK.get() {
257            Some(logger) => self.emit(logger.as_ref()),
258            None => {
259                let (_, _, summary) = self.decompose();
260                tracing::warn!(target: "reddb::operator", "{summary}");
261                eprintln!("[reddb::operator] (no audit sink) {summary}");
262            }
263        }
264    }
265
266    pub fn emit(self, audit: &AuditLogger) {
267        let (action, fields, summary) = self.decompose();
268
269        let ev = AuditEvent::builder(action)
270            .source(AuditAuthSource::System)
271            .outcome(Outcome::Error)
272            .fields(fields)
273            .build();
274
275        // 1. Audit log (primary, tamper-evident).
276        let audit_ok = {
277            // `record_event` is infallible from the caller's perspective
278            // (it falls back to sync write internally), so we treat it as
279            // always succeeding for the breadcrumb decision.
280            audit.record_event(ev);
281            true
282        };
283
284        // 2. tracing breadcrumb.
285        tracing::warn!(target: "reddb::operator", "{summary}");
286
287        // 3. eprintln fallback — guard against silent loss when audit
288        //    is unhealthy (e.g. disk full, writer thread dead).
289        if !audit_ok {
290            eprintln!("[reddb::operator] {summary}");
291        }
292    }
293
294    /// Decompose `self` into `(action, audit_fields, human_summary)`.
295    pub(super) fn decompose(self) -> (&'static str, Vec<AuditField>, String) {
296        match self {
297            Self::ReplicationBroken { peer, reason } => {
298                let summary = format!("replication broken: peer={peer} reason={reason}");
299                let fields = vec![
300                    AuditFieldEscaper::field("peer", peer),
301                    AuditFieldEscaper::field("reason", reason),
302                ];
303                ("operator/replication_broken", fields, summary)
304            }
305            Self::Divergence {
306                peer,
307                leader_lsn,
308                follower_lsn,
309            } => {
310                let summary = format!(
311                    "replication divergence: peer={peer} leader_lsn={leader_lsn} follower_lsn={follower_lsn}"
312                );
313                let fields = vec![
314                    AuditFieldEscaper::field("peer", peer),
315                    AuditFieldEscaper::field("leader_lsn", leader_lsn),
316                    AuditFieldEscaper::field("follower_lsn", follower_lsn),
317                ];
318                ("operator/divergence", fields, summary)
319            }
320            Self::WalFsyncFailed { path, error } => {
321                let summary = format!("WAL fsync failed: path={path} error={error}");
322                let fields = vec![
323                    AuditFieldEscaper::field("path", path),
324                    AuditFieldEscaper::field("error", error),
325                ];
326                ("operator/wal_fsync_failed", fields, summary)
327            }
328            Self::DiskSpaceCritical {
329                path,
330                available_bytes,
331                threshold_bytes,
332            } => {
333                let summary = format!(
334                    "disk space critical: path={path} available={available_bytes} threshold={threshold_bytes}"
335                );
336                let fields = vec![
337                    AuditFieldEscaper::field("path", path),
338                    AuditFieldEscaper::field("available_bytes", available_bytes),
339                    AuditFieldEscaper::field("threshold_bytes", threshold_bytes),
340                ];
341                ("operator/disk_space_critical", fields, summary)
342            }
343            Self::AuthBypass {
344                principal,
345                resource,
346                detail,
347            } => {
348                let summary =
349                    format!("auth bypass detected: principal={principal} resource={resource}");
350                let fields = vec![
351                    AuditFieldEscaper::field("principal", principal),
352                    AuditFieldEscaper::field("resource", resource),
353                    AuditFieldEscaper::field("detail", detail),
354                ];
355                ("operator/auth_bypass", fields, summary)
356            }
357            Self::AdminCapabilityGranted {
358                granted_to,
359                capability,
360                granted_by,
361            } => {
362                let summary = format!(
363                    "admin capability granted: to={granted_to} capability={capability} by={granted_by}"
364                );
365                let fields = vec![
366                    AuditFieldEscaper::field("granted_to", granted_to),
367                    AuditFieldEscaper::field("capability", capability),
368                    AuditFieldEscaper::field("granted_by", granted_by),
369                ];
370                ("operator/admin_capability_granted", fields, summary)
371            }
372            Self::SecretRotationFailed { secret_ref, error } => {
373                let summary = format!("secret rotation failed: ref={secret_ref} error={error}");
374                let fields = vec![
375                    AuditFieldEscaper::field("secret_ref", secret_ref),
376                    AuditFieldEscaper::field("error", error),
377                ];
378                ("operator/secret_rotation_failed", fields, summary)
379            }
380            Self::ConfigChanged {
381                key,
382                old_value,
383                new_value,
384                changed_by,
385            } => {
386                let summary = format!(
387                    "config changed: key={key} old={old_value} new={new_value} by={changed_by}"
388                );
389                let fields = vec![
390                    AuditFieldEscaper::field("key", key),
391                    AuditFieldEscaper::field("old_value", old_value),
392                    AuditFieldEscaper::field("new_value", new_value),
393                    AuditFieldEscaper::field("changed_by", changed_by),
394                ];
395                ("operator/config_changed", fields, summary)
396            }
397            Self::StartupFailed { phase, error } => {
398                let summary = format!("startup failed: phase={phase} error={error}");
399                let fields = vec![
400                    AuditFieldEscaper::field("phase", phase),
401                    AuditFieldEscaper::field("error", error),
402                ];
403                ("operator/startup_failed", fields, summary)
404            }
405            Self::ShutdownForced { reason } => {
406                let summary = format!("shutdown forced: reason={reason}");
407                let fields = vec![AuditFieldEscaper::field("reason", reason)];
408                ("operator/shutdown_forced", fields, summary)
409            }
410            Self::SchemaCorruption { collection, detail } => {
411                let summary = format!("schema corruption: collection={collection} detail={detail}");
412                let fields = vec![
413                    AuditFieldEscaper::field("collection", collection),
414                    AuditFieldEscaper::field("detail", detail),
415                ];
416                ("operator/schema_corruption", fields, summary)
417            }
418            Self::CheckpointFailed { lsn, error } => {
419                let summary = format!("checkpoint failed: lsn={lsn} error={error}");
420                let fields = vec![
421                    AuditFieldEscaper::field("lsn", lsn),
422                    AuditFieldEscaper::field("error", error),
423                ];
424                ("operator/checkpoint_failed", fields, summary)
425            }
426            Self::DanglingAdminIntent {
427                id,
428                op,
429                started_at_ms,
430                last_phase,
431            } => {
432                let summary = format!(
433                    "dangling admin intent: id={id} op={op} started_at_ms={started_at_ms} last_phase={last_phase}"
434                );
435                let fields = vec![
436                    AuditFieldEscaper::field("id", id.to_string()),
437                    AuditFieldEscaper::field("op", op.to_string()),
438                    AuditFieldEscaper::field("started_at_ms", started_at_ms),
439                    AuditFieldEscaper::field("last_phase", last_phase.to_string()),
440                ];
441                ("operator/dangling_admin_intent", fields, summary)
442            }
443            Self::ConfigChangeRequiresRestart { fields_changed } => {
444                let summary = format!("config change requires restart: fields={fields_changed}");
445                let fields = vec![AuditFieldEscaper::field("fields_changed", fields_changed)];
446                ("operator/config_change_requires_restart", fields, summary)
447            }
448            Self::SubscriptionSchemaChange {
449                collection,
450                subscription_names,
451                fields_added,
452                fields_removed,
453                lsn,
454            } => {
455                let summary = format!(
456                    "subscription schema change: collection={collection} subscriptions=[{subscription_names}] added=[{fields_added}] removed=[{fields_removed}] lsn={lsn}"
457                );
458                let fields = vec![
459                    AuditFieldEscaper::field("collection", collection),
460                    AuditFieldEscaper::field("subscription_names", subscription_names),
461                    AuditFieldEscaper::field("fields_added", fields_added),
462                    AuditFieldEscaper::field("fields_removed", fields_removed),
463                    AuditFieldEscaper::field("lsn", lsn),
464                ];
465                ("operator/subscription_schema_change", fields, summary)
466            }
467            Self::OutboxDlqActivated { queue, dlq, reason } => {
468                let summary =
469                    format!("outbox DLQ activated: queue={queue} dlq={dlq} reason={reason}");
470                let fields = vec![
471                    AuditFieldEscaper::field("queue", queue),
472                    AuditFieldEscaper::field("dlq", dlq),
473                    AuditFieldEscaper::field("reason", reason),
474                ];
475                ("operator/outbox_dlq_activated", fields, summary)
476            }
477            Self::QueueDlqPromoted {
478                queue,
479                group,
480                dlq,
481                message_id,
482                attempts,
483                reason,
484            } => {
485                let summary = format!(
486                    "queue DLQ promoted: queue={queue} group={group} dlq={dlq} message_id={message_id} attempts={attempts} reason={reason}"
487                );
488                let fields = vec![
489                    AuditFieldEscaper::field("queue", queue),
490                    AuditFieldEscaper::field("group", group),
491                    AuditFieldEscaper::field("dlq", dlq),
492                    AuditFieldEscaper::field("message_id", message_id),
493                    AuditFieldEscaper::field("attempts", attempts as u64),
494                    AuditFieldEscaper::field("reason", reason),
495                ];
496                ("operator/queue_dlq_promoted", fields, summary)
497            }
498            Self::DeposedPrimaryRollback {
499                common_point_lsn,
500                tail_to_lsn,
501                tail_lsns,
502                commit_watermark,
503                rollback_file,
504                new_primary_addr,
505                new_term,
506            } => {
507                let summary = format!(
508                    "deposed primary auto-rollback: recovered to common_point_lsn={common_point_lsn} \
509                     (commit_watermark={commit_watermark}); discarded divergent tail of {tail_lsns} LSN(s) \
510                     up to {tail_to_lsn} saved to {rollback_file}; rejoining as replica of \
511                     {new_primary_addr} under term {new_term}"
512                );
513                let fields = vec![
514                    AuditFieldEscaper::field("common_point_lsn", common_point_lsn),
515                    AuditFieldEscaper::field("tail_to_lsn", tail_to_lsn),
516                    AuditFieldEscaper::field("tail_lsns", tail_lsns),
517                    AuditFieldEscaper::field("commit_watermark", commit_watermark),
518                    AuditFieldEscaper::field("rollback_file", rollback_file),
519                    AuditFieldEscaper::field("new_primary_addr", new_primary_addr),
520                    AuditFieldEscaper::field("new_term", new_term),
521                ];
522                ("operator/deposed_primary_rollback", fields, summary)
523            }
524        }
525    }
526}
527
528// ---------------------------------------------------------------------------
529// Tests
530// ---------------------------------------------------------------------------
531
532#[cfg(test)]
533mod tests {
534    use std::time::Duration;
535
536    use super::*;
537    use crate::runtime::audit_log::AuditLogger;
538
539    fn make_logger() -> (AuditLogger, std::path::PathBuf) {
540        let mut dir = std::env::temp_dir();
541        dir.push(format!(
542            "reddb-op-event-{}-{}",
543            std::process::id(),
544            crate::utils::now_unix_nanos()
545        ));
546        std::fs::create_dir_all(&dir).unwrap();
547        let path = dir.join(".audit.log");
548        let logger = AuditLogger::with_path(path.clone());
549        (logger, path)
550    }
551
552    fn drain(logger: &AuditLogger) {
553        assert!(
554            logger.wait_idle(Duration::from_secs(2)),
555            "audit logger drain timed out"
556        );
557    }
558
559    fn read_last_line(path: &std::path::Path) -> crate::json::Value {
560        let body = std::fs::read_to_string(path).unwrap();
561        let line = body.lines().last().expect("at least one audit line");
562        crate::json::from_str(line).expect("valid JSON")
563    }
564
565    // ------------------------------------------------------------------
566    // One test per variant — verifies action string + a representative field
567    // ------------------------------------------------------------------
568
569    #[test]
570    fn replication_broken_emits() {
571        let (logger, path) = make_logger();
572        OperatorEvent::ReplicationBroken {
573            peer: "replica-1".into(),
574            reason: "TCP reset".into(),
575        }
576        .emit(&logger);
577        drain(&logger);
578        let v = read_last_line(&path);
579        assert_eq!(
580            v.get("action").and_then(|x| x.as_str()),
581            Some("operator/replication_broken")
582        );
583        let peer = v
584            .get("detail")
585            .and_then(|d| d.get("peer"))
586            .and_then(|x| x.as_str());
587        assert_eq!(peer, Some("replica-1"));
588    }
589
590    #[test]
591    fn divergence_emits() {
592        let (logger, path) = make_logger();
593        OperatorEvent::Divergence {
594            peer: "replica-2".into(),
595            leader_lsn: 1000,
596            follower_lsn: 999,
597        }
598        .emit(&logger);
599        drain(&logger);
600        let v = read_last_line(&path);
601        assert_eq!(
602            v.get("action").and_then(|x| x.as_str()),
603            Some("operator/divergence")
604        );
605        let lsn = v
606            .get("detail")
607            .and_then(|d| d.get("leader_lsn"))
608            .and_then(|x| x.as_i64());
609        assert_eq!(lsn, Some(1000));
610    }
611
612    #[test]
613    fn wal_fsync_failed_emits() {
614        let (logger, path) = make_logger();
615        OperatorEvent::WalFsyncFailed {
616            path: "/data/wal".into(),
617            error: "EIO".into(),
618        }
619        .emit(&logger);
620        drain(&logger);
621        let v = read_last_line(&path);
622        assert_eq!(
623            v.get("action").and_then(|x| x.as_str()),
624            Some("operator/wal_fsync_failed")
625        );
626        let err = v
627            .get("detail")
628            .and_then(|d| d.get("error"))
629            .and_then(|x| x.as_str());
630        assert_eq!(err, Some("EIO"));
631    }
632
633    #[test]
634    fn disk_space_critical_emits() {
635        let (logger, path) = make_logger();
636        OperatorEvent::DiskSpaceCritical {
637            path: "/data".into(),
638            available_bytes: 1024,
639            threshold_bytes: 10240,
640        }
641        .emit(&logger);
642        drain(&logger);
643        let v = read_last_line(&path);
644        assert_eq!(
645            v.get("action").and_then(|x| x.as_str()),
646            Some("operator/disk_space_critical")
647        );
648        let avail = v
649            .get("detail")
650            .and_then(|d| d.get("available_bytes"))
651            .and_then(|x| x.as_i64());
652        assert_eq!(avail, Some(1024));
653    }
654
655    #[test]
656    fn auth_bypass_emits() {
657        let (logger, path) = make_logger();
658        OperatorEvent::AuthBypass {
659            principal: "alice".into(),
660            resource: "/admin/drop".into(),
661            detail: "scope check skipped".into(),
662        }
663        .emit(&logger);
664        drain(&logger);
665        let v = read_last_line(&path);
666        assert_eq!(
667            v.get("action").and_then(|x| x.as_str()),
668            Some("operator/auth_bypass")
669        );
670        let res = v
671            .get("detail")
672            .and_then(|d| d.get("resource"))
673            .and_then(|x| x.as_str());
674        assert_eq!(res, Some("/admin/drop"));
675    }
676
677    #[test]
678    fn admin_capability_granted_emits() {
679        let (logger, path) = make_logger();
680        OperatorEvent::AdminCapabilityGranted {
681            granted_to: "bob".into(),
682            capability: "ADMIN_WRITE".into(),
683            granted_by: "root".into(),
684        }
685        .emit(&logger);
686        drain(&logger);
687        let v = read_last_line(&path);
688        assert_eq!(
689            v.get("action").and_then(|x| x.as_str()),
690            Some("operator/admin_capability_granted")
691        );
692        let cap = v
693            .get("detail")
694            .and_then(|d| d.get("capability"))
695            .and_then(|x| x.as_str());
696        assert_eq!(cap, Some("ADMIN_WRITE"));
697    }
698
699    #[test]
700    fn secret_rotation_failed_emits() {
701        let (logger, path) = make_logger();
702        OperatorEvent::SecretRotationFailed {
703            secret_ref: "jwt-signing-key".into(),
704            error: "HSM unreachable".into(),
705        }
706        .emit(&logger);
707        drain(&logger);
708        let v = read_last_line(&path);
709        assert_eq!(
710            v.get("action").and_then(|x| x.as_str()),
711            Some("operator/secret_rotation_failed")
712        );
713        let r = v
714            .get("detail")
715            .and_then(|d| d.get("secret_ref"))
716            .and_then(|x| x.as_str());
717        assert_eq!(r, Some("jwt-signing-key"));
718    }
719
/// Emitting `ConfigChanged` must yield an audit row whose action is
/// `operator/config_changed` and whose `detail.new_value` carries the
/// updated setting.
#[test]
fn config_changed_emits() {
    let (logger, path) = make_logger();
    let event = OperatorEvent::ConfigChanged {
        key: "max_connections".into(),
        old_value: "100".into(),
        new_value: "200".into(),
        changed_by: "ops-bot".into(),
    };
    event.emit(&logger);
    drain(&logger);

    let row = read_last_line(&path);

    let action = row.get("action").and_then(|a| a.as_str());
    assert_eq!(action, Some("operator/config_changed"));

    let new_value = row
        .get("detail")
        .and_then(|detail| detail.get("new_value"))
        .and_then(|n| n.as_str());
    assert_eq!(new_value, Some("200"));
}
742
/// Emitting `StartupFailed` must yield an audit row whose action is
/// `operator/startup_failed` and whose `detail.phase` names the startup
/// phase that failed.
#[test]
fn startup_failed_emits() {
    let (logger, path) = make_logger();
    let event = OperatorEvent::StartupFailed {
        phase: "wal_recovery".into(),
        error: "corrupt frame".into(),
    };
    event.emit(&logger);
    drain(&logger);

    let row = read_last_line(&path);

    let action = row.get("action").and_then(|a| a.as_str());
    assert_eq!(action, Some("operator/startup_failed"));

    let failed_phase = row
        .get("detail")
        .and_then(|detail| detail.get("phase"))
        .and_then(|p| p.as_str());
    assert_eq!(failed_phase, Some("wal_recovery"));
}
763
/// Emitting `ShutdownForced` must yield an audit row whose action is
/// `operator/shutdown_forced` and whose `detail.reason` carries the supplied
/// reason.
#[test]
fn shutdown_forced_emits() {
    let (logger, path) = make_logger();
    let event = OperatorEvent::ShutdownForced {
        reason: "OOM".into(),
    };
    event.emit(&logger);
    drain(&logger);

    let row = read_last_line(&path);

    let action = row.get("action").and_then(|a| a.as_str());
    assert_eq!(action, Some("operator/shutdown_forced"));

    let reason = row
        .get("detail")
        .and_then(|detail| detail.get("reason"))
        .and_then(|r| r.as_str());
    assert_eq!(reason, Some("OOM"));
}
783
/// Emitting `SchemaCorruption` must yield an audit row whose action is
/// `operator/schema_corruption` and whose `detail.collection` names the
/// affected collection.
#[test]
fn schema_corruption_emits() {
    let (logger, path) = make_logger();
    let event = OperatorEvent::SchemaCorruption {
        collection: "users".to_string(),
        detail: "unknown type tag 0xFF".to_string(),
    };
    event.emit(&logger);
    drain(&logger);

    let row = read_last_line(&path);

    let action = row.get("action").and_then(|a| a.as_str());
    assert_eq!(action, Some("operator/schema_corruption"));

    let collection = row
        .get("detail")
        .and_then(|detail| detail.get("collection"))
        .and_then(|c| c.as_str());
    assert_eq!(collection, Some("users"));
}
804
/// Emitting `CheckpointFailed` must yield an audit row whose action is
/// `operator/checkpoint_failed` and whose `detail.lsn` is readable as the
/// integer LSN that was supplied.
#[test]
fn checkpoint_failed_emits() {
    let (logger, path) = make_logger();
    let event = OperatorEvent::CheckpointFailed {
        lsn: 42_000,
        error: "write stall".into(),
    };
    event.emit(&logger);
    drain(&logger);

    let row = read_last_line(&path);

    let action = row.get("action").and_then(|a| a.as_str());
    assert_eq!(action, Some("operator/checkpoint_failed"));

    let recorded_lsn = row
        .get("detail")
        .and_then(|detail| detail.get("lsn"))
        .and_then(|n| n.as_i64());
    assert_eq!(recorded_lsn, Some(42_000));
}
825
/// Emitting `DeposedPrimaryRollback` must yield an audit row whose action is
/// `operator/deposed_primary_rollback`, with `detail.common_point_lsn`
/// readable as an integer and `detail.rollback_file` echoing the path that
/// was supplied.
#[test]
fn deposed_primary_rollback_emits() {
    let (logger, path) = make_logger();
    let event = OperatorEvent::DeposedPrimaryRollback {
        common_point_lsn: 200,
        tail_to_lsn: 230,
        tail_lsns: 30,
        commit_watermark: 200,
        rollback_file: "/data/rollback/term7-lsn200-230.rbk".into(),
        new_primary_addr: "http://node-b:50051".into(),
        new_term: 8,
    };
    event.emit(&logger);
    drain(&logger);

    let row = read_last_line(&path);

    let action = row.get("action").and_then(|a| a.as_str());
    assert_eq!(action, Some("operator/deposed_primary_rollback"));

    // Numeric detail field.
    let common_point = row
        .get("detail")
        .and_then(|detail| detail.get("common_point_lsn"))
        .and_then(|n| n.as_i64());
    assert_eq!(common_point, Some(200));

    // String detail field.
    let rollback_file = row
        .get("detail")
        .and_then(|detail| detail.get("rollback_file"))
        .and_then(|f| f.as_str());
    assert_eq!(rollback_file, Some("/data/rollback/term7-lsn200-230.rbk"));
}
856
857    // ------------------------------------------------------------------
858    // Adversarial corpus: CRLF / NUL / quote / non-UTF-8-ish in fields
859    // ------------------------------------------------------------------
860
/// Pushes hostile strings (CRLF, NUL, quotes, JSON fragments, low control
/// bytes, backslashes) through both string fields of `SchemaCorruption` and
/// checks that the audit file stays line-delimited JSON and that
/// `detail.collection` round-trips to the exact input.
#[test]
fn adversarial_fields_are_escape_safe() {
    let payloads: &[(&str, &str)] = &[
        ("crlf", "line1\r\nline2"),
        ("nul", "before\0after"),
        ("quote", r#"she said "hi""#),
        ("json_inject", r#"{"injected":true}"#),
        ("low_ctrl", "\x01\x02\x07\x1f"),
        ("backslash", "C:\\path\\file"),
        ("mixed", "name=\"x\"\n\\path\t\x01end"),
    ];

    for (label, payload) in payloads {
        let (logger, path) = make_logger();
        OperatorEvent::SchemaCorruption {
            collection: payload.to_string(),
            detail: payload.to_string(),
        }
        .emit(&logger);
        drain(&logger);

        let body = std::fs::read_to_string(&path).unwrap();

        // A string yielded by `str::lines` can never contain '\n', so
        // inspecting the last line for a newline proves nothing. Instead,
        // require every physical line to be a complete JSON document: a raw
        // newline leaking out of a field would split the row into fragments
        // that fail to parse.
        // NOTE(review): assumes the audit file holds only JSONL rows — confirm.
        for (idx, row) in body.lines().enumerate().filter(|(_, r)| !r.is_empty()) {
            let parsed: Result<crate::json::Value, _> = crate::json::from_str(row);
            assert!(
                parsed.is_ok(),
                "{label}: physical line {idx} is not a complete JSON row \
                 (embedded newline?): {row:?}"
            );
        }

        let line = body.lines().last().unwrap_or("");
        let v: crate::json::Value = crate::json::from_str(line)
            .unwrap_or_else(|e| panic!("{label}: audit line not valid JSON: {e}\n{line:?}"));
        let recovered = v
            .get("detail")
            .and_then(|d| d.get("collection"))
            .and_then(|x| x.as_str())
            .unwrap_or("");
        assert_eq!(recovered, *payload, "{label}: round-trip mismatch");
    }
}
901
902    // ------------------------------------------------------------------
903    // Outcome is always Error; source is always System
904    // ------------------------------------------------------------------
905
/// Checks that a row emitted for `ShutdownForced` is stamped with
/// `outcome = "error"` and `source = "system"`.
#[test]
fn emit_sets_outcome_error_and_source_system() {
    let (logger, path) = make_logger();
    let event = OperatorEvent::ShutdownForced {
        reason: "test".into(),
    };
    event.emit(&logger);
    drain(&logger);

    let row = read_last_line(&path);
    // Same order as before: outcome first, then source.
    for (field, expected) in [("outcome", "error"), ("source", "system")] {
        assert_eq!(row.get(field).and_then(|x| x.as_str()), Some(expected));
    }
}
918}