1use std::collections::HashMap;
18use std::fmt;
19use std::fs::{File, OpenOptions};
20use std::io::Write;
21use std::path::{Path, PathBuf};
22use std::sync::Mutex;
23
24use crate::crypto::uuid::Uuid;
25use crate::json::{Map, Value as JsonValue};
26use crate::utils::time::now_unix_millis;
27
/// Upper bound, in bytes, on the compact JSON encoding of one intent record.
///
/// `serialize_record` rejects anything longer with `IntentLogError::TooLarge`.
const MAX_RECORD_BYTES: usize = 3 * 1024;

/// Errors returned by the admin intent log.
#[derive(Debug)]
pub enum IntentLogError {
    /// An I/O operation on the log file failed.
    Io(std::io::Error),
    /// The serialized record was `bytes` long, which exceeds `MAX_RECORD_BYTES`.
    TooLarge { bytes: usize },
    /// The record was handed to the file but `sync_data` reported a failure.
    SyncFailed(std::io::Error),
}

impl fmt::Display for IntentLogError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            Self::Io(e) => write!(f, "intent log I/O: {e}"),
            Self::TooLarge { bytes } => {
                write!(
                    f,
                    "intent record too large: {bytes} bytes (max {MAX_RECORD_BYTES})"
                )
            }
            Self::SyncFailed(e) => write!(f, "intent log fsync failed: {e}"),
        }
    }
}

// Implementing `Error` lets callers box this type as `dyn Error`, use it with
// `?` in functions returning `Box<dyn Error>`, and walk to the underlying
// `std::io::Error` through `source()`.
impl std::error::Error for IntentLogError {
    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
        match self {
            Self::Io(e) | Self::SyncFailed(e) => Some(e),
            Self::TooLarge { .. } => None,
        }
    }
}

impl From<std::io::Error> for IntentLogError {
    /// Wraps a plain I/O failure as [`IntentLogError::Io`], enabling `?` on I/O calls.
    fn from(e: std::io::Error) -> Self {
        Self::Io(e)
    }
}
61
/// Kind of administrative operation an intent record describes.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum IntentOp {
    /// Stored in the log as `replica_bootstrap`.
    ReplicaBootstrap,
}

impl IntentOp {
    /// Name written to the record's `op` field.
    fn as_str(self) -> &'static str {
        match self {
            Self::ReplicaBootstrap => "replica_bootstrap",
        }
    }

    /// Inverse of [`IntentOp::as_str`]; yields `None` for any other text.
    fn from_str(s: &str) -> Option<Self> {
        (s == "replica_bootstrap").then_some(Self::ReplicaBootstrap)
    }
}

impl fmt::Display for IntentOp {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "{}", self.as_str())
    }
}
91
/// Lifecycle phase stored in a record's `phase` field.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum IntentPhase {
    /// Serialized as `running`.
    Running,
    /// Serialized as `checkpoint_<n>`.
    Checkpoint(u32),
    /// Serialized as `completed`; terminal.
    Completed,
    /// Serialized as `aborted`; terminal.
    Aborted,
}

impl IntentPhase {
    /// Text form of the phase as it appears in the log.
    fn as_str(&self) -> String {
        let fixed = match self {
            Self::Checkpoint(n) => return format!("checkpoint_{n}"),
            Self::Running => "running",
            Self::Completed => "completed",
            Self::Aborted => "aborted",
        };
        fixed.to_string()
    }

    /// `true` for the phases that end an intent (`Completed`, `Aborted`).
    fn is_terminal(&self) -> bool {
        match self {
            Self::Completed | Self::Aborted => true,
            Self::Running | Self::Checkpoint(_) => false,
        }
    }

    /// Parses the text produced by [`IntentPhase::as_str`]; `None` if unrecognised.
    fn from_str(s: &str) -> Option<Self> {
        // None of the fixed names start with the checkpoint prefix, so the
        // prefix can be tested first without changing which inputs match.
        if let Some(ordinal) = s.strip_prefix("checkpoint_") {
            return ordinal.parse::<u32>().ok().map(Self::Checkpoint);
        }
        match s {
            "running" => Some(Self::Running),
            "completed" => Some(Self::Completed),
            "aborted" => Some(Self::Aborted),
            _ => None,
        }
    }
}

