1use crate::crud::KanbanStore;
14use crate::persist;
15use crate::relations::RelationsStore;
16use arrow_graph_git::commit::{Commit, CommitsTable};
17use arrow_graph_git::save::{persist_commits, restore_commits};
18use std::path::{Path, PathBuf};
19use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
20
21#[derive(Debug, thiserror::Error)]
23pub enum PersistenceError {
24 #[error("Persist error: {0}")]
25 Persist(#[from] persist::PersistError),
26
27 #[error("IO error: {0}")]
28 Io(#[from] std::io::Error),
29
30 #[error("Save error: {0}")]
31 Save(#[from] arrow_graph_git::save::SaveError),
32}
33
34pub type Result<T> = std::result::Result<T, PersistenceError>;
35
36#[derive(Debug, Clone)]
38pub struct PersistenceConfig {
39 pub root: PathBuf,
41 pub save_interval: Duration,
43 pub commit_interval: Duration,
45 pub save_on_mutation: bool,
47}
48
49impl Default for PersistenceConfig {
50 fn default() -> Self {
51 PersistenceConfig {
52 root: PathBuf::from("."),
53 save_interval: Duration::from_secs(30),
54 commit_interval: Duration::from_secs(300),
55 save_on_mutation: true,
56 }
57 }
58}
59
60#[derive(Debug, Clone)]
62pub struct HealthMetrics {
63 pub save_count: u64,
65 pub last_save_at: Option<u64>,
67 pub last_save_duration: Option<Duration>,
69 pub dirty: bool,
71 pub mutations_since_save: u64,
73 pub git_commit_count: u64,
75 pub started_at: u64,
77 pub item_count: usize,
79 pub relation_count: usize,
81}
82
83impl HealthMetrics {
84 pub fn uptime_secs(&self) -> u64 {
86 let now = SystemTime::now()
87 .duration_since(UNIX_EPOCH)
88 .unwrap_or_default()
89 .as_millis() as u64;
90 (now.saturating_sub(self.started_at)) / 1000
91 }
92}
93
94pub struct PersistenceEngine {
97 config: PersistenceConfig,
98 dirty: bool,
99 mutations_since_save: u64,
100 save_count: u64,
101 last_save_at: Option<u64>,
102 last_save_duration: Option<Duration>,
103 last_periodic_save: Instant,
104 git_commit_count: u64,
105 last_git_backup: Instant,
106 mutations_since_git_commit: u64,
107 started_at: u64,
108 commits_table: CommitsTable,
109 last_commit_id: Option<String>,
110}
111
112impl PersistenceEngine {
113 pub fn new(config: PersistenceConfig) -> Self {
115 let now_ms = SystemTime::now()
116 .duration_since(UNIX_EPOCH)
117 .unwrap_or_default()
118 .as_millis() as u64;
119
120 PersistenceEngine {
121 config,
122 dirty: false,
123 mutations_since_save: 0,
124 save_count: 0,
125 last_save_at: None,
126 last_save_duration: None,
127 last_periodic_save: Instant::now(),
128 git_commit_count: 0,
129 last_git_backup: Instant::now(),
130 mutations_since_git_commit: 0,
131 started_at: now_ms,
132 commits_table: CommitsTable::new(),
133 last_commit_id: None,
134 }
135 }
136
137 pub fn mark_dirty(&mut self) {
139 self.dirty = true;
140 self.mutations_since_save += 1;
141 self.mutations_since_git_commit += 1;
142 }
143
144 pub fn is_dirty(&self) -> bool {
146 self.dirty
147 }
148
149 pub fn periodic_save_due(&self) -> bool {
151 self.dirty && self.last_periodic_save.elapsed() >= self.config.save_interval
152 }
153
154 pub fn git_backup_due(&self) -> bool {
156 self.mutations_since_git_commit > 0
157 && self.last_git_backup.elapsed() >= self.config.commit_interval
158 }
159
160 pub fn save(&mut self, store: &KanbanStore, relations: &RelationsStore) -> Result<SaveMetrics> {
165 if !self.dirty {
166 return Ok(SaveMetrics {
167 skipped: true,
168 ..Default::default()
169 });
170 }
171
172 let start = Instant::now();
173
174 persist::save_all(&self.config.root, store, relations)?;
176
177 let duration = start.elapsed();
178 let now = self.now_ms();
179
180 self.dirty = false;
181 self.mutations_since_save = 0;
182 self.save_count += 1;
183 self.last_save_at = Some(now);
184 self.last_save_duration = Some(duration);
185 self.last_periodic_save = Instant::now();
186
187 Ok(SaveMetrics {
188 skipped: false,
189 duration,
190 items_saved: store.active_item_count(),
191 relations_saved: relations.active_count(),
192 timestamp_ms: now,
193 })
194 }
195
196 pub fn git_backup(&mut self, item_count: usize) -> Result<GitBackupMetrics> {
201 if self.mutations_since_git_commit == 0 {
202 return Ok(GitBackupMetrics {
203 skipped: true,
204 ..Default::default()
205 });
206 }
207
208 let msg = format!(
209 "kanban: auto-save ({} items, {} changes since last commit)",
210 item_count, self.mutations_since_git_commit
211 );
212
213 let commit = Commit {
215 commit_id: uuid::Uuid::new_v4().to_string(),
216 parent_ids: self.last_commit_id.clone().into_iter().collect(),
217 timestamp_ms: chrono::Utc::now().timestamp_millis(),
218 message: msg.clone(),
219 author: "nusy-kanban".to_string(),
220 };
221
222 self.last_commit_id = Some(commit.commit_id.clone());
223 self.commits_table.append(commit);
224
225 let data_dir = persist::data_dir(&self.config.root)?;
227 persist_commits(&self.commits_table, &data_dir)?;
228
229 self.git_commit_count += 1;
230 self.mutations_since_git_commit = 0;
231 self.last_git_backup = Instant::now();
232
233 Ok(GitBackupMetrics {
234 skipped: false,
235 message: msg,
236 commit_count: self.git_commit_count,
237 })
238 }
239
240 pub fn load_commits(&mut self) -> Result<()> {
242 let data_dir = persist::data_dir(&self.config.root)?;
243 if let Some(table) = restore_commits(&data_dir)? {
244 if let Some(last) = table.all().last() {
245 self.last_commit_id = Some(last.commit_id.clone());
246 }
247 self.git_commit_count = table.len() as u64;
248 self.commits_table = table;
249 }
250 Ok(())
251 }
252
253 pub fn commits(&self) -> &CommitsTable {
255 &self.commits_table
256 }
257
258 pub fn check_wal_recovery(root: &Path) -> Result<bool> {
265 let wal_path = persist::data_dir(root)?.join("_wal.json");
266 if wal_path.exists() {
267 let _ = std::fs::remove_file(&wal_path);
270 Ok(true) } else {
272 Ok(false) }
274 }
275
276 pub fn shutdown(&mut self, store: &KanbanStore, relations: &RelationsStore) -> Result<()> {
278 if self.dirty {
279 self.save(store, relations)?;
280 }
281 Ok(())
282 }
283
284 pub fn health(&self, store: &KanbanStore, relations: &RelationsStore) -> HealthMetrics {
286 HealthMetrics {
287 save_count: self.save_count,
288 last_save_at: self.last_save_at,
289 last_save_duration: self.last_save_duration,
290 dirty: self.dirty,
291 mutations_since_save: self.mutations_since_save,
292 git_commit_count: self.git_commit_count,
293 started_at: self.started_at,
294 item_count: store.active_item_count(),
295 relation_count: relations.active_count(),
296 }
297 }
298
299 fn now_ms(&self) -> u64 {
300 SystemTime::now()
301 .duration_since(UNIX_EPOCH)
302 .unwrap_or_default()
303 .as_millis() as u64
304 }
305}
306
307#[derive(Debug, Default)]
309pub struct SaveMetrics {
310 pub skipped: bool,
312 pub duration: Duration,
314 pub items_saved: usize,
316 pub relations_saved: usize,
318 pub timestamp_ms: u64,
320}
321
322#[derive(Debug, Default)]
324pub struct GitBackupMetrics {
325 pub skipped: bool,
327 pub message: String,
329 pub commit_count: u64,
331}
332
333#[cfg(test)]
334mod tests {
335 use super::*;
336 use crate::crud::CreateItemInput;
337 use crate::item_type::ItemType;
338
339 fn test_config(root: &Path) -> PersistenceConfig {
340 PersistenceConfig {
341 root: root.to_path_buf(),
342 save_interval: Duration::from_millis(100),
343 commit_interval: Duration::from_secs(300),
344 save_on_mutation: true,
345 }
346 }
347
348 fn create_test_store() -> (KanbanStore, RelationsStore) {
349 let mut store = KanbanStore::new();
350 store
351 .create_item(&CreateItemInput {
352 title: "Test Item".into(),
353 item_type: ItemType::Expedition,
354 priority: Some("high".into()),
355 assignee: None,
356 tags: vec!["v14".into()],
357 related: vec![],
358 depends_on: vec![],
359 body: None,
360 })
361 .expect("create item");
362 (store, RelationsStore::new())
363 }
364
365 #[test]
366 fn test_new_engine_is_clean() {
367 let dir = tempfile::tempdir().expect("tempdir");
368 let engine = PersistenceEngine::new(test_config(dir.path()));
369 assert!(!engine.is_dirty());
370 assert!(!engine.periodic_save_due());
371 assert!(!engine.git_backup_due());
372 }
373
374 #[test]
375 fn test_mark_dirty() {
376 let dir = tempfile::tempdir().expect("tempdir");
377 let mut engine = PersistenceEngine::new(test_config(dir.path()));
378 engine.mark_dirty();
379 assert!(engine.is_dirty());
380 assert_eq!(engine.mutations_since_save, 1);
381 }
382
383 #[test]
384 fn test_save_clears_dirty() {
385 let dir = tempfile::tempdir().expect("tempdir");
386 let mut engine = PersistenceEngine::new(test_config(dir.path()));
387 let (store, rels) = create_test_store();
388
389 engine.mark_dirty();
390 assert!(engine.is_dirty());
391
392 let metrics = engine.save(&store, &rels).expect("save");
393 assert!(!metrics.skipped);
394 assert!(!engine.is_dirty());
395 assert_eq!(engine.save_count, 1);
396 assert!(engine.last_save_at.is_some());
397 }
398
399 #[test]
400 fn test_save_skips_when_clean() {
401 let dir = tempfile::tempdir().expect("tempdir");
402 let mut engine = PersistenceEngine::new(test_config(dir.path()));
403 let (store, rels) = create_test_store();
404
405 let metrics = engine.save(&store, &rels).expect("save");
407 assert!(metrics.skipped);
408 assert_eq!(engine.save_count, 0);
409 }
410
411 #[test]
412 fn test_save_creates_parquet_files() {
413 let dir = tempfile::tempdir().expect("tempdir");
414 let mut engine = PersistenceEngine::new(test_config(dir.path()));
415 let (store, rels) = create_test_store();
416
417 engine.mark_dirty();
418 engine.save(&store, &rels).expect("save");
419
420 let data_dir = dir.path().join(".nusy-kanban");
422 assert!(data_dir.join("items.parquet").exists());
423 assert!(!data_dir.join("_wal.json").exists());
425 }
426
427 #[test]
428 fn test_wal_recovery_clean() {
429 let dir = tempfile::tempdir().expect("tempdir");
430 std::fs::create_dir_all(dir.path().join(".nusy-kanban")).expect("mkdir");
432 let recovered = PersistenceEngine::check_wal_recovery(dir.path()).expect("check");
433 assert!(!recovered);
434 }
435
436 #[test]
437 fn test_wal_recovery_with_wal() {
438 let dir = tempfile::tempdir().expect("tempdir");
439 let data_dir = dir.path().join(".nusy-kanban");
440 std::fs::create_dir_all(&data_dir).expect("mkdir");
441 std::fs::write(data_dir.join("_wal.json"), r#"["items","runs"]"#).expect("write");
442
443 let recovered = PersistenceEngine::check_wal_recovery(dir.path()).expect("check");
444 assert!(recovered);
445
446 assert!(!data_dir.join("_wal.json").exists());
448 }
449
450 #[test]
451 fn test_save_round_trip() {
452 let dir = tempfile::tempdir().expect("tempdir");
453 let mut engine = PersistenceEngine::new(test_config(dir.path()));
454 let (store, rels) = create_test_store();
455
456 engine.mark_dirty();
457 engine.save(&store, &rels).expect("save");
458
459 let loaded = persist::load_store(dir.path()).expect("load");
461 assert_eq!(loaded.active_item_count(), 1);
462 }
463
464 #[test]
465 fn test_periodic_save_due() {
466 let dir = tempfile::tempdir().expect("tempdir");
467 let mut engine = PersistenceEngine::new(PersistenceConfig {
468 root: dir.path().to_path_buf(),
469 save_interval: Duration::from_millis(1), ..Default::default()
471 });
472
473 engine.mark_dirty();
474 std::thread::sleep(Duration::from_millis(5));
476 assert!(engine.periodic_save_due());
477 }
478
479 #[test]
480 fn test_health_metrics() {
481 let dir = tempfile::tempdir().expect("tempdir");
482 let mut engine = PersistenceEngine::new(test_config(dir.path()));
483 let (store, rels) = create_test_store();
484
485 engine.mark_dirty();
486 engine.mark_dirty();
487 engine.save(&store, &rels).expect("save");
488 engine.mark_dirty();
489
490 let health = engine.health(&store, &rels);
491 assert_eq!(health.save_count, 1);
492 assert!(health.last_save_at.is_some());
493 assert!(health.dirty);
494 assert_eq!(health.mutations_since_save, 1);
495 assert_eq!(health.item_count, 1);
496 assert!(health.uptime_secs() < 5); }
498
499 #[test]
500 fn test_shutdown_saves_dirty() {
501 let dir = tempfile::tempdir().expect("tempdir");
502 let mut engine = PersistenceEngine::new(test_config(dir.path()));
503 let (store, rels) = create_test_store();
504
505 engine.mark_dirty();
506 engine.shutdown(&store, &rels).expect("shutdown");
507
508 assert!(!engine.is_dirty());
509 assert_eq!(engine.save_count, 1);
510
511 let loaded = persist::load_store(dir.path()).expect("load");
513 assert_eq!(loaded.active_item_count(), 1);
514 }
515
516 #[test]
517 fn test_shutdown_skips_when_clean() {
518 let dir = tempfile::tempdir().expect("tempdir");
519 let mut engine = PersistenceEngine::new(test_config(dir.path()));
520 let (store, rels) = create_test_store();
521
522 engine.shutdown(&store, &rels).expect("shutdown");
523 assert_eq!(engine.save_count, 0);
524 }
525
526 #[test]
527 fn test_multiple_save_cycles() {
528 let dir = tempfile::tempdir().expect("tempdir");
529 let mut engine = PersistenceEngine::new(test_config(dir.path()));
530 let (mut store, rels) = create_test_store();
531
532 engine.mark_dirty();
534 engine.save(&store, &rels).expect("save 1");
535 assert_eq!(engine.save_count, 1);
536
537 store
539 .create_item(&CreateItemInput {
540 title: "Second Item".into(),
541 item_type: ItemType::Chore,
542 priority: None,
543 assignee: None,
544 tags: vec![],
545 related: vec![],
546 depends_on: vec![],
547 body: None,
548 })
549 .expect("create");
550
551 engine.mark_dirty();
553 engine.save(&store, &rels).expect("save 2");
554 assert_eq!(engine.save_count, 2);
555
556 let loaded = persist::load_store(dir.path()).expect("load");
558 assert_eq!(loaded.active_item_count(), 2);
559 }
560
561 #[test]
562 fn test_crash_recovery_simulation() {
563 let dir = tempfile::tempdir().expect("tempdir");
564 let mut engine = PersistenceEngine::new(test_config(dir.path()));
565 let (store, rels) = create_test_store();
566
567 engine.mark_dirty();
569 engine.save(&store, &rels).expect("save");
570
571 let data_dir = dir.path().join(".nusy-kanban");
573 std::fs::write(
574 data_dir.join("_wal.json"),
575 r#"["items","runs","relations"]"#,
576 )
577 .expect("write wal");
578
579 let recovered = PersistenceEngine::check_wal_recovery(dir.path()).expect("recovery");
581 assert!(recovered);
582
583 let loaded = persist::load_store(dir.path()).expect("load after crash");
585 assert_eq!(loaded.active_item_count(), 1);
586 }
587
588 #[test]
589 fn test_git_backup_creates_commit() {
590 let dir = tempfile::tempdir().expect("tempdir");
591 let mut engine = PersistenceEngine::new(test_config(dir.path()));
592
593 engine.mark_dirty();
594 let metrics = engine.git_backup(5).expect("git backup");
595
596 assert!(!metrics.skipped);
597 assert_eq!(metrics.commit_count, 1);
598 assert!(metrics.message.contains("5 items"));
599
600 assert_eq!(engine.commits().len(), 1);
602 assert!(engine.last_commit_id.is_some());
603 }
604
605 #[test]
606 fn test_git_backup_skips_when_clean() {
607 let dir = tempfile::tempdir().expect("tempdir");
608 let mut engine = PersistenceEngine::new(test_config(dir.path()));
609
610 let metrics = engine.git_backup(0).expect("git backup");
611 assert!(metrics.skipped);
612 assert_eq!(engine.commits().len(), 0);
613 }
614
615 #[test]
616 fn test_git_backup_chain() {
617 let dir = tempfile::tempdir().expect("tempdir");
618 let mut engine = PersistenceEngine::new(test_config(dir.path()));
619
620 engine.mark_dirty();
622 engine.git_backup(3).expect("backup 1");
623
624 engine.mark_dirty();
626 engine.git_backup(5).expect("backup 2");
627
628 assert_eq!(engine.commits().len(), 2);
629 let commits = engine.commits().all();
630 assert!(commits[0].parent_ids.is_empty()); assert_eq!(commits[1].parent_ids.len(), 1); assert_eq!(commits[1].parent_ids[0], commits[0].commit_id);
633 }
634
635 #[test]
636 fn test_load_commits_on_restart() {
637 let dir = tempfile::tempdir().expect("tempdir");
638
639 {
641 let mut engine = PersistenceEngine::new(test_config(dir.path()));
642 engine.mark_dirty();
643 engine.git_backup(3).expect("backup");
644 assert_eq!(engine.commits().len(), 1);
645 }
646
647 {
649 let mut engine = PersistenceEngine::new(test_config(dir.path()));
650 engine.load_commits().expect("load commits");
651 assert_eq!(engine.commits().len(), 1);
652 assert!(engine.last_commit_id.is_some());
653 }
654 }
655
656 #[test]
657 fn test_git_backup_persists_to_disk() {
658 let dir = tempfile::tempdir().expect("tempdir");
659 let mut engine = PersistenceEngine::new(test_config(dir.path()));
660
661 engine.mark_dirty();
662 engine.git_backup(1).expect("backup");
663
664 let commits_path = dir.path().join(".nusy-kanban/_commits.json");
666 assert!(commits_path.exists());
667 }
668}