// arrow_graph_git/save.rs

//! Save — persist current state without creating a commit (crash recovery).
//!
//! Distinct from Commit: Save is mechanical (crash recovery), Commit is
//! semantic (versioning). Save uses atomic file replacement to prevent
//! corruption from partial writes.

use crate::commit::{Commit, CommitsTable};
use crate::object_store::GitObjectStore;
use crate::refs::RefsTable;
use arrow::array::RecordBatch;
use arrow::datatypes::Schema;
use parquet::arrow::ArrowWriter;
use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
use parquet::basic::Compression;
use parquet::file::properties::WriterProperties;
use std::collections::HashSet;
use std::fs;
use std::path::Path;
use std::sync::Arc;

/// Errors from save/restore operations.
#[derive(Debug, thiserror::Error)]
pub enum SaveError {
    #[error("IO error: {0}")]
    Io(#[from] std::io::Error),

    #[error("Parquet error: {0}")]
    Parquet(#[from] parquet::errors::ParquetError),

    #[error("Arrow error: {0}")]
    Arrow(#[from] arrow::error::ArrowError),

    #[error("Save point not found: {0}")]
    NotFound(String),

    #[error("Write-ahead log incomplete — previous save may be corrupt")]
    IncompleteWal,
}

pub type Result<T> = std::result::Result<T, SaveError>;

/// Save current store state to a directory using atomic file replacement.
///
/// Persists:
/// - Graph data: namespace Parquet files
/// - Commit history: `_commits.json`
/// - Branch refs: `_refs.json`
///
/// A write-ahead log (`_wal.json`) tracks the operation for crash recovery.
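///
/// # Example
///
/// A minimal save/restore roundtrip sketch; the paths and the
/// `with_snapshot_dir` constructor are illustrative, following this
/// module's tests:
///
/// ```ignore
/// use std::path::Path;
///
/// let mut obj = GitObjectStore::with_snapshot_dir("snapshots");
/// // ... add triples to obj.store ...
/// save(&obj, Path::new("savepoint"))?;
///
/// // Later, e.g. on restart after a crash:
/// restore(&mut obj, Path::new("savepoint"))?;
/// ```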
pub fn save(obj_store: &GitObjectStore, save_dir: &Path) -> Result<()> {
    save_full(obj_store, None, None, save_dir)
}

/// Save with commit history and refs.
pub fn save_full(
    obj_store: &GitObjectStore,
    commits_table: Option<&CommitsTable>,
    refs_table: Option<&RefsTable>,
    save_dir: &Path,
) -> Result<()> {
    fs::create_dir_all(save_dir)?;

    // Write WAL marker — lists namespaces being saved
    let wal_path = save_dir.join("_wal.json");
    let namespaces_with_data: Vec<String> = obj_store
        .store
        .namespaces()
        .iter()
        .filter(|ns| !obj_store.store.get_namespace_batches(ns).is_empty())
        .cloned()
        .collect();

    fs::write(&wal_path, serde_json_minimal(&namespaces_with_data))?;

    // Write each namespace atomically
    for ns in obj_store.store.namespaces() {
        let batches = obj_store.store.get_namespace_batches(ns);
        let target = save_dir.join(format!("{}.parquet", ns));

        if batches.is_empty() {
            // Remove stale file if namespace is now empty
            let _ = fs::remove_file(&target);
            continue;
        }

        let tmp_path = save_dir.join(format!("{}.parquet.tmp", ns));
        let schema = obj_store.store.schema().clone();
        let file = fs::File::create(&tmp_path)?;
        let mut writer = ArrowWriter::try_new(file, schema, None)?;

        for batch in batches {
            writer.write(batch)?;
        }
        writer.close()?;

        // Atomic rename (POSIX guarantees)
        fs::rename(&tmp_path, &target)?;
    }

    // Persist CommitsTable as JSON
    if let Some(ct) = commits_table {
        let commits_json = serialize_commits(ct);
        let tmp = save_dir.join("_commits.json.tmp");
        fs::write(&tmp, &commits_json)?;
        fs::rename(&tmp, save_dir.join("_commits.json"))?;
    }

    // Persist RefsTable as JSON
    if let Some(rt) = refs_table {
        let refs_json = serialize_refs(rt);
        let tmp = save_dir.join("_refs.json.tmp");
        fs::write(&tmp, &refs_json)?;
        fs::rename(&tmp, save_dir.join("_refs.json"))?;
    }

    // Remove WAL — save complete
    let _ = fs::remove_file(&wal_path);

    Ok(())
}

/// Restore store state from a save point (graph data only).
pub fn restore(obj_store: &mut GitObjectStore, save_dir: &Path) -> Result<()> {
    let (_, _) = restore_full(obj_store, save_dir)?;
    Ok(())
}

/// Restore store state including commit history and refs.
///
/// Returns `(Option<CommitsTable>, Option<RefsTable>)`; each is `Some` if it was persisted.
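///
/// A sketch of recovering full state on startup (directory name illustrative):
///
/// ```ignore
/// let (commits, refs) = restore_full(&mut obj, Path::new("savepoint"))?;
/// let commits = commits.unwrap_or_else(CommitsTable::new);
/// let refs = refs.unwrap_or_else(RefsTable::new);
/// ```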
pub fn restore_full(
    obj_store: &mut GitObjectStore,
    save_dir: &Path,
) -> Result<(Option<CommitsTable>, Option<RefsTable>)> {
    if !save_dir.exists() {
        return Err(SaveError::NotFound(save_dir.display().to_string()));
    }

    // Check for an incomplete WAL (crash during a previous save).
    let wal_path = save_dir.join("_wal.json");
    if wal_path.exists() {
        // A leftover WAL means the previous save was interrupted, so the
        // .parquet files may mix old and new versions. Atomic rename
        // guarantees each file is either fully old or fully new, never
        // partial, so every file on disk is still valid. We could
        // conservatively return `IncompleteWal`, but since the data is
        // consistent we just remove the WAL and continue.
        let _ = fs::remove_file(&wal_path);
    }

    obj_store.store.clear();

    for ns in obj_store.store.namespaces().to_vec() {
        let path = save_dir.join(format!("{}.parquet", ns));
        if !path.exists() {
            continue;
        }

        let file = fs::File::open(&path)?;
        let reader = ParquetRecordBatchReaderBuilder::try_new(file)?.build()?;

        let mut batches = Vec::new();
        for batch_result in reader {
            batches.push(batch_result?);
        }

        obj_store.store.set_namespace_batches(&ns, batches);
    }

    // Restore CommitsTable
    let commits = {
        let path = save_dir.join("_commits.json");
        if path.exists() {
            let data = fs::read_to_string(&path)?;
            Some(deserialize_commits(&data))
        } else {
            None
        }
    };

    // Restore RefsTable
    let refs = {
        let path = save_dir.join("_refs.json");
        if path.exists() {
            let data = fs::read_to_string(&path)?;
            Some(deserialize_refs(&data))
        } else {
            None
        }
    };

    Ok((commits, refs))
}

/// Metrics collected during a save operation.
#[derive(Debug, Clone)]
pub struct SaveMetrics {
    /// Which namespaces were written to disk.
    pub namespaces_saved: Vec<String>,
    /// Total bytes written across all Parquet files.
    pub bytes_written: u64,
    /// Duration of the save in milliseconds.
    pub duration_ms: u128,
    /// Whether zstd compression was used.
    pub compressed: bool,
}

/// Options for customizing save behavior.
#[derive(Debug, Clone, Default)]
pub struct SaveOptions {
    /// Use zstd compression for Parquet files.
    pub compress: bool,
    /// Only save namespaces in this set (incremental save).
    /// If `None`, save all namespaces with data.
    pub dirty_namespaces: Option<HashSet<String>>,
}

/// Save with options, returning metrics about the operation.
///
/// Supports incremental saves (only dirty namespaces) and zstd compression.
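///
/// # Example
///
/// A sketch of a compressed, incremental save after mutating only the
/// `"world"` namespace; the store and tables are assumed to exist:
///
/// ```ignore
/// let mut dirty = HashSet::new();
/// dirty.insert("world".to_string());
///
/// let metrics = save_with_options(
///     &obj,
///     Some(&commits),
///     Some(&refs),
///     Path::new("savepoint"),
///     &SaveOptions { compress: true, dirty_namespaces: Some(dirty) },
/// )?;
/// println!("wrote {} bytes in {} ms", metrics.bytes_written, metrics.duration_ms);
/// ```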
pub fn save_with_options(
    obj_store: &GitObjectStore,
    commits_table: Option<&CommitsTable>,
    refs_table: Option<&RefsTable>,
    save_dir: &Path,
    options: &SaveOptions,
) -> Result<SaveMetrics> {
    let start = std::time::Instant::now();
    fs::create_dir_all(save_dir)?;

    // Determine which namespaces to save
    let namespaces_to_save: Vec<String> = obj_store
        .store
        .namespaces()
        .iter()
        .filter(|ns| {
            // Skip empty namespaces
            if obj_store
                .store
                .get_namespace_batches(ns.as_str())
                .is_empty()
            {
                return false;
            }
            // If dirty set is specified, only save dirty namespaces
            if let Some(dirty) = &options.dirty_namespaces {
                return dirty.contains(ns.as_str());
            }
            true
        })
        .cloned()
        .collect();

    // Write WAL marker
    let wal_path = save_dir.join("_wal.json");
    fs::write(&wal_path, serde_json_minimal(&namespaces_to_save))?;

    // Build writer properties
    let props = if options.compress {
        WriterProperties::builder()
            .set_compression(Compression::ZSTD(Default::default()))
            .build()
    } else {
        WriterProperties::builder().build()
    };

    let mut total_bytes = 0u64;
    let mut saved_ns_names = Vec::new();

    // Write each namespace atomically
    for ns in &namespaces_to_save {
        let batches = obj_store.store.get_namespace_batches(ns);
        let target = save_dir.join(format!("{}.parquet", ns));

        if batches.is_empty() {
            let _ = fs::remove_file(&target);
            continue;
        }

        let tmp_path = save_dir.join(format!("{}.parquet.tmp", ns));
        let schema = obj_store.store.schema().clone();
        let file = fs::File::create(&tmp_path)?;
        let mut writer = ArrowWriter::try_new(file, schema, Some(props.clone()))?;

        for batch in batches {
            writer.write(batch)?;
        }
        writer.close()?;

        let file_size = fs::metadata(&tmp_path)?.len();
        total_bytes += file_size;
        saved_ns_names.push(ns.clone());

        fs::rename(&tmp_path, &target)?;
    }

    // Persist CommitsTable
    if let Some(ct) = commits_table {
        let commits_json = serialize_commits(ct);
        let tmp = save_dir.join("_commits.json.tmp");
        fs::write(&tmp, &commits_json)?;
        fs::rename(&tmp, save_dir.join("_commits.json"))?;
    }

    // Persist RefsTable
    if let Some(rt) = refs_table {
        let refs_json = serialize_refs(rt);
        let tmp = save_dir.join("_refs.json.tmp");
        fs::write(&tmp, &refs_json)?;
        fs::rename(&tmp, save_dir.join("_refs.json"))?;
    }

    // Remove WAL — save complete
    let _ = fs::remove_file(&wal_path);

    Ok(SaveMetrics {
        namespaces_saved: saved_ns_names,
        bytes_written: total_bytes,
        duration_ms: start.elapsed().as_millis(),
        compressed: options.compress,
    })
}

// --- Generic save/restore for arbitrary RecordBatch data ---

/// Save arbitrary named RecordBatch collections with WAL + atomic write.
///
/// Each entry is `(name, batches, schema)`. Creates Parquet files
/// like `{save_dir}/{name}.parquet` with atomic tmp+rename.
///
/// This is the generic version of [`save()`] — it works with any Arrow data,
/// not just `GitObjectStore` namespaces.
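///
/// # Example
///
/// A sketch saving one named dataset and reading it back; `batch` and
/// `schema` are assumed to be a matching `RecordBatch`/`Schema` pair:
///
/// ```ignore
/// let batches = vec![batch];
/// save_named_batches(&[("items", &batches, &schema)], Path::new("data"))?;
///
/// let restored = restore_named_batches(Path::new("data"), &["items"])?;
/// assert_eq!(restored[0].0, "items");
/// ```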
pub fn save_named_batches(
    entries: &[(&str, &[RecordBatch], &Schema)],
    save_dir: &Path,
) -> Result<SaveMetrics> {
    let start = std::time::Instant::now();
    fs::create_dir_all(save_dir)?;

    // Write WAL marker — lists names being saved
    let wal_path = save_dir.join("_wal.json");
    let names: Vec<String> = entries
        .iter()
        .map(|(name, _, _)| name.to_string())
        .collect();
    fs::write(&wal_path, serde_json_minimal(&names))?;

    let mut total_bytes = 0u64;
    let mut saved_names = Vec::new();

    for (name, batches, schema) in entries {
        let target = save_dir.join(format!("{name}.parquet"));

        if batches.is_empty() {
            // Remove stale file if data is now empty
            let _ = fs::remove_file(&target);
            continue;
        }

        let tmp_path = save_dir.join(format!("{name}.parquet.tmp"));
        let schema_ref = Arc::new((*schema).clone());
        let file = fs::File::create(&tmp_path)?;
        let mut writer = ArrowWriter::try_new(file, schema_ref, None)?;

        for batch in *batches {
            writer.write(batch)?;
        }
        writer.close()?;

        let file_size = fs::metadata(&tmp_path)?.len();
        total_bytes += file_size;
        saved_names.push(name.to_string());

        // Atomic replacement — old file intact if crash occurs before this line
        fs::rename(&tmp_path, &target)?;
    }

    // Remove WAL — save complete
    let _ = fs::remove_file(&wal_path);

    Ok(SaveMetrics {
        namespaces_saved: saved_names,
        bytes_written: total_bytes,
        duration_ms: start.elapsed().as_millis(),
        compressed: false,
    })
}

/// Restore named RecordBatch collections from a save directory.
///
/// Returns a vector of `(name, batches)` for each name found on disk.
/// Missing files are silently skipped, so a first run yields an empty vec.
pub fn restore_named_batches(
    save_dir: &Path,
    names: &[&str],
) -> Result<Vec<(String, Vec<RecordBatch>)>> {
    if !save_dir.exists() {
        return Err(SaveError::NotFound(save_dir.display().to_string()));
    }

    // Check for incomplete WAL (crash during previous save)
    let wal_path = save_dir.join("_wal.json");
    if wal_path.exists() {
        // WAL exists = previous save was interrupted.
        // Atomic rename means files are either old or new, not partial.
        // Safe to proceed — remove WAL and continue.
        let _ = fs::remove_file(&wal_path);
    }

    let mut results = Vec::new();

    for name in names {
        let path = save_dir.join(format!("{name}.parquet"));
        if !path.exists() {
            continue;
        }

        let file = fs::File::open(&path)?;
        let reader = ParquetRecordBatchReaderBuilder::try_new(file)?.build()?;

        let mut batches = Vec::new();
        for batch_result in reader {
            batches.push(batch_result?);
        }

        results.push((name.to_string(), batches));
    }

    Ok(results)
}

/// Persist a CommitsTable as JSON to a directory.
///
/// Uses atomic tmp+rename to prevent corruption.
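///
/// A sketch of the persist/restore pair (directory illustrative):
///
/// ```ignore
/// persist_commits(&commits, Path::new("savepoint"))?;
/// let restored = restore_commits(Path::new("savepoint"))?; // `None` on first run
/// ```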
pub fn persist_commits(table: &CommitsTable, dir: &Path) -> Result<()> {
    fs::create_dir_all(dir)?;
    let json = serialize_commits(table);
    let tmp = dir.join("_commits.json.tmp");
    fs::write(&tmp, &json)?;
    fs::rename(&tmp, dir.join("_commits.json"))?;
    Ok(())
}

/// Restore a CommitsTable from JSON in a directory.
///
/// Returns `None` if no commits file exists (first run).
pub fn restore_commits(dir: &Path) -> Result<Option<CommitsTable>> {
    let path = dir.join("_commits.json");
    if !path.exists() {
        return Ok(None);
    }
    let data = fs::read_to_string(&path)?;
    Ok(Some(deserialize_commits(&data)))
}

// --- Minimal JSON serialization (no serde dependency) ---

/// Minimal JSON serialization for the WAL (avoid serde dependency).
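///
/// For example, `serde_json_minimal(&["a".into(), "b".into()])` yields
/// the string `["a","b"]`.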
fn serde_json_minimal(items: &[String]) -> String {
    // Escape each item so a quote or backslash in a name can't break the WAL.
    let inner: Vec<String> = items
        .iter()
        .map(|s| format!("\"{}\"", json_escape(s)))
        .collect();
    format!("[{}]", inner.join(","))
}

/// Escape a string for JSON output.
fn json_escape(s: &str) -> String {
    s.replace('\\', "\\\\")
        .replace('"', "\\\"")
        .replace('\n', "\\n")
        .replace('\r', "\\r")
        .replace('\t', "\\t")
}

/// Serialize CommitsTable to JSON (one commit per line, array format).
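///
/// Illustrative output for a single commit:
/// `[{"id":"c1","parents":[],"ts":1000,"msg":"init","author":"test"}]`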
pub(crate) fn serialize_commits(table: &CommitsTable) -> String {
    let mut lines = Vec::new();
    for c in table.all() {
        let parents: Vec<String> = c
            .parent_ids
            .iter()
            .map(|p| format!("\"{}\"", json_escape(p)))
            .collect();
        lines.push(format!(
            "{{\"id\":\"{}\",\"parents\":[{}],\"ts\":{},\"msg\":\"{}\",\"author\":\"{}\"}}",
            json_escape(&c.commit_id),
            parents.join(","),
            c.timestamp_ms,
            json_escape(&c.message),
            json_escape(&c.author),
        ));
    }
    format!("[{}]", lines.join(",\n"))
}

/// Deserialize CommitsTable from JSON.
pub(crate) fn deserialize_commits(json: &str) -> CommitsTable {
    let mut table = CommitsTable::new();
    // Simple parser: extract objects between { }
    for obj in extract_json_objects(json) {
        let id = extract_json_string(&obj, "id").unwrap_or_default();
        let msg = extract_json_string(&obj, "msg").unwrap_or_default();
        let author = extract_json_string(&obj, "author").unwrap_or_default();
        let ts = extract_json_number(&obj, "ts").unwrap_or(0);
        let parents = extract_json_string_array(&obj, "parents");

        table.append(Commit {
            commit_id: id,
            parent_ids: parents,
            timestamp_ms: ts,
            message: msg,
            author,
        });
    }
    table
}

/// Serialize RefsTable to JSON.
pub(crate) fn serialize_refs(table: &RefsTable) -> String {
    let mut lines = Vec::new();
    for r in table.branches() {
        lines.push(format!(
            "{{\"name\":\"{}\",\"commit\":\"{}\",\"type\":\"{}\",\"head\":{},\"created\":{}}}",
            json_escape(&r.ref_name),
            json_escape(&r.commit_id),
            r.ref_type.as_str(),
            r.is_head,
            r.created_at_ms,
        ));
    }
    format!("[{}]", lines.join(",\n"))
}

/// Deserialize RefsTable from JSON.
pub(crate) fn deserialize_refs(json: &str) -> RefsTable {
    let mut table = RefsTable::new();
    for obj in extract_json_objects(json) {
        let name = extract_json_string(&obj, "name").unwrap_or_default();
        let commit = extract_json_string(&obj, "commit").unwrap_or_default();
        let is_head = obj.contains("\"head\":true");

        // Use init_main for the first head branch, create_branch for others.
        if table.head().is_none() && is_head {
            table.init_main(&commit);
            // init_main always creates the branch as "main"; if the persisted
            // head had a different name, the best we can do here is point
            // "main" at the same commit.
            if name != "main" {
                let _ = table.update_ref("main", &commit);
            }
        } else {
            let _ = table.create_branch(&name, &commit);
            if is_head {
                let _ = table.switch_head(&name);
            }
        }
    }
    table
}

// --- Minimal JSON parsing helpers (no serde dependency) ---

/// Extract top-level JSON objects from an array of `{...}`.
///
/// Tracks string state so a brace inside a string value (e.g. a commit
/// message containing `{`) does not corrupt the depth count.
fn extract_json_objects(json: &str) -> Vec<String> {
    let mut objects = Vec::new();
    let mut depth = 0;
    let mut start = None;
    let mut in_string = false;
    let mut escaped = false;
    for (i, ch) in json.char_indices() {
        if in_string {
            // Skip everything inside a string literal, honoring escapes.
            if escaped {
                escaped = false;
            } else if ch == '\\' {
                escaped = true;
            } else if ch == '"' {
                in_string = false;
            }
            continue;
        }
        match ch {
            '"' => in_string = true,
            '{' => {
                if depth == 0 {
                    start = Some(i);
                }
                depth += 1;
            }
            '}' => {
                depth -= 1;
                if depth == 0 {
                    if let Some(s) = start {
                        objects.push(json[s..=i].to_string());
                    }
                    start = None;
                }
            }
            _ => {}
        }
    }
    objects
}

/// Extract a string value for a key from a JSON object string.
fn extract_json_string(obj: &str, key: &str) -> Option<String> {
    let pattern = format!("\"{}\":\"", key);
    let start = obj.find(&pattern)? + pattern.len();
    let rest = &obj[start..];
    // Find unescaped closing quote
    let mut end = 0;
    let mut escaped = false;
    for ch in rest.chars() {
        if escaped {
            escaped = false;
        } else if ch == '\\' {
            escaped = true;
        } else if ch == '"' {
            break;
        }
        end += ch.len_utf8();
    }
    // Unescape in a single pass — chained `replace` calls would turn the
    // escaped sequence `\\n` (backslash + 'n') into a newline.
    let mut out = String::with_capacity(end);
    let mut chars = rest[..end].chars();
    while let Some(ch) = chars.next() {
        if ch == '\\' {
            match chars.next() {
                Some('n') => out.push('\n'),
                Some('r') => out.push('\r'),
                Some('t') => out.push('\t'),
                Some(other) => out.push(other), // covers \" and \\
                None => break,
            }
        } else {
            out.push(ch);
        }
    }
    Some(out)
}

/// Extract a number value for a key from a JSON object string.
fn extract_json_number(obj: &str, key: &str) -> Option<i64> {
    let pattern = format!("\"{}\":", key);
    let start = obj.find(&pattern)? + pattern.len();
    let rest = obj[start..].trim_start();
    let end = rest
        .find(|c: char| !c.is_ascii_digit() && c != '-')
        .unwrap_or(rest.len());
    rest[..end].parse().ok()
}

/// Extract a string array value for a key from a JSON object string.
fn extract_json_string_array(obj: &str, key: &str) -> Vec<String> {
    let pattern = format!("\"{}\":[", key);
    let Some(start) = obj.find(&pattern) else {
        return Vec::new();
    };
    let start = start + pattern.len();
    let rest = &obj[start..];
    let end = rest.find(']').unwrap_or(rest.len());
    let inner = &rest[..end];

    let mut result = Vec::new();
    for part in inner.split(',') {
        let trimmed = part.trim().trim_matches('"');
        if !trimmed.is_empty() {
            result.push(trimmed.to_string());
        }
    }
    result
}

#[cfg(test)]
mod tests {
    use super::*;
    use arrow_graph_core::Triple;

    fn sample_triple(subj: &str) -> Triple {
        Triple {
            subject: subj.to_string(),
            predicate: "rdf:type".to_string(),
            object: "Thing".to_string(),
            graph: None,
            confidence: Some(0.9),
            source_document: None,
            source_chunk_id: None,
            extracted_by: None,
            caused_by: None,
            derived_from: None,
            consolidated_at: None,
        }
    }

    #[test]
    fn test_save_restore_roundtrip() {
        let tmp = tempfile::tempdir().unwrap();
        let save_dir = tmp.path().join("savepoint");
        let mut obj = GitObjectStore::with_snapshot_dir(tmp.path().join("snapshots"));

        // Add triples to multiple namespaces
        for i in 0..50 {
            obj.store
                .add_triple(&sample_triple(&format!("world-{i}")), "world", Some(1u8))
                .unwrap();
        }
        for i in 0..30 {
            obj.store
                .add_triple(&sample_triple(&format!("work-{i}")), "work", Some(5u8))
                .unwrap();
        }

        assert_eq!(obj.store.len(), 80);

        // Save
        save(&obj, &save_dir).unwrap();

        // Verify files exist
        assert!(save_dir.join("world.parquet").exists());
        assert!(save_dir.join("work.parquet").exists());
        assert!(!save_dir.join("research.parquet").exists()); // No data
        assert!(!save_dir.join("_wal.json").exists()); // WAL cleaned up

        // Clear and restore
        obj.store.clear();
        assert_eq!(obj.store.len(), 0);

        restore(&mut obj, &save_dir).unwrap();
        assert_eq!(obj.store.len(), 80);
    }

    #[test]
    fn test_restore_nonexistent_fails() {
        let tmp = tempfile::tempdir().unwrap();
        let mut obj = GitObjectStore::with_snapshot_dir(tmp.path());
        let result = restore(&mut obj, &tmp.path().join("nonexistent"));
        assert!(result.is_err());
    }

    #[test]
    fn test_save_atomic_no_partial_files() {
        let tmp = tempfile::tempdir().unwrap();
        let save_dir = tmp.path().join("savepoint");
        let obj = GitObjectStore::with_snapshot_dir(tmp.path().join("snapshots"));

        // Save empty store — should succeed, no .parquet files
        save(&obj, &save_dir).unwrap();
        assert!(!save_dir.join("world.parquet").exists());
    }

    #[test]
    fn test_simulated_crash_recovery() {
        let tmp = tempfile::tempdir().unwrap();
        let save_dir = tmp.path().join("savepoint");
        let mut obj = GitObjectStore::with_snapshot_dir(tmp.path().join("snapshots"));

        // First save with data
        obj.store
            .add_triple(&sample_triple("s1"), "world", Some(1u8))
            .unwrap();
        save(&obj, &save_dir).unwrap();

        // Simulate crash: write a WAL file as if a save was interrupted
        fs::write(save_dir.join("_wal.json"), "[\"world\"]").unwrap();

        // Restore should still work (WAL is cleaned up, old .parquet is valid)
        obj.store.clear();
        restore(&mut obj, &save_dir).unwrap();
        assert_eq!(obj.store.len(), 1);
    }

    #[test]
    fn test_concurrent_reads_during_save() {
        let tmp = tempfile::tempdir().unwrap();
        let save_dir = tmp.path().join("savepoint");
        let mut obj = GitObjectStore::with_snapshot_dir(tmp.path().join("snapshots"));

        // Add data and save
        for i in 0..100 {
            obj.store
                .add_triple(&sample_triple(&format!("s{i}")), "world", Some(1u8))
                .unwrap();
        }
        save(&obj, &save_dir).unwrap();

        // Read while "saving" (verify store is still usable)
        assert_eq!(obj.store.len(), 100);

        // Save again (overwrite) — should not corrupt
        save(&obj, &save_dir).unwrap();

        // Restore and verify
        obj.store.clear();
        restore(&mut obj, &save_dir).unwrap();
        assert_eq!(obj.store.len(), 100);
    }

    #[test]
    fn test_save_full_persists_commits_and_refs() {
        let tmp = tempfile::tempdir().unwrap();
        let save_dir = tmp.path().join("savepoint");
        let mut obj = GitObjectStore::with_snapshot_dir(tmp.path().join("snapshots"));

        // Add data
        obj.store
            .add_triple(&sample_triple("s1"), "world", Some(1u8))
            .unwrap();

        // Create commits and refs
        let mut commits = crate::commit::CommitsTable::new();
        let c1 = crate::commit::create_commit(&obj, &mut commits, vec![], "init", "test").unwrap();

        let mut refs = crate::refs::RefsTable::new();
        refs.init_main(&c1.commit_id);
        refs.create_branch("feature", &c1.commit_id).unwrap();

        // Save everything
        save_full(&obj, Some(&commits), Some(&refs), &save_dir).unwrap();

        // Verify files exist
        assert!(save_dir.join("_commits.json").exists());
        assert!(save_dir.join("_refs.json").exists());

        // Restore into new store
        let mut obj2 = GitObjectStore::with_snapshot_dir(tmp.path().join("snapshots2"));
        let (restored_commits, restored_refs) = restore_full(&mut obj2, &save_dir).unwrap();

        // Verify commits restored
        let rc = restored_commits.unwrap();
        assert_eq!(rc.len(), 1);
        assert_eq!(rc.get(&c1.commit_id).unwrap().message, "init");

        // Verify refs restored
        let rr = restored_refs.unwrap();
        assert_eq!(rr.branches().len(), 2);
        assert!(rr.head().is_some());

        // Verify graph data restored
        assert_eq!(obj2.store.len(), 1);
    }

    #[test]
    fn test_save_with_zstd_compression() {
        let tmp = tempfile::tempdir().unwrap();
        let save_dir = tmp.path().join("compressed");
        let uncompressed_dir = tmp.path().join("uncompressed");
        let mut obj = GitObjectStore::with_snapshot_dir(tmp.path().join("snap"));

        // Add 1K triples
        for i in 0..1000 {
            obj.store
                .add_triple(&sample_triple(&format!("entity-{}", i)), "world", Some(1u8))
                .unwrap();
        }

        // Save without compression
        let metrics_plain = save_with_options(
            &obj,
            None,
            None,
            &uncompressed_dir,
            &SaveOptions {
                compress: false,
                dirty_namespaces: None,
            },
        )
        .unwrap();

        // Save with compression
        let metrics_zstd = save_with_options(
            &obj,
            None,
            None,
            &save_dir,
            &SaveOptions {
                compress: true,
                dirty_namespaces: None,
            },
        )
        .unwrap();

        assert!(metrics_zstd.compressed);
        assert!(!metrics_plain.compressed);

        // Compressed should be smaller
        assert!(
            metrics_zstd.bytes_written < metrics_plain.bytes_written,
            "Compressed ({}) should be smaller than uncompressed ({})",
            metrics_zstd.bytes_written,
            metrics_plain.bytes_written,
        );

        // Verify compressed file still restores correctly
        obj.store.clear();
        restore(&mut obj, &save_dir).unwrap();
        assert_eq!(obj.store.len(), 1000);
    }

    #[test]
    fn test_incremental_save_only_dirty_namespaces() {
        let tmp = tempfile::tempdir().unwrap();
        let save_dir = tmp.path().join("incremental");
        let mut obj = GitObjectStore::with_snapshot_dir(tmp.path().join("snap"));

        let ns_count = obj.store.namespaces().len();

        // Add data to all namespaces
        for ns in obj.store.namespaces().to_vec() {
            for i in 0..100 {
                obj.store
                    .add_triple(&sample_triple(&format!("{}:{}", ns, i)), &ns, Some(1u8))
                    .unwrap();
            }
        }

        // Full save first
        save(&obj, &save_dir).unwrap();

        // Now do an incremental save with only "world" dirty
        let mut dirty = HashSet::new();
        dirty.insert("world".to_string());

        let metrics = save_with_options(
            &obj,
            None,
            None,
            &save_dir,
            &SaveOptions {
                compress: false,
                dirty_namespaces: Some(dirty),
            },
        )
        .unwrap();

        // Only 1 namespace should be saved
        assert_eq!(metrics.namespaces_saved.len(), 1);
        assert_eq!(metrics.namespaces_saved[0], "world");

        // All data should still restore (other files untouched on disk)
        obj.store.clear();
        restore(&mut obj, &save_dir).unwrap();
        assert_eq!(obj.store.len(), ns_count * 100);
    }

    #[test]
    fn test_save_metrics_populated() {
        let tmp = tempfile::tempdir().unwrap();
        let save_dir = tmp.path().join("metrics");
        let mut obj = GitObjectStore::with_snapshot_dir(tmp.path().join("snap"));

        for i in 0..200 {
            obj.store
                .add_triple(&sample_triple(&format!("entity-{}", i)), "world", Some(1u8))
                .unwrap();
        }
        for i in 0..100 {
            obj.store
                .add_triple(&sample_triple(&format!("work-{}", i)), "work", Some(3u8))
                .unwrap();
        }

        let metrics =
            save_with_options(&obj, None, None, &save_dir, &SaveOptions::default()).unwrap();

        assert_eq!(metrics.namespaces_saved.len(), 2);
        assert!(metrics.namespaces_saved.contains(&"world".to_string()));
        assert!(metrics.namespaces_saved.contains(&"work".to_string()));
        assert!(metrics.bytes_written > 0);
        assert!(!metrics.compressed);
    }

    // --- Tests for generic save/restore ---

    fn kanban_schema() -> Schema {
        use arrow::datatypes::{DataType, Field};
        Schema::new(vec![
            Field::new("id", DataType::Utf8, false),
            Field::new("title", DataType::Utf8, false),
            Field::new("status", DataType::Utf8, false),
        ])
    }

    fn kanban_batch(ids: &[&str], titles: &[&str], statuses: &[&str]) -> RecordBatch {
        use arrow::array::StringArray;
        RecordBatch::try_new(
            Arc::new(kanban_schema()),
            vec![
                Arc::new(StringArray::from(ids.to_vec())),
                Arc::new(StringArray::from(titles.to_vec())),
                Arc::new(StringArray::from(statuses.to_vec())),
            ],
        )
        .unwrap()
    }

    #[test]
    fn test_save_named_batches_roundtrip() {
        let tmp = tempfile::tempdir().unwrap();
        let save_dir = tmp.path().join("kanban");

        let batch = kanban_batch(
            &["EXP-1", "EXP-2"],
            &["First", "Second"],
            &["backlog", "in_progress"],
        );
        let schema = kanban_schema();

        let batches = vec![batch.clone()];
        let metrics = save_named_batches(&[("items", &batches, &schema)], &save_dir).unwrap();

        assert_eq!(metrics.namespaces_saved, vec!["items"]);
        assert!(metrics.bytes_written > 0);
        assert!(!metrics.compressed);
        assert!(!save_dir.join("_wal.json").exists());

        // Restore and verify
        let results = restore_named_batches(&save_dir, &["items"]).unwrap();
        assert_eq!(results.len(), 1);
        assert_eq!(results[0].0, "items");
        assert_eq!(results[0].1[0].num_rows(), 2);
    }

    #[test]
    fn test_save_named_batches_multiple_datasets() {
        let tmp = tempfile::tempdir().unwrap();
        let save_dir = tmp.path().join("multi");

        let items = kanban_batch(&["EXP-1"], &["Expedition"], &["backlog"]);
        let runs = kanban_batch(&["RUN-1"], &["Status Change"], &["done"]);
        let schema = kanban_schema();

        save_named_batches(
            &[("items", &[items], &schema), ("runs", &[runs], &schema)],
            &save_dir,
        )
        .unwrap();

        assert!(save_dir.join("items.parquet").exists());
        assert!(save_dir.join("runs.parquet").exists());

        let results = restore_named_batches(&save_dir, &["items", "runs"]).unwrap();
        assert_eq!(results.len(), 2);
    }

    #[test]
    fn test_save_named_batches_empty_skipped() {
        let tmp = tempfile::tempdir().unwrap();
        let save_dir = tmp.path().join("empty");

        let schema = kanban_schema();
        let metrics =
            save_named_batches(&[("items", &[] as &[RecordBatch], &schema)], &save_dir).unwrap();

        // Empty batches should not create a file
        assert!(metrics.namespaces_saved.is_empty());
        assert!(!save_dir.join("items.parquet").exists());
    }

    #[test]
    fn test_restore_named_batches_missing_files_skipped() {
        let tmp = tempfile::tempdir().unwrap();
        let save_dir = tmp.path().join("partial");
        fs::create_dir_all(&save_dir).unwrap();

        // Only save "items", then try to restore "items" and "runs"
        let batch = kanban_batch(&["EXP-1"], &["Test"], &["backlog"]);
        save_named_batches(&[("items", &[batch], &kanban_schema())], &save_dir).unwrap();

        let results = restore_named_batches(&save_dir, &["items", "runs"]).unwrap();
        assert_eq!(results.len(), 1); // Only "items" found
        assert_eq!(results[0].0, "items");
    }

    #[test]
    fn test_restore_named_batches_nonexistent_dir() {
        let tmp = tempfile::tempdir().unwrap();
        let result = restore_named_batches(&tmp.path().join("nonexistent"), &["items"]);
        assert!(result.is_err());
    }

    #[test]
    fn test_save_named_batches_wal_cleanup() {
        let tmp = tempfile::tempdir().unwrap();
        let save_dir = tmp.path().join("wal_test");

        let batch = kanban_batch(&["EXP-1"], &["Test"], &["backlog"]);
        save_named_batches(&[("items", &[batch], &kanban_schema())], &save_dir).unwrap();

        // WAL should not exist after successful save
        assert!(!save_dir.join("_wal.json").exists());
    }

    #[test]
    fn test_save_named_batches_crash_recovery() {
        let tmp = tempfile::tempdir().unwrap();
        let save_dir = tmp.path().join("crash");

        // Save data
        let batch = kanban_batch(&["EXP-1"], &["Test"], &["backlog"]);
        save_named_batches(&[("items", &[batch], &kanban_schema())], &save_dir).unwrap();

        // Simulate crash: write WAL as if save was interrupted
        fs::write(save_dir.join("_wal.json"), "[\"items\"]").unwrap();

        // Restore should still work — WAL is cleaned up, old .parquet is valid
        let results = restore_named_batches(&save_dir, &["items"]).unwrap();
        assert_eq!(results.len(), 1);
        assert_eq!(results[0].1[0].num_rows(), 1);
        assert!(!save_dir.join("_wal.json").exists());
    }

    #[test]
    fn test_persist_commits_roundtrip() {
        let tmp = tempfile::tempdir().unwrap();
        let dir = tmp.path();

        let mut table = CommitsTable::new();
        table.append(Commit {
            commit_id: "c1".to_string(),
            parent_ids: vec![],
            timestamp_ms: 1000,
            message: "first save".to_string(),
            author: "test".to_string(),
        });
        table.append(Commit {
            commit_id: "c2".to_string(),
            parent_ids: vec!["c1".to_string()],
            timestamp_ms: 2000,
            message: "second save".to_string(),
            author: "test".to_string(),
        });

        persist_commits(&table, dir).unwrap();
        assert!(dir.join("_commits.json").exists());

        let restored = restore_commits(dir).unwrap().unwrap();
        assert_eq!(restored.len(), 2);
        assert_eq!(restored.get("c1").unwrap().message, "first save");
        assert_eq!(restored.get("c2").unwrap().parent_ids, vec!["c1"]);
    }

    #[test]
    fn test_restore_commits_empty_dir() {
        let tmp = tempfile::tempdir().unwrap();
        let result = restore_commits(tmp.path()).unwrap();
        assert!(result.is_none());
    }
}