1use std::fs::{self, File};
8use std::io::Write;
9use std::path::{Path, PathBuf};
10
11use serde::{Deserialize, Serialize};
12use sha2::{Digest, Sha256};
13use thiserror::Error;
14
15use super::remote::RemoteUrl;
16use crate::core::{Bead, BeadId, CanonicalState, DepEdge, DepKey, Tombstone, TombstoneKey};
17
18const WAL_VERSION: u32 = 1;
20
21#[derive(Debug, Clone, Serialize, Deserialize)]
23pub struct WalEntry {
24 pub version: u32,
26 pub written_at_ms: u64,
28 #[serde(with = "wal_state")]
30 pub state: CanonicalState,
31 pub root_slug: Option<String>,
33 pub sequence: u64,
35}
36
37mod wal_state {
38 use super::*;
39 use serde::de::Error as DeError;
40 use serde::{Deserializer, Serializer};
41 use std::collections::BTreeMap;
42
43 #[derive(Serialize, Deserialize)]
44 struct WalStateVec {
45 live: Vec<Bead>,
46 tombstones: Vec<Tombstone>,
47 deps: Vec<DepEdge>,
48 }
49
50 #[derive(Deserialize)]
51 struct WalStateMap {
52 live: BTreeMap<BeadId, Bead>,
53 tombstones: BTreeMap<TombstoneKey, Tombstone>,
54 deps: BTreeMap<DepKey, DepEdge>,
55 }
56
57 #[derive(Deserialize)]
58 #[serde(untagged)]
59 enum WalStateRepr {
60 Vecs(WalStateVec),
61 Maps(WalStateMap),
62 }
63
64 pub fn serialize<S>(state: &CanonicalState, serializer: S) -> Result<S::Ok, S::Error>
65 where
66 S: Serializer,
67 {
68 let snapshot = WalStateVec {
69 live: state.iter_live().map(|(_, bead)| bead.clone()).collect(),
70 tombstones: state
71 .iter_tombstones()
72 .map(|(_, tomb)| tomb.clone())
73 .collect(),
74 deps: state.iter_deps().map(|(_, dep)| dep.clone()).collect(),
75 };
76 snapshot.serialize(serializer)
77 }
78
79 pub fn deserialize<'de, D>(deserializer: D) -> Result<CanonicalState, D::Error>
80 where
81 D: Deserializer<'de>,
82 {
83 let (live, tombstones, deps) = match WalStateRepr::deserialize(deserializer)? {
84 WalStateRepr::Vecs(snapshot) => (snapshot.live, snapshot.tombstones, snapshot.deps),
85 WalStateRepr::Maps(snapshot) => (
86 snapshot.live.into_values().collect(),
87 snapshot.tombstones.into_values().collect(),
88 snapshot.deps.into_values().collect(),
89 ),
90 };
91 let mut state = CanonicalState::new();
92 for bead in live {
93 state.insert(bead).map_err(DeError::custom)?;
94 }
95 for tombstone in tombstones {
96 state.insert_tombstone(tombstone);
97 }
98 for dep in deps {
99 state.insert_dep(dep);
100 }
101 Ok(state)
102 }
103}
104
105impl WalEntry {
106 pub fn new(
108 state: CanonicalState,
109 root_slug: Option<String>,
110 sequence: u64,
111 wall_ms: u64,
112 ) -> Self {
113 WalEntry {
114 version: WAL_VERSION,
115 written_at_ms: wall_ms,
116 state,
117 root_slug,
118 sequence,
119 }
120 }
121}
122
123#[derive(Debug, Error)]
125pub enum WalError {
126 #[error("IO error: {0}")]
127 Io(#[from] std::io::Error),
128
129 #[error("JSON error: {0}")]
130 Json(#[from] serde_json::Error),
131
132 #[error("WAL version mismatch: expected {expected}, got {got}")]
133 VersionMismatch { expected: u32, got: u32 },
134}
135
136pub struct Wal {
140 dir: PathBuf,
141}
142
143impl Wal {
144 pub fn new(base_dir: &Path) -> Result<Self, WalError> {
148 let dir = base_dir.join("wal");
149 fs::create_dir_all(&dir)?;
150
151 #[cfg(unix)]
152 {
153 use std::os::unix::fs::PermissionsExt;
154 let _ = fs::set_permissions(&dir, fs::Permissions::from_mode(0o700));
155 }
156
157 Ok(Wal { dir })
158 }
159
160 pub fn migrate_from_runtime_dir(&self, runtime_dir: &Path) {
165 let legacy_dir = runtime_dir.join("wal");
166 if legacy_dir == self.dir || !legacy_dir.exists() {
167 return;
168 }
169
170 let entries = match fs::read_dir(&legacy_dir) {
171 Ok(entries) => entries,
172 Err(e) => {
173 tracing::warn!("wal migration: failed to read {:?}: {}", legacy_dir, e);
174 return;
175 }
176 };
177
178 for entry in entries.flatten() {
179 let path = entry.path();
180 if path.extension().is_none_or(|e| e != "wal") {
181 continue;
182 }
183 let file_name = match path.file_name() {
184 Some(name) => name.to_os_string(),
185 None => continue,
186 };
187 let dest = self.dir.join(&file_name);
188 if dest == path {
189 continue;
190 }
191
192 if !dest.exists() {
193 if let Err(e) = copy_then_remove(&path, &dest) {
194 tracing::warn!("wal migration: failed to move {:?}: {}", path, e);
195 }
196 continue;
197 }
198
199 let src_entry = read_entry_at(&path);
200 let dest_entry = read_entry_at(&dest);
201
202 match (src_entry, dest_entry) {
203 (Ok(src), Ok(dest_entry)) => {
204 if is_newer(&src, &dest_entry) {
205 if let Err(e) = copy_then_remove(&path, &dest) {
206 tracing::warn!("wal migration: failed to update {:?}: {}", dest, e);
207 }
208 } else if let Err(e) = fs::remove_file(&path) {
209 tracing::warn!("wal migration: failed to remove {:?}: {}", path, e);
210 }
211 }
212 (Ok(_), Err(_)) => {
213 if let Err(e) = copy_then_remove(&path, &dest) {
214 tracing::warn!("wal migration: failed to update {:?}: {}", dest, e);
215 }
216 }
217 (Err(e), Ok(_)) => {
218 tracing::warn!(
219 "wal migration: keeping legacy WAL {:?} (unreadable): {}",
220 path,
221 e
222 );
223 }
224 (Err(e1), Err(e2)) => {
225 tracing::warn!(
226 "wal migration: keeping legacy WAL {:?} (unreadable): {}, {}",
227 path,
228 e1,
229 e2
230 );
231 }
232 }
233 }
234 }
235
236 fn wal_path(&self, remote: &RemoteUrl) -> PathBuf {
238 let mut hasher = Sha256::new();
240 hasher.update(remote.0.as_bytes());
241 let hash = hasher.finalize();
242 let hash_hex = hex::encode(&hash[..8]); self.dir.join(format!("{}.wal", hash_hex))
244 }
245
246 fn tmp_path(&self, remote: &RemoteUrl) -> PathBuf {
248 let wal_path = self.wal_path(remote);
249 wal_path.with_extension("wal.tmp")
250 }
251
252 pub fn write(&self, remote: &RemoteUrl, entry: &WalEntry) -> Result<(), WalError> {
256 let tmp_path = self.tmp_path(remote);
257 let wal_path = self.wal_path(remote);
258
259 let data = serde_json::to_vec(entry)?;
261
262 let mut file = File::create(&tmp_path)?;
264 file.write_all(&data)?;
265 file.sync_all()?; fs::rename(&tmp_path, &wal_path)?;
269
270 #[cfg(unix)]
272 {
273 if let Ok(dir) = File::open(&self.dir) {
274 let _ = dir.sync_all();
275 }
276 }
277
278 Ok(())
279 }
280
281 pub fn read(&self, remote: &RemoteUrl) -> Result<Option<WalEntry>, WalError> {
286 let wal_path = self.wal_path(remote);
287
288 if !wal_path.exists() {
289 return Ok(None);
290 }
291
292 let data = fs::read(&wal_path)?;
293 let entry: WalEntry = serde_json::from_slice(&data)?;
294
295 if entry.version != WAL_VERSION {
297 return Err(WalError::VersionMismatch {
298 expected: WAL_VERSION,
299 got: entry.version,
300 });
301 }
302
303 Ok(Some(entry))
304 }
305
306 pub fn delete(&self, remote: &RemoteUrl) -> Result<(), WalError> {
310 let wal_path = self.wal_path(remote);
311 let tmp_path = self.tmp_path(remote);
312
313 let _ = fs::remove_file(&wal_path);
315 let _ = fs::remove_file(&tmp_path);
316
317 Ok(())
318 }
319
320 pub fn exists(&self, remote: &RemoteUrl) -> bool {
322 self.wal_path(remote).exists()
323 }
324
325 pub fn cleanup_stale(&self) -> Result<(), WalError> {
329 if let Ok(entries) = fs::read_dir(&self.dir) {
330 for entry in entries.flatten() {
331 let path = entry.path();
332 if path.extension().is_some_and(|e| e == "tmp") {
333 let _ = fs::remove_file(&path);
334 }
335 }
336 }
337 Ok(())
338 }
339}
340
341pub fn default_wal_base_dir() -> PathBuf {
346 if let Ok(dir) = std::env::var("BD_WAL_DIR")
347 && !dir.trim().is_empty()
348 {
349 return PathBuf::from(dir);
350 }
351 crate::paths::data_dir()
352}
353
354fn read_entry_at(path: &Path) -> Result<WalEntry, WalError> {
355 let data = fs::read(path)?;
356 let entry: WalEntry = serde_json::from_slice(&data)?;
357 if entry.version != WAL_VERSION {
358 return Err(WalError::VersionMismatch {
359 expected: WAL_VERSION,
360 got: entry.version,
361 });
362 }
363 Ok(entry)
364}
365
366fn is_newer(a: &WalEntry, b: &WalEntry) -> bool {
367 a.sequence > b.sequence || (a.sequence == b.sequence && a.written_at_ms > b.written_at_ms)
368}
369
370fn copy_then_remove(src: &Path, dest: &Path) -> Result<(), WalError> {
371 fs::copy(src, dest)?;
372 fs::remove_file(src)?;
373 Ok(())
374}
375
376#[cfg(test)]
377mod tests {
378 use super::*;
379 use crate::core::{
380 ActorId, BeadCore, BeadFields, BeadId, BeadType, Claim, DepKey, DepKind, Lww, Priority,
381 Stamp, Workflow, WriteStamp,
382 };
383 use serde::Serialize;
384 use std::collections::BTreeMap;
385 use tempfile::TempDir;
386
387 fn test_remote() -> RemoteUrl {
388 RemoteUrl("git@github.com:test/repo.git".into())
389 }
390
391 #[test]
392 fn write_read_roundtrip() {
393 let tmp = TempDir::new().unwrap();
394 let wal = Wal::new(tmp.path()).unwrap();
395 let remote = test_remote();
396
397 let entry = WalEntry::new(
398 CanonicalState::new(),
399 Some("test-slug".into()),
400 42,
401 1234567890,
402 );
403
404 wal.write(&remote, &entry).unwrap();
405
406 let loaded = wal.read(&remote).unwrap().unwrap();
407 assert_eq!(loaded.version, WAL_VERSION);
408 assert_eq!(loaded.root_slug, Some("test-slug".into()));
409 assert_eq!(loaded.sequence, 42);
410 assert_eq!(loaded.written_at_ms, 1234567890);
411 }
412
413 fn make_bead(id: &str, stamp: &Stamp) -> Bead {
414 let core = BeadCore::new(BeadId::parse(id).unwrap(), stamp.clone(), None);
415 let fields = BeadFields {
416 title: Lww::new("test".to_string(), stamp.clone()),
417 description: Lww::new(String::new(), stamp.clone()),
418 design: Lww::new(None, stamp.clone()),
419 acceptance_criteria: Lww::new(None, stamp.clone()),
420 priority: Lww::new(Priority::new(2).unwrap(), stamp.clone()),
421 bead_type: Lww::new(BeadType::Task, stamp.clone()),
422 labels: Lww::new(Default::default(), stamp.clone()),
423 external_ref: Lww::new(None, stamp.clone()),
424 source_repo: Lww::new(None, stamp.clone()),
425 estimated_minutes: Lww::new(None, stamp.clone()),
426 workflow: Lww::new(Workflow::Open, stamp.clone()),
427 claim: Lww::new(Claim::default(), stamp.clone()),
428 };
429 Bead::new(core, fields)
430 }
431
432 #[test]
433 fn write_read_roundtrip_with_tombstones_and_deps() {
434 let tmp = TempDir::new().unwrap();
435 let wal = Wal::new(tmp.path()).unwrap();
436 let remote = test_remote();
437
438 let actor = ActorId::new("tester").unwrap();
439 let stamp = Stamp::new(WriteStamp::new(1234, 0), actor);
440
441 let mut state = CanonicalState::new();
442 state.insert(make_bead("bd-abc", &stamp)).unwrap();
443 state.insert_tombstone(Tombstone::new(
444 BeadId::parse("bd-del").unwrap(),
445 stamp.clone(),
446 None,
447 ));
448 let dep_key = DepKey::new(
449 BeadId::parse("bd-abc").unwrap(),
450 BeadId::parse("bd-def").unwrap(),
451 DepKind::Blocks,
452 )
453 .unwrap();
454 state.insert_dep(DepEdge::new(dep_key, stamp.clone()));
455
456 let entry = WalEntry::new(state, None, 7, 42);
457 wal.write(&remote, &entry).unwrap();
458
459 let loaded = wal.read(&remote).unwrap().unwrap();
460 assert_eq!(loaded.state.live_count(), 1);
461 assert_eq!(loaded.state.tombstone_count(), 1);
462 assert_eq!(loaded.state.dep_count(), 1);
463 }
464
465 #[test]
466 fn read_legacy_map_state_format() {
467 #[derive(Serialize)]
468 struct LegacyWalState {
469 live: BTreeMap<BeadId, Bead>,
470 tombstones: BTreeMap<TombstoneKey, Tombstone>,
471 deps: BTreeMap<DepKey, DepEdge>,
472 }
473
474 #[derive(Serialize)]
475 struct LegacyWalEntry {
476 version: u32,
477 written_at_ms: u64,
478 state: LegacyWalState,
479 root_slug: Option<String>,
480 sequence: u64,
481 }
482
483 let actor = ActorId::new("tester").unwrap();
484 let stamp = Stamp::new(WriteStamp::new(1, 0), actor);
485 let bead = make_bead("bd-abc", &stamp);
486
487 let mut live = BTreeMap::new();
488 live.insert(bead.core.id.clone(), bead);
489
490 let legacy = LegacyWalEntry {
491 version: WAL_VERSION,
492 written_at_ms: 1,
493 state: LegacyWalState {
494 live,
495 tombstones: BTreeMap::new(),
496 deps: BTreeMap::new(),
497 },
498 root_slug: None,
499 sequence: 1,
500 };
501
502 let data = serde_json::to_vec(&legacy).unwrap();
503 let loaded: WalEntry = serde_json::from_slice(&data).unwrap();
504 assert_eq!(loaded.state.live_count(), 1);
505 assert_eq!(loaded.state.tombstone_count(), 0);
506 assert_eq!(loaded.state.dep_count(), 0);
507 }
508
509 #[test]
510 fn read_nonexistent() {
511 let tmp = TempDir::new().unwrap();
512 let wal = Wal::new(tmp.path()).unwrap();
513 let remote = test_remote();
514
515 assert!(wal.read(&remote).unwrap().is_none());
516 }
517
518 #[test]
519 fn delete_removes_file() {
520 let tmp = TempDir::new().unwrap();
521 let wal = Wal::new(tmp.path()).unwrap();
522 let remote = test_remote();
523
524 let entry = WalEntry::new(CanonicalState::new(), None, 1, 0);
525 wal.write(&remote, &entry).unwrap();
526 assert!(wal.exists(&remote));
527
528 wal.delete(&remote).unwrap();
529 assert!(!wal.exists(&remote));
530 }
531
532 #[test]
533 fn cleanup_stale_removes_tmp() {
534 let tmp = TempDir::new().unwrap();
535 let wal = Wal::new(tmp.path()).unwrap();
536
537 let stale = wal.dir.join("stale.wal.tmp");
539 fs::write(&stale, b"garbage").unwrap();
540 assert!(stale.exists());
541
542 wal.cleanup_stale().unwrap();
543 assert!(!stale.exists());
544 }
545
546 #[test]
547 fn different_remotes_different_files() {
548 let tmp = TempDir::new().unwrap();
549 let wal = Wal::new(tmp.path()).unwrap();
550
551 let remote1 = RemoteUrl("git@github.com:user/repo1.git".into());
552 let remote2 = RemoteUrl("git@github.com:user/repo2.git".into());
553
554 let entry1 = WalEntry::new(CanonicalState::new(), Some("slug1".into()), 1, 0);
555 let entry2 = WalEntry::new(CanonicalState::new(), Some("slug2".into()), 2, 0);
556
557 wal.write(&remote1, &entry1).unwrap();
558 wal.write(&remote2, &entry2).unwrap();
559
560 let loaded1 = wal.read(&remote1).unwrap().unwrap();
561 let loaded2 = wal.read(&remote2).unwrap().unwrap();
562
563 assert_eq!(loaded1.root_slug, Some("slug1".into()));
564 assert_eq!(loaded2.root_slug, Some("slug2".into()));
565 }
566
567 #[test]
568 fn migrate_from_runtime_dir_moves_wal() {
569 let legacy_base = TempDir::new().unwrap();
570 let new_base = TempDir::new().unwrap();
571 let legacy = Wal::new(legacy_base.path()).unwrap();
572 let current = Wal::new(new_base.path()).unwrap();
573 let remote = test_remote();
574
575 let entry = WalEntry::new(CanonicalState::new(), None, 1, 123);
576 legacy.write(&remote, &entry).unwrap();
577 assert!(legacy.exists(&remote));
578
579 current.migrate_from_runtime_dir(legacy_base.path());
580 assert!(current.exists(&remote));
581 assert!(!legacy.exists(&remote));
582 }
583
584 #[test]
585 fn migrate_prefers_newer_sequence() {
586 let legacy_base = TempDir::new().unwrap();
587 let new_base = TempDir::new().unwrap();
588 let legacy = Wal::new(legacy_base.path()).unwrap();
589 let current = Wal::new(new_base.path()).unwrap();
590 let remote = test_remote();
591
592 let older = WalEntry::new(CanonicalState::new(), None, 1, 100);
593 let newer = WalEntry::new(CanonicalState::new(), None, 2, 200);
594
595 current.write(&remote, &older).unwrap();
596 legacy.write(&remote, &newer).unwrap();
597
598 current.migrate_from_runtime_dir(legacy_base.path());
599 let loaded = current.read(&remote).unwrap().unwrap();
600 assert_eq!(loaded.sequence, 2);
601 }
602}