use std::collections::HashMap;
use std::net::SocketAddr;
use std::sync::Arc;
use std::time::Duration;
use bytes::Bytes;
use tokio::sync::RwLock;
use tracing::info;
use warp::Filter;
use warp::http::Method;
use cloud_disk_sync::config::{
AccountConfig, ConfigManager, DiffMode, ProviderType, RetryPolicy, SyncPolicy, SyncTask,
};
use cloud_disk_sync::providers::{StorageProvider, WebDavProvider};
use cloud_disk_sync::sync::diff::DiffAction;
use cloud_disk_sync::sync::engine::SyncEngine;
mod common;
use common::{FileStore, InMemoryFile, start_mock_server_with_seed};
#[tokio::test]
async fn test_webdav_sync_basic() {
common::init_logging();
let (addr1, _store1) = start_mock_server_with_seed(vec![
("/file_root/a.txt", "source a", false),
("/file_root/b.txt", "source b", false),
])
.await;
let (addr2, _store2) = start_mock_server_with_seed(vec![
("/file_root/b.txt", "target b (old)", false),
("/file_root/c.txt", "target c", false),
])
.await;
let webdav1 = AccountConfig {
id: "webdav1".to_string(),
provider: cloud_disk_sync::config::ProviderType::WebDAV,
name: "webdav1".to_string(),
credentials: {
let mut c = HashMap::new();
c.insert("url".to_string(), format!("http://{}", addr1));
c.insert("username".to_string(), "user1".to_string());
c.insert("password".to_string(), "pass1".to_string());
c
},
rate_limit: None,
retry_policy: RetryPolicy::default(),
};
let webdav2 = AccountConfig {
id: "webdav2".to_string(),
provider: cloud_disk_sync::config::ProviderType::WebDAV,
name: "webdav2".to_string(),
credentials: {
let mut c = HashMap::new();
c.insert("url".to_string(), format!("http://{}", addr2));
c.insert("username".to_string(), "user2".to_string());
c.insert("password".to_string(), "pass2".to_string());
c
},
rate_limit: None,
retry_policy: RetryPolicy::default(),
};
let src = WebDavProvider::new(&webdav1).await.unwrap();
let dst = WebDavProvider::new(&webdav2).await.unwrap();
let mut ready = false;
for _ in 0..10 {
if src.list("/file_root").await.is_ok() && dst.list("/file_root").await.is_ok() {
ready = true;
break;
}
tokio::time::sleep(Duration::from_millis(200)).await;
}
assert!(ready, "webdav mock 服务未就绪");
let mut engine = SyncEngine::new().await.unwrap();
engine.register_provider("webdav1".to_string(), Box::new(src));
engine.register_provider("webdav2".to_string(), Box::new(dst));
let task = SyncTask {
id: "t_webdav_basic".to_string(),
name: "basic webdav sync".to_string(),
source_account: "webdav1".to_string(),
source_path: "/file_root".to_string(),
target_account: "webdav2".to_string(),
target_path: "/file_root".to_string(),
schedule: None,
filters: vec![],
encryption: None,
diff_mode: DiffMode::Smart,
preserve_metadata: false,
verify_integrity: false,
sync_policy: Some(SyncPolicy {
delete_orphans: true,
overwrite_existing: true,
scan_cooldown_secs: 0,
}),
};
let _report = engine.sync(&task).await.unwrap();
let dst_provider = engine.get_provider("webdav2").unwrap();
assert!(
dst_provider.exists("/file_root/a.txt").await.unwrap(),
"a.txt 未同步到目标"
);
assert!(
!dst_provider.exists("/file_root/c.txt").await.unwrap(),
"c.txt 未删除"
);
let temp_dir = std::env::temp_dir();
let b_local = temp_dir.join("webdav_b_verify.txt");
dst_provider
.download("/file_root/b.txt", &b_local)
.await
.unwrap();
let content = tokio::fs::read(&b_local).await.unwrap();
assert_eq!(String::from_utf8_lossy(&content), "source b");
tokio::fs::remove_file(&b_local).await.ok();
}
#[tokio::test]
async fn test_webdav_sync_directory_operations() {
common::init_logging();
let (addr1, _store1) = start_mock_server_with_seed(vec![
("/new_dir/file.txt", "content", false),
("/new_dir", "", true),
])
.await;
let (addr2, _store2) = start_mock_server_with_seed(vec![
("/old_dir/old.txt", "old content", false),
("/old_dir", "", true),
])
.await;
let mut config_manager = ConfigManager::new().unwrap();
let source_account = AccountConfig {
id: "source_acc".to_string(),
name: "Source".to_string(),
provider: ProviderType::WebDAV,
credentials: {
let mut map = HashMap::new();
map.insert("url".to_string(), format!("http://{}", addr1));
map.insert("username".to_string(), "user".to_string());
map.insert("password".to_string(), "pass".to_string());
map
},
rate_limit: None,
retry_policy: RetryPolicy::default(),
};
config_manager.add_account(source_account).unwrap();
let target_account = AccountConfig {
id: "target_acc".to_string(),
name: "Target".to_string(),
provider: ProviderType::WebDAV,
credentials: {
let mut map = HashMap::new();
map.insert("url".to_string(), format!("http://{}", addr2));
map.insert("username".to_string(), "user".to_string());
map.insert("password".to_string(), "pass".to_string());
map
},
rate_limit: None,
retry_policy: RetryPolicy::default(),
};
config_manager.add_account(target_account).unwrap();
let task = SyncTask {
id: "test_dir_sync".to_string(),
name: "Directory Sync Test".to_string(),
source_account: "source_acc".to_string(),
target_account: "target_acc".to_string(),
source_path: "/".to_string(),
target_path: "/".to_string(),
sync_policy: Some(SyncPolicy {
delete_orphans: true, overwrite_existing: true,
scan_cooldown_secs: 0,
}),
schedule: None,
filters: vec![],
encryption: None,
diff_mode: DiffMode::Smart,
preserve_metadata: false,
verify_integrity: false,
};
config_manager.add_task(task.clone()).unwrap();
let source_provider = WebDavProvider::new(&config_manager.get_account("source_acc").unwrap())
.await
.unwrap();
let target_provider = WebDavProvider::new(&config_manager.get_account("target_acc").unwrap())
.await
.unwrap();
let mut engine = SyncEngine::new().await.unwrap();
engine.register_provider("source_acc".to_string(), Box::new(source_provider));
engine.register_provider("target_acc".to_string(), Box::new(target_provider));
let diff = engine.calculate_diff_for_dry_run(&task).await.unwrap();
let creates_dir = diff.files.iter().any(|f| {
f.path == "new_dir/"
&& f.action == DiffAction::Upload
&& f.source_info.as_ref().map_or(false, |i| i.is_dir)
});
let deletes_dir = diff
.files
.iter()
.any(|f| f.path == "old_dir/" && f.action == DiffAction::Delete);
assert!(
creates_dir,
"Should detect directory creation (as Upload). Diff: {:?}",
diff.files
);
assert!(
deletes_dir,
"Should detect directory deletion. Diff: {:?}",
diff.files
);
engine.sync(&task).await.unwrap();
let target_provider = WebDavProvider::new(&config_manager.get_account("target_acc").unwrap())
.await
.unwrap();
assert!(
target_provider.exists("/new_dir").await.unwrap(),
"new_dir should exist"
);
assert!(
!target_provider.exists("/old_dir").await.unwrap(),
"old_dir should be deleted"
);
}
#[tokio::test]
async fn test_webdav_sync_policy_no_delete_orphans() {
common::init_logging();
let (addr1, _s1) = start_mock_server_with_seed(vec![("/file_root/a.txt", "A", false)]).await;
let (addr2, _s2) =
start_mock_server_with_seed(vec![("/file_root/c.txt", "C OLD", false)]).await;
let src_cfg = AccountConfig {
id: "p_no_del_src".to_string(),
provider: cloud_disk_sync::config::ProviderType::WebDAV,
name: "p_no_del_src".to_string(),
credentials: {
let mut c = HashMap::new();
c.insert("url".to_string(), format!("http://{}", addr1));
c.insert("username".to_string(), "u".to_string());
c.insert("password".to_string(), "p".to_string());
c
},
rate_limit: None,
retry_policy: RetryPolicy::default(),
};
let dst_cfg = AccountConfig {
id: "p_no_del_dst".to_string(),
provider: cloud_disk_sync::config::ProviderType::WebDAV,
name: "p_no_del_dst".to_string(),
credentials: {
let mut c = HashMap::new();
c.insert("url".to_string(), format!("http://{}", addr2));
c.insert("username".to_string(), "u".to_string());
c.insert("password".to_string(), "p".to_string());
c
},
rate_limit: None,
retry_policy: RetryPolicy::default(),
};
let mut engine = SyncEngine::new().await.unwrap();
engine.register_provider(
"src_nd".to_string(),
Box::new(WebDavProvider::new(&src_cfg).await.unwrap()),
);
engine.register_provider(
"dst_nd".to_string(),
Box::new(WebDavProvider::new(&dst_cfg).await.unwrap()),
);
let task = SyncTask {
id: "t_no_del".to_string(),
name: "policy no delete".to_string(),
source_account: "src_nd".to_string(),
source_path: "/file_root".to_string(),
target_account: "dst_nd".to_string(),
target_path: "/file_root".to_string(),
schedule: None,
filters: vec![],
encryption: None,
diff_mode: DiffMode::Smart,
preserve_metadata: false,
verify_integrity: false,
sync_policy: Some(SyncPolicy {
delete_orphans: false,
overwrite_existing: true,
scan_cooldown_secs: 0,
}),
};
engine.sync(&task).await.unwrap();
let dst = engine.get_provider("dst_nd").unwrap();
assert!(
dst.exists("/file_root/a.txt").await.unwrap(),
"a.txt 未被同步"
);
assert!(
dst.exists("/file_root/c.txt").await.unwrap(),
"c.txt 不应被删除"
);
}
#[tokio::test]
async fn test_webdav_sync_policy_no_overwrite() {
common::init_logging();
let (addr1, _s1) =
start_mock_server_with_seed(vec![("/file_root/b.txt", "B NEW", false)]).await;
let (addr2, _s2) =
start_mock_server_with_seed(vec![("/file_root/b.txt", "B OLD", false)]).await;
let src_cfg = AccountConfig {
id: "p_no_ov_src".to_string(),
provider: cloud_disk_sync::config::ProviderType::WebDAV,
name: "p_no_ov_src".to_string(),
credentials: {
let mut c = HashMap::new();
c.insert("url".to_string(), format!("http://{}", addr1));
c.insert("username".to_string(), "u".to_string());
c.insert("password".to_string(), "p".to_string());
c
},
rate_limit: None,
retry_policy: RetryPolicy::default(),
};
let dst_cfg = AccountConfig {
id: "p_no_ov_dst".to_string(),
provider: cloud_disk_sync::config::ProviderType::WebDAV,
name: "p_no_ov_dst".to_string(),
credentials: {
let mut c = HashMap::new();
c.insert("url".to_string(), format!("http://{}", addr2));
c.insert("username".to_string(), "u".to_string());
c.insert("password".to_string(), "p".to_string());
c
},
rate_limit: None,
retry_policy: RetryPolicy::default(),
};
let mut engine = SyncEngine::new().await.unwrap();
engine.register_provider(
"src_no".to_string(),
Box::new(WebDavProvider::new(&src_cfg).await.unwrap()),
);
engine.register_provider(
"dst_no".to_string(),
Box::new(WebDavProvider::new(&dst_cfg).await.unwrap()),
);
let task = SyncTask {
id: "t_no_ov".to_string(),
name: "policy no overwrite".to_string(),
source_account: "src_no".to_string(),
source_path: "/file_root".to_string(),
target_account: "dst_no".to_string(),
target_path: "/file_root".to_string(),
schedule: None,
filters: vec![],
encryption: None,
diff_mode: DiffMode::Smart,
preserve_metadata: false,
verify_integrity: false,
sync_policy: Some(SyncPolicy {
delete_orphans: true,
overwrite_existing: false,
scan_cooldown_secs: 0,
}),
};
engine.sync(&task).await.unwrap();
let dst = engine.get_provider("dst_no").unwrap();
let temp_dir = std::env::temp_dir();
let verify = temp_dir.join("policy_no_overwrite_b.txt");
dst.download("/file_root/b.txt", &verify).await.unwrap();
let content = tokio::fs::read(&verify).await.unwrap();
assert_eq!(String::from_utf8_lossy(&content), "B OLD");
tokio::fs::remove_file(&verify).await.ok();
}
#[tokio::test]
async fn test_webdav_diff_scan_cooldown() {
common::init_logging();
let (addr1, store1) =
start_mock_server_with_seed(vec![("/file_root/a.txt", "A1", false)]).await;
let (addr2, _s2) = start_mock_server_with_seed(vec![("/file_root/a.txt", "A0", false)]).await;
let src_cfg = AccountConfig {
id: "p_scan_src".to_string(),
provider: cloud_disk_sync::config::ProviderType::WebDAV,
name: "p_scan_src".to_string(),
credentials: {
let mut c = HashMap::new();
c.insert("url".to_string(), format!("http://{}", addr1));
c.insert("username".to_string(), "u".to_string());
c.insert("password".to_string(), "p".to_string());
c
},
rate_limit: None,
retry_policy: RetryPolicy::default(),
};
let dst_cfg = AccountConfig {
id: "p_scan_dst".to_string(),
provider: cloud_disk_sync::config::ProviderType::WebDAV,
name: "p_scan_dst".to_string(),
credentials: {
let mut c = HashMap::new();
c.insert("url".to_string(), format!("http://{}", addr2));
c.insert("username".to_string(), "u".to_string());
c.insert("password".to_string(), "p".to_string());
c
},
rate_limit: None,
retry_policy: RetryPolicy::default(),
};
let mut engine = SyncEngine::new().await.unwrap();
engine.register_provider(
"src_sc".to_string(),
Box::new(WebDavProvider::new(&src_cfg).await.unwrap()),
);
engine.register_provider(
"dst_sc".to_string(),
Box::new(WebDavProvider::new(&dst_cfg).await.unwrap()),
);
let task1 = SyncTask {
id: "t_scan_1".to_string(),
name: "scan 1".to_string(),
source_account: "src_sc".to_string(),
source_path: "/file_root".to_string(),
target_account: "dst_sc".to_string(),
target_path: "/file_root".to_string(),
schedule: None,
filters: vec![],
encryption: None,
diff_mode: DiffMode::Smart,
preserve_metadata: false,
verify_integrity: false,
sync_policy: Some(SyncPolicy {
delete_orphans: true,
overwrite_existing: true,
scan_cooldown_secs: 100,
}),
};
engine.sync(&task1).await.unwrap();
{
let mut files = store1.write().await;
files.insert(
"/file_root/a2.txt".to_string(),
InMemoryFile {
content: b"A2".to_vec(),
is_dir: false,
},
);
}
let task2 = SyncTask {
id: "t_scan_2".to_string(),
name: "scan 2".to_string(),
..task1.clone()
};
engine.sync(&task2).await.unwrap();
let dst = engine.get_provider("dst_sc").unwrap();
assert!(
!dst.exists("/file_root/a2.txt").await.unwrap(),
"冷却期内不应同步 a2.txt"
);
let mut engine2 = SyncEngine::new().await.unwrap();
engine2.register_provider(
"src_sc".to_string(),
Box::new(WebDavProvider::new(&src_cfg).await.unwrap()),
);
engine2.register_provider(
"dst_sc".to_string(),
Box::new(WebDavProvider::new(&dst_cfg).await.unwrap()),
);
let task3 = SyncTask {
id: "t_scan_3".to_string(),
name: "scan 3".to_string(),
sync_policy: Some(SyncPolicy {
delete_orphans: true,
overwrite_existing: true,
scan_cooldown_secs: 0,
}),
..task1
};
engine2.sync(&task3).await.unwrap();
let dst2 = engine2.get_provider("dst_sc").unwrap();
assert!(
dst2.exists("/file_root/a2.txt").await.unwrap(),
"关闭限频后应同步 a2.txt"
);
}