pub mod dual;
#[cfg(feature = "gcs")]
pub mod gcs;
pub mod local;
pub mod router;
#[cfg(feature = "s3")]
pub mod s3;
pub mod server;
#[cfg(feature = "ssh")]
pub mod ssh;
use crate::error::Result;
use crate::sync::scanner::FileEntry;
use async_trait::async_trait;
use futures::stream::{BoxStream, StreamExt};
use std::path::Path;
use std::time::SystemTime;
/// Minimal file metadata: the two values `Transport::file_info` reports.
#[derive(Debug, Clone, Copy)]
pub struct FileInfo {
    /// File length in bytes.
    pub size: u64,
    /// Last-modification timestamp.
    pub modified: SystemTime,
}
/// Outcome of a single file transfer.
///
/// The optional fields are populated only by the constructor that matches the
/// transfer strategy (`with_delta` or `with_compression`); `new` leaves them
/// all unset.
#[derive(Debug, Clone, Copy)]
pub struct TransferResult {
    /// Number of bytes written for this transfer.
    pub bytes_written: u64,
    /// Count of delta operations; `Some` only when built via `with_delta`.
    pub delta_operations: Option<usize>,
    /// Literal byte count; `Some` only when built via `with_delta`.
    pub literal_bytes: Option<u64>,
    /// Transferred byte count; `Some` only when built via `with_compression`.
    pub transferred_bytes: Option<u64>,
    /// `true` only when built via `with_compression`.
    pub compression_used: bool,
}

impl TransferResult {
    /// Builds a result for a plain transfer: only `bytes_written` is recorded.
    pub fn new(bytes_written: u64) -> Self {
        Self {
            bytes_written,
            delta_operations: None,
            literal_bytes: None,
            transferred_bytes: None,
            compression_used: false,
        }
    }

    /// Builds a result carrying delta statistics (operation count and literal bytes).
    pub fn with_delta(bytes_written: u64, delta_operations: usize, literal_bytes: u64) -> Self {
        Self {
            delta_operations: Some(delta_operations),
            literal_bytes: Some(literal_bytes),
            ..Self::new(bytes_written)
        }
    }

    /// Builds a result for a compressed transfer, recording `transferred_bytes`
    /// and setting `compression_used`.
    pub fn with_compression(bytes_written: u64, transferred_bytes: u64) -> Self {
        Self {
            transferred_bytes: Some(transferred_bytes),
            compression_used: true,
            ..Self::new(bytes_written)
        }
    }

    /// Returns `true` when delta statistics are present.
    pub fn used_delta(&self) -> bool {
        self.delta_operations.is_some()
    }

