Skip to main content

syncor_core/sync/
save.rs

1use crate::error::Result;
2use chkpt_core::index::{FileEntry, FileIndex};
3use chkpt_core::scanner::scan_workspace;
4use chkpt_core::store::blob::{bytes_to_hex, hash_content_bytes};
5use chkpt_core::store::catalog::{BlobLocation, CatalogSnapshot, ManifestEntry, MetadataCatalog};
6use chkpt_core::store::pack::PackWriter;
7use chkpt_core::store::snapshot::SnapshotStats;
8use chrono::Utc;
9use std::io::Write;
10use std::path::Path;
11use uuid::Uuid;
12
/// Summary of a single [`SavePipeline::run`] invocation.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SaveResult {
    /// ID of the snapshot that represents the workspace after the save.
    ///
    /// When nothing changed this is the ID of the latest existing snapshot,
    /// or an empty string if the catalog holds no snapshot at all.
    pub snapshot_id: String,
    /// Number of files returned by the workspace scan.
    pub files_scanned: usize,
    /// Number of files that were re-read, hashed and packed during this save.
    pub files_hashed: usize,
    /// Total size in bytes of the LZ4-compressed data produced by this save.
    pub bytes_compressed: u64,
}
19
/// Stateless entry point for the "save" operation; all work happens in
/// the associated function `run`.
#[derive(Debug, Clone, Copy, Default)]
pub struct SavePipeline;
21
22impl SavePipeline {
23    pub fn run(workspace: &Path, store_dir: &Path, message: Option<&str>) -> Result<SaveResult> {
24        // 1. Ensure store directories
25        let packs_dir = store_dir.join("packs");
26        std::fs::create_dir_all(&packs_dir)?;
27        std::fs::create_dir_all(store_dir.join("trees"))?;
28
29        // 2. Scan workspace
30        let scanned = scan_workspace(workspace, None)?;
31        let files_scanned = scanned.len();
32
33        // 3. Load FileIndex
34        let index_path = store_dir.join("index.bin");
35        let mut file_index = FileIndex::open(&index_path)?;
36
37        let catalog_path = store_dir.join("catalog.sqlite");
38
39        // 4. Compare scanned files against index to find changed files
40        let mut changed_files = Vec::new();
41        for sf in &scanned {
42            if let Ok(Some(entry)) = file_index.get(&sf.relative_path) {
43                // Check if size or mtime changed
44                if entry.size == sf.size
45                    && entry.mtime_secs == sf.mtime_secs
46                    && entry.mtime_nanos == sf.mtime_nanos
47                {
48                    continue;
49                }
50            }
51            changed_files.push(sf);
52        }
53
54        // 5. Check if anything changed before doing expensive work
55        let files_hashed = changed_files.len();
56
57        // Check if any files were removed since last index
58        let scanned_paths_set: std::collections::HashSet<&str> =
59            scanned.iter().map(|f| f.relative_path.as_str()).collect();
60        let all_indexed = file_index.all_paths()?;
61        let removed_paths: Vec<String> = all_indexed
62            .iter()
63            .filter(|p| !scanned_paths_set.contains(p.as_str()))
64            .cloned()
65            .collect();
66
67        if files_hashed == 0 && removed_paths.is_empty() {
68            // Nothing changed — return the latest snapshot ID without creating a new one
69            let catalog = MetadataCatalog::open(&catalog_path)?;
70            let latest = catalog.latest_snapshot()?;
71            return Ok(SaveResult {
72                snapshot_id: latest.map(|s| s.id).unwrap_or_default(),
73                files_scanned,
74                files_hashed: 0,
75                bytes_compressed: 0,
76            });
77        }
78
79        // Hash and compress changed files, add to pack
80        let mut pack_writer = PackWriter::new(&packs_dir)?;
81        let mut blob_locations: Vec<([u8; 16], BlobLocation)> = Vec::new();
82        let mut new_entries: Vec<FileEntry> = Vec::new();
83        let mut bytes_compressed: u64 = 0;
84
85        for sf in &changed_files {
86            // Use read_path_bytes for symlink-aware reading (symlinks to dirs
87            // would cause EISDIR with read_or_mmap).
88            let content = chkpt_core::store::blob::read_path_bytes(
89                &sf.absolute_path,
90                sf.is_symlink,
91            )?;
92            let hash_bytes = hash_content_bytes(&content);
93            let hash_hex = bytes_to_hex(&hash_bytes);
94
95            // LZ4 compress
96            let mut encoder = lz4_flex::frame::FrameEncoder::new(Vec::new());
97            encoder
98                .write_all(content.as_ref())
99                .map_err(crate::error::SyncorError::Io)?;
100            let compressed = encoder
101                .finish()
102                .map_err(|e| crate::error::SyncorError::Other(e.to_string()))?;
103            let compressed_len = compressed.len() as u64;
104            bytes_compressed += compressed_len;
105
106            pack_writer.add_pre_compressed(hash_hex.clone(), compressed)?;
107
108            blob_locations.push((
109                hash_bytes,
110                BlobLocation {
111                    pack_hash: None, // Will be set after finish
112                    size: sf.size,
113                },
114            ));
115
116            new_entries.push(FileEntry {
117                path: sf.relative_path.clone(),
118                blob_hash: hash_bytes,
119                size: sf.size,
120                mtime_secs: sf.mtime_secs,
121                mtime_nanos: sf.mtime_nanos,
122                inode: sf.inode,
123                mode: sf.mode,
124            });
125        }
126
127        // 6. Finish pack if not empty
128        let pack_hash = if !pack_writer.is_empty() {
129            Some(pack_writer.finish()?)
130        } else {
131            // Drop the empty pack writer without finishing
132            drop(pack_writer);
133            None
134        };
135
136        // 7. Update blob locations with pack hash and upsert to catalog
137        if let Some(ref ph) = pack_hash {
138            for (_, loc) in &mut blob_locations {
139                loc.pack_hash = Some(ph.clone());
140            }
141        }
142
143        let catalog = MetadataCatalog::open(&catalog_path)?;
144
145        if !blob_locations.is_empty() {
146            catalog.bulk_upsert_blob_locations(&blob_locations)?;
147        }
148
149        // 8. Build manifest: unchanged from index + newly hashed
150        let mut manifest: Vec<ManifestEntry> = Vec::new();
151        let mut total_bytes: u64 = 0;
152
153        // Build a set of changed paths for quick lookup
154        let changed_paths: std::collections::HashSet<&str> = changed_files
155            .iter()
156            .map(|sf| sf.relative_path.as_str())
157            .collect();
158
159        // Add unchanged files from index
160        for sf in &scanned {
161            if !changed_paths.contains(sf.relative_path.as_str()) {
162                if let Ok(Some(entry)) = file_index.get(&sf.relative_path) {
163                    manifest.push(ManifestEntry {
164                        path: entry.path.clone(),
165                        blob_hash: entry.blob_hash,
166                        size: entry.size,
167                        mode: entry.mode,
168                    });
169                    total_bytes += entry.size;
170                }
171            }
172        }
173
174        // Add newly hashed files
175        for entry in &new_entries {
176            manifest.push(ManifestEntry {
177                path: entry.path.clone(),
178                blob_hash: entry.blob_hash,
179                size: entry.size,
180                mode: entry.mode,
181            });
182            total_bytes += entry.size;
183        }
184
185        // 9. Create CatalogSnapshot with UUIDv7, insert_snapshot
186        let snapshot_id = Uuid::now_v7().to_string();
187        let parent_snapshot = catalog.latest_snapshot()?;
188
189        let snapshot = CatalogSnapshot {
190            id: snapshot_id.clone(),
191            created_at: Utc::now(),
192            message: message.map(|s| s.to_string()),
193            parent_snapshot_id: parent_snapshot.map(|s| s.id),
194            manifest_snapshot_id: None,
195            root_tree_hash: None,
196            stats: SnapshotStats {
197                total_files: files_scanned as u64,
198                total_bytes,
199                new_objects: files_hashed as u64,
200            },
201        };
202
203        catalog.insert_snapshot(&snapshot, &manifest)?;
204
205        // 10. Update FileIndex with apply_changes
206        // Build all entries to upsert (unchanged keep their existing entry, changed get new)
207        let entries_to_upsert: Vec<FileEntry> = new_entries;
208
209        file_index.apply_changes(&removed_paths, &entries_to_upsert)?;
210
211        Ok(SaveResult {
212            snapshot_id,
213            files_scanned,
214            files_hashed,
215            bytes_compressed,
216        })
217    }
218}