1use crate::config::SyncPolicy;
2use crate::config::{DiffMode, SyncTask};
3use crate::encryption::EncryptionManager;
4use crate::error::{ProviderError, SyncError};
5use crate::providers::{FileInfo, StorageProvider};
6use crate::report::{FileOperation, SyncReport};
7use crate::sync::diff::{ChecksumType, DiffAction, DiffResult, FileDiff};
8use dashmap::DashMap;
9use rusqlite::{Connection, params};
10use std::collections::HashMap;
11use std::sync::{Arc, Mutex};
12use std::time::SystemTime;
13use tokio::time::{Duration, sleep};
14use tracing::{debug, error, info, instrument, warn};
15
/// Orchestrates one-way synchronization tasks between registered storage
/// providers, with optional encryption, resumable transfers, and persisted
/// run reports.
pub struct SyncEngine {
    // Registered providers, keyed by account id.
    providers: HashMap<String, Box<dyn StorageProvider>>,
    // Performs optional per-task file encryption before upload.
    encryption_manager: EncryptionManager,
    // Per-file diff cache. NOTE(review): never read or written in this
    // file — confirm it is still needed.
    diff_cache: DashMap<String, FileDiff>,
    // SQLite connection storing resume state (`resume_data`) and completed
    // run reports (`sync_reports`).
    resume_store: Arc<Mutex<Connection>>,
    // Directory-listing cache: key -> (entries, scan time). Reused by
    // `calculate_diff` while within the policy's scan cooldown.
    scan_cache: DashMap<String, (Vec<FileInfo>, SystemTime)>,
}
24
25impl SyncEngine {
26 pub async fn new() -> Result<Self, SyncError> {
27 let db_path = dirs::data_dir()
28 .ok_or(SyncError::Unknown(String::from(
29 "Failed to obtain data_dir",
30 )))?
31 .join("disksync");
32
33 std::fs::create_dir_all(&db_path)?;
34
35 let db_path = db_path.join("resume.db");
36 let conn = Connection::open(&db_path)?;
37
38 conn.execute(
40 "CREATE TABLE IF NOT EXISTS resume_data (
41 task_id TEXT NOT NULL,
42 file_path TEXT NOT NULL,
43 last_modified INTEGER NOT NULL,
44 file_size INTEGER NOT NULL,
45 checksum TEXT,
46 status TEXT NOT NULL,
47 PRIMARY KEY (task_id, file_path)
48 )",
49 [],
50 )?;
51
52 conn.execute(
54 "CREATE TABLE IF NOT EXISTS sync_reports (
55 report_id TEXT PRIMARY KEY,
56 task_id TEXT NOT NULL,
57 start_time INTEGER NOT NULL,
58 status TEXT NOT NULL,
59 duration_seconds INTEGER NOT NULL,
60 details_json TEXT NOT NULL
61 )",
62 [],
63 )?;
64
65 conn.execute(
67 "CREATE INDEX IF NOT EXISTS idx_reports_task_id ON sync_reports(task_id)",
68 [],
69 )?;
70
71 Ok(Self {
72 providers: HashMap::new(),
73 encryption_manager: EncryptionManager::new(),
74 diff_cache: DashMap::new(),
75 resume_store: Arc::new(Mutex::new(conn)),
76 scan_cache: DashMap::new(),
77 })
78 }
79
80 pub fn register_provider(&mut self, account_id: String, provider: Box<dyn StorageProvider>) {
82 self.providers.insert(account_id, provider);
83 }
84
85 pub fn get_provider(&self, account_id: &str) -> Option<&Box<dyn StorageProvider>> {
86 self.providers.get(account_id)
87 }
88
89 pub async fn walk_directory(
90 &self,
91 provider: &dyn StorageProvider,
92 root: &str,
93 ) -> Result<std::collections::HashMap<String, crate::sync::diff::FileMetadata>, SyncError> {
94 let mut map = std::collections::HashMap::new();
95 let entries = provider.list(root).await?;
96 for e in entries {
97 let path = std::path::PathBuf::from(e.path.clone());
98 let meta = crate::sync::diff::FileMetadata::from_path(&path)?;
99 map.insert(e.path, meta);
100 }
101 Ok(map)
102 }
103
104 fn create_temp_file(&self) -> Result<std::path::PathBuf, SyncError> {
105 let name = format!("sync_{}.tmp", uuid::Uuid::new_v4());
106 let path = std::env::temp_dir().join(name);
107 std::fs::File::create(&path)?;
108 Ok(path)
109 }
110
111 fn cleanup_temp_file(&self, path: &std::path::Path) -> Result<(), SyncError> {
112 std::fs::remove_file(path)?;
113 Ok(())
114 }
115
116 pub async fn sync(&mut self, task: &SyncTask) -> Result<SyncReport, SyncError> {
117 self.execute_sync(task, None::<fn(SyncProgress)>).await
118 }
119
120 pub async fn sync_with_progress(
121 &mut self,
122 task: &SyncTask,
123 progress_callback: impl Fn(SyncProgress) + Send + Sync + 'static,
124 ) -> Result<SyncReport, SyncError> {
125 self.execute_sync(task, Some(progress_callback)).await
126 }
127
128 async fn execute_sync<F>(
129 &self,
130 task: &SyncTask,
131 progress_callback: Option<F>,
132 ) -> Result<SyncReport, SyncError>
133 where
134 F: Fn(SyncProgress) + Send + Sync + 'static,
135 {
136 info!(task_id = %task.id, "Starting sync task: {}", task.name);
137 let mut report = SyncReport::new(&task.id);
138
139 let diff = {
141 let source_provider =
142 self.get_provider(&task.source_account)
143 .ok_or(SyncError::Provider(ProviderError::NotFound(
144 task.source_account.clone(),
145 )))?;
146 let target_provider =
147 self.get_provider(&task.target_account)
148 .ok_or(SyncError::Provider(ProviderError::NotFound(
149 task.target_account.clone(),
150 )))?;
151 self.calculate_diff(
152 source_provider.as_ref(),
153 target_provider.as_ref(),
154 &task.source_path,
155 &task.target_path,
156 &task.diff_mode,
157 task.sync_policy.as_ref(),
158 &format!("{}::{}", task.source_account, task.source_path),
159 &format!("{}::{}", task.target_account, task.target_path),
160 )
161 .await?
162 };
163
164 info!(task_id = %task.id, total_files = diff.files.len(), "Diff calculation completed");
165
166 let total_transfer_size = diff.total_transfer_size;
167 let mut transferred_size = 0u64;
168 let start_time = std::time::Instant::now();
169
170 for file_diff in diff.files {
172 match file_diff.action {
173 DiffAction::Upload | DiffAction::Update => {
174 debug!(file = %file_diff.path, "Syncing file (Upload/Update)");
175 if let Some(src_info) = &file_diff.source_info {
177 if src_info.is_dir {
178 debug!(path = %file_diff.path, "Creating directory (from Upload action)");
179 let target_provider = self.get_provider(&task.target_account).ok_or(
180 SyncError::Provider(ProviderError::NotFound(
181 task.target_account.clone(),
182 )),
183 )?;
184 let target_full_path = {
185 let base_path = std::path::Path::new(&task.target_path);
186 let rel_path = std::path::Path::new(&file_diff.path);
187 base_path
188 .join(rel_path)
189 .to_string_lossy()
190 .replace('\\', "/")
191 };
192 match target_provider.mkdir(&target_full_path).await {
193 Ok(_) => {
194 info!(path = %file_diff.path, "Created directory");
195 report.add_success(&file_diff.path, 0);
196 continue; }
198 Err(e) => {
199 warn!(path = %file_diff.path, error = %e, "Failed to create directory (might exist)");
203 continue;
208 }
209 }
210 }
211 }
212
213 let source_provider =
215 self.get_provider(&task.source_account)
216 .ok_or(SyncError::Provider(ProviderError::NotFound(
217 task.source_account.clone(),
218 )))?;
219 let target_provider =
220 self.get_provider(&task.target_account)
221 .ok_or(SyncError::Provider(ProviderError::NotFound(
222 task.target_account.clone(),
223 )))?;
224
225 let file_size = file_diff.transfer_size();
226
227 if let Some(ref cb) = progress_callback {
229 cb(SyncProgress {
230 current_file: file_diff.path.clone(),
231 current_file_size: file_size,
232 transferred: transferred_size,
233 total: total_transfer_size,
234 percentage: if total_transfer_size > 0 {
235 (transferred_size as f64 / total_transfer_size as f64) * 100.0
236 } else {
237 0.0
238 },
239 speed: 0.0,
240 });
241 }
242
243 match self
244 .sync_file(
245 source_provider.as_ref(),
246 target_provider.as_ref(),
247 &file_diff,
248 task,
249 &mut report,
250 )
251 .await
252 {
253 Ok(_) => {
254 debug!(file = %file_diff.path, "Sync successful");
255 transferred_size += file_size;
256
257 if let Some(ref cb) = progress_callback {
259 let elapsed = start_time.elapsed().as_secs_f64();
260 let speed = if elapsed > 0.0 {
261 transferred_size as f64 / elapsed
262 } else {
263 0.0
264 };
265
266 cb(SyncProgress {
267 current_file: file_diff.path.clone(),
268 current_file_size: file_size,
269 transferred: transferred_size,
270 total: total_transfer_size,
271 percentage: if total_transfer_size > 0 {
272 (transferred_size as f64 / total_transfer_size as f64)
273 * 100.0
274 } else {
275 100.0
276 },
277 speed,
278 });
279 }
280 }
281 Err(e) => {
282 error!(file = %file_diff.path, error = %e, "Sync failed");
283 report.add_failure(
284 &file_diff.path,
285 FileOperation::from_diff_action(file_diff.action),
286 e.to_string(),
287 );
288 }
289 }
290 }
291 DiffAction::Download => {
292 }
294 DiffAction::Delete => {
295 debug!(file = %file_diff.path, "Deleting target file");
296 let target_provider =
298 self.get_provider(&task.target_account)
299 .ok_or(SyncError::Provider(ProviderError::NotFound(
300 task.target_account.clone(),
301 )))?;
302
303 let target_full_path = {
304 let base_path = std::path::Path::new(&task.target_path);
305 let rel_path = std::path::Path::new(&file_diff.path);
306 base_path
307 .join(rel_path)
308 .to_string_lossy()
309 .replace('\\', "/")
310 };
311
312 match target_provider.delete(&target_full_path).await {
313 Ok(_) => {
314 info!(file = %file_diff.path, "Deleted target file");
315 report.add_success(&file_diff.path, file_diff.size_diff);
316 }
317 Err(e) => {
318 error!(file = %file_diff.path, error = %e, "Failed to delete file");
319 report.add_failure(
320 &file_diff.path,
321 FileOperation::Delete,
322 e.to_string(),
323 );
324 }
325 }
326 }
327 DiffAction::CreateDir => {
328 debug!(path = %file_diff.path, "Creating directory");
329 let target_provider =
330 self.get_provider(&task.target_account)
331 .ok_or(SyncError::Provider(ProviderError::NotFound(
332 task.target_account.clone(),
333 )))?;
334 let target_full_path = {
335 let base_path = std::path::Path::new(&task.target_path);
336 let rel_path = std::path::Path::new(&file_diff.path);
337 base_path
338 .join(rel_path)
339 .to_string_lossy()
340 .replace('\\', "/")
341 };
342 match target_provider.mkdir(&target_full_path).await {
343 Ok(_) => {
344 info!(path = %file_diff.path, "Created directory");
345 report.add_success(&file_diff.path, 0);
346 }
347 Err(e) => {
348 error!(path = %file_diff.path, error = %e, "Failed to create directory");
349 report.add_failure(
350 &file_diff.path,
351 FileOperation::CreateDir,
352 e.to_string(),
353 );
354 }
355 }
356 }
357 DiffAction::Conflict => {
358 warn!(file = %file_diff.path, "Conflict detected");
359 report.add_conflict(&file_diff.path);
360 }
361 _ => {}
362 }
363 }
364
365 let duration = start_time.elapsed().as_secs_f64();
366 report.statistics.finalize(duration);
367 report.duration_seconds = duration as i64;
368
369 if let Err(e) = self.save_report(&report) {
371 error!(error = %e, "Failed to save sync report to database");
372 }
373
374 info!(task_id = %task.id, stats = ?report.statistics, "Sync task completed");
375 Ok(report)
376 }
377}
378
/// Progress snapshot emitted once per source entry while `verify_integrity`
/// walks the source listing.
pub struct VerificationProgress {
    // Path currently being verified.
    pub current_path: String,
    // 1-based index of the file being checked.
    pub current_file: usize,
    // Total number of source files to check.
    pub total_files: usize,
}
385
/// Progress snapshot passed to sync progress callbacks.
pub struct SyncProgress {
    // Relative path of the file currently transferring.
    pub current_file: String,
    // Transfer size of the current file, in bytes.
    pub current_file_size: u64,
    // Bytes transferred so far across the whole task.
    pub transferred: u64,
    // Total bytes expected for the whole task.
    pub total: u64,
    // Overall completion percentage (0.0–100.0).
    pub percentage: f64,
    // Average throughput in bytes per second (0.0 before the first transfer).
    pub speed: f64,
}
394
/// Outcome counters collected by `verify_integrity`.
///
/// `Default` is derived (all counters zero, no errors) and `new()` delegates
/// to it, satisfying clippy's `new_without_default`.
#[derive(Debug, Clone, Default)]
pub struct VerificationResult {
    /// Total number of source files considered.
    pub total_files: usize,
    /// Files actually compared against the target (includes skipped ones).
    pub checked_files: usize,
    /// Files whose hash (or, lacking hashes, size) matched.
    pub passed: usize,
    /// Files present on both sides whose content differs.
    pub failed: usize,
    /// Source files missing on the target.
    pub skipped: usize,
    /// Human-readable descriptions of each failure.
    pub errors: Vec<String>,
}

