1use crate::error::Result;
2use chkpt_core::index::{FileEntry, FileIndex};
3use chkpt_core::scanner::scan_workspace;
4use chkpt_core::store::blob::{bytes_to_hex, hash_content_bytes};
5use chkpt_core::store::catalog::{BlobLocation, CatalogSnapshot, ManifestEntry, MetadataCatalog};
6use chkpt_core::store::pack::PackWriter;
7use chkpt_core::store::snapshot::SnapshotStats;
8use chrono::Utc;
9use std::io::Write;
10use std::path::Path;
11use uuid::Uuid;
12
/// Summary of a single save run.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SaveResult {
    /// Id of the snapshot written by this save or, when nothing changed, of the most recent
    /// existing snapshot (empty if there is none).
    pub snapshot_id: String,
    /// Number of files found by the workspace scan.
    pub files_scanned: usize,
    /// Number of files that were re-read and hashed because their metadata differed from the
    /// file index (0 when nothing changed).
    pub files_hashed: usize,
    /// Total size, in bytes, of the LZ4-compressed blob data produced by this save.
    pub bytes_compressed: u64,
}
19
/// Stateless entry point for saving a workspace checkpoint; see [`SavePipeline::run`].
#[derive(Debug, Clone, Copy, Default)]
pub struct SavePipeline;
21
22impl SavePipeline {
23 pub fn run(workspace: &Path, store_dir: &Path, message: Option<&str>) -> Result<SaveResult> {
24 let packs_dir = store_dir.join("packs");
26 std::fs::create_dir_all(&packs_dir)?;
27 std::fs::create_dir_all(store_dir.join("trees"))?;
28
29 let scanned = scan_workspace(workspace, None)?;
31 let files_scanned = scanned.len();
32
33 let index_path = store_dir.join("index.bin");
35 let mut file_index = FileIndex::open(&index_path)?;
36
37 let catalog_path = store_dir.join("catalog.sqlite");
38
39 let mut changed_files = Vec::new();
41 for sf in &scanned {
42 if let Ok(Some(entry)) = file_index.get(&sf.relative_path) {
43 if entry.size == sf.size
45 && entry.mtime_secs == sf.mtime_secs
46 && entry.mtime_nanos == sf.mtime_nanos
47 {
48 continue;
49 }
50 }
51 changed_files.push(sf);
52 }
53
54 let files_hashed = changed_files.len();
56
57 let scanned_paths_set: std::collections::HashSet<&str> =
59 scanned.iter().map(|f| f.relative_path.as_str()).collect();
60 let all_indexed = file_index.all_paths()?;
61 let removed_paths: Vec<String> = all_indexed
62 .iter()
63 .filter(|p| !scanned_paths_set.contains(p.as_str()))
64 .cloned()
65 .collect();
66
67 if files_hashed == 0 && removed_paths.is_empty() {
68 let catalog = MetadataCatalog::open(&catalog_path)?;
70 let latest = catalog.latest_snapshot()?;
71 return Ok(SaveResult {
72 snapshot_id: latest.map(|s| s.id).unwrap_or_default(),
73 files_scanned,
74 files_hashed: 0,
75 bytes_compressed: 0,
76 });
77 }
78
79 let mut pack_writer = PackWriter::new(&packs_dir)?;
81 let mut blob_locations: Vec<([u8; 16], BlobLocation)> = Vec::new();
82 let mut new_entries: Vec<FileEntry> = Vec::new();
83 let mut bytes_compressed: u64 = 0;
84
85 for sf in &changed_files {
86 let content = chkpt_core::store::blob::read_path_bytes(
89 &sf.absolute_path,
90 sf.is_symlink,
91 )?;
92 let hash_bytes = hash_content_bytes(&content);
93 let hash_hex = bytes_to_hex(&hash_bytes);
94
95 let mut encoder = lz4_flex::frame::FrameEncoder::new(Vec::new());
97 encoder
98 .write_all(content.as_ref())
99 .map_err(crate::error::SyncorError::Io)?;
100 let compressed = encoder
101 .finish()
102 .map_err(|e| crate::error::SyncorError::Other(e.to_string()))?;
103 let compressed_len = compressed.len() as u64;
104 bytes_compressed += compressed_len;
105
106 pack_writer.add_pre_compressed(hash_hex.clone(), compressed)?;
107
108 blob_locations.push((
109 hash_bytes,
110 BlobLocation {
111 pack_hash: None, size: sf.size,
113 },
114 ));
115
116 new_entries.push(FileEntry {
117 path: sf.relative_path.clone(),
118 blob_hash: hash_bytes,
119 size: sf.size,
120 mtime_secs: sf.mtime_secs,
121 mtime_nanos: sf.mtime_nanos,
122 inode: sf.inode,
123 mode: sf.mode,
124 });
125 }
126
127 let pack_hash = if !pack_writer.is_empty() {
129 Some(pack_writer.finish()?)
130 } else {
131 drop(pack_writer);
133 None
134 };
135
136 if let Some(ref ph) = pack_hash {
138 for (_, loc) in &mut blob_locations {
139 loc.pack_hash = Some(ph.clone());
140 }
141 }
142
143 let catalog = MetadataCatalog::open(&catalog_path)?;
144
145 if !blob_locations.is_empty() {
146 catalog.bulk_upsert_blob_locations(&blob_locations)?;
147 }
148
149 let mut manifest: Vec<ManifestEntry> = Vec::new();
151 let mut total_bytes: u64 = 0;
152
153 let changed_paths: std::collections::HashSet<&str> = changed_files
155 .iter()
156 .map(|sf| sf.relative_path.as_str())
157 .collect();
158
159 for sf in &scanned {
161 if !changed_paths.contains(sf.relative_path.as_str()) {
162 if let Ok(Some(entry)) = file_index.get(&sf.relative_path) {
163 manifest.push(ManifestEntry {
164 path: entry.path.clone(),
165 blob_hash: entry.blob_hash,
166 size: entry.size,
167 mode: entry.mode,
168 });
169 total_bytes += entry.size;
170 }
171 }
172 }
173
174 for entry in &new_entries {
176 manifest.push(ManifestEntry {
177 path: entry.path.clone(),
178 blob_hash: entry.blob_hash,
179 size: entry.size,
180 mode: entry.mode,
181 });
182 total_bytes += entry.size;
183 }
184
185 let snapshot_id = Uuid::now_v7().to_string();
187 let parent_snapshot = catalog.latest_snapshot()?;
188
189 let snapshot = CatalogSnapshot {
190 id: snapshot_id.clone(),
191 created_at: Utc::now(),
192 message: message.map(|s| s.to_string()),
193 parent_snapshot_id: parent_snapshot.map(|s| s.id),
194 manifest_snapshot_id: None,
195 root_tree_hash: None,
196 stats: SnapshotStats {
197 total_files: files_scanned as u64,
198 total_bytes,
199 new_objects: files_hashed as u64,
200 },
201 };
202
203 catalog.insert_snapshot(&snapshot, &manifest)?;
204
205 let entries_to_upsert: Vec<FileEntry> = new_entries;
208
209 file_index.apply_changes(&removed_paths, &entries_to_upsert)?;
210
211 Ok(SaveResult {
212 snapshot_id,
213 files_scanned,
214 files_hashed,
215 bytes_compressed,
216 })
217 }
218}