impl fmt::Display for IntentPhase {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_str(&self.as_str())
    }
}
137
/// Lower-case fragments that mark a key as sensitive when they occur anywhere in it.
const SENSITIVE_SUBSTRINGS: &[&str] = &[
    "password",
    "secret",
    "token",
    "key",
    "credential",
    "auth",
];

/// Reports whether `k`, compared ASCII-case-insensitively, contains any of
/// the [`SENSITIVE_SUBSTRINGS`].
fn is_sensitive_key(k: &str) -> bool {
    let folded = k.to_ascii_lowercase();
    for &needle in SENSITIVE_SUBSTRINGS {
        if folded.contains(needle) {
            return true;
        }
    }
    false
}
148
149fn redact_map(map: &Map<String, JsonValue>) -> JsonValue {
150 let mut out = Map::new();
151 for (k, v) in map {
152 let v = if is_sensitive_key(k) {
153 JsonValue::String("***REDACTED***".to_string())
154 } else {
155 v.clone()
156 };
157 out.insert(k.clone(), v);
158 }
159 JsonValue::Object(out)
160}
161
162#[derive(Debug, Default, Clone)]
169pub struct IntentArgs(Map<String, JsonValue>);
170
171impl IntentArgs {
172 pub fn new() -> Self {
173 Self(Map::new())
174 }
175
176 pub fn insert(mut self, key: impl Into<String>, value: JsonValue) -> Self {
177 self.0.insert(key.into(), value);
178 self
179 }
180
181 fn to_json_value(&self) -> JsonValue {
182 redact_map(&self.0)
183 }
184}
185
186#[derive(Debug, Default, Clone)]
188pub struct IntentProgress(Map<String, JsonValue>);
189
190impl IntentProgress {
191 pub fn new() -> Self {
192 Self(Map::new())
193 }
194
195 pub fn insert(mut self, key: impl Into<String>, value: JsonValue) -> Self {
196 self.0.insert(key.into(), value);
197 self
198 }
199
200 fn to_json_value(&self) -> JsonValue {
201 redact_map(&self.0)
202 }
203}
204
205#[derive(Debug, Default, Clone)]
207pub struct IntentSummary(Map<String, JsonValue>);
208
209impl IntentSummary {
210 pub fn new() -> Self {
211 Self(Map::new())
212 }
213
214 pub fn insert(mut self, key: impl Into<String>, value: JsonValue) -> Self {
215 self.0.insert(key.into(), value);
216 self
217 }
218
219 fn to_json_value(&self) -> JsonValue {
220 redact_map(&self.0)
221 }
222}
223
224fn build_record(
229 id: Uuid,
230 op: IntentOp,
231 phase: &IntentPhase,
232 ts: u64,
233 actor: &str,
234 args: &JsonValue,
235 progress: Option<JsonValue>,
236 summary: Option<JsonValue>,
237) -> Map<String, JsonValue> {
238 let mut m = Map::new();
239 m.insert("id".to_string(), JsonValue::String(id.to_string()));
240 m.insert("op".to_string(), JsonValue::String(op.as_str().to_string()));
241 m.insert("phase".to_string(), JsonValue::String(phase.as_str()));
242 m.insert("ts".to_string(), JsonValue::Number(ts as f64));
243 m.insert("actor".to_string(), JsonValue::String(actor.to_string()));
244 m.insert("args".to_string(), args.clone());
245 if let Some(p) = progress {
246 m.insert("progress".to_string(), p);
247 }
248 if let Some(s) = summary {
249 m.insert("summary".to_string(), s);
250 }
251 m
252}
253
254fn serialize_record(record: &Map<String, JsonValue>) -> Result<String, IntentLogError> {
255 let line = JsonValue::Object(record.clone()).to_string_compact();
256 let bytes = line.len();
257 if bytes > MAX_RECORD_BYTES {
258 return Err(IntentLogError::TooLarge { bytes });
259 }
260 Ok(line)
261}
262
263pub struct AdminIntentLog {
268 path: PathBuf,
269 file: Mutex<File>,
270}
271
272impl AdminIntentLog {
273 pub fn open(path: impl AsRef<Path>) -> Result<Self, IntentLogError> {
275 let path = path.as_ref().to_path_buf();
276 let file = OpenOptions::new().create(true).append(true).open(&path)?;
277 Ok(Self {
278 path,
279 file: Mutex::new(file),
280 })
281 }
282
283 pub fn begin(
285 &self,
286 op: IntentOp,
287 actor: &str,
288 args: IntentArgs,
289 ) -> Result<IntentHandle<'_>, IntentLogError> {
290 let id = Uuid::new_v7();
291 let ts = now_unix_millis();
292 let args_json = args.to_json_value();
293 let record = build_record(
294 id,
295 op,
296 &IntentPhase::Running,
297 ts,
298 actor,
299 &args_json,
300 None,
301 None,
302 );
303 let line = serialize_record(&record)?;
304
305 {
306 let mut file = self.file.lock().unwrap();
307 file.write_all(line.as_bytes())?;
308 file.write_all(b"\n")?;
309 file.flush()?;
310 file.sync_data().map_err(IntentLogError::SyncFailed)?;
312 }
313
314 Ok(IntentHandle {
315 log: self,
316 id,
317 op,
318 actor: actor.to_string(),
319 args_json,
320 started_at_ms: ts,
321 last_phase: IntentPhase::Running,
322 done: false,
323 })
324 }
325
326 pub fn list_unfinished(&self) -> Vec<UnfinishedIntent> {
329 self.scan_intents_internal()
330 }
331
332 pub fn scan_and_report(&self) {
336 for item in self.scan_intents_internal() {
337 crate::telemetry::operator_event::OperatorEvent::DanglingAdminIntent {
338 id: item.id,
339 op: item.op,
340 started_at_ms: item.started_at_ms,
341 last_phase: item.last_phase,
342 }
343 .emit_global();
344 }
345 }
346
347 fn write_record(
348 &self,
349 id: Uuid,
350 op: IntentOp,
351 phase: &IntentPhase,
352 actor: &str,
353 args_json: &JsonValue,
354 progress: Option<&IntentProgress>,
355 summary: Option<&IntentSummary>,
356 ) -> Result<(), IntentLogError> {
357 let ts = now_unix_millis();
358 let record = build_record(
359 id,
360 op,
361 phase,
362 ts,
363 actor,
364 args_json,
365 progress.map(|p| p.to_json_value()),
366 summary.map(|s| s.to_json_value()),
367 );
368 let line = serialize_record(&record)?;
369 let mut file = self.file.lock().unwrap();
370 file.write_all(line.as_bytes())?;
371 file.write_all(b"\n")?;
372 file.flush()?;
373 Ok(())
374 }
375
376 fn scan_intents_internal(&self) -> Vec<UnfinishedIntent> {
377 let content = match std::fs::read_to_string(&self.path) {
378 Ok(c) => c,
379 Err(_) => return Vec::new(),
380 };
381
382 struct ScanEntry {
383 op: IntentOp,
384 started_at_ms: u64,
385 phase: IntentPhase,
386 args: Map<String, JsonValue>,
387 last_progress: Option<Map<String, JsonValue>>,
388 }
389
390 let mut intents: HashMap<String, ScanEntry> = HashMap::new();
391
392 for raw_line in content.lines() {
393 let line = raw_line.trim();
394 if line.is_empty() {
395 continue;
396 }
397
398 let v: JsonValue = match crate::json::from_str(line) {
399 Ok(v) => v,
400 Err(_) => {
401 tracing::warn!(
402 target: "reddb::admin_intent_log",
403 "corrupted intent log line skipped"
404 );
405 continue;
406 }
407 };
408
409 let Some(id) = v.get("id").and_then(|x| x.as_str()).map(|s| s.to_string()) else {
410 continue;
411 };
412 let Some(op_str) = v.get("op").and_then(|x| x.as_str()) else {
413 continue;
414 };
415 let Some(op) = IntentOp::from_str(op_str) else {
416 continue;
417 };
418 let Some(phase_str) = v.get("phase").and_then(|x| x.as_str()) else {
419 continue;
420 };
421 let Some(phase) = IntentPhase::from_str(phase_str) else {
422 continue;
423 };
424 let ts = v.get("ts").and_then(|x| x.as_f64()).unwrap_or(0.0) as u64;
425
426 let args_map = v
427 .get("args")
428 .and_then(|x| x.as_object())
429 .cloned()
430 .unwrap_or_default();
431 let progress_map = v.get("progress").and_then(|x| x.as_object()).cloned();
432
433 intents
434 .entry(id)
435 .and_modify(|e| {
436 e.phase = phase.clone();
438 if let Some(p) = progress_map.clone() {
439 e.last_progress = Some(p);
440 }
441 })
442 .or_insert(ScanEntry {
443 op,
444 started_at_ms: ts,
445 phase,
446 args: args_map,
447 last_progress: progress_map,
448 });
449 }
450
451 intents
452 .into_iter()
453 .filter(|(_, e)| !e.phase.is_terminal())
454 .map(|(id_str, e)| {
455 let id = Uuid::parse_str(&id_str).unwrap_or_else(|_| Uuid::new_v4());
456 UnfinishedIntent {
457 id,
458 op: e.op,
459 started_at_ms: e.started_at_ms,
460 last_phase: e.phase,
461 args: e.args,
462 last_progress: e.last_progress,
463 }
464 })
465 .collect()
466 }
467}
468
469pub struct UnfinishedIntent {
474 pub id: Uuid,
475 pub op: IntentOp,
476 pub started_at_ms: u64,
477 pub last_phase: IntentPhase,
478 pub args: Map<String, JsonValue>,
481 pub last_progress: Option<Map<String, JsonValue>>,
484}
485
486pub struct IntentHandle<'a> {
491 log: &'a AdminIntentLog,
492 id: Uuid,
493 op: IntentOp,
494 actor: String,
495 args_json: JsonValue,
496 pub started_at_ms: u64,
497 last_phase: IntentPhase,
498 done: bool,
499}
500
501impl<'a> IntentHandle<'a> {
502 pub fn id(&self) -> Uuid {
503 self.id
504 }
505
506 pub fn last_phase(&self) -> &IntentPhase {
507 &self.last_phase
508 }
509
510 pub fn checkpoint(
512 &mut self,
513 n: u32,
514 progress: Option<IntentProgress>,
515 ) -> Result<(), IntentLogError> {
516 let phase = IntentPhase::Checkpoint(n);
517 self.log.write_record(
518 self.id,
519 self.op,
520 &phase,
521 &self.actor,
522 &self.args_json,
523 progress.as_ref(),
524 None,
525 )?;
526 self.last_phase = phase;
527 Ok(())
528 }
529
530 pub fn complete(mut self, summary: Option<IntentSummary>) -> Result<(), IntentLogError> {
532 let result = self.log.write_record(
533 self.id,
534 self.op,
535 &IntentPhase::Completed,
536 &self.actor,
537 &self.args_json,
538 None,
539 summary.as_ref(),
540 );
541 if result.is_ok() {
542 self.done = true;
543 }
544 result
545 }
546}
547
548impl Drop for IntentHandle<'_> {
549 fn drop(&mut self) {
550 if !self.done {
551 let _ = self.log.write_record(
552 self.id,
553 self.op,
554 &IntentPhase::Aborted,
555 &self.actor,
556 &self.args_json,
557 None,
558 None,
559 );
560 }
561 }
562}
563
#[cfg(test)]
mod tests {
    use super::*;
    use crate::json::Value as JsonValue;

