1use std::sync::{Arc, OnceLock};
23
24use crate::runtime::audit_log::{
25 AuditAuthSource, AuditEvent, AuditField, AuditFieldEscaper, AuditLogger, Outcome,
26};
27
28static GLOBAL_SINK: OnceLock<Arc<AuditLogger>> = OnceLock::new();
48
49pub fn install_global_audit_sink(logger: Arc<AuditLogger>) {
55 let _ = GLOBAL_SINK.set(logger);
56}
57
/// An operator-facing event that is recorded in the audit log and echoed as a
/// `tracing` warning (see [`OperatorEvent::emit`] and
/// [`OperatorEvent::emit_global`]).
///
/// Every variant maps to one `operator/<snake_case_name>` audit action. Its
/// fields become audit `detail` entries of the same name. The mapping and the
/// human-readable summary are built in `OperatorEvent::decompose`.
#[derive(Debug)]
pub enum OperatorEvent {
    /// Replication to `peer` broke for `reason`
    /// (action `operator/replication_broken`).
    ReplicationBroken { peer: String, reason: String },
    /// Replication divergence reported for `peer`, carrying the leader's and
    /// the follower's LSN (action `operator/divergence`).
    Divergence {
        peer: String,
        leader_lsn: u64,
        follower_lsn: u64,
    },
    /// A WAL fsync on `path` failed with `error`
    /// (action `operator/wal_fsync_failed`).
    WalFsyncFailed { path: String, error: String },
    /// Free space at `path` is reported as critical; both byte counts are
    /// recorded (action `operator/disk_space_critical`).
    DiskSpaceCritical {
        path: String,
        available_bytes: u64,
        threshold_bytes: u64,
    },
    /// An auth bypass was detected for `principal` on `resource`
    /// (action `operator/auth_bypass`).
    ///
    /// `detail` is written to the audit record but is NOT included in the
    /// tracing/stderr summary.
    AuthBypass {
        principal: String,
        resource: String,
        detail: String,
    },
    /// `granted_by` granted `capability` to `granted_to`
    /// (action `operator/admin_capability_granted`).
    AdminCapabilityGranted {
        granted_to: String,
        capability: String,
        granted_by: String,
    },
    /// Rotating the secret referenced by `secret_ref` failed with `error`
    /// (action `operator/secret_rotation_failed`).
    SecretRotationFailed { secret_ref: String, error: String },
    /// Config `key` changed from `old_value` to `new_value`, by `changed_by`
    /// (action `operator/config_changed`).
    ConfigChanged {
        key: String,
        old_value: String,
        new_value: String,
        changed_by: String,
    },
    /// Startup failed during `phase` with `error`
    /// (action `operator/startup_failed`).
    StartupFailed { phase: String, error: String },
    /// Shutdown was forced for `reason` (action `operator/shutdown_forced`).
    ShutdownForced { reason: String },
    /// Schema corruption found in `collection`
    /// (action `operator/schema_corruption`).
    SchemaCorruption { collection: String, detail: String },
    /// A checkpoint at `lsn` failed with `error`
    /// (action `operator/checkpoint_failed`).
    CheckpointFailed { lsn: u64, error: String },
    /// An admin intent reported as dangling
    /// (action `operator/dangling_admin_intent`).
    ///
    /// `id`, `op` and `last_phase` are recorded through their `Display`
    /// output (`to_string()`).
    DanglingAdminIntent {
        id: crate::crypto::uuid::Uuid,
        op: crate::telemetry::admin_intent_log::IntentOp,
        started_at_ms: u64,
        last_phase: crate::telemetry::admin_intent_log::IntentPhase,
    },
    /// A config change touched fields that need a restart
    /// (action `operator/config_change_requires_restart`).
    ///
    /// NOTE(review): `fields_changed` appears to be a pre-joined list — confirm
    /// the separator with callers.
    ConfigChangeRequiresRestart { fields_changed: String },
    /// The schema of `collection` changed under active subscriptions
    /// (action `operator/subscription_schema_change`).
    ///
    /// The three list-like fields are wrapped in `[...]` in the summary.
    /// Presumably callers pre-join them into single strings; verify against
    /// the call sites.
    SubscriptionSchemaChange {
        collection: String,
        subscription_names: String,
        fields_added: String,
        fields_removed: String,
        lsn: u64,
    },
    /// Outbox `queue` started routing to dead-letter queue `dlq` for `reason`
    /// (action `operator/outbox_dlq_activated`).
    OutboxDlqActivated {
        queue: String,
        dlq: String,
        reason: String,
    },
}
152
153impl OperatorEvent {
154 pub fn all_variant_names() -> &'static [&'static str] {
156 &[
157 "ReplicationBroken",
158 "Divergence",
159 "WalFsyncFailed",
160 "DiskSpaceCritical",
161 "AuthBypass",
162 "AdminCapabilityGranted",
163 "SecretRotationFailed",
164 "ConfigChanged",
165 "StartupFailed",
166 "ShutdownForced",
167 "SchemaCorruption",
168 "CheckpointFailed",
169 "DanglingAdminIntent",
170 "ConfigChangeRequiresRestart",
171 "SubscriptionSchemaChange",
172 "OutboxDlqActivated",
173 ]
174 }
175
176 pub(super) fn variant_name(&self) -> &'static str {
178 match self {
179 Self::ReplicationBroken { .. } => "ReplicationBroken",
180 Self::Divergence { .. } => "Divergence",
181 Self::WalFsyncFailed { .. } => "WalFsyncFailed",
182 Self::DiskSpaceCritical { .. } => "DiskSpaceCritical",
183 Self::AuthBypass { .. } => "AuthBypass",
184 Self::AdminCapabilityGranted { .. } => "AdminCapabilityGranted",
185 Self::SecretRotationFailed { .. } => "SecretRotationFailed",
186 Self::ConfigChanged { .. } => "ConfigChanged",
187 Self::StartupFailed { .. } => "StartupFailed",
188 Self::ShutdownForced { .. } => "ShutdownForced",
189 Self::SchemaCorruption { .. } => "SchemaCorruption",
190 Self::CheckpointFailed { .. } => "CheckpointFailed",
191 Self::DanglingAdminIntent { .. } => "DanglingAdminIntent",
192 Self::ConfigChangeRequiresRestart { .. } => "ConfigChangeRequiresRestart",
193 Self::SubscriptionSchemaChange { .. } => "SubscriptionSchemaChange",
194 Self::OutboxDlqActivated { .. } => "OutboxDlqActivated",
195 }
196 }
197
198 pub fn emit_global(self) {
212 match GLOBAL_SINK.get() {
213 Some(logger) => self.emit(logger.as_ref()),
214 None => {
215 let (_, _, summary) = self.decompose();
216 tracing::warn!(target: "reddb::operator", "{summary}");
217 eprintln!("[reddb::operator] (no audit sink) {summary}");
218 }
219 }
220 }
221
222 pub fn emit(self, audit: &AuditLogger) {
223 let (action, fields, summary) = self.decompose();
224
225 let ev = AuditEvent::builder(action)
226 .source(AuditAuthSource::System)
227 .outcome(Outcome::Error)
228 .fields(fields)
229 .build();
230
231 let audit_ok = {
233 audit.record_event(ev);
237 true
238 };
239
240 tracing::warn!(target: "reddb::operator", "{summary}");
242
243 if !audit_ok {
246 eprintln!("[reddb::operator] {summary}");
247 }
248 }
249
250 pub(super) fn decompose(self) -> (&'static str, Vec<AuditField>, String) {
252 match self {
253 Self::ReplicationBroken { peer, reason } => {
254 let summary = format!("replication broken: peer={peer} reason={reason}");
255 let fields = vec![
256 AuditFieldEscaper::field("peer", peer),
257 AuditFieldEscaper::field("reason", reason),
258 ];
259 ("operator/replication_broken", fields, summary)
260 }
261 Self::Divergence {
262 peer,
263 leader_lsn,
264 follower_lsn,
265 } => {
266 let summary = format!(
267 "replication divergence: peer={peer} leader_lsn={leader_lsn} follower_lsn={follower_lsn}"
268 );
269 let fields = vec![
270 AuditFieldEscaper::field("peer", peer),
271 AuditFieldEscaper::field("leader_lsn", leader_lsn),
272 AuditFieldEscaper::field("follower_lsn", follower_lsn),
273 ];
274 ("operator/divergence", fields, summary)
275 }
276 Self::WalFsyncFailed { path, error } => {
277 let summary = format!("WAL fsync failed: path={path} error={error}");
278 let fields = vec![
279 AuditFieldEscaper::field("path", path),
280 AuditFieldEscaper::field("error", error),
281 ];
282 ("operator/wal_fsync_failed", fields, summary)
283 }
284 Self::DiskSpaceCritical {
285 path,
286 available_bytes,
287 threshold_bytes,
288 } => {
289 let summary = format!(
290 "disk space critical: path={path} available={available_bytes} threshold={threshold_bytes}"
291 );
292 let fields = vec![
293 AuditFieldEscaper::field("path", path),
294 AuditFieldEscaper::field("available_bytes", available_bytes),
295 AuditFieldEscaper::field("threshold_bytes", threshold_bytes),
296 ];
297 ("operator/disk_space_critical", fields, summary)
298 }
299 Self::AuthBypass {
300 principal,
301 resource,
302 detail,
303 } => {
304 let summary =
305 format!("auth bypass detected: principal={principal} resource={resource}");
306 let fields = vec![
307 AuditFieldEscaper::field("principal", principal),
308 AuditFieldEscaper::field("resource", resource),
309 AuditFieldEscaper::field("detail", detail),
310 ];
311 ("operator/auth_bypass", fields, summary)
312 }
313 Self::AdminCapabilityGranted {
314 granted_to,
315 capability,
316 granted_by,
317 } => {
318 let summary = format!(
319 "admin capability granted: to={granted_to} capability={capability} by={granted_by}"
320 );
321 let fields = vec![
322 AuditFieldEscaper::field("granted_to", granted_to),
323 AuditFieldEscaper::field("capability", capability),
324 AuditFieldEscaper::field("granted_by", granted_by),
325 ];
326 ("operator/admin_capability_granted", fields, summary)
327 }
328 Self::SecretRotationFailed { secret_ref, error } => {
329 let summary = format!("secret rotation failed: ref={secret_ref} error={error}");
330 let fields = vec![
331 AuditFieldEscaper::field("secret_ref", secret_ref),
332 AuditFieldEscaper::field("error", error),
333 ];
334 ("operator/secret_rotation_failed", fields, summary)
335 }
336 Self::ConfigChanged {
337 key,
338 old_value,
339 new_value,
340 changed_by,
341 } => {
342 let summary = format!(
343 "config changed: key={key} old={old_value} new={new_value} by={changed_by}"
344 );
345 let fields = vec![
346 AuditFieldEscaper::field("key", key),
347 AuditFieldEscaper::field("old_value", old_value),
348 AuditFieldEscaper::field("new_value", new_value),
349 AuditFieldEscaper::field("changed_by", changed_by),
350 ];
351 ("operator/config_changed", fields, summary)
352 }
353 Self::StartupFailed { phase, error } => {
354 let summary = format!("startup failed: phase={phase} error={error}");
355 let fields = vec![
356 AuditFieldEscaper::field("phase", phase),
357 AuditFieldEscaper::field("error", error),
358 ];
359 ("operator/startup_failed", fields, summary)
360 }
361 Self::ShutdownForced { reason } => {
362 let summary = format!("shutdown forced: reason={reason}");
363 let fields = vec![AuditFieldEscaper::field("reason", reason)];
364 ("operator/shutdown_forced", fields, summary)
365 }
366 Self::SchemaCorruption { collection, detail } => {
367 let summary = format!("schema corruption: collection={collection} detail={detail}");
368 let fields = vec![
369 AuditFieldEscaper::field("collection", collection),
370 AuditFieldEscaper::field("detail", detail),
371 ];
372 ("operator/schema_corruption", fields, summary)
373 }
374 Self::CheckpointFailed { lsn, error } => {
375 let summary = format!("checkpoint failed: lsn={lsn} error={error}");
376 let fields = vec![
377 AuditFieldEscaper::field("lsn", lsn),
378 AuditFieldEscaper::field("error", error),
379 ];
380 ("operator/checkpoint_failed", fields, summary)
381 }
382 Self::DanglingAdminIntent {
383 id,
384 op,
385 started_at_ms,
386 last_phase,
387 } => {
388 let summary = format!(
389 "dangling admin intent: id={id} op={op} started_at_ms={started_at_ms} last_phase={last_phase}"
390 );
391 let fields = vec![
392 AuditFieldEscaper::field("id", id.to_string()),
393 AuditFieldEscaper::field("op", op.to_string()),
394 AuditFieldEscaper::field("started_at_ms", started_at_ms),
395 AuditFieldEscaper::field("last_phase", last_phase.to_string()),
396 ];
397 ("operator/dangling_admin_intent", fields, summary)
398 }
399 Self::ConfigChangeRequiresRestart { fields_changed } => {
400 let summary = format!("config change requires restart: fields={fields_changed}");
401 let fields = vec![AuditFieldEscaper::field("fields_changed", fields_changed)];
402 ("operator/config_change_requires_restart", fields, summary)
403 }
404 Self::SubscriptionSchemaChange {
405 collection,
406 subscription_names,
407 fields_added,
408 fields_removed,
409 lsn,
410 } => {
411 let summary = format!(
412 "subscription schema change: collection={collection} subscriptions=[{subscription_names}] added=[{fields_added}] removed=[{fields_removed}] lsn={lsn}"
413 );
414 let fields = vec![
415 AuditFieldEscaper::field("collection", collection),
416 AuditFieldEscaper::field("subscription_names", subscription_names),
417 AuditFieldEscaper::field("fields_added", fields_added),
418 AuditFieldEscaper::field("fields_removed", fields_removed),
419 AuditFieldEscaper::field("lsn", lsn),
420 ];
421 ("operator/subscription_schema_change", fields, summary)
422 }
423 Self::OutboxDlqActivated { queue, dlq, reason } => {
424 let summary =
425 format!("outbox DLQ activated: queue={queue} dlq={dlq} reason={reason}");
426 let fields = vec![
427 AuditFieldEscaper::field("queue", queue),
428 AuditFieldEscaper::field("dlq", dlq),
429 AuditFieldEscaper::field("reason", reason),
430 ];
431 ("operator/outbox_dlq_activated", fields, summary)
432 }
433 }
434 }
435}
436
#[cfg(test)]
mod tests {
    use std::sync::atomic::{AtomicU64, Ordering};
    use std::time::Duration;

