1use crate::commit::{Commit, CommitsTable};
8use crate::object_store::GitObjectStore;
9use crate::refs::RefsTable;
10use arrow::array::RecordBatch;
11use arrow::datatypes::Schema;
12use nusy_arrow_core::Namespace;
13use parquet::arrow::ArrowWriter;
14use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
15use parquet::basic::Compression;
16use parquet::file::properties::WriterProperties;
17use std::collections::HashSet;
18use std::fs;
19use std::path::Path;
20use std::sync::Arc;
21
/// Errors produced while writing or reading a save point.
#[derive(Debug, thiserror::Error)]
pub enum SaveError {
    /// A filesystem operation (create, write, rename, read, ...) failed.
    #[error("IO error: {0}")]
    Io(#[from] std::io::Error),

    /// The Parquet writer or reader reported an error.
    #[error("Parquet error: {0}")]
    Parquet(#[from] parquet::errors::ParquetError),

    /// An Arrow-level error, e.g. while decoding a record batch from a reader.
    #[error("Arrow error: {0}")]
    Arrow(#[from] arrow::error::ArrowError),

    /// The save directory does not exist; the payload is the path that was looked up.
    #[error("Save point not found: {0}")]
    NotFound(String),

    /// A write-ahead marker from an unfinished save was found.
    ///
    /// NOTE(review): nothing in this file constructs this variant — the restore
    /// functions delete a leftover `_wal.json` and carry on. Confirm whether it is
    /// still needed by other callers.
    #[error("Write-ahead log incomplete — previous save may be corrupt")]
    IncompleteWal,
}
40
/// Result alias used throughout this module; the error type is always [`SaveError`].
pub type Result<T> = std::result::Result<T, SaveError>;
42
43pub fn save(obj_store: &GitObjectStore, save_dir: &Path) -> Result<()> {
52 save_full(obj_store, None, None, save_dir)
53}
54
55pub fn save_full(
57 obj_store: &GitObjectStore,
58 commits_table: Option<&CommitsTable>,
59 refs_table: Option<&RefsTable>,
60 save_dir: &Path,
61) -> Result<()> {
62 fs::create_dir_all(save_dir)?;
63
64 let wal_path = save_dir.join("_wal.json");
66 let namespaces_with_data: Vec<String> = Namespace::ALL
67 .iter()
68 .filter(|ns| !obj_store.store.get_namespace_batches(**ns).is_empty())
69 .map(|ns| ns.as_str().to_string())
70 .collect();
71
72 fs::write(&wal_path, serde_json_minimal(&namespaces_with_data))?;
73
74 for ns in Namespace::ALL {
76 let batches = obj_store.store.get_namespace_batches(ns);
77 let target = save_dir.join(format!("{}.parquet", ns.as_str()));
78
79 if batches.is_empty() {
80 let _ = fs::remove_file(&target);
82 continue;
83 }
84
85 let tmp_path = save_dir.join(format!("{}.parquet.tmp", ns.as_str()));
86 let schema = obj_store.store.schema().clone();
87 let file = fs::File::create(&tmp_path)?;
88 let mut writer = ArrowWriter::try_new(file, schema, None)?;
89
90 for batch in batches {
91 writer.write(batch)?;
92 }
93 writer.close()?;
94
95 fs::rename(&tmp_path, &target)?;
97 }
98
99 if let Some(ct) = commits_table {
101 let commits_json = serialize_commits(ct);
102 let tmp = save_dir.join("_commits.json.tmp");
103 fs::write(&tmp, &commits_json)?;
104 fs::rename(&tmp, save_dir.join("_commits.json"))?;
105 }
106
107 if let Some(rt) = refs_table {
109 let refs_json = serialize_refs(rt);
110 let tmp = save_dir.join("_refs.json.tmp");
111 fs::write(&tmp, &refs_json)?;
112 fs::rename(&tmp, save_dir.join("_refs.json"))?;
113 }
114
115 let _ = fs::remove_file(&wal_path);
117
118 Ok(())
119}
120
121pub fn restore(obj_store: &mut GitObjectStore, save_dir: &Path) -> Result<()> {
123 let (_, _) = restore_full(obj_store, save_dir)?;
124 Ok(())
125}
126
127pub fn restore_full(
131 obj_store: &mut GitObjectStore,
132 save_dir: &Path,
133) -> Result<(Option<CommitsTable>, Option<RefsTable>)> {
134 if !save_dir.exists() {
135 return Err(SaveError::NotFound(save_dir.display().to_string()));
136 }
137
138 let wal_path = save_dir.join("_wal.json");
140 if wal_path.exists() {
141 let _ = fs::remove_file(&wal_path);
149 }
150
151 obj_store.store.clear();
152
153 for ns in Namespace::ALL {
154 let path = save_dir.join(format!("{}.parquet", ns.as_str()));
155 if !path.exists() {
156 continue;
157 }
158
159 let file = fs::File::open(&path)?;
160 let reader = ParquetRecordBatchReaderBuilder::try_new(file)?.build()?;
161
162 let mut batches = Vec::new();
163 for batch_result in reader {
164 batches.push(batch_result?);
165 }
166
167 obj_store.store.set_namespace_batches(ns, batches);
168 }
169
170 let commits = {
172 let path = save_dir.join("_commits.json");
173 if path.exists() {
174 let data = fs::read_to_string(&path)?;
175 Some(deserialize_commits(&data))
176 } else {
177 None
178 }
179 };
180
181 let refs = {
183 let path = save_dir.join("_refs.json");
184 if path.exists() {
185 let data = fs::read_to_string(&path)?;
186 Some(deserialize_refs(&data))
187 } else {
188 None
189 }
190 };
191
192 Ok((commits, refs))
193}
194
/// Summary of what a save call wrote, returned by [`save_with_options`] and
/// [`save_named_batches`].
#[derive(Debug, Clone)]
pub struct SaveMetrics {
    /// Names of the namespaces / datasets for which a Parquet file was written.
    pub namespaces_saved: Vec<String>,
    /// Sum of the on-disk sizes of the Parquet files written by this call.
    pub bytes_written: u64,
    /// Elapsed time of the save call in milliseconds.
    pub duration_ms: u128,
    /// Whether compression was requested for this save.
    pub compressed: bool,
}
207
/// Options for [`save_with_options`].
///
/// The derived `Default` yields `compress: false` and `dirty_namespaces: None`.
#[derive(Debug, Clone, Default)]
pub struct SaveOptions {
    /// When `true`, Parquet files are written with ZSTD compression at its default level.
    pub compress: bool,
    /// When `Some`, only namespaces contained in the set are considered for
    /// saving; `None` considers every namespace.
    pub dirty_namespaces: Option<HashSet<Namespace>>,
}
217
218pub fn save_with_options(
222 obj_store: &GitObjectStore,
223 commits_table: Option<&CommitsTable>,
224 refs_table: Option<&RefsTable>,
225 save_dir: &Path,
226 options: &SaveOptions,
227) -> Result<SaveMetrics> {
228 let start = std::time::Instant::now();
229 fs::create_dir_all(save_dir)?;
230
231 let namespaces_to_save: Vec<Namespace> = Namespace::ALL
233 .iter()
234 .filter(|ns| {
235 if obj_store.store.get_namespace_batches(**ns).is_empty() {
237 return false;
238 }
239 if let Some(dirty) = &options.dirty_namespaces {
241 return dirty.contains(ns);
242 }
243 true
244 })
245 .copied()
246 .collect();
247
248 let wal_path = save_dir.join("_wal.json");
250 let ns_names: Vec<String> = namespaces_to_save
251 .iter()
252 .map(|ns| ns.as_str().to_string())
253 .collect();
254 fs::write(&wal_path, serde_json_minimal(&ns_names))?;
255
256 let props = if options.compress {
258 WriterProperties::builder()
259 .set_compression(Compression::ZSTD(Default::default()))
260 .build()
261 } else {
262 WriterProperties::builder().build()
263 };
264
265 let mut total_bytes = 0u64;
266 let mut saved_ns_names = Vec::new();
267
268 for ns in &namespaces_to_save {
270 let batches = obj_store.store.get_namespace_batches(*ns);
271 let target = save_dir.join(format!("{}.parquet", ns.as_str()));
272
273 if batches.is_empty() {
274 let _ = fs::remove_file(&target);
275 continue;
276 }
277
278 let tmp_path = save_dir.join(format!("{}.parquet.tmp", ns.as_str()));
279 let schema = obj_store.store.schema().clone();
280 let file = fs::File::create(&tmp_path)?;
281 let mut writer = ArrowWriter::try_new(file, schema, Some(props.clone()))?;
282
283 for batch in batches {
284 writer.write(batch)?;
285 }
286 writer.close()?;
287
288 let file_size = fs::metadata(&tmp_path)?.len();
289 total_bytes += file_size;
290 saved_ns_names.push(ns.as_str().to_string());
291
292 fs::rename(&tmp_path, &target)?;
293 }
294
295 if let Some(ct) = commits_table {
297 let commits_json = serialize_commits(ct);
298 let tmp = save_dir.join("_commits.json.tmp");
299 fs::write(&tmp, &commits_json)?;
300 fs::rename(&tmp, save_dir.join("_commits.json"))?;
301 }
302
303 if let Some(rt) = refs_table {
305 let refs_json = serialize_refs(rt);
306 let tmp = save_dir.join("_refs.json.tmp");
307 fs::write(&tmp, &refs_json)?;
308 fs::rename(&tmp, save_dir.join("_refs.json"))?;
309 }
310
311 let _ = fs::remove_file(&wal_path);
313
314 Ok(SaveMetrics {
315 namespaces_saved: saved_ns_names,
316 bytes_written: total_bytes,
317 duration_ms: start.elapsed().as_millis(),
318 compressed: options.compress,
319 })
320}
321
322pub fn save_named_batches(
333 entries: &[(&str, &[RecordBatch], &Schema)],
334 save_dir: &Path,
335) -> Result<SaveMetrics> {
336 let start = std::time::Instant::now();
337 fs::create_dir_all(save_dir)?;
338
339 let wal_path = save_dir.join("_wal.json");
341 let names: Vec<String> = entries
342 .iter()
343 .map(|(name, _, _)| name.to_string())
344 .collect();
345 fs::write(&wal_path, serde_json_minimal(&names))?;
346
347 let mut total_bytes = 0u64;
348 let mut saved_names = Vec::new();
349
350 for (name, batches, schema) in entries {
351 let target = save_dir.join(format!("{name}.parquet"));
352
353 if batches.is_empty() {
354 let _ = fs::remove_file(&target);
356 continue;
357 }
358
359 let tmp_path = save_dir.join(format!("{name}.parquet.tmp"));
360 let schema_ref = Arc::new((*schema).clone());
361 let file = fs::File::create(&tmp_path)?;
362 let mut writer = ArrowWriter::try_new(file, schema_ref, None)?;
363
364 for batch in *batches {
365 writer.write(batch)?;
366 }
367 writer.close()?;
368
369 let file_size = fs::metadata(&tmp_path)?.len();
370 total_bytes += file_size;
371 saved_names.push(name.to_string());
372
373 fs::rename(&tmp_path, &target)?;
375 }
376
377 let _ = fs::remove_file(&wal_path);
379
380 Ok(SaveMetrics {
381 namespaces_saved: saved_names,
382 bytes_written: total_bytes,
383 duration_ms: start.elapsed().as_millis(),
384 compressed: false,
385 })
386}
387
388pub fn restore_named_batches(
393 save_dir: &Path,
394 names: &[&str],
395) -> Result<Vec<(String, Vec<RecordBatch>)>> {
396 if !save_dir.exists() {
397 return Err(SaveError::NotFound(save_dir.display().to_string()));
398 }
399
400 let wal_path = save_dir.join("_wal.json");
402 if wal_path.exists() {
403 let _ = fs::remove_file(&wal_path);
407 }
408
409 let mut results = Vec::new();
410
411 for name in names {
412 let path = save_dir.join(format!("{name}.parquet"));
413 if !path.exists() {
414 continue;
415 }
416
417 let file = fs::File::open(&path)?;
418 let reader = ParquetRecordBatchReaderBuilder::try_new(file)?.build()?;
419
420 let mut batches = Vec::new();
421 for batch_result in reader {
422 batches.push(batch_result?);
423 }
424
425 results.push((name.to_string(), batches));
426 }
427
428 Ok(results)
429}
430
431pub fn persist_commits(table: &CommitsTable, dir: &Path) -> Result<()> {
435 fs::create_dir_all(dir)?;
436 let json = serialize_commits(table);
437 let tmp = dir.join("_commits.json.tmp");
438 fs::write(&tmp, &json)?;
439 fs::rename(&tmp, dir.join("_commits.json"))?;
440 Ok(())
441}
442
443pub fn restore_commits(dir: &Path) -> Result<Option<CommitsTable>> {
447 let path = dir.join("_commits.json");
448 if !path.exists() {
449 return Ok(None);
450 }
451 let data = fs::read_to_string(&path)?;
452 Ok(Some(deserialize_commits(&data)))
453}
454
/// Renders `items` as a compact JSON array of strings, e.g. `["a","b"]`.
///
/// Quotes, backslashes and control characters inside an item are escaped so
/// the output is valid JSON for any input. An empty slice yields `[]`.
fn serde_json_minimal(items: &[String]) -> String {
    let mut out = String::from("[");
    for (idx, item) in items.iter().enumerate() {
        if idx > 0 {
            out.push(',');
        }
        out.push('"');
        for ch in item.chars() {
            match ch {
                '\\' => out.push_str("\\\\"),
                '"' => out.push_str("\\\""),
                '\n' => out.push_str("\\n"),
                '\r' => out.push_str("\\r"),
                '\t' => out.push_str("\\t"),
                // Remaining control characters must not appear raw in JSON strings.
                c if (c as u32) < 0x20 => out.push_str(&format!("\\u{:04x}", c as u32)),
                c => out.push(c),
            }
        }
        out.push('"');
    }
    out.push(']');
    out
}
462
/// Escapes `s` for embedding inside a double-quoted JSON string.
///
/// Backslash, double quote, newline, carriage return and tab are replaced by
/// their two-character escape sequences; every other character is copied
/// unchanged.
fn json_escape(s: &str) -> String {
    let mut escaped = String::with_capacity(s.len());
    for ch in s.chars() {
        match ch {
            '\\' => escaped.push_str("\\\\"),
            '"' => escaped.push_str("\\\""),
            '\n' => escaped.push_str("\\n"),
            '\r' => escaped.push_str("\\r"),
            '\t' => escaped.push_str("\\t"),
            other => escaped.push(other),
        }
    }
    escaped
}
471
472pub(crate) fn serialize_commits(table: &CommitsTable) -> String {
474 let mut lines = Vec::new();
475 for c in table.all() {
476 let parents: Vec<String> = c
477 .parent_ids
478 .iter()
479 .map(|p| format!("\"{}\"", json_escape(p)))
480 .collect();
481 lines.push(format!(
482 "{{\"id\":\"{}\",\"parents\":[{}],\"ts\":{},\"msg\":\"{}\",\"author\":\"{}\"}}",
483 json_escape(&c.commit_id),
484 parents.join(","),
485 c.timestamp_ms,
486 json_escape(&c.message),
487 json_escape(&c.author),
488 ));
489 }
490 format!("[{}]", lines.join(",\n"))
491}
492
493pub(crate) fn deserialize_commits(json: &str) -> CommitsTable {
495 let mut table = CommitsTable::new();
496 for obj in extract_json_objects(json) {
498 let id = extract_json_string(&obj, "id").unwrap_or_default();
499 let msg = extract_json_string(&obj, "msg").unwrap_or_default();
500 let author = extract_json_string(&obj, "author").unwrap_or_default();
501 let ts = extract_json_number(&obj, "ts").unwrap_or(0);
502 let parents = extract_json_string_array(&obj, "parents");
503
504 table.append(Commit {
505 commit_id: id,
506 parent_ids: parents,
507 timestamp_ms: ts,
508 message: msg,
509 author,
510 });
511 }
512 table
513}
514
515pub(crate) fn serialize_refs(table: &RefsTable) -> String {
517 let mut lines = Vec::new();
518 for r in table.branches() {
519 lines.push(format!(
520 "{{\"name\":\"{}\",\"commit\":\"{}\",\"type\":\"{}\",\"head\":{},\"created\":{}}}",
521 json_escape(&r.ref_name),
522 json_escape(&r.commit_id),
523 r.ref_type.as_str(),
524 r.is_head,
525 r.created_at_ms,
526 ));
527 }
528 format!("[{}]", lines.join(",\n"))
529}
530
531pub(crate) fn deserialize_refs(json: &str) -> RefsTable {
533 let mut table = RefsTable::new();
534 for obj in extract_json_objects(json) {
535 let name = extract_json_string(&obj, "name").unwrap_or_default();
536 let commit = extract_json_string(&obj, "commit").unwrap_or_default();
537 let is_head = obj.contains("\"head\":true");
538
539 if table.head().is_none() && is_head {
541 table.init_main(&commit);
542 if name != "main" {
544 let _ = table.update_ref("main", &commit);
546 }
547 } else {
548 let _ = table.create_branch(&name, &commit);
549 if is_head {
550 let _ = table.switch_head(&name);
551 }
552 }
553 }
554 table
555}
556
/// Splits `json` into the source text of its top-level `{ ... }` objects.
///
/// Braces are matched by nesting depth, so nested objects stay inside their
/// parent. Braces inside string literals are ignored (escape sequences such as
/// `\"` are honoured), so values like `"fix }"` do not break the split. A
/// stray `}` outside any object is ignored, and an unterminated object is
/// dropped.
fn extract_json_objects(json: &str) -> Vec<String> {
    let mut objects = Vec::new();
    let mut depth: usize = 0;
    let mut start: Option<usize> = None;
    let mut in_string = false;
    let mut escaped = false;

    for (i, ch) in json.char_indices() {
        if in_string {
            // Inside a string only the closing quote matters; a backslash
            // protects the character that follows it.
            if escaped {
                escaped = false;
            } else if ch == '\\' {
                escaped = true;
            } else if ch == '"' {
                in_string = false;
            }
            continue;
        }

        match ch {
            '"' => in_string = true,
            '{' => {
                if depth == 0 {
                    start = Some(i);
                }
                depth += 1;
            }
            // The guard keeps a stray closing brace from underflowing the depth.
            '}' if depth > 0 => {
                depth -= 1;
                if depth == 0 {
                    if let Some(s) = start.take() {
                        objects.push(json[s..=i].to_string());
                    }
                }
            }
            _ => {}
        }
    }
    objects
}
586
/// Returns the decoded string value of `"key":"..."` in `obj`, or `None` when
/// the key is not present as a string field.
///
/// The value ends at the first unescaped `"`; if there is none, everything up
/// to the end of `obj` is returned. Escapes are decoded in a single
/// left-to-right pass: `\n`, `\r`, `\t`, `\"`, `\\` and `\/` are translated,
/// and any other backslash sequence is kept verbatim. Decoding in one pass
/// matters: an escaped backslash followed by `n` must stay a backslash and an
/// `n` rather than turning into a newline.
fn extract_json_string(obj: &str, key: &str) -> Option<String> {
    let pattern = format!("\"{}\":\"", key);
    let start = obj.find(&pattern)? + pattern.len();

    let mut chars = obj[start..].chars();
    let mut value = String::new();
    while let Some(ch) = chars.next() {
        match ch {
            '"' => break,
            '\\' => match chars.next() {
                Some('n') => value.push('\n'),
                Some('r') => value.push('\r'),
                Some('t') => value.push('\t'),
                Some(c @ ('"' | '\\' | '/')) => value.push(c),
                // Unknown escape: keep it exactly as written.
                Some(other) => {
                    value.push('\\');
                    value.push(other);
                }
                // Dangling backslash at the very end of the input.
                None => value.push('\\'),
            },
            other => value.push(other),
        }
    }
    Some(value)
}
612
/// Returns the integer value of `"key":<number>` in `obj`.
///
/// Whitespace after the colon is skipped and the longest run of ASCII digits
/// and `-` characters is parsed as an `i64`. Returns `None` when the key is
/// missing or the run does not parse.
fn extract_json_number(obj: &str, key: &str) -> Option<i64> {
    let needle = format!("\"{}\":", key);
    let (_, after_key) = obj.split_once(needle.as_str())?;
    let tail = after_key.trim_start();

    let is_numeric = |c: char| c.is_ascii_digit() || c == '-';
    let literal = match tail.char_indices().find(|&(_, c)| !is_numeric(c)) {
        Some((stop, _)) => &tail[..stop],
        None => tail,
    };
    literal.parse::<i64>().ok()
}
623
/// Returns the decoded elements of the string array `"key":[...]` in `obj`.
///
/// Elements are read as quoted JSON strings, so commas and `]` inside an
/// element do not split or truncate it, and the escapes `\n`, `\r`, `\t`,
/// `\"`, `\\` and `\/` are decoded (other backslash sequences are kept
/// verbatim). Empty strings are skipped. A missing key yields an empty vector.
/// Anything between elements that is not a quoted string is ignored.
fn extract_json_string_array(obj: &str, key: &str) -> Vec<String> {
    let pattern = format!("\"{}\":[", key);
    let Some(pos) = obj.find(&pattern) else {
        return Vec::new();
    };

    let mut chars = obj[pos + pattern.len()..].chars();
    let mut result = Vec::new();

    while let Some(ch) = chars.next() {
        match ch {
            // End of the array (only reached outside of a string element).
            ']' => break,
            '"' => {
                let mut item = String::new();
                while let Some(c) = chars.next() {
                    match c {
                        '"' => break,
                        '\\' => match chars.next() {
                            Some('n') => item.push('\n'),
                            Some('r') => item.push('\r'),
                            Some('t') => item.push('\t'),
                            Some(e @ ('"' | '\\' | '/')) => item.push(e),
                            Some(other) => {
                                item.push('\\');
                                item.push(other);
                            }
                            None => item.push('\\'),
                        },
                        other => item.push(other),
                    }
                }
                if !item.is_empty() {
                    result.push(item);
                }
            }
            // Commas and whitespace between elements.
            _ => {}
        }
    }
    result
}
644
#[cfg(test)]
mod tests {
    use super::*;
    use nusy_arrow_core::{Namespace, Triple, YLayer};

    /// Builds an `rdf:type Thing` triple for `subj` with confidence 0.9 and no provenance.
    fn sample_triple(subj: &str) -> Triple {
        Triple {
            subject: subj.to_string(),
            predicate: "rdf:type".to_string(),
            object: "Thing".to_string(),
            graph: None,
            confidence: Some(0.9),
            source_document: None,
            source_chunk_id: None,
            extracted_by: None,
            caused_by: None,
            derived_from: None,
            consolidated_at: None,
            certifiability_class: None,
        }
    }

    /// Inserts one sample triple for `subject`, panicking if the store rejects it.
    fn add(obj: &mut GitObjectStore, subject: &str, ns: Namespace, layer: YLayer) {
        obj.store
            .add_triple(&sample_triple(subject), ns, layer)
            .unwrap();
    }

    #[test]
    fn test_save_restore_roundtrip() {
        let scratch = tempfile::tempdir().unwrap();
        let save_dir = scratch.path().join("savepoint");
        let mut obj = GitObjectStore::with_snapshot_dir(scratch.path().join("snapshots"));

        for i in 0..50 {
            add(&mut obj, &format!("world-{i}"), Namespace::World, YLayer::Semantic);
        }
        for i in 0..30 {
            add(&mut obj, &format!("work-{i}"), Namespace::Work, YLayer::Procedural);
        }
        assert_eq!(obj.store.len(), 80);

        save(&obj, &save_dir).unwrap();

        // Only namespaces with data get a file, and the marker is gone.
        assert!(save_dir.join("world.parquet").exists());
        assert!(save_dir.join("work.parquet").exists());
        assert!(!save_dir.join("research.parquet").exists());
        assert!(!save_dir.join("_wal.json").exists());

        obj.store.clear();
        assert_eq!(obj.store.len(), 0);

        restore(&mut obj, &save_dir).unwrap();
        assert_eq!(obj.store.len(), 80);
    }

    #[test]
    fn test_restore_nonexistent_fails() {
        let scratch = tempfile::tempdir().unwrap();
        let mut obj = GitObjectStore::with_snapshot_dir(scratch.path());
        let outcome = restore(&mut obj, &scratch.path().join("nonexistent"));
        assert!(outcome.is_err());
    }

    #[test]
    fn test_save_atomic_no_partial_files() {
        let scratch = tempfile::tempdir().unwrap();
        let save_dir = scratch.path().join("savepoint");
        let obj = GitObjectStore::with_snapshot_dir(scratch.path().join("snapshots"));

        // Saving an empty store must not create any namespace file.
        save(&obj, &save_dir).unwrap();
        assert!(!save_dir.join("world.parquet").exists());
    }

    #[test]
    fn test_simulated_crash_recovery() {
        let scratch = tempfile::tempdir().unwrap();
        let save_dir = scratch.path().join("savepoint");
        let mut obj = GitObjectStore::with_snapshot_dir(scratch.path().join("snapshots"));

        add(&mut obj, "s1", Namespace::World, YLayer::Semantic);
        save(&obj, &save_dir).unwrap();

        // Pretend a later save was interrupted by leaving a marker behind.
        fs::write(save_dir.join("_wal.json"), "[\"world\"]").unwrap();

        obj.store.clear();
        restore(&mut obj, &save_dir).unwrap();
        assert_eq!(obj.store.len(), 1);
    }

    #[test]
    fn test_concurrent_reads_during_save() {
        let scratch = tempfile::tempdir().unwrap();
        let save_dir = scratch.path().join("savepoint");
        let mut obj = GitObjectStore::with_snapshot_dir(scratch.path().join("snapshots"));

        for i in 0..100 {
            add(&mut obj, &format!("s{i}"), Namespace::World, YLayer::Semantic);
        }
        save(&obj, &save_dir).unwrap();

        // The in-memory store is unaffected by saving.
        assert_eq!(obj.store.len(), 100);

        // Saving again over an existing save point must also work.
        save(&obj, &save_dir).unwrap();

        obj.store.clear();
        restore(&mut obj, &save_dir).unwrap();
        assert_eq!(obj.store.len(), 100);
    }

    #[test]
    fn test_save_full_persists_commits_and_refs() {
        let scratch = tempfile::tempdir().unwrap();
        let save_dir = scratch.path().join("savepoint");
        let mut obj = GitObjectStore::with_snapshot_dir(scratch.path().join("snapshots"));

        add(&mut obj, "s1", Namespace::World, YLayer::Semantic);

        let mut commits = crate::commit::CommitsTable::new();
        let c1 = crate::commit::create_commit(&obj, &mut commits, vec![], "init", "DGX").unwrap();

        let mut refs = crate::refs::RefsTable::new();
        refs.init_main(&c1.commit_id);
        refs.create_branch("feature", &c1.commit_id).unwrap();

        save_full(&obj, Some(&commits), Some(&refs), &save_dir).unwrap();

        assert!(save_dir.join("_commits.json").exists());
        assert!(save_dir.join("_refs.json").exists());

        // Restore into a brand-new store and compare.
        let mut obj2 = GitObjectStore::with_snapshot_dir(scratch.path().join("snapshots2"));
        let (restored_commits, restored_refs) = restore_full(&mut obj2, &save_dir).unwrap();

        let rc = restored_commits.unwrap();
        assert_eq!(rc.len(), 1);
        assert_eq!(rc.get(&c1.commit_id).unwrap().message, "init");

        let rr = restored_refs.unwrap();
        assert_eq!(rr.branches().len(), 2);
        assert!(rr.head().is_some());

        assert_eq!(obj2.store.len(), 1);
    }

    #[test]
    fn test_save_with_zstd_compression() {
        let scratch = tempfile::tempdir().unwrap();
        let zstd_dir = scratch.path().join("compressed");
        let plain_dir = scratch.path().join("uncompressed");
        let mut obj = GitObjectStore::with_snapshot_dir(scratch.path().join("snap"));

        for i in 0..1000 {
            add(&mut obj, &format!("entity-{}", i), Namespace::World, YLayer::Semantic);
        }

        let plain_opts = SaveOptions {
            compress: false,
            dirty_namespaces: None,
        };
        let metrics_plain = save_with_options(&obj, None, None, &plain_dir, &plain_opts).unwrap();

        let zstd_opts = SaveOptions {
            compress: true,
            dirty_namespaces: None,
        };
        let metrics_zstd = save_with_options(&obj, None, None, &zstd_dir, &zstd_opts).unwrap();

        assert!(metrics_zstd.compressed);
        assert!(!metrics_plain.compressed);

        assert!(
            metrics_zstd.bytes_written < metrics_plain.bytes_written,
            "Compressed ({}) should be smaller than uncompressed ({})",
            metrics_zstd.bytes_written,
            metrics_plain.bytes_written,
        );

        // Compressed files must still restore completely.
        obj.store.clear();
        restore(&mut obj, &zstd_dir).unwrap();
        assert_eq!(obj.store.len(), 1000);
    }

    #[test]
    fn test_incremental_save_only_dirty_namespaces() {
        let scratch = tempfile::tempdir().unwrap();
        let save_dir = scratch.path().join("incremental");
        let mut obj = GitObjectStore::with_snapshot_dir(scratch.path().join("snap"));

        for ns in Namespace::ALL {
            for i in 0..100 {
                add(&mut obj, &format!("{}:{}", ns.as_str(), i), ns, YLayer::Semantic);
            }
        }

        // Full save first, then an incremental one that touches only World.
        save(&obj, &save_dir).unwrap();

        let mut dirty = HashSet::new();
        dirty.insert(Namespace::World);

        let incremental = SaveOptions {
            compress: false,
            dirty_namespaces: Some(dirty),
        };
        let metrics = save_with_options(&obj, None, None, &save_dir, &incremental).unwrap();

        assert_eq!(metrics.namespaces_saved.len(), 1);
        assert_eq!(metrics.namespaces_saved[0], "world");

        // Files of the untouched namespaces are still there.
        obj.store.clear();
        restore(&mut obj, &save_dir).unwrap();
        assert_eq!(obj.store.len(), Namespace::ALL.len() * 100);
    }

    #[test]
    fn test_save_metrics_populated() {
        let scratch = tempfile::tempdir().unwrap();
        let save_dir = scratch.path().join("metrics");
        let mut obj = GitObjectStore::with_snapshot_dir(scratch.path().join("snap"));

        for i in 0..200 {
            add(&mut obj, &format!("entity-{}", i), Namespace::World, YLayer::Semantic);
        }
        for i in 0..100 {
            add(&mut obj, &format!("work-{}", i), Namespace::Work, YLayer::Experience);
        }

        let metrics =
            save_with_options(&obj, None, None, &save_dir, &SaveOptions::default()).unwrap();

        assert_eq!(metrics.namespaces_saved.len(), 2);
        assert!(metrics.namespaces_saved.contains(&"world".to_string()));
        assert!(metrics.namespaces_saved.contains(&"work".to_string()));
        assert!(metrics.bytes_written > 0);
        assert!(!metrics.compressed);
    }

    /// Three non-nullable string columns: `id`, `title`, `status`.
    fn kanban_schema() -> Schema {
        use arrow::datatypes::{DataType, Field};
        Schema::new(vec![
            Field::new("id", DataType::Utf8, false),
            Field::new("title", DataType::Utf8, false),
            Field::new("status", DataType::Utf8, false),
        ])
    }

    /// Builds a record batch matching `kanban_schema` from three parallel slices.
    fn kanban_batch(ids: &[&str], titles: &[&str], statuses: &[&str]) -> RecordBatch {
        use arrow::array::StringArray;
        RecordBatch::try_new(
            Arc::new(kanban_schema()),
            vec![
                Arc::new(StringArray::from(ids.to_vec())),
                Arc::new(StringArray::from(titles.to_vec())),
                Arc::new(StringArray::from(statuses.to_vec())),
            ],
        )
        .unwrap()
    }

    #[test]
    fn test_save_named_batches_roundtrip() {
        let scratch = tempfile::tempdir().unwrap();
        let save_dir = scratch.path().join("kanban");

        let batch = kanban_batch(
            &["EXP-1", "EXP-2"],
            &["First", "Second"],
            &["backlog", "in_progress"],
        );
        let schema = kanban_schema();

        let metrics =
            save_named_batches(&[("items", &[batch.clone()], &schema)], &save_dir).unwrap();

        assert_eq!(metrics.namespaces_saved, vec!["items"]);
        assert!(metrics.bytes_written > 0);
        assert!(!metrics.compressed);
        assert!(!save_dir.join("_wal.json").exists());

        let results = restore_named_batches(&save_dir, &["items"]).unwrap();
        assert_eq!(results.len(), 1);
        assert_eq!(results[0].0, "items");
        assert_eq!(results[0].1[0].num_rows(), 2);
    }

    #[test]
    fn test_save_named_batches_multiple_datasets() {
        let scratch = tempfile::tempdir().unwrap();
        let save_dir = scratch.path().join("multi");

        let items = kanban_batch(&["EXP-1"], &["Expedition"], &["backlog"]);
        let runs = kanban_batch(&["RUN-1"], &["Status Change"], &["done"]);
        let schema = kanban_schema();

        save_named_batches(
            &[("items", &[items], &schema), ("runs", &[runs], &schema)],
            &save_dir,
        )
        .unwrap();

        assert!(save_dir.join("items.parquet").exists());
        assert!(save_dir.join("runs.parquet").exists());

        let results = restore_named_batches(&save_dir, &["items", "runs"]).unwrap();
        assert_eq!(results.len(), 2);
    }

    #[test]
    fn test_save_named_batches_empty_skipped() {
        let scratch = tempfile::tempdir().unwrap();
        let save_dir = scratch.path().join("empty");

        let schema = kanban_schema();
        let metrics =
            save_named_batches(&[("items", &[] as &[RecordBatch], &schema)], &save_dir).unwrap();

        // An entry without batches is neither reported nor written.
        assert!(metrics.namespaces_saved.is_empty());
        assert!(!save_dir.join("items.parquet").exists());
    }

    #[test]
    fn test_restore_named_batches_missing_files_skipped() {
        let scratch = tempfile::tempdir().unwrap();
        let save_dir = scratch.path().join("partial");
        fs::create_dir_all(&save_dir).unwrap();

        let batch = kanban_batch(&["EXP-1"], &["Test"], &["backlog"]);
        save_named_batches(&[("items", &[batch], &kanban_schema())], &save_dir).unwrap();

        // "runs" was never saved, so only "items" comes back.
        let results = restore_named_batches(&save_dir, &["items", "runs"]).unwrap();
        assert_eq!(results.len(), 1);
        assert_eq!(results[0].0, "items");
    }

    #[test]
    fn test_restore_named_batches_nonexistent_dir() {
        let scratch = tempfile::tempdir().unwrap();
        let outcome = restore_named_batches(&scratch.path().join("nonexistent"), &["items"]);
        assert!(outcome.is_err());
    }

    #[test]
    fn test_save_named_batches_wal_cleanup() {
        let scratch = tempfile::tempdir().unwrap();
        let save_dir = scratch.path().join("wal_test");

        let batch = kanban_batch(&["EXP-1"], &["Test"], &["backlog"]);
        save_named_batches(&[("items", &[batch], &kanban_schema())], &save_dir).unwrap();

        // A successful save leaves no marker behind.
        assert!(!save_dir.join("_wal.json").exists());
    }

    #[test]
    fn test_save_named_batches_crash_recovery() {
        let scratch = tempfile::tempdir().unwrap();
        let save_dir = scratch.path().join("crash");

        let batch = kanban_batch(&["EXP-1"], &["Test"], &["backlog"]);
        save_named_batches(&[("items", &[batch], &kanban_schema())], &save_dir).unwrap();

        // Pretend a later save was interrupted by leaving a marker behind.
        fs::write(save_dir.join("_wal.json"), "[\"items\"]").unwrap();

        let results = restore_named_batches(&save_dir, &["items"]).unwrap();
        assert_eq!(results.len(), 1);
        assert_eq!(results[0].1[0].num_rows(), 1);
        assert!(!save_dir.join("_wal.json").exists());
    }

    #[test]
    fn test_persist_commits_roundtrip() {
        let scratch = tempfile::tempdir().unwrap();
        let dir = scratch.path();

        let mut table = CommitsTable::new();
        table.append(Commit {
            commit_id: "c1".to_string(),
            parent_ids: vec![],
            timestamp_ms: 1000,
            message: "first save".to_string(),
            author: "nusy-kanban".to_string(),
        });
        table.append(Commit {
            commit_id: "c2".to_string(),
            parent_ids: vec!["c1".to_string()],
            timestamp_ms: 2000,
            message: "second save".to_string(),
            author: "nusy-kanban".to_string(),
        });

        persist_commits(&table, dir).unwrap();
        assert!(dir.join("_commits.json").exists());

        let restored = restore_commits(dir).unwrap().unwrap();
        assert_eq!(restored.len(), 2);
        assert_eq!(restored.get("c1").unwrap().message, "first save");
        assert_eq!(restored.get("c2").unwrap().parent_ids, vec!["c1"]);
    }

    #[test]
    fn test_restore_commits_empty_dir() {
        let scratch = tempfile::tempdir().unwrap();
        let outcome = restore_commits(scratch.path()).unwrap();
        assert!(outcome.is_none());
    }
}