1use crate::error::Result;
2use chkpt_core::index::{FileEntry, FileIndex};
3use chkpt_core::scanner::scan_workspace;
4use chkpt_core::store::blob::{bytes_to_hex, hash_content_bytes};
5use chkpt_core::store::catalog::{BlobLocation, CatalogSnapshot, ManifestEntry, MetadataCatalog};
6use chkpt_core::store::pack::{PackFinishOptions, PackWriter};
7use chkpt_core::store::snapshot::SnapshotStats;
8use chrono::Utc;
9use std::io::Write;
10use std::path::Path;
11use uuid::Uuid;
12
/// Summary of a single [`SavePipeline::run`] invocation.
#[derive(Debug, Clone, PartialEq, Eq, Default)]
pub struct SaveResult {
    /// Id of the snapshot recorded by this save. When nothing changed, this is
    /// the id of the latest existing snapshot, or empty if the catalog has none.
    pub snapshot_id: String,
    /// Number of files found by the workspace scan.
    pub files_scanned: usize,
    /// Number of scanned files that were read and hashed because their index
    /// entry was missing or stale.
    pub files_hashed: usize,
    /// Total size, in bytes, of the compressed blob data produced by this save.
    pub bytes_compressed: u64,
}
19
/// Stateless entry point for saving a workspace snapshot; see [`SavePipeline::run`].
#[derive(Debug, Clone, Copy, Default)]
pub struct SavePipeline;
21
22impl SavePipeline {
23 pub fn run(workspace: &Path, store_dir: &Path, _message: Option<&str>) -> Result<SaveResult> {
24 let packs_dir = store_dir.join("packs");
26 let trees_dir = store_dir.join("trees");
27 let catalog_path = store_dir.join("catalog.sqlite");
28 let index_path = store_dir.join("index.bin");
29 std::fs::create_dir_all(&packs_dir)?;
30 std::fs::create_dir_all(&trees_dir)?;
31
32 let scanned = scan_workspace(workspace, None)?;
34 let files_scanned = scanned.len();
35
36 let mut index = FileIndex::open(&index_path)?;
38
39 let mut changed_files = Vec::new();
41 for sf in &scanned {
42 if let Ok(Some(entry)) = index.get(&sf.relative_path) {
43 if entry.size == sf.size
44 && entry.mtime_secs == sf.mtime_secs
45 && entry.mtime_nanos == sf.mtime_nanos
46 {
47 continue;
48 }
49 }
50 changed_files.push(sf);
51 }
52 let files_hashed = changed_files.len();
53
54 let scanned_paths: std::collections::HashSet<&str> =
56 scanned.iter().map(|f| f.relative_path.as_str()).collect();
57 let all_indexed = index.all_paths()?;
58 let removed_paths: Vec<String> = all_indexed
59 .into_iter()
60 .filter(|p| !scanned_paths.contains(p.as_str()))
61 .collect();
62
63 if files_hashed == 0 && removed_paths.is_empty() {
65 let catalog = MetadataCatalog::open(&catalog_path)?;
66 let latest = catalog.latest_snapshot()?;
67 return Ok(SaveResult {
68 snapshot_id: latest.map(|s| s.id).unwrap_or_default(),
69 files_scanned,
70 files_hashed: 0,
71 bytes_compressed: 0,
72 });
73 }
74
75 let mut pack_writer = PackWriter::new(&packs_dir)?;
77 let mut blob_locations: Vec<([u8; 16], BlobLocation)> = Vec::new();
78 let mut new_entries: Vec<FileEntry> = Vec::new();
79 let mut bytes_compressed: u64 = 0;
80
81 for sf in &changed_files {
82 let content =
85 chkpt_core::store::blob::read_path_bytes(&sf.absolute_path, sf.is_symlink)?;
86 let hash_bytes = hash_content_bytes(&content);
87 let hash_hex = bytes_to_hex(&hash_bytes);
88
89 let mut encoder = lz4_flex::frame::FrameEncoder::new(Vec::new());
91 encoder
92 .write_all(content.as_ref())
93 .map_err(crate::error::SyncorError::Io)?;
94 let compressed = encoder
95 .finish()
96 .map_err(|e| crate::error::SyncorError::Other(e.to_string()))?;
97 let compressed_len = compressed.len() as u64;
98 bytes_compressed += compressed_len;
99
100 pack_writer.add_pre_compressed(hash_hex.clone(), compressed)?;
101
102 blob_locations.push((
103 hash_bytes,
104 BlobLocation {
105 pack_hash: None, size: sf.size,
107 },
108 ));
109
110 new_entries.push(FileEntry {
111 path: sf.relative_path.clone(),
112 blob_hash: hash_bytes,
113 size: sf.size,
114 mtime_secs: sf.mtime_secs,
115 mtime_nanos: sf.mtime_nanos,
116 inode: sf.inode,
117 mode: sf.mode,
118 });
119 }
120
121 let pack_hash = if !pack_writer.is_empty() {
123 Some(pack_writer.finish_with_options(PackFinishOptions {
124 chunk_bytes: Some(50_000_000),
125 })?)
126 } else {
127 drop(pack_writer);
128 None
129 };
130
131 if let Some(ref ph) = pack_hash {
133 for (_, loc) in blob_locations.iter_mut() {
134 loc.pack_hash = Some(ph.clone());
135 }
136 }
137
138 let mut manifest: Vec<ManifestEntry> = Vec::new();
140 for sf in &scanned {
141 if let Ok(Some(entry)) = index.get(&sf.relative_path) {
142 if !new_entries.iter().any(|e| e.path == sf.relative_path) {
143 manifest.push(ManifestEntry {
144 path: entry.path.clone(),
145 blob_hash: entry.blob_hash,
146 size: entry.size,
147 mode: entry.mode,
148 });
149 }
150 }
151 }
152 for entry in &new_entries {
153 manifest.push(ManifestEntry {
154 path: entry.path.clone(),
155 blob_hash: entry.blob_hash,
156 size: entry.size,
157 mode: entry.mode,
158 });
159 }
160 manifest.sort_by(|a, b| a.path.cmp(&b.path));
161
162 let catalog = MetadataCatalog::open(&catalog_path)?;
164 let parent = catalog.latest_snapshot()?;
165
166 if !blob_locations.is_empty() {
167 catalog.bulk_upsert_blob_locations(&blob_locations)?;
168 }
169
170 let snapshot_id = Uuid::now_v7().to_string();
171 let snapshot = CatalogSnapshot {
172 id: snapshot_id.clone(),
173 created_at: Utc::now(),
174 message: None,
175 parent_snapshot_id: parent.as_ref().map(|p| p.id.clone()),
176 manifest_snapshot_id: None,
177 root_tree_hash: None,
178 stats: SnapshotStats {
179 total_files: manifest.len() as u64,
180 total_bytes: manifest.iter().map(|e| e.size).sum(),
181 new_objects: files_hashed as u64,
182 },
183 };
184 catalog.insert_snapshot(&snapshot, &manifest)?;
185
186 index.apply_changes(&removed_paths, &new_entries)?;
188
189 Ok(SaveResult {
190 snapshot_id,
191 files_scanned,
192 files_hashed,
193 bytes_compressed,
194 })
195 }
196}