Skip to main content

nusy_arrow_git/
save.rs

1//! Save — persist current state without creating a commit (crash recovery).
2//!
3//! Distinct from Commit: Save is mechanical (for crash recovery),
4//! Commit is semantic (for versioning). Save uses atomic file replacement
5//! to prevent corruption from partial writes.
6
7use crate::commit::{Commit, CommitsTable};
8use crate::object_store::GitObjectStore;
9use crate::refs::RefsTable;
10use arrow::array::RecordBatch;
11use arrow::datatypes::Schema;
12use nusy_arrow_core::Namespace;
13use parquet::arrow::ArrowWriter;
14use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
15use parquet::basic::Compression;
16use parquet::file::properties::WriterProperties;
17use std::collections::HashSet;
18use std::fs;
19use std::path::Path;
20use std::sync::Arc;
21
22/// Errors from save/restore operations.
23#[derive(Debug, thiserror::Error)]
24pub enum SaveError {
25    #[error("IO error: {0}")]
26    Io(#[from] std::io::Error),
27
28    #[error("Parquet error: {0}")]
29    Parquet(#[from] parquet::errors::ParquetError),
30
31    #[error("Arrow error: {0}")]
32    Arrow(#[from] arrow::error::ArrowError),
33
34    #[error("Save point not found: {0}")]
35    NotFound(String),
36
37    #[error("Write-ahead log incomplete — previous save may be corrupt")]
38    IncompleteWal,
39}
40
41pub type Result<T> = std::result::Result<T, SaveError>;
42
43/// Save current store state to a directory using atomic file replacement.
44///
45/// Persists:
46/// - Graph data: namespace Parquet files
47/// - Commit history: `_commits.json`
48/// - Branch refs: `_refs.json`
49///
50/// A write-ahead log (`_wal.json`) tracks the operation for crash recovery.
51pub fn save(obj_store: &GitObjectStore, save_dir: &Path) -> Result<()> {
52    save_full(obj_store, None, None, save_dir)
53}
54
55/// Save with commit history and refs.
56pub fn save_full(
57    obj_store: &GitObjectStore,
58    commits_table: Option<&CommitsTable>,
59    refs_table: Option<&RefsTable>,
60    save_dir: &Path,
61) -> Result<()> {
62    fs::create_dir_all(save_dir)?;
63
64    // Write WAL marker — lists namespaces being saved
65    let wal_path = save_dir.join("_wal.json");
66    let namespaces_with_data: Vec<String> = Namespace::ALL
67        .iter()
68        .filter(|ns| !obj_store.store.get_namespace_batches(**ns).is_empty())
69        .map(|ns| ns.as_str().to_string())
70        .collect();
71
72    fs::write(&wal_path, serde_json_minimal(&namespaces_with_data))?;
73
74    // Write each namespace atomically
75    for ns in Namespace::ALL {
76        let batches = obj_store.store.get_namespace_batches(ns);
77        let target = save_dir.join(format!("{}.parquet", ns.as_str()));
78
79        if batches.is_empty() {
80            // Remove stale file if namespace is now empty
81            let _ = fs::remove_file(&target);
82            continue;
83        }
84
85        let tmp_path = save_dir.join(format!("{}.parquet.tmp", ns.as_str()));
86        let schema = obj_store.store.schema().clone();
87        let file = fs::File::create(&tmp_path)?;
88        let mut writer = ArrowWriter::try_new(file, schema, None)?;
89
90        for batch in batches {
91            writer.write(batch)?;
92        }
93        writer.close()?;
94
95        // Atomic rename (POSIX guarantees)
96        fs::rename(&tmp_path, &target)?;
97    }
98
99    // Persist CommitsTable as JSON
100    if let Some(ct) = commits_table {
101        let commits_json = serialize_commits(ct);
102        let tmp = save_dir.join("_commits.json.tmp");
103        fs::write(&tmp, &commits_json)?;
104        fs::rename(&tmp, save_dir.join("_commits.json"))?;
105    }
106
107    // Persist RefsTable as JSON
108    if let Some(rt) = refs_table {
109        let refs_json = serialize_refs(rt);
110        let tmp = save_dir.join("_refs.json.tmp");
111        fs::write(&tmp, &refs_json)?;
112        fs::rename(&tmp, save_dir.join("_refs.json"))?;
113    }
114
115    // Remove WAL — save complete
116    let _ = fs::remove_file(&wal_path);
117
118    Ok(())
119}
120
121/// Restore store state from a save point (graph data only).
122pub fn restore(obj_store: &mut GitObjectStore, save_dir: &Path) -> Result<()> {
123    let (_, _) = restore_full(obj_store, save_dir)?;
124    Ok(())
125}
126
127/// Restore store state including commit history and refs.
128///
129/// Returns (CommitsTable, RefsTable) if they were persisted.
130pub fn restore_full(
131    obj_store: &mut GitObjectStore,
132    save_dir: &Path,
133) -> Result<(Option<CommitsTable>, Option<RefsTable>)> {
134    if !save_dir.exists() {
135        return Err(SaveError::NotFound(save_dir.display().to_string()));
136    }
137
138    // Check for incomplete WAL (crash during previous save)
139    let wal_path = save_dir.join("_wal.json");
140    if wal_path.exists() {
141        // WAL exists = previous save was interrupted.
142        // The .parquet files may be a mix of old and new.
143        // Conservative: return error so caller can decide.
144        // In practice, the old .parquet files are still valid
145        // (atomic rename means either old or new, not partial).
146        // So we can proceed — the WAL just means some namespaces
147        // may have old data. Remove WAL and continue.
148        let _ = fs::remove_file(&wal_path);
149    }
150
151    obj_store.store.clear();
152
153    for ns in Namespace::ALL {
154        let path = save_dir.join(format!("{}.parquet", ns.as_str()));
155        if !path.exists() {
156            continue;
157        }
158
159        let file = fs::File::open(&path)?;
160        let reader = ParquetRecordBatchReaderBuilder::try_new(file)?.build()?;
161
162        let mut batches = Vec::new();
163        for batch_result in reader {
164            batches.push(batch_result?);
165        }
166
167        obj_store.store.set_namespace_batches(ns, batches);
168    }
169
170    // Restore CommitsTable
171    let commits = {
172        let path = save_dir.join("_commits.json");
173        if path.exists() {
174            let data = fs::read_to_string(&path)?;
175            Some(deserialize_commits(&data))
176        } else {
177            None
178        }
179    };
180
181    // Restore RefsTable
182    let refs = {
183        let path = save_dir.join("_refs.json");
184        if path.exists() {
185            let data = fs::read_to_string(&path)?;
186            Some(deserialize_refs(&data))
187        } else {
188            None
189        }
190    };
191
192    Ok((commits, refs))
193}
194
/// Summary of what a save call did, returned by [`save_with_options`] and
/// [`save_named_batches`].
#[derive(Clone, Debug)]
pub struct SaveMetrics {
    /// Names of the namespaces (or named entries) whose Parquet file was written.
    pub namespaces_saved: Vec<String>,
    /// Sum of the sizes, in bytes, of the Parquet files written.
    pub bytes_written: u64,
    /// Wall-clock time of the save call, in milliseconds.
    pub duration_ms: u128,
    /// `true` when zstd compression was requested for the Parquet files.
    pub compressed: bool,
}
207
208/// Options for customizing save behavior.
209#[derive(Debug, Clone, Default)]
210pub struct SaveOptions {
211    /// Use zstd compression for Parquet files.
212    pub compress: bool,
213    /// Only save namespaces in this set (incremental save).
214    /// If `None`, save all namespaces with data.
215    pub dirty_namespaces: Option<HashSet<Namespace>>,
216}
217
218/// Save with options, returning metrics about the operation.
219///
220/// Supports incremental saves (only dirty namespaces) and zstd compression.
221pub fn save_with_options(
222    obj_store: &GitObjectStore,
223    commits_table: Option<&CommitsTable>,
224    refs_table: Option<&RefsTable>,
225    save_dir: &Path,
226    options: &SaveOptions,
227) -> Result<SaveMetrics> {
228    let start = std::time::Instant::now();
229    fs::create_dir_all(save_dir)?;
230
231    // Determine which namespaces to save
232    let namespaces_to_save: Vec<Namespace> = Namespace::ALL
233        .iter()
234        .filter(|ns| {
235            // Skip empty namespaces
236            if obj_store.store.get_namespace_batches(**ns).is_empty() {
237                return false;
238            }
239            // If dirty set is specified, only save dirty namespaces
240            if let Some(dirty) = &options.dirty_namespaces {
241                return dirty.contains(ns);
242            }
243            true
244        })
245        .copied()
246        .collect();
247
248    // Write WAL marker
249    let wal_path = save_dir.join("_wal.json");
250    let ns_names: Vec<String> = namespaces_to_save
251        .iter()
252        .map(|ns| ns.as_str().to_string())
253        .collect();
254    fs::write(&wal_path, serde_json_minimal(&ns_names))?;
255
256    // Build writer properties
257    let props = if options.compress {
258        WriterProperties::builder()
259            .set_compression(Compression::ZSTD(Default::default()))
260            .build()
261    } else {
262        WriterProperties::builder().build()
263    };
264
265    let mut total_bytes = 0u64;
266    let mut saved_ns_names = Vec::new();
267
268    // Write each namespace atomically
269    for ns in &namespaces_to_save {
270        let batches = obj_store.store.get_namespace_batches(*ns);
271        let target = save_dir.join(format!("{}.parquet", ns.as_str()));
272
273        if batches.is_empty() {
274            let _ = fs::remove_file(&target);
275            continue;
276        }
277
278        let tmp_path = save_dir.join(format!("{}.parquet.tmp", ns.as_str()));
279        let schema = obj_store.store.schema().clone();
280        let file = fs::File::create(&tmp_path)?;
281        let mut writer = ArrowWriter::try_new(file, schema, Some(props.clone()))?;
282
283        for batch in batches {
284            writer.write(batch)?;
285        }
286        writer.close()?;
287
288        let file_size = fs::metadata(&tmp_path)?.len();
289        total_bytes += file_size;
290        saved_ns_names.push(ns.as_str().to_string());
291
292        fs::rename(&tmp_path, &target)?;
293    }
294
295    // Persist CommitsTable
296    if let Some(ct) = commits_table {
297        let commits_json = serialize_commits(ct);
298        let tmp = save_dir.join("_commits.json.tmp");
299        fs::write(&tmp, &commits_json)?;
300        fs::rename(&tmp, save_dir.join("_commits.json"))?;
301    }
302
303    // Persist RefsTable
304    if let Some(rt) = refs_table {
305        let refs_json = serialize_refs(rt);
306        let tmp = save_dir.join("_refs.json.tmp");
307        fs::write(&tmp, &refs_json)?;
308        fs::rename(&tmp, save_dir.join("_refs.json"))?;
309    }
310
311    // Remove WAL — save complete
312    let _ = fs::remove_file(&wal_path);
313
314    Ok(SaveMetrics {
315        namespaces_saved: saved_ns_names,
316        bytes_written: total_bytes,
317        duration_ms: start.elapsed().as_millis(),
318        compressed: options.compress,
319    })
320}
321
322// --- Generic save/restore for arbitrary RecordBatch data ---
323
324/// Save arbitrary named RecordBatch collections with WAL + atomic write.
325///
326/// Each entry is `(name, batches, schema)`. Creates Parquet files
327/// like `{save_dir}/{name}.parquet` with atomic tmp+rename.
328///
329/// This is the generic version of [`save()`] — it works with any Arrow data,
330/// not just `GitObjectStore` namespaces. Used by nusy-kanban to persist
331/// kanban items, runs, and relations through the same crash-safe pattern.
332pub fn save_named_batches(
333    entries: &[(&str, &[RecordBatch], &Schema)],
334    save_dir: &Path,
335) -> Result<SaveMetrics> {
336    let start = std::time::Instant::now();
337    fs::create_dir_all(save_dir)?;
338
339    // Write WAL marker — lists names being saved
340    let wal_path = save_dir.join("_wal.json");
341    let names: Vec<String> = entries
342        .iter()
343        .map(|(name, _, _)| name.to_string())
344        .collect();
345    fs::write(&wal_path, serde_json_minimal(&names))?;
346
347    let mut total_bytes = 0u64;
348    let mut saved_names = Vec::new();
349
350    for (name, batches, schema) in entries {
351        let target = save_dir.join(format!("{name}.parquet"));
352
353        if batches.is_empty() {
354            // Remove stale file if data is now empty
355            let _ = fs::remove_file(&target);
356            continue;
357        }
358
359        let tmp_path = save_dir.join(format!("{name}.parquet.tmp"));
360        let schema_ref = Arc::new((*schema).clone());
361        let file = fs::File::create(&tmp_path)?;
362        let mut writer = ArrowWriter::try_new(file, schema_ref, None)?;
363
364        for batch in *batches {
365            writer.write(batch)?;
366        }
367        writer.close()?;
368
369        let file_size = fs::metadata(&tmp_path)?.len();
370        total_bytes += file_size;
371        saved_names.push(name.to_string());
372
373        // Atomic replacement — old file intact if crash occurs before this line
374        fs::rename(&tmp_path, &target)?;
375    }
376
377    // Remove WAL — save complete
378    let _ = fs::remove_file(&wal_path);
379
380    Ok(SaveMetrics {
381        namespaces_saved: saved_names,
382        bytes_written: total_bytes,
383        duration_ms: start.elapsed().as_millis(),
384        compressed: false,
385    })
386}
387
388/// Restore named RecordBatch collections from a save directory.
389///
390/// Returns a vector of `(name, batches)` for each name found on disk.
391/// Missing files are silently skipped (returns empty vec for first-run case).
392pub fn restore_named_batches(
393    save_dir: &Path,
394    names: &[&str],
395) -> Result<Vec<(String, Vec<RecordBatch>)>> {
396    if !save_dir.exists() {
397        return Err(SaveError::NotFound(save_dir.display().to_string()));
398    }
399
400    // Check for incomplete WAL (crash during previous save)
401    let wal_path = save_dir.join("_wal.json");
402    if wal_path.exists() {
403        // WAL exists = previous save was interrupted.
404        // Atomic rename means files are either old or new, not partial.
405        // Safe to proceed — remove WAL and continue.
406        let _ = fs::remove_file(&wal_path);
407    }
408
409    let mut results = Vec::new();
410
411    for name in names {
412        let path = save_dir.join(format!("{name}.parquet"));
413        if !path.exists() {
414            continue;
415        }
416
417        let file = fs::File::open(&path)?;
418        let reader = ParquetRecordBatchReaderBuilder::try_new(file)?.build()?;
419
420        let mut batches = Vec::new();
421        for batch_result in reader {
422            batches.push(batch_result?);
423        }
424
425        results.push((name.to_string(), batches));
426    }
427
428    Ok(results)
429}
430
431/// Persist a CommitsTable as JSON to a directory.
432///
433/// Uses atomic tmp+rename to prevent corruption.
434pub fn persist_commits(table: &CommitsTable, dir: &Path) -> Result<()> {
435    fs::create_dir_all(dir)?;
436    let json = serialize_commits(table);
437    let tmp = dir.join("_commits.json.tmp");
438    fs::write(&tmp, &json)?;
439    fs::rename(&tmp, dir.join("_commits.json"))?;
440    Ok(())
441}
442
443/// Restore a CommitsTable from JSON in a directory.
444///
445/// Returns `None` if no commits file exists (first run).
446pub fn restore_commits(dir: &Path) -> Result<Option<CommitsTable>> {
447    let path = dir.join("_commits.json");
448    if !path.exists() {
449        return Ok(None);
450    }
451    let data = fs::read_to_string(&path)?;
452    Ok(Some(deserialize_commits(&data)))
453}
454
455// --- Minimal JSON serialization (no serde dependency) ---
456
/// Minimal JSON serialization for the WAL (avoid serde dependency).
///
/// Renders `items` as a JSON array of strings, e.g. `["world","work"]`.
/// Each item is passed through [`json_escape`], so names containing quotes,
/// backslashes or line breaks still produce a well-formed array.
fn serde_json_minimal(items: &[String]) -> String {
    let inner: Vec<String> = items
        .iter()
        .map(|s| format!("\"{}\"", json_escape(s)))
        .collect();
    format!("[{}]", inner.join(","))
}

/// Escape a string for embedding between double quotes in JSON output.
///
/// Escapes backslash, double quote, newline, carriage return and tab. Every
/// other character is copied through unchanged. Done in a single pass over
/// the input with one output allocation.
fn json_escape(s: &str) -> String {
    let mut out = String::with_capacity(s.len());
    for ch in s.chars() {
        match ch {
            '\\' => out.push_str("\\\\"),
            '"' => out.push_str("\\\""),
            '\n' => out.push_str("\\n"),
            '\r' => out.push_str("\\r"),
            '\t' => out.push_str("\\t"),
            other => out.push(other),
        }
    }
    out
}
471
472/// Serialize CommitsTable to JSON (one commit per line, array format).
473pub(crate) fn serialize_commits(table: &CommitsTable) -> String {
474    let mut lines = Vec::new();
475    for c in table.all() {
476        let parents: Vec<String> = c
477            .parent_ids
478            .iter()
479            .map(|p| format!("\"{}\"", json_escape(p)))
480            .collect();
481        lines.push(format!(
482            "{{\"id\":\"{}\",\"parents\":[{}],\"ts\":{},\"msg\":\"{}\",\"author\":\"{}\"}}",
483            json_escape(&c.commit_id),
484            parents.join(","),
485            c.timestamp_ms,
486            json_escape(&c.message),
487            json_escape(&c.author),
488        ));
489    }
490    format!("[{}]", lines.join(",\n"))
491}
492
493/// Deserialize CommitsTable from JSON.
494pub(crate) fn deserialize_commits(json: &str) -> CommitsTable {
495    let mut table = CommitsTable::new();
496    // Simple parser: extract objects between { }
497    for obj in extract_json_objects(json) {
498        let id = extract_json_string(&obj, "id").unwrap_or_default();
499        let msg = extract_json_string(&obj, "msg").unwrap_or_default();
500        let author = extract_json_string(&obj, "author").unwrap_or_default();
501        let ts = extract_json_number(&obj, "ts").unwrap_or(0);
502        let parents = extract_json_string_array(&obj, "parents");
503
504        table.append(Commit {
505            commit_id: id,
506            parent_ids: parents,
507            timestamp_ms: ts,
508            message: msg,
509            author,
510        });
511    }
512    table
513}
514
515/// Serialize RefsTable to JSON.
516pub(crate) fn serialize_refs(table: &RefsTable) -> String {
517    let mut lines = Vec::new();
518    for r in table.branches() {
519        lines.push(format!(
520            "{{\"name\":\"{}\",\"commit\":\"{}\",\"type\":\"{}\",\"head\":{},\"created\":{}}}",
521            json_escape(&r.ref_name),
522            json_escape(&r.commit_id),
523            r.ref_type.as_str(),
524            r.is_head,
525            r.created_at_ms,
526        ));
527    }
528    format!("[{}]", lines.join(",\n"))
529}
530
531/// Deserialize RefsTable from JSON.
532pub(crate) fn deserialize_refs(json: &str) -> RefsTable {
533    let mut table = RefsTable::new();
534    for obj in extract_json_objects(json) {
535        let name = extract_json_string(&obj, "name").unwrap_or_default();
536        let commit = extract_json_string(&obj, "commit").unwrap_or_default();
537        let is_head = obj.contains("\"head\":true");
538
539        // Use init_main for first head branch, create_branch for others
540        if table.head().is_none() && is_head {
541            table.init_main(&commit);
542            // Fix the name if it's not "main"
543            if name != "main" {
544                // Re-create: clear and rebuild
545                let _ = table.update_ref("main", &commit);
546            }
547        } else {
548            let _ = table.create_branch(&name, &commit);
549            if is_head {
550                let _ = table.switch_head(&name);
551            }
552        }
553    }
554    table
555}
556
557// --- Minimal JSON parsing helpers (no serde dependency) ---
558
/// Extract the outermost `{...}` objects from a JSON text (typically a
/// top-level array of objects), returned as substrings in input order.
///
/// Braces are counted only outside string literals: a `{` or `}` inside a
/// quoted value (for example a commit message) does not affect nesting, and
/// backslash-escaped quotes do not end a string. A `}` with no matching `{`
/// is ignored. An object that is never closed is not returned.
fn extract_json_objects(json: &str) -> Vec<String> {
    let mut objects = Vec::new();
    let mut depth = 0usize;
    let mut start = None;
    let mut in_string = false;
    let mut escaped = false;

    for (i, ch) in json.char_indices() {
        if in_string {
            // Inside a string only the closing quote matters; a backslash
            // makes the next character literal.
            if escaped {
                escaped = false;
            } else if ch == '\\' {
                escaped = true;
            } else if ch == '"' {
                in_string = false;
            }
            continue;
        }

        match ch {
            '"' => in_string = true,
            '{' => {
                if depth == 0 {
                    start = Some(i);
                }
                depth += 1;
            }
            // The guard keeps an unmatched `}` from underflowing the depth.
            '}' if depth > 0 => {
                depth -= 1;
                if depth == 0 {
                    if let Some(s) = start.take() {
                        objects.push(json[s..=i].to_string());
                    }
                }
            }
            _ => {}
        }
    }
    objects
}
586
/// Extract and unescape the string value stored under `key` in a JSON object
/// string.
///
/// Looks for the exact text `"key":"` (no whitespace around the colon) and
/// reads up to the next unescaped `"`. Returns `None` when that pattern is
/// not present. If the closing quote is missing, everything up to the end of
/// `obj` is taken as the value.
///
/// Unescaping is done in one left-to-right pass and reverses the sequences
/// `\\`, `\"`, `\n`, `\r` and `\t`. Doing it in one pass matters: an escaped
/// backslash followed by the letter `n` must come back as those two
/// characters, not as a newline. Any other backslash sequence is kept
/// verbatim.
fn extract_json_string(obj: &str, key: &str) -> Option<String> {
    let pattern = format!("\"{}\":\"", key);
    let start = obj.find(&pattern)? + pattern.len();

    let mut value = String::new();
    let mut chars = obj[start..].chars();
    while let Some(ch) = chars.next() {
        match ch {
            // Unescaped quote: end of the value.
            '"' => break,
            '\\' => match chars.next() {
                Some('n') => value.push('\n'),
                Some('r') => value.push('\r'),
                Some('t') => value.push('\t'),
                Some(c @ ('"' | '\\')) => value.push(c),
                // Unknown escape: keep both characters untouched.
                Some(other) => {
                    value.push('\\');
                    value.push(other);
                }
                // Dangling backslash at end of input.
                None => value.push('\\'),
            },
            other => value.push(other),
        }
    }
    Some(value)
}
612
/// Extract the integer stored under `key` in a JSON object string.
///
/// Looks for the exact text `"key":`, skips leading whitespace, then parses
/// the longest run of ASCII digits and `-` characters as an `i64`. Returns
/// `None` if the key is absent or that run does not parse.
fn extract_json_number(obj: &str, key: &str) -> Option<i64> {
    let needle = format!("\"{}\":", key);
    let value_start = obj.find(&needle)? + needle.len();
    let tail = obj[value_start..].trim_start();

    // Byte offset of the first character that cannot belong to the number.
    let number_len = tail
        .char_indices()
        .find(|&(_, c)| !(c.is_ascii_digit() || c == '-'))
        .map_or(tail.len(), |(offset, _)| offset);

    tail[..number_len].parse().ok()
}
623
/// Extract the array of strings stored under `key` in a JSON object string.
///
/// Looks for the exact text `"key":[` and then reads quoted elements until
/// the first `]` that is outside a string. Commas and brackets inside an
/// element are therefore part of that element. Elements are unescaped
/// (`\\`, `\"`, `\n`, `\r`, `\t`; other backslash sequences are kept
/// verbatim). Empty elements are dropped. Anything between elements that is
/// not a quoted string is ignored.
///
/// Returns an empty vector when the key is absent.
fn extract_json_string_array(obj: &str, key: &str) -> Vec<String> {
    let pattern = format!("\"{}\":[", key);
    let Some(found) = obj.find(&pattern) else {
        return Vec::new();
    };

    let mut chars = obj[found + pattern.len()..].chars();
    let mut result = Vec::new();

    while let Some(ch) = chars.next() {
        match ch {
            // End of the array (we are outside any string here).
            ']' => break,
            '"' => {
                let mut item = String::new();
                while let Some(c) = chars.next() {
                    match c {
                        '"' => break,
                        '\\' => match chars.next() {
                            Some('n') => item.push('\n'),
                            Some('r') => item.push('\r'),
                            Some('t') => item.push('\t'),
                            Some(q @ ('"' | '\\')) => item.push(q),
                            Some(other) => {
                                item.push('\\');
                                item.push(other);
                            }
                            None => item.push('\\'),
                        },
                        other => item.push(other),
                    }
                }
                if !item.is_empty() {
                    result.push(item);
                }
            }
            // Separators and whitespace between elements.
            _ => {}
        }
    }
    result
}
644
645#[cfg(test)]
646mod tests {
647    use super::*;
648    use nusy_arrow_core::{Namespace, Triple, YLayer};
649
650    fn sample_triple(subj: &str) -> Triple {
651        Triple {
652            subject: subj.to_string(),
653            predicate: "rdf:type".to_string(),
654            object: "Thing".to_string(),
655            graph: None,
656            confidence: Some(0.9),
657            source_document: None,
658            source_chunk_id: None,
659            extracted_by: None,
660            caused_by: None,
661            derived_from: None,
662            consolidated_at: None,
663            certifiability_class: None,
664        }
665    }
666
/// Saving then restoring brings back every triple, and only namespaces with
/// data get a Parquet file.
#[test]
fn test_save_restore_roundtrip() {
    let workspace = tempfile::tempdir().unwrap();
    let save_dir = workspace.path().join("savepoint");
    let mut obj = GitObjectStore::with_snapshot_dir(workspace.path().join("snapshots"));

    // 50 triples in World, 30 in Work.
    for subject in (0..50).map(|i| format!("world-{i}")) {
        obj.store
            .add_triple(&sample_triple(&subject), Namespace::World, YLayer::Semantic)
            .unwrap();
    }
    for subject in (0..30).map(|i| format!("work-{i}")) {
        obj.store
            .add_triple(&sample_triple(&subject), Namespace::Work, YLayer::Procedural)
            .unwrap();
    }
    assert_eq!(obj.store.len(), 80);

    save(&obj, &save_dir).unwrap();

    // Files on disk: one per populated namespace, none for the rest, no WAL.
    let on_disk = |file: &str| save_dir.join(file).exists();
    assert!(on_disk("world.parquet"));
    assert!(on_disk("work.parquet"));
    assert!(!on_disk("research.parquet")); // No data
    assert!(!on_disk("_wal.json")); // WAL cleaned up

    // Wipe the store, then load it back.
    obj.store.clear();
    assert_eq!(obj.store.len(), 0);

    restore(&mut obj, &save_dir).unwrap();
    assert_eq!(obj.store.len(), 80);
}
711
/// Restoring from a directory that does not exist returns an error.
#[test]
fn test_restore_nonexistent_fails() {
    let scratch = tempfile::tempdir().unwrap();
    let mut obj = GitObjectStore::with_snapshot_dir(scratch.path());
    let missing_dir = scratch.path().join("nonexistent");

    assert!(restore(&mut obj, &missing_dir).is_err());
}
719
/// Saving an empty store succeeds and writes no Parquet file.
#[test]
fn test_save_atomic_no_partial_files() {
    let scratch = tempfile::tempdir().unwrap();
    let save_dir = scratch.path().join("savepoint");
    let empty_store = GitObjectStore::with_snapshot_dir(scratch.path().join("snapshots"));

    save(&empty_store, &save_dir).unwrap();

    let world_file = save_dir.join("world.parquet");
    assert!(!world_file.exists());
}
730
/// A leftover WAL file (as after an interrupted save) does not block restore.
#[test]
fn test_simulated_crash_recovery() {
    let scratch = tempfile::tempdir().unwrap();
    let save_dir = scratch.path().join("savepoint");
    let mut obj = GitObjectStore::with_snapshot_dir(scratch.path().join("snapshots"));

    // One triple, saved normally.
    let triple = sample_triple("s1");
    obj.store
        .add_triple(&triple, Namespace::World, YLayer::Semantic)
        .unwrap();
    save(&obj, &save_dir).unwrap();

    // Fake an interrupted save by dropping a WAL marker next to the data.
    let wal_file = save_dir.join("_wal.json");
    fs::write(wal_file, "[\"world\"]").unwrap();

    // Restore must still load the previously saved Parquet file.
    obj.store.clear();
    restore(&mut obj, &save_dir).unwrap();
    assert_eq!(obj.store.len(), 1);
}
751
/// The store stays readable after a save, and saving twice over the same
/// directory still restores the full data set.
#[test]
fn test_concurrent_reads_during_save() {
    let scratch = tempfile::tempdir().unwrap();
    let save_dir = scratch.path().join("savepoint");
    let mut obj = GitObjectStore::with_snapshot_dir(scratch.path().join("snapshots"));

    for subject in (0..100).map(|i| format!("s{i}")) {
        obj.store
            .add_triple(&sample_triple(&subject), Namespace::World, YLayer::Semantic)
            .unwrap();
    }

    // First save, then confirm the in-memory store is still usable.
    save(&obj, &save_dir).unwrap();
    assert_eq!(obj.store.len(), 100);

    // Second save overwrites the first one.
    save(&obj, &save_dir).unwrap();

    // Round trip through disk.
    obj.store.clear();
    restore(&mut obj, &save_dir).unwrap();
    assert_eq!(obj.store.len(), 100);
}
781
/// `save_full` + `restore_full` round-trip graph data, commits and refs.
#[test]
fn test_save_full_persists_commits_and_refs() {
    let scratch = tempfile::tempdir().unwrap();
    let save_dir = scratch.path().join("savepoint");
    let mut source = GitObjectStore::with_snapshot_dir(scratch.path().join("snapshots"));

    // Graph data: a single triple.
    source
        .store
        .add_triple(&sample_triple("s1"), Namespace::World, YLayer::Semantic)
        .unwrap();

    // One commit; "main" and "feature" both point at it.
    let mut commits = crate::commit::CommitsTable::new();
    let first_commit =
        crate::commit::create_commit(&source, &mut commits, vec![], "init", "DGX").unwrap();

    let mut refs = crate::refs::RefsTable::new();
    refs.init_main(&first_commit.commit_id);
    refs.create_branch("feature", &first_commit.commit_id).unwrap();

    save_full(&source, Some(&commits), Some(&refs), &save_dir).unwrap();

    // Both JSON side files must be on disk.
    for side_file in ["_commits.json", "_refs.json"] {
        assert!(save_dir.join(side_file).exists());
    }

    // Load into a fresh store.
    let mut target = GitObjectStore::with_snapshot_dir(scratch.path().join("snapshots2"));
    let (restored_commits, restored_refs) = restore_full(&mut target, &save_dir).unwrap();

    // Commits
    let commit_table = restored_commits.unwrap();
    assert_eq!(commit_table.len(), 1);
    assert_eq!(
        commit_table.get(&first_commit.commit_id).unwrap().message,
        "init"
    );

    // Refs
    let ref_table = restored_refs.unwrap();
    assert_eq!(ref_table.branches().len(), 2);
    assert!(ref_table.head().is_some());

    // Graph data
    assert_eq!(target.store.len(), 1);
}
825
/// A zstd-compressed save is smaller than a plain one and still restores.
#[test]
fn test_save_with_zstd_compression() {
    let scratch = tempfile::tempdir().unwrap();
    let save_dir = scratch.path().join("compressed");
    let uncompressed_dir = scratch.path().join("uncompressed");
    let mut obj = GitObjectStore::with_snapshot_dir(scratch.path().join("snap"));

    // 1K triples in World.
    for subject in (0..1000).map(|i| format!("entity-{i}")) {
        obj.store
            .add_triple(&sample_triple(&subject), Namespace::World, YLayer::Semantic)
            .unwrap();
    }

    // Plain save first, compressed save second.
    let plain_options = SaveOptions {
        compress: false,
        dirty_namespaces: None,
    };
    let metrics_plain =
        save_with_options(&obj, None, None, &uncompressed_dir, &plain_options).unwrap();

    let zstd_options = SaveOptions {
        compress: true,
        dirty_namespaces: None,
    };
    let metrics_zstd = save_with_options(&obj, None, None, &save_dir, &zstd_options).unwrap();

    assert!(metrics_zstd.compressed);
    assert!(!metrics_plain.compressed);

    // Size comparison.
    assert!(
        metrics_zstd.bytes_written < metrics_plain.bytes_written,
        "Compressed ({}) should be smaller than uncompressed ({})",
        metrics_zstd.bytes_written,
        metrics_plain.bytes_written,
    );

    // The compressed files must still be readable.
    obj.store.clear();
    restore(&mut obj, &save_dir).unwrap();
    assert_eq!(obj.store.len(), 1000);
}
886
/// An incremental save writes only the dirty namespace and leaves the other
/// files on disk usable.
#[test]
fn test_incremental_save_only_dirty_namespaces() {
    let scratch = tempfile::tempdir().unwrap();
    let save_dir = scratch.path().join("incremental");
    let mut obj = GitObjectStore::with_snapshot_dir(scratch.path().join("snap"));

    // 100 triples in every namespace.
    for ns in Namespace::ALL {
        for subject in (0..100).map(|i| format!("{}:{}", ns.as_str(), i)) {
            obj.store
                .add_triple(&sample_triple(&subject), ns, YLayer::Semantic)
                .unwrap();
        }
    }

    // Baseline: full save.
    save(&obj, &save_dir).unwrap();

    // Incremental save with World as the only dirty namespace.
    let options = SaveOptions {
        compress: false,
        dirty_namespaces: Some(HashSet::from([Namespace::World])),
    };
    let metrics = save_with_options(&obj, None, None, &save_dir, &options).unwrap();

    assert_eq!(metrics.namespaces_saved.len(), 1);
    assert_eq!(metrics.namespaces_saved[0], "world");

    // Everything restores: the untouched files are still on disk.
    obj.store.clear();
    restore(&mut obj, &save_dir).unwrap();
    assert_eq!(obj.store.len(), Namespace::ALL.len() * 100);
}
934
935    #[test]
936    fn test_save_metrics_populated() {
937        let tmp = tempfile::tempdir().unwrap();
938        let save_dir = tmp.path().join("metrics");
939        let mut obj = GitObjectStore::with_snapshot_dir(tmp.path().join("snap"));
940
941        for i in 0..200 {
942            obj.store
943                .add_triple(
944                    &sample_triple(&format!("entity-{}", i)),
945                    Namespace::World,
946                    YLayer::Semantic,
947                )
948                .unwrap();
949        }
950        for i in 0..100 {
951            obj.store
952                .add_triple(
953                    &sample_triple(&format!("work-{}", i)),
954                    Namespace::Work,
955                    YLayer::Experience,
956                )
957                .unwrap();
958        }
959
960        let metrics =
961            save_with_options(&obj, None, None, &save_dir, &SaveOptions::default()).unwrap();
962
963        assert_eq!(metrics.namespaces_saved.len(), 2);
964        assert!(metrics.namespaces_saved.contains(&"world".to_string()));
965        assert!(metrics.namespaces_saved.contains(&"work".to_string()));
966        assert!(metrics.bytes_written > 0);
967        assert!(!metrics.compressed);
968    }
969
970    // --- Tests for generic save/restore ---
971
972    fn kanban_schema() -> Schema {
973        use arrow::datatypes::{DataType, Field};
974        Schema::new(vec![
975            Field::new("id", DataType::Utf8, false),
976            Field::new("title", DataType::Utf8, false),
977            Field::new("status", DataType::Utf8, false),
978        ])
979    }
980
981    fn kanban_batch(ids: &[&str], titles: &[&str], statuses: &[&str]) -> RecordBatch {
982        use arrow::array::StringArray;
983        RecordBatch::try_new(
984            Arc::new(kanban_schema()),
985            vec![
986                Arc::new(StringArray::from(ids.to_vec())),
987                Arc::new(StringArray::from(titles.to_vec())),
988                Arc::new(StringArray::from(statuses.to_vec())),
989            ],
990        )
991        .unwrap()
992    }
993
994    #[test]
995    fn test_save_named_batches_roundtrip() {
996        let tmp = tempfile::tempdir().unwrap();
997        let save_dir = tmp.path().join("kanban");
998
999        let batch = kanban_batch(
1000            &["EXP-1", "EXP-2"],
1001            &["First", "Second"],
1002            &["backlog", "in_progress"],
1003        );
1004        let schema = kanban_schema();
1005
1006        let metrics =
1007            save_named_batches(&[("items", &[batch.clone()], &schema)], &save_dir).unwrap();
1008
1009        assert_eq!(metrics.namespaces_saved, vec!["items"]);
1010        assert!(metrics.bytes_written > 0);
1011        assert!(!metrics.compressed);
1012        assert!(!save_dir.join("_wal.json").exists());
1013
1014        // Restore and verify
1015        let results = restore_named_batches(&save_dir, &["items"]).unwrap();
1016        assert_eq!(results.len(), 1);
1017        assert_eq!(results[0].0, "items");
1018        assert_eq!(results[0].1[0].num_rows(), 2);
1019    }
1020
1021    #[test]
1022    fn test_save_named_batches_multiple_datasets() {
1023        let tmp = tempfile::tempdir().unwrap();
1024        let save_dir = tmp.path().join("multi");
1025
1026        let items = kanban_batch(&["EXP-1"], &["Expedition"], &["backlog"]);
1027        let runs = kanban_batch(&["RUN-1"], &["Status Change"], &["done"]);
1028        let schema = kanban_schema();
1029
1030        save_named_batches(
1031            &[("items", &[items], &schema), ("runs", &[runs], &schema)],
1032            &save_dir,
1033        )
1034        .unwrap();
1035
1036        assert!(save_dir.join("items.parquet").exists());
1037        assert!(save_dir.join("runs.parquet").exists());
1038
1039        let results = restore_named_batches(&save_dir, &["items", "runs"]).unwrap();
1040        assert_eq!(results.len(), 2);
1041    }
1042
1043    #[test]
1044    fn test_save_named_batches_empty_skipped() {
1045        let tmp = tempfile::tempdir().unwrap();
1046        let save_dir = tmp.path().join("empty");
1047
1048        let schema = kanban_schema();
1049        let metrics =
1050            save_named_batches(&[("items", &[] as &[RecordBatch], &schema)], &save_dir).unwrap();
1051
1052        // Empty batches should not create a file
1053        assert!(metrics.namespaces_saved.is_empty());
1054        assert!(!save_dir.join("items.parquet").exists());
1055    }
1056
1057    #[test]
1058    fn test_restore_named_batches_missing_files_skipped() {
1059        let tmp = tempfile::tempdir().unwrap();
1060        let save_dir = tmp.path().join("partial");
1061        fs::create_dir_all(&save_dir).unwrap();
1062
1063        // Only save "items", then try to restore "items" and "runs"
1064        let batch = kanban_batch(&["EXP-1"], &["Test"], &["backlog"]);
1065        save_named_batches(&[("items", &[batch], &kanban_schema())], &save_dir).unwrap();
1066
1067        let results = restore_named_batches(&save_dir, &["items", "runs"]).unwrap();
1068        assert_eq!(results.len(), 1); // Only "items" found
1069        assert_eq!(results[0].0, "items");
1070    }
1071
1072    #[test]
1073    fn test_restore_named_batches_nonexistent_dir() {
1074        let tmp = tempfile::tempdir().unwrap();
1075        let result = restore_named_batches(&tmp.path().join("nonexistent"), &["items"]);
1076        assert!(result.is_err());
1077    }
1078
1079    #[test]
1080    fn test_save_named_batches_wal_cleanup() {
1081        let tmp = tempfile::tempdir().unwrap();
1082        let save_dir = tmp.path().join("wal_test");
1083
1084        let batch = kanban_batch(&["EXP-1"], &["Test"], &["backlog"]);
1085        save_named_batches(&[("items", &[batch], &kanban_schema())], &save_dir).unwrap();
1086
1087        // WAL should not exist after successful save
1088        assert!(!save_dir.join("_wal.json").exists());
1089    }
1090
1091    #[test]
1092    fn test_save_named_batches_crash_recovery() {
1093        let tmp = tempfile::tempdir().unwrap();
1094        let save_dir = tmp.path().join("crash");
1095
1096        // Save data
1097        let batch = kanban_batch(&["EXP-1"], &["Test"], &["backlog"]);
1098        save_named_batches(&[("items", &[batch], &kanban_schema())], &save_dir).unwrap();
1099
1100        // Simulate crash: write WAL as if save was interrupted
1101        fs::write(save_dir.join("_wal.json"), "[\"items\"]").unwrap();
1102
1103        // Restore should still work — WAL is cleaned up, old .parquet is valid
1104        let results = restore_named_batches(&save_dir, &["items"]).unwrap();
1105        assert_eq!(results.len(), 1);
1106        assert_eq!(results[0].1[0].num_rows(), 1);
1107        assert!(!save_dir.join("_wal.json").exists());
1108    }
1109
1110    #[test]
1111    fn test_persist_commits_roundtrip() {
1112        let tmp = tempfile::tempdir().unwrap();
1113        let dir = tmp.path();
1114
1115        let mut table = CommitsTable::new();
1116        table.append(Commit {
1117            commit_id: "c1".to_string(),
1118            parent_ids: vec![],
1119            timestamp_ms: 1000,
1120            message: "first save".to_string(),
1121            author: "nusy-kanban".to_string(),
1122        });
1123        table.append(Commit {
1124            commit_id: "c2".to_string(),
1125            parent_ids: vec!["c1".to_string()],
1126            timestamp_ms: 2000,
1127            message: "second save".to_string(),
1128            author: "nusy-kanban".to_string(),
1129        });
1130
1131        persist_commits(&table, dir).unwrap();
1132        assert!(dir.join("_commits.json").exists());
1133
1134        let restored = restore_commits(dir).unwrap().unwrap();
1135        assert_eq!(restored.len(), 2);
1136        assert_eq!(restored.get("c1").unwrap().message, "first save");
1137        assert_eq!(restored.get("c2").unwrap().parent_ids, vec!["c1"]);
1138    }
1139
1140    #[test]
1141    fn test_restore_commits_empty_dir() {
1142        let tmp = tempfile::tempdir().unwrap();
1143        let result = restore_commits(tmp.path()).unwrap();
1144        assert!(result.is_none());
1145    }
1146}