msy 0.4.5

Modern musl rsync alternative - Fast, parallel file synchronization
Documentation
//! Channel types for streaming sync pipeline.
//!
//! Three-task pipeline: Generator -> Sender -> Receiver
//! Using bounded channels for backpressure.

use crate::streaming::protocol::BlockChecksum;
use bytes::Bytes;
use std::path::PathBuf;
use std::sync::Arc;
use tokio::sync::mpsc;

/// Channel size for Generator -> Sender (file entries)
pub const GENERATOR_CHANNEL_SIZE: usize = 1024;

/// Channel size for Sender -> Receiver (data chunks)
pub const SENDER_CHANNEL_SIZE: usize = 64;

/// Data chunk size for transfer
pub const DATA_CHUNK_SIZE: usize = 256 * 1024; // 256KB

/// Maximum delta chunk size (16MB - well under 64MB frame limit)
pub const DELTA_CHUNK_SIZE: usize = 16 * 1024 * 1024;

/// Minimum file size for delta sync
pub const DELTA_MIN_SIZE: u64 = 64 * 1024; // 64KB

// =============================================================================
// FileJob: Generator -> Sender
// =============================================================================

/// A file job sent from Generator to Sender.
/// Contains all information needed to read and transfer the file.
#[derive(Debug, Clone)]
pub struct FileJob {
	/// Path relative to sync root
	pub path: Arc<PathBuf>,

	/// File size in bytes
	pub size: u64,

	/// Modification time (Unix timestamp)
	pub mtime: i64,

	/// File mode/permissions
	pub mode: u32,

	/// Inode number (for hard link detection)
	pub inode: u64,

	/// Whether this file needs delta transfer
	pub need_delta: bool,

	/// Block checksums from destination (for delta computation)
	/// Only present if need_delta is true and file exists on dest
	pub checksums: Option<DeltaInfo>,
}

/// Delta information from destination file
#[derive(Debug, Clone)]
pub struct DeltaInfo {
	/// Block size used for checksums
	pub block_size: u32,

	/// Destination file size (needed to calculate last block size)
	pub file_size: u64,

	/// Block checksums
	pub checksums: Vec<BlockChecksum>,
}

// =============================================================================
// DataChunk: Sender -> wire
// =============================================================================

/// A chunk of file data ready for transmission.
#[derive(Debug)]
pub struct DataChunk {
	/// Path relative to sync root
	pub path: Arc<PathBuf>,

	/// Offset within file
	pub offset: u64,

	/// Data content
	pub data: Bytes,

	/// Whether this is the final chunk for this file
	pub is_final: bool,

	/// Whether this is delta data
	pub is_delta: bool,

	/// Whether the data is compressed
	pub is_compressed: bool,
}

// =============================================================================
// Pipeline messages
// =============================================================================

/// Message from Generator to the rest of the pipeline
#[derive(Debug)]
pub enum GeneratorMessage {
	/// A file that needs to be transferred
	File(FileJob),

	/// A directory that needs to be created
	Mkdir { path: Arc<PathBuf>, mode: u32 },

	/// A symlink that needs to be created
	Symlink { path: Arc<PathBuf>, target: String },

	/// A file or directory that needs to be deleted
	Delete { path: Arc<PathBuf>, is_dir: bool },

	/// End of file list - no more files coming
	FileEnd { total_files: u64, total_bytes: u64 },

	/// End of deletes
	DeleteEnd { count: u64 },
}

/// Sync direction
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SyncDirection {
	/// Local -> Remote (push)
	Push,
	/// Remote -> Local (pull)
	Pull,
}

// =============================================================================
// Channel types
// =============================================================================

/// Sender for file jobs from Generator
pub type FileJobSender = mpsc::Sender<GeneratorMessage>;

/// Receiver for file jobs in Sender task
pub type FileJobReceiver = mpsc::Receiver<GeneratorMessage>;

/// Create a bounded channel for Generator -> Sender communication
pub fn file_job_channel() -> (FileJobSender, FileJobReceiver) {
	mpsc::channel(GENERATOR_CHANNEL_SIZE)
}