    /// Returns `literal_bytes` as a percentage of `bytes_written`.
    ///
    /// Yields `None` when no literal byte count was recorded or when
    /// `bytes_written` is zero (the ratio would be undefined).
    #[allow(dead_code)]
    pub fn compression_ratio(&self) -> Option<f64> {
        let literal = self.literal_bytes?;
        if self.bytes_written == 0 {
            return None;
        }
        Some((literal as f64 / self.bytes_written as f64) * 100.0)
    }
}
#[async_trait]
#[allow(dead_code)] pub trait Transport: Send + Sync {
fn set_scan_options(&mut self, _options: crate::sync::scanner::ScanOptions) {
}
async fn prepare_for_transfer(&self, _file_count: usize) -> Result<()> {
Ok(())
}
async fn scan(&self, path: &Path) -> Result<Vec<FileEntry>>;
async fn scan_streaming(&self, path: &Path) -> Result<BoxStream<'static, Result<FileEntry>>> {
let entries = self.scan(path).await?;
Ok(futures::stream::iter(entries.into_iter().map(Ok)).boxed())
}
async fn exists(&self, path: &Path) -> Result<bool>;
async fn metadata(&self, path: &Path) -> Result<std::fs::Metadata>;
async fn file_info(&self, path: &Path) -> Result<FileInfo> {
let meta = self.metadata(path).await?;
let modified = meta
.modified()
.map_err(|e| crate::error::SyncError::Io(std::io::Error::new(e.kind(), format!("Failed to get mtime for {}: {}", path.display(), e))))?;
Ok(FileInfo { size: meta.len(), modified })
}
async fn create_dir_all(&self, path: &Path) -> Result<()>;
async fn create_dirs_batch(&self, paths: &[&Path]) -> Result<()> {
for path in paths {
self.create_dir_all(path).await?;
}
Ok(())
}
async fn copy_file(&self, source: &Path, dest: &Path) -> Result<TransferResult>;
async fn sync_file_with_delta(&self, source: &Path, dest: &Path) -> Result<TransferResult> {
self.copy_file(source, dest).await
}
async fn remove(&self, path: &Path, is_dir: bool) -> Result<()>;
async fn create_hardlink(&self, source: &Path, dest: &Path) -> Result<()>;
async fn create_symlink(&self, target: &Path, dest: &Path) -> Result<()>;
async fn read_file(&self, path: &Path) -> Result<Vec<u8>> {
tokio::fs::read(path)
.await
.map_err(|e| crate::error::SyncError::Io(std::io::Error::new(e.kind(), format!("Failed to read file {}: {}", path.display(), e))))
}
async fn compute_checksum(&self, path: &Path, verifier: &crate::integrity::IntegrityVerifier) -> Result<crate::integrity::Checksum> {
let path = path.to_path_buf();
let verifier = verifier.clone();
tokio::task::spawn_blocking(move || verifier.compute_file_checksum(&path))
.await
.map_err(|e| crate::error::SyncError::Io(std::io::Error::other(e.to_string())))?
}
async fn write_file(&self, path: &Path, data: &[u8], mtime: std::time::SystemTime) -> Result<()> {
use tokio::io::AsyncWriteExt;
if let Some(parent) = path.parent() {
tokio::fs::create_dir_all(parent).await?;
}
let mut file = tokio::fs::File::create(path).await?;
file.write_all(data).await?;
file.flush().await?;
drop(file);
filetime::set_file_mtime(path, filetime::FileTime::from_system_time(mtime))?;
Ok(())
}
async fn get_mtime(&self, path: &Path) -> Result<std::time::SystemTime> {
let metadata = tokio::fs::metadata(path).await?;
metadata
.modified()
.map_err(|e| crate::error::SyncError::Io(std::io::Error::new(e.kind(), format!("Failed to get mtime for {}: {}", path.display(), e))))
}
async fn copy_file_streaming(
&self, source: &Path, dest: &Path, progress_callback: Option<std::sync::Arc<dyn Fn(u64, u64) + Send + Sync>>,
) -> Result<TransferResult> {
let data = self.read_file(source).await?;
let total_size = data.len() as u64;
let mtime = self.get_mtime(source).await?;
if let Some(callback) = &progress_callback {
callback(0, total_size);
}
self.write_file(dest, &data, mtime).await?;
if let Some(callback) = &progress_callback {
callback(total_size, total_size);
}
Ok(TransferResult::new(total_size))
}
async fn check_disk_space(&self, path: &Path, bytes_needed: u64) -> Result<()> {
crate::resource::check_disk_space(path, bytes_needed)
}
async fn set_xattrs(&self, path: &Path, xattrs: &[(String, Vec<u8>)]) -> Result<()> {
#[cfg(unix)]
{
let path = path.to_path_buf();
let xattrs = xattrs.to_vec();
tokio::task::spawn_blocking(move || {
for (name, value) in xattrs {
if let Err(e) = xattr::set(&path, &name, &value) {
tracing::warn!("Failed to set xattr {} on {}: {}", name, path.display(), e);
}
}
})
.await
.map_err(|e| crate::error::SyncError::Io(std::io::Error::other(e.to_string())))?;
}
#[cfg(not(unix))]
{
let _ = (path, xattrs);
}
Ok(())
}
async fn set_acls(&self, path: &Path, acls_text: &str) -> Result<()> {
#[cfg(all(unix, feature = "acl"))]
{
use exacl::{AclEntry, setfacl};
use std::str::FromStr;
let path = path.to_path_buf();
let acls_text = acls_text.to_string();
tokio::task::spawn_blocking(move || {
let mut acl_entries = Vec::new();
for line in acls_text.lines() {
let line = line.trim();
if line.is_empty() {
continue;
}
match AclEntry::from_str(line) {
Ok(entry) => acl_entries.push(entry),
Err(e) => {
tracing::warn!("Failed to parse ACL entry '{}' for {}: {}", line, path.display(), e);
continue;
}
}
}
if !acl_entries.is_empty()
&& let Err(e) = setfacl(&[&path], &acl_entries, None)
{
tracing::warn!("Failed to set ACLs on {}: {}", path.display(), e);
}
})
.await
.map_err(|e| crate::error::SyncError::Io(std::io::Error::other(e.to_string())))?;
}
#[cfg(not(all(unix, feature = "acl")))]
{
let _ = (path, acls_text);
}
Ok(())
}
async fn set_bsd_flags(&self, path: &Path, flags: u32) -> Result<()> {
#[cfg(target_os = "macos")]
{
use std::ffi::CString;
let path = path.to_path_buf();
tokio::task::spawn_blocking(move || {
let c_path = match CString::new(path.to_str().unwrap_or("")) {
Ok(p) => p,
Err(e) => {
tracing::warn!("Failed to create C string for {}: {}", path.display(), e);
return;
}
};
let result = unsafe { libc::chflags(c_path.as_ptr(), flags as _) };
if result != 0 {
tracing::warn!("Failed to set BSD flags on {}: {}", path.display(), std::io::Error::last_os_error());
}
})
.await
.map_err(|e| crate::error::SyncError::Io(std::io::Error::other(e.to_string())))?;
}
#[cfg(not(target_os = "macos"))]
{
let _ = (path, flags);
}
Ok(())
}
async fn bulk_copy_files(&self, source_base: &Path, dest_base: &Path, relative_paths: &[&Path]) -> Result<u64> {
let mut total_bytes = 0u64;
for rel_path in relative_paths {
let source = source_base.join(rel_path);
let dest = dest_base.join(rel_path);
match self.copy_file(&source, &dest).await {
Ok(result) => total_bytes += result.bytes_written,
Err(e) => {
tracing::warn!("Failed to copy {}: {}", source.display(), e);
}
}
}
Ok(total_bytes)
}
}
#[async_trait]
impl<T: Transport + ?Sized> Transport for std::sync::Arc<T> {
async fn scan(&self, path: &Path) -> Result<Vec<FileEntry>> {
(**self).scan(path).await
}
async fn scan_streaming(&self, path: &Path) -> Result<BoxStream<'static, Result<FileEntry>>> {
(**self).scan_streaming(path).await
}
async fn exists(&self, path: &Path) -> Result<bool> {
(**self).exists(path).await
}
async fn metadata(&self, path: &Path) -> Result<std::fs::Metadata> {
(**self).metadata(path).await
}
async fn file_info(&self, path: &Path) -> Result<FileInfo> {
(**self).file_info(path).await
}
async fn create_dir_all(&self, path: &Path) -> Result<()> {
(**self).create_dir_all(path).await
}
async fn create_dirs_batch(&self, paths: &[&Path]) -> Result<()> {
(**self).create_dirs_batch(paths).await
}
async fn copy_file(&self, source: &Path, dest: &Path) -> Result<TransferResult> {
(**self).copy_file(source, dest).await
}
async fn sync_file_with_delta(&self, source: &Path, dest: &Path) -> Result<TransferResult> {
(**self).sync_file_with_delta(source, dest).await
}
async fn remove(&self, path: &Path, is_dir: bool) -> Result<()> {
(**self).remove(path, is_dir).await
}
async fn create_hardlink(&self, source: &Path, dest: &Path) -> Result<()> {
(**self).create_hardlink(source, dest).await
}
async fn create_symlink(&self, target: &Path, dest: &Path) -> Result<()> {
(**self).create_symlink(target, dest).await
}
async fn read_file(&self, path: &Path) -> Result<Vec<u8>> {
(**self).read_file(path).await
}
async fn write_file(&self, path: &Path, data: &[u8], mtime: std::time::SystemTime) -> Result<()> {
(**self).write_file(path, data, mtime).await
}
async fn get_mtime(&self, path: &Path) -> Result<std::time::SystemTime> {
(**self).get_mtime(path).await
}
async fn copy_file_streaming(
&self, source: &Path, dest: &Path, progress_callback: Option<std::sync::Arc<dyn Fn(u64, u64) + Send + Sync>>,
) -> Result<TransferResult> {
(**self).copy_file_streaming(source, dest, progress_callback).await
}
async fn check_disk_space(&self, path: &Path, bytes_needed: u64) -> Result<()> {
(**self).check_disk_space(path, bytes_needed).await
}
async fn set_xattrs(&self, path: &Path, xattrs: &[(String, Vec<u8>)]) -> Result<()> {
(**self).set_xattrs(path, xattrs).await
}
async fn set_acls(&self, path: &Path, acls_text: &str) -> Result<()> {
(**self).set_acls(path, acls_text).await
}
async fn set_bsd_flags(&self, path: &Path, flags: u32) -> Result<()> {
(**self).set_bsd_flags(path, flags).await
}
async fn bulk_copy_files(&self, source_base: &Path, dest_base: &Path, relative_paths: &[&Path]) -> Result<u64> {
(**self).bulk_copy_files(source_base, dest_base, relative_paths).await
}
}