1use crate::config::SyncorPaths;
2use crate::error::{Result, SyncorError};
3use crate::link::LinkInfo;
4use crate::sync::catalog_merge::{checkpoint_wal, merge_catalogs};
5use crate::sync::conflict::{detect_conflicts, FileAction, ManifestMap};
6use crate::sync::save::SavePipeline;
7use crate::sync::state::{ConflictRecord, StateDb, SyncState};
8use crate::transport::{PullResult, PushResult, SyncTransport};
9use chkpt_core::store::blob::bytes_to_hex;
10use chkpt_core::store::catalog::MetadataCatalog;
11use std::path::{Path, PathBuf};
12
13fn validate_path(base: &Path, relative: &str) -> Result<PathBuf> {
15 if relative.starts_with('/') || relative.starts_with('\\') || relative.contains("..") {
16 return Err(SyncorError::Other(format!(
17 "unsafe path in manifest: {}",
18 relative,
19 )));
20 }
21 let dest = base.join(relative);
22 Ok(dest)
23}
24
25use fs4::fs_std::FileExt;
30use std::fs::File;
31
32pub struct LinkLock {
33 _file: File,
34}
35
36impl LinkLock {
37 pub fn acquire(paths: &SyncorPaths, link: &LinkInfo) -> Result<Self> {
38 let lock_path = paths.link_lock_file(&link.id);
39 if let Some(parent) = lock_path.parent() {
40 std::fs::create_dir_all(parent)?;
41 }
42 let file = File::options()
43 .create(true)
44 .write(true)
45 .truncate(false)
46 .open(&lock_path)?;
47 file.try_lock_exclusive()
48 .map_err(|_| SyncorError::LockHeld)?;
49 Ok(Self { _file: file })
50 }
51}
52
53pub struct SyncEngine {
58 paths: SyncorPaths,
59 transport: Box<dyn SyncTransport + Send + Sync>,
60}
61
/// Outcome of [`SyncEngine::push`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct PushSyncResult {
    /// Identifier of the snapshot that was saved for this push, if any.
    pub snapshot_id: Option<String>,
    /// `true` when the transport accepted the push.
    pub pushed: bool,
}
67
/// Outcome of [`SyncEngine::pull`] and [`SyncEngine::restore_latest`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct PullSyncResult {
    /// `true` when a snapshot was applied to the local directory.
    pub restored: bool,
    /// Number of files written while applying the snapshot.
    pub files_restored: usize,
}
73
74impl SyncEngine {
75 pub fn new(paths: SyncorPaths, transport: Box<dyn SyncTransport + Send + Sync>) -> Self {
76 Self { paths, transport }
77 }
78
79 fn update_file_index(&self, link: &LinkInfo, store_dir: &std::path::Path) -> Result<()> {
81 use chkpt_core::index::{FileEntry, FileIndex};
82 use chkpt_core::scanner::scan_workspace;
83 use chkpt_core::store::blob::hash_path_bytes;
84
85 let index_path = store_dir.join("index.bin");
86 let mut index = FileIndex::open(&index_path)?;
87
88 let scanned = scan_workspace(&link.local_dir, None)?;
89 let mut entries = Vec::new();
90 for file in &scanned {
91 let hash = hash_path_bytes(&file.absolute_path, file.is_symlink)?;
92 entries.push(FileEntry {
93 path: file.relative_path.clone(),
94 blob_hash: hash,
95 size: file.size,
96 mtime_secs: file.mtime_secs,
97 mtime_nanos: file.mtime_nanos,
98 inode: file.inode,
99 mode: file.mode,
100 });
101 }
102
103 let scanned_paths: std::collections::HashSet<&str> =
104 scanned.iter().map(|f| f.relative_path.as_str()).collect();
105 let all_indexed = index.all_paths()?;
106 let removed: Vec<String> = all_indexed
107 .into_iter()
108 .filter(|p| !scanned_paths.contains(p.as_str()))
109 .collect();
110
111 index.apply_changes(&removed, &entries)?;
112 Ok(())
113 }
114
115 fn store_dir(&self, link: &LinkInfo) -> PathBuf {
116 self.paths
117 .link_repo_dir(&link.id)
118 .join("stores")
119 .join(&link.name)
120 }
121
122 fn state_db(&self) -> Result<StateDb> {
123 let path = self.paths.link_state_db();
124 if let Some(parent) = path.parent() {
125 std::fs::create_dir_all(parent)?;
126 }
127 StateDb::open(path)
128 }
129
130 pub fn init_link(&self, link: &LinkInfo) -> Result<()> {
131 self.transport.init_remote(link)?;
132 let link_dir = self.paths.link_dir();
133 std::fs::create_dir_all(&link_dir)?;
134 Ok(())
135 }
136
137 fn ensure_syncor_toml(&self, link: &LinkInfo) -> Result<()> {
139 let repo_dir = self.paths.link_repo_dir(&link.id);
140 let toml_path = repo_dir.join("syncor.toml");
141
142 #[derive(serde::Serialize, serde::Deserialize, Default)]
143 struct SyncorManifest {
144 #[serde(default)]
145 links: Vec<SyncorManifestLink>,
146 }
147 #[derive(serde::Serialize, serde::Deserialize)]
148 struct SyncorManifestLink {
149 name: String,
150 #[serde(default)]
151 created_at: Option<String>,
152 }
153
154 let mut manifest = if toml_path.exists() {
155 let content = std::fs::read_to_string(&toml_path)?;
156 toml::from_str(&content).unwrap_or_default()
157 } else {
158 SyncorManifest::default()
159 };
160
161 if !manifest.links.iter().any(|l| l.name == link.name) {
162 manifest.links.push(SyncorManifestLink {
163 name: link.name.clone(),
164 created_at: Some(chrono::Utc::now().to_rfc3339()),
165 });
166 let content = toml::to_string_pretty(&manifest)
167 .map_err(|e| SyncorError::Config(e.to_string()))?;
168 std::fs::write(&toml_path, content)?;
169 }
170
171 Ok(())
172 }
173
174 pub fn restore_latest(&self, link: &LinkInfo) -> Result<PullSyncResult> {
177 let store_dir = self.store_dir(link);
178 let catalog_path = store_dir.join("catalog.sqlite");
179
180 if !catalog_path.exists() {
181 return Ok(PullSyncResult {
182 restored: false,
183 files_restored: 0,
184 });
185 }
186
187 let catalog = MetadataCatalog::open(&catalog_path)?;
188 let latest = match catalog.latest_snapshot()? {
189 Some(s) => s,
190 None => {
191 return Ok(PullSyncResult {
192 restored: false,
193 files_restored: 0,
194 })
195 }
196 };
197
198 use crate::sync::restore::RestorePipeline;
199 let result = RestorePipeline::run(&latest.id, &store_dir, &link.local_dir)?;
200
201 self.update_file_index(link, &store_dir)?;
202
203 let db = self.state_db()?;
204 let state = SyncState {
205 link_id: link.id.as_str().to_string(),
206 last_local_snapshot: Some(latest.id.clone()),
207 last_remote_revision: None,
208 last_synced_snapshot_id: Some(latest.id),
209 last_sync_at: Some(chrono::Utc::now().to_rfc3339()),
210 };
211 db.upsert_sync_state(&state)?;
212
213 Ok(PullSyncResult {
214 restored: true,
215 files_restored: result.files_restored,
216 })
217 }
218
219 pub fn push(&self, link: &LinkInfo) -> Result<PushSyncResult> {
220 let _lock = LinkLock::acquire(&self.paths, link)?;
221
222 let store_dir = self.store_dir(link);
223
224 let save_result = SavePipeline::run(&link.local_dir, &store_dir, None)?;
226
227 self.ensure_syncor_toml(link)?;
229
230 let catalog_path = store_dir.join("catalog.sqlite");
232 if catalog_path.exists() {
233 checkpoint_wal(&catalog_path)?;
234 }
235
236 let push_result = self.transport.push(link, &store_dir)?;
238
239 let db = self.state_db()?;
240 match push_result {
241 PushResult::Success { revision } => {
242 let state = SyncState {
243 link_id: link.id.as_str().to_string(),
244 last_local_snapshot: Some(save_result.snapshot_id.clone()),
245 last_remote_revision: Some(revision),
246 last_synced_snapshot_id: Some(save_result.snapshot_id.clone()),
247 last_sync_at: Some(chrono::Utc::now().to_rfc3339()),
248 };
249 db.upsert_sync_state(&state)?;
250 db.append_log(link.id.as_str(), "push", "success", None)?;
251
252 Ok(PushSyncResult {
253 snapshot_id: Some(save_result.snapshot_id),
254 pushed: true,
255 })
256 }
257 PushResult::Conflict { details } => {
258 db.append_log(link.id.as_str(), "push", "conflict", Some(&details.message))?;
259 Err(SyncorError::Conflict(details.message))
260 }
261 }
262 }
263
264 pub fn pull(&self, link: &LinkInfo) -> Result<PullSyncResult> {
265 let _lock = LinkLock::acquire(&self.paths, link)?;
266
267 let store_dir = self.store_dir(link);
268 let catalog_path = store_dir.join("catalog.sqlite");
269
270 let pull_result = self.transport.pull(link, &store_dir)?;
272
273 match pull_result {
274 PullResult::UpToDate => Ok(PullSyncResult {
275 restored: false,
276 files_restored: 0,
277 }),
278 PullResult::Conflict { details } => Err(SyncorError::Conflict(details.message)),
279 PullResult::Success { revision } => {
280 let local_catalog_path = self
283 .paths
284 .link_dir()
285 .join(link.id.as_str())
286 .join("catalog.sqlite");
287 let remote_catalog_path = catalog_path.clone();
288
289 if !local_catalog_path.exists() {
290 if let Some(parent) = local_catalog_path.parent() {
291 std::fs::create_dir_all(parent)?;
292 }
293 std::fs::copy(&remote_catalog_path, &local_catalog_path)?;
294 } else {
295 merge_catalogs(&local_catalog_path, &remote_catalog_path)?;
296 }
297
298 let catalog = MetadataCatalog::open(&local_catalog_path)?;
299 let latest_remote = catalog
300 .latest_snapshot()?
301 .ok_or_else(|| SyncorError::Other("no snapshots in remote catalog".into()))?;
302
303 let db = self.state_db()?;
304 let sync_state = db.get_sync_state(link.id.as_str())?;
305
306 let remote_map: ManifestMap = catalog
308 .snapshot_manifest(&latest_remote.id)?
309 .into_iter()
310 .map(|e| (e.path, e.blob_hash))
311 .collect();
312
313 let base_map: ManifestMap = match sync_state
315 .as_ref()
316 .and_then(|s| s.last_synced_snapshot_id.as_deref())
317 {
318 Some(base_id) => catalog
319 .snapshot_manifest(base_id)
320 .unwrap_or_default()
321 .into_iter()
322 .map(|e| (e.path, e.blob_hash))
323 .collect(),
324 None => std::collections::HashMap::new(),
325 };
326
327 let local_map: ManifestMap = {
329 use chkpt_core::scanner::scan_workspace;
330 use chkpt_core::store::blob::hash_path_bytes;
331 let scanned = scan_workspace(&link.local_dir, None)?;
332 let mut map = std::collections::HashMap::new();
333 for file in &scanned {
334 let hash = hash_path_bytes(&file.absolute_path, file.is_symlink)?;
335 map.insert(file.relative_path.clone(), hash);
336 }
337 map
338 };
339
340 let actions = detect_conflicts(&base_map, &local_map, &remote_map);
342
343 let conflicts: Vec<_> = actions
345 .iter()
346 .filter_map(|a| {
347 if let FileAction::Conflict(c) = a {
348 Some(c.clone())
349 } else {
350 None
351 }
352 })
353 .collect();
354
355 if !conflicts.is_empty() {
356 for c in &conflicts {
357 db.insert_conflict(&ConflictRecord {
358 link_id: link.id.as_str().to_string(),
359 file_path: c.path.clone(),
360 local_hash: c.local_hash.map(|h| bytes_to_hex(&h)),
361 remote_hash: c.remote_hash.map(|h| bytes_to_hex(&h)),
362 base_hash: c.base_hash.map(|h| bytes_to_hex(&h)),
363 })?;
364 }
365 db.append_log(
366 link.id.as_str(),
367 "pull",
368 "conflict",
369 Some(&format!("{} conflicts", conflicts.len())),
370 )?;
371 return Err(SyncorError::Conflict(format!(
372 "{} file(s) in conflict. Run 'syncor resolve' to fix.",
373 conflicts.len()
374 )));
375 }
376
377 let remote_modes: std::collections::HashMap<String, u32> = catalog
378 .snapshot_manifest(&latest_remote.id)?
379 .into_iter()
380 .map(|e| (e.path, e.mode))
381 .collect();
382
383 let pack_set =
385 chkpt_core::store::pack::PackSet::open_all(&store_dir.join("packs"))?;
386 let mut files_restored = 0;
387 for action in &actions {
388 match action {
389 FileAction::ApplyRemote { path, remote_hash } => {
390 let hash_hex = bytes_to_hex(remote_hash);
391 let content = pack_set.read(&hash_hex)?;
392 let file_path = validate_path(&link.local_dir, path)?;
393 if let Some(parent) = file_path.parent() {
394 std::fs::create_dir_all(parent)?;
395 }
396 std::fs::write(&file_path, content)?;
397 #[cfg(unix)]
398 if let Some(&mode) = remote_modes.get(path) {
399 use std::os::unix::fs::PermissionsExt;
400 let perms = std::fs::Permissions::from_mode(mode);
401 std::fs::set_permissions(&file_path, perms)?;
402 }
403 files_restored += 1;
404 }
405 FileAction::DeleteLocal { path } => {
406 let file_path = validate_path(&link.local_dir, path)?;
407 let _ = std::fs::remove_file(&file_path);
408 }
409 FileAction::Conflict(_) => {} }
411 }
412
413 self.update_file_index(link, &store_dir)?;
414
415 checkpoint_wal(&local_catalog_path)?;
417 std::fs::copy(&local_catalog_path, &remote_catalog_path)?;
418
419 let state = SyncState {
420 link_id: link.id.as_str().to_string(),
421 last_local_snapshot: Some(latest_remote.id.clone()),
422 last_remote_revision: Some(revision),
423 last_synced_snapshot_id: Some(latest_remote.id),
424 last_sync_at: Some(chrono::Utc::now().to_rfc3339()),
425 };
426 db.upsert_sync_state(&state)?;
427 db.append_log(link.id.as_str(), "pull", "success", None)?;
428
429 Ok(PullSyncResult {
430 restored: true,
431 files_restored,
432 })
433 }
434 }
435 }
436}