    use super::*;
    use crate::runtime::audit_log::AuditLogger;

    /// Per-process sequence appended to temp-dir names.
    ///
    /// `now_unix_nanos()` alone can repeat across tests running in parallel on
    /// platforms with a coarse clock. Two tests would then share one audit
    /// file and read each other's last line. Only uniqueness matters here, so
    /// `Relaxed` ordering is sufficient.
    static NEXT_LOGGER_ID: AtomicU64 = AtomicU64::new(0);

    /// Creates a logger writing to `.audit.log` in a fresh, unique temp dir.
    fn make_logger() -> (AuditLogger, std::path::PathBuf) {
        let seq = NEXT_LOGGER_ID.fetch_add(1, Ordering::Relaxed);
        let mut dir = std::env::temp_dir();
        dir.push(format!(
            "reddb-op-event-{}-{}-{}",
            std::process::id(),
            crate::utils::now_unix_nanos(),
            seq
        ));
        std::fs::create_dir_all(&dir).unwrap();
        let path = dir.join(".audit.log");
        let logger = AuditLogger::with_path(path.clone());
        (logger, path)
    }

    /// Blocks until the logger reports idle, failing the test after 2 s.
    fn drain(logger: &AuditLogger) {
        assert!(
            logger.wait_idle(Duration::from_secs(2)),
            "audit logger drain timed out"
        );
    }

