1use std::fs::{File, OpenOptions};
12use std::io::Write;
13use std::path::Path;
14
15use serde::{Deserialize, Serialize};
16
17use crate::error::Result;
18use crate::protocol::HandoffId;
19
20#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
25pub enum Phase {
26 Negotiating,
29 Draining,
31 Sealing,
34 AwaitingReady,
36 Committed,
38 ResumingAfterAbort,
40}
41
42#[derive(Debug, Clone, Serialize, Deserialize)]
43pub struct StateJournal {
44 pub handoff_id: HandoffId,
45 pub phase: Phase,
46 pub incumbent_pid: u32,
47 pub successor_pid: Option<u32>,
48 pub started_at_unix_ms: u64,
50}
51
52impl StateJournal {
53 pub fn write_atomic(&self, path: &Path) -> Result<()> {
63 if let Some(parent) = path.parent() {
64 std::fs::create_dir_all(parent)?;
65 }
66 let tmp = path.with_extension("bin.tmp");
67 let bytes = postcard::to_allocvec(self)?;
68 {
69 let mut f = OpenOptions::new()
70 .write(true)
71 .create(true)
72 .truncate(true)
73 .open(&tmp)?;
74 f.write_all(&bytes)?;
75 f.sync_all()?;
76 }
77 std::fs::rename(&tmp, path)?;
78 if let Some(parent) = path.parent() {
79 fsync_dir(parent)?;
80 }
81 Ok(())
82 }
83
84 pub fn read(path: &Path) -> Result<Option<Self>> {
86 match std::fs::read(path) {
87 Ok(bytes) => Ok(Some(postcard::from_bytes(&bytes)?)),
88 Err(e) if e.kind() == std::io::ErrorKind::NotFound => Ok(None),
89 Err(e) => Err(e.into()),
90 }
91 }
92
93 pub fn delete(path: &Path) -> Result<()> {
95 match std::fs::remove_file(path) {
96 Ok(()) => Ok(()),
97 Err(e) if e.kind() == std::io::ErrorKind::NotFound => Ok(()),
98 Err(e) => Err(e.into()),
99 }
100 }
101}
102
103fn fsync_dir(dir: &Path) -> std::io::Result<()> {
108 let target = if dir.as_os_str().is_empty() {
109 Path::new(".")
110 } else {
111 dir
112 };
113 File::open(target)?.sync_all()
114}
115
116#[cfg(test)]
117mod tests {
118 use super::*;
119
120 #[test]
121 fn roundtrip_persists_phase_and_pids() {
122 let dir = tempfile::tempdir().unwrap();
123 let path = dir.path().join("state.bin");
124 let original = StateJournal {
125 handoff_id: HandoffId::new(),
126 phase: Phase::Sealing,
127 incumbent_pid: 1234,
128 successor_pid: Some(5678),
129 started_at_unix_ms: 1_700_000_000_000,
130 };
131 original.write_atomic(&path).unwrap();
132 let loaded = StateJournal::read(&path).unwrap().unwrap();
133 assert_eq!(loaded.phase, Phase::Sealing);
134 assert_eq!(loaded.incumbent_pid, 1234);
135 assert_eq!(loaded.successor_pid, Some(5678));
136 assert_eq!(loaded.handoff_id, original.handoff_id);
137 }
138
139 #[test]
140 fn read_missing_returns_none() {
141 let dir = tempfile::tempdir().unwrap();
142 let path = dir.path().join("state.bin");
143 assert!(StateJournal::read(&path).unwrap().is_none());
144 }
145
146 #[test]
147 fn delete_missing_is_idempotent() {
148 let dir = tempfile::tempdir().unwrap();
149 let path = dir.path().join("state.bin");
150 StateJournal::delete(&path).unwrap();
151 StateJournal::delete(&path).unwrap();
152 }
153}