impl VerificationResult {
    /// Creates an empty result with all counters at zero.
    pub fn new() -> Self {
        Self::default()
    }
}
416
/// Summary of what `repair_integrity` restored.
///
/// The previous hand-written `Default` impl only zeroed every field, which
/// is exactly what the derive produces (clippy: `derivable_impls`).
#[derive(Debug, Clone, Copy, Default)]
pub struct RepairResult {
    /// Number of files re-transferred during repair.
    pub repaired_files: usize,
    /// Total bytes re-transferred during repair.
    pub repaired_bytes: u64,
}
430
431impl SyncEngine {
432 pub async fn verify_integrity(
433 &self,
434 task: &SyncTask,
435 _verify_all: bool,
436 progress_callback: impl Fn(VerificationProgress),
437 ) -> Result<VerificationResult, SyncError> {
438 let source_provider =
439 self.get_provider(&task.source_account)
440 .ok_or(SyncError::Provider(ProviderError::NotFound(
441 task.source_account.clone(),
442 )))?;
443 let target_provider =
444 self.get_provider(&task.target_account)
445 .ok_or(SyncError::Provider(ProviderError::NotFound(
446 task.target_account.clone(),
447 )))?;
448
449 let mut result = VerificationResult::new();
450 let source_files = self
451 .walk_directory(source_provider.as_ref(), &task.source_path)
452 .await?;
453 let target_files = self
454 .walk_directory(target_provider.as_ref(), &task.target_path)
455 .await?;
456
457 for (path, source_info) in &source_files {
458 progress_callback(VerificationProgress {
459 current_path: path.clone(),
460 current_file: result.checked_files + 1,
461 total_files: source_files.len(),
462 });
463 if let Some(target_info) = target_files.get(path) {
464 if let (Some(source_hash), Some(target_hash)) =
465 (&source_info.file_hash, &target_info.file_hash)
466 {
467 if source_hash == target_hash {
468 result.passed += 1;
469 } else {
470 result.failed += 1;
471 result.errors.push(format!("文件哈希不匹配: {}", path));
472 }
473 } else if source_info.size == target_info.size {
474 result.passed += 1;
475 } else {
476 result.failed += 1;
477 result.errors.push(format!("文件大小不匹配: {}", path));
478 }
479 } else {
480 result.skipped += 1;
481 }
482 result.checked_files += 1;
483 }
484 result.total_files = source_files.len();
485 Ok(result)
486 }
487
488 pub async fn repair_integrity(
489 &self,
490 _task: &SyncTask,
491 _verification_result: &VerificationResult,
492 ) -> Result<RepairResult, SyncError> {
493 Ok(RepairResult::default())
494 }
495
496 pub async fn calculate_diff_for_dry_run(
497 &self,
498 task: &SyncTask,
499 ) -> Result<DiffResult, SyncError> {
500 let source_provider =
501 self.get_provider(&task.source_account)
502 .ok_or(SyncError::Provider(ProviderError::NotFound(
503 task.source_account.clone(),
504 )))?;
505 let target_provider =
506 self.get_provider(&task.target_account)
507 .ok_or(SyncError::Provider(ProviderError::NotFound(
508 task.target_account.clone(),
509 )))?;
510
511 self.calculate_diff(
512 source_provider.as_ref(),
513 target_provider.as_ref(),
514 &task.source_path,
515 &task.target_path,
516 &task.diff_mode,
517 task.sync_policy.as_ref(),
518 &format!("{}::{}", task.source_account, task.source_path),
519 &format!("{}::{}", task.target_account, task.target_path),
520 )
521 .await
522 }
523
524 pub fn save_report(&self, report: &SyncReport) -> Result<(), SyncError> {
526 let conn = self.resume_store.lock().unwrap();
527 let report_id = uuid::Uuid::new_v4().to_string();
528 let json = serde_json::to_string(report).map_err(|e| SyncError::Unknown(e.to_string()))?;
529
530 conn.execute(
531 "INSERT INTO sync_reports (report_id, task_id, start_time, status, duration_seconds, details_json)
532 VALUES (?1, ?2, ?3, ?4, ?5, ?6)",
533 params![
534 report_id,
535 report.task_id,
536 report.start_time.timestamp(),
537 format!("{:?}", report.status),
538 report.duration_seconds,
539 json
540 ],
541 )?;
542 Ok(())
543 }
544
545 pub fn list_reports(
547 &self,
548 task_id: &str,
549 limit: usize,
550 offset: usize,
551 ) -> Result<Vec<(String, i64, String, i64)>, SyncError> {
552 let conn = self.resume_store.lock().unwrap();
553 let mut stmt = conn.prepare(
555 "SELECT report_id, start_time, status, duration_seconds
556 FROM sync_reports
557 WHERE task_id = ?1
558 ORDER BY start_time DESC
559 LIMIT ?2 OFFSET ?3",
560 )?;
561
562 let limit_i64 = limit as i64;
564 let offset_i64 = offset as i64;
565
566 let report_iter = stmt.query_map(params![task_id, limit_i64, offset_i64], |row| {
567 Ok((row.get(0)?, row.get(1)?, row.get(2)?, row.get(3)?))
568 })?;
569
570 let mut reports = Vec::new();
571 for report in report_iter {
572 reports.push(report?);
573 }
574 Ok(reports)
575 }
576
577 pub fn get_report(&self, report_id: &str) -> Result<SyncReport, SyncError> {
579 let conn = self.resume_store.lock().unwrap();
580 let mut stmt =
581 conn.prepare("SELECT details_json FROM sync_reports WHERE report_id = ?1")?;
582
583 let mut rows = stmt.query(params![report_id])?;
584 if let Some(row) = rows.next()? {
585 let json: String = row.get(0)?;
586 let report: SyncReport =
587 serde_json::from_str(&json).map_err(|e| SyncError::Unknown(e.to_string()))?;
588 Ok(report)
589 } else {
590 Err(SyncError::Unknown("Report not found".to_string()))
591 }
592 }
593}
594
595impl SyncEngine {
596 async fn list_with_retry(
597 &self,
598 provider: &dyn StorageProvider,
599 path: &str,
600 ) -> Result<Vec<FileInfo>, SyncError> {
601 let max_retries = 3;
602 let mut last_error = SyncError::Unknown("Initial".to_string());
603
604 for i in 0..=max_retries {
605 match provider.list(path).await {
606 Ok(list) => return Ok(list),
607 Err(e) => {
608 last_error = e;
609 if i < max_retries {
610 sleep(Duration::from_millis(100 * (1 << i))).await;
611 }
612 }
613 }
614 }
615 Err(last_error)
616 }
617
618 async fn recursive_list(
619 &self,
620 provider: &dyn StorageProvider,
621 root: &str,
622 ) -> Result<Vec<FileInfo>, SyncError> {
623 let mut result = Vec::new();
624 let mut stack = vec![root.to_string()];
625
626 while let Some(dir) = stack.pop() {
627 let entries = self.list_with_retry(provider, &dir).await?;
629 for entry in entries {
630 if entry.is_dir {
631 stack.push(entry.path.clone());
637 }
638 result.push(entry);
639 }
640 }
641 Ok(result)
642 }
643
644 async fn calculate_diff(
646 &self,
647 source: &dyn StorageProvider,
648 target: &dyn StorageProvider,
649 source_path: &str,
650 target_path: &str,
651 diff_mode: &DiffMode,
652 policy: Option<&SyncPolicy>,
653 source_key: &str,
654 target_key: &str,
655 ) -> Result<DiffResult, SyncError> {
656 debug!(source = source_path, target = target_path, mode = ?diff_mode, "Calculating diff");
657 let (delete_orphans, overwrite_existing, cooldown_secs) = if let Some(p) = policy {
659 (p.delete_orphans, p.overwrite_existing, p.scan_cooldown_secs)
660 } else {
661 (true, true, 0u64)
663 };
664
665 let use_cache = matches!(diff_mode, DiffMode::Smart) && cooldown_secs > 0;
667 let now = SystemTime::now();
668
669 let src_list = if use_cache {
671 if let Some((cached, ts)) = self.scan_cache.get(source_key).map(|v| v.clone()) {
672 if now.duration_since(ts).unwrap_or_default().as_secs() < cooldown_secs {
673 debug!(key = source_key, "Using cached source list");
674 cached
675 } else {
676 debug!(key = source_key, "Refreshing source list (cache expired)");
677 let fresh = self.recursive_list(source, source_path).await?;
678 self.scan_cache
679 .insert(source_key.to_string(), (fresh.clone(), now));
680 fresh
681 }
682 } else {
683 debug!(key = source_key, "Fetching source list (no cache)");
684 let fresh = self.recursive_list(source, source_path).await?;
685 self.scan_cache
686 .insert(source_key.to_string(), (fresh.clone(), now));
687 fresh
688 }
689 } else {
690 debug!(key = source_key, "Fetching source list (cache disabled)");
691 let fresh = self.recursive_list(source, source_path).await?;
692 self.scan_cache
693 .insert(source_key.to_string(), (fresh.clone(), now));
694 fresh
695 };
696
697 let dst_list = if use_cache {
699 if let Some((cached, ts)) = self.scan_cache.get(target_key).map(|v| v.clone()) {
700 if now.duration_since(ts).unwrap_or_default().as_secs() < cooldown_secs {
701 debug!(key = target_key, "Using cached target list");
702 cached
703 } else {
704 debug!(key = target_key, "Refreshing target list (cache expired)");
705 let fresh = self.recursive_list(target, target_path).await?;
706 self.scan_cache
707 .insert(target_key.to_string(), (fresh.clone(), now));
708 fresh
709 }
710 } else {
711 debug!(key = target_key, "Fetching target list (no cache)");
712 let fresh = self.recursive_list(target, target_path).await?;
713 self.scan_cache
714 .insert(target_key.to_string(), (fresh.clone(), now));
715 fresh
716 }
717 } else {
718 debug!(key = target_key, "Fetching target list (cache disabled)");
719 let fresh = self.recursive_list(target, target_path).await?;
720 self.scan_cache
721 .insert(target_key.to_string(), (fresh.clone(), now));
722 fresh
723 };
724
725 let to_metadata = |info: &FileInfo| -> crate::sync::diff::FileMetadata {
727 let mut meta =
728 crate::sync::diff::FileMetadata::new(std::path::PathBuf::from(&info.path));
729 meta.size = info.size;
730 meta.modified = info.modified;
731 meta.is_dir = info.is_dir;
732 meta.file_hash = info.hash.clone();
733 meta
734 };
735
736 let normalize_path = |full_path: &str, root: &str| -> String {
738 let root = root.trim_end_matches('/');
739 if full_path.starts_with(root) {
740 let rel = &full_path[root.len()..];
741 rel.trim_start_matches('/').to_string()
742 } else {
743 full_path.to_string()
744 }
745 };
746
747 let mut src_map = std::collections::HashMap::new();
749 for f in src_list.iter() {
750 let rel_path = normalize_path(&f.path, source_path);
751 src_map.insert(rel_path, to_metadata(f));
752 }
753
754 let mut dst_map = std::collections::HashMap::new();
755 for f in dst_list.iter() {
756 let rel_path = normalize_path(&f.path, target_path);
757 dst_map.insert(rel_path, to_metadata(f));
758 }
759
760 let mut all_paths: std::collections::HashSet<String> = std::collections::HashSet::new();
762 for p in src_map.keys() {
763 all_paths.insert(p.clone());
764 }
765 for p in dst_map.keys() {
766 all_paths.insert(p.clone());
767 }
768
769 let mut diff = DiffResult::new();
770
771 for path in all_paths {
772 let src_meta = src_map.get(&path);
773 let dst_meta = dst_map.get(&path);
774
775 match (src_meta, dst_meta) {
776 (Some(s), Some(t)) => {
777 let size_match = s.size == t.size;
781 let time_match = (s.modified - t.modified).abs() <= 2;
783
784 if size_match && time_match {
785 diff.add_file(FileDiff::unchanged(path.clone(), s.clone(), t.clone()));
787 } else {
788 if overwrite_existing {
790 diff.add_file(FileDiff::update(path.clone(), s.clone(), t.clone()));
791 } else {
792 let mut d = FileDiff::unchanged(path.clone(), s.clone(), t.clone());
795 d.tags.push("skipped_overwrite".to_string());
796 diff.add_file(d);
797 }
798 }
799 }
800 (Some(s), None) => {
801 diff.add_file(FileDiff::upload(path.clone(), s.clone(), None));
803 }
804 (None, Some(t)) => {
805 if delete_orphans {
807 diff.add_file(FileDiff::delete(path.clone(), t.clone()));
808 } else {
809 let mut d = FileDiff::unchanged(
810 path.clone(),
811 crate::sync::diff::FileMetadata::new(std::path::PathBuf::from(&path)),
812 t.clone(),
813 );
814 d = FileDiff::new(
819 path.clone(),
820 DiffAction::Unchanged,
821 None,
822 Some(t.clone()),
823 );
824 d.tags.push("target_only".to_string());
825 diff.add_file(d);
826 }
827 }
828 (None, None) => unreachable!(),
829 }
830 }
831
832 Ok(diff)
833 }
834
835 async fn sync_file(
836 &self,
837 source: &dyn StorageProvider,
838 target: &dyn StorageProvider,
839 file_diff: &FileDiff,
840 task: &SyncTask,
841 report: &mut SyncReport,
842 ) -> Result<(), SyncError> {
843 let join_path = |base: &str, rel: &str| -> String {
845 let base_path = std::path::Path::new(base);
846 let rel_path = std::path::Path::new(rel);
847 base_path
848 .join(rel_path)
849 .to_string_lossy()
850 .replace('\\', "/")
851 };
852
853 let source_full_path = join_path(&task.source_path, &file_diff.path);
854 let target_full_path = join_path(&task.target_path, &file_diff.path);
855
856 {
859 let conn = self.resume_store.lock().unwrap();
860 let mut stmt = conn.prepare(
861 "SELECT status FROM resume_data WHERE task_id = ?1 AND file_path = ?2 LIMIT 1",
862 )?;
863 let mut rows = stmt.query(params![task.id, file_diff.path.clone()])?;
864 if let Some(row) = rows.next()? {
865 let status: String = row.get(0)?;
866 if status == "in_progress" {
867 let resume_data = String::new();
868 self.resume_transfer(source, target, file_diff, task, &resume_data, report)
869 .await;
870 return Ok(());
871 }
872 }
873 }
874
875 let temp_path = self.create_temp_file()?;
877
878 let download_result = source.download(&source_full_path, &temp_path).await?;
880
881 let (encrypted_data, metadata) = if let Some(enc_config) = &task.encryption {
883 self.encryption_manager
884 .encrypt_file(&temp_path, enc_config)
885 .await?
886 } else {
887 (None, None)
888 };
889
890 let upload_result = if let Some(encrypted) = encrypted_data {
892 target.upload(&encrypted, &target_full_path).await?
894 } else {
895 target.upload(&temp_path, &target_full_path).await?
897 };
898
899 report.add_success(&file_diff.path, file_diff.size_diff);
901
902 self.cleanup_temp_file(&temp_path)?;
904
905 Ok(())
906 }
907
908 async fn resume_transfer(
909 &self,
910 source_storage_provider: &dyn StorageProvider,
911 target_storage_provider: &dyn StorageProvider,
912 file_diff: &FileDiff,
913 task: &SyncTask,
914 data: &String,
915 reporter: &mut SyncReport,
916 ) {
917 todo!()
918 }
919}