    /// Builds a log path in the system temp dir that is unique per label,
    /// process id and timestamp.
    fn tmp_path(label: &str) -> PathBuf {
        std::env::temp_dir().join(format!(
            "reddb-intent-{}-{}-{}.log",
            label,
            std::process::id(),
            crate::utils::now_unix_nanos()
        ))
    }

    /// Parses the last line of the file at `path` as JSON.
    fn last_line_json(path: &Path) -> JsonValue {
        let body = std::fs::read_to_string(path).unwrap();
        let line = body.lines().last().expect("at least one line");
        crate::json::from_str(line).expect("valid JSON")
    }

    /// Parses every non-blank line of the file at `path` as JSON.
    fn all_lines_json(path: &Path) -> Vec<JsonValue> {
        let body = std::fs::read_to_string(path).unwrap();
        body.lines()
            .filter(|l| !l.trim().is_empty())
            .map(|l| crate::json::from_str(l).expect("valid JSON"))
            .collect()
    }

    /// Best-effort removal of a test's log file.
    fn cleanup(path: &Path) {
        let _ = std::fs::remove_file(path);
    }

    #[test]
    fn begin_writes_running_record() {
        let path = tmp_path("begin");
        let log = AdminIntentLog::open(&path).unwrap();
        let handle = log
            .begin(IntentOp::ReplicaBootstrap, "ops-bot", IntentArgs::new())
            .unwrap();
        drop(handle);

        let body = std::fs::read_to_string(&path).unwrap();
        let first_line = body.lines().next().unwrap();
        let v: JsonValue = crate::json::from_str(first_line).unwrap();
        assert_eq!(v.get("phase").and_then(|x| x.as_str()), Some("running"));
        assert_eq!(
            v.get("op").and_then(|x| x.as_str()),
            Some("replica_bootstrap")
        );
        assert_eq!(v.get("actor").and_then(|x| x.as_str()), Some("ops-bot"));

        cleanup(&path);
    }

