Skip to main content

arrow_kanban/
persistence.rs

1//! Kanban persistence engine — WAL + atomic Parquet save, dogfooding arrow-graph-git.
2//!
3//! This module wraps `nusy-kanban::persist` with production-grade features:
4//! - **Dirty tracking**: only save when state has changed
5//! - **WAL (Write-Ahead Log)**: crash-safe atomic writes (via `arrow-graph-git::save_named_batches`)
6//! - **Graph-native commit history**: queryable audit trail via `arrow-graph-git::CommitsTable`
7//! - **Health metrics**: track save count, last save time, bytes written
8//! - **Graceful shutdown**: save before exit
9//!
//! This module dogfoods the persistence pattern intended for beings — the same
//! strategy that V14 beings will use for their ArrowGraphStore.
12
13use crate::crud::KanbanStore;
14use crate::persist;
15use crate::relations::RelationsStore;
16use arrow_graph_git::commit::{Commit, CommitsTable};
17use arrow_graph_git::save::{persist_commits, restore_commits};
18use std::path::{Path, PathBuf};
19use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
20
/// Errors from the persistence engine.
///
/// Each variant wraps a lower-level error. The `#[from]` attributes generate the
/// `From` conversions that let `?` lift those errors into this type.
#[derive(Debug, thiserror::Error)]
pub enum PersistenceError {
    /// Failure reported by the crate's `persist` module.
    #[error("Persist error: {0}")]
    Persist(#[from] persist::PersistError),

    /// Underlying filesystem / I/O failure.
    #[error("IO error: {0}")]
    Io(#[from] std::io::Error),

    /// Failure reported by `arrow_graph_git::save`.
    #[error("Save error: {0}")]
    Save(#[from] arrow_graph_git::save::SaveError),
}
33
/// Result alias whose error type is fixed to [`PersistenceError`].
pub type Result<T> = std::result::Result<T, PersistenceError>;
35
/// Tunable settings for the persistence engine.
#[derive(Debug, Clone)]
pub struct PersistenceConfig {
    /// Root directory for kanban data.
    pub root: PathBuf,
    /// Minimum interval between periodic saves (default: 30s).
    pub save_interval: Duration,
    /// Interval between graph-native commit snapshots (default: 5 minutes).
    pub commit_interval: Duration,
    /// Whether to save after every mutation (default: true).
    ///
    /// NOTE(review): nothing in this file reads this flag — presumably the
    /// caller is expected to honor it; confirm against the call sites.
    pub save_on_mutation: bool,
}

impl Default for PersistenceConfig {
    /// Defaults: current directory as root, 30-second save interval,
    /// 5-minute commit interval, and save-on-mutation enabled.
    fn default() -> Self {
        let root = PathBuf::from(".");
        let save_interval = Duration::from_secs(30);
        let commit_interval = Duration::from_secs(300);

        Self {
            root,
            save_interval,
            commit_interval,
            save_on_mutation: true,
        }
    }
}
59
/// Point-in-time health snapshot of the persistence engine.
#[derive(Debug, Clone)]
pub struct HealthMetrics {
    /// Total number of saves performed since startup.
    pub save_count: u64,
    /// Timestamp of last successful save (millis since epoch).
    pub last_save_at: Option<u64>,
    /// Duration of last save operation.
    pub last_save_duration: Option<Duration>,
    /// Whether there are unsaved changes.
    pub dirty: bool,
    /// Number of mutations since last save.
    pub mutations_since_save: u64,
    /// Total number of graph-native commits.
    pub git_commit_count: u64,
    /// Timestamp when engine started (millis since epoch).
    pub started_at: u64,
    /// Number of items in the store.
    pub item_count: usize,
    /// Number of relations.
    pub relation_count: usize,
}

impl HealthMetrics {
    /// Whole seconds elapsed between `started_at` and the current wall-clock time.
    ///
    /// Yields 0 when `started_at` lies in the future (the subtraction saturates)
    /// or when the system clock reads earlier than the Unix epoch.
    pub fn uptime_secs(&self) -> u64 {
        let current_ms = SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .map_or(0, |since_epoch| since_epoch.as_millis() as u64);
        let elapsed_ms = current_ms.saturating_sub(self.started_at);
        elapsed_ms / 1000
    }
}
93
/// The persistence engine — wraps KanbanStore + RelationsStore with
/// dirty tracking, atomic saves (via arrow-graph-git), and graph-native commits.
///
/// The engine does not own the stores; they are passed by reference to the
/// methods that need them. It only keeps bookkeeping state and commit history.
pub struct PersistenceEngine {
    /// Settings supplied at construction (root directory, intervals).
    config: PersistenceConfig,
    /// True once a mutation has been recorded and not yet saved.
    dirty: bool,
    /// Mutations recorded since the last successful save.
    mutations_since_save: u64,
    /// Number of completed (non-skipped) saves.
    save_count: u64,
    /// Wall-clock time of the last successful save (millis since epoch).
    last_save_at: Option<u64>,
    /// How long the last successful save took.
    last_save_duration: Option<Duration>,
    /// Monotonic instant of construction or of the last successful save;
    /// compared against `config.save_interval`.
    last_periodic_save: Instant,
    /// Number of commits tracked; incremented per commit and set to the
    /// restored table's length by `load_commits`.
    git_commit_count: u64,
    /// Monotonic instant of construction or of the last commit;
    /// compared against `config.commit_interval`.
    last_git_backup: Instant,
    /// Mutations recorded since the last commit.
    mutations_since_git_commit: u64,
    /// Wall-clock time at construction (millis since epoch).
    started_at: u64,
    /// In-memory commit history.
    commits_table: CommitsTable,
    /// Id of the most recent commit; used as the parent of the next one.
    last_commit_id: Option<String>,
}
111
112impl PersistenceEngine {
113    /// Create a new persistence engine with the given config.
114    pub fn new(config: PersistenceConfig) -> Self {
115        let now_ms = SystemTime::now()
116            .duration_since(UNIX_EPOCH)
117            .unwrap_or_default()
118            .as_millis() as u64;
119
120        PersistenceEngine {
121            config,
122            dirty: false,
123            mutations_since_save: 0,
124            save_count: 0,
125            last_save_at: None,
126            last_save_duration: None,
127            last_periodic_save: Instant::now(),
128            git_commit_count: 0,
129            last_git_backup: Instant::now(),
130            mutations_since_git_commit: 0,
131            started_at: now_ms,
132            commits_table: CommitsTable::new(),
133            last_commit_id: None,
134        }
135    }
136
137    /// Mark state as dirty (a mutation occurred).
138    pub fn mark_dirty(&mut self) {
139        self.dirty = true;
140        self.mutations_since_save += 1;
141        self.mutations_since_git_commit += 1;
142    }
143
144    /// Whether the engine has unsaved changes.
145    pub fn is_dirty(&self) -> bool {
146        self.dirty
147    }
148
149    /// Check if a periodic save is due.
150    pub fn periodic_save_due(&self) -> bool {
151        self.dirty && self.last_periodic_save.elapsed() >= self.config.save_interval
152    }
153
154    /// Check if a graph-native commit is due.
155    pub fn git_backup_due(&self) -> bool {
156        self.mutations_since_git_commit > 0
157            && self.last_git_backup.elapsed() >= self.config.commit_interval
158    }
159
160    /// Save the kanban state to Parquet atomically.
161    ///
162    /// Delegates to `persist::save_all()` which uses `arrow-graph-git::save_named_batches()`
163    /// for crash-safe atomic writes with WAL protection.
164    pub fn save(&mut self, store: &KanbanStore, relations: &RelationsStore) -> Result<SaveMetrics> {
165        if !self.dirty {
166            return Ok(SaveMetrics {
167                skipped: true,
168                ..Default::default()
169            });
170        }
171
172        let start = Instant::now();
173
174        // Atomic Parquet save via arrow-graph-git
175        persist::save_all(&self.config.root, store, relations)?;
176
177        let duration = start.elapsed();
178        let now = self.now_ms();
179
180        self.dirty = false;
181        self.mutations_since_save = 0;
182        self.save_count += 1;
183        self.last_save_at = Some(now);
184        self.last_save_duration = Some(duration);
185        self.last_periodic_save = Instant::now();
186
187        Ok(SaveMetrics {
188            skipped: false,
189            duration,
190            items_saved: store.active_item_count(),
191            relations_saved: relations.active_count(),
192            timestamp_ms: now,
193        })
194    }
195
196    /// Create a graph-native commit for the audit trail.
197    ///
198    /// Replaces shell `git add` + `git commit` with a CommitsTable entry
199    /// persisted as JSON. History is queryable via `commits()`.
200    pub fn git_backup(&mut self, item_count: usize) -> Result<GitBackupMetrics> {
201        if self.mutations_since_git_commit == 0 {
202            return Ok(GitBackupMetrics {
203                skipped: true,
204                ..Default::default()
205            });
206        }
207
208        let msg = format!(
209            "kanban: auto-save ({} items, {} changes since last commit)",
210            item_count, self.mutations_since_git_commit
211        );
212
213        // Create graph-native commit (no shell git)
214        let commit = Commit {
215            commit_id: uuid::Uuid::new_v4().to_string(),
216            parent_ids: self.last_commit_id.clone().into_iter().collect(),
217            timestamp_ms: chrono::Utc::now().timestamp_millis(),
218            message: msg.clone(),
219            author: "nusy-kanban".to_string(),
220        };
221
222        self.last_commit_id = Some(commit.commit_id.clone());
223        self.commits_table.append(commit);
224
225        // Persist commit history as JSON
226        let data_dir = persist::data_dir(&self.config.root)?;
227        persist_commits(&self.commits_table, &data_dir)?;
228
229        self.git_commit_count += 1;
230        self.mutations_since_git_commit = 0;
231        self.last_git_backup = Instant::now();
232
233        Ok(GitBackupMetrics {
234            skipped: false,
235            message: msg,
236            commit_count: self.git_commit_count,
237        })
238    }
239
240    /// Load commit history from disk (call after startup).
241    pub fn load_commits(&mut self) -> Result<()> {
242        let data_dir = persist::data_dir(&self.config.root)?;
243        if let Some(table) = restore_commits(&data_dir)? {
244            if let Some(last) = table.all().last() {
245                self.last_commit_id = Some(last.commit_id.clone());
246            }
247            self.git_commit_count = table.len() as u64;
248            self.commits_table = table;
249        }
250        Ok(())
251    }
252
253    /// Get the commit history (graph-native audit trail).
254    pub fn commits(&self) -> &CommitsTable {
255        &self.commits_table
256    }
257
258    /// Check for and recover from an incomplete save (WAL present on startup).
259    ///
260    /// If a WAL file exists, the previous save was interrupted. The Parquet
261    /// files may be in an inconsistent state. Since we use atomic file
262    /// replacement (write .tmp then rename), the old files are still valid.
263    /// Remove the WAL and proceed — the old state is correct.
264    pub fn check_wal_recovery(root: &Path) -> Result<bool> {
265        let wal_path = persist::data_dir(root)?.join("_wal.json");
266        if wal_path.exists() {
267            // WAL exists = previous save was interrupted
268            // Old Parquet files are still valid (atomic rename guarantees this)
269            let _ = std::fs::remove_file(&wal_path);
270            Ok(true) // Recovery was needed
271        } else {
272            Ok(false) // Clean state
273        }
274    }
275
276    /// Graceful shutdown — save state before exit.
277    pub fn shutdown(&mut self, store: &KanbanStore, relations: &RelationsStore) -> Result<()> {
278        if self.dirty {
279            self.save(store, relations)?;
280        }
281        Ok(())
282    }
283
284    /// Get current health metrics.
285    pub fn health(&self, store: &KanbanStore, relations: &RelationsStore) -> HealthMetrics {
286        HealthMetrics {
287            save_count: self.save_count,
288            last_save_at: self.last_save_at,
289            last_save_duration: self.last_save_duration,
290            dirty: self.dirty,
291            mutations_since_save: self.mutations_since_save,
292            git_commit_count: self.git_commit_count,
293            started_at: self.started_at,
294            item_count: store.active_item_count(),
295            relation_count: relations.active_count(),
296        }
297    }
298
299    fn now_ms(&self) -> u64 {
300        SystemTime::now()
301            .duration_since(UNIX_EPOCH)
302            .unwrap_or_default()
303            .as_millis() as u64
304    }
305}
306
/// Metrics from a save operation.
///
/// When `skipped` is true every other field holds its `Default` value.
#[derive(Debug, Default)]
pub struct SaveMetrics {
    /// Whether the save was skipped (not dirty).
    pub skipped: bool,
    /// Duration of the save operation.
    pub duration: Duration,
    /// Number of items saved.
    pub items_saved: usize,
    /// Number of relations saved.
    pub relations_saved: usize,
    /// Timestamp of the save (millis since epoch).
    pub timestamp_ms: u64,
}
321
/// Metrics from a graph-native commit operation.
///
/// When `skipped` is true every other field holds its `Default` value.
#[derive(Debug, Default)]
pub struct GitBackupMetrics {
    /// Whether the backup was skipped.
    pub skipped: bool,
    /// Commit message used.
    pub message: String,
    /// Running total of graph-native commits tracked by the engine. This
    /// includes commits restored by `load_commits`, so it is not strictly
    /// "since startup".
    pub commit_count: u64,
}
332
// Unit tests for the persistence engine. Every test works inside its own
// temporary directory, so tests do not share on-disk state.
#[cfg(test)]
mod tests {
    use super::*;
    use crate::crud::CreateItemInput;
    use crate::item_type::ItemType;

    /// Test config rooted at `root`: 100 ms save interval, 300 s commit interval.
    fn test_config(root: &Path) -> PersistenceConfig {
        PersistenceConfig {
            root: root.to_path_buf(),
            save_interval: Duration::from_millis(100),
            commit_interval: Duration::from_secs(300),
            save_on_mutation: true,
        }
    }

    /// Build a store containing a single item, plus an empty relations store.
    fn create_test_store() -> (KanbanStore, RelationsStore) {
        let mut store = KanbanStore::new();
        store
            .create_item(&CreateItemInput {
                title: "Test Item".into(),
                item_type: ItemType::Expedition,
                priority: Some("high".into()),
                assignee: None,
                tags: vec!["v14".into()],
                related: vec![],
                depends_on: vec![],
                body: None,
            })
            .expect("create item");
        (store, RelationsStore::new())
    }

    // A freshly constructed engine is clean and has nothing due.
    #[test]
    fn test_new_engine_is_clean() {
        let dir = tempfile::tempdir().expect("tempdir");
        let engine = PersistenceEngine::new(test_config(dir.path()));
        assert!(!engine.is_dirty());
        assert!(!engine.periodic_save_due());
        assert!(!engine.git_backup_due());
    }

    // mark_dirty sets the flag and counts the mutation.
    #[test]
    fn test_mark_dirty() {
        let dir = tempfile::tempdir().expect("tempdir");
        let mut engine = PersistenceEngine::new(test_config(dir.path()));
        engine.mark_dirty();
        assert!(engine.is_dirty());
        assert_eq!(engine.mutations_since_save, 1);
    }

    // A successful save clears the dirty flag and updates the save bookkeeping.
    #[test]
    fn test_save_clears_dirty() {
        let dir = tempfile::tempdir().expect("tempdir");
        let mut engine = PersistenceEngine::new(test_config(dir.path()));
        let (store, rels) = create_test_store();

        engine.mark_dirty();
        assert!(engine.is_dirty());

        let metrics = engine.save(&store, &rels).expect("save");
        assert!(!metrics.skipped);
        assert!(!engine.is_dirty());
        assert_eq!(engine.save_count, 1);
        assert!(engine.last_save_at.is_some());
    }

    // Saving a clean engine is a no-op reported as skipped.
    #[test]
    fn test_save_skips_when_clean() {
        let dir = tempfile::tempdir().expect("tempdir");
        let mut engine = PersistenceEngine::new(test_config(dir.path()));
        let (store, rels) = create_test_store();

        // Save without marking dirty — should skip
        let metrics = engine.save(&store, &rels).expect("save");
        assert!(metrics.skipped);
        assert_eq!(engine.save_count, 0);
    }

    // A save writes items.parquet under .nusy-kanban and leaves no WAL behind.
    #[test]
    fn test_save_creates_parquet_files() {
        let dir = tempfile::tempdir().expect("tempdir");
        let mut engine = PersistenceEngine::new(test_config(dir.path()));
        let (store, rels) = create_test_store();

        engine.mark_dirty();
        engine.save(&store, &rels).expect("save");

        // Parquet files should exist
        let data_dir = dir.path().join(".nusy-kanban");
        assert!(data_dir.join("items.parquet").exists());
        // WAL should not exist after successful save
        assert!(!data_dir.join("_wal.json").exists());
    }

    // Without a WAL file, recovery reports a clean state.
    #[test]
    fn test_wal_recovery_clean() {
        let dir = tempfile::tempdir().expect("tempdir");
        // Create data dir so check_wal_recovery can find it
        std::fs::create_dir_all(dir.path().join(".nusy-kanban")).expect("mkdir");
        let recovered = PersistenceEngine::check_wal_recovery(dir.path()).expect("check");
        assert!(!recovered);
    }

    // With a WAL file present, recovery is reported and the WAL is deleted.
    #[test]
    fn test_wal_recovery_with_wal() {
        let dir = tempfile::tempdir().expect("tempdir");
        let data_dir = dir.path().join(".nusy-kanban");
        std::fs::create_dir_all(&data_dir).expect("mkdir");
        std::fs::write(data_dir.join("_wal.json"), r#"["items","runs"]"#).expect("write");

        let recovered = PersistenceEngine::check_wal_recovery(dir.path()).expect("check");
        assert!(recovered);

        // WAL should be cleaned up
        assert!(!data_dir.join("_wal.json").exists());
    }

    // Saved state can be loaded back through persist::load_store.
    #[test]
    fn test_save_round_trip() {
        let dir = tempfile::tempdir().expect("tempdir");
        let mut engine = PersistenceEngine::new(test_config(dir.path()));
        let (store, rels) = create_test_store();

        engine.mark_dirty();
        engine.save(&store, &rels).expect("save");

        // Load and verify
        let loaded = persist::load_store(dir.path()).expect("load");
        assert_eq!(loaded.active_item_count(), 1);
    }

    // A dirty engine becomes due for a periodic save once the interval elapses.
    #[test]
    fn test_periodic_save_due() {
        let dir = tempfile::tempdir().expect("tempdir");
        let mut engine = PersistenceEngine::new(PersistenceConfig {
            root: dir.path().to_path_buf(),
            save_interval: Duration::from_millis(1), // Very short for testing
            ..Default::default()
        });

        engine.mark_dirty();
        // Wait for interval to elapse
        std::thread::sleep(Duration::from_millis(5));
        assert!(engine.periodic_save_due());
    }

    // Health metrics reflect saves, pending mutations and store contents.
    #[test]
    fn test_health_metrics() {
        let dir = tempfile::tempdir().expect("tempdir");
        let mut engine = PersistenceEngine::new(test_config(dir.path()));
        let (store, rels) = create_test_store();

        engine.mark_dirty();
        engine.mark_dirty();
        engine.save(&store, &rels).expect("save");
        engine.mark_dirty();

        let health = engine.health(&store, &rels);
        assert_eq!(health.save_count, 1);
        assert!(health.last_save_at.is_some());
        assert!(health.dirty);
        assert_eq!(health.mutations_since_save, 1);
        assert_eq!(health.item_count, 1);
        assert!(health.uptime_secs() < 5); // Test runs fast
    }

    // Shutting down a dirty engine saves and persists the data.
    #[test]
    fn test_shutdown_saves_dirty() {
        let dir = tempfile::tempdir().expect("tempdir");
        let mut engine = PersistenceEngine::new(test_config(dir.path()));
        let (store, rels) = create_test_store();

        engine.mark_dirty();
        engine.shutdown(&store, &rels).expect("shutdown");

        assert!(!engine.is_dirty());
        assert_eq!(engine.save_count, 1);

        // Verify data persisted
        let loaded = persist::load_store(dir.path()).expect("load");
        assert_eq!(loaded.active_item_count(), 1);
    }

    // Shutting down a clean engine performs no save.
    #[test]
    fn test_shutdown_skips_when_clean() {
        let dir = tempfile::tempdir().expect("tempdir");
        let mut engine = PersistenceEngine::new(test_config(dir.path()));
        let (store, rels) = create_test_store();

        engine.shutdown(&store, &rels).expect("shutdown");
        assert_eq!(engine.save_count, 0);
    }

    // Repeated dirty/save cycles each write, and the last write wins on disk.
    #[test]
    fn test_multiple_save_cycles() {
        let dir = tempfile::tempdir().expect("tempdir");
        let mut engine = PersistenceEngine::new(test_config(dir.path()));
        let (mut store, rels) = create_test_store();

        // Cycle 1
        engine.mark_dirty();
        engine.save(&store, &rels).expect("save 1");
        assert_eq!(engine.save_count, 1);

        // Add more items
        store
            .create_item(&CreateItemInput {
                title: "Second Item".into(),
                item_type: ItemType::Chore,
                priority: None,
                assignee: None,
                tags: vec![],
                related: vec![],
                depends_on: vec![],
                body: None,
            })
            .expect("create");

        // Cycle 2
        engine.mark_dirty();
        engine.save(&store, &rels).expect("save 2");
        assert_eq!(engine.save_count, 2);

        // Verify latest state
        let loaded = persist::load_store(dir.path()).expect("load");
        assert_eq!(loaded.active_item_count(), 2);
    }

    // A stray WAL after a successful save does not affect the saved data.
    #[test]
    fn test_crash_recovery_simulation() {
        let dir = tempfile::tempdir().expect("tempdir");
        let mut engine = PersistenceEngine::new(test_config(dir.path()));
        let (store, rels) = create_test_store();

        // Save successfully first
        engine.mark_dirty();
        engine.save(&store, &rels).expect("save");

        // Simulate crash: create WAL file (as if save was interrupted)
        let data_dir = dir.path().join(".nusy-kanban");
        std::fs::write(
            data_dir.join("_wal.json"),
            r#"["items","runs","relations"]"#,
        )
        .expect("write wal");

        // On "restart": check for WAL recovery
        let recovered = PersistenceEngine::check_wal_recovery(dir.path()).expect("recovery");
        assert!(recovered);

        // Data should still be intact from the last successful save
        let loaded = persist::load_store(dir.path()).expect("load after crash");
        assert_eq!(loaded.active_item_count(), 1);
    }

    // A pending mutation produces exactly one commit with the expected message.
    #[test]
    fn test_git_backup_creates_commit() {
        let dir = tempfile::tempdir().expect("tempdir");
        let mut engine = PersistenceEngine::new(test_config(dir.path()));

        engine.mark_dirty();
        let metrics = engine.git_backup(5).expect("git backup");

        assert!(!metrics.skipped);
        assert_eq!(metrics.commit_count, 1);
        assert!(metrics.message.contains("5 items"));

        // CommitsTable should have one entry
        assert_eq!(engine.commits().len(), 1);
        assert!(engine.last_commit_id.is_some());
    }

    // Without pending mutations no commit is created.
    #[test]
    fn test_git_backup_skips_when_clean() {
        let dir = tempfile::tempdir().expect("tempdir");
        let mut engine = PersistenceEngine::new(test_config(dir.path()));

        let metrics = engine.git_backup(0).expect("git backup");
        assert!(metrics.skipped);
        assert_eq!(engine.commits().len(), 0);
    }

    // Consecutive commits are chained: the second names the first as its parent.
    #[test]
    fn test_git_backup_chain() {
        let dir = tempfile::tempdir().expect("tempdir");
        let mut engine = PersistenceEngine::new(test_config(dir.path()));

        // First commit
        engine.mark_dirty();
        engine.git_backup(3).expect("backup 1");

        // Second commit (should have parent)
        engine.mark_dirty();
        engine.git_backup(5).expect("backup 2");

        assert_eq!(engine.commits().len(), 2);
        let commits = engine.commits().all();
        assert!(commits[0].parent_ids.is_empty()); // First has no parent
        assert_eq!(commits[1].parent_ids.len(), 1); // Second has parent
        assert_eq!(commits[1].parent_ids[0], commits[0].commit_id);
    }

    // Commits written by one engine are visible to a new engine after load_commits.
    #[test]
    fn test_load_commits_on_restart() {
        let dir = tempfile::tempdir().expect("tempdir");

        // First session: create commits
        {
            let mut engine = PersistenceEngine::new(test_config(dir.path()));
            engine.mark_dirty();
            engine.git_backup(3).expect("backup");
            assert_eq!(engine.commits().len(), 1);
        }

        // Second session: load commits
        {
            let mut engine = PersistenceEngine::new(test_config(dir.path()));
            engine.load_commits().expect("load commits");
            assert_eq!(engine.commits().len(), 1);
            assert!(engine.last_commit_id.is_some());
        }
    }

    // A commit writes the history file into the data directory.
    #[test]
    fn test_git_backup_persists_to_disk() {
        let dir = tempfile::tempdir().expect("tempdir");
        let mut engine = PersistenceEngine::new(test_config(dir.path()));

        engine.mark_dirty();
        engine.git_backup(1).expect("backup");

        // _commits.json should exist in data dir
        let commits_path = dir.path().join(".nusy-kanban/_commits.json");
        assert!(commits_path.exists());
    }
}