Skip to main content

reddb_server/telemetry/
admin_intent_log.rs

1//! Intent log for long-running admin operations.
2//!
3//! Records a JSONL trail of every admin operation from begin → checkpoints →
4//! complete/abort. At startup, `scan_and_report` finds any intents that never
5//! reached a terminal phase and emits [`OperatorEvent::DanglingAdminIntent`]
6//! for each one so operators can investigate interrupted operations.
7//!
8//! # Durability contract
9//!
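//! - File opened with `O_APPEND`, so every write lands at the current end of
//!   file even with several writers. POSIX's all-or-nothing guarantee for
//!   writes up to `PIPE_BUF` (4096 bytes on Linux) covers pipes and FIFOs, not
//!   regular files; for regular files, non-interleaving of small appends is
//!   what Linux local filesystems provide in practice (not e.g. NFS). Records
//!   are capped at 3 KiB so each one stays well inside that envelope.
//! - `fsync` on `begin` only. Checkpoint / complete / abort writes are not
//!   fsynced (they reach the OS page cache but can be lost on power failure)
//!   — a crash between `begin` and `complete` is exactly the
//!   "dangling intent" condition `scan_and_report` is designed to surface.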
16
17use std::collections::HashMap;
18use std::fmt;
19use std::fs::{File, OpenOptions};
20use std::io::Write;
21use std::path::{Path, PathBuf};
22use std::sync::Mutex;
23
24use crate::crypto::uuid::Uuid;
25use crate::json::{Map, Value as JsonValue};
26use crate::utils::time::now_unix_millis;
27
/// Upper bound, in bytes, on one serialized record (excluding the trailing
/// newline). Kept well under 4096 so each append stays small; see the
/// module-level durability contract.
const MAX_RECORD_BYTES: usize = 3 * 1024;

// ---------------------------------------------------------------------------
// Error
// ---------------------------------------------------------------------------

/// Errors returned by intent-log operations.
#[derive(Debug)]
pub enum IntentLogError {
    /// Opening or appending to the log file failed.
    Io(std::io::Error),
    /// The serialized record exceeded [`MAX_RECORD_BYTES`]; nothing was written.
    TooLarge { bytes: usize },
    /// The record was appended but `fsync` (`sync_data`) failed.
    SyncFailed(std::io::Error),
}

impl fmt::Display for IntentLogError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            Self::Io(e) => write!(f, "intent log I/O: {e}"),
            Self::TooLarge { bytes } => {
                write!(
                    f,
                    "intent record too large: {bytes} bytes (max {MAX_RECORD_BYTES})"
                )
            }
            Self::SyncFailed(e) => write!(f, "intent log fsync failed: {e}"),
        }
    }
}

/// Lets callers chain/box this error and walk to the underlying I/O cause.
impl std::error::Error for IntentLogError {
    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
        match self {
            Self::Io(e) | Self::SyncFailed(e) => Some(e),
            Self::TooLarge { .. } => None,
        }
    }
}

impl From<std::io::Error> for IntentLogError {
    fn from(e: std::io::Error) -> Self {
        Self::Io(e)
    }
}
61
62// ---------------------------------------------------------------------------
63// IntentOp — closed enum; add variants when new consumers arrive
64// ---------------------------------------------------------------------------
65
/// Kind of long-running admin operation tracked by the intent log.
///
/// Each variant has a stable snake_case wire name that is written to the `op`
/// field of every record and parsed back by the startup scan.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum IntentOp {
    ReplicaBootstrap,
}

impl IntentOp {
    /// Every variant; used to map a wire name back to its value.
    const ALL: [Self; 1] = [Self::ReplicaBootstrap];

    /// Stable wire name for this operation.
    fn as_str(self) -> &'static str {
        match self {
            Self::ReplicaBootstrap => "replica_bootstrap",
        }
    }

    /// Inverse of [`Self::as_str`]; `None` for names this build does not know.
    fn from_str(s: &str) -> Option<Self> {
        Self::ALL.iter().copied().find(|candidate| candidate.as_str() == s)
    }
}

impl fmt::Display for IntentOp {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "{}", self.as_str())
    }
}
91
92// ---------------------------------------------------------------------------
93// IntentPhase
94// ---------------------------------------------------------------------------
95
/// Lifecycle phase recorded in the `phase` field of each intent record.
///
/// Wire names: `running`, `checkpoint_<n>`, `completed`, `aborted`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum IntentPhase {
    /// Opening record of an intent.
    Running,
    /// Intermediate progress marker `n`.
    Checkpoint(u32),
    /// Terminal: the operation was marked complete.
    Completed,
    /// Terminal: the intent was abandoned before completing.
    Aborted,
}

impl IntentPhase {
    /// Wire-name prefix shared by every [`IntentPhase::Checkpoint`].
    const CHECKPOINT_PREFIX: &'static str = "checkpoint_";

    /// Wire name of this phase as an owned string (same text as `Display`).
    fn as_str(&self) -> String {
        self.to_string()
    }

    /// `true` for phases after which no further records are expected.
    fn is_terminal(&self) -> bool {
        matches!(self, Self::Completed | Self::Aborted)
    }

    /// Parse a wire name; `None` for unknown names or a non-numeric
    /// checkpoint suffix.
    fn from_str(s: &str) -> Option<Self> {
        match s {
            "running" => Some(Self::Running),
            "completed" => Some(Self::Completed),
            "aborted" => Some(Self::Aborted),
            _ => s
                .strip_prefix(Self::CHECKPOINT_PREFIX)?
                .parse::<u32>()
                .ok()
                .map(Self::Checkpoint),
        }
    }
}

impl fmt::Display for IntentPhase {
    // Writes straight into the formatter; no intermediate String.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            Self::Running => f.write_str("running"),
            Self::Checkpoint(n) => write!(f, "{}{n}", Self::CHECKPOINT_PREFIX),
            Self::Completed => f.write_str("completed"),
            Self::Aborted => f.write_str("aborted"),
        }
    }
}
137
138// ---------------------------------------------------------------------------
139// Redaction helpers
140// ---------------------------------------------------------------------------
141
/// Lower-case substrings that mark an argument key as sensitive.
///
/// Matching is by substring, so it errs on the side of redacting (e.g.
/// `api_key`, `Authorization` and `monkey` all match).
const SENSITIVE_SUBSTRINGS: &[&str] = &[
    "password",
    "secret",
    "token",
    "key",
    "credential",
    "auth",
];

/// Returns `true` when `k`, ASCII-lowercased, contains any entry of
/// [`SENSITIVE_SUBSTRINGS`].
fn is_sensitive_key(k: &str) -> bool {
    let folded = k.to_ascii_lowercase();
    for needle in SENSITIVE_SUBSTRINGS {
        if folded.contains(*needle) {
            return true;
        }
    }
    false
}
148
149fn redact_map(map: &Map<String, JsonValue>) -> JsonValue {
150    let mut out = Map::new();
151    for (k, v) in map {
152        let v = if is_sensitive_key(k) {
153            JsonValue::String("***REDACTED***".to_string())
154        } else {
155            v.clone()
156        };
157        out.insert(k.clone(), v);
158    }
159    JsonValue::Object(out)
160}
161
162// ---------------------------------------------------------------------------
163// IntentArgs / IntentProgress / IntentSummary
164// ---------------------------------------------------------------------------
165
166/// Caller-supplied arguments for an intent. Sensitive keys are redacted
167/// before writing to the log.
168#[derive(Debug, Default, Clone)]
169pub struct IntentArgs(Map<String, JsonValue>);
170
171impl IntentArgs {
172    pub fn new() -> Self {
173        Self(Map::new())
174    }
175
176    pub fn insert(mut self, key: impl Into<String>, value: JsonValue) -> Self {
177        self.0.insert(key.into(), value);
178        self
179    }
180
181    fn to_json_value(&self) -> JsonValue {
182        redact_map(&self.0)
183    }
184}
185
186/// Progress snapshot attached to a checkpoint record.
187#[derive(Debug, Default, Clone)]
188pub struct IntentProgress(Map<String, JsonValue>);
189
190impl IntentProgress {
191    pub fn new() -> Self {
192        Self(Map::new())
193    }
194
195    pub fn insert(mut self, key: impl Into<String>, value: JsonValue) -> Self {
196        self.0.insert(key.into(), value);
197        self
198    }
199
200    fn to_json_value(&self) -> JsonValue {
201        redact_map(&self.0)
202    }
203}
204
205/// Summary attached to a completed intent record.
206#[derive(Debug, Default, Clone)]
207pub struct IntentSummary(Map<String, JsonValue>);
208
209impl IntentSummary {
210    pub fn new() -> Self {
211        Self(Map::new())
212    }
213
214    pub fn insert(mut self, key: impl Into<String>, value: JsonValue) -> Self {
215        self.0.insert(key.into(), value);
216        self
217    }
218
219    fn to_json_value(&self) -> JsonValue {
220        redact_map(&self.0)
221    }
222}
223
224// ---------------------------------------------------------------------------
225// Record helpers
226// ---------------------------------------------------------------------------
227
228fn build_record(
229    id: Uuid,
230    op: IntentOp,
231    phase: &IntentPhase,
232    ts: u64,
233    actor: &str,
234    args: &JsonValue,
235    progress: Option<JsonValue>,
236    summary: Option<JsonValue>,
237) -> Map<String, JsonValue> {
238    let mut m = Map::new();
239    m.insert("id".to_string(), JsonValue::String(id.to_string()));
240    m.insert("op".to_string(), JsonValue::String(op.as_str().to_string()));
241    m.insert("phase".to_string(), JsonValue::String(phase.as_str()));
242    m.insert("ts".to_string(), JsonValue::Number(ts as f64));
243    m.insert("actor".to_string(), JsonValue::String(actor.to_string()));
244    m.insert("args".to_string(), args.clone());
245    if let Some(p) = progress {
246        m.insert("progress".to_string(), p);
247    }
248    if let Some(s) = summary {
249        m.insert("summary".to_string(), s);
250    }
251    m
252}
253
254fn serialize_record(record: &Map<String, JsonValue>) -> Result<String, IntentLogError> {
255    let line = JsonValue::Object(record.clone()).to_string_compact();
256    let bytes = line.len();
257    if bytes > MAX_RECORD_BYTES {
258        return Err(IntentLogError::TooLarge { bytes });
259    }
260    Ok(line)
261}
262
263// ---------------------------------------------------------------------------
264// AdminIntentLog
265// ---------------------------------------------------------------------------
266
267pub struct AdminIntentLog {
268    path: PathBuf,
269    file: Mutex<File>,
270}
271
272impl AdminIntentLog {
273    /// Open (or create) the intent log at `path`.
274    pub fn open(path: impl AsRef<Path>) -> Result<Self, IntentLogError> {
275        let path = path.as_ref().to_path_buf();
276        let file = OpenOptions::new().create(true).append(true).open(&path)?;
277        Ok(Self {
278            path,
279            file: Mutex::new(file),
280        })
281    }
282
283    /// Begin a new intent. Writes the opening record and fsyncs.
284    pub fn begin(
285        &self,
286        op: IntentOp,
287        actor: &str,
288        args: IntentArgs,
289    ) -> Result<IntentHandle<'_>, IntentLogError> {
290        let id = Uuid::new_v7();
291        let ts = now_unix_millis();
292        let args_json = args.to_json_value();
293        let record = build_record(
294            id,
295            op,
296            &IntentPhase::Running,
297            ts,
298            actor,
299            &args_json,
300            None,
301            None,
302        );
303        let line = serialize_record(&record)?;
304
305        {
306            let mut file = self.file.lock().unwrap();
307            file.write_all(line.as_bytes())?;
308            file.write_all(b"\n")?;
309            file.flush()?;
310            // fsync on begin only — see module doc
311            file.sync_data().map_err(IntentLogError::SyncFailed)?;
312        }
313
314        Ok(IntentHandle {
315            log: self,
316            id,
317            op,
318            actor: actor.to_string(),
319            args_json,
320            started_at_ms: ts,
321            last_phase: IntentPhase::Running,
322            done: false,
323        })
324    }
325
326    /// Return metadata about every intent that has not yet reached a
327    /// terminal phase (completed or aborted).
328    pub fn list_unfinished(&self) -> Vec<UnfinishedIntent> {
329        self.scan_intents_internal()
330    }
331
332    /// Scan the log and emit [`crate::telemetry::operator_event::OperatorEvent::DanglingAdminIntent`]
333    /// for every unfinished intent. Corrupted lines are skipped with a
334    /// `tracing::warn!` breadcrumb — they do not abort the scan.
335    pub fn scan_and_report(&self) {
336        for item in self.scan_intents_internal() {
337            crate::telemetry::operator_event::OperatorEvent::DanglingAdminIntent {
338                id: item.id,
339                op: item.op,
340                started_at_ms: item.started_at_ms,
341                last_phase: item.last_phase,
342            }
343            .emit_global();
344        }
345    }
346
347    fn write_record(
348        &self,
349        id: Uuid,
350        op: IntentOp,
351        phase: &IntentPhase,
352        actor: &str,
353        args_json: &JsonValue,
354        progress: Option<&IntentProgress>,
355        summary: Option<&IntentSummary>,
356    ) -> Result<(), IntentLogError> {
357        let ts = now_unix_millis();
358        let record = build_record(
359            id,
360            op,
361            phase,
362            ts,
363            actor,
364            args_json,
365            progress.map(|p| p.to_json_value()),
366            summary.map(|s| s.to_json_value()),
367        );
368        let line = serialize_record(&record)?;
369        let mut file = self.file.lock().unwrap();
370        file.write_all(line.as_bytes())?;
371        file.write_all(b"\n")?;
372        file.flush()?;
373        Ok(())
374    }
375
376    fn scan_intents_internal(&self) -> Vec<UnfinishedIntent> {
377        let content = match std::fs::read_to_string(&self.path) {
378            Ok(c) => c,
379            Err(_) => return Vec::new(),
380        };
381
382        struct ScanEntry {
383            op: IntentOp,
384            started_at_ms: u64,
385            phase: IntentPhase,
386            args: Map<String, JsonValue>,
387            last_progress: Option<Map<String, JsonValue>>,
388        }
389
390        let mut intents: HashMap<String, ScanEntry> = HashMap::new();
391
392        for raw_line in content.lines() {
393            let line = raw_line.trim();
394            if line.is_empty() {
395                continue;
396            }
397
398            let v: JsonValue = match crate::json::from_str(line) {
399                Ok(v) => v,
400                Err(_) => {
401                    tracing::warn!(
402                        target: "reddb::admin_intent_log",
403                        "corrupted intent log line skipped"
404                    );
405                    continue;
406                }
407            };
408
409            let Some(id) = v.get("id").and_then(|x| x.as_str()).map(|s| s.to_string()) else {
410                continue;
411            };
412            let Some(op_str) = v.get("op").and_then(|x| x.as_str()) else {
413                continue;
414            };
415            let Some(op) = IntentOp::from_str(op_str) else {
416                continue;
417            };
418            let Some(phase_str) = v.get("phase").and_then(|x| x.as_str()) else {
419                continue;
420            };
421            let Some(phase) = IntentPhase::from_str(phase_str) else {
422                continue;
423            };
424            let ts = v.get("ts").and_then(|x| x.as_f64()).unwrap_or(0.0) as u64;
425
426            let args_map = v
427                .get("args")
428                .and_then(|x| x.as_object())
429                .cloned()
430                .unwrap_or_default();
431            let progress_map = v.get("progress").and_then(|x| x.as_object()).cloned();
432
433            intents
434                .entry(id)
435                .and_modify(|e| {
436                    // Keep earliest ts and args (from running record); update phase and progress.
437                    e.phase = phase.clone();
438                    if let Some(p) = progress_map.clone() {
439                        e.last_progress = Some(p);
440                    }
441                })
442                .or_insert(ScanEntry {
443                    op,
444                    started_at_ms: ts,
445                    phase,
446                    args: args_map,
447                    last_progress: progress_map,
448                });
449        }
450
451        intents
452            .into_iter()
453            .filter(|(_, e)| !e.phase.is_terminal())
454            .map(|(id_str, e)| {
455                let id = Uuid::parse_str(&id_str).unwrap_or_else(|_| Uuid::new_v4());
456                UnfinishedIntent {
457                    id,
458                    op: e.op,
459                    started_at_ms: e.started_at_ms,
460                    last_phase: e.phase,
461                    args: e.args,
462                    last_progress: e.last_progress,
463                }
464            })
465            .collect()
466    }
467}
468
469// ---------------------------------------------------------------------------
470// UnfinishedIntent (returned by list_unfinished / used by scan_and_report)
471// ---------------------------------------------------------------------------
472
473pub struct UnfinishedIntent {
474    pub id: Uuid,
475    pub op: IntentOp,
476    pub started_at_ms: u64,
477    pub last_phase: IntentPhase,
478    /// Args from the opening `running` record. Used by consumers to filter by
479    /// owner fields (e.g., `replica_id`) and implement single-resumer policy.
480    pub args: Map<String, JsonValue>,
481    /// Progress map from the most recent checkpoint record, if any. `None`
482    /// means the intent started but never reached a checkpoint.
483    pub last_progress: Option<Map<String, JsonValue>>,
484}
485
486// ---------------------------------------------------------------------------
487// IntentHandle — linear type; Drop writes aborted if complete() not called
488// ---------------------------------------------------------------------------
489
490pub struct IntentHandle<'a> {
491    log: &'a AdminIntentLog,
492    id: Uuid,
493    op: IntentOp,
494    actor: String,
495    args_json: JsonValue,
496    pub started_at_ms: u64,
497    last_phase: IntentPhase,
498    done: bool,
499}
500
501impl<'a> IntentHandle<'a> {
502    pub fn id(&self) -> Uuid {
503        self.id
504    }
505
506    pub fn last_phase(&self) -> &IntentPhase {
507        &self.last_phase
508    }
509
510    /// Write a checkpoint record. `n` should be monotonically increasing.
511    pub fn checkpoint(
512        &mut self,
513        n: u32,
514        progress: Option<IntentProgress>,
515    ) -> Result<(), IntentLogError> {
516        let phase = IntentPhase::Checkpoint(n);
517        self.log.write_record(
518            self.id,
519            self.op,
520            &phase,
521            &self.actor,
522            &self.args_json,
523            progress.as_ref(),
524            None,
525        )?;
526        self.last_phase = phase;
527        Ok(())
528    }
529
530    /// Mark the intent complete. Consumes the handle; Drop will not write aborted.
531    pub fn complete(mut self, summary: Option<IntentSummary>) -> Result<(), IntentLogError> {
532        let result = self.log.write_record(
533            self.id,
534            self.op,
535            &IntentPhase::Completed,
536            &self.actor,
537            &self.args_json,
538            None,
539            summary.as_ref(),
540        );
541        if result.is_ok() {
542            self.done = true;
543        }
544        result
545    }
546}
547
548impl Drop for IntentHandle<'_> {
549    fn drop(&mut self) {
550        if !self.done {
551            let _ = self.log.write_record(
552                self.id,
553                self.op,
554                &IntentPhase::Aborted,
555                &self.actor,
556                &self.args_json,
557                None,
558                None,
559            );
560        }
561    }
562}
563
564// ---------------------------------------------------------------------------
565// Tests
566// ---------------------------------------------------------------------------
567
#[cfg(test)]
mod tests {
    use super::*;
    use crate::json::Value as JsonValue;

    /// Unique log path under the system temp dir (per process, per call).
    fn tmp_path(label: &str) -> PathBuf {
        let mut p = std::env::temp_dir();
        p.push(format!(
            "reddb-intent-{}-{}-{}.log",
            label,
            std::process::id(),
            crate::utils::now_unix_nanos()
        ));
        p
    }

    fn last_line_json(path: &Path) -> JsonValue {
        let body = std::fs::read_to_string(path).unwrap();
        let line = body.lines().last().expect("at least one line");
        crate::json::from_str(line).expect("valid JSON")
    }

    fn all_lines_json(path: &Path) -> Vec<JsonValue> {
        let body = std::fs::read_to_string(path).unwrap();
        body.lines()
            .filter(|l| !l.trim().is_empty())
            .map(|l| crate::json::from_str(l).expect("valid JSON"))
            .collect()
    }

    // -----------------------------------------------------------------------
    // 1. begin writes a running record and fsyncs
    // -----------------------------------------------------------------------
    #[test]
    fn begin_writes_running_record() {
        let path = tmp_path("begin");
        let log = AdminIntentLog::open(&path).unwrap();
        let handle = log
            .begin(IntentOp::ReplicaBootstrap, "ops-bot", IntentArgs::new())
            .unwrap();
        drop(handle); // also writes aborted; just check first line

        let body = std::fs::read_to_string(&path).unwrap();
        let first_line = body.lines().next().unwrap();
        let v: JsonValue = crate::json::from_str(first_line).unwrap();
        assert_eq!(v.get("phase").and_then(|x| x.as_str()), Some("running"));
        assert_eq!(
            v.get("op").and_then(|x| x.as_str()),
            Some("replica_bootstrap")
        );
        assert_eq!(v.get("actor").and_then(|x| x.as_str()), Some("ops-bot"));

        let _ = std::fs::remove_file(&path);
    }

    // -----------------------------------------------------------------------
    // 2. complete writes completed, Drop writes nothing extra
    // -----------------------------------------------------------------------
    #[test]
    fn complete_writes_completed_phase() {
        let path = tmp_path("complete");
        let log = AdminIntentLog::open(&path).unwrap();
        let handle = log
            .begin(IntentOp::ReplicaBootstrap, "admin", IntentArgs::new())
            .unwrap();
        handle.complete(None).unwrap();

        let lines = all_lines_json(&path);
        assert_eq!(lines.len(), 2, "begin + complete = 2 lines");
        assert_eq!(
            lines[1].get("phase").and_then(|x| x.as_str()),
            Some("completed")
        );

        let _ = std::fs::remove_file(&path);
    }

    // -----------------------------------------------------------------------
    // 3. Drop without complete writes aborted
    // -----------------------------------------------------------------------
    #[test]
    fn drop_without_complete_writes_aborted() {
        let path = tmp_path("drop-abort");
        let log = AdminIntentLog::open(&path).unwrap();
        {
            let _handle = log
                .begin(IntentOp::ReplicaBootstrap, "admin", IntentArgs::new())
                .unwrap();
            // drop here without calling complete
        }

        let last = last_line_json(&path);
        assert_eq!(last.get("phase").and_then(|x| x.as_str()), Some("aborted"));

        let _ = std::fs::remove_file(&path);
    }

    // -----------------------------------------------------------------------
    // 4. checkpoint writes intermediate records
    // -----------------------------------------------------------------------
    #[test]
    fn checkpoint_writes_intermediate_records() {
        let path = tmp_path("checkpoint");
        let log = AdminIntentLog::open(&path).unwrap();
        let mut handle = log
            .begin(IntentOp::ReplicaBootstrap, "admin", IntentArgs::new())
            .unwrap();
        handle.checkpoint(1, None).unwrap();
        handle.checkpoint(2, None).unwrap();
        handle.complete(None).unwrap();

        let lines = all_lines_json(&path);
        assert_eq!(lines.len(), 4); // begin + 2 checkpoints + complete
        assert_eq!(
            lines[1].get("phase").and_then(|x| x.as_str()),
            Some("checkpoint_1")
        );
        assert_eq!(
            lines[2].get("phase").and_then(|x| x.as_str()),
            Some("checkpoint_2")
        );

        let _ = std::fs::remove_file(&path);
    }

    // -----------------------------------------------------------------------
    // 5. scan_and_report emits N DanglingAdminIntent events for N unfinished
    // -----------------------------------------------------------------------
    #[test]
    fn scan_and_report_finds_unfinished_intents() {
        let path = tmp_path("scan");
        let log = AdminIntentLog::open(&path).unwrap();

        // Use mem::forget to simulate a crash — prevents Drop from writing aborted,
        // leaving 2 intents with only a "running" record (no terminal phase).
        let h1 = log
            .begin(IntentOp::ReplicaBootstrap, "a", IntentArgs::new())
            .unwrap();
        let h2 = log
            .begin(IntentOp::ReplicaBootstrap, "b", IntentArgs::new())
            .unwrap();
        std::mem::forget(h1);
        std::mem::forget(h2);

        // 1 completed normally
        let h3 = log
            .begin(IntentOp::ReplicaBootstrap, "c", IntentArgs::new())
            .unwrap();
        h3.complete(None).unwrap();

        let log2 = AdminIntentLog::open(&path).unwrap();
        let unfinished = log2.list_unfinished();
        assert_eq!(unfinished.len(), 2, "expected exactly 2 dangling intents");

        let _ = std::fs::remove_file(&path);
    }

    // -----------------------------------------------------------------------
    // 6. Record > 3 KiB returns TooLarge, no write
    // -----------------------------------------------------------------------
    #[test]
    fn record_too_large_returns_error_no_write() {
        let path = tmp_path("toolarge");
        let log = AdminIntentLog::open(&path).unwrap();

        // Build args that blow past 3KB
        let big_value = "x".repeat(4096);
        let args = IntentArgs::new().insert("data", JsonValue::String(big_value));
        let err = log.begin(IntentOp::ReplicaBootstrap, "admin", args);
        assert!(
            matches!(err, Err(IntentLogError::TooLarge { .. })),
            "expected TooLarge, got {:?}",
            err.err().map(|e| e.to_string())
        );

        // No lines written
        let content = std::fs::read_to_string(&path).unwrap_or_default();
        assert!(
            content.lines().all(|l| l.trim().is_empty()),
            "no lines should have been written"
        );

        let _ = std::fs::remove_file(&path);
    }

    // -----------------------------------------------------------------------
    // 7. A corrupted JSON line in the middle of the log does not abort the
    //    scan; records on both sides of it are still folded
    // -----------------------------------------------------------------------
    #[test]
    fn corrupted_line_skipped_in_scan() {
        let path = tmp_path("corrupt");
        let log = AdminIntentLog::open(&path).unwrap();

        // Dangling intent: mem::forget simulates a crash (no aborted record).
        let dangling = log
            .begin(IntentOp::ReplicaBootstrap, "admin", IntentArgs::new())
            .unwrap();
        std::mem::forget(dangling);

        // Finished intent: Drop writes aborted, which is terminal.
        let h = log
            .begin(IntentOp::ReplicaBootstrap, "admin", IntentArgs::new())
            .unwrap();
        drop(h);

        // Inject a corrupted line right after the first record so valid
        // records sit on both sides of it.
        let content = std::fs::read_to_string(&path).unwrap();
        let mut lines: Vec<&str> = content.lines().collect();
        lines.insert(1, "not-valid-json");
        let mut rewritten = lines.join("\n");
        rewritten.push('\n');
        std::fs::write(&path, &rewritten).unwrap();

        // Reopen and scan — must not panic and must still see the dangling intent.
        let log2 = AdminIntentLog::open(&path).unwrap();
        let unfinished = log2.list_unfinished();
        assert_eq!(unfinished.len(), 1, "only the forgotten intent is unfinished");
        assert_eq!(unfinished[0].last_phase, IntentPhase::Running);
        assert!(unfinished[0].last_progress.is_none());

        let _ = std::fs::remove_file(&path);
    }

    // -----------------------------------------------------------------------
    // 8. Sensitive keys are redacted in args
    // -----------------------------------------------------------------------
    #[test]
    fn sensitive_keys_are_redacted() {
        let path = tmp_path("redact");
        let log = AdminIntentLog::open(&path).unwrap();
        let args = IntentArgs::new()
            .insert("password", JsonValue::String("hunter2".to_string()))
            .insert("host", JsonValue::String("db.internal".to_string()));
        let h = log
            .begin(IntentOp::ReplicaBootstrap, "admin", args)
            .unwrap();
        h.complete(None).unwrap();

        let body = std::fs::read_to_string(&path).unwrap();
        let first_line = body.lines().next().unwrap();
        let v: JsonValue = crate::json::from_str(first_line).unwrap();
        let args_obj = v.get("args").unwrap();
        let pwd = args_obj.get("password").and_then(|x| x.as_str());
        assert_eq!(pwd, Some("***REDACTED***"), "password should be redacted");
        let host = args_obj.get("host").and_then(|x| x.as_str());
        assert_eq!(host, Some("db.internal"), "host should not be redacted");

        let _ = std::fs::remove_file(&path);
    }

    // -----------------------------------------------------------------------
    // 9. Multi-process O_APPEND atomicity test
    //    Spawns 2 child processes + parent all writing concurrently.
    //    Every line must parse as valid JSON — no record interleaving.
    // -----------------------------------------------------------------------
    #[test]
    fn multi_process_posix_atomicity() {
        const LOG_PATH_ENV: &str = "INTENT_LOG_CHILD_PATH";
        const CHILD_OPS: u32 = 20;

        // --- child mode ---
        if let Ok(path) = std::env::var(LOG_PATH_ENV) {
            let log = AdminIntentLog::open(&path).unwrap();
            for _ in 0..CHILD_OPS {
                let h = log
                    .begin(IntentOp::ReplicaBootstrap, "child", IntentArgs::new())
                    .unwrap();
                h.complete(None).unwrap();
            }
            return;
        }

        // --- parent mode ---
        // Use the platform temp dir (honours TMPDIR) rather than a fixed /tmp.
        let path = tmp_path("mp");

        // Create the file before spawning children
        let log = AdminIntentLog::open(&path).unwrap();

        let exe = std::env::current_exe().unwrap();
        let spawn_child = || {
            std::process::Command::new(&exe)
                .arg("multi_process_posix_atomicity")
                .env(LOG_PATH_ENV, &path)
                .stdout(std::process::Stdio::null())
                .stderr(std::process::Stdio::null())
                .spawn()
                .expect("spawn child process")
        };

        let mut child1 = spawn_child();
        let mut child2 = spawn_child();

        // Parent writes concurrently
        for _ in 0..CHILD_OPS {
            let h = log
                .begin(IntentOp::ReplicaBootstrap, "parent", IntentArgs::new())
                .unwrap();
            h.complete(None).unwrap();
        }

        let s1 = child1.wait().expect("child1 wait");
        let s2 = child2.wait().expect("child2 wait");
        assert!(s1.success(), "child1 exited with failure");
        assert!(s2.success(), "child2 exited with failure");

        // Verify: every non-empty line parses as valid JSON (no interleaving)
        let content = std::fs::read_to_string(&path).unwrap();
        let mut line_count = 0_usize;
        for (i, line) in content.lines().enumerate() {
            let line = line.trim();
            if line.is_empty() {
                continue;
            }
            crate::json::from_str::<JsonValue>(line)
                .unwrap_or_else(|e| panic!("line {i} is not valid JSON: {e}\n{line:?}"));
            line_count += 1;
        }
        // 3 writers × 20 ops × 2 records (begin + complete); complete succeeds,
        // so Drop writes no aborted records — the count is exact.
        assert_eq!(line_count, 120, "expected exactly 120 lines, got {line_count}");

        let _ = std::fs::remove_file(&path);
    }
}