Skip to main content

handoff/
state.rs

1//! Handoff state journal — persisted across supervisor restarts.
2//!
3//! Writes are atomic (`write tmp + rename`). The supervisor records a new
4//! [`Phase`] after each acknowledged protocol step, so a crashed-and-restarted
5//! supervisor can read the journal and resume the protocol from exactly where
6//! it left off. Supports correctness invariant #7.
7//!
8//! The journal lives at `/var/lib/beyond/handoff/<svc>/state.bin` by
9//! convention; the location is supplied by the consumer.
10
11use std::fs::{File, OpenOptions};
12use std::io::Write;
13use std::path::Path;
14
15use serde::{Deserialize, Serialize};
16
17use crate::error::Result;
18use crate::protocol::HandoffId;
19
/// Coarse phase recorded in the on-disk journal between protocol steps. Used
/// by the supervisor for crash-recovery diagnostics — not the in-memory
/// state machine. Each variant maps 1:1 to a transition documented in
/// `ARCHITECTURE.md`.
///
/// Every variant records a protocol step that has *already* happened, not
/// one that is still in flight: e.g. `Draining` is journaled once `Drained`
/// has been received, and `Sealing` once `SealComplete` has been received.
///
/// `O` and `N` in the variant docs presumably denote the incumbent (old) and
/// successor (new) processes — confirm against `ARCHITECTURE.md`.
///
/// NOTE(review): this enum is persisted via `postcard` as part of
/// [`StateJournal`]. postcard is understood to encode an enum variant by its
/// declaration index; if so, reordering or removing variants silently changes
/// the meaning of journals already on disk. Append new variants at the end —
/// confirm against the postcard wire-format specification.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum Phase {
    /// `Hello`/`HelloAck` exchanged with both peers; no protocol message
    /// sent or received yet.
    Negotiating,
    /// `Drained` received from O. Next supervisor action is `SealRequest`.
    Draining,
    /// `SealComplete` received from O. O has released the flock. Next
    /// supervisor action is `Begin` to N.
    Sealing,
    /// `Begin` sent to N. Waiting for `Ready`.
    AwaitingReady,
    /// `Commit` sent to O. Cleanup pending (journal clear, child disarm).
    Committed,
    /// `ResumeAfterAbort` sent to O after a post-seal abort.
    ResumingAfterAbort,
}
41
/// One persisted snapshot of an in-progress handoff.
///
/// The whole struct is serialized with `postcard` by
/// [`StateJournal::write_atomic`] and decoded by [`StateJournal::read`].
///
/// NOTE(review): postcard is understood to be a non-self-describing format
/// that writes fields in declaration order, so adding, removing, or
/// reordering fields would make previously written journals undecodable or
/// misread — confirm before changing the layout.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct StateJournal {
    /// Identifier of the handoff this journal entry belongs to.
    pub handoff_id: HandoffId,
    /// Last protocol step recorded for this handoff.
    pub phase: Phase,
    /// PID of the incumbent process (presumably `O` in the [`Phase`] docs —
    /// TODO confirm).
    pub incumbent_pid: u32,
    /// PID of the successor process, if one is recorded. Presumably `None`
    /// until the successor has been spawned — verify against the caller.
    pub successor_pid: Option<u32>,
    /// Unix millis when the handoff was initiated.
    pub started_at_unix_ms: u64,
}
51
52impl StateJournal {
53    /// Atomically write the journal. Creates the parent directory if missing.
54    ///
55    /// Writes the payload to `<path>.tmp`, fsyncs it, renames, then fsyncs
56    /// the parent directory. The file fsync makes the contents durable;
57    /// the directory fsync makes the rename's link-update durable. Without
58    /// the directory fsync the rename can be lost on power failure even
59    /// though the file contents survived — leaving a journaled phase that
60    /// silently rolls back to the prior entry on supervisor restart. Pairs
61    /// with the same pattern in `lock.rs::write_pid_atomic`.
62    pub fn write_atomic(&self, path: &Path) -> Result<()> {
63        if let Some(parent) = path.parent() {
64            std::fs::create_dir_all(parent)?;
65        }
66        let tmp = path.with_extension("bin.tmp");
67        let bytes = postcard::to_allocvec(self)?;
68        {
69            let mut f = OpenOptions::new()
70                .write(true)
71                .create(true)
72                .truncate(true)
73                .open(&tmp)?;
74            f.write_all(&bytes)?;
75            f.sync_all()?;
76        }
77        std::fs::rename(&tmp, path)?;
78        if let Some(parent) = path.parent() {
79            fsync_dir(parent)?;
80        }
81        Ok(())
82    }
83
84    /// Read a previously-persisted journal. `Ok(None)` if no journal exists.
85    pub fn read(path: &Path) -> Result<Option<Self>> {
86        match std::fs::read(path) {
87            Ok(bytes) => Ok(Some(postcard::from_bytes(&bytes)?)),
88            Err(e) if e.kind() == std::io::ErrorKind::NotFound => Ok(None),
89            Err(e) => Err(e.into()),
90        }
91    }
92
93    /// Remove the journal. Idempotent.
94    pub fn delete(path: &Path) -> Result<()> {
95        match std::fs::remove_file(path) {
96            Ok(()) => Ok(()),
97            Err(e) if e.kind() == std::io::ErrorKind::NotFound => Ok(()),
98            Err(e) => Err(e.into()),
99        }
100    }
101}
102
/// Flush a directory's own metadata to stable storage.
///
/// Needed after a rename or unlink of an entry inside `dir`: those
/// operations modify the directory, and only an fsync of the directory
/// makes them crash-durable. Opening a directory with `File::open` is a
/// read-only open on POSIX, which is enough to obtain an FD for `fsync`.
///
/// An empty `dir` (the parent of a bare relative file name) is treated as
/// the current directory.
fn fsync_dir(dir: &Path) -> std::io::Result<()> {
    let mut location = dir;
    if location.as_os_str().is_empty() {
        location = Path::new(".");
    }
    File::open(location).and_then(|handle| handle.sync_all())
}
115
#[cfg(test)]
mod tests {
    use super::*;