    /// Parses the last line of the audit file as JSON.
    fn read_last_line(path: &std::path::Path) -> crate::json::Value {
        let body = std::fs::read_to_string(path).unwrap();
        let line = body.lines().last().expect("at least one audit line");
        crate::json::from_str(line).expect("valid JSON")
    }

    #[test]
    fn replication_broken_emits() {
        let (logger, path) = make_logger();
        OperatorEvent::ReplicationBroken {
            peer: "replica-1".into(),
            reason: "TCP reset".into(),
        }
        .emit(&logger);
        drain(&logger);
        let v = read_last_line(&path);
        assert_eq!(
            v.get("action").and_then(|x| x.as_str()),
            Some("operator/replication_broken")
        );
        let peer = v
            .get("detail")
            .and_then(|d| d.get("peer"))
            .and_then(|x| x.as_str());
        assert_eq!(peer, Some("replica-1"));
    }

    #[test]
    fn divergence_emits() {
        let (logger, path) = make_logger();
        OperatorEvent::Divergence {
            peer: "replica-2".into(),
            leader_lsn: 1000,
            follower_lsn: 999,
        }
        .emit(&logger);
        drain(&logger);
        let v = read_last_line(&path);
        assert_eq!(
            v.get("action").and_then(|x| x.as_str()),
            Some("operator/divergence")
        );
        let lsn = v
            .get("detail")
            .and_then(|d| d.get("leader_lsn"))
            .and_then(|x| x.as_i64());
        assert_eq!(lsn, Some(1000));
    }

