use crate::streaming::protocol::BlockChecksum;
use bytes::Bytes;
use std::path::PathBuf;
use std::sync::Arc;
use tokio::sync::mpsc;
pub const GENERATOR_CHANNEL_SIZE: usize = 1024;
pub const SENDER_CHANNEL_SIZE: usize = 64;
pub const DATA_CHUNK_SIZE: usize = 256 * 1024;
pub const DELTA_CHUNK_SIZE: usize = 16 * 1024 * 1024;
pub const DELTA_MIN_SIZE: u64 = 64 * 1024;
#[derive(Debug, Clone)]
pub struct FileJob {
pub path: Arc<PathBuf>,
pub size: u64,
pub mtime: i64,
pub mode: u32,
pub inode: u64,
pub need_delta: bool,
pub checksums: Option<DeltaInfo>,
}
#[derive(Debug, Clone)]
pub struct DeltaInfo {
pub block_size: u32,
pub file_size: u64,
pub checksums: Vec<BlockChecksum>,
}
#[derive(Debug)]
pub struct DataChunk {
pub path: Arc<PathBuf>,
pub offset: u64,
pub data: Bytes,
pub is_final: bool,
pub is_delta: bool,
pub is_compressed: bool,
}
#[derive(Debug)]
pub enum GeneratorMessage {
File(FileJob),
Mkdir { path: Arc<PathBuf>, mode: u32 },
Symlink { path: Arc<PathBuf>, target: String },
Delete { path: Arc<PathBuf>, is_dir: bool },
FileEnd { total_files: u64, total_bytes: u64 },
DeleteEnd { count: u64 },
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SyncDirection {
Push,
Pull,
}
pub type FileJobSender = mpsc::Sender<GeneratorMessage>;
pub type FileJobReceiver = mpsc::Receiver<GeneratorMessage>;
pub fn file_job_channel() -> (FileJobSender, FileJobReceiver) {
mpsc::channel(GENERATOR_CHANNEL_SIZE)
}
#[derive(Debug, Clone)]
pub struct DestFileState {
pub size: u64,
pub mtime: i64,
pub mode: u32,
pub is_dir: bool,
pub delta_info: Option<DeltaInfo>,
}
#[derive(Debug, Default)]
pub struct DestIndex {
files: std::collections::HashMap<String, DestFileState>,
}
impl DestIndex {
pub fn new() -> Self {
Self::default()
}
pub fn insert(&mut self, path: String, state: DestFileState) {
self.files.insert(path, state);
}
pub fn get(&self, path: &str) -> Option<&DestFileState> {
self.files.get(path)
}
pub fn remove(&mut self, path: &str) -> Option<DestFileState> {
self.files.remove(path)
}
pub fn contains(&self, path: &str) -> bool {
self.files.contains_key(path)
}
pub fn remaining_paths(&self) -> impl Iterator<Item = (&String, &DestFileState)> {
self.files.iter()
}
pub fn len(&self) -> usize {
self.files.len()
}
pub fn is_empty(&self) -> bool {
self.files.is_empty()
}
}
#[derive(Debug, Default, Clone)]
pub struct SyncStats {
pub files_ok: u64,
pub files_err: u64,
pub bytes_transferred: u64,
pub delta_files: u64,
pub delta_bytes_saved: u64,
pub dirs_created: u64,
pub symlinks_created: u64,
pub deleted: u64,
pub hardlinks_created: u64,
}
impl SyncStats {
pub fn new() -> Self {
Self::default()
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_dest_index() {
let mut index = DestIndex::new();
index.insert("file.txt".to_string(), DestFileState { size: 1024, mtime: 1234567890, mode: 0o644, is_dir: false, delta_info: None });
assert!(index.contains("file.txt"));
assert!(!index.contains("other.txt"));
let state = index.get("file.txt").unwrap();
assert_eq!(state.size, 1024);
index.remove("file.txt");
assert!(!index.contains("file.txt"));
}
#[tokio::test]
async fn test_file_job_channel() {
let (tx, mut rx) = file_job_channel();
let job = GeneratorMessage::File(FileJob {
path: Arc::new(PathBuf::from("test.txt")),
size: 100,
mtime: 0,
mode: 0o644,
inode: 0,
need_delta: false,
checksums: None,
});
tx.send(job).await.unwrap();
drop(tx);
let received = rx.recv().await.unwrap();
match received {
GeneratorMessage::File(job) => {
assert_eq!(job.path.as_ref(), &PathBuf::from("test.txt"));
assert_eq!(job.size, 100);
}
_ => panic!("Expected File message"),
}
}
#[test]
fn test_sync_stats() {
let mut stats = SyncStats::new();
stats.files_ok = 100;
stats.bytes_transferred = 1024 * 1024;
stats.delta_files = 10;
stats.delta_bytes_saved = 512 * 1024;
assert_eq!(stats.files_ok, 100);
assert_eq!(stats.delta_files, 10);
}
}