1use crate::commit::{Commit, CommitsTable};
8use crate::object_store::GitObjectStore;
9use crate::refs::RefsTable;
10use arrow::array::RecordBatch;
11use arrow::datatypes::Schema;
12use parquet::arrow::ArrowWriter;
13use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
14use parquet::basic::Compression;
15use parquet::file::properties::WriterProperties;
16use std::collections::HashSet;
17use std::fs;
18use std::path::Path;
19use std::sync::Arc;
20
/// Errors that can occur while saving or restoring a save point.
#[derive(Debug, thiserror::Error)]
pub enum SaveError {
    /// Underlying filesystem failure (create/read/write/rename).
    #[error("IO error: {0}")]
    Io(#[from] std::io::Error),

    /// Failure while encoding/decoding a parquet file.
    #[error("Parquet error: {0}")]
    Parquet(#[from] parquet::errors::ParquetError),

    /// Failure while materializing Arrow record batches.
    #[error("Arrow error: {0}")]
    Arrow(#[from] arrow::error::ArrowError),

    /// The requested save directory does not exist.
    #[error("Save point not found: {0}")]
    NotFound(String),

    /// A `_wal.json` marker was left behind by an interrupted save.
    /// NOTE(review): this variant is never constructed in this file —
    /// the restore paths silently discard a stale WAL instead of
    /// reporting it; confirm whether that is intentional.
    #[error("Write-ahead log incomplete — previous save may be corrupt")]
    IncompleteWal,
}
39
/// Module-local result type for all save/restore operations.
pub type Result<T> = std::result::Result<T, SaveError>;
41
/// Convenience wrapper around [`save_full`] that persists only the object
/// store's namespaces, with no commits or refs tables.
pub fn save(obj_store: &GitObjectStore, save_dir: &Path) -> Result<()> {
    save_full(obj_store, None, None, save_dir)
}
53
/// Persists every namespace of `obj_store` to `save_dir` as parquet files,
/// optionally alongside serialized commits and refs tables.
///
/// Crash-safety protocol (the statement order below is load-bearing):
/// 1. Write a `_wal.json` marker listing the non-empty namespaces.
/// 2. Write each namespace to `<ns>.parquet.tmp` and atomically rename it
///    over the final file; namespaces with no batches get any stale
///    parquet file removed instead.
/// 3. Persist optional commits/refs JSON with the same tmp-then-rename
///    pattern.
/// 4. Delete the WAL marker — its absence signals a complete save.
pub fn save_full(
    obj_store: &GitObjectStore,
    commits_table: Option<&CommitsTable>,
    refs_table: Option<&RefsTable>,
    save_dir: &Path,
) -> Result<()> {
    fs::create_dir_all(save_dir)?;

    // Step 1: record intent before touching any data file.
    let wal_path = save_dir.join("_wal.json");
    let namespaces_with_data: Vec<String> = obj_store
        .store
        .namespaces()
        .iter()
        .filter(|ns| !obj_store.store.get_namespace_batches(ns).is_empty())
        .cloned()
        .collect();

    fs::write(&wal_path, serde_json_minimal(&namespaces_with_data))?;

    // Step 2: iterate ALL namespaces (not just the non-empty ones) so that
    // namespaces that became empty have their on-disk file cleaned up.
    for ns in obj_store.store.namespaces() {
        let batches = obj_store.store.get_namespace_batches(ns);
        let target = save_dir.join(format!("{}.parquet", ns));

        if batches.is_empty() {
            // Best-effort delete; a missing file is not an error.
            let _ = fs::remove_file(&target);
            continue;
        }

        let tmp_path = save_dir.join(format!("{}.parquet.tmp", ns));
        let schema = obj_store.store.schema().clone();
        let file = fs::File::create(&tmp_path)?;
        let mut writer = ArrowWriter::try_new(file, schema, None)?;

        for batch in batches {
            writer.write(batch)?;
        }
        writer.close()?;

        // Atomic publish: readers see either the old file or the new one.
        fs::rename(&tmp_path, &target)?;
    }

    // Step 3: optional commit history, same atomic pattern.
    if let Some(ct) = commits_table {
        let commits_json = serialize_commits(ct);
        let tmp = save_dir.join("_commits.json.tmp");
        fs::write(&tmp, &commits_json)?;
        fs::rename(&tmp, save_dir.join("_commits.json"))?;
    }

    // Optional refs table, same atomic pattern.
    if let Some(rt) = refs_table {
        let refs_json = serialize_refs(rt);
        let tmp = save_dir.join("_refs.json.tmp");
        fs::write(&tmp, &refs_json)?;
        fs::rename(&tmp, save_dir.join("_refs.json"))?;
    }

    // Step 4: removing the WAL marks the save as complete.
    let _ = fs::remove_file(&wal_path);

    Ok(())
}
121
122pub fn restore(obj_store: &mut GitObjectStore, save_dir: &Path) -> Result<()> {
124 let (_, _) = restore_full(obj_store, save_dir)?;
125 Ok(())
126}
127
128pub fn restore_full(
132 obj_store: &mut GitObjectStore,
133 save_dir: &Path,
134) -> Result<(Option<CommitsTable>, Option<RefsTable>)> {
135 if !save_dir.exists() {
136 return Err(SaveError::NotFound(save_dir.display().to_string()));
137 }
138
139 let wal_path = save_dir.join("_wal.json");
141 if wal_path.exists() {
142 let _ = fs::remove_file(&wal_path);
150 }
151
152 obj_store.store.clear();
153
154 for ns in obj_store.store.namespaces().to_vec() {
155 let path = save_dir.join(format!("{}.parquet", ns));
156 if !path.exists() {
157 continue;
158 }
159
160 let file = fs::File::open(&path)?;
161 let reader = ParquetRecordBatchReaderBuilder::try_new(file)?.build()?;
162
163 let mut batches = Vec::new();
164 for batch_result in reader {
165 batches.push(batch_result?);
166 }
167
168 obj_store.store.set_namespace_batches(&ns, batches);
169 }
170
171 let commits = {
173 let path = save_dir.join("_commits.json");
174 if path.exists() {
175 let data = fs::read_to_string(&path)?;
176 Some(deserialize_commits(&data))
177 } else {
178 None
179 }
180 };
181
182 let refs = {
184 let path = save_dir.join("_refs.json");
185 if path.exists() {
186 let data = fs::read_to_string(&path)?;
187 Some(deserialize_refs(&data))
188 } else {
189 None
190 }
191 };
192
193 Ok((commits, refs))
194}
195
/// Summary statistics for a completed save operation.
#[derive(Debug, Clone)]
pub struct SaveMetrics {
    /// Names of the namespaces/datasets actually written to parquet.
    pub namespaces_saved: Vec<String>,
    /// Total size in bytes of the parquet files written, measured on the
    /// temp files before they were renamed into place.
    pub bytes_written: u64,
    /// Wall-clock duration of the whole save, in milliseconds.
    pub duration_ms: u128,
    /// Whether ZSTD compression was applied to the parquet output.
    pub compressed: bool,
}
208
/// Tuning knobs for [`save_with_options`].
#[derive(Debug, Clone, Default)]
pub struct SaveOptions {
    /// Write parquet with ZSTD compression when true.
    pub compress: bool,
    /// When `Some`, only namespaces in this set are written (incremental
    /// save); `None` saves every non-empty namespace.
    pub dirty_namespaces: Option<HashSet<String>>,
}
218
/// Like [`save_full`], but with configurable compression and incremental
/// (dirty-namespace-only) saving, returning [`SaveMetrics`] about what was
/// written.
///
/// Follows the same WAL / tmp-then-rename crash-safety protocol as
/// [`save_full`]; the statement order is load-bearing.
pub fn save_with_options(
    obj_store: &GitObjectStore,
    commits_table: Option<&CommitsTable>,
    refs_table: Option<&RefsTable>,
    save_dir: &Path,
    options: &SaveOptions,
) -> Result<SaveMetrics> {
    let start = std::time::Instant::now();
    fs::create_dir_all(save_dir)?;

    // Select namespaces: non-empty, and (when incremental) listed as dirty.
    let namespaces_to_save: Vec<String> = obj_store
        .store
        .namespaces()
        .iter()
        .filter(|ns| {
            if obj_store
                .store
                .get_namespace_batches(ns.as_str())
                .is_empty()
            {
                return false;
            }
            if let Some(dirty) = &options.dirty_namespaces {
                return dirty.contains(ns.as_str());
            }
            true
        })
        .cloned()
        .collect();

    // Record intent in the WAL before touching any data file.
    let wal_path = save_dir.join("_wal.json");
    fs::write(&wal_path, serde_json_minimal(&namespaces_to_save))?;

    // ZSTD with default level when requested, otherwise writer defaults.
    let props = if options.compress {
        WriterProperties::builder()
            .set_compression(Compression::ZSTD(Default::default()))
            .build()
    } else {
        WriterProperties::builder().build()
    };

    let mut total_bytes = 0u64;
    let mut saved_ns_names = Vec::new();

    // NOTE: unlike save_full, this iterates only the selected namespaces,
    // so files for untouched namespaces are left as-is (incremental save).
    for ns in &namespaces_to_save {
        let batches = obj_store.store.get_namespace_batches(ns);
        let target = save_dir.join(format!("{}.parquet", ns));

        if batches.is_empty() {
            let _ = fs::remove_file(&target);
            continue;
        }

        let tmp_path = save_dir.join(format!("{}.parquet.tmp", ns));
        let schema = obj_store.store.schema().clone();
        let file = fs::File::create(&tmp_path)?;
        let mut writer = ArrowWriter::try_new(file, schema, Some(props.clone()))?;

        for batch in batches {
            writer.write(batch)?;
        }
        writer.close()?;

        // Size is measured on the temp file, before the atomic publish.
        let file_size = fs::metadata(&tmp_path)?.len();
        total_bytes += file_size;
        saved_ns_names.push(ns.clone());

        fs::rename(&tmp_path, &target)?;
    }

    // Optional commit history, tmp-then-rename.
    if let Some(ct) = commits_table {
        let commits_json = serialize_commits(ct);
        let tmp = save_dir.join("_commits.json.tmp");
        fs::write(&tmp, &commits_json)?;
        fs::rename(&tmp, save_dir.join("_commits.json"))?;
    }

    // Optional refs table, tmp-then-rename.
    if let Some(rt) = refs_table {
        let refs_json = serialize_refs(rt);
        let tmp = save_dir.join("_refs.json.tmp");
        fs::write(&tmp, &refs_json)?;
        fs::rename(&tmp, save_dir.join("_refs.json"))?;
    }

    // Removing the WAL marks the save as complete.
    let _ = fs::remove_file(&wal_path);

    Ok(SaveMetrics {
        namespaces_saved: saved_ns_names,
        bytes_written: total_bytes,
        duration_ms: start.elapsed().as_millis(),
        compressed: options.compress,
    })
}
324
/// Saves arbitrary named Arrow datasets (outside the object store) to
/// `save_dir`, one parquet file per `(name, batches, schema)` entry.
///
/// Uses the same WAL / tmp-then-rename crash-safety protocol as
/// [`save_full`]. Entries with no batches are skipped and any stale
/// `<name>.parquet` file is removed.
pub fn save_named_batches(
    entries: &[(&str, &[RecordBatch], &Schema)],
    save_dir: &Path,
) -> Result<SaveMetrics> {
    let start = std::time::Instant::now();
    fs::create_dir_all(save_dir)?;

    // Record intent in the WAL before touching any data file.
    let wal_path = save_dir.join("_wal.json");
    let names: Vec<String> = entries
        .iter()
        .map(|(name, _, _)| name.to_string())
        .collect();
    fs::write(&wal_path, serde_json_minimal(&names))?;

    let mut total_bytes = 0u64;
    let mut saved_names = Vec::new();

    for (name, batches, schema) in entries {
        let target = save_dir.join(format!("{name}.parquet"));

        if batches.is_empty() {
            // Best-effort cleanup of a stale file; absence is fine.
            let _ = fs::remove_file(&target);
            continue;
        }

        let tmp_path = save_dir.join(format!("{name}.parquet.tmp"));
        // ArrowWriter needs an owned Arc<Schema>; clone the borrowed schema.
        let schema_ref = Arc::new((*schema).clone());
        let file = fs::File::create(&tmp_path)?;
        let mut writer = ArrowWriter::try_new(file, schema_ref, None)?;

        for batch in *batches {
            writer.write(batch)?;
        }
        writer.close()?;

        // Size measured on the temp file, before the atomic publish.
        let file_size = fs::metadata(&tmp_path)?.len();
        total_bytes += file_size;
        saved_names.push(name.to_string());

        fs::rename(&tmp_path, &target)?;
    }

    // Removing the WAL marks the save as complete.
    let _ = fs::remove_file(&wal_path);

    Ok(SaveMetrics {
        namespaces_saved: saved_names,
        bytes_written: total_bytes,
        duration_ms: start.elapsed().as_millis(),
        compressed: false,
    })
}
389
390pub fn restore_named_batches(
395 save_dir: &Path,
396 names: &[&str],
397) -> Result<Vec<(String, Vec<RecordBatch>)>> {
398 if !save_dir.exists() {
399 return Err(SaveError::NotFound(save_dir.display().to_string()));
400 }
401
402 let wal_path = save_dir.join("_wal.json");
404 if wal_path.exists() {
405 let _ = fs::remove_file(&wal_path);
409 }
410
411 let mut results = Vec::new();
412
413 for name in names {
414 let path = save_dir.join(format!("{name}.parquet"));
415 if !path.exists() {
416 continue;
417 }
418
419 let file = fs::File::open(&path)?;
420 let reader = ParquetRecordBatchReaderBuilder::try_new(file)?.build()?;
421
422 let mut batches = Vec::new();
423 for batch_result in reader {
424 batches.push(batch_result?);
425 }
426
427 results.push((name.to_string(), batches));
428 }
429
430 Ok(results)
431}
432
433pub fn persist_commits(table: &CommitsTable, dir: &Path) -> Result<()> {
437 fs::create_dir_all(dir)?;
438 let json = serialize_commits(table);
439 let tmp = dir.join("_commits.json.tmp");
440 fs::write(&tmp, &json)?;
441 fs::rename(&tmp, dir.join("_commits.json"))?;
442 Ok(())
443}
444
445pub fn restore_commits(dir: &Path) -> Result<Option<CommitsTable>> {
449 let path = dir.join("_commits.json");
450 if !path.exists() {
451 return Ok(None);
452 }
453 let data = fs::read_to_string(&path)?;
454 Ok(Some(deserialize_commits(&data)))
455}
456
/// Serializes a list of strings as a minimal JSON array without pulling in
/// a JSON dependency. Used only for the `_wal.json` marker file.
///
/// Items are escaped (backslash, quote, and common control characters), so
/// the output stays valid JSON even when an item contains them — the
/// previous version emitted items raw, producing malformed JSON for e.g.
/// a name containing `"`.
fn serde_json_minimal(items: &[String]) -> String {
    let inner: Vec<String> = items
        .iter()
        .map(|s| {
            let escaped = s
                .replace('\\', "\\\\")
                .replace('"', "\\\"")
                .replace('\n', "\\n")
                .replace('\r', "\\r")
                .replace('\t', "\\t");
            format!("\"{}\"", escaped)
        })
        .collect();
    format!("[{}]", inner.join(","))
}
464
/// Escapes a string for embedding inside a double-quoted JSON literal.
/// Handles backslash, quote, newline, carriage return, and tab.
fn json_escape(s: &str) -> String {
    let mut out = String::with_capacity(s.len());
    for ch in s.chars() {
        match ch {
            '\\' => out.push_str("\\\\"),
            '"' => out.push_str("\\\""),
            '\n' => out.push_str("\\n"),
            '\r' => out.push_str("\\r"),
            '\t' => out.push_str("\\t"),
            other => out.push(other),
        }
    }
    out
}
473
474pub(crate) fn serialize_commits(table: &CommitsTable) -> String {
476 let mut lines = Vec::new();
477 for c in table.all() {
478 let parents: Vec<String> = c
479 .parent_ids
480 .iter()
481 .map(|p| format!("\"{}\"", json_escape(p)))
482 .collect();
483 lines.push(format!(
484 "{{\"id\":\"{}\",\"parents\":[{}],\"ts\":{},\"msg\":\"{}\",\"author\":\"{}\"}}",
485 json_escape(&c.commit_id),
486 parents.join(","),
487 c.timestamp_ms,
488 json_escape(&c.message),
489 json_escape(&c.author),
490 ));
491 }
492 format!("[{}]", lines.join(",\n"))
493}
494
495pub(crate) fn deserialize_commits(json: &str) -> CommitsTable {
497 let mut table = CommitsTable::new();
498 for obj in extract_json_objects(json) {
500 let id = extract_json_string(&obj, "id").unwrap_or_default();
501 let msg = extract_json_string(&obj, "msg").unwrap_or_default();
502 let author = extract_json_string(&obj, "author").unwrap_or_default();
503 let ts = extract_json_number(&obj, "ts").unwrap_or(0);
504 let parents = extract_json_string_array(&obj, "parents");
505
506 table.append(Commit {
507 commit_id: id,
508 parent_ids: parents,
509 timestamp_ms: ts,
510 message: msg,
511 author,
512 });
513 }
514 table
515}
516
517pub(crate) fn serialize_refs(table: &RefsTable) -> String {
519 let mut lines = Vec::new();
520 for r in table.branches() {
521 lines.push(format!(
522 "{{\"name\":\"{}\",\"commit\":\"{}\",\"type\":\"{}\",\"head\":{},\"created\":{}}}",
523 json_escape(&r.ref_name),
524 json_escape(&r.commit_id),
525 r.ref_type.as_str(),
526 r.is_head,
527 r.created_at_ms,
528 ));
529 }
530 format!("[{}]", lines.join(",\n"))
531}
532
/// Rebuilds a [`RefsTable`] from the JSON produced by [`serialize_refs`].
///
/// The first record flagged `"head":true` seeds the table via `init_main`;
/// every other record becomes an ordinary branch, and a later head flag
/// switches HEAD to it. NOTE(review): when the head record's name is not
/// "main", `update_ref("main", …)` is still issued — presumably
/// `init_main` always creates a "main" branch that must be pointed at the
/// head commit; confirm against `RefsTable`, as the head branch's own
/// name appears to be dropped in that case.
pub(crate) fn deserialize_refs(json: &str) -> RefsTable {
    let mut table = RefsTable::new();
    for obj in extract_json_objects(json) {
        let name = extract_json_string(&obj, "name").unwrap_or_default();
        let commit = extract_json_string(&obj, "commit").unwrap_or_default();
        // Cheap substring probe instead of a real boolean field parse.
        let is_head = obj.contains("\"head\":true");

        if table.head().is_none() && is_head {
            table.init_main(&commit);
            if name != "main" {
                let _ = table.update_ref("main", &commit);
            }
        } else {
            // Results deliberately ignored: duplicate/invalid records are
            // skipped on a best-effort basis.
            let _ = table.create_branch(&name, &commit);
            if is_head {
                let _ = table.switch_head(&name);
            }
        }
    }
    table
}
558
/// Splits a JSON array text into the raw text of each top-level object.
///
/// Tracks string literals (including escape sequences) so that `{` / `}`
/// characters inside string values — e.g. a commit message like
/// `"fix {thing}"` — no longer corrupt the brace-depth bookkeeping, which
/// the previous version did not handle.
fn extract_json_objects(json: &str) -> Vec<String> {
    let mut objects = Vec::new();
    let mut depth = 0usize;
    let mut start = None;
    let mut in_string = false;
    let mut escaped = false;
    for (i, ch) in json.char_indices() {
        if in_string {
            // Inside a string literal: only an unescaped quote ends it.
            if escaped {
                escaped = false;
            } else if ch == '\\' {
                escaped = true;
            } else if ch == '"' {
                in_string = false;
            }
            continue;
        }
        match ch {
            '"' => in_string = true,
            '{' => {
                if depth == 0 {
                    start = Some(i);
                }
                depth += 1;
            }
            '}' => {
                // saturating_sub keeps a stray '}' in malformed input from
                // wrapping the depth counter.
                depth = depth.saturating_sub(1);
                if depth == 0 {
                    if let Some(s) = start.take() {
                        objects.push(json[s..=i].to_string());
                    }
                }
            }
            _ => {}
        }
    }
    objects
}
588
/// Extracts and unescapes the string value for `key` from a flat JSON
/// object's raw text. Returns `None` when the key (with a string value)
/// is absent.
///
/// Unescaping is done in a single pass, fixing two defects of the old
/// sequential-`replace` approach: an escaped backslash followed by `n`
/// (i.e. the two characters `\` `n`) was wrongly collapsed into a real
/// newline, and the `\r` / `\t` sequences written by `json_escape` were
/// never decoded at all.
fn extract_json_string(obj: &str, key: &str) -> Option<String> {
    let pattern = format!("\"{}\":\"", key);
    let start = obj.find(&pattern)? + pattern.len();

    let mut out = String::new();
    let mut chars = obj[start..].chars();
    while let Some(ch) = chars.next() {
        match ch {
            // Unescaped quote terminates the value.
            '"' => return Some(out),
            '\\' => match chars.next() {
                Some('n') => out.push('\n'),
                Some('r') => out.push('\r'),
                Some('t') => out.push('\t'),
                // Covers \" and \\ (and passes unknown escapes through).
                Some(other) => out.push(other),
                // Truncated input: return what we have, like the old code.
                None => return Some(out),
            },
            other => out.push(other),
        }
    }
    // No closing quote found: best-effort, consistent with prior behavior.
    Some(out)
}
614
/// Extracts the integer value for `key` from a flat JSON object's raw
/// text. Returns `None` when the key is absent or the value fails to
/// parse as an `i64`.
fn extract_json_number(obj: &str, key: &str) -> Option<i64> {
    let needle = format!("\"{}\":", key);
    let pos = obj.find(&needle)?;
    let tail = obj[pos + needle.len()..].trim_start();
    let digits: String = tail
        .chars()
        .take_while(|c| c.is_ascii_digit() || *c == '-')
        .collect();
    digits.parse().ok()
}
625
/// Extracts the string-array value for `key` from a flat JSON object's
/// raw text. Returns an empty vector when the key is absent or the array
/// is empty. Elements are trimmed and stripped of surrounding quotes; no
/// escape handling is attempted (values are ids from our own serializer).
fn extract_json_string_array(obj: &str, key: &str) -> Vec<String> {
    let needle = format!("\"{}\":[", key);
    let Some(pos) = obj.find(&needle) else {
        return Vec::new();
    };
    let tail = &obj[pos + needle.len()..];
    let body = match tail.find(']') {
        Some(close) => &tail[..close],
        None => tail,
    };

    body.split(',')
        .map(|part| part.trim().trim_matches('"'))
        .filter(|part| !part.is_empty())
        .map(str::to_string)
        .collect()
}
646
#[cfg(test)]
mod tests {
    // Integration-style tests for the save/restore protocol. They exercise
    // the real filesystem via tempdirs and the project's GitObjectStore /
    // CommitsTable / RefsTable types.
    use super::*;
    use arrow_graph_core::Triple;

    // Minimal triple fixture; only `subject` varies between calls.
    fn sample_triple(subj: &str) -> Triple {
        Triple {
            subject: subj.to_string(),
            predicate: "rdf:type".to_string(),
            object: "Thing".to_string(),
            graph: None,
            confidence: Some(0.9),
            source_document: None,
            source_chunk_id: None,
            extracted_by: None,
            caused_by: None,
            derived_from: None,
            consolidated_at: None,
        }
    }

    #[test]
    fn test_save_restore_roundtrip() {
        let tmp = tempfile::tempdir().unwrap();
        let save_dir = tmp.path().join("savepoint");
        let mut obj = GitObjectStore::with_snapshot_dir(tmp.path().join("snapshots"));

        for i in 0..50 {
            obj.store
                .add_triple(&sample_triple(&format!("world-{i}")), "world", Some(1u8))
                .unwrap();
        }
        for i in 0..30 {
            obj.store
                .add_triple(&sample_triple(&format!("work-{i}")), "work", Some(5u8))
                .unwrap();
        }

        assert_eq!(obj.store.len(), 80);

        save(&obj, &save_dir).unwrap();

        // Only populated namespaces produce files; WAL must be gone.
        assert!(save_dir.join("world.parquet").exists());
        assert!(save_dir.join("work.parquet").exists());
        assert!(!save_dir.join("research.parquet").exists());
        assert!(!save_dir.join("_wal.json").exists());
        obj.store.clear();
        assert_eq!(obj.store.len(), 0);

        restore(&mut obj, &save_dir).unwrap();
        assert_eq!(obj.store.len(), 80);
    }

    #[test]
    fn test_restore_nonexistent_fails() {
        let tmp = tempfile::tempdir().unwrap();
        let mut obj = GitObjectStore::with_snapshot_dir(tmp.path());
        let result = restore(&mut obj, &tmp.path().join("nonexistent"));
        assert!(result.is_err());
    }

    #[test]
    fn test_save_atomic_no_partial_files() {
        let tmp = tempfile::tempdir().unwrap();
        let save_dir = tmp.path().join("savepoint");
        let obj = GitObjectStore::with_snapshot_dir(tmp.path().join("snapshots"));

        // Saving an empty store must not leave any parquet behind.
        save(&obj, &save_dir).unwrap();
        assert!(!save_dir.join("world.parquet").exists());
    }

    #[test]
    fn test_simulated_crash_recovery() {
        let tmp = tempfile::tempdir().unwrap();
        let save_dir = tmp.path().join("savepoint");
        let mut obj = GitObjectStore::with_snapshot_dir(tmp.path().join("snapshots"));

        obj.store
            .add_triple(&sample_triple("s1"), "world", Some(1u8))
            .unwrap();
        save(&obj, &save_dir).unwrap();

        // Simulate a crash that left the WAL behind; restore must still work.
        fs::write(save_dir.join("_wal.json"), "[\"world\"]").unwrap();

        obj.store.clear();
        restore(&mut obj, &save_dir).unwrap();
        assert_eq!(obj.store.len(), 1);
    }

    #[test]
    fn test_concurrent_reads_during_save() {
        let tmp = tempfile::tempdir().unwrap();
        let save_dir = tmp.path().join("savepoint");
        let mut obj = GitObjectStore::with_snapshot_dir(tmp.path().join("snapshots"));

        for i in 0..100 {
            obj.store
                .add_triple(&sample_triple(&format!("s{i}")), "world", Some(1u8))
                .unwrap();
        }
        save(&obj, &save_dir).unwrap();

        // Store remains readable after a save, and a second save over the
        // same directory is fine (rename-based overwrite).
        assert_eq!(obj.store.len(), 100);

        save(&obj, &save_dir).unwrap();

        obj.store.clear();
        restore(&mut obj, &save_dir).unwrap();
        assert_eq!(obj.store.len(), 100);
    }

    #[test]
    fn test_save_full_persists_commits_and_refs() {
        let tmp = tempfile::tempdir().unwrap();
        let save_dir = tmp.path().join("savepoint");
        let mut obj = GitObjectStore::with_snapshot_dir(tmp.path().join("snapshots"));

        obj.store
            .add_triple(&sample_triple("s1"), "world", Some(1u8))
            .unwrap();

        let mut commits = crate::commit::CommitsTable::new();
        let c1 = crate::commit::create_commit(&obj, &mut commits, vec![], "init", "test").unwrap();

        let mut refs = crate::refs::RefsTable::new();
        refs.init_main(&c1.commit_id);
        refs.create_branch("feature", &c1.commit_id).unwrap();

        save_full(&obj, Some(&commits), Some(&refs), &save_dir).unwrap();

        assert!(save_dir.join("_commits.json").exists());
        assert!(save_dir.join("_refs.json").exists());

        let mut obj2 = GitObjectStore::with_snapshot_dir(tmp.path().join("snapshots2"));
        let (restored_commits, restored_refs) = restore_full(&mut obj2, &save_dir).unwrap();

        let rc = restored_commits.unwrap();
        assert_eq!(rc.len(), 1);
        assert_eq!(rc.get(&c1.commit_id).unwrap().message, "init");

        let rr = restored_refs.unwrap();
        assert_eq!(rr.branches().len(), 2);
        assert!(rr.head().is_some());

        assert_eq!(obj2.store.len(), 1);
    }

    #[test]
    fn test_save_with_zstd_compression() {
        let tmp = tempfile::tempdir().unwrap();
        let save_dir = tmp.path().join("compressed");
        let uncompressed_dir = tmp.path().join("uncompressed");
        let mut obj = GitObjectStore::with_snapshot_dir(tmp.path().join("snap"));

        for i in 0..1000 {
            obj.store
                .add_triple(&sample_triple(&format!("entity-{}", i)), "world", Some(1u8))
                .unwrap();
        }

        let metrics_plain = save_with_options(
            &obj,
            None,
            None,
            &uncompressed_dir,
            &SaveOptions {
                compress: false,
                dirty_namespaces: None,
            },
        )
        .unwrap();

        let metrics_zstd = save_with_options(
            &obj,
            None,
            None,
            &save_dir,
            &SaveOptions {
                compress: true,
                dirty_namespaces: None,
            },
        )
        .unwrap();

        assert!(metrics_zstd.compressed);
        assert!(!metrics_plain.compressed);

        // ZSTD should beat uncompressed on 1000 similar rows.
        assert!(
            metrics_zstd.bytes_written < metrics_plain.bytes_written,
            "Compressed ({}) should be smaller than uncompressed ({})",
            metrics_zstd.bytes_written,
            metrics_plain.bytes_written,
        );

        obj.store.clear();
        restore(&mut obj, &save_dir).unwrap();
        assert_eq!(obj.store.len(), 1000);
    }

    #[test]
    fn test_incremental_save_only_dirty_namespaces() {
        let tmp = tempfile::tempdir().unwrap();
        let save_dir = tmp.path().join("incremental");
        let mut obj = GitObjectStore::with_snapshot_dir(tmp.path().join("snap"));

        let ns_count = obj.store.namespaces().len();

        for ns in obj.store.namespaces().to_vec() {
            for i in 0..100 {
                obj.store
                    .add_triple(&sample_triple(&format!("{}:{}", ns, i)), &ns, Some(1u8))
                    .unwrap();
            }
        }

        // Full save first so every namespace has a file on disk.
        save(&obj, &save_dir).unwrap();

        let mut dirty = HashSet::new();
        dirty.insert("world".to_string());

        let metrics = save_with_options(
            &obj,
            None,
            None,
            &save_dir,
            &SaveOptions {
                compress: false,
                dirty_namespaces: Some(dirty),
            },
        )
        .unwrap();

        // Only the dirty namespace was rewritten...
        assert_eq!(metrics.namespaces_saved.len(), 1);
        assert_eq!(metrics.namespaces_saved[0], "world");

        // ...but all namespaces are still restorable from the earlier save.
        obj.store.clear();
        restore(&mut obj, &save_dir).unwrap();
        assert_eq!(obj.store.len(), ns_count * 100);
    }

    #[test]
    fn test_save_metrics_populated() {
        let tmp = tempfile::tempdir().unwrap();
        let save_dir = tmp.path().join("metrics");
        let mut obj = GitObjectStore::with_snapshot_dir(tmp.path().join("snap"));

        for i in 0..200 {
            obj.store
                .add_triple(&sample_triple(&format!("entity-{}", i)), "world", Some(1u8))
                .unwrap();
        }
        for i in 0..100 {
            obj.store
                .add_triple(&sample_triple(&format!("work-{}", i)), "work", Some(3u8))
                .unwrap();
        }

        let metrics =
            save_with_options(&obj, None, None, &save_dir, &SaveOptions::default()).unwrap();

        assert_eq!(metrics.namespaces_saved.len(), 2);
        assert!(metrics.namespaces_saved.contains(&"world".to_string()));
        assert!(metrics.namespaces_saved.contains(&"work".to_string()));
        assert!(metrics.bytes_written > 0);
        assert!(!metrics.compressed);
    }

    // Fixtures for the named-batches (non-object-store) API.
    fn kanban_schema() -> Schema {
        use arrow::datatypes::{DataType, Field};
        Schema::new(vec![
            Field::new("id", DataType::Utf8, false),
            Field::new("title", DataType::Utf8, false),
            Field::new("status", DataType::Utf8, false),
        ])
    }

    fn kanban_batch(ids: &[&str], titles: &[&str], statuses: &[&str]) -> RecordBatch {
        use arrow::array::StringArray;
        RecordBatch::try_new(
            Arc::new(kanban_schema()),
            vec![
                Arc::new(StringArray::from(ids.to_vec())),
                Arc::new(StringArray::from(titles.to_vec())),
                Arc::new(StringArray::from(statuses.to_vec())),
            ],
        )
        .unwrap()
    }

    #[test]
    fn test_save_named_batches_roundtrip() {
        let tmp = tempfile::tempdir().unwrap();
        let save_dir = tmp.path().join("kanban");

        let batch = kanban_batch(
            &["EXP-1", "EXP-2"],
            &["First", "Second"],
            &["backlog", "in_progress"],
        );
        let schema = kanban_schema();

        let batches = vec![batch.clone()];
        let metrics = save_named_batches(&[("items", &batches, &schema)], &save_dir).unwrap();

        assert_eq!(metrics.namespaces_saved, vec!["items"]);
        assert!(metrics.bytes_written > 0);
        assert!(!metrics.compressed);
        assert!(!save_dir.join("_wal.json").exists());

        let results = restore_named_batches(&save_dir, &["items"]).unwrap();
        assert_eq!(results.len(), 1);
        assert_eq!(results[0].0, "items");
        assert_eq!(results[0].1[0].num_rows(), 2);
    }

    #[test]
    fn test_save_named_batches_multiple_datasets() {
        let tmp = tempfile::tempdir().unwrap();
        let save_dir = tmp.path().join("multi");

        let items = kanban_batch(&["EXP-1"], &["Expedition"], &["backlog"]);
        let runs = kanban_batch(&["RUN-1"], &["Status Change"], &["done"]);
        let schema = kanban_schema();

        save_named_batches(
            &[("items", &[items], &schema), ("runs", &[runs], &schema)],
            &save_dir,
        )
        .unwrap();

        assert!(save_dir.join("items.parquet").exists());
        assert!(save_dir.join("runs.parquet").exists());

        let results = restore_named_batches(&save_dir, &["items", "runs"]).unwrap();
        assert_eq!(results.len(), 2);
    }

    #[test]
    fn test_save_named_batches_empty_skipped() {
        let tmp = tempfile::tempdir().unwrap();
        let save_dir = tmp.path().join("empty");

        let schema = kanban_schema();
        let metrics =
            save_named_batches(&[("items", &[] as &[RecordBatch], &schema)], &save_dir).unwrap();

        // Empty datasets produce no file and no metrics entry.
        assert!(metrics.namespaces_saved.is_empty());
        assert!(!save_dir.join("items.parquet").exists());
    }

    #[test]
    fn test_restore_named_batches_missing_files_skipped() {
        let tmp = tempfile::tempdir().unwrap();
        let save_dir = tmp.path().join("partial");
        fs::create_dir_all(&save_dir).unwrap();

        let batch = kanban_batch(&["EXP-1"], &["Test"], &["backlog"]);
        save_named_batches(&[("items", &[batch], &kanban_schema())], &save_dir).unwrap();

        // Requesting a name with no file is not an error; it is skipped.
        let results = restore_named_batches(&save_dir, &["items", "runs"]).unwrap();
        assert_eq!(results.len(), 1);
        assert_eq!(results[0].0, "items");
    }

    #[test]
    fn test_restore_named_batches_nonexistent_dir() {
        let tmp = tempfile::tempdir().unwrap();
        let result = restore_named_batches(&tmp.path().join("nonexistent"), &["items"]);
        assert!(result.is_err());
    }

    #[test]
    fn test_save_named_batches_wal_cleanup() {
        let tmp = tempfile::tempdir().unwrap();
        let save_dir = tmp.path().join("wal_test");

        let batch = kanban_batch(&["EXP-1"], &["Test"], &["backlog"]);
        save_named_batches(&[("items", &[batch], &kanban_schema())], &save_dir).unwrap();

        // A successful save must leave no WAL marker behind.
        assert!(!save_dir.join("_wal.json").exists());
    }

    #[test]
    fn test_save_named_batches_crash_recovery() {
        let tmp = tempfile::tempdir().unwrap();
        let save_dir = tmp.path().join("crash");

        let batch = kanban_batch(&["EXP-1"], &["Test"], &["backlog"]);
        save_named_batches(&[("items", &[batch], &kanban_schema())], &save_dir).unwrap();

        // Simulate a crash that left the WAL behind.
        fs::write(save_dir.join("_wal.json"), "[\"items\"]").unwrap();

        let results = restore_named_batches(&save_dir, &["items"]).unwrap();
        assert_eq!(results.len(), 1);
        assert_eq!(results[0].1[0].num_rows(), 1);
        assert!(!save_dir.join("_wal.json").exists());
    }

    #[test]
    fn test_persist_commits_roundtrip() {
        let tmp = tempfile::tempdir().unwrap();
        let dir = tmp.path();

        let mut table = CommitsTable::new();
        table.append(Commit {
            commit_id: "c1".to_string(),
            parent_ids: vec![],
            timestamp_ms: 1000,
            message: "first save".to_string(),
            author: "test".to_string(),
        });
        table.append(Commit {
            commit_id: "c2".to_string(),
            parent_ids: vec!["c1".to_string()],
            timestamp_ms: 2000,
            message: "second save".to_string(),
            author: "test".to_string(),
        });

        persist_commits(&table, dir).unwrap();
        assert!(dir.join("_commits.json").exists());

        let restored = restore_commits(dir).unwrap().unwrap();
        assert_eq!(restored.len(), 2);
        assert_eq!(restored.get("c1").unwrap().message, "first save");
        assert_eq!(restored.get("c2").unwrap().parent_ids, vec!["c1"]);
    }

    #[test]
    fn test_restore_commits_empty_dir() {
        let tmp = tempfile::tempdir().unwrap();
        let result = restore_commits(tmp.path()).unwrap();
        assert!(result.is_none());
    }
}
1121}