    #[test]
    fn wal_fsync_failed_emits() {
        let (logger, path) = make_logger();
        OperatorEvent::WalFsyncFailed {
            path: "/data/wal".into(),
            error: "EIO".into(),
        }
        .emit(&logger);
        drain(&logger);
        let v = read_last_line(&path);
        assert_eq!(
            v.get("action").and_then(|x| x.as_str()),
            Some("operator/wal_fsync_failed")
        );
        let err = v
            .get("detail")
            .and_then(|d| d.get("error"))
            .and_then(|x| x.as_str());
        assert_eq!(err, Some("EIO"));
    }

    #[test]
    fn disk_space_critical_emits() {
        let (logger, path) = make_logger();
        OperatorEvent::DiskSpaceCritical {
            path: "/data".into(),
            available_bytes: 1024,
            threshold_bytes: 10240,
        }
        .emit(&logger);
        drain(&logger);
        let v = read_last_line(&path);
        assert_eq!(
            v.get("action").and_then(|x| x.as_str()),
            Some("operator/disk_space_critical")
        );
        let avail = v
            .get("detail")
            .and_then(|d| d.get("available_bytes"))
            .and_then(|x| x.as_i64());
        assert_eq!(avail, Some(1024));
    }

    #[test]
    fn auth_bypass_emits() {
        let (logger, path) = make_logger();
        OperatorEvent::AuthBypass {
            principal: "alice".into(),
            resource: "/admin/drop".into(),
            detail: "scope check skipped".into(),
        }
        .emit(&logger);
        drain(&logger);
        let v = read_last_line(&path);
        assert_eq!(
            v.get("action").and_then(|x| x.as_str()),
            Some("operator/auth_bypass")
        );
        let res = v
            .get("detail")
            .and_then(|d| d.get("resource"))
            .and_then(|x| x.as_str());
        assert_eq!(res, Some("/admin/drop"));
    }

