beads_rs/daemon/
wal.rs

1//! Write-Ahead Log for mutation durability.
2//!
3//! Provides crash-safe persistence without flooding git history with commits.
4//! WAL entries are state snapshots (not operation replay) written atomically
5//! via rename. Cleared after successful remote sync.
6
7use std::fs::{self, File};
8use std::io::Write;
9use std::path::{Path, PathBuf};
10
11use serde::{Deserialize, Serialize};
12use sha2::{Digest, Sha256};
13use thiserror::Error;
14
15use super::remote::RemoteUrl;
16use crate::core::{Bead, BeadId, CanonicalState, DepEdge, DepKey, Tombstone, TombstoneKey};
17
/// WAL format version.
///
/// Stamped into every [`WalEntry`] by [`WalEntry::new`] and compared on read;
/// an entry carrying any other value is rejected with
/// [`WalError::VersionMismatch`].
const WAL_VERSION: u32 = 1;
20
/// A WAL entry containing a full state snapshot.
///
/// This is the unit that is serialized to JSON and stored in a per-remote WAL
/// file. The declaration order of the fields is the order in which they are
/// serialized.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct WalEntry {
    /// Format version for future compatibility.
    ///
    /// Readers in this module reject entries whose version differs from
    /// `WAL_VERSION`.
    pub version: u32,
    /// Wall clock time when written (for debugging).
    ///
    /// Also used as the tie-breaker when two entries share the same
    /// `sequence` during legacy-WAL migration.
    pub written_at_ms: u64,
    /// Full state snapshot.
    ///
    /// (De)serialized through the `wal_state` adapter module rather than the
    /// state's own serde implementation.
    #[serde(with = "wal_state")]
    pub state: CanonicalState,
    /// Root slug for bead IDs.
    pub root_slug: Option<String>,
    /// Monotonic sequence number.
    ///
    /// The primary key for deciding which of two entries is newer.
    pub sequence: u64,
}
36
37mod wal_state {
38    use super::*;
39    use serde::de::Error as DeError;
40    use serde::{Deserializer, Serializer};
41    use std::collections::BTreeMap;
42
43    #[derive(Serialize, Deserialize)]
44    struct WalStateVec {
45        live: Vec<Bead>,
46        tombstones: Vec<Tombstone>,
47        deps: Vec<DepEdge>,
48    }
49
50    #[derive(Deserialize)]
51    struct WalStateMap {
52        live: BTreeMap<BeadId, Bead>,
53        tombstones: BTreeMap<TombstoneKey, Tombstone>,
54        deps: BTreeMap<DepKey, DepEdge>,
55    }
56
57    #[derive(Deserialize)]
58    #[serde(untagged)]
59    enum WalStateRepr {
60        Vecs(WalStateVec),
61        Maps(WalStateMap),
62    }
63
64    pub fn serialize<S>(state: &CanonicalState, serializer: S) -> Result<S::Ok, S::Error>
65    where
66        S: Serializer,
67    {
68        let snapshot = WalStateVec {
69            live: state.iter_live().map(|(_, bead)| bead.clone()).collect(),
70            tombstones: state
71                .iter_tombstones()
72                .map(|(_, tomb)| tomb.clone())
73                .collect(),
74            deps: state.iter_deps().map(|(_, dep)| dep.clone()).collect(),
75        };
76        snapshot.serialize(serializer)
77    }
78
79    pub fn deserialize<'de, D>(deserializer: D) -> Result<CanonicalState, D::Error>
80    where
81        D: Deserializer<'de>,
82    {
83        let (live, tombstones, deps) = match WalStateRepr::deserialize(deserializer)? {
84            WalStateRepr::Vecs(snapshot) => (snapshot.live, snapshot.tombstones, snapshot.deps),
85            WalStateRepr::Maps(snapshot) => (
86                snapshot.live.into_values().collect(),
87                snapshot.tombstones.into_values().collect(),
88                snapshot.deps.into_values().collect(),
89            ),
90        };
91        let mut state = CanonicalState::new();
92        for bead in live {
93            state.insert(bead).map_err(DeError::custom)?;
94        }
95        for tombstone in tombstones {
96            state.insert_tombstone(tombstone);
97        }
98        for dep in deps {
99            state.insert_dep(dep);
100        }
101        Ok(state)
102    }
103}
104
105impl WalEntry {
106    /// Create a new WAL entry.
107    pub fn new(
108        state: CanonicalState,
109        root_slug: Option<String>,
110        sequence: u64,
111        wall_ms: u64,
112    ) -> Self {
113        WalEntry {
114            version: WAL_VERSION,
115            written_at_ms: wall_ms,
116            state,
117            root_slug,
118            sequence,
119        }
120    }
121}
122
/// WAL errors.
///
/// I/O and JSON errors convert automatically via `?` thanks to the `#[from]`
/// attributes.
#[derive(Debug, Error)]
pub enum WalError {
    /// A filesystem operation on the WAL directory or one of its files failed.
    #[error("IO error: {0}")]
    Io(#[from] std::io::Error),

    /// A WAL entry could not be encoded to, or decoded from, JSON.
    #[error("JSON error: {0}")]
    Json(#[from] serde_json::Error),

    /// A decoded entry carries a `version` other than the one this build
    /// writes (`expected`); `got` is the value found in the file.
    #[error("WAL version mismatch: expected {expected}, got {got}")]
    VersionMismatch { expected: u32, got: u32 },
}
135
/// Write-Ahead Log manager.
///
/// Stores per-remote WAL files in a subdirectory of a persistent base dir.
/// Each remote maps to one `<hash>.wal` file inside that directory, plus a
/// transient `<hash>.wal.tmp` sibling while a write is in progress.
pub struct Wal {
    /// Directory holding the WAL files (`<base_dir>/wal`).
    dir: PathBuf,
}
142
143impl Wal {
144    /// Create a new WAL manager.
145    ///
146    /// Creates the WAL directory if it doesn't exist.
147    pub fn new(base_dir: &Path) -> Result<Self, WalError> {
148        let dir = base_dir.join("wal");
149        fs::create_dir_all(&dir)?;
150
151        #[cfg(unix)]
152        {
153            use std::os::unix::fs::PermissionsExt;
154            let _ = fs::set_permissions(&dir, fs::Permissions::from_mode(0o700));
155        }
156
157        Ok(Wal { dir })
158    }
159
160    /// Best-effort migration from a legacy runtime WAL directory.
161    ///
162    /// The legacy path is `<runtime_dir>/wal`. Any WAL files found there are
163    /// copied into the persistent WAL dir. If both exist, the newer entry wins.
164    pub fn migrate_from_runtime_dir(&self, runtime_dir: &Path) {
165        let legacy_dir = runtime_dir.join("wal");
166        if legacy_dir == self.dir || !legacy_dir.exists() {
167            return;
168        }
169
170        let entries = match fs::read_dir(&legacy_dir) {
171            Ok(entries) => entries,
172            Err(e) => {
173                tracing::warn!("wal migration: failed to read {:?}: {}", legacy_dir, e);
174                return;
175            }
176        };
177
178        for entry in entries.flatten() {
179            let path = entry.path();
180            if path.extension().is_none_or(|e| e != "wal") {
181                continue;
182            }
183            let file_name = match path.file_name() {
184                Some(name) => name.to_os_string(),
185                None => continue,
186            };
187            let dest = self.dir.join(&file_name);
188            if dest == path {
189                continue;
190            }
191
192            if !dest.exists() {
193                if let Err(e) = copy_then_remove(&path, &dest) {
194                    tracing::warn!("wal migration: failed to move {:?}: {}", path, e);
195                }
196                continue;
197            }
198
199            let src_entry = read_entry_at(&path);
200            let dest_entry = read_entry_at(&dest);
201
202            match (src_entry, dest_entry) {
203                (Ok(src), Ok(dest_entry)) => {
204                    if is_newer(&src, &dest_entry) {
205                        if let Err(e) = copy_then_remove(&path, &dest) {
206                            tracing::warn!("wal migration: failed to update {:?}: {}", dest, e);
207                        }
208                    } else if let Err(e) = fs::remove_file(&path) {
209                        tracing::warn!("wal migration: failed to remove {:?}: {}", path, e);
210                    }
211                }
212                (Ok(_), Err(_)) => {
213                    if let Err(e) = copy_then_remove(&path, &dest) {
214                        tracing::warn!("wal migration: failed to update {:?}: {}", dest, e);
215                    }
216                }
217                (Err(e), Ok(_)) => {
218                    tracing::warn!(
219                        "wal migration: keeping legacy WAL {:?} (unreadable): {}",
220                        path,
221                        e
222                    );
223                }
224                (Err(e1), Err(e2)) => {
225                    tracing::warn!(
226                        "wal migration: keeping legacy WAL {:?} (unreadable): {}, {}",
227                        path,
228                        e1,
229                        e2
230                    );
231                }
232            }
233        }
234    }
235
236    /// Get the WAL file path for a remote.
237    fn wal_path(&self, remote: &RemoteUrl) -> PathBuf {
238        // Hash the remote URL to get a stable filename
239        let mut hasher = Sha256::new();
240        hasher.update(remote.0.as_bytes());
241        let hash = hasher.finalize();
242        let hash_hex = hex::encode(&hash[..8]); // First 16 hex chars
243        self.dir.join(format!("{}.wal", hash_hex))
244    }
245
246    /// Get the temporary file path for atomic writes.
247    fn tmp_path(&self, remote: &RemoteUrl) -> PathBuf {
248        let wal_path = self.wal_path(remote);
249        wal_path.with_extension("wal.tmp")
250    }
251
252    /// Write state to WAL atomically.
253    ///
254    /// Uses write-to-temp + fsync + rename for crash safety.
255    pub fn write(&self, remote: &RemoteUrl, entry: &WalEntry) -> Result<(), WalError> {
256        let tmp_path = self.tmp_path(remote);
257        let wal_path = self.wal_path(remote);
258
259        // Serialize to JSON
260        let data = serde_json::to_vec(entry)?;
261
262        // Write to temp file
263        let mut file = File::create(&tmp_path)?;
264        file.write_all(&data)?;
265        file.sync_all()?; // fsync for durability
266
267        // Atomic rename
268        fs::rename(&tmp_path, &wal_path)?;
269
270        // fsync the directory to ensure rename is durable
271        #[cfg(unix)]
272        {
273            if let Ok(dir) = File::open(&self.dir) {
274                let _ = dir.sync_all();
275            }
276        }
277
278        Ok(())
279    }
280
281    /// Read state from WAL if it exists.
282    ///
283    /// Returns None if no WAL file exists.
284    /// Returns error if file exists but is corrupted.
285    pub fn read(&self, remote: &RemoteUrl) -> Result<Option<WalEntry>, WalError> {
286        let wal_path = self.wal_path(remote);
287
288        if !wal_path.exists() {
289            return Ok(None);
290        }
291
292        let data = fs::read(&wal_path)?;
293        let entry: WalEntry = serde_json::from_slice(&data)?;
294
295        // Version check
296        if entry.version != WAL_VERSION {
297            return Err(WalError::VersionMismatch {
298                expected: WAL_VERSION,
299                got: entry.version,
300            });
301        }
302
303        Ok(Some(entry))
304    }
305
306    /// Delete WAL for a remote.
307    ///
308    /// Called after successful remote sync.
309    pub fn delete(&self, remote: &RemoteUrl) -> Result<(), WalError> {
310        let wal_path = self.wal_path(remote);
311        let tmp_path = self.tmp_path(remote);
312
313        // Remove both WAL and any stale temp file
314        let _ = fs::remove_file(&wal_path);
315        let _ = fs::remove_file(&tmp_path);
316
317        Ok(())
318    }
319
320    /// Check if a WAL exists for a remote.
321    pub fn exists(&self, remote: &RemoteUrl) -> bool {
322        self.wal_path(remote).exists()
323    }
324
325    /// Clean up any stale temp files (from crashes during write).
326    ///
327    /// Called on startup.
328    pub fn cleanup_stale(&self) -> Result<(), WalError> {
329        if let Ok(entries) = fs::read_dir(&self.dir) {
330            for entry in entries.flatten() {
331                let path = entry.path();
332                if path.extension().is_some_and(|e| e == "tmp") {
333                    let _ = fs::remove_file(&path);
334                }
335            }
336        }
337        Ok(())
338    }
339}
340
341/// Default base directory for WAL storage.
342///
343/// Uses `BD_WAL_DIR` if set, otherwise `BD_DATA_DIR` or
344/// `$XDG_DATA_HOME/beads-rs` (`~/.local/share/beads-rs`).
345pub fn default_wal_base_dir() -> PathBuf {
346    if let Ok(dir) = std::env::var("BD_WAL_DIR")
347        && !dir.trim().is_empty()
348    {
349        return PathBuf::from(dir);
350    }
351    crate::paths::data_dir()
352}
353
354fn read_entry_at(path: &Path) -> Result<WalEntry, WalError> {
355    let data = fs::read(path)?;
356    let entry: WalEntry = serde_json::from_slice(&data)?;
357    if entry.version != WAL_VERSION {
358        return Err(WalError::VersionMismatch {
359            expected: WAL_VERSION,
360            got: entry.version,
361        });
362    }
363    Ok(entry)
364}
365
366fn is_newer(a: &WalEntry, b: &WalEntry) -> bool {
367    a.sequence > b.sequence || (a.sequence == b.sequence && a.written_at_ms > b.written_at_ms)
368}
369
370fn copy_then_remove(src: &Path, dest: &Path) -> Result<(), WalError> {
371    fs::copy(src, dest)?;
372    fs::remove_file(src)?;
373    Ok(())
374}
375
/// Unit tests for the WAL: round-trips, legacy format decoding, deletion,
/// stale-temp cleanup, per-remote file separation, and legacy-dir migration.
#[cfg(test)]
mod tests {
    use super::*;
    use crate::core::{
        ActorId, BeadCore, BeadFields, BeadId, BeadType, Claim, DepKey, DepKind, Lww, Priority,
        Stamp, Workflow, WriteStamp,
    };
    use serde::Serialize;
    use std::collections::BTreeMap;
    use tempfile::TempDir;

    /// Fixed remote used by tests that only need a single WAL file.
    fn test_remote() -> RemoteUrl {
        RemoteUrl("git@github.com:test/repo.git".into())
    }

    /// An entry with an empty state survives a write/read cycle with all of
    /// its scalar fields intact.
    #[test]
    fn write_read_roundtrip() {
        let tmp = TempDir::new().unwrap();
        let wal = Wal::new(tmp.path()).unwrap();
        let remote = test_remote();

        let entry = WalEntry::new(
            CanonicalState::new(),
            Some("test-slug".into()),
            42,
            1234567890,
        );

        wal.write(&remote, &entry).unwrap();

        let loaded = wal.read(&remote).unwrap().unwrap();
        assert_eq!(loaded.version, WAL_VERSION);
        assert_eq!(loaded.root_slug, Some("test-slug".into()));
        assert_eq!(loaded.sequence, 42);
        assert_eq!(loaded.written_at_ms, 1234567890);
    }

    /// Build a minimal bead with the given id, using `stamp` for every field.
    fn make_bead(id: &str, stamp: &Stamp) -> Bead {
        let core = BeadCore::new(BeadId::parse(id).unwrap(), stamp.clone(), None);
        let fields = BeadFields {
            title: Lww::new("test".to_string(), stamp.clone()),
            description: Lww::new(String::new(), stamp.clone()),
            design: Lww::new(None, stamp.clone()),
            acceptance_criteria: Lww::new(None, stamp.clone()),
            priority: Lww::new(Priority::new(2).unwrap(), stamp.clone()),
            bead_type: Lww::new(BeadType::Task, stamp.clone()),
            labels: Lww::new(Default::default(), stamp.clone()),
            external_ref: Lww::new(None, stamp.clone()),
            source_repo: Lww::new(None, stamp.clone()),
            estimated_minutes: Lww::new(None, stamp.clone()),
            workflow: Lww::new(Workflow::Open, stamp.clone()),
            claim: Lww::new(Claim::default(), stamp.clone()),
        };
        Bead::new(core, fields)
    }

    /// A state holding one live bead, one tombstone and one dependency edge
    /// comes back with the same counts after a write/read cycle.
    #[test]
    fn write_read_roundtrip_with_tombstones_and_deps() {
        let tmp = TempDir::new().unwrap();
        let wal = Wal::new(tmp.path()).unwrap();
        let remote = test_remote();

        let actor = ActorId::new("tester").unwrap();
        let stamp = Stamp::new(WriteStamp::new(1234, 0), actor);

        let mut state = CanonicalState::new();
        state.insert(make_bead("bd-abc", &stamp)).unwrap();
        state.insert_tombstone(Tombstone::new(
            BeadId::parse("bd-del").unwrap(),
            stamp.clone(),
            None,
        ));
        let dep_key = DepKey::new(
            BeadId::parse("bd-abc").unwrap(),
            BeadId::parse("bd-def").unwrap(),
            DepKind::Blocks,
        )
        .unwrap();
        state.insert_dep(DepEdge::new(dep_key, stamp.clone()));

        let entry = WalEntry::new(state, None, 7, 42);
        wal.write(&remote, &entry).unwrap();

        let loaded = wal.read(&remote).unwrap().unwrap();
        assert_eq!(loaded.state.live_count(), 1);
        assert_eq!(loaded.state.tombstone_count(), 1);
        assert_eq!(loaded.state.dep_count(), 1);
    }

    /// An entry whose state was serialized as keyed maps (the legacy layout)
    /// still deserializes into a `WalEntry`.
    #[test]
    fn read_legacy_map_state_format() {
        // Local mirror of the legacy state layout: maps instead of lists.
        #[derive(Serialize)]
        struct LegacyWalState {
            live: BTreeMap<BeadId, Bead>,
            tombstones: BTreeMap<TombstoneKey, Tombstone>,
            deps: BTreeMap<DepKey, DepEdge>,
        }

        // Same fields as `WalEntry`, but carrying the legacy state layout.
        #[derive(Serialize)]
        struct LegacyWalEntry {
            version: u32,
            written_at_ms: u64,
            state: LegacyWalState,
            root_slug: Option<String>,
            sequence: u64,
        }

        let actor = ActorId::new("tester").unwrap();
        let stamp = Stamp::new(WriteStamp::new(1, 0), actor);
        let bead = make_bead("bd-abc", &stamp);

        let mut live = BTreeMap::new();
        live.insert(bead.core.id.clone(), bead);

        let legacy = LegacyWalEntry {
            version: WAL_VERSION,
            written_at_ms: 1,
            state: LegacyWalState {
                live,
                tombstones: BTreeMap::new(),
                deps: BTreeMap::new(),
            },
            root_slug: None,
            sequence: 1,
        };

        let data = serde_json::to_vec(&legacy).unwrap();
        let loaded: WalEntry = serde_json::from_slice(&data).unwrap();
        assert_eq!(loaded.state.live_count(), 1);
        assert_eq!(loaded.state.tombstone_count(), 0);
        assert_eq!(loaded.state.dep_count(), 0);
    }

    /// Reading a remote that has never been written yields `Ok(None)`.
    #[test]
    fn read_nonexistent() {
        let tmp = TempDir::new().unwrap();
        let wal = Wal::new(tmp.path()).unwrap();
        let remote = test_remote();

        assert!(wal.read(&remote).unwrap().is_none());
    }

    /// `delete` removes a previously written WAL file.
    #[test]
    fn delete_removes_file() {
        let tmp = TempDir::new().unwrap();
        let wal = Wal::new(tmp.path()).unwrap();
        let remote = test_remote();

        let entry = WalEntry::new(CanonicalState::new(), None, 1, 0);
        wal.write(&remote, &entry).unwrap();
        assert!(wal.exists(&remote));

        wal.delete(&remote).unwrap();
        assert!(!wal.exists(&remote));
    }

    /// `cleanup_stale` deletes leftover `.tmp` files in the WAL directory.
    #[test]
    fn cleanup_stale_removes_tmp() {
        let tmp = TempDir::new().unwrap();
        let wal = Wal::new(tmp.path()).unwrap();

        // Create a stale .tmp file
        let stale = wal.dir.join("stale.wal.tmp");
        fs::write(&stale, b"garbage").unwrap();
        assert!(stale.exists());

        wal.cleanup_stale().unwrap();
        assert!(!stale.exists());
    }

    /// Two different remote URLs are stored independently: writing one does
    /// not overwrite the other.
    #[test]
    fn different_remotes_different_files() {
        let tmp = TempDir::new().unwrap();
        let wal = Wal::new(tmp.path()).unwrap();

        let remote1 = RemoteUrl("git@github.com:user/repo1.git".into());
        let remote2 = RemoteUrl("git@github.com:user/repo2.git".into());

        let entry1 = WalEntry::new(CanonicalState::new(), Some("slug1".into()), 1, 0);
        let entry2 = WalEntry::new(CanonicalState::new(), Some("slug2".into()), 2, 0);

        wal.write(&remote1, &entry1).unwrap();
        wal.write(&remote2, &entry2).unwrap();

        let loaded1 = wal.read(&remote1).unwrap().unwrap();
        let loaded2 = wal.read(&remote2).unwrap().unwrap();

        assert_eq!(loaded1.root_slug, Some("slug1".into()));
        assert_eq!(loaded2.root_slug, Some("slug2".into()));
    }

    /// With no WAL in the new location, migration moves the legacy file over
    /// and removes it from the legacy directory.
    #[test]
    fn migrate_from_runtime_dir_moves_wal() {
        let legacy_base = TempDir::new().unwrap();
        let new_base = TempDir::new().unwrap();
        let legacy = Wal::new(legacy_base.path()).unwrap();
        let current = Wal::new(new_base.path()).unwrap();
        let remote = test_remote();

        let entry = WalEntry::new(CanonicalState::new(), None, 1, 123);
        legacy.write(&remote, &entry).unwrap();
        assert!(legacy.exists(&remote));

        current.migrate_from_runtime_dir(legacy_base.path());
        assert!(current.exists(&remote));
        assert!(!legacy.exists(&remote));
    }

    /// When both locations hold a WAL for the same remote, the entry with the
    /// higher sequence number ends up in the new location.
    #[test]
    fn migrate_prefers_newer_sequence() {
        let legacy_base = TempDir::new().unwrap();
        let new_base = TempDir::new().unwrap();
        let legacy = Wal::new(legacy_base.path()).unwrap();
        let current = Wal::new(new_base.path()).unwrap();
        let remote = test_remote();

        let older = WalEntry::new(CanonicalState::new(), None, 1, 100);
        let newer = WalEntry::new(CanonicalState::new(), None, 2, 200);

        current.write(&remote, &older).unwrap();
        legacy.write(&remote, &newer).unwrap();

        current.migrate_from_runtime_dir(legacy_base.path());
        let loaded = current.read(&remote).unwrap().unwrap();
        assert_eq!(loaded.sequence, 2);
    }
}