// =============================================================================
// Destination state (from Initial Exchange)
// =============================================================================

/// State of a destination file, received during Initial Exchange
#[derive(Debug, Clone)]
pub struct DestFileState {
	/// File size
	pub size: u64,

	/// Modification time
	pub mtime: i64,

	/// File mode
	pub mode: u32,

	/// Whether this is a directory
	pub is_dir: bool,

	/// Block checksums for delta (if file is a delta candidate)
	pub delta_info: Option<DeltaInfo>,
}

/// Destination file index, built during Initial Exchange
#[derive(Debug, Default)]
pub struct DestIndex {
	/// Map of path -> dest state
	files: std::collections::HashMap<String, DestFileState>,
}

impl DestIndex {
	pub fn new() -> Self {
		Self::default()
	}

	pub fn insert(&mut self, path: String, state: DestFileState) {
		self.files.insert(path, state);
	}

	pub fn get(&self, path: &str) -> Option<&DestFileState> {
		self.files.get(path)
	}

	pub fn remove(&mut self, path: &str) -> Option<DestFileState> {
		self.files.remove(path)
	}

	pub fn contains(&self, path: &str) -> bool {
		self.files.contains_key(path)
	}

	/// Get all remaining paths (for delete detection)
	pub fn remaining_paths(&self) -> impl Iterator<Item = (&String, &DestFileState)> {
		self.files.iter()
	}

	pub fn len(&self) -> usize {
		self.files.len()
	}

	pub fn is_empty(&self) -> bool {
		self.files.is_empty()
	}
}

// =============================================================================
// Sync statistics
// =============================================================================

/// Statistics for a sync operation
#[derive(Debug, Default, Clone)]
pub struct SyncStats {
	/// Files successfully transferred
	pub files_ok: u64,

	/// Files that failed
	pub files_err: u64,

	/// Total bytes transferred
	pub bytes_transferred: u64,

	/// Files transferred via delta
	pub delta_files: u64,

	/// Bytes saved by delta transfer
	pub delta_bytes_saved: u64,

	/// Directories created
	pub dirs_created: u64,

	/// Symlinks created
	pub symlinks_created: u64,

	/// Files/directories deleted
	pub deleted: u64,

	/// Hard links created
	pub hardlinks_created: u64,
}

impl SyncStats {
	pub fn new() -> Self {
		Self::default()
	}
}

// =============================================================================
// Tests
// =============================================================================

#[cfg(test)]
mod tests {
	use super::*;

	#[test]
	fn test_dest_index() {
		let mut index = DestIndex::new();

		index.insert("file.txt".to_string(), DestFileState { size: 1024, mtime: 1234567890, mode: 0o644, is_dir: false, delta_info: None });

		assert!(index.contains("file.txt"));
		assert!(!index.contains("other.txt"));

		let state = index.get("file.txt").unwrap();
		assert_eq!(state.size, 1024);

		index.remove("file.txt");
		assert!(!index.contains("file.txt"));
	}

	#[tokio::test]
	async fn test_file_job_channel() {
		let (tx, mut rx) = file_job_channel();

		let job = GeneratorMessage::File(FileJob {
			path: Arc::new(PathBuf::from("test.txt")),
			size: 100,
			mtime: 0,
			mode: 0o644,
			inode: 0,
			need_delta: false,
			checksums: None,
		});

		tx.send(job).await.unwrap();
		drop(tx);

		let received = rx.recv().await.unwrap();
		match received {
			GeneratorMessage::File(job) => {
				assert_eq!(job.path.as_ref(), &PathBuf::from("test.txt"));
				assert_eq!(job.size, 100);
			}
			_ => panic!("Expected File message"),
		}
	}

	#[test]
	fn test_sync_stats() {
		let mut stats = SyncStats::new();
		stats.files_ok = 100;
		stats.bytes_transferred = 1024 * 1024;
		stats.delta_files = 10;
		stats.delta_bytes_saved = 512 * 1024;

		assert_eq!(stats.files_ok, 100);
		assert_eq!(stats.delta_files, 10);
	}
}