    #[test]
    fn admin_capability_granted_emits() {
        let (logger, path) = make_logger();
        OperatorEvent::AdminCapabilityGranted {
            granted_to: "bob".into(),
            capability: "ADMIN_WRITE".into(),
            granted_by: "root".into(),
        }
        .emit(&logger);
        drain(&logger);
        let v = read_last_line(&path);
        assert_eq!(
            v.get("action").and_then(|x| x.as_str()),
            Some("operator/admin_capability_granted")
        );
        let cap = v
            .get("detail")
            .and_then(|d| d.get("capability"))
            .and_then(|x| x.as_str());
        assert_eq!(cap, Some("ADMIN_WRITE"));
    }

    #[test]
    fn secret_rotation_failed_emits() {
        let (logger, path) = make_logger();
        OperatorEvent::SecretRotationFailed {
            secret_ref: "jwt-signing-key".into(),
            error: "HSM unreachable".into(),
        }
        .emit(&logger);
        drain(&logger);
        let v = read_last_line(&path);
        assert_eq!(
            v.get("action").and_then(|x| x.as_str()),
            Some("operator/secret_rotation_failed")
        );
        let r = v
            .get("detail")
            .and_then(|d| d.get("secret_ref"))
            .and_then(|x| x.as_str());
        assert_eq!(r, Some("jwt-signing-key"));
    }

    #[test]
    fn config_changed_emits() {
        let (logger, path) = make_logger();
        OperatorEvent::ConfigChanged {
            key: "max_connections".into(),
            old_value: "100".into(),
            new_value: "200".into(),
            changed_by: "ops-bot".into(),
        }
        .emit(&logger);
        drain(&logger);
        let v = read_last_line(&path);
        assert_eq!(
            v.get("action").and_then(|x| x.as_str()),
            Some("operator/config_changed")
        );
        let nv = v
            .get("detail")
            .and_then(|d| d.get("new_value"))
            .and_then(|x| x.as_str());
        assert_eq!(nv, Some("200"));
    }

    #[test]
    fn startup_failed_emits() {
        let (logger, path) = make_logger();
        OperatorEvent::StartupFailed {
            phase: "wal_recovery".into(),
            error: "corrupt frame".into(),
        }
        .emit(&logger);
        drain(&logger);
        let v = read_last_line(&path);
        assert_eq!(
            v.get("action").and_then(|x| x.as_str()),
            Some("operator/startup_failed")
        );
        let phase = v
            .get("detail")
            .and_then(|d| d.get("phase"))
            .and_then(|x| x.as_str());
        assert_eq!(phase, Some("wal_recovery"));
    }

