cloud_disk_sync/sync/
engine.rs

1use crate::config::SyncPolicy;
2use crate::config::{DiffMode, SyncTask};
3use crate::encryption::EncryptionManager;
4use crate::error::{ProviderError, SyncError};
5use crate::providers::{FileInfo, StorageProvider};
6use crate::report::{FileOperation, SyncReport};
7use crate::sync::diff::{ChecksumType, DiffAction, DiffResult, FileDiff};
8use dashmap::DashMap;
9use rusqlite::{Connection, params};
10use std::collections::HashMap;
11use std::sync::{Arc, Mutex};
12use std::time::SystemTime;
13use tokio::time::{Duration, sleep};
14use tracing::{debug, error, info, instrument, warn};
15
16pub struct SyncEngine {
17    providers: HashMap<String, Box<dyn StorageProvider>>,
18    encryption_manager: EncryptionManager,
19    diff_cache: DashMap<String, FileDiff>,
20    resume_store: Arc<Mutex<Connection>>,
21    /// 扫描缓存:key -> (列表快照, 上次扫描时间)
22    scan_cache: DashMap<String, (Vec<FileInfo>, SystemTime)>,
23}
24
25impl SyncEngine {
26    pub async fn new() -> Result<Self, SyncError> {
27        let db_path = dirs::data_dir()
28            .ok_or(SyncError::Unknown(String::from(
29                "Failed to obtain data_dir",
30            )))?
31            .join("disksync");
32
33        std::fs::create_dir_all(&db_path)?;
34
35        let db_path = db_path.join("resume.db");
36        let conn = Connection::open(&db_path)?;
37
38        // 创建简历表
39        conn.execute(
40            "CREATE TABLE IF NOT EXISTS resume_data (
41                task_id TEXT NOT NULL,
42                file_path TEXT NOT NULL,
43                last_modified INTEGER NOT NULL,
44                file_size INTEGER NOT NULL,
45                checksum TEXT,
46                status TEXT NOT NULL,
47                PRIMARY KEY (task_id, file_path)
48            )",
49            [],
50        )?;
51
52        // 创建报告表
53        conn.execute(
54            "CREATE TABLE IF NOT EXISTS sync_reports (
55                report_id TEXT PRIMARY KEY,
56                task_id TEXT NOT NULL,
57                start_time INTEGER NOT NULL,
58                status TEXT NOT NULL,
59                duration_seconds INTEGER NOT NULL,
60                details_json TEXT NOT NULL
61            )",
62            [],
63        )?;
64
65        // 创建索引以加速查询
66        conn.execute(
67            "CREATE INDEX IF NOT EXISTS idx_reports_task_id ON sync_reports(task_id)",
68            [],
69        )?;
70
71        Ok(Self {
72            providers: HashMap::new(),
73            encryption_manager: EncryptionManager::new(),
74            diff_cache: DashMap::new(),
75            resume_store: Arc::new(Mutex::new(conn)),
76            scan_cache: DashMap::new(),
77        })
78    }
79
80    /// 注册存储提供器到引擎
81    pub fn register_provider(&mut self, account_id: String, provider: Box<dyn StorageProvider>) {
82        self.providers.insert(account_id, provider);
83    }
84
85    pub fn get_provider(&self, account_id: &str) -> Option<&Box<dyn StorageProvider>> {
86        self.providers.get(account_id)
87    }
88
89    pub async fn walk_directory(
90        &self,
91        provider: &dyn StorageProvider,
92        root: &str,
93    ) -> Result<std::collections::HashMap<String, crate::sync::diff::FileMetadata>, SyncError> {
94        let mut map = std::collections::HashMap::new();
95        let entries = provider.list(root).await?;
96        for e in entries {
97            let path = std::path::PathBuf::from(e.path.clone());
98            let meta = crate::sync::diff::FileMetadata::from_path(&path)?;
99            map.insert(e.path, meta);
100        }
101        Ok(map)
102    }
103
104    fn create_temp_file(&self) -> Result<std::path::PathBuf, SyncError> {
105        let name = format!("sync_{}.tmp", uuid::Uuid::new_v4());
106        let path = std::env::temp_dir().join(name);
107        std::fs::File::create(&path)?;
108        Ok(path)
109    }
110
111    fn cleanup_temp_file(&self, path: &std::path::Path) -> Result<(), SyncError> {
112        std::fs::remove_file(path)?;
113        Ok(())
114    }
115
116    pub async fn sync(&mut self, task: &SyncTask) -> Result<SyncReport, SyncError> {
117        self.execute_sync(task, None::<fn(SyncProgress)>).await
118    }
119
120    pub async fn sync_with_progress(
121        &mut self,
122        task: &SyncTask,
123        progress_callback: impl Fn(SyncProgress) + Send + Sync + 'static,
124    ) -> Result<SyncReport, SyncError> {
125        self.execute_sync(task, Some(progress_callback)).await
126    }
127
128    async fn execute_sync<F>(
129        &self,
130        task: &SyncTask,
131        progress_callback: Option<F>,
132    ) -> Result<SyncReport, SyncError>
133    where
134        F: Fn(SyncProgress) + Send + Sync + 'static,
135    {
136        info!(task_id = %task.id, "Starting sync task: {}", task.name);
137        let mut report = SyncReport::new(&task.id);
138
139        // 计算文件差异(限定借用作用域)
140        let diff = {
141            let source_provider =
142                self.get_provider(&task.source_account)
143                    .ok_or(SyncError::Provider(ProviderError::NotFound(
144                        task.source_account.clone(),
145                    )))?;
146            let target_provider =
147                self.get_provider(&task.target_account)
148                    .ok_or(SyncError::Provider(ProviderError::NotFound(
149                        task.target_account.clone(),
150                    )))?;
151            self.calculate_diff(
152                source_provider.as_ref(),
153                target_provider.as_ref(),
154                &task.source_path,
155                &task.target_path,
156                &task.diff_mode,
157                task.sync_policy.as_ref(),
158                &format!("{}::{}", task.source_account, task.source_path),
159                &format!("{}::{}", task.target_account, task.target_path),
160            )
161            .await?
162        };
163
164        info!(task_id = %task.id, total_files = diff.files.len(), "Diff calculation completed");
165
166        let total_transfer_size = diff.total_transfer_size;
167        let mut transferred_size = 0u64;
168        let start_time = std::time::Instant::now();
169
170        // 执行同步
171        for file_diff in diff.files {
172            match file_diff.action {
173                DiffAction::Upload | DiffAction::Update => {
174                    debug!(file = %file_diff.path, "Syncing file (Upload/Update)");
175                    // 如果是目录,则创建目录
176                    if let Some(src_info) = &file_diff.source_info {
177                        if src_info.is_dir {
178                            debug!(path = %file_diff.path, "Creating directory (from Upload action)");
179                            let target_provider = self.get_provider(&task.target_account).ok_or(
180                                SyncError::Provider(ProviderError::NotFound(
181                                    task.target_account.clone(),
182                                )),
183                            )?;
184                            let target_full_path = {
185                                let base_path = std::path::Path::new(&task.target_path);
186                                let rel_path = std::path::Path::new(&file_diff.path);
187                                base_path
188                                    .join(rel_path)
189                                    .to_string_lossy()
190                                    .replace('\\', "/")
191                            };
192                            match target_provider.mkdir(&target_full_path).await {
193                                Ok(_) => {
194                                    info!(path = %file_diff.path, "Created directory");
195                                    report.add_success(&file_diff.path, 0);
196                                    continue; // 目录处理完毕
197                                }
198                                Err(e) => {
199                                    // 忽略目录已存在错误
200                                    // WebDavProvider::mkdir 返回什么错误?
201                                    // 假设是通用错误,暂时记录日志
202                                    warn!(path = %file_diff.path, error = %e, "Failed to create directory (might exist)");
203                                    // 不因为目录创建失败中断,尝试继续?或者算成功?
204                                    // 如果目录创建失败,后续文件上传可能会失败。
205                                    // 但如果是"已存在",则没问题。
206                                    // 暂时 continue
207                                    continue;
208                                }
209                            }
210                        }
211                    }
212
213                    // 重新获取 provider,避免与 &self 的可变借用冲突
214                    let source_provider =
215                        self.get_provider(&task.source_account)
216                            .ok_or(SyncError::Provider(ProviderError::NotFound(
217                                task.source_account.clone(),
218                            )))?;
219                    let target_provider =
220                        self.get_provider(&task.target_account)
221                            .ok_or(SyncError::Provider(ProviderError::NotFound(
222                                task.target_account.clone(),
223                            )))?;
224
225                    let file_size = file_diff.transfer_size();
226
227                    // 通知进度:开始
228                    if let Some(ref cb) = progress_callback {
229                        cb(SyncProgress {
230                            current_file: file_diff.path.clone(),
231                            current_file_size: file_size,
232                            transferred: transferred_size,
233                            total: total_transfer_size,
234                            percentage: if total_transfer_size > 0 {
235                                (transferred_size as f64 / total_transfer_size as f64) * 100.0
236                            } else {
237                                0.0
238                            },
239                            speed: 0.0,
240                        });
241                    }
242
243                    match self
244                        .sync_file(
245                            source_provider.as_ref(),
246                            target_provider.as_ref(),
247                            &file_diff,
248                            task,
249                            &mut report,
250                        )
251                        .await
252                    {
253                        Ok(_) => {
254                            debug!(file = %file_diff.path, "Sync successful");
255                            transferred_size += file_size;
256
257                            // 通知进度:完成
258                            if let Some(ref cb) = progress_callback {
259                                let elapsed = start_time.elapsed().as_secs_f64();
260                                let speed = if elapsed > 0.0 {
261                                    transferred_size as f64 / elapsed
262                                } else {
263                                    0.0
264                                };
265
266                                cb(SyncProgress {
267                                    current_file: file_diff.path.clone(),
268                                    current_file_size: file_size,
269                                    transferred: transferred_size,
270                                    total: total_transfer_size,
271                                    percentage: if total_transfer_size > 0 {
272                                        (transferred_size as f64 / total_transfer_size as f64)
273                                            * 100.0
274                                    } else {
275                                        100.0
276                                    },
277                                    speed,
278                                });
279                            }
280                        }
281                        Err(e) => {
282                            error!(file = %file_diff.path, error = %e, "Sync failed");
283                            report.add_failure(
284                                &file_diff.path,
285                                FileOperation::from_diff_action(file_diff.action),
286                                e.to_string(),
287                            );
288                        }
289                    }
290                }
291                DiffAction::Download => {
292                    // 反向同步
293                }
294                DiffAction::Delete => {
295                    debug!(file = %file_diff.path, "Deleting target file");
296                    // 删除目标文件
297                    let target_provider =
298                        self.get_provider(&task.target_account)
299                            .ok_or(SyncError::Provider(ProviderError::NotFound(
300                                task.target_account.clone(),
301                            )))?;
302
303                    let target_full_path = {
304                        let base_path = std::path::Path::new(&task.target_path);
305                        let rel_path = std::path::Path::new(&file_diff.path);
306                        base_path
307                            .join(rel_path)
308                            .to_string_lossy()
309                            .replace('\\', "/")
310                    };
311
312                    match target_provider.delete(&target_full_path).await {
313                        Ok(_) => {
314                            info!(file = %file_diff.path, "Deleted target file");
315                            report.add_success(&file_diff.path, file_diff.size_diff);
316                        }
317                        Err(e) => {
318                            error!(file = %file_diff.path, error = %e, "Failed to delete file");
319                            report.add_failure(
320                                &file_diff.path,
321                                FileOperation::Delete,
322                                e.to_string(),
323                            );
324                        }
325                    }
326                }
327                DiffAction::CreateDir => {
328                    debug!(path = %file_diff.path, "Creating directory");
329                    let target_provider =
330                        self.get_provider(&task.target_account)
331                            .ok_or(SyncError::Provider(ProviderError::NotFound(
332                                task.target_account.clone(),
333                            )))?;
334                    let target_full_path = {
335                        let base_path = std::path::Path::new(&task.target_path);
336                        let rel_path = std::path::Path::new(&file_diff.path);
337                        base_path
338                            .join(rel_path)
339                            .to_string_lossy()
340                            .replace('\\', "/")
341                    };
342                    match target_provider.mkdir(&target_full_path).await {
343                        Ok(_) => {
344                            info!(path = %file_diff.path, "Created directory");
345                            report.add_success(&file_diff.path, 0);
346                        }
347                        Err(e) => {
348                            error!(path = %file_diff.path, error = %e, "Failed to create directory");
349                            report.add_failure(
350                                &file_diff.path,
351                                FileOperation::CreateDir,
352                                e.to_string(),
353                            );
354                        }
355                    }
356                }
357                DiffAction::Conflict => {
358                    warn!(file = %file_diff.path, "Conflict detected");
359                    report.add_conflict(&file_diff.path);
360                }
361                _ => {}
362            }
363        }
364
365        let duration = start_time.elapsed().as_secs_f64();
366        report.statistics.finalize(duration);
367        report.duration_seconds = duration as i64;
368
369        // 保存报告到数据库
370        if let Err(e) = self.save_report(&report) {
371            error!(error = %e, "Failed to save sync report to database");
372        }
373
374        info!(task_id = %task.id, stats = ?report.statistics, "Sync task completed");
375        Ok(report)
376    }
377}
378
379// 进度结构体与结果类型
380pub struct VerificationProgress {
381    pub current_path: String,
382    pub current_file: usize,
383    pub total_files: usize,
384}
385
386pub struct SyncProgress {
387    pub current_file: String,
388    pub current_file_size: u64,
389    pub transferred: u64,
390    pub total: u64,
391    pub percentage: f64,
392    pub speed: f64,
393}
394
395pub struct VerificationResult {
396    pub total_files: usize,
397    pub checked_files: usize,
398    pub passed: usize,
399    pub failed: usize,
400    pub skipped: usize,
401    pub errors: Vec<String>,
402}
403
404impl VerificationResult {
405    pub fn new() -> Self {
406        VerificationResult {
407            total_files: 0,
408            checked_files: 0,
409            passed: 0,
410            failed: 0,
411            skipped: 0,
412            errors: vec![],
413        }
414    }
415}
416
417pub struct RepairResult {
418    pub repaired_files: usize,
419    pub repaired_bytes: u64,
420}
421
422impl Default for RepairResult {
423    fn default() -> Self {
424        RepairResult {
425            repaired_files: 0,
426            repaired_bytes: 0,
427        }
428    }
429}
430
431impl SyncEngine {
432    pub async fn verify_integrity(
433        &self,
434        task: &SyncTask,
435        _verify_all: bool,
436        progress_callback: impl Fn(VerificationProgress),
437    ) -> Result<VerificationResult, SyncError> {
438        let source_provider =
439            self.get_provider(&task.source_account)
440                .ok_or(SyncError::Provider(ProviderError::NotFound(
441                    task.source_account.clone(),
442                )))?;
443        let target_provider =
444            self.get_provider(&task.target_account)
445                .ok_or(SyncError::Provider(ProviderError::NotFound(
446                    task.target_account.clone(),
447                )))?;
448
449        let mut result = VerificationResult::new();
450        let source_files = self
451            .walk_directory(source_provider.as_ref(), &task.source_path)
452            .await?;
453        let target_files = self
454            .walk_directory(target_provider.as_ref(), &task.target_path)
455            .await?;
456
457        for (path, source_info) in &source_files {
458            progress_callback(VerificationProgress {
459                current_path: path.clone(),
460                current_file: result.checked_files + 1,
461                total_files: source_files.len(),
462            });
463            if let Some(target_info) = target_files.get(path) {
464                if let (Some(source_hash), Some(target_hash)) =
465                    (&source_info.file_hash, &target_info.file_hash)
466                {
467                    if source_hash == target_hash {
468                        result.passed += 1;
469                    } else {
470                        result.failed += 1;
471                        result.errors.push(format!("文件哈希不匹配: {}", path));
472                    }
473                } else if source_info.size == target_info.size {
474                    result.passed += 1;
475                } else {
476                    result.failed += 1;
477                    result.errors.push(format!("文件大小不匹配: {}", path));
478                }
479            } else {
480                result.skipped += 1;
481            }
482            result.checked_files += 1;
483        }
484        result.total_files = source_files.len();
485        Ok(result)
486    }
487
488    pub async fn repair_integrity(
489        &self,
490        _task: &SyncTask,
491        _verification_result: &VerificationResult,
492    ) -> Result<RepairResult, SyncError> {
493        Ok(RepairResult::default())
494    }
495
496    pub async fn calculate_diff_for_dry_run(
497        &self,
498        task: &SyncTask,
499    ) -> Result<DiffResult, SyncError> {
500        let source_provider =
501            self.get_provider(&task.source_account)
502                .ok_or(SyncError::Provider(ProviderError::NotFound(
503                    task.source_account.clone(),
504                )))?;
505        let target_provider =
506            self.get_provider(&task.target_account)
507                .ok_or(SyncError::Provider(ProviderError::NotFound(
508                    task.target_account.clone(),
509                )))?;
510
511        self.calculate_diff(
512            source_provider.as_ref(),
513            target_provider.as_ref(),
514            &task.source_path,
515            &task.target_path,
516            &task.diff_mode,
517            task.sync_policy.as_ref(),
518            &format!("{}::{}", task.source_account, task.source_path),
519            &format!("{}::{}", task.target_account, task.target_path),
520        )
521        .await
522    }
523
524    /// 保存同步报告到数据库
525    pub fn save_report(&self, report: &SyncReport) -> Result<(), SyncError> {
526        let conn = self.resume_store.lock().unwrap();
527        let report_id = uuid::Uuid::new_v4().to_string();
528        let json = serde_json::to_string(report).map_err(|e| SyncError::Unknown(e.to_string()))?;
529
530        conn.execute(
531            "INSERT INTO sync_reports (report_id, task_id, start_time, status, duration_seconds, details_json)
532             VALUES (?1, ?2, ?3, ?4, ?5, ?6)",
533            params![
534                report_id,
535                report.task_id,
536                report.start_time.timestamp(),
537                format!("{:?}", report.status),
538                report.duration_seconds,
539                json
540            ],
541        )?;
542        Ok(())
543    }
544
545    /// 获取任务的报告列表
546    pub fn list_reports(
547        &self,
548        task_id: &str,
549        limit: usize,
550        offset: usize,
551    ) -> Result<Vec<(String, i64, String, i64)>, SyncError> {
552        let conn = self.resume_store.lock().unwrap();
553        // 构造 SQL 查询
554        let mut stmt = conn.prepare(
555            "SELECT report_id, start_time, status, duration_seconds 
556             FROM sync_reports 
557             WHERE task_id = ?1 
558             ORDER BY start_time DESC 
559             LIMIT ?2 OFFSET ?3",
560        )?;
561
562        // 将 usize 转换为 i64,以满足 ToSql trait (SQLite INTEGER is i64)
563        let limit_i64 = limit as i64;
564        let offset_i64 = offset as i64;
565
566        let report_iter = stmt.query_map(params![task_id, limit_i64, offset_i64], |row| {
567            Ok((row.get(0)?, row.get(1)?, row.get(2)?, row.get(3)?))
568        })?;
569
570        let mut reports = Vec::new();
571        for report in report_iter {
572            reports.push(report?);
573        }
574        Ok(reports)
575    }
576
577    /// 获取特定报告详情
578    pub fn get_report(&self, report_id: &str) -> Result<SyncReport, SyncError> {
579        let conn = self.resume_store.lock().unwrap();
580        let mut stmt =
581            conn.prepare("SELECT details_json FROM sync_reports WHERE report_id = ?1")?;
582
583        let mut rows = stmt.query(params![report_id])?;
584        if let Some(row) = rows.next()? {
585            let json: String = row.get(0)?;
586            let report: SyncReport =
587                serde_json::from_str(&json).map_err(|e| SyncError::Unknown(e.to_string()))?;
588            Ok(report)
589        } else {
590            Err(SyncError::Unknown("Report not found".to_string()))
591        }
592    }
593}
594
595impl SyncEngine {
596    async fn list_with_retry(
597        &self,
598        provider: &dyn StorageProvider,
599        path: &str,
600    ) -> Result<Vec<FileInfo>, SyncError> {
601        let max_retries = 3;
602        let mut last_error = SyncError::Unknown("Initial".to_string());
603
604        for i in 0..=max_retries {
605            match provider.list(path).await {
606                Ok(list) => return Ok(list),
607                Err(e) => {
608                    last_error = e;
609                    if i < max_retries {
610                        sleep(Duration::from_millis(100 * (1 << i))).await;
611                    }
612                }
613            }
614        }
615        Err(last_error)
616    }
617
618    async fn recursive_list(
619        &self,
620        provider: &dyn StorageProvider,
621        root: &str,
622    ) -> Result<Vec<FileInfo>, SyncError> {
623        let mut result = Vec::new();
624        let mut stack = vec![root.to_string()];
625
626        while let Some(dir) = stack.pop() {
627            // list_with_retry might fail for deep directories if we hit limits, but we have retry now.
628            let entries = self.list_with_retry(provider, &dir).await?;
629            for entry in entries {
630                if entry.is_dir {
631                    // Ensure we don't get into infinite loop if provider returns "." or ".."
632                    // WebDavProvider usually filters them or returns absolute paths.
633                    // Also avoid re-listing the dir itself if it's returned.
634                    // WebDavProvider::parse_propfind_response checks `path == base_path` and skips it.
635                    // So we are safe from self-inclusion.
636                    stack.push(entry.path.clone());
637                }
638                result.push(entry);
639            }
640        }
641        Ok(result)
642    }
643
644    /// 计算差异,支持扫描结果缓存与限频
645    async fn calculate_diff(
646        &self,
647        source: &dyn StorageProvider,
648        target: &dyn StorageProvider,
649        source_path: &str,
650        target_path: &str,
651        diff_mode: &DiffMode,
652        policy: Option<&SyncPolicy>,
653        source_key: &str,
654        target_key: &str,
655    ) -> Result<DiffResult, SyncError> {
656        debug!(source = source_path, target = target_path, mode = ?diff_mode, "Calculating diff");
657        // 读取策略
658        let (delete_orphans, overwrite_existing, cooldown_secs) = if let Some(p) = policy {
659            (p.delete_orphans, p.overwrite_existing, p.scan_cooldown_secs)
660        } else {
661            // 默认策略:删除孤立、允许覆盖、不开启限频
662            (true, true, 0u64)
663        };
664
665        // 根据 DiffMode 与策略决定是否使用缓存
666        let use_cache = matches!(diff_mode, DiffMode::Smart) && cooldown_secs > 0;
667        let now = SystemTime::now();
668
669        // 获取源列表(考虑缓存)
670        let src_list = if use_cache {
671            if let Some((cached, ts)) = self.scan_cache.get(source_key).map(|v| v.clone()) {
672                if now.duration_since(ts).unwrap_or_default().as_secs() < cooldown_secs {
673                    debug!(key = source_key, "Using cached source list");
674                    cached
675                } else {
676                    debug!(key = source_key, "Refreshing source list (cache expired)");
677                    let fresh = self.recursive_list(source, source_path).await?;
678                    self.scan_cache
679                        .insert(source_key.to_string(), (fresh.clone(), now));
680                    fresh
681                }
682            } else {
683                debug!(key = source_key, "Fetching source list (no cache)");
684                let fresh = self.recursive_list(source, source_path).await?;
685                self.scan_cache
686                    .insert(source_key.to_string(), (fresh.clone(), now));
687                fresh
688            }
689        } else {
690            debug!(key = source_key, "Fetching source list (cache disabled)");
691            let fresh = self.recursive_list(source, source_path).await?;
692            self.scan_cache
693                .insert(source_key.to_string(), (fresh.clone(), now));
694            fresh
695        };
696
697        // 获取目标列表(考虑缓存)
698        let dst_list = if use_cache {
699            if let Some((cached, ts)) = self.scan_cache.get(target_key).map(|v| v.clone()) {
700                if now.duration_since(ts).unwrap_or_default().as_secs() < cooldown_secs {
701                    debug!(key = target_key, "Using cached target list");
702                    cached
703                } else {
704                    debug!(key = target_key, "Refreshing target list (cache expired)");
705                    let fresh = self.recursive_list(target, target_path).await?;
706                    self.scan_cache
707                        .insert(target_key.to_string(), (fresh.clone(), now));
708                    fresh
709                }
710            } else {
711                debug!(key = target_key, "Fetching target list (no cache)");
712                let fresh = self.recursive_list(target, target_path).await?;
713                self.scan_cache
714                    .insert(target_key.to_string(), (fresh.clone(), now));
715                fresh
716            }
717        } else {
718            debug!(key = target_key, "Fetching target list (cache disabled)");
719            let fresh = self.recursive_list(target, target_path).await?;
720            self.scan_cache
721                .insert(target_key.to_string(), (fresh.clone(), now));
722            fresh
723        };
724
725        // 辅助函数:将 FileInfo 转换为 FileMetadata
726        let to_metadata = |info: &FileInfo| -> crate::sync::diff::FileMetadata {
727            let mut meta =
728                crate::sync::diff::FileMetadata::new(std::path::PathBuf::from(&info.path));
729            meta.size = info.size;
730            meta.modified = info.modified;
731            meta.is_dir = info.is_dir;
732            meta.file_hash = info.hash.clone();
733            meta
734        };
735
736        // 辅助函数:标准化路径为相对路径
737        let normalize_path = |full_path: &str, root: &str| -> String {
738            let root = root.trim_end_matches('/');
739            if full_path.starts_with(root) {
740                let rel = &full_path[root.len()..];
741                rel.trim_start_matches('/').to_string()
742            } else {
743                full_path.to_string()
744            }
745        };
746
747        // 构建 Map (Relative Path -> FileMetadata)
748        let mut src_map = std::collections::HashMap::new();
749        for f in src_list.iter() {
750            let rel_path = normalize_path(&f.path, source_path);
751            src_map.insert(rel_path, to_metadata(f));
752        }
753
754        let mut dst_map = std::collections::HashMap::new();
755        for f in dst_list.iter() {
756            let rel_path = normalize_path(&f.path, target_path);
757            dst_map.insert(rel_path, to_metadata(f));
758        }
759
760        // 收集所有相对路径
761        let mut all_paths: std::collections::HashSet<String> = std::collections::HashSet::new();
762        for p in src_map.keys() {
763            all_paths.insert(p.clone());
764        }
765        for p in dst_map.keys() {
766            all_paths.insert(p.clone());
767        }
768
769        let mut diff = DiffResult::new();
770
771        for path in all_paths {
772            let src_meta = src_map.get(&path);
773            let dst_meta = dst_map.get(&path);
774
775            match (src_meta, dst_meta) {
776                (Some(s), Some(t)) => {
777                    // 两边都有,比较元数据
778                    // 这里可以加入更复杂的比较逻辑(如哈希)
779                    // 暂时只比较大小和修改时间
780                    let size_match = s.size == t.size;
781                    // 修改时间容差 2秒
782                    let time_match = (s.modified - t.modified).abs() <= 2;
783
784                    if size_match && time_match {
785                        // 认为相同
786                        diff.add_file(FileDiff::unchanged(path.clone(), s.clone(), t.clone()));
787                    } else {
788                        // 不同,需要更新
789                        if overwrite_existing {
790                            diff.add_file(FileDiff::update(path.clone(), s.clone(), t.clone()));
791                        } else {
792                            // 不允许覆盖,虽然不同但也标记为 Unchanged (或 Conflict? 视策略而定)
793                            // 这里标记为 Unchanged 但可以加个 Tag 说明被忽略
794                            let mut d = FileDiff::unchanged(path.clone(), s.clone(), t.clone());
795                            d.tags.push("skipped_overwrite".to_string());
796                            diff.add_file(d);
797                        }
798                    }
799                }
800                (Some(s), None) => {
801                    // 只有源有 -> Upload
802                    diff.add_file(FileDiff::upload(path.clone(), s.clone(), None));
803                }
804                (None, Some(t)) => {
805                    // 只有目标有 -> Delete (如果 delete_orphans) 否则 Unchanged (TargetOnly)
806                    if delete_orphans {
807                        diff.add_file(FileDiff::delete(path.clone(), t.clone()));
808                    } else {
809                        let mut d = FileDiff::unchanged(
810                            path.clone(),
811                            crate::sync::diff::FileMetadata::new(std::path::PathBuf::from(&path)),
812                            t.clone(),
813                        );
814                        // 标记源信息为"空"的元数据是不太准确的,FileDiff::unchanged 需要 source_info
815                        // 实际上 FileDiff::new 的 source_info 是 Option。
816                        // 但是 FileDiff::unchanged 辅助方法要求 FileMetadata。
817                        // 我们直接用 FileDiff::new
818                        d = FileDiff::new(
819                            path.clone(),
820                            DiffAction::Unchanged,
821                            None,
822                            Some(t.clone()),
823                        );
824                        d.tags.push("target_only".to_string());
825                        diff.add_file(d);
826                    }
827                }
828                (None, None) => unreachable!(),
829            }
830        }
831
832        Ok(diff)
833    }
834
835    async fn sync_file(
836        &self,
837        source: &dyn StorageProvider,
838        target: &dyn StorageProvider,
839        file_diff: &FileDiff,
840        task: &SyncTask,
841        report: &mut SyncReport,
842    ) -> Result<(), SyncError> {
843        // 构造完整路径辅助函数
844        let join_path = |base: &str, rel: &str| -> String {
845            let base_path = std::path::Path::new(base);
846            let rel_path = std::path::Path::new(rel);
847            base_path
848                .join(rel_path)
849                .to_string_lossy()
850                .replace('\\', "/")
851        };
852
853        let source_full_path = join_path(&task.source_path, &file_diff.path);
854        let target_full_path = join_path(&task.target_path, &file_diff.path);
855
856        // 检查是否有断点续传记录
857        // 查询断点续传记录
858        {
859            let conn = self.resume_store.lock().unwrap();
860            let mut stmt = conn.prepare(
861                "SELECT status FROM resume_data WHERE task_id = ?1 AND file_path = ?2 LIMIT 1",
862            )?;
863            let mut rows = stmt.query(params![task.id, file_diff.path.clone()])?;
864            if let Some(row) = rows.next()? {
865                let status: String = row.get(0)?;
866                if status == "in_progress" {
867                    let resume_data = String::new();
868                    self.resume_transfer(source, target, file_diff, task, &resume_data, report)
869                        .await;
870                    return Ok(());
871                }
872            }
873        }
874
875        // 创建临时文件
876        let temp_path = self.create_temp_file()?;
877
878        // 下载文件
879        let download_result = source.download(&source_full_path, &temp_path).await?;
880
881        // 加密(如果需要)
882        let (encrypted_data, metadata) = if let Some(enc_config) = &task.encryption {
883            self.encryption_manager
884                .encrypt_file(&temp_path, enc_config)
885                .await?
886        } else {
887            (None, None)
888        };
889
890        // 上传文件
891        let upload_result = if let Some(encrypted) = encrypted_data {
892            // 上传加密文件
893            target.upload(&encrypted, &target_full_path).await?
894        } else {
895            // 上传原始文件
896            target.upload(&temp_path, &target_full_path).await?
897        };
898
899        // 记录成功
900        report.add_success(&file_diff.path, file_diff.size_diff);
901
902        // 清理临时文件
903        self.cleanup_temp_file(&temp_path)?;
904
905        Ok(())
906    }
907
908    async fn resume_transfer(
909        &self,
910        source_storage_provider: &dyn StorageProvider,
911        target_storage_provider: &dyn StorageProvider,
912        file_diff: &FileDiff,
913        task: &SyncTask,
914        data: &String,
915        reporter: &mut SyncReport,
916    ) {
917        todo!()
918    }
919}