1use std::sync::{Arc, OnceLock};
23
24use crate::runtime::audit_log::{
25 AuditAuthSource, AuditEvent, AuditField, AuditFieldEscaper, AuditLogger, Outcome,
26};
27
/// Process-wide audit logger used by `OperatorEvent::emit_global`.
///
/// Stays empty until `install_global_audit_sink` stores a logger in it; a `OnceLock` can be
/// written at most once.
static GLOBAL_SINK: OnceLock<Arc<AuditLogger>> = OnceLock::new();
48
49pub fn install_global_audit_sink(logger: Arc<AuditLogger>) {
55 let _ = GLOBAL_SINK.set(logger);
56}
57
/// Events that are surfaced to operators.
///
/// Every variant is turned by `decompose` into an `operator/...` audit action, a list of
/// escaped audit fields (one per variant field) and a one-line text summary.
#[derive(Debug)]
pub enum OperatorEvent {
    /// Replication with `peer` broke; `reason` carries the free-text cause.
    ReplicationBroken { peer: String, reason: String },
    /// Replication divergence involving `peer`, reported with both LSNs.
    Divergence {
        peer: String,
        leader_lsn: u64,
        follower_lsn: u64,
    },
    /// An fsync of the WAL at `path` failed with `error`.
    WalFsyncFailed { path: String, error: String },
    /// Disk space at `path` is critical.
    /// Presumably raised when `available_bytes` drops below `threshold_bytes` — the
    /// comparison itself is done by the caller; confirm there.
    DiskSpaceCritical {
        path: String,
        available_bytes: u64,
        threshold_bytes: u64,
    },
    /// An authorization bypass was detected for `principal` on `resource`.
    /// `detail` is recorded as an audit field but is not part of the text summary.
    AuthBypass {
        principal: String,
        resource: String,
        detail: String,
    },
    /// `granted_by` granted the admin `capability` to `granted_to`.
    AdminCapabilityGranted {
        granted_to: String,
        capability: String,
        granted_by: String,
    },
    /// Rotation of the secret identified by `secret_ref` failed with `error`.
    SecretRotationFailed { secret_ref: String, error: String },
    /// Configuration `key` changed from `old_value` to `new_value`, attributed to `changed_by`.
    ConfigChanged {
        key: String,
        old_value: String,
        new_value: String,
        changed_by: String,
    },
    /// Startup failed during `phase` with `error`.
    StartupFailed { phase: String, error: String },
    /// A shutdown was forced; `reason` says why.
    ShutdownForced { reason: String },
    /// Schema corruption was found in `collection`; `detail` describes it.
    SchemaCorruption { collection: String, detail: String },
    /// A checkpoint at `lsn` failed with `error`.
    CheckpointFailed { lsn: u64, error: String },
    /// A dangling admin intent, reported with its id, operation, start time in milliseconds
    /// and last recorded phase.
    /// NOTE(review): presumably an intent from the admin intent log that never completed —
    /// confirm against `telemetry::admin_intent_log`.
    DanglingAdminIntent {
        id: crate::crypto::uuid::Uuid,
        op: crate::telemetry::admin_intent_log::IntentOp,
        started_at_ms: u64,
        last_phase: crate::telemetry::admin_intent_log::IntentPhase,
    },
    /// A configuration change that needs a restart; `fields_changed` is a pre-rendered string
    /// naming the affected fields.
    ConfigChangeRequiresRestart { fields_changed: String },
    /// A schema change on `collection` relevant to subscriptions, observed at `lsn`.
    /// The name and field lists arrive as pre-rendered strings and are wrapped in `[...]` in
    /// the text summary.
    SubscriptionSchemaChange {
        collection: String,
        subscription_names: String,
        fields_added: String,
        fields_removed: String,
        lsn: u64,
    },
    /// The outbox dead-letter queue `dlq` was activated for `queue` because of `reason`.
    OutboxDlqActivated {
        queue: String,
        dlq: String,
        reason: String,
    },
    /// Message `message_id` of `queue` / `group` was promoted to `dlq` after `attempts`
    /// attempts, for `reason`.
    QueueDlqPromoted {
        queue: String,
        group: String,
        dlq: String,
        message_id: u64,
        attempts: u32,
        reason: String,
    },
}
163
164impl OperatorEvent {
165 pub fn all_variant_names() -> &'static [&'static str] {
167 &[
168 "ReplicationBroken",
169 "Divergence",
170 "WalFsyncFailed",
171 "DiskSpaceCritical",
172 "AuthBypass",
173 "AdminCapabilityGranted",
174 "SecretRotationFailed",
175 "ConfigChanged",
176 "StartupFailed",
177 "ShutdownForced",
178 "SchemaCorruption",
179 "CheckpointFailed",
180 "DanglingAdminIntent",
181 "ConfigChangeRequiresRestart",
182 "SubscriptionSchemaChange",
183 "OutboxDlqActivated",
184 "QueueDlqPromoted",
185 ]
186 }
187
188 pub(super) fn variant_name(&self) -> &'static str {
190 match self {
191 Self::ReplicationBroken { .. } => "ReplicationBroken",
192 Self::Divergence { .. } => "Divergence",
193 Self::WalFsyncFailed { .. } => "WalFsyncFailed",
194 Self::DiskSpaceCritical { .. } => "DiskSpaceCritical",
195 Self::AuthBypass { .. } => "AuthBypass",
196 Self::AdminCapabilityGranted { .. } => "AdminCapabilityGranted",
197 Self::SecretRotationFailed { .. } => "SecretRotationFailed",
198 Self::ConfigChanged { .. } => "ConfigChanged",
199 Self::StartupFailed { .. } => "StartupFailed",
200 Self::ShutdownForced { .. } => "ShutdownForced",
201 Self::SchemaCorruption { .. } => "SchemaCorruption",
202 Self::CheckpointFailed { .. } => "CheckpointFailed",
203 Self::DanglingAdminIntent { .. } => "DanglingAdminIntent",
204 Self::ConfigChangeRequiresRestart { .. } => "ConfigChangeRequiresRestart",
205 Self::SubscriptionSchemaChange { .. } => "SubscriptionSchemaChange",
206 Self::OutboxDlqActivated { .. } => "OutboxDlqActivated",
207 Self::QueueDlqPromoted { .. } => "QueueDlqPromoted",
208 }
209 }
210
211 pub fn emit_global(self) {
225 match GLOBAL_SINK.get() {
226 Some(logger) => self.emit(logger.as_ref()),
227 None => {
228 let (_, _, summary) = self.decompose();
229 tracing::warn!(target: "reddb::operator", "{summary}");
230 eprintln!("[reddb::operator] (no audit sink) {summary}");
231 }
232 }
233 }
234
235 pub fn emit(self, audit: &AuditLogger) {
236 let (action, fields, summary) = self.decompose();
237
238 let ev = AuditEvent::builder(action)
239 .source(AuditAuthSource::System)
240 .outcome(Outcome::Error)
241 .fields(fields)
242 .build();
243
244 let audit_ok = {
246 audit.record_event(ev);
250 true
251 };
252
253 tracing::warn!(target: "reddb::operator", "{summary}");
255
256 if !audit_ok {
259 eprintln!("[reddb::operator] {summary}");
260 }
261 }
262
263 pub(super) fn decompose(self) -> (&'static str, Vec<AuditField>, String) {
265 match self {
266 Self::ReplicationBroken { peer, reason } => {
267 let summary = format!("replication broken: peer={peer} reason={reason}");
268 let fields = vec![
269 AuditFieldEscaper::field("peer", peer),
270 AuditFieldEscaper::field("reason", reason),
271 ];
272 ("operator/replication_broken", fields, summary)
273 }
274 Self::Divergence {
275 peer,
276 leader_lsn,
277 follower_lsn,
278 } => {
279 let summary = format!(
280 "replication divergence: peer={peer} leader_lsn={leader_lsn} follower_lsn={follower_lsn}"
281 );
282 let fields = vec![
283 AuditFieldEscaper::field("peer", peer),
284 AuditFieldEscaper::field("leader_lsn", leader_lsn),
285 AuditFieldEscaper::field("follower_lsn", follower_lsn),
286 ];
287 ("operator/divergence", fields, summary)
288 }
289 Self::WalFsyncFailed { path, error } => {
290 let summary = format!("WAL fsync failed: path={path} error={error}");
291 let fields = vec![
292 AuditFieldEscaper::field("path", path),
293 AuditFieldEscaper::field("error", error),
294 ];
295 ("operator/wal_fsync_failed", fields, summary)
296 }
297 Self::DiskSpaceCritical {
298 path,
299 available_bytes,
300 threshold_bytes,
301 } => {
302 let summary = format!(
303 "disk space critical: path={path} available={available_bytes} threshold={threshold_bytes}"
304 );
305 let fields = vec![
306 AuditFieldEscaper::field("path", path),
307 AuditFieldEscaper::field("available_bytes", available_bytes),
308 AuditFieldEscaper::field("threshold_bytes", threshold_bytes),
309 ];
310 ("operator/disk_space_critical", fields, summary)
311 }
312 Self::AuthBypass {
313 principal,
314 resource,
315 detail,
316 } => {
317 let summary =
318 format!("auth bypass detected: principal={principal} resource={resource}");
319 let fields = vec![
320 AuditFieldEscaper::field("principal", principal),
321 AuditFieldEscaper::field("resource", resource),
322 AuditFieldEscaper::field("detail", detail),
323 ];
324 ("operator/auth_bypass", fields, summary)
325 }
326 Self::AdminCapabilityGranted {
327 granted_to,
328 capability,
329 granted_by,
330 } => {
331 let summary = format!(
332 "admin capability granted: to={granted_to} capability={capability} by={granted_by}"
333 );
334 let fields = vec![
335 AuditFieldEscaper::field("granted_to", granted_to),
336 AuditFieldEscaper::field("capability", capability),
337 AuditFieldEscaper::field("granted_by", granted_by),
338 ];
339 ("operator/admin_capability_granted", fields, summary)
340 }
341 Self::SecretRotationFailed { secret_ref, error } => {
342 let summary = format!("secret rotation failed: ref={secret_ref} error={error}");
343 let fields = vec![
344 AuditFieldEscaper::field("secret_ref", secret_ref),
345 AuditFieldEscaper::field("error", error),
346 ];
347 ("operator/secret_rotation_failed", fields, summary)
348 }
349 Self::ConfigChanged {
350 key,
351 old_value,
352 new_value,
353 changed_by,
354 } => {
355 let summary = format!(
356 "config changed: key={key} old={old_value} new={new_value} by={changed_by}"
357 );
358 let fields = vec![
359 AuditFieldEscaper::field("key", key),
360 AuditFieldEscaper::field("old_value", old_value),
361 AuditFieldEscaper::field("new_value", new_value),
362 AuditFieldEscaper::field("changed_by", changed_by),
363 ];
364 ("operator/config_changed", fields, summary)
365 }
366 Self::StartupFailed { phase, error } => {
367 let summary = format!("startup failed: phase={phase} error={error}");
368 let fields = vec![
369 AuditFieldEscaper::field("phase", phase),
370 AuditFieldEscaper::field("error", error),
371 ];
372 ("operator/startup_failed", fields, summary)
373 }
374 Self::ShutdownForced { reason } => {
375 let summary = format!("shutdown forced: reason={reason}");
376 let fields = vec![AuditFieldEscaper::field("reason", reason)];
377 ("operator/shutdown_forced", fields, summary)
378 }
379 Self::SchemaCorruption { collection, detail } => {
380 let summary = format!("schema corruption: collection={collection} detail={detail}");
381 let fields = vec![
382 AuditFieldEscaper::field("collection", collection),
383 AuditFieldEscaper::field("detail", detail),
384 ];
385 ("operator/schema_corruption", fields, summary)
386 }
387 Self::CheckpointFailed { lsn, error } => {
388 let summary = format!("checkpoint failed: lsn={lsn} error={error}");
389 let fields = vec![
390 AuditFieldEscaper::field("lsn", lsn),
391 AuditFieldEscaper::field("error", error),
392 ];
393 ("operator/checkpoint_failed", fields, summary)
394 }
395 Self::DanglingAdminIntent {
396 id,
397 op,
398 started_at_ms,
399 last_phase,
400 } => {
401 let summary = format!(
402 "dangling admin intent: id={id} op={op} started_at_ms={started_at_ms} last_phase={last_phase}"
403 );
404 let fields = vec![
405 AuditFieldEscaper::field("id", id.to_string()),
406 AuditFieldEscaper::field("op", op.to_string()),
407 AuditFieldEscaper::field("started_at_ms", started_at_ms),
408 AuditFieldEscaper::field("last_phase", last_phase.to_string()),
409 ];
410 ("operator/dangling_admin_intent", fields, summary)
411 }
412 Self::ConfigChangeRequiresRestart { fields_changed } => {
413 let summary = format!("config change requires restart: fields={fields_changed}");
414 let fields = vec![AuditFieldEscaper::field("fields_changed", fields_changed)];
415 ("operator/config_change_requires_restart", fields, summary)
416 }
417 Self::SubscriptionSchemaChange {
418 collection,
419 subscription_names,
420 fields_added,
421 fields_removed,
422 lsn,
423 } => {
424 let summary = format!(
425 "subscription schema change: collection={collection} subscriptions=[{subscription_names}] added=[{fields_added}] removed=[{fields_removed}] lsn={lsn}"
426 );
427 let fields = vec![
428 AuditFieldEscaper::field("collection", collection),
429 AuditFieldEscaper::field("subscription_names", subscription_names),
430 AuditFieldEscaper::field("fields_added", fields_added),
431 AuditFieldEscaper::field("fields_removed", fields_removed),
432 AuditFieldEscaper::field("lsn", lsn),
433 ];
434 ("operator/subscription_schema_change", fields, summary)
435 }
436 Self::OutboxDlqActivated { queue, dlq, reason } => {
437 let summary =
438 format!("outbox DLQ activated: queue={queue} dlq={dlq} reason={reason}");
439 let fields = vec![
440 AuditFieldEscaper::field("queue", queue),
441 AuditFieldEscaper::field("dlq", dlq),
442 AuditFieldEscaper::field("reason", reason),
443 ];
444 ("operator/outbox_dlq_activated", fields, summary)
445 }
446 Self::QueueDlqPromoted {
447 queue,
448 group,
449 dlq,
450 message_id,
451 attempts,
452 reason,
453 } => {
454 let summary = format!(
455 "queue DLQ promoted: queue={queue} group={group} dlq={dlq} message_id={message_id} attempts={attempts} reason={reason}"
456 );
457 let fields = vec![
458 AuditFieldEscaper::field("queue", queue),
459 AuditFieldEscaper::field("group", group),
460 AuditFieldEscaper::field("dlq", dlq),
461 AuditFieldEscaper::field("message_id", message_id),
462 AuditFieldEscaper::field("attempts", attempts as u64),
463 AuditFieldEscaper::field("reason", reason),
464 ];
465 ("operator/queue_dlq_promoted", fields, summary)
466 }
467 }
468 }
469}
470
471#[cfg(test)]
476mod tests {
477 use std::time::Duration;
478
479 use super::*;
480 use crate::runtime::audit_log::AuditLogger;
481
482 fn make_logger() -> (AuditLogger, std::path::PathBuf) {
483 let mut dir = std::env::temp_dir();
484 dir.push(format!(
485 "reddb-op-event-{}-{}",
486 std::process::id(),
487 crate::utils::now_unix_nanos()
488 ));
489 std::fs::create_dir_all(&dir).unwrap();
490 let path = dir.join(".audit.log");
491 let logger = AuditLogger::with_path(path.clone());
492 (logger, path)
493 }
494
495 fn drain(logger: &AuditLogger) {
496 assert!(
497 logger.wait_idle(Duration::from_secs(2)),
498 "audit logger drain timed out"
499 );
500 }
501
502 fn read_last_line(path: &std::path::Path) -> crate::json::Value {
503 let body = std::fs::read_to_string(path).unwrap();
504 let line = body.lines().last().expect("at least one audit line");
505 crate::json::from_str(line).expect("valid JSON")
506 }
507
508 #[test]
513 fn replication_broken_emits() {
514 let (logger, path) = make_logger();
515 OperatorEvent::ReplicationBroken {
516 peer: "replica-1".into(),
517 reason: "TCP reset".into(),
518 }
519 .emit(&logger);
520 drain(&logger);
521 let v = read_last_line(&path);
522 assert_eq!(
523 v.get("action").and_then(|x| x.as_str()),
524 Some("operator/replication_broken")
525 );
526 let peer = v
527 .get("detail")
528 .and_then(|d| d.get("peer"))
529 .and_then(|x| x.as_str());
530 assert_eq!(peer, Some("replica-1"));
531 }
532
533 #[test]
534 fn divergence_emits() {
535 let (logger, path) = make_logger();
536 OperatorEvent::Divergence {
537 peer: "replica-2".into(),
538 leader_lsn: 1000,
539 follower_lsn: 999,
540 }
541 .emit(&logger);
542 drain(&logger);
543 let v = read_last_line(&path);
544 assert_eq!(
545 v.get("action").and_then(|x| x.as_str()),
546 Some("operator/divergence")
547 );
548 let lsn = v
549 .get("detail")
550 .and_then(|d| d.get("leader_lsn"))
551 .and_then(|x| x.as_i64());
552 assert_eq!(lsn, Some(1000));
553 }
554
555 #[test]
556 fn wal_fsync_failed_emits() {
557 let (logger, path) = make_logger();
558 OperatorEvent::WalFsyncFailed {
559 path: "/data/wal".into(),
560 error: "EIO".into(),
561 }
562 .emit(&logger);
563 drain(&logger);
564 let v = read_last_line(&path);
565 assert_eq!(
566 v.get("action").and_then(|x| x.as_str()),
567 Some("operator/wal_fsync_failed")
568 );
569 let err = v
570 .get("detail")
571 .and_then(|d| d.get("error"))
572 .and_then(|x| x.as_str());
573 assert_eq!(err, Some("EIO"));
574 }
575
576 #[test]
577 fn disk_space_critical_emits() {
578 let (logger, path) = make_logger();
579 OperatorEvent::DiskSpaceCritical {
580 path: "/data".into(),
581 available_bytes: 1024,
582 threshold_bytes: 10240,
583 }
584 .emit(&logger);
585 drain(&logger);
586 let v = read_last_line(&path);
587 assert_eq!(
588 v.get("action").and_then(|x| x.as_str()),
589 Some("operator/disk_space_critical")
590 );
591 let avail = v
592 .get("detail")
593 .and_then(|d| d.get("available_bytes"))
594 .and_then(|x| x.as_i64());
595 assert_eq!(avail, Some(1024));
596 }
597
598 #[test]
599 fn auth_bypass_emits() {
600 let (logger, path) = make_logger();
601 OperatorEvent::AuthBypass {
602 principal: "alice".into(),
603 resource: "/admin/drop".into(),
604 detail: "scope check skipped".into(),
605 }
606 .emit(&logger);
607 drain(&logger);
608 let v = read_last_line(&path);
609 assert_eq!(
610 v.get("action").and_then(|x| x.as_str()),
611 Some("operator/auth_bypass")
612 );
613 let res = v
614 .get("detail")
615 .and_then(|d| d.get("resource"))
616 .and_then(|x| x.as_str());
617 assert_eq!(res, Some("/admin/drop"));
618 }
619
620 #[test]
621 fn admin_capability_granted_emits() {
622 let (logger, path) = make_logger();
623 OperatorEvent::AdminCapabilityGranted {
624 granted_to: "bob".into(),
625 capability: "ADMIN_WRITE".into(),
626 granted_by: "root".into(),
627 }
628 .emit(&logger);
629 drain(&logger);
630 let v = read_last_line(&path);
631 assert_eq!(
632 v.get("action").and_then(|x| x.as_str()),
633 Some("operator/admin_capability_granted")
634 );
635 let cap = v
636 .get("detail")
637 .and_then(|d| d.get("capability"))
638 .and_then(|x| x.as_str());
639 assert_eq!(cap, Some("ADMIN_WRITE"));
640 }
641
642 #[test]
643 fn secret_rotation_failed_emits() {
644 let (logger, path) = make_logger();
645 OperatorEvent::SecretRotationFailed {
646 secret_ref: "jwt-signing-key".into(),
647 error: "HSM unreachable".into(),
648 }
649 .emit(&logger);
650 drain(&logger);
651 let v = read_last_line(&path);
652 assert_eq!(
653 v.get("action").and_then(|x| x.as_str()),
654 Some("operator/secret_rotation_failed")
655 );
656 let r = v
657 .get("detail")
658 .and_then(|d| d.get("secret_ref"))
659 .and_then(|x| x.as_str());
660 assert_eq!(r, Some("jwt-signing-key"));
661 }
662
663 #[test]
664 fn config_changed_emits() {
665 let (logger, path) = make_logger();
666 OperatorEvent::ConfigChanged {
667 key: "max_connections".into(),
668 old_value: "100".into(),
669 new_value: "200".into(),
670 changed_by: "ops-bot".into(),
671 }
672 .emit(&logger);
673 drain(&logger);
674 let v = read_last_line(&path);
675 assert_eq!(
676 v.get("action").and_then(|x| x.as_str()),
677 Some("operator/config_changed")
678 );
679 let nv = v
680 .get("detail")
681 .and_then(|d| d.get("new_value"))
682 .and_then(|x| x.as_str());
683 assert_eq!(nv, Some("200"));
684 }
685
686 #[test]
687 fn startup_failed_emits() {
688 let (logger, path) = make_logger();
689 OperatorEvent::StartupFailed {
690 phase: "wal_recovery".into(),
691 error: "corrupt frame".into(),
692 }
693 .emit(&logger);
694 drain(&logger);
695 let v = read_last_line(&path);
696 assert_eq!(
697 v.get("action").and_then(|x| x.as_str()),
698 Some("operator/startup_failed")
699 );
700 let phase = v
701 .get("detail")
702 .and_then(|d| d.get("phase"))
703 .and_then(|x| x.as_str());
704 assert_eq!(phase, Some("wal_recovery"));
705 }
706
707 #[test]
708 fn shutdown_forced_emits() {
709 let (logger, path) = make_logger();
710 OperatorEvent::ShutdownForced {
711 reason: "OOM".into(),
712 }
713 .emit(&logger);
714 drain(&logger);
715 let v = read_last_line(&path);
716 assert_eq!(
717 v.get("action").and_then(|x| x.as_str()),
718 Some("operator/shutdown_forced")
719 );
720 let r = v
721 .get("detail")
722 .and_then(|d| d.get("reason"))
723 .and_then(|x| x.as_str());
724 assert_eq!(r, Some("OOM"));
725 }
726
727 #[test]
728 fn schema_corruption_emits() {
729 let (logger, path) = make_logger();
730 OperatorEvent::SchemaCorruption {
731 collection: "users".into(),
732 detail: "unknown type tag 0xFF".into(),
733 }
734 .emit(&logger);
735 drain(&logger);
736 let v = read_last_line(&path);
737 assert_eq!(
738 v.get("action").and_then(|x| x.as_str()),
739 Some("operator/schema_corruption")
740 );
741 let coll = v
742 .get("detail")
743 .and_then(|d| d.get("collection"))
744 .and_then(|x| x.as_str());
745 assert_eq!(coll, Some("users"));
746 }
747
748 #[test]
749 fn checkpoint_failed_emits() {
750 let (logger, path) = make_logger();
751 OperatorEvent::CheckpointFailed {
752 lsn: 42_000,
753 error: "write stall".into(),
754 }
755 .emit(&logger);
756 drain(&logger);
757 let v = read_last_line(&path);
758 assert_eq!(
759 v.get("action").and_then(|x| x.as_str()),
760 Some("operator/checkpoint_failed")
761 );
762 let lsn = v
763 .get("detail")
764 .and_then(|d| d.get("lsn"))
765 .and_then(|x| x.as_i64());
766 assert_eq!(lsn, Some(42_000));
767 }
768
769 #[test]
774 fn adversarial_fields_are_escape_safe() {
775 let payloads: &[(&str, &str)] = &[
776 ("crlf", "line1\r\nline2"),
777 ("nul", "before\0after"),
778 ("quote", r#"she said "hi""#),
779 ("json_inject", r#"{"injected":true}"#),
780 ("low_ctrl", "\x01\x02\x07\x1f"),
781 ("backslash", "C:\\path\\file"),
782 ("mixed", "name=\"x\"\n\\path\t\x01end"),
783 ];
784
785 for (label, payload) in payloads {
786 let (logger, path) = make_logger();
787 OperatorEvent::SchemaCorruption {
788 collection: payload.to_string(),
789 detail: payload.to_string(),
790 }
791 .emit(&logger);
792 drain(&logger);
793
794 let body = std::fs::read_to_string(&path).unwrap();
795 let line = body.lines().last().unwrap_or("");
796
797 assert!(
799 !line.contains('\n'),
800 "{label}: embedded newline in JSONL row"
801 );
802
803 let v: crate::json::Value = crate::json::from_str(line)
804 .unwrap_or_else(|e| panic!("{label}: audit line not valid JSON: {e}\n{line:?}"));
805 let recovered = v
806 .get("detail")
807 .and_then(|d| d.get("collection"))
808 .and_then(|x| x.as_str())
809 .unwrap_or("");
810 assert_eq!(recovered, *payload, "{label}: round-trip mismatch");
811 }
812 }
813
814 #[test]
819 fn emit_sets_outcome_error_and_source_system() {
820 let (logger, path) = make_logger();
821 OperatorEvent::ShutdownForced {
822 reason: "test".into(),
823 }
824 .emit(&logger);
825 drain(&logger);
826 let v = read_last_line(&path);
827 assert_eq!(v.get("outcome").and_then(|x| x.as_str()), Some("error"));
828 assert_eq!(v.get("source").and_then(|x| x.as_str()), Some("system"));
829 }
830}