    #[test]
    fn complete_writes_completed_phase() {
        let path = tmp_path("complete");
        let log = AdminIntentLog::open(&path).unwrap();
        let handle = log
            .begin(IntentOp::ReplicaBootstrap, "admin", IntentArgs::new())
            .unwrap();
        handle.complete(None).unwrap();

        let lines = all_lines_json(&path);
        assert_eq!(lines.len(), 2, "begin + complete = 2 lines");
        assert_eq!(
            lines[1].get("phase").and_then(|x| x.as_str()),
            Some("completed")
        );

        cleanup(&path);
    }

    #[test]
    fn drop_without_complete_writes_aborted() {
        let path = tmp_path("drop-abort");
        let log = AdminIntentLog::open(&path).unwrap();
        {
            // Leaving this scope drops the handle without completing it.
            let _handle = log
                .begin(IntentOp::ReplicaBootstrap, "admin", IntentArgs::new())
                .unwrap();
        }

        let last = last_line_json(&path);
        assert_eq!(last.get("phase").and_then(|x| x.as_str()), Some("aborted"));

        cleanup(&path);
    }

    #[test]
    fn checkpoint_writes_intermediate_records() {
        let path = tmp_path("checkpoint");
        let log = AdminIntentLog::open(&path).unwrap();
        let mut handle = log
            .begin(IntentOp::ReplicaBootstrap, "admin", IntentArgs::new())
            .unwrap();
        handle.checkpoint(1, None).unwrap();
        handle.checkpoint(2, None).unwrap();
        handle.complete(None).unwrap();

        // running, checkpoint_1, checkpoint_2, completed
        let lines = all_lines_json(&path);
        assert_eq!(lines.len(), 4);
        assert_eq!(
            lines[1].get("phase").and_then(|x| x.as_str()),
            Some("checkpoint_1")
        );
        assert_eq!(
            lines[2].get("phase").and_then(|x| x.as_str()),
            Some("checkpoint_2")
        );

        cleanup(&path);
    }

