use crate::delta::generator::{DeltaOp, generate_delta_streaming};
use crate::streaming::channel::{DATA_CHUNK_SIZE, DELTA_CHUNK_SIZE, DeltaInfo, FileJob, FileJobReceiver, GeneratorMessage};
use crate::streaming::protocol::{Data, DataEnd, DataFlags, Delete, DeleteEnd, FileEnd, FileEntry, FileFlags, Mkdir, Symlink};
use anyhow::{Context, Result};
use bytes::Bytes;
use std::path::{Path, PathBuf};
use tokio::fs::File;
use tokio::io::{AsyncReadExt, BufReader};
/// Configuration for a [`Sender`].
#[derive(Debug, Clone)]
pub struct SenderConfig {
    /// Directory that the relative paths of incoming file jobs are joined onto.
    pub root: PathBuf,
    /// When true, outgoing `Data` frames carry the `COMPRESSED` flag.
    pub compress: bool,
}

/// Turns generator messages into encoded protocol frames, reading file
/// contents from disk relative to [`SenderConfig::root`].
#[derive(Debug)]
pub struct Sender {
    config: SenderConfig,
}
impl Sender {
pub fn new(config: SenderConfig) -> Self {
Self { config }
}
pub async fn run<F>(self, mut rx: FileJobReceiver, mut on_data: F) -> Result<()>
where
F: FnMut(Bytes) -> Result<()>,
{
while let Some(msg) = rx.recv().await {
match msg {
GeneratorMessage::File(job) => {
self.process_file(job, &mut on_data).await?;
}
GeneratorMessage::Mkdir { path, mode } => {
let msg = Mkdir { path: path.to_string_lossy().to_string(), mode };
on_data(msg.encode())?;
}
GeneratorMessage::Symlink { path, target } => {
let msg = Symlink { path: path.to_string_lossy().to_string(), target };
on_data(msg.encode())?;
}
GeneratorMessage::Delete { path, is_dir } => {
let msg = Delete { path: path.to_string_lossy().to_string(), is_dir };
on_data(msg.encode())?;
}
GeneratorMessage::FileEnd { total_files, total_bytes } => {
let msg = FileEnd { total_files, total_bytes };
on_data(msg.encode())?;
}
GeneratorMessage::DeleteEnd { count } => {
let msg = DeleteEnd { count };
on_data(msg.encode())?;
}
}
}
Ok(())
}
async fn process_file<F>(&self, job: FileJob, on_data: &mut F) -> Result<()>
where
F: FnMut(Bytes) -> Result<()>,
{
let path_str = job.path.to_string_lossy().to_string();
let full_path = self.config.root.join(job.path.as_ref());
let entry = FileEntry {
path: path_str.clone(),
size: job.size,
mtime: job.mtime,
mode: job.mode,
inode: job.inode,
flags: FileFlags::empty(),
symlink_target: None,
link_target: None,
};
on_data(entry.encode())?;
if job.need_delta {
if let Some(checksums) = job.checksums {
self.send_delta(&full_path, &path_str, checksums, on_data).await?;
} else {
self.send_full(&full_path, &path_str, on_data).await?;
}
} else {
self.send_full(&full_path, &path_str, on_data).await?;
}
let end = DataEnd { path: path_str, status: DataEnd::STATUS_OK };
on_data(end.encode())?;
Ok(())
}
async fn send_full<F>(&self, path: &Path, path_str: &str, on_data: &mut F) -> Result<()>
where
F: FnMut(Bytes) -> Result<()>,
{
let file = File::open(path).await.context("Failed to open file for full transfer")?;
let mut reader = BufReader::new(file);
let mut offset = 0u64;
let mut buf = vec![0u8; DATA_CHUNK_SIZE];
loop {
let n = reader.read(&mut buf).await?;
if n == 0 {
break;
}
let mut flags = DataFlags::empty();
if self.config.compress {
flags |= DataFlags::COMPRESSED;
}
let data = Data { path: path_str.to_string(), offset, flags, data: Bytes::copy_from_slice(&buf[..n]) };
on_data(data.encode())?;
offset += n as u64;
}
Ok(())
}
async fn send_delta<F>(&self, path: &Path, path_str: &str, delta_info: DeltaInfo, on_data: &mut F) -> Result<()>
where
F: FnMut(Bytes) -> Result<()>,
{
let block_size = delta_info.block_size as usize;
let file_size = delta_info.file_size;
let num_checksums = delta_info.checksums.len();
let dest_checksums: Vec<_> = delta_info
.checksums
.iter()
.enumerate()
.map(|(i, c)| {
let actual_size = if i == num_checksums - 1 {
let remaining = file_size.saturating_sub(c.offset);
remaining.min(block_size as u64) as usize
} else {
block_size
};
crate::delta::BlockChecksum { index: i as u64, offset: c.offset, size: actual_size, weak: c.weak, strong: c.strong }
})
.collect();
let p = path.to_path_buf();
let delta = tokio::task::spawn_blocking(move || generate_delta_streaming(&p, &dest_checksums, block_size)).await??;
let mut flags = DataFlags::DELTA;
if self.config.compress {
flags |= DataFlags::COMPRESSED;
}
let mut delta_bytes = Vec::new();
for op in delta.ops {
let op_bytes = match &op {
DeltaOp::Copy { offset, size } => {
let mut buf = Vec::with_capacity(13);
buf.push(0x00);
buf.extend_from_slice(&offset.to_be_bytes());
buf.extend_from_slice(&(*size as u32).to_be_bytes());
buf
}
DeltaOp::Data(data) => {
let mut buf = Vec::with_capacity(5 + data.len());
buf.push(0x01);
buf.extend_from_slice(&(data.len() as u32).to_be_bytes());
buf.extend_from_slice(data);
buf
}
};
if !delta_bytes.is_empty() && delta_bytes.len() + op_bytes.len() > DELTA_CHUNK_SIZE {
let data = Data {
path: path_str.to_string(),
offset: 0, flags,
data: Bytes::from(std::mem::take(&mut delta_bytes)),
};
on_data(data.encode())?;
}
delta_bytes.extend(op_bytes);
}
if !delta_bytes.is_empty() {
let data = Data {
path: path_str.to_string(),
offset: 0, flags,
data: Bytes::from(delta_bytes),
};
on_data(data.encode())?;
}
Ok(())
}
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::streaming::protocol::BlockChecksum;
    use std::fs;
    use std::sync::Arc;
    use tempfile::TempDir;

