Skip to main content

syncor_core/sync/
engine.rs

1use crate::config::SyncorPaths;
2use crate::error::{Result, SyncorError};
3use crate::link::LinkInfo;
4use crate::sync::catalog_merge::{checkpoint_wal, merge_catalogs};
5use crate::sync::conflict::{detect_conflicts, FileAction, ManifestMap};
6use crate::sync::save::SavePipeline;
7use crate::sync::state::{ConflictRecord, StateDb, SyncState};
8use crate::transport::{PullResult, PushResult, SyncTransport};
9use chkpt_core::store::blob::bytes_to_hex;
10use chkpt_core::store::catalog::MetadataCatalog;
11use std::path::{Path, PathBuf};
12
13/// Validate that a path from a manifest doesn't escape the target directory.
14fn validate_path(base: &Path, relative: &str) -> Result<PathBuf> {
15    if relative.starts_with('/') || relative.starts_with('\\') || relative.contains("..") {
16        return Err(SyncorError::Other(format!(
17            "unsafe path in manifest: {}",
18            relative,
19        )));
20    }
21    let dest = base.join(relative);
22    Ok(dest)
23}
24
25// ---------------------------------------------------------------------------
26// LinkLock — per-link advisory filesystem lock using fs4
27// ---------------------------------------------------------------------------
28
29use fs4::fs_std::FileExt;
30use std::fs::File;
31
/// Guard representing the per-link advisory lock.
///
/// The guard owns the open lock file. Keep the value alive for as long as the
/// link must stay locked.
// NOTE(review): relies on the OS releasing the advisory lock when the file
// handle is closed (i.e. when this value is dropped) — confirm against fs4 docs.
#[derive(Debug)]
pub struct LinkLock {
    /// Open handle to the lock file; never read, only kept alive.
    _file: File,
}
35
36impl LinkLock {
37    pub fn acquire(paths: &SyncorPaths, link: &LinkInfo) -> Result<Self> {
38        let lock_path = paths.link_lock_file(&link.id);
39        if let Some(parent) = lock_path.parent() {
40            std::fs::create_dir_all(parent)?;
41        }
42        let file = File::options()
43            .create(true)
44            .write(true)
45            .truncate(false)
46            .open(&lock_path)?;
47        file.try_lock_exclusive()
48            .map_err(|_| SyncorError::LockHeld)?;
49        Ok(Self { _file: file })
50    }
51}
52
53// ---------------------------------------------------------------------------
54// SyncEngine
55// ---------------------------------------------------------------------------
56
/// Drives push, pull and restore for links.
///
/// On-disk locations come from `paths`; all exchange with the remote goes
/// through `transport`.
57pub struct SyncEngine {
    /// Resolves the repo dir, link dir, lock file and state DB locations.
58    paths: SyncorPaths,
    /// Backend used to initialise the remote and to push/pull a store directory.
59    transport: Box<dyn SyncTransport + Send + Sync>,
60}
61
/// Outcome of [`SyncEngine::push`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct PushSyncResult {
    /// Id of the snapshot that was saved for this push, if any.
    pub snapshot_id: Option<String>,
    /// `true` when the store was pushed to the remote.
    pub pushed: bool,
}
67
/// Outcome of [`SyncEngine::pull`] and [`SyncEngine::restore_latest`].
///
/// The `Default` value is the "nothing was restored" result.
#[derive(Debug, Clone, PartialEq, Eq, Default)]
pub struct PullSyncResult {
    /// `true` when remote or snapshot content was applied to the local directory.
    pub restored: bool,
    /// Number of files written while restoring.
    pub files_restored: usize,
}
73
74impl SyncEngine {
75    pub fn new(paths: SyncorPaths, transport: Box<dyn SyncTransport + Send + Sync>) -> Self {
76        Self { paths, transport }
77    }
78
79    /// Scan workspace and update the FileIndex so the next push skips unchanged files.
80    fn update_file_index(&self, link: &LinkInfo, store_dir: &std::path::Path) -> Result<()> {
81        use chkpt_core::index::{FileEntry, FileIndex};
82        use chkpt_core::scanner::scan_workspace;
83        use chkpt_core::store::blob::hash_path_bytes;
84
85        let index_path = store_dir.join("index.bin");
86        let mut index = FileIndex::open(&index_path)?;
87
88        let scanned = scan_workspace(&link.local_dir, None)?;
89        let mut entries = Vec::new();
90        for file in &scanned {
91            let hash = hash_path_bytes(&file.absolute_path, file.is_symlink)?;
92            entries.push(FileEntry {
93                path: file.relative_path.clone(),
94                blob_hash: hash,
95                size: file.size,
96                mtime_secs: file.mtime_secs,
97                mtime_nanos: file.mtime_nanos,
98                inode: file.inode,
99                mode: file.mode,
100            });
101        }
102
103        let scanned_paths: std::collections::HashSet<&str> =
104            scanned.iter().map(|f| f.relative_path.as_str()).collect();
105        let all_indexed = index.all_paths()?;
106        let removed: Vec<String> = all_indexed
107            .into_iter()
108            .filter(|p| !scanned_paths.contains(p.as_str()))
109            .collect();
110
111        index.apply_changes(&removed, &entries)?;
112        Ok(())
113    }
114
115    fn store_dir(&self, link: &LinkInfo) -> PathBuf {
116        self.paths
117            .link_repo_dir(&link.id)
118            .join("stores")
119            .join(&link.name)
120    }
121
122    fn state_db(&self) -> Result<StateDb> {
123        let path = self.paths.link_state_db();
124        if let Some(parent) = path.parent() {
125            std::fs::create_dir_all(parent)?;
126        }
127        StateDb::open(path)
128    }
129
130    pub fn init_link(&self, link: &LinkInfo) -> Result<()> {
131        self.transport.init_remote(link)?;
132        let link_dir = self.paths.link_dir();
133        std::fs::create_dir_all(&link_dir)?;
134        Ok(())
135    }
136
137    /// Ensure syncor.toml in the repo dir lists this link.
138    fn ensure_syncor_toml(&self, link: &LinkInfo) -> Result<()> {
139        let repo_dir = self.paths.link_repo_dir(&link.id);
140        let toml_path = repo_dir.join("syncor.toml");
141
142        #[derive(serde::Serialize, serde::Deserialize, Default)]
143        struct SyncorManifest {
144            #[serde(default)]
145            links: Vec<SyncorManifestLink>,
146        }
147        #[derive(serde::Serialize, serde::Deserialize)]
148        struct SyncorManifestLink {
149            name: String,
150            #[serde(default)]
151            created_at: Option<String>,
152        }
153
154        let mut manifest = if toml_path.exists() {
155            let content = std::fs::read_to_string(&toml_path)?;
156            toml::from_str(&content).unwrap_or_default()
157        } else {
158            SyncorManifest::default()
159        };
160
161        if !manifest.links.iter().any(|l| l.name == link.name) {
162            manifest.links.push(SyncorManifestLink {
163                name: link.name.clone(),
164                created_at: Some(chrono::Utc::now().to_rfc3339()),
165            });
166            let content = toml::to_string_pretty(&manifest)
167                .map_err(|e| SyncorError::Config(e.to_string()))?;
168            std::fs::write(&toml_path, content)?;
169        }
170
171        Ok(())
172    }
173
174    /// Restore the latest snapshot to the local directory.
175    /// Used for initial connect when the repo already has data.
176    pub fn restore_latest(&self, link: &LinkInfo) -> Result<PullSyncResult> {
177        let store_dir = self.store_dir(link);
178        let catalog_path = store_dir.join("catalog.sqlite");
179
180        if !catalog_path.exists() {
181            return Ok(PullSyncResult {
182                restored: false,
183                files_restored: 0,
184            });
185        }
186
187        let catalog = MetadataCatalog::open(&catalog_path)?;
188        let latest = match catalog.latest_snapshot()? {
189            Some(s) => s,
190            None => {
191                return Ok(PullSyncResult {
192                    restored: false,
193                    files_restored: 0,
194                })
195            }
196        };
197
198        use crate::sync::restore::RestorePipeline;
199        let result = RestorePipeline::run(&latest.id, &store_dir, &link.local_dir)?;
200
201        self.update_file_index(link, &store_dir)?;
202
203        let db = self.state_db()?;
204        let state = SyncState {
205            link_id: link.id.as_str().to_string(),
206            last_local_snapshot: Some(latest.id.clone()),
207            last_remote_revision: None,
208            last_synced_snapshot_id: Some(latest.id),
209            last_sync_at: Some(chrono::Utc::now().to_rfc3339()),
210        };
211        db.upsert_sync_state(&state)?;
212
213        Ok(PullSyncResult {
214            restored: true,
215            files_restored: result.files_restored,
216        })
217    }
218
219    pub fn push(&self, link: &LinkInfo) -> Result<PushSyncResult> {
220        let _lock = LinkLock::acquire(&self.paths, link)?;
221
222        let store_dir = self.store_dir(link);
223
224        // Save current workspace state into the store
225        let save_result = SavePipeline::run(&link.local_dir, &store_dir, None)?;
226
227        // Ensure syncor.toml lists this link
228        self.ensure_syncor_toml(link)?;
229
230        // Checkpoint WAL before pushing so git sees a single-file DB
231        let catalog_path = store_dir.join("catalog.sqlite");
232        if catalog_path.exists() {
233            checkpoint_wal(&catalog_path)?;
234        }
235
236        // Push store to remote via transport
237        let push_result = self.transport.push(link, &store_dir)?;
238
239        let db = self.state_db()?;
240        match push_result {
241            PushResult::Success { revision } => {
242                let state = SyncState {
243                    link_id: link.id.as_str().to_string(),
244                    last_local_snapshot: Some(save_result.snapshot_id.clone()),
245                    last_remote_revision: Some(revision),
246                    last_synced_snapshot_id: Some(save_result.snapshot_id.clone()),
247                    last_sync_at: Some(chrono::Utc::now().to_rfc3339()),
248                };
249                db.upsert_sync_state(&state)?;
250                db.append_log(link.id.as_str(), "push", "success", None)?;
251
252                Ok(PushSyncResult {
253                    snapshot_id: Some(save_result.snapshot_id),
254                    pushed: true,
255                })
256            }
257            PushResult::Conflict { details } => {
258                db.append_log(link.id.as_str(), "push", "conflict", Some(&details.message))?;
259                Err(SyncorError::Conflict(details.message))
260            }
261        }
262    }
263
264    pub fn pull(&self, link: &LinkInfo) -> Result<PullSyncResult> {
265        let _lock = LinkLock::acquire(&self.paths, link)?;
266
267        let store_dir = self.store_dir(link);
268        let catalog_path = store_dir.join("catalog.sqlite");
269
270        // Pull remote changes via transport
271        let pull_result = self.transport.pull(link, &store_dir)?;
272
273        match pull_result {
274            PullResult::UpToDate => Ok(PullSyncResult {
275                restored: false,
276                files_restored: 0,
277            }),
278            PullResult::Conflict { details } => Err(SyncorError::Conflict(details.message)),
279            PullResult::Success { revision } => {
280                // The catalog in the repo dir (store_dir) is the remote catalog.
281                // We keep a local copy under the link dir for merging.
282                let local_catalog_path = self
283                    .paths
284                    .link_dir()
285                    .join(link.id.as_str())
286                    .join("catalog.sqlite");
287                let remote_catalog_path = catalog_path.clone();
288
289                if !local_catalog_path.exists() {
290                    if let Some(parent) = local_catalog_path.parent() {
291                        std::fs::create_dir_all(parent)?;
292                    }
293                    std::fs::copy(&remote_catalog_path, &local_catalog_path)?;
294                } else {
295                    merge_catalogs(&local_catalog_path, &remote_catalog_path)?;
296                }
297
298                let catalog = MetadataCatalog::open(&local_catalog_path)?;
299                let latest_remote = catalog
300                    .latest_snapshot()?
301                    .ok_or_else(|| SyncorError::Other("no snapshots in remote catalog".into()))?;
302
303                let db = self.state_db()?;
304                let sync_state = db.get_sync_state(link.id.as_str())?;
305
306                // Build remote manifest map
307                let remote_map: ManifestMap = catalog
308                    .snapshot_manifest(&latest_remote.id)?
309                    .into_iter()
310                    .map(|e| (e.path, e.blob_hash))
311                    .collect();
312
313                // Build base manifest map from last synced snapshot
314                let base_map: ManifestMap = match sync_state
315                    .as_ref()
316                    .and_then(|s| s.last_synced_snapshot_id.as_deref())
317                {
318                    Some(base_id) => catalog
319                        .snapshot_manifest(base_id)
320                        .unwrap_or_default()
321                        .into_iter()
322                        .map(|e| (e.path, e.blob_hash))
323                        .collect(),
324                    None => std::collections::HashMap::new(),
325                };
326
327                // Scan local workspace to build local manifest map
328                let local_map: ManifestMap = {
329                    use chkpt_core::scanner::scan_workspace;
330                    use chkpt_core::store::blob::hash_path_bytes;
331                    let scanned = scan_workspace(&link.local_dir, None)?;
332                    let mut map = std::collections::HashMap::new();
333                    for file in &scanned {
334                        let hash = hash_path_bytes(&file.absolute_path, file.is_symlink)?;
335                        map.insert(file.relative_path.clone(), hash);
336                    }
337                    map
338                };
339
340                // Three-point conflict detection
341                let actions = detect_conflicts(&base_map, &local_map, &remote_map);
342
343                // Check for conflicts
344                let conflicts: Vec<_> = actions
345                    .iter()
346                    .filter_map(|a| {
347                        if let FileAction::Conflict(c) = a {
348                            Some(c.clone())
349                        } else {
350                            None
351                        }
352                    })
353                    .collect();
354
355                if !conflicts.is_empty() {
356                    for c in &conflicts {
357                        db.insert_conflict(&ConflictRecord {
358                            link_id: link.id.as_str().to_string(),
359                            file_path: c.path.clone(),
360                            local_hash: c.local_hash.map(|h| bytes_to_hex(&h)),
361                            remote_hash: c.remote_hash.map(|h| bytes_to_hex(&h)),
362                            base_hash: c.base_hash.map(|h| bytes_to_hex(&h)),
363                        })?;
364                    }
365                    db.append_log(
366                        link.id.as_str(),
367                        "pull",
368                        "conflict",
369                        Some(&format!("{} conflicts", conflicts.len())),
370                    )?;
371                    return Err(SyncorError::Conflict(format!(
372                        "{} file(s) in conflict. Run 'syncor resolve' to fix.",
373                        conflicts.len()
374                    )));
375                }
376
377                let remote_modes: std::collections::HashMap<String, u32> = catalog
378                    .snapshot_manifest(&latest_remote.id)?
379                    .into_iter()
380                    .map(|e| (e.path, e.mode))
381                    .collect();
382
383                // Apply non-conflicting actions
384                let pack_set =
385                    chkpt_core::store::pack::PackSet::open_all(&store_dir.join("packs"))?;
386                let mut files_restored = 0;
387                for action in &actions {
388                    match action {
389                        FileAction::ApplyRemote { path, remote_hash } => {
390                            let hash_hex = bytes_to_hex(remote_hash);
391                            let content = pack_set.read(&hash_hex)?;
392                            let file_path = validate_path(&link.local_dir, path)?;
393                            if let Some(parent) = file_path.parent() {
394                                std::fs::create_dir_all(parent)?;
395                            }
396                            std::fs::write(&file_path, content)?;
397                            #[cfg(unix)]
398                            if let Some(&mode) = remote_modes.get(path) {
399                                use std::os::unix::fs::PermissionsExt;
400                                let perms = std::fs::Permissions::from_mode(mode);
401                                std::fs::set_permissions(&file_path, perms)?;
402                            }
403                            files_restored += 1;
404                        }
405                        FileAction::DeleteLocal { path } => {
406                            let file_path = validate_path(&link.local_dir, path)?;
407                            let _ = std::fs::remove_file(&file_path);
408                        }
409                        FileAction::Conflict(_) => {} // already handled above
410                    }
411                }
412
413                self.update_file_index(link, &store_dir)?;
414
415                // Copy merged catalog back to repo dir
416                checkpoint_wal(&local_catalog_path)?;
417                std::fs::copy(&local_catalog_path, &remote_catalog_path)?;
418
419                let state = SyncState {
420                    link_id: link.id.as_str().to_string(),
421                    last_local_snapshot: Some(latest_remote.id.clone()),
422                    last_remote_revision: Some(revision),
423                    last_synced_snapshot_id: Some(latest_remote.id),
424                    last_sync_at: Some(chrono::Utc::now().to_rfc3339()),
425                };
426                db.upsert_sync_state(&state)?;
427                db.append_log(link.id.as_str(), "pull", "success", None)?;
428
429                Ok(PullSyncResult {
430                    restored: true,
431                    files_restored,
432                })
433            }
434        }
435    }
436}