    #[test]
    fn scan_and_report_finds_unfinished_intents() {
        let path = tmp_path("scan");
        let log = AdminIntentLog::open(&path).unwrap();

        // Forgetting the handles skips their Drop, so no `aborted` record is
        // written and both intents stay in the `running` phase.
        let h1 = log
            .begin(IntentOp::ReplicaBootstrap, "a", IntentArgs::new())
            .unwrap();
        let h2 = log
            .begin(IntentOp::ReplicaBootstrap, "b", IntentArgs::new())
            .unwrap();
        std::mem::forget(h1);
        std::mem::forget(h2);

        // A completed intent must not be reported.
        let h3 = log
            .begin(IntentOp::ReplicaBootstrap, "c", IntentArgs::new())
            .unwrap();
        h3.complete(None).unwrap();

        let log2 = AdminIntentLog::open(&path).unwrap();
        let unfinished = log2.list_unfinished();
        assert_eq!(unfinished.len(), 2, "expected exactly 2 dangling intents");

        cleanup(&path);
    }

    #[test]
    fn record_too_large_returns_error_no_write() {
        let path = tmp_path("toolarge");
        let log = AdminIntentLog::open(&path).unwrap();

        // 4096 bytes of payload alone already exceeds MAX_RECORD_BYTES.
        let big_value = "x".repeat(4096);
        let args = IntentArgs::new().insert("data", JsonValue::String(big_value));
        let err = log.begin(IntentOp::ReplicaBootstrap, "admin", args);
        assert!(
            matches!(err, Err(IntentLogError::TooLarge { .. })),
            "expected TooLarge, got {:?}",
            err.err().map(|e| e.to_string())
        );

        let content = std::fs::read_to_string(&path).unwrap_or_default();
        assert!(
            content.lines().all(|l| l.trim().is_empty()),
            "no lines should have been written"
        );

        cleanup(&path);
    }

