1use crate::error::Result;
2use chkpt_core::index::{FileEntry, FileIndex};
3use chkpt_core::scanner::scan_workspace;
4use chkpt_core::store::blob::{bytes_to_hex, hash_content_bytes, read_or_mmap};
5use chkpt_core::store::catalog::{BlobLocation, CatalogSnapshot, ManifestEntry, MetadataCatalog};
6use chkpt_core::store::pack::PackWriter;
7use chkpt_core::store::snapshot::SnapshotStats;
8use chrono::Utc;
9use std::io::Write;
10use std::path::Path;
11use uuid::Uuid;
12
/// Summary returned by [`SavePipeline::run`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SaveResult {
    /// Id of the snapshot created by the run. When nothing changed, this is
    /// the id of the latest existing snapshot, or an empty string if the
    /// catalog holds no snapshot yet.
    pub snapshot_id: String,
    /// Number of entries returned by the workspace scan.
    pub files_scanned: usize,
    /// Number of files that were re-read and hashed during the run
    /// (`0` when nothing changed).
    pub files_hashed: usize,
    /// Sum of the compressed payload sizes, in bytes, handed to the pack
    /// writer during the run.
    pub bytes_compressed: u64,
}
19
/// Stateless namespace for the save operation; the associated function
/// `run` is its entry point.
#[derive(Debug, Clone, Copy, Default)]
pub struct SavePipeline;
21
22impl SavePipeline {
23 pub fn run(workspace: &Path, store_dir: &Path, message: Option<&str>) -> Result<SaveResult> {
24 let packs_dir = store_dir.join("packs");
26 std::fs::create_dir_all(&packs_dir)?;
27 std::fs::create_dir_all(store_dir.join("trees"))?;
28
29 let scanned = scan_workspace(workspace, None)?;
31 let files_scanned = scanned.len();
32
33 let index_path = store_dir.join("index.bin");
35 let mut file_index = FileIndex::open(&index_path)?;
36
37 let catalog_path = store_dir.join("catalog.sqlite");
38
39 let mut changed_files = Vec::new();
41 for sf in &scanned {
42 if let Ok(Some(entry)) = file_index.get(&sf.relative_path) {
43 if entry.size == sf.size
45 && entry.mtime_secs == sf.mtime_secs
46 && entry.mtime_nanos == sf.mtime_nanos
47 {
48 continue;
49 }
50 }
51 changed_files.push(sf);
52 }
53
54 let files_hashed = changed_files.len();
56
57 let scanned_paths_set: std::collections::HashSet<&str> =
59 scanned.iter().map(|f| f.relative_path.as_str()).collect();
60 let all_indexed = file_index.all_paths()?;
61 let removed_paths: Vec<String> = all_indexed
62 .iter()
63 .filter(|p| !scanned_paths_set.contains(p.as_str()))
64 .cloned()
65 .collect();
66
67 if files_hashed == 0 && removed_paths.is_empty() {
68 let catalog = MetadataCatalog::open(&catalog_path)?;
70 let latest = catalog.latest_snapshot()?;
71 return Ok(SaveResult {
72 snapshot_id: latest.map(|s| s.id).unwrap_or_default(),
73 files_scanned,
74 files_hashed: 0,
75 bytes_compressed: 0,
76 });
77 }
78
79 let mut pack_writer = PackWriter::new(&packs_dir)?;
81 let mut blob_locations: Vec<([u8; 16], BlobLocation)> = Vec::new();
82 let mut new_entries: Vec<FileEntry> = Vec::new();
83 let mut bytes_compressed: u64 = 0;
84
85 for sf in &changed_files {
86 let content = read_or_mmap(&sf.absolute_path)?;
87 let hash_bytes = hash_content_bytes(content.as_ref());
88 let hash_hex = bytes_to_hex(&hash_bytes);
89
90 let mut encoder = lz4_flex::frame::FrameEncoder::new(Vec::new());
92 encoder
93 .write_all(content.as_ref())
94 .map_err(crate::error::SyncorError::Io)?;
95 let compressed = encoder
96 .finish()
97 .map_err(|e| crate::error::SyncorError::Other(e.to_string()))?;
98 let compressed_len = compressed.len() as u64;
99 bytes_compressed += compressed_len;
100
101 pack_writer.add_pre_compressed(hash_hex.clone(), compressed)?;
102
103 blob_locations.push((
104 hash_bytes,
105 BlobLocation {
106 pack_hash: None, size: sf.size,
108 },
109 ));
110
111 new_entries.push(FileEntry {
112 path: sf.relative_path.clone(),
113 blob_hash: hash_bytes,
114 size: sf.size,
115 mtime_secs: sf.mtime_secs,
116 mtime_nanos: sf.mtime_nanos,
117 inode: sf.inode,
118 mode: sf.mode,
119 });
120 }
121
122 let pack_hash = if !pack_writer.is_empty() {
124 Some(pack_writer.finish()?)
125 } else {
126 drop(pack_writer);
128 None
129 };
130
131 if let Some(ref ph) = pack_hash {
133 for (_, loc) in &mut blob_locations {
134 loc.pack_hash = Some(ph.clone());
135 }
136 }
137
138 let catalog = MetadataCatalog::open(&catalog_path)?;
139
140 if !blob_locations.is_empty() {
141 catalog.bulk_upsert_blob_locations(&blob_locations)?;
142 }
143
144 let mut manifest: Vec<ManifestEntry> = Vec::new();
146 let mut total_bytes: u64 = 0;
147
148 let changed_paths: std::collections::HashSet<&str> = changed_files
150 .iter()
151 .map(|sf| sf.relative_path.as_str())
152 .collect();
153
154 for sf in &scanned {
156 if !changed_paths.contains(sf.relative_path.as_str()) {
157 if let Ok(Some(entry)) = file_index.get(&sf.relative_path) {
158 manifest.push(ManifestEntry {
159 path: entry.path.clone(),
160 blob_hash: entry.blob_hash,
161 size: entry.size,
162 mode: entry.mode,
163 });
164 total_bytes += entry.size;
165 }
166 }
167 }
168
169 for entry in &new_entries {
171 manifest.push(ManifestEntry {
172 path: entry.path.clone(),
173 blob_hash: entry.blob_hash,
174 size: entry.size,
175 mode: entry.mode,
176 });
177 total_bytes += entry.size;
178 }
179
180 let snapshot_id = Uuid::now_v7().to_string();
182 let parent_snapshot = catalog.latest_snapshot()?;
183
184 let snapshot = CatalogSnapshot {
185 id: snapshot_id.clone(),
186 created_at: Utc::now(),
187 message: message.map(|s| s.to_string()),
188 parent_snapshot_id: parent_snapshot.map(|s| s.id),
189 manifest_snapshot_id: None,
190 root_tree_hash: None,
191 stats: SnapshotStats {
192 total_files: files_scanned as u64,
193 total_bytes,
194 new_objects: files_hashed as u64,
195 },
196 };
197
198 catalog.insert_snapshot(&snapshot, &manifest)?;
199
200 let entries_to_upsert: Vec<FileEntry> = new_entries;
203
204 file_index.apply_changes(&removed_paths, &entries_to_upsert)?;
205
206 Ok(SaveResult {
207 snapshot_id,
208 files_scanned,
209 files_hashed,
210 bytes_compressed,
211 })
212 }
213}