Skip to main content

reddb_server/telemetry/
admin_intent_log.rs

1//! Intent log for long-running admin operations.
2//!
3//! Records a JSONL trail of every admin operation from begin → checkpoints →
4//! complete/abort. At startup, `scan_and_report` finds any intents that never
5//! reached a terminal phase and emits [`OperatorEvent::DanglingAdminIntent`]
6//! for each one so operators can investigate interrupted operations.
7//!
8//! # Durability contract
9//!
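//! - File opened with `O_APPEND`, so every write lands at the current end of
//!   file, and each record plus its newline is handed to one `write_all`
//!   call. POSIX states the `PIPE_BUF` (4096 bytes on Linux) atomicity rule
//!   for pipes and FIFOs rather than regular files; records are capped at
//!   3 KiB to stay under that size, and multi-writer non-interleaving relies
//!   on supported kernels appending each such write atomically.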
13//! - `fsync` on `begin` only. Checkpoint / complete / abort writes are
14//!   buffered — a crash between `begin` and `complete` is exactly the
15//!   "dangling intent" condition `scan_and_report` is designed to surface.
16
17use std::collections::HashMap;
18use std::fmt;
19use std::fs::{File, OpenOptions};
20use std::io::Write;
21use std::path::{Path, PathBuf};
22use std::sync::Mutex;
23
24use crate::crypto::uuid::Uuid;
25use crate::json::{Map, Value as JsonValue};
26use crate::utils::time::now_unix_millis;
27
/// Largest serialized record (excluding the trailing newline) the log accepts.
const MAX_RECORD_BYTES: usize = 3 * 1024;

// ---------------------------------------------------------------------------
// Error
// ---------------------------------------------------------------------------

/// Errors produced while writing to the admin intent log.
#[derive(Debug)]
pub enum IntentLogError {
    /// Opening, writing, or flushing the log file failed.
    Io(std::io::Error),
    /// The serialized record exceeded `MAX_RECORD_BYTES`; nothing was written.
    TooLarge { bytes: usize },
    /// `sync_data` after the opening record failed.
    SyncFailed(std::io::Error),
}

impl fmt::Display for IntentLogError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            Self::Io(e) => write!(f, "intent log I/O: {e}"),
            Self::TooLarge { bytes } => {
                write!(
                    f,
                    "intent record too large: {bytes} bytes (max {MAX_RECORD_BYTES})"
                )
            }
            Self::SyncFailed(e) => write!(f, "intent log fsync failed: {e}"),
        }
    }
}

/// Lets callers box/chain `IntentLogError` and walk to the underlying I/O error.
impl std::error::Error for IntentLogError {
    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
        match self {
            Self::Io(e) | Self::SyncFailed(e) => Some(e),
            Self::TooLarge { .. } => None,
        }
    }
}

impl From<std::io::Error> for IntentLogError {
    fn from(e: std::io::Error) -> Self {
        Self::Io(e)
    }
}
61
62// ---------------------------------------------------------------------------
63// IntentOp — closed enum; add variants when new consumers arrive
64// ---------------------------------------------------------------------------
65
/// Kind of long-running admin operation tracked by the intent log.
///
/// Closed set: a new consumer adds a variant here together with its wire
/// name in both `as_str` and `from_str`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum IntentOp {
    ReplicaBootstrap,
}

impl fmt::Display for IntentOp {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_str(IntentOp::as_str(*self))
    }
}

impl IntentOp {
    /// Wire name stored in the `op` field of every record.
    fn as_str(self) -> &'static str {
        match self {
            IntentOp::ReplicaBootstrap => "replica_bootstrap",
        }
    }

    /// Inverse of [`IntentOp::as_str`]; `None` for names this build does not know.
    fn from_str(name: &str) -> Option<Self> {
        let op = match name {
            "replica_bootstrap" => IntentOp::ReplicaBootstrap,
            _ => return None,
        };
        Some(op)
    }
}
91
92// ---------------------------------------------------------------------------
93// IntentPhase
94// ---------------------------------------------------------------------------
95
/// Lifecycle phase written to the `phase` field of each intent record.
///
/// Wire forms: `running`, `checkpoint_<n>`, `completed`, `aborted`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum IntentPhase {
    Running,
    Checkpoint(u32),
    Completed,
    Aborted,
}

impl IntentPhase {
    /// Owned wire string for this phase (same text as the `Display` output).
    fn as_str(&self) -> String {
        self.to_string()
    }

    /// `true` for `Completed` and `Aborted`, the phases that mark an intent finished.
    fn is_terminal(&self) -> bool {
        match self {
            Self::Completed | Self::Aborted => true,
            Self::Running | Self::Checkpoint(_) => false,
        }
    }

    /// Parse a wire string produced by [`IntentPhase::as_str`]; `None` if unrecognised.
    fn from_str(s: &str) -> Option<Self> {
        if let Some(counter) = s.strip_prefix("checkpoint_") {
            return counter.parse::<u32>().ok().map(Self::Checkpoint);
        }
        match s {
            "running" => Some(Self::Running),
            "completed" => Some(Self::Completed),
            "aborted" => Some(Self::Aborted),
            _ => None,
        }
    }
}

impl fmt::Display for IntentPhase {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            Self::Running => f.write_str("running"),
            Self::Checkpoint(n) => write!(f, "checkpoint_{n}"),
            Self::Completed => f.write_str("completed"),
            Self::Aborted => f.write_str("aborted"),
        }
    }
}
137
138// ---------------------------------------------------------------------------
139// Redaction helpers
140// ---------------------------------------------------------------------------
141
142const SENSITIVE_SUBSTRINGS: &[&str] = &["password", "secret", "token", "key", "credential", "auth"];
143
144fn is_sensitive_key(k: &str) -> bool {
145    let lower = k.to_ascii_lowercase();
146    SENSITIVE_SUBSTRINGS.iter().any(|s| lower.contains(s))
147}
148
149fn redact_map(map: &Map<String, JsonValue>) -> JsonValue {
150    let mut out = Map::new();
151    for (k, v) in map {
152        let v = if is_sensitive_key(k) {
153            JsonValue::String("***REDACTED***".to_string())
154        } else {
155            v.clone()
156        };
157        out.insert(k.clone(), v);
158    }
159    JsonValue::Object(out)
160}
161
162// ---------------------------------------------------------------------------
163// IntentArgs / IntentProgress / IntentSummary
164// ---------------------------------------------------------------------------
165
166/// Caller-supplied arguments for an intent. Sensitive keys are redacted
167/// before writing to the log.
168#[derive(Debug, Default, Clone)]
169pub struct IntentArgs(Map<String, JsonValue>);
170
171impl IntentArgs {
172    pub fn new() -> Self {
173        Self(Map::new())
174    }
175
176    pub fn insert(mut self, key: impl Into<String>, value: JsonValue) -> Self {
177        self.0.insert(key.into(), value);
178        self
179    }
180
181    fn to_json_value(&self) -> JsonValue {
182        redact_map(&self.0)
183    }
184}
185
186/// Progress snapshot attached to a checkpoint record.
187#[derive(Debug, Default, Clone)]
188pub struct IntentProgress(Map<String, JsonValue>);
189
190impl IntentProgress {
191    pub fn new() -> Self {
192        Self(Map::new())
193    }
194
195    pub fn insert(mut self, key: impl Into<String>, value: JsonValue) -> Self {
196        self.0.insert(key.into(), value);
197        self
198    }
199
200    fn to_json_value(&self) -> JsonValue {
201        redact_map(&self.0)
202    }
203}
204
205/// Summary attached to a completed intent record.
206#[derive(Debug, Default, Clone)]
207pub struct IntentSummary(Map<String, JsonValue>);
208
209impl IntentSummary {
210    pub fn new() -> Self {
211        Self(Map::new())
212    }
213
214    pub fn insert(mut self, key: impl Into<String>, value: JsonValue) -> Self {
215        self.0.insert(key.into(), value);
216        self
217    }
218
219    fn to_json_value(&self) -> JsonValue {
220        redact_map(&self.0)
221    }
222}
223
224// ---------------------------------------------------------------------------
225// Record helpers
226// ---------------------------------------------------------------------------
227
228fn build_record(
229    id: Uuid,
230    op: IntentOp,
231    phase: &IntentPhase,
232    ts: u64,
233    actor: &str,
234    args: &JsonValue,
235    progress: Option<JsonValue>,
236    summary: Option<JsonValue>,
237) -> Map<String, JsonValue> {
238    let mut m = Map::new();
239    m.insert("id".to_string(), JsonValue::String(id.to_string()));
240    m.insert("op".to_string(), JsonValue::String(op.as_str().to_string()));
241    m.insert("phase".to_string(), JsonValue::String(phase.as_str()));
242    m.insert("ts".to_string(), JsonValue::Number(ts as f64));
243    m.insert("actor".to_string(), JsonValue::String(actor.to_string()));
244    m.insert("args".to_string(), args.clone());
245    if let Some(p) = progress {
246        m.insert("progress".to_string(), p);
247    }
248    if let Some(s) = summary {
249        m.insert("summary".to_string(), s);
250    }
251    m
252}
253
254fn serialize_record(record: &Map<String, JsonValue>) -> Result<String, IntentLogError> {
255    let line = JsonValue::Object(record.clone()).to_string_compact();
256    let bytes = line.len();
257    if bytes > MAX_RECORD_BYTES {
258        return Err(IntentLogError::TooLarge { bytes });
259    }
260    Ok(line)
261}
262
263// ---------------------------------------------------------------------------
264// AdminIntentLog
265// ---------------------------------------------------------------------------
266
/// Append-only JSONL log of admin-operation intents.
///
/// `path` is kept so scans can re-read the file; `file` is the append-mode
/// handle, guarded by a mutex so writers in this process append one record
/// at a time.
#[derive(Debug)]
pub struct AdminIntentLog {
    path: PathBuf,
    file: Mutex<File>,
}
271
272impl AdminIntentLog {
273    /// Open (or create) the intent log at `path`.
274    pub fn open(path: impl AsRef<Path>) -> Result<Self, IntentLogError> {
275        let path = path.as_ref().to_path_buf();
276        let file = OpenOptions::new().create(true).append(true).open(&path)?;
277        Ok(Self {
278            path,
279            file: Mutex::new(file),
280        })
281    }
282
283    /// Begin a new intent. Writes the opening record and fsyncs.
284    pub fn begin(
285        &self,
286        op: IntentOp,
287        actor: &str,
288        args: IntentArgs,
289    ) -> Result<IntentHandle<'_>, IntentLogError> {
290        let id = Uuid::new_v7();
291        let ts = now_unix_millis();
292        let args_json = args.to_json_value();
293        let record = build_record(
294            id,
295            op,
296            &IntentPhase::Running,
297            ts,
298            actor,
299            &args_json,
300            None,
301            None,
302        );
303        let line = serialize_record(&record)?;
304
305        {
306            let mut bytes = line.into_bytes();
307            bytes.push(b'\n');
308            let mut file = self.file.lock().unwrap();
309            file.write_all(&bytes)?;
310            file.flush()?;
311            // fsync on begin only — see module doc
312            file.sync_data().map_err(IntentLogError::SyncFailed)?;
313        }
314
315        Ok(IntentHandle {
316            log: self,
317            id,
318            op,
319            actor: actor.to_string(),
320            args_json,
321            started_at_ms: ts,
322            last_phase: IntentPhase::Running,
323            done: false,
324        })
325    }
326
327    /// Return metadata about every intent that has not yet reached a
328    /// terminal phase (completed or aborted).
329    pub fn list_unfinished(&self) -> Vec<UnfinishedIntent> {
330        self.scan_intents_internal()
331    }
332
333    /// Scan the log and emit [`crate::telemetry::operator_event::OperatorEvent::DanglingAdminIntent`]
334    /// for every unfinished intent. Corrupted lines are skipped with a
335    /// `tracing::warn!` breadcrumb — they do not abort the scan.
336    pub fn scan_and_report(&self) {
337        for item in self.scan_intents_internal() {
338            crate::telemetry::operator_event::OperatorEvent::DanglingAdminIntent {
339                id: item.id,
340                op: item.op,
341                started_at_ms: item.started_at_ms,
342                last_phase: item.last_phase,
343            }
344            .emit_global();
345        }
346    }
347
348    fn write_record(
349        &self,
350        id: Uuid,
351        op: IntentOp,
352        phase: &IntentPhase,
353        actor: &str,
354        args_json: &JsonValue,
355        progress: Option<&IntentProgress>,
356        summary: Option<&IntentSummary>,
357    ) -> Result<(), IntentLogError> {
358        let ts = now_unix_millis();
359        let record = build_record(
360            id,
361            op,
362            phase,
363            ts,
364            actor,
365            args_json,
366            progress.map(|p| p.to_json_value()),
367            summary.map(|s| s.to_json_value()),
368        );
369        let line = serialize_record(&record)?;
370        let mut bytes = line.into_bytes();
371        bytes.push(b'\n');
372        let mut file = self.file.lock().unwrap();
373        file.write_all(&bytes)?;
374        file.flush()?;
375        Ok(())
376    }
377
378    fn scan_intents_internal(&self) -> Vec<UnfinishedIntent> {
379        let content = match std::fs::read_to_string(&self.path) {
380            Ok(c) => c,
381            Err(_) => return Vec::new(),
382        };
383
384        struct ScanEntry {
385            op: IntentOp,
386            started_at_ms: u64,
387            actor: String,
388            phase: IntentPhase,
389            args: Map<String, JsonValue>,
390            last_progress: Option<Map<String, JsonValue>>,
391        }
392
393        let mut intents: HashMap<String, ScanEntry> = HashMap::new();
394
395        for raw_line in content.lines() {
396            let line = raw_line.trim();
397            if line.is_empty() {
398                continue;
399            }
400
401            let v: JsonValue = match crate::json::from_str(line) {
402                Ok(v) => v,
403                Err(_) => {
404                    tracing::warn!(
405                        target: "reddb::admin_intent_log",
406                        "corrupted intent log line skipped"
407                    );
408                    continue;
409                }
410            };
411
412            let Some(id) = v.get("id").and_then(|x| x.as_str()).map(|s| s.to_string()) else {
413                continue;
414            };
415            let Some(op_str) = v.get("op").and_then(|x| x.as_str()) else {
416                continue;
417            };
418            let Some(op) = IntentOp::from_str(op_str) else {
419                continue;
420            };
421            let Some(phase_str) = v.get("phase").and_then(|x| x.as_str()) else {
422                continue;
423            };
424            let Some(phase) = IntentPhase::from_str(phase_str) else {
425                continue;
426            };
427            let ts = v.get("ts").and_then(|x| x.as_f64()).unwrap_or(0.0) as u64;
428            let actor = v
429                .get("actor")
430                .and_then(|x| x.as_str())
431                .unwrap_or("")
432                .to_string();
433
434            let args_map = v
435                .get("args")
436                .and_then(|x| x.as_object())
437                .cloned()
438                .unwrap_or_default();
439            let progress_map = v.get("progress").and_then(|x| x.as_object()).cloned();
440
441            intents
442                .entry(id)
443                .and_modify(|e| {
444                    // Keep earliest ts and args (from running record); update phase and progress.
445                    e.phase = phase.clone();
446                    if let Some(p) = progress_map.clone() {
447                        e.last_progress = Some(p);
448                    }
449                })
450                .or_insert(ScanEntry {
451                    op,
452                    started_at_ms: ts,
453                    actor,
454                    phase,
455                    args: args_map,
456                    last_progress: progress_map,
457                });
458        }
459
460        intents
461            .into_iter()
462            .filter(|(_, e)| !e.phase.is_terminal())
463            .map(|(id_str, e)| {
464                let id = Uuid::parse_str(&id_str).unwrap_or_else(|_| Uuid::new_v4());
465                UnfinishedIntent {
466                    id,
467                    op: e.op,
468                    started_at_ms: e.started_at_ms,
469                    actor: e.actor,
470                    last_phase: e.phase,
471                    args: e.args,
472                    last_progress: e.last_progress,
473                }
474            })
475            .collect()
476    }
477
478    /// Recreate a linear handle for an unfinished intent after process restart.
479    ///
480    /// This deliberately does not write a new `running` record: future
481    /// checkpoints and completion records are appended under the original id,
482    /// so a resumed operation does not leave a permanent dangling intent.
483    pub fn resume_unfinished<'a>(&'a self, item: &UnfinishedIntent) -> IntentHandle<'a> {
484        IntentHandle {
485            log: self,
486            id: item.id,
487            op: item.op,
488            actor: item.actor.clone(),
489            args_json: JsonValue::Object(item.args.clone()),
490            started_at_ms: item.started_at_ms,
491            last_phase: item.last_phase.clone(),
492            done: false,
493        }
494    }
495}
496
497// ---------------------------------------------------------------------------
498// UnfinishedIntent (returned by list_unfinished / used by scan_and_report)
499// ---------------------------------------------------------------------------
500
501pub struct UnfinishedIntent {
502    pub id: Uuid,
503    pub op: IntentOp,
504    pub started_at_ms: u64,
505    pub actor: String,
506    pub last_phase: IntentPhase,
507    /// Args from the opening `running` record. Used by consumers to filter by
508    /// owner fields (e.g., `replica_id`) and implement single-resumer policy.
509    pub args: Map<String, JsonValue>,
510    /// Progress map from the most recent checkpoint record, if any. `None`
511    /// means the intent started but never reached a checkpoint.
512    pub last_progress: Option<Map<String, JsonValue>>,
513}
514
515// ---------------------------------------------------------------------------
516// IntentHandle — linear type; Drop writes aborted if complete() not called
517// ---------------------------------------------------------------------------
518
519pub struct IntentHandle<'a> {
520    log: &'a AdminIntentLog,
521    id: Uuid,
522    op: IntentOp,
523    actor: String,
524    args_json: JsonValue,
525    pub started_at_ms: u64,
526    last_phase: IntentPhase,
527    done: bool,
528}
529
530impl<'a> IntentHandle<'a> {
531    pub fn id(&self) -> Uuid {
532        self.id
533    }
534
535    pub fn last_phase(&self) -> &IntentPhase {
536        &self.last_phase
537    }
538
539    /// Write a checkpoint record. `n` should be monotonically increasing.
540    pub fn checkpoint(
541        &mut self,
542        n: u32,
543        progress: Option<IntentProgress>,
544    ) -> Result<(), IntentLogError> {
545        let phase = IntentPhase::Checkpoint(n);
546        self.log.write_record(
547            self.id,
548            self.op,
549            &phase,
550            &self.actor,
551            &self.args_json,
552            progress.as_ref(),
553            None,
554        )?;
555        self.last_phase = phase;
556        Ok(())
557    }
558
559    /// Mark the intent complete. Consumes the handle; Drop will not write aborted.
560    pub fn complete(mut self, summary: Option<IntentSummary>) -> Result<(), IntentLogError> {
561        let result = self.log.write_record(
562            self.id,
563            self.op,
564            &IntentPhase::Completed,
565            &self.actor,
566            &self.args_json,
567            None,
568            summary.as_ref(),
569        );
570        if result.is_ok() {
571            self.done = true;
572        }
573        result
574    }
575}
576
577impl Drop for IntentHandle<'_> {
578    fn drop(&mut self) {
579        if !self.done {
580            let _ = self.log.write_record(
581                self.id,
582                self.op,
583                &IntentPhase::Aborted,
584                &self.actor,
585                &self.args_json,
586                None,
587                None,
588            );
589        }
590    }
591}
592
593// ---------------------------------------------------------------------------
594// Tests
595// ---------------------------------------------------------------------------
596
#[cfg(test)]
mod tests {
    use super::*;
    use crate::json::Value as JsonValue;

    fn tmp_path(label: &str) -> PathBuf {
        let mut p = std::env::temp_dir();
        p.push(format!(
            "reddb-intent-{}-{}-{}.log",
            label,
            std::process::id(),
            crate::utils::now_unix_nanos()
        ));
        p
    }

    fn last_line_json(path: &Path) -> JsonValue {
        let body = std::fs::read_to_string(path).unwrap();
        let line = body.lines().last().expect("at least one line");
        crate::json::from_str(line).expect("valid JSON")
    }

    fn all_lines_json(path: &Path) -> Vec<JsonValue> {
        let body = std::fs::read_to_string(path).unwrap();
        body.lines()
            .filter(|l| !l.trim().is_empty())
            .map(|l| crate::json::from_str(l).expect("valid JSON"))
            .collect()
    }

    // -----------------------------------------------------------------------
    // 1. begin writes a running record and fsyncs
    // -----------------------------------------------------------------------
    #[test]
    fn begin_writes_running_record() {
        let path = tmp_path("begin");
        let log = AdminIntentLog::open(&path).unwrap();
        let handle = log
            .begin(IntentOp::ReplicaBootstrap, "ops-bot", IntentArgs::new())
            .unwrap();
        drop(handle); // also writes aborted; just check first line

        let body = std::fs::read_to_string(&path).unwrap();
        let first_line = body.lines().next().unwrap();
        let v: JsonValue = crate::json::from_str(first_line).unwrap();
        assert_eq!(v.get("phase").and_then(|x| x.as_str()), Some("running"));
        assert_eq!(
            v.get("op").and_then(|x| x.as_str()),
            Some("replica_bootstrap")
        );
        assert_eq!(v.get("actor").and_then(|x| x.as_str()), Some("ops-bot"));

        let _ = std::fs::remove_file(&path);
    }

    // -----------------------------------------------------------------------
    // 2. complete writes completed, Drop writes nothing extra
    // -----------------------------------------------------------------------
    #[test]
    fn complete_writes_completed_phase() {
        let path = tmp_path("complete");
        let log = AdminIntentLog::open(&path).unwrap();
        let handle = log
            .begin(IntentOp::ReplicaBootstrap, "admin", IntentArgs::new())
            .unwrap();
        handle.complete(None).unwrap();

        let lines = all_lines_json(&path);
        assert_eq!(lines.len(), 2, "begin + complete = 2 lines");
        assert_eq!(
            lines[1].get("phase").and_then(|x| x.as_str()),
            Some("completed")
        );

        let _ = std::fs::remove_file(&path);
    }

    // -----------------------------------------------------------------------
    // 3. Drop without complete writes aborted
    // -----------------------------------------------------------------------
    #[test]
    fn drop_without_complete_writes_aborted() {
        let path = tmp_path("drop-abort");
        let log = AdminIntentLog::open(&path).unwrap();
        {
            let _handle = log
                .begin(IntentOp::ReplicaBootstrap, "admin", IntentArgs::new())
                .unwrap();
            // drop here without calling complete
        }

        let last = last_line_json(&path);
        assert_eq!(last.get("phase").and_then(|x| x.as_str()), Some("aborted"));

        let _ = std::fs::remove_file(&path);
    }

    // -----------------------------------------------------------------------
    // 4. checkpoint writes intermediate records
    // -----------------------------------------------------------------------
    #[test]
    fn checkpoint_writes_intermediate_records() {
        let path = tmp_path("checkpoint");
        let log = AdminIntentLog::open(&path).unwrap();
        let mut handle = log
            .begin(IntentOp::ReplicaBootstrap, "admin", IntentArgs::new())
            .unwrap();
        handle.checkpoint(1, None).unwrap();
        handle.checkpoint(2, None).unwrap();
        handle.complete(None).unwrap();

        let lines = all_lines_json(&path);
        assert_eq!(lines.len(), 4); // begin + 2 checkpoints + complete
        assert_eq!(
            lines[1].get("phase").and_then(|x| x.as_str()),
            Some("checkpoint_1")
        );
        assert_eq!(
            lines[2].get("phase").and_then(|x| x.as_str()),
            Some("checkpoint_2")
        );

        let _ = std::fs::remove_file(&path);
    }

    // -----------------------------------------------------------------------
    // 5. scan_and_report emits N DanglingAdminIntent events for N unfinished
    // -----------------------------------------------------------------------
    #[test]
    fn scan_and_report_finds_unfinished_intents() {
        let path = tmp_path("scan");
        let log = AdminIntentLog::open(&path).unwrap();

        // Use mem::forget to simulate a crash — prevents Drop from writing aborted,
        // leaving 2 intents with only a "running" record (no terminal phase).
        let h1 = log
            .begin(IntentOp::ReplicaBootstrap, "a", IntentArgs::new())
            .unwrap();
        let h2 = log
            .begin(IntentOp::ReplicaBootstrap, "b", IntentArgs::new())
            .unwrap();
        std::mem::forget(h1);
        std::mem::forget(h2);

        // 1 completed normally
        let h3 = log
            .begin(IntentOp::ReplicaBootstrap, "c", IntentArgs::new())
            .unwrap();
        h3.complete(None).unwrap();

        let log2 = AdminIntentLog::open(&path).unwrap();
        let unfinished = log2.list_unfinished();
        assert_eq!(unfinished.len(), 2, "expected exactly 2 dangling intents");

        let _ = std::fs::remove_file(&path);
    }

    // -----------------------------------------------------------------------
    // 6. Record > 3 KiB returns TooLarge, no write
    // -----------------------------------------------------------------------
    #[test]
    fn record_too_large_returns_error_no_write() {
        let path = tmp_path("toolarge");
        let log = AdminIntentLog::open(&path).unwrap();

        // Build args that blow past 3KB
        let big_value = "x".repeat(4096);
        let args = IntentArgs::new().insert("data", JsonValue::String(big_value));
        let err = log.begin(IntentOp::ReplicaBootstrap, "admin", args);
        assert!(
            matches!(err, Err(IntentLogError::TooLarge { .. })),
            "expected TooLarge, got {:?}",
            err.err().map(|e| e.to_string())
        );

        // No lines written
        let content = std::fs::read_to_string(&path).unwrap_or_default();
        assert!(
            content.lines().all(|l| l.trim().is_empty()),
            "no lines should have been written"
        );

        let _ = std::fs::remove_file(&path);
    }

    // -----------------------------------------------------------------------
    // 7. Corrupted JSON line does not crash scan; emits tracing::warn
    // -----------------------------------------------------------------------
    #[test]
    fn corrupted_line_skipped_in_scan() {
        let path = tmp_path("corrupt");
        let log = AdminIntentLog::open(&path).unwrap();
        let h = log
            .begin(IntentOp::ReplicaBootstrap, "admin", IntentArgs::new())
            .unwrap();
        drop(h); // aborted

        // Append a corrupted line after the existing records
        let mut content = std::fs::read_to_string(&path).unwrap();
        content.push_str("not-valid-json\n");
        std::fs::write(&path, &content).unwrap();

        // Reopen and scan — the important assertion is that we get here
        // without panicking; aborted is terminal, so nothing is unfinished.
        let log2 = AdminIntentLog::open(&path).unwrap();
        let unfinished = log2.list_unfinished();
        assert_eq!(unfinished.len(), 0);

        let _ = std::fs::remove_file(&path);
    }

    // -----------------------------------------------------------------------
    // 8. Sensitive keys are redacted in args
    // -----------------------------------------------------------------------
    #[test]
    fn sensitive_keys_are_redacted() {
        let path = tmp_path("redact");
        let log = AdminIntentLog::open(&path).unwrap();
        let args = IntentArgs::new()
            .insert("password", JsonValue::String("hunter2".to_string()))
            .insert("host", JsonValue::String("db.internal".to_string()));
        let h = log
            .begin(IntentOp::ReplicaBootstrap, "admin", args)
            .unwrap();
        h.complete(None).unwrap();

        let body = std::fs::read_to_string(&path).unwrap();
        let first_line = body.lines().next().unwrap();
        let v: JsonValue = crate::json::from_str(first_line).unwrap();
        let args_obj = v.get("args").unwrap();
        let pwd = args_obj.get("password").and_then(|x| x.as_str());
        assert_eq!(pwd, Some("***REDACTED***"), "password should be redacted");
        let host = args_obj.get("host").and_then(|x| x.as_str());
        assert_eq!(host, Some("db.internal"), "host should not be redacted");

        let _ = std::fs::remove_file(&path);
    }

    // -----------------------------------------------------------------------
    // 9. Multi-process POSIX atomicity test
    //    Spawns 2 child processes + parent all writing concurrently.
    //    Every line must parse as valid JSON — no record interleaving.
    // -----------------------------------------------------------------------
    #[test]
    fn multi_process_posix_atomicity() {
        const LOG_PATH_ENV: &str = "INTENT_LOG_CHILD_PATH";
        const CHILD_OPS: u32 = 20;

        // --- child mode ---
        if let Ok(path) = std::env::var(LOG_PATH_ENV) {
            let log = AdminIntentLog::open(&path).unwrap();
            for _ in 0..CHILD_OPS {
                let h = log
                    .begin(IntentOp::ReplicaBootstrap, "child", IntentArgs::new())
                    .unwrap();
                h.complete(None).unwrap();
            }
            return;
        }

        // --- parent mode ---
        let dir = std::env::current_dir()
            .unwrap()
            .join(".red/tmp/admin-intent-log-tests");
        std::fs::create_dir_all(&dir).unwrap();
        let path = dir.join(format!(
            "reddb-intent-mp-{}-{}.log",
            std::process::id(),
            crate::utils::now_unix_nanos()
        ));

        // Create the file before spawning children
        let log = AdminIntentLog::open(&path).unwrap();

        let exe = std::env::current_exe().unwrap();
        let spawn_child = || {
            std::process::Command::new(&exe)
                .arg("multi_process_posix_atomicity")
                .env(LOG_PATH_ENV, &path)
                .stdout(std::process::Stdio::null())
                .stderr(std::process::Stdio::null())
                .spawn()
                .expect("spawn child process")
        };

        let mut child1 = spawn_child();
        let mut child2 = spawn_child();

        // Parent writes concurrently
        for _ in 0..CHILD_OPS {
            let h = log
                .begin(IntentOp::ReplicaBootstrap, "parent", IntentArgs::new())
                .unwrap();
            h.complete(None).unwrap();
        }

        let s1 = wait_child_with_deadline(&mut child1, "child1");
        let s2 = wait_child_with_deadline(&mut child2, "child2");
        assert!(s1.success(), "child1 exited with failure");
        assert!(s2.success(), "child2 exited with failure");

        // Verify: every non-empty line parses as valid JSON (no interleaving)
        let content = std::fs::read_to_string(&path).unwrap();
        let mut line_count = 0_usize;
        for (i, line) in content.lines().enumerate() {
            let line = line.trim();
            if line.is_empty() {
                continue;
            }
            crate::json::from_str::<JsonValue>(line)
                .unwrap_or_else(|e| panic!("line {i} is not valid JSON: {e}\n{line:?}"));
            line_count += 1;
        }
        // 3 writers × 20 ops × 2 records (begin + complete) = 120 minimum
        assert!(line_count >= 120, "expected ≥120 lines, got {line_count}");

        let _ = std::fs::remove_file(&path);
    }

    /// Poll `child` until it exits, killing it and panicking after 10 s.
    ///
    /// Sleeps briefly between polls instead of spinning on `yield_now`, which
    /// would burn a full CPU core for the whole wait.
    fn wait_child_with_deadline(
        child: &mut std::process::Child,
        name: &str,
    ) -> std::process::ExitStatus {
        let deadline = std::time::Instant::now() + std::time::Duration::from_secs(10);
        loop {
            match child
                .try_wait()
                .unwrap_or_else(|err| panic!("{name} wait failed: {err}"))
            {
                Some(status) => return status,
                None if std::time::Instant::now() >= deadline => {
                    let _ = child.kill();
                    let _ = child.wait();
                    panic!("{name} did not exit before deadline");
                }
                None => std::thread::sleep(std::time::Duration::from_millis(10)),
            }
        }
    }
}