    #[test]
    fn shutdown_forced_emits() {
        let (logger, path) = make_logger();
        OperatorEvent::ShutdownForced {
            reason: "OOM".into(),
        }
        .emit(&logger);
        drain(&logger);
        let v = read_last_line(&path);
        assert_eq!(
            v.get("action").and_then(|x| x.as_str()),
            Some("operator/shutdown_forced")
        );
        let r = v
            .get("detail")
            .and_then(|d| d.get("reason"))
            .and_then(|x| x.as_str());
        assert_eq!(r, Some("OOM"));
    }

    #[test]
    fn schema_corruption_emits() {
        let (logger, path) = make_logger();
        OperatorEvent::SchemaCorruption {
            collection: "users".into(),
            detail: "unknown type tag 0xFF".into(),
        }
        .emit(&logger);
        drain(&logger);
        let v = read_last_line(&path);
        assert_eq!(
            v.get("action").and_then(|x| x.as_str()),
            Some("operator/schema_corruption")
        );
        let coll = v
            .get("detail")
            .and_then(|d| d.get("collection"))
            .and_then(|x| x.as_str());
        assert_eq!(coll, Some("users"));
    }

    #[test]
    fn checkpoint_failed_emits() {
        let (logger, path) = make_logger();
        OperatorEvent::CheckpointFailed {
            lsn: 42_000,
            error: "write stall".into(),
        }
        .emit(&logger);
        drain(&logger);
        let v = read_last_line(&path);
        assert_eq!(
            v.get("action").and_then(|x| x.as_str()),
            Some("operator/checkpoint_failed")
        );
        let lsn = v
            .get("detail")
            .and_then(|d| d.get("lsn"))
            .and_then(|x| x.as_i64());
        assert_eq!(lsn, Some(42_000));
    }

    /// Hostile field values must not break the JSONL framing and must
    /// round-trip exactly through the audit record.
    #[test]
    fn adversarial_fields_are_escape_safe() {
        let payloads: &[(&str, &str)] = &[
            ("crlf", "line1\r\nline2"),
            ("nul", "before\0after"),
            ("quote", r#"she said "hi""#),
            ("json_inject", r#"{"injected":true}"#),
            ("low_ctrl", "\x01\x02\x07\x1f"),
            ("backslash", "C:\\path\\file"),
            ("mixed", "name=\"x\"\n\\path\t\x01end"),
        ];

        for (label, payload) in payloads {
            let (logger, path) = make_logger();
            OperatorEvent::SchemaCorruption {
                collection: payload.to_string(),
                detail: payload.to_string(),
            }
            .emit(&logger);
            drain(&logger);

            let body = std::fs::read_to_string(&path).unwrap();

            // `str::lines` already splits on '\n', so testing one of its items
            // for '\n' can never fail. Instead require every physical line to
            // be a complete JSON document. An unescaped newline inside the row
            // would leave fragments that do not parse.
            for (idx, row) in body.lines().enumerate() {
                let parsed: Result<crate::json::Value, _> = crate::json::from_str(row);
                assert!(
                    parsed.is_ok(),
                    "{label}: line {idx} is not a complete JSON row (embedded newline?): {row:?}"
                );
            }

            let line = body.lines().last().unwrap_or("");
            let v: crate::json::Value = crate::json::from_str(line)
                .unwrap_or_else(|e| panic!("{label}: audit line not valid JSON: {e}\n{line:?}"));
            let recovered = v
                .get("detail")
                .and_then(|d| d.get("collection"))
                .and_then(|x| x.as_str())
                .unwrap_or("");
            assert_eq!(recovered, *payload, "{label}: round-trip mismatch");
        }
    }

    /// `emit` tags every record as a system-sourced error.
    #[test]
    fn emit_sets_outcome_error_and_source_system() {
        let (logger, path) = make_logger();
        OperatorEvent::ShutdownForced {
            reason: "test".into(),
        }
        .emit(&logger);
        drain(&logger);
        let v = read_last_line(&path);
        assert_eq!(v.get("outcome").and_then(|x| x.as_str()), Some("error"));
        assert_eq!(v.get("source").and_then(|x| x.as_str()), Some("system"));
    }
}