1use std::sync::{Arc, OnceLock};
23
24use crate::runtime::audit_log::{
25 AuditAuthSource, AuditEvent, AuditField, AuditFieldEscaper, AuditLogger, Outcome,
26};
27
28static GLOBAL_SINK: OnceLock<Arc<AuditLogger>> = OnceLock::new();
48
49pub fn install_global_audit_sink(logger: Arc<AuditLogger>) {
55 let _ = GLOBAL_SINK.set(logger);
56}
57
58#[derive(Debug)]
68pub enum OperatorEvent {
69 ReplicationBroken { peer: String, reason: String },
71 Divergence {
74 peer: String,
75 leader_lsn: u64,
76 follower_lsn: u64,
77 },
78 WalFsyncFailed { path: String, error: String },
80 DiskSpaceCritical {
82 path: String,
83 available_bytes: u64,
84 threshold_bytes: u64,
85 },
86 AuthBypass {
89 principal: String,
90 resource: String,
91 detail: String,
92 },
93 AdminCapabilityGranted {
95 granted_to: String,
96 capability: String,
97 granted_by: String,
98 },
99 SecretRotationFailed { secret_ref: String, error: String },
101 ConfigChanged {
103 key: String,
104 old_value: String,
105 new_value: String,
106 changed_by: String,
107 },
108 StartupFailed { phase: String, error: String },
110 ShutdownForced { reason: String },
113 SchemaCorruption { collection: String, detail: String },
115 CheckpointFailed { lsn: u64, error: String },
117 DanglingAdminIntent {
121 id: crate::crypto::uuid::Uuid,
122 op: crate::telemetry::admin_intent_log::IntentOp,
123 started_at_ms: u64,
125 last_phase: crate::telemetry::admin_intent_log::IntentPhase,
126 },
127 ConfigChangeRequiresRestart { fields_changed: String },
131 SubscriptionSchemaChange {
135 collection: String,
136 subscription_names: String,
138 fields_added: String,
140 fields_removed: String,
142 lsn: u64,
143 },
144 OutboxDlqActivated {
147 queue: String,
148 dlq: String,
149 reason: String,
150 },
151 QueueDlqPromoted {
155 queue: String,
156 group: String,
157 dlq: String,
158 message_id: u64,
159 attempts: u32,
160 reason: String,
161 },
162 DeposedPrimaryRollback {
173 common_point_lsn: u64,
176 tail_to_lsn: u64,
178 tail_lsns: u64,
180 commit_watermark: u64,
184 rollback_file: String,
186 new_primary_addr: String,
188 new_term: u64,
190 },
191}
192
193impl OperatorEvent {
194 pub fn all_variant_names() -> &'static [&'static str] {
196 &[
197 "ReplicationBroken",
198 "Divergence",
199 "WalFsyncFailed",
200 "DiskSpaceCritical",
201 "AuthBypass",
202 "AdminCapabilityGranted",
203 "SecretRotationFailed",
204 "ConfigChanged",
205 "StartupFailed",
206 "ShutdownForced",
207 "SchemaCorruption",
208 "CheckpointFailed",
209 "DanglingAdminIntent",
210 "ConfigChangeRequiresRestart",
211 "SubscriptionSchemaChange",
212 "OutboxDlqActivated",
213 "QueueDlqPromoted",
214 "DeposedPrimaryRollback",
215 ]
216 }
217
218 pub(super) fn variant_name(&self) -> &'static str {
220 match self {
221 Self::ReplicationBroken { .. } => "ReplicationBroken",
222 Self::Divergence { .. } => "Divergence",
223 Self::WalFsyncFailed { .. } => "WalFsyncFailed",
224 Self::DiskSpaceCritical { .. } => "DiskSpaceCritical",
225 Self::AuthBypass { .. } => "AuthBypass",
226 Self::AdminCapabilityGranted { .. } => "AdminCapabilityGranted",
227 Self::SecretRotationFailed { .. } => "SecretRotationFailed",
228 Self::ConfigChanged { .. } => "ConfigChanged",
229 Self::StartupFailed { .. } => "StartupFailed",
230 Self::ShutdownForced { .. } => "ShutdownForced",
231 Self::SchemaCorruption { .. } => "SchemaCorruption",
232 Self::CheckpointFailed { .. } => "CheckpointFailed",
233 Self::DanglingAdminIntent { .. } => "DanglingAdminIntent",
234 Self::ConfigChangeRequiresRestart { .. } => "ConfigChangeRequiresRestart",
235 Self::SubscriptionSchemaChange { .. } => "SubscriptionSchemaChange",
236 Self::OutboxDlqActivated { .. } => "OutboxDlqActivated",
237 Self::QueueDlqPromoted { .. } => "QueueDlqPromoted",
238 Self::DeposedPrimaryRollback { .. } => "DeposedPrimaryRollback",
239 }
240 }
241
242 pub fn emit_global(self) {
256 match GLOBAL_SINK.get() {
257 Some(logger) => self.emit(logger.as_ref()),
258 None => {
259 let (_, _, summary) = self.decompose();
260 tracing::warn!(target: "reddb::operator", "{summary}");
261 eprintln!("[reddb::operator] (no audit sink) {summary}");
262 }
263 }
264 }
265
266 pub fn emit(self, audit: &AuditLogger) {
267 let (action, fields, summary) = self.decompose();
268
269 let ev = AuditEvent::builder(action)
270 .source(AuditAuthSource::System)
271 .outcome(Outcome::Error)
272 .fields(fields)
273 .build();
274
275 let audit_ok = {
277 audit.record_event(ev);
281 true
282 };
283
284 tracing::warn!(target: "reddb::operator", "{summary}");
286
287 if !audit_ok {
290 eprintln!("[reddb::operator] {summary}");
291 }
292 }
293
294 pub(super) fn decompose(self) -> (&'static str, Vec<AuditField>, String) {
296 match self {
297 Self::ReplicationBroken { peer, reason } => {
298 let summary = format!("replication broken: peer={peer} reason={reason}");
299 let fields = vec![
300 AuditFieldEscaper::field("peer", peer),
301 AuditFieldEscaper::field("reason", reason),
302 ];
303 ("operator/replication_broken", fields, summary)
304 }
305 Self::Divergence {
306 peer,
307 leader_lsn,
308 follower_lsn,
309 } => {
310 let summary = format!(
311 "replication divergence: peer={peer} leader_lsn={leader_lsn} follower_lsn={follower_lsn}"
312 );
313 let fields = vec![
314 AuditFieldEscaper::field("peer", peer),
315 AuditFieldEscaper::field("leader_lsn", leader_lsn),
316 AuditFieldEscaper::field("follower_lsn", follower_lsn),
317 ];
318 ("operator/divergence", fields, summary)
319 }
320 Self::WalFsyncFailed { path, error } => {
321 let summary = format!("WAL fsync failed: path={path} error={error}");
322 let fields = vec![
323 AuditFieldEscaper::field("path", path),
324 AuditFieldEscaper::field("error", error),
325 ];
326 ("operator/wal_fsync_failed", fields, summary)
327 }
328 Self::DiskSpaceCritical {
329 path,
330 available_bytes,
331 threshold_bytes,
332 } => {
333 let summary = format!(
334 "disk space critical: path={path} available={available_bytes} threshold={threshold_bytes}"
335 );
336 let fields = vec![
337 AuditFieldEscaper::field("path", path),
338 AuditFieldEscaper::field("available_bytes", available_bytes),
339 AuditFieldEscaper::field("threshold_bytes", threshold_bytes),
340 ];
341 ("operator/disk_space_critical", fields, summary)
342 }
343 Self::AuthBypass {
344 principal,
345 resource,
346 detail,
347 } => {
348 let summary =
349 format!("auth bypass detected: principal={principal} resource={resource}");
350 let fields = vec![
351 AuditFieldEscaper::field("principal", principal),
352 AuditFieldEscaper::field("resource", resource),
353 AuditFieldEscaper::field("detail", detail),
354 ];
355 ("operator/auth_bypass", fields, summary)
356 }
357 Self::AdminCapabilityGranted {
358 granted_to,
359 capability,
360 granted_by,
361 } => {
362 let summary = format!(
363 "admin capability granted: to={granted_to} capability={capability} by={granted_by}"
364 );
365 let fields = vec![
366 AuditFieldEscaper::field("granted_to", granted_to),
367 AuditFieldEscaper::field("capability", capability),
368 AuditFieldEscaper::field("granted_by", granted_by),
369 ];
370 ("operator/admin_capability_granted", fields, summary)
371 }
372 Self::SecretRotationFailed { secret_ref, error } => {
373 let summary = format!("secret rotation failed: ref={secret_ref} error={error}");
374 let fields = vec![
375 AuditFieldEscaper::field("secret_ref", secret_ref),
376 AuditFieldEscaper::field("error", error),
377 ];
378 ("operator/secret_rotation_failed", fields, summary)
379 }
380 Self::ConfigChanged {
381 key,
382 old_value,
383 new_value,
384 changed_by,
385 } => {
386 let summary = format!(
387 "config changed: key={key} old={old_value} new={new_value} by={changed_by}"
388 );
389 let fields = vec![
390 AuditFieldEscaper::field("key", key),
391 AuditFieldEscaper::field("old_value", old_value),
392 AuditFieldEscaper::field("new_value", new_value),
393 AuditFieldEscaper::field("changed_by", changed_by),
394 ];
395 ("operator/config_changed", fields, summary)
396 }
397 Self::StartupFailed { phase, error } => {
398 let summary = format!("startup failed: phase={phase} error={error}");
399 let fields = vec![
400 AuditFieldEscaper::field("phase", phase),
401 AuditFieldEscaper::field("error", error),
402 ];
403 ("operator/startup_failed", fields, summary)
404 }
405 Self::ShutdownForced { reason } => {
406 let summary = format!("shutdown forced: reason={reason}");
407 let fields = vec![AuditFieldEscaper::field("reason", reason)];
408 ("operator/shutdown_forced", fields, summary)
409 }
410 Self::SchemaCorruption { collection, detail } => {
411 let summary = format!("schema corruption: collection={collection} detail={detail}");
412 let fields = vec![
413 AuditFieldEscaper::field("collection", collection),
414 AuditFieldEscaper::field("detail", detail),
415 ];
416 ("operator/schema_corruption", fields, summary)
417 }
418 Self::CheckpointFailed { lsn, error } => {
419 let summary = format!("checkpoint failed: lsn={lsn} error={error}");
420 let fields = vec![
421 AuditFieldEscaper::field("lsn", lsn),
422 AuditFieldEscaper::field("error", error),
423 ];
424 ("operator/checkpoint_failed", fields, summary)
425 }
426 Self::DanglingAdminIntent {
427 id,
428 op,
429 started_at_ms,
430 last_phase,
431 } => {
432 let summary = format!(
433 "dangling admin intent: id={id} op={op} started_at_ms={started_at_ms} last_phase={last_phase}"
434 );
435 let fields = vec![
436 AuditFieldEscaper::field("id", id.to_string()),
437 AuditFieldEscaper::field("op", op.to_string()),
438 AuditFieldEscaper::field("started_at_ms", started_at_ms),
439 AuditFieldEscaper::field("last_phase", last_phase.to_string()),
440 ];
441 ("operator/dangling_admin_intent", fields, summary)
442 }
443 Self::ConfigChangeRequiresRestart { fields_changed } => {
444 let summary = format!("config change requires restart: fields={fields_changed}");
445 let fields = vec![AuditFieldEscaper::field("fields_changed", fields_changed)];
446 ("operator/config_change_requires_restart", fields, summary)
447 }
448 Self::SubscriptionSchemaChange {
449 collection,
450 subscription_names,
451 fields_added,
452 fields_removed,
453 lsn,
454 } => {
455 let summary = format!(
456 "subscription schema change: collection={collection} subscriptions=[{subscription_names}] added=[{fields_added}] removed=[{fields_removed}] lsn={lsn}"
457 );
458 let fields = vec![
459 AuditFieldEscaper::field("collection", collection),
460 AuditFieldEscaper::field("subscription_names", subscription_names),
461 AuditFieldEscaper::field("fields_added", fields_added),
462 AuditFieldEscaper::field("fields_removed", fields_removed),
463 AuditFieldEscaper::field("lsn", lsn),
464 ];
465 ("operator/subscription_schema_change", fields, summary)
466 }
467 Self::OutboxDlqActivated { queue, dlq, reason } => {
468 let summary =
469 format!("outbox DLQ activated: queue={queue} dlq={dlq} reason={reason}");
470 let fields = vec![
471 AuditFieldEscaper::field("queue", queue),
472 AuditFieldEscaper::field("dlq", dlq),
473 AuditFieldEscaper::field("reason", reason),
474 ];
475 ("operator/outbox_dlq_activated", fields, summary)
476 }
477 Self::QueueDlqPromoted {
478 queue,
479 group,
480 dlq,
481 message_id,
482 attempts,
483 reason,
484 } => {
485 let summary = format!(
486 "queue DLQ promoted: queue={queue} group={group} dlq={dlq} message_id={message_id} attempts={attempts} reason={reason}"
487 );
488 let fields = vec![
489 AuditFieldEscaper::field("queue", queue),
490 AuditFieldEscaper::field("group", group),
491 AuditFieldEscaper::field("dlq", dlq),
492 AuditFieldEscaper::field("message_id", message_id),
493 AuditFieldEscaper::field("attempts", attempts as u64),
494 AuditFieldEscaper::field("reason", reason),
495 ];
496 ("operator/queue_dlq_promoted", fields, summary)
497 }
498 Self::DeposedPrimaryRollback {
499 common_point_lsn,
500 tail_to_lsn,
501 tail_lsns,
502 commit_watermark,
503 rollback_file,
504 new_primary_addr,
505 new_term,
506 } => {
507 let summary = format!(
508 "deposed primary auto-rollback: recovered to common_point_lsn={common_point_lsn} \
509 (commit_watermark={commit_watermark}); discarded divergent tail of {tail_lsns} LSN(s) \
510 up to {tail_to_lsn} saved to {rollback_file}; rejoining as replica of \
511 {new_primary_addr} under term {new_term}"
512 );
513 let fields = vec![
514 AuditFieldEscaper::field("common_point_lsn", common_point_lsn),
515 AuditFieldEscaper::field("tail_to_lsn", tail_to_lsn),
516 AuditFieldEscaper::field("tail_lsns", tail_lsns),
517 AuditFieldEscaper::field("commit_watermark", commit_watermark),
518 AuditFieldEscaper::field("rollback_file", rollback_file),
519 AuditFieldEscaper::field("new_primary_addr", new_primary_addr),
520 AuditFieldEscaper::field("new_term", new_term),
521 ];
522 ("operator/deposed_primary_rollback", fields, summary)
523 }
524 }
525 }
526}
527
528#[cfg(test)]
533mod tests {
534 use std::time::Duration;
535
536 use super::*;
537 use crate::runtime::audit_log::AuditLogger;
538
539 fn make_logger() -> (AuditLogger, std::path::PathBuf) {
540 let mut dir = std::env::temp_dir();
541 dir.push(format!(
542 "reddb-op-event-{}-{}",
543 std::process::id(),
544 crate::utils::now_unix_nanos()
545 ));
546 std::fs::create_dir_all(&dir).unwrap();
547 let path = dir.join(".audit.log");
548 let logger = AuditLogger::with_path(path.clone());
549 (logger, path)
550 }
551
552 fn drain(logger: &AuditLogger) {
553 assert!(
554 logger.wait_idle(Duration::from_secs(2)),
555 "audit logger drain timed out"
556 );
557 }
558
559 fn read_last_line(path: &std::path::Path) -> crate::json::Value {
560 let body = std::fs::read_to_string(path).unwrap();
561 let line = body.lines().last().expect("at least one audit line");
562 crate::json::from_str(line).expect("valid JSON")
563 }
564
565 #[test]
570 fn replication_broken_emits() {
571 let (logger, path) = make_logger();
572 OperatorEvent::ReplicationBroken {
573 peer: "replica-1".into(),
574 reason: "TCP reset".into(),
575 }
576 .emit(&logger);
577 drain(&logger);
578 let v = read_last_line(&path);
579 assert_eq!(
580 v.get("action").and_then(|x| x.as_str()),
581 Some("operator/replication_broken")
582 );
583 let peer = v
584 .get("detail")
585 .and_then(|d| d.get("peer"))
586 .and_then(|x| x.as_str());
587 assert_eq!(peer, Some("replica-1"));
588 }
589
590 #[test]
591 fn divergence_emits() {
592 let (logger, path) = make_logger();
593 OperatorEvent::Divergence {
594 peer: "replica-2".into(),
595 leader_lsn: 1000,
596 follower_lsn: 999,
597 }
598 .emit(&logger);
599 drain(&logger);
600 let v = read_last_line(&path);
601 assert_eq!(
602 v.get("action").and_then(|x| x.as_str()),
603 Some("operator/divergence")
604 );
605 let lsn = v
606 .get("detail")
607 .and_then(|d| d.get("leader_lsn"))
608 .and_then(|x| x.as_i64());
609 assert_eq!(lsn, Some(1000));
610 }
611
612 #[test]
613 fn wal_fsync_failed_emits() {
614 let (logger, path) = make_logger();
615 OperatorEvent::WalFsyncFailed {
616 path: "/data/wal".into(),
617 error: "EIO".into(),
618 }
619 .emit(&logger);
620 drain(&logger);
621 let v = read_last_line(&path);
622 assert_eq!(
623 v.get("action").and_then(|x| x.as_str()),
624 Some("operator/wal_fsync_failed")
625 );
626 let err = v
627 .get("detail")
628 .and_then(|d| d.get("error"))
629 .and_then(|x| x.as_str());
630 assert_eq!(err, Some("EIO"));
631 }
632
633 #[test]
634 fn disk_space_critical_emits() {
635 let (logger, path) = make_logger();
636 OperatorEvent::DiskSpaceCritical {
637 path: "/data".into(),
638 available_bytes: 1024,
639 threshold_bytes: 10240,
640 }
641 .emit(&logger);
642 drain(&logger);
643 let v = read_last_line(&path);
644 assert_eq!(
645 v.get("action").and_then(|x| x.as_str()),
646 Some("operator/disk_space_critical")
647 );
648 let avail = v
649 .get("detail")
650 .and_then(|d| d.get("available_bytes"))
651 .and_then(|x| x.as_i64());
652 assert_eq!(avail, Some(1024));
653 }
654
655 #[test]
656 fn auth_bypass_emits() {
657 let (logger, path) = make_logger();
658 OperatorEvent::AuthBypass {
659 principal: "alice".into(),
660 resource: "/admin/drop".into(),
661 detail: "scope check skipped".into(),
662 }
663 .emit(&logger);
664 drain(&logger);
665 let v = read_last_line(&path);
666 assert_eq!(
667 v.get("action").and_then(|x| x.as_str()),
668 Some("operator/auth_bypass")
669 );
670 let res = v
671 .get("detail")
672 .and_then(|d| d.get("resource"))
673 .and_then(|x| x.as_str());
674 assert_eq!(res, Some("/admin/drop"));
675 }
676
677 #[test]
678 fn admin_capability_granted_emits() {
679 let (logger, path) = make_logger();
680 OperatorEvent::AdminCapabilityGranted {
681 granted_to: "bob".into(),
682 capability: "ADMIN_WRITE".into(),
683 granted_by: "root".into(),
684 }
685 .emit(&logger);
686 drain(&logger);
687 let v = read_last_line(&path);
688 assert_eq!(
689 v.get("action").and_then(|x| x.as_str()),
690 Some("operator/admin_capability_granted")
691 );
692 let cap = v
693 .get("detail")
694 .and_then(|d| d.get("capability"))
695 .and_then(|x| x.as_str());
696 assert_eq!(cap, Some("ADMIN_WRITE"));
697 }
698
699 #[test]
700 fn secret_rotation_failed_emits() {
701 let (logger, path) = make_logger();
702 OperatorEvent::SecretRotationFailed {
703 secret_ref: "jwt-signing-key".into(),
704 error: "HSM unreachable".into(),
705 }
706 .emit(&logger);
707 drain(&logger);
708 let v = read_last_line(&path);
709 assert_eq!(
710 v.get("action").and_then(|x| x.as_str()),
711 Some("operator/secret_rotation_failed")
712 );
713 let r = v
714 .get("detail")
715 .and_then(|d| d.get("secret_ref"))
716 .and_then(|x| x.as_str());
717 assert_eq!(r, Some("jwt-signing-key"));
718 }
719
720 #[test]
721 fn config_changed_emits() {
722 let (logger, path) = make_logger();
723 OperatorEvent::ConfigChanged {
724 key: "max_connections".into(),
725 old_value: "100".into(),
726 new_value: "200".into(),
727 changed_by: "ops-bot".into(),
728 }
729 .emit(&logger);
730 drain(&logger);
731 let v = read_last_line(&path);
732 assert_eq!(
733 v.get("action").and_then(|x| x.as_str()),
734 Some("operator/config_changed")
735 );
736 let nv = v
737 .get("detail")
738 .and_then(|d| d.get("new_value"))
739 .and_then(|x| x.as_str());
740 assert_eq!(nv, Some("200"));
741 }
742
743 #[test]
744 fn startup_failed_emits() {
745 let (logger, path) = make_logger();
746 OperatorEvent::StartupFailed {
747 phase: "wal_recovery".into(),
748 error: "corrupt frame".into(),
749 }
750 .emit(&logger);
751 drain(&logger);
752 let v = read_last_line(&path);
753 assert_eq!(
754 v.get("action").and_then(|x| x.as_str()),
755 Some("operator/startup_failed")
756 );
757 let phase = v
758 .get("detail")
759 .and_then(|d| d.get("phase"))
760 .and_then(|x| x.as_str());
761 assert_eq!(phase, Some("wal_recovery"));
762 }
763
764 #[test]
765 fn shutdown_forced_emits() {
766 let (logger, path) = make_logger();
767 OperatorEvent::ShutdownForced {
768 reason: "OOM".into(),
769 }
770 .emit(&logger);
771 drain(&logger);
772 let v = read_last_line(&path);
773 assert_eq!(
774 v.get("action").and_then(|x| x.as_str()),
775 Some("operator/shutdown_forced")
776 );
777 let r = v
778 .get("detail")
779 .and_then(|d| d.get("reason"))
780 .and_then(|x| x.as_str());
781 assert_eq!(r, Some("OOM"));
782 }
783
784 #[test]
785 fn schema_corruption_emits() {
786 let (logger, path) = make_logger();
787 OperatorEvent::SchemaCorruption {
788 collection: "users".into(),
789 detail: "unknown type tag 0xFF".into(),
790 }
791 .emit(&logger);
792 drain(&logger);
793 let v = read_last_line(&path);
794 assert_eq!(
795 v.get("action").and_then(|x| x.as_str()),
796 Some("operator/schema_corruption")
797 );
798 let coll = v
799 .get("detail")
800 .and_then(|d| d.get("collection"))
801 .and_then(|x| x.as_str());
802 assert_eq!(coll, Some("users"));
803 }
804
805 #[test]
806 fn checkpoint_failed_emits() {
807 let (logger, path) = make_logger();
808 OperatorEvent::CheckpointFailed {
809 lsn: 42_000,
810 error: "write stall".into(),
811 }
812 .emit(&logger);
813 drain(&logger);
814 let v = read_last_line(&path);
815 assert_eq!(
816 v.get("action").and_then(|x| x.as_str()),
817 Some("operator/checkpoint_failed")
818 );
819 let lsn = v
820 .get("detail")
821 .and_then(|d| d.get("lsn"))
822 .and_then(|x| x.as_i64());
823 assert_eq!(lsn, Some(42_000));
824 }
825
826 #[test]
827 fn deposed_primary_rollback_emits() {
828 let (logger, path) = make_logger();
829 OperatorEvent::DeposedPrimaryRollback {
830 common_point_lsn: 200,
831 tail_to_lsn: 230,
832 tail_lsns: 30,
833 commit_watermark: 200,
834 rollback_file: "/data/rollback/term7-lsn200-230.rbk".into(),
835 new_primary_addr: "http://node-b:50051".into(),
836 new_term: 8,
837 }
838 .emit(&logger);
839 drain(&logger);
840 let v = read_last_line(&path);
841 assert_eq!(
842 v.get("action").and_then(|x| x.as_str()),
843 Some("operator/deposed_primary_rollback")
844 );
845 let cp = v
846 .get("detail")
847 .and_then(|d| d.get("common_point_lsn"))
848 .and_then(|x| x.as_i64());
849 assert_eq!(cp, Some(200));
850 let file = v
851 .get("detail")
852 .and_then(|d| d.get("rollback_file"))
853 .and_then(|x| x.as_str());
854 assert_eq!(file, Some("/data/rollback/term7-lsn200-230.rbk"));
855 }
856
857 #[test]
862 fn adversarial_fields_are_escape_safe() {
863 let payloads: &[(&str, &str)] = &[
864 ("crlf", "line1\r\nline2"),
865 ("nul", "before\0after"),
866 ("quote", r#"she said "hi""#),
867 ("json_inject", r#"{"injected":true}"#),
868 ("low_ctrl", "\x01\x02\x07\x1f"),
869 ("backslash", "C:\\path\\file"),
870 ("mixed", "name=\"x\"\n\\path\t\x01end"),
871 ];
872
873 for (label, payload) in payloads {
874 let (logger, path) = make_logger();
875 OperatorEvent::SchemaCorruption {
876 collection: payload.to_string(),
877 detail: payload.to_string(),
878 }
879 .emit(&logger);
880 drain(&logger);
881
882 let body = std::fs::read_to_string(&path).unwrap();
883 let line = body.lines().last().unwrap_or("");
884
885 assert!(
887 !line.contains('\n'),
888 "{label}: embedded newline in JSONL row"
889 );
890
891 let v: crate::json::Value = crate::json::from_str(line)
892 .unwrap_or_else(|e| panic!("{label}: audit line not valid JSON: {e}\n{line:?}"));
893 let recovered = v
894 .get("detail")
895 .and_then(|d| d.get("collection"))
896 .and_then(|x| x.as_str())
897 .unwrap_or("");
898 assert_eq!(recovered, *payload, "{label}: round-trip mismatch");
899 }
900 }
901
902 #[test]
907 fn emit_sets_outcome_error_and_source_system() {
908 let (logger, path) = make_logger();
909 OperatorEvent::ShutdownForced {
910 reason: "test".into(),
911 }
912 .emit(&logger);
913 drain(&logger);
914 let v = read_last_line(&path);
915 assert_eq!(v.get("outcome").and_then(|x| x.as_str()), Some("error"));
916 assert_eq!(v.get("source").and_then(|x| x.as_str()), Some("system"));
917 }
918}