    /// Message-type byte (byte 4 of an encoded frame) that marks a DATA frame.
    const DATA_MSG_TYPE: u8 = 0x06;

    /// Builds a `File` job for `path`. A delta is requested iff `checksums` is `Some`.
    fn file_job(path: &str, size: u64, checksums: Option<DeltaInfo>) -> GeneratorMessage {
        GeneratorMessage::File(FileJob {
            path: Arc::new(PathBuf::from(path)),
            size,
            mtime: 0,
            mode: 0o644,
            inode: 0,
            need_delta: checksums.is_some(),
            checksums,
        })
    }

    /// Feeds `messages` through a non-compressing sender rooted at `root` and
    /// returns every frame it emits, in order.
    async fn run_sender(root: &Path, messages: Vec<GeneratorMessage>) -> Vec<Bytes> {
        let config = SenderConfig { root: root.to_path_buf(), compress: false };
        let (tx, rx) = crate::streaming::channel::file_job_channel();
        for msg in messages {
            tx.send(msg).await.unwrap();
        }
        drop(tx);
        let mut frames = Vec::new();
        Sender::new(config)
            .run(rx, |bytes| {
                frames.push(bytes);
                Ok(())
            })
            .await
            .unwrap();
        frames
    }

    /// Decodes `(offset, flags)` from a DATA frame, or returns `None` for any
    /// other frame type. The layout these tests assume is: 4 leading bytes,
    /// the type byte, a `u16` path length, the path bytes, a `u64` offset, and
    /// a flags byte.
    fn data_header(frame: &[u8]) -> Option<(u64, u8)> {
        if frame.len() <= 4 || frame[4] != DATA_MSG_TYPE {
            return None;
        }
        let path_len = u16::from_be_bytes([frame[5], frame[6]]) as usize;
        let offset_start = 4 + 1 + 2 + path_len;
        let mut offset_bytes = [0u8; 8];
        offset_bytes.copy_from_slice(&frame[offset_start..offset_start + 8]);
        Some((u64::from_be_bytes(offset_bytes), frame[offset_start + 8]))
    }