    #[test]
    fn corrupted_line_skipped_in_scan() {
        let path = tmp_path("corrupt");
        let log = AdminIntentLog::open(&path).unwrap();
        let h = log
            .begin(IntentOp::ReplicaBootstrap, "admin", IntentArgs::new())
            .unwrap();
        // Dropping appends the `aborted` record, so the intent is terminal.
        drop(h);

        let mut content = std::fs::read_to_string(&path).unwrap();
        content.push_str("not-valid-json\n");
        std::fs::write(&path, &content).unwrap();

        let log2 = AdminIntentLog::open(&path).unwrap();
        let unfinished = log2.list_unfinished();
        assert_eq!(unfinished.len(), 0);

        cleanup(&path);
    }

    #[test]
    fn sensitive_keys_are_redacted() {
        let path = tmp_path("redact");
        let log = AdminIntentLog::open(&path).unwrap();
        let args = IntentArgs::new()
            .insert("password", JsonValue::String("hunter2".to_string()))
            .insert("host", JsonValue::String("db.internal".to_string()));
        let h = log
            .begin(IntentOp::ReplicaBootstrap, "admin", args)
            .unwrap();
        h.complete(None).unwrap();

        let body = std::fs::read_to_string(&path).unwrap();
        let first_line = body.lines().next().unwrap();
        let v: JsonValue = crate::json::from_str(first_line).unwrap();
        let args_obj = v.get("args").unwrap();
        let pwd = args_obj.get("password").and_then(|x| x.as_str());
        assert_eq!(pwd, Some("***REDACTED***"), "password should be redacted");
        let host = args_obj.get("host").and_then(|x| x.as_str());
        assert_eq!(host, Some("db.internal"), "host should not be redacted");

        cleanup(&path);
    }

    #[test]
    fn multi_process_posix_atomicity() {
        const LOG_PATH_ENV: &str = "INTENT_LOG_CHILD_PATH";
        const CHILD_OPS: u32 = 20;
        // Parent plus two children, each writing begin + complete per op.
        const WRITERS: usize = 3;

        // Child mode: the parent re-runs this test binary with the log path in
        // the environment. `var_os` keeps non-UTF-8 temp paths working.
        if let Some(path) = std::env::var_os(LOG_PATH_ENV) {
            let log = AdminIntentLog::open(&path).unwrap();
            for _ in 0..CHILD_OPS {
                let h = log
                    .begin(IntentOp::ReplicaBootstrap, "child", IntentArgs::new())
                    .unwrap();
                h.complete(None).unwrap();
            }
            return;
        }

        // Same helper as every other test instead of a hard-coded `/tmp`.
        let path = tmp_path("mp");

        let log = AdminIntentLog::open(&path).unwrap();

        let exe = std::env::current_exe().unwrap();
        let spawn_child = || {
            std::process::Command::new(&exe)
                .arg("multi_process_posix_atomicity")
                .env(LOG_PATH_ENV, &path)
                .stdout(std::process::Stdio::null())
                .stderr(std::process::Stdio::null())
                .spawn()
                .expect("spawn child process")
        };

        let mut child1 = spawn_child();
        let mut child2 = spawn_child();

        // Write from the parent while the children are running.
        for _ in 0..CHILD_OPS {
            let h = log
                .begin(IntentOp::ReplicaBootstrap, "parent", IntentArgs::new())
                .unwrap();
            h.complete(None).unwrap();
        }

        let s1 = child1.wait().expect("child1 wait");
        let s2 = child2.wait().expect("child2 wait");
        assert!(s1.success(), "child1 exited with failure");
        assert!(s2.success(), "child2 exited with failure");

        // Every non-blank line must be a complete JSON document.
        let content = std::fs::read_to_string(&path).unwrap();
        let mut line_count = 0_usize;
        for (i, line) in content.lines().enumerate() {
            let line = line.trim();
            if line.is_empty() {
                continue;
            }
            crate::json::from_str::<JsonValue>(line)
                .unwrap_or_else(|e| panic!("line {i} is not valid JSON: {e}\n{line:?}"));
            line_count += 1;
        }
        let min_lines = WRITERS * CHILD_OPS as usize * 2;
        assert!(
            line_count >= min_lines,
            "expected ≥{min_lines} lines, got {line_count}"
        );

        cleanup(&path);
    }
}