    /// Build a journal with fixed, recognizable field values in `phase`.
    fn sample(phase: Phase) -> StateJournal {
        StateJournal {
            handoff_id: HandoffId::new(),
            phase,
            incumbent_pid: 1234,
            successor_pid: Some(5678),
            started_at_unix_ms: 1_700_000_000_000,
        }
    }

    #[test]
    fn roundtrip_persists_phase_and_pids() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("state.bin");
        let original = sample(Phase::Sealing);
        original.write_atomic(&path).unwrap();
        let loaded = StateJournal::read(&path).unwrap().unwrap();
        assert_eq!(loaded.phase, Phase::Sealing);
        assert_eq!(loaded.incumbent_pid, 1234);
        assert_eq!(loaded.successor_pid, Some(5678));
        assert_eq!(loaded.started_at_unix_ms, 1_700_000_000_000);
        assert_eq!(loaded.handoff_id, original.handoff_id);
        // The temp file must have been renamed away, not left behind.
        assert!(!dir.path().join("state.bin.tmp").exists());
    }

    #[test]
    fn write_replaces_existing_journal() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("state.bin");
        let first = sample(Phase::Negotiating);
        first.write_atomic(&path).unwrap();
        let second = StateJournal {
            phase: Phase::Committed,
            ..first.clone()
        };
        second.write_atomic(&path).unwrap();
        let loaded = StateJournal::read(&path).unwrap().unwrap();
        assert_eq!(loaded.phase, Phase::Committed);
        assert_eq!(loaded.handoff_id, first.handoff_id);
    }

    #[test]
    fn write_creates_missing_parent_directory() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("svc").join("state.bin");
        sample(Phase::Draining).write_atomic(&path).unwrap();
        assert!(StateJournal::read(&path).unwrap().is_some());
    }

    #[test]
    fn read_missing_returns_none() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("state.bin");
        assert!(StateJournal::read(&path).unwrap().is_none());
    }

    #[test]
    fn delete_removes_existing_journal() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("state.bin");
        sample(Phase::Committed).write_atomic(&path).unwrap();
        StateJournal::delete(&path).unwrap();
        assert!(StateJournal::read(&path).unwrap().is_none());
    }

    #[test]
    fn delete_missing_is_idempotent() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("state.bin");
        StateJournal::delete(&path).unwrap();
        StateJournal::delete(&path).unwrap();
    }
}
153}