    #[tokio::test]
    async fn test_sender_simple_file() {
        let tmp = TempDir::new().unwrap();
        fs::write(tmp.path().join("test.txt"), "hello world").unwrap();
        let frames = run_sender(
            tmp.path(),
            vec![
                file_job("test.txt", 11, None),
                GeneratorMessage::FileEnd { total_files: 1, total_bytes: 11 },
            ],
        )
        .await;
        // FileEntry, Data, DataEnd, FileEnd.
        assert!(frames.len() >= 4);
        let data: Vec<_> = frames.iter().filter_map(|f| data_header(f)).collect();
        assert!(!data.is_empty(), "Expected at least one DATA frame");
        assert_eq!(data[0].0, 0, "First full-transfer chunk should start at offset 0");
        for (_, flags) in data {
            assert_eq!(flags & DataFlags::DELTA.bits(), 0, "Full transfer must not set DELTA");
        }
    }

    #[tokio::test]
    async fn test_sender_delta_file() {
        let tmp = TempDir::new().unwrap();
        let content = "new content that differs from original";
        fs::write(tmp.path().join("test.txt"), content).unwrap();
        let delta_info = DeltaInfo {
            block_size: 16,
            file_size: 32,
            checksums: vec![
                BlockChecksum { offset: 0, weak: 0xDEADBEEF, strong: 0x0 },
                BlockChecksum { offset: 16, weak: 0xCAFEBABE, strong: 0x1 },
            ],
        };
        let size = content.len() as u64;
        let frames = run_sender(
            tmp.path(),
            vec![
                file_job("test.txt", size, Some(delta_info)),
                GeneratorMessage::FileEnd { total_files: 1, total_bytes: size },
            ],
        )
        .await;
        assert!(frames.len() >= 4, "Expected at least 4 messages");
        // frames[0] is the FileEntry header; the first content frame follows it.
        let (_, flags) = data_header(&frames[1]).expect("Expected DATA message type");
        assert!(flags & DataFlags::DELTA.bits() != 0, "Expected DELTA flag to be set");
    }

    #[tokio::test]
    async fn test_delta_always_uses_zero_offset() {
        let tmp = TempDir::new().unwrap();
        let content = "a".repeat(100_000);
        fs::write(tmp.path().join("large.txt"), &content).unwrap();
        let delta_info = DeltaInfo {
            block_size: 1024,
            file_size: 2048,
            checksums: vec![
                BlockChecksum { offset: 0, weak: 0x12345678, strong: 0x99 },
                BlockChecksum { offset: 1024, weak: 0x87654321, strong: 0x88 },
            ],
        };
        let size = content.len() as u64;
        let frames = run_sender(
            tmp.path(),
            vec![
                file_job("large.txt", size, Some(delta_info)),
                GeneratorMessage::FileEnd { total_files: 1, total_bytes: size },
            ],
        )
        .await;
        let data: Vec<_> = frames.iter().filter_map(|f| data_header(f)).collect();
        // Without this check the loop below would pass vacuously.
        assert!(!data.is_empty(), "Expected at least one DATA frame");
        for (offset, flags) in data {
            assert!(flags & DataFlags::DELTA.bits() != 0, "Expected DELTA flag to be set");
            assert_eq!(offset, 0, "Delta Data message should always use offset 0");
        }
    }
}