Skip to main content

syncor_core/sync/
save.rs

1use crate::error::Result;
2use chkpt_core::index::{FileEntry, FileIndex};
3use chkpt_core::scanner::scan_workspace;
4use chkpt_core::store::blob::{bytes_to_hex, hash_content_bytes};
5use chkpt_core::store::catalog::{BlobLocation, CatalogSnapshot, ManifestEntry, MetadataCatalog};
6use chkpt_core::store::pack::{PackFinishOptions, PackWriter};
7use chkpt_core::store::snapshot::SnapshotStats;
8use chrono::Utc;
9use std::io::Write;
10use std::path::Path;
11use uuid::Uuid;
12
/// Summary of a single [`SavePipeline::run`] invocation.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SaveResult {
    /// Id of the snapshot produced by this run. When nothing changed, this is
    /// the id of the latest existing snapshot, or an empty string if the
    /// catalog holds no snapshot yet.
    pub snapshot_id: String,
    /// Number of files returned by the workspace scan.
    pub files_scanned: usize,
    /// Number of files that were re-read, hashed and packed because they were
    /// not matched by the file index.
    pub files_hashed: usize,
    /// Total size in bytes of the LZ4-compressed payloads written by this run.
    pub bytes_compressed: u64,
}
19
/// Stateless entry point for the "save" operation; all work happens in
/// [`SavePipeline::run`].
#[derive(Debug, Clone, Copy, Default)]
pub struct SavePipeline;
21
22impl SavePipeline {
23    pub fn run(workspace: &Path, store_dir: &Path, _message: Option<&str>) -> Result<SaveResult> {
24        // 1. Ensure store directories exist
25        let packs_dir = store_dir.join("packs");
26        let trees_dir = store_dir.join("trees");
27        let catalog_path = store_dir.join("catalog.sqlite");
28        let index_path = store_dir.join("index.bin");
29        std::fs::create_dir_all(&packs_dir)?;
30        std::fs::create_dir_all(&trees_dir)?;
31
32        // 2. Scan workspace
33        let scanned = scan_workspace(workspace, None)?;
34        let files_scanned = scanned.len();
35
36        // 3. Load file index for incremental detection
37        let mut index = FileIndex::open(&index_path)?;
38
39        // 4. Find changed files (compare against index by size + mtime)
40        let mut changed_files = Vec::new();
41        for sf in &scanned {
42            if let Ok(Some(entry)) = index.get(&sf.relative_path) {
43                if entry.size == sf.size
44                    && entry.mtime_secs == sf.mtime_secs
45                    && entry.mtime_nanos == sf.mtime_nanos
46                {
47                    continue;
48                }
49            }
50            changed_files.push(sf);
51        }
52        let files_hashed = changed_files.len();
53
54        // Check if any files were removed since last index
55        let scanned_paths: std::collections::HashSet<&str> =
56            scanned.iter().map(|f| f.relative_path.as_str()).collect();
57        let all_indexed = index.all_paths()?;
58        let removed_paths: Vec<String> = all_indexed
59            .into_iter()
60            .filter(|p| !scanned_paths.contains(p.as_str()))
61            .collect();
62
63        // If nothing changed and nothing removed, skip snapshot creation
64        if files_hashed == 0 && removed_paths.is_empty() {
65            let catalog = MetadataCatalog::open(&catalog_path)?;
66            let latest = catalog.latest_snapshot()?;
67            return Ok(SaveResult {
68                snapshot_id: latest.map(|s| s.id).unwrap_or_default(),
69                files_scanned,
70                files_hashed: 0,
71                bytes_compressed: 0,
72            });
73        }
74
75        // 5. Hash, compress, and pack changed files
76        let mut pack_writer = PackWriter::new(&packs_dir)?;
77        let mut blob_locations: Vec<([u8; 16], BlobLocation)> = Vec::new();
78        let mut new_entries: Vec<FileEntry> = Vec::new();
79        let mut bytes_compressed: u64 = 0;
80
81        for sf in &changed_files {
82            // Use read_path_bytes for symlink-aware reading (symlinks to dirs
83            // would cause EISDIR with read_or_mmap).
84            let content =
85                chkpt_core::store::blob::read_path_bytes(&sf.absolute_path, sf.is_symlink)?;
86            let hash_bytes = hash_content_bytes(&content);
87            let hash_hex = bytes_to_hex(&hash_bytes);
88
89            // LZ4 compress
90            let mut encoder = lz4_flex::frame::FrameEncoder::new(Vec::new());
91            encoder
92                .write_all(content.as_ref())
93                .map_err(crate::error::SyncorError::Io)?;
94            let compressed = encoder
95                .finish()
96                .map_err(|e| crate::error::SyncorError::Other(e.to_string()))?;
97            let compressed_len = compressed.len() as u64;
98            bytes_compressed += compressed_len;
99
100            pack_writer.add_pre_compressed(hash_hex.clone(), compressed)?;
101
102            blob_locations.push((
103                hash_bytes,
104                BlobLocation {
105                    pack_hash: None, // Will be set after finish
106                    size: sf.size,
107                },
108            ));
109
110            new_entries.push(FileEntry {
111                path: sf.relative_path.clone(),
112                blob_hash: hash_bytes,
113                size: sf.size,
114                mtime_secs: sf.mtime_secs,
115                mtime_nanos: sf.mtime_nanos,
116                inode: sf.inode,
117                mode: sf.mode,
118            });
119        }
120
121        // 6. Finish pack — chunk at 50 MB to stay under GitHub's 100 MB file limit
122        let pack_hash = if !pack_writer.is_empty() {
123            Some(pack_writer.finish_with_options(PackFinishOptions {
124                chunk_bytes: Some(50_000_000),
125            })?)
126        } else {
127            drop(pack_writer);
128            None
129        };
130
131        // 7. Set pack_hash on blob locations
132        if let Some(ref ph) = pack_hash {
133            for (_, loc) in blob_locations.iter_mut() {
134                loc.pack_hash = Some(ph.clone());
135            }
136        }
137
138        // 8. Build manifest from index (unchanged) + new entries
139        let mut manifest: Vec<ManifestEntry> = Vec::new();
140        for sf in &scanned {
141            if let Ok(Some(entry)) = index.get(&sf.relative_path) {
142                if !new_entries.iter().any(|e| e.path == sf.relative_path) {
143                    manifest.push(ManifestEntry {
144                        path: entry.path.clone(),
145                        blob_hash: entry.blob_hash,
146                        size: entry.size,
147                        mode: entry.mode,
148                    });
149                }
150            }
151        }
152        for entry in &new_entries {
153            manifest.push(ManifestEntry {
154                path: entry.path.clone(),
155                blob_hash: entry.blob_hash,
156                size: entry.size,
157                mode: entry.mode,
158            });
159        }
160        manifest.sort_by(|a, b| a.path.cmp(&b.path));
161
162        // 9. Open catalog and insert snapshot
163        let catalog = MetadataCatalog::open(&catalog_path)?;
164        let parent = catalog.latest_snapshot()?;
165
166        if !blob_locations.is_empty() {
167            catalog.bulk_upsert_blob_locations(&blob_locations)?;
168        }
169
170        let snapshot_id = Uuid::now_v7().to_string();
171        let snapshot = CatalogSnapshot {
172            id: snapshot_id.clone(),
173            created_at: Utc::now(),
174            message: None,
175            parent_snapshot_id: parent.as_ref().map(|p| p.id.clone()),
176            manifest_snapshot_id: None,
177            root_tree_hash: None,
178            stats: SnapshotStats {
179                total_files: manifest.len() as u64,
180                total_bytes: manifest.iter().map(|e| e.size).sum(),
181                new_objects: files_hashed as u64,
182            },
183        };
184        catalog.insert_snapshot(&snapshot, &manifest)?;
185
186        // 10. Update file index
187        index.apply_changes(&removed_paths, &new_entries)?;
188
189        Ok(SaveResult {
190            snapshot_id,
191            files_scanned,
192            files_hashed,
193            bytes_compressed,
194        })
195    }
196}