use anyhow::{Context, Result};
use std::fs::{self, File};
use std::io::{BufReader, BufWriter, Read, Write};
use std::path::{Path, PathBuf};
use std::time::Instant;
/// Buffer size (8 KiB) used for every streaming read and write in this module.
pub const STREAM_BUFFER_SIZE: usize = 8 * 1024;
/// Upper bound on the number of files a single batch traversal will collect.
pub const MAX_BATCH_SIZE: usize = 100_000;

/// Summary statistics for one `process_batch` run.
#[derive(Debug, Clone, Copy)]
pub struct BatchResult {
    // Number of files handed to the processor callback.
    pub files_processed: usize,
    // Total size (from source metadata) of the processed files, in bytes.
    pub bytes_read: u64,
    // Total bytes reported written by the processor callback.
    pub bytes_written: u64,
    // Wall-clock duration of the batch, in milliseconds.
    pub duration_ms: f64,
    // Files per second (f64::INFINITY when the measured duration is zero).
    pub throughput: f64,
}
/// Copies `src` to `dst` through fixed-size buffered streams.
///
/// Returns the total number of bytes copied. Open, create, read, write,
/// and flush failures are returned as errors annotated with the
/// offending path.
pub fn stream_copy(src: &Path, dst: &Path) -> Result<u64> {
    let input = File::open(src)
        .with_context(|| format!("cannot open {}", src.display()))?;
    let output = File::create(dst)
        .with_context(|| format!("cannot create {}", dst.display()))?;
    let mut reader = BufReader::with_capacity(STREAM_BUFFER_SIZE, input);
    let mut writer = BufWriter::with_capacity(STREAM_BUFFER_SIZE, output);
    let mut chunk = [0u8; STREAM_BUFFER_SIZE];
    let mut copied: u64 = 0;
    loop {
        // A zero-length read signals end of file.
        match reader
            .read(&mut chunk)
            .with_context(|| format!("read error: {}", src.display()))?
        {
            0 => break,
            filled => {
                writer
                    .write_all(&chunk[..filled])
                    .with_context(|| format!("write error: {}", dst.display()))?;
                copied += filled as u64;
            }
        }
    }
    // Flush explicitly: Drop would attempt a flush but silently swallow errors.
    writer
        .flush()
        .with_context(|| format!("flush error: {}", dst.display()))?;
    Ok(copied)
}
/// Streams the file at `path` through `DefaultHasher` and returns the
/// digest as a 16-character zero-padded lowercase hex string.
///
/// NOTE(review): `DefaultHasher` is deterministic within one program run,
/// but the underlying algorithm is not guaranteed stable across Rust
/// releases — suitable for in-process change detection, not for
/// persisted fingerprints. Confirm that matches the callers' intent.
pub fn stream_hash(path: &Path) -> Result<String> {
    use std::hash::{DefaultHasher, Hasher};
    let handle = File::open(path)
        .with_context(|| format!("cannot open {}", path.display()))?;
    let mut stream = BufReader::with_capacity(STREAM_BUFFER_SIZE, handle);
    let mut digest = DefaultHasher::new();
    let mut chunk = [0u8; STREAM_BUFFER_SIZE];
    loop {
        let filled = stream
            .read(&mut chunk)
            .with_context(|| format!("read error: {}", path.display()))?;
        if filled == 0 {
            break;
        }
        digest.write(&chunk[..filled]);
    }
    Ok(format!("{:016x}", digest.finish()))
}
/// Applies `processor` to every file under `src_dir`, mirroring the
/// source directory layout into `dst_dir`.
///
/// Files are discovered via `collect_files_bounded` (capped at
/// `MAX_BATCH_SIZE`). For each file the relative path is re-rooted under
/// `dst_dir`, parent directories are created on demand, and `processor`
/// is invoked with `(source, destination)`; its return value is counted
/// as bytes written. The first processor or I/O error aborts the batch.
///
/// Returns a `BatchResult` with counts, byte totals, wall-clock duration
/// and files-per-second throughput. A source whose metadata cannot be
/// read contributes 0 to `bytes_read` (best-effort accounting).
pub fn process_batch<F>(
    src_dir: &Path,
    dst_dir: &Path,
    processor: F,
) -> Result<BatchResult>
where
    F: Fn(&Path, &Path) -> Result<u64>,
{
    let started = Instant::now();
    fs::create_dir_all(dst_dir)
        .with_context(|| format!("cannot create {}", dst_dir.display()))?;

    let sources: Vec<PathBuf> = collect_files_bounded(src_dir)?;
    let mut read_total: u64 = 0;
    let mut written_total: u64 = 0;
    let mut processed: usize = 0;

    for source in &sources {
        let relative = source
            .strip_prefix(src_dir)
            .with_context(|| "strip_prefix failed")?;
        let destination = dst_dir.join(relative);
        if let Some(parent) = destination.parent() {
            fs::create_dir_all(parent)?;
        }
        // Best-effort size lookup: unreadable metadata counts as zero bytes.
        let source_size = fs::metadata(source).map(|m| m.len()).unwrap_or(0);
        written_total += processor(source, &destination)?;
        read_total += source_size;
        processed += 1;
    }

    let elapsed = started.elapsed();
    let duration_ms = elapsed.as_secs_f64() * 1000.0;
    // Guard against a zero-length measurement rather than dividing by zero.
    let throughput = if duration_ms > 0.0 {
        processed as f64 / elapsed.as_secs_f64()
    } else {
        f64::INFINITY
    };
    Ok(BatchResult {
        files_processed: processed,
        bytes_read: read_total,
        bytes_written: written_total,
        duration_ms,
        throughput,
    })
}
/// Collects file paths under `dir` (recursively), capped at
/// `MAX_BATCH_SIZE` entries to bound memory use.
///
/// Directories are walked with an explicit stack, so arbitrarily deep
/// trees cannot overflow the call stack. Traversal order is unspecified
/// (depends on `read_dir` order and stack pop order).
///
/// # Errors
/// Returns an error if any directory cannot be read or an entry cannot
/// be inspected.
fn collect_files_bounded(dir: &Path) -> Result<Vec<PathBuf>> {
    let mut files = Vec::new();
    let mut stack = vec![dir.to_path_buf()];
    let mut iterations: usize = 0;
    while let Some(current) = stack.pop() {
        if iterations >= MAX_BATCH_SIZE {
            break;
        }
        // BUG FIX: was `fs::read_dir(¤t)` — an encoding-mangled
        // `&current` (HTML entity `&curren;` rendered as `¤`), which does
        // not compile. Restored the intended borrow of `current`.
        let entries = fs::read_dir(&current)
            .with_context(|| format!("cannot read {}", current.display()))?;
        for entry in entries {
            let path = entry?.path();
            if path.is_dir() {
                stack.push(path);
            } else {
                files.push(path);
                iterations += 1;
                if iterations >= MAX_BATCH_SIZE {
                    break;
                }
            }
        }
    }
    Ok(files)
}
/// Reads the file at `path` line by line, invoking `line_fn` with each
/// zero-based line index and the line's text (newline stripped).
///
/// Returns the number of lines processed. Stops and propagates the
/// first error returned by either the reader or `line_fn`.
pub fn stream_lines<F>(path: &Path, mut line_fn: F) -> Result<usize>
where
    F: FnMut(usize, &str) -> Result<()>,
{
    use std::io::BufRead;
    let file = File::open(path)
        .with_context(|| format!("cannot open {}", path.display()))?;
    let reader = BufReader::with_capacity(STREAM_BUFFER_SIZE, file);
    let mut processed: usize = 0;
    for (idx, item) in reader.lines().enumerate() {
        let text = item.with_context(|| format!("read error at line {idx}"))?;
        line_fn(idx, &text)?;
        processed = idx + 1;
    }
    Ok(processed)
}
pub fn benchmark_throughput(n: usize) -> Result<BatchResult> {
let tmp = tempfile::tempdir().context("cannot create temp dir")?;
let src = tmp.path().join("src");
let dst = tmp.path().join("dst");
fs::create_dir_all(&src)?;
for i in 0..n {
fs::write(src.join(format!("f{i}.txt")), "a]".repeat(32))?;
}
process_batch(&src, &dst, |s, d| stream_copy(s, d))
}
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::tempdir;

    // stream_copy round-trips a small text file byte-for-byte.
    #[test]
    fn test_stream_copy_small_file() -> Result<()> {
        let tmp = tempdir()?;
        let src = tmp.path().join("src.txt");
        let dst = tmp.path().join("dst.txt");
        fs::write(&src, "hello world")?;
        let bytes = stream_copy(&src, &dst)?;
        assert_eq!(bytes, 11);
        assert_eq!(fs::read_to_string(&dst)?, "hello world");
        Ok(())
    }

    // A file much larger than STREAM_BUFFER_SIZE exercises the loop body
    // across many buffer refills.
    #[test]
    fn test_stream_copy_large_file() -> Result<()> {
        let tmp = tempdir()?;
        let src = tmp.path().join("large.bin");
        let dst = tmp.path().join("large_copy.bin");
        let data = vec![0xABu8; 1024 * 1024];
        fs::write(&src, &data)?;
        let bytes = stream_copy(&src, &dst)?;
        assert_eq!(bytes, 1024 * 1024);
        assert_eq!(fs::read(&dst)?, data);
        Ok(())
    }

    // Zero-byte source: the read loop must terminate immediately.
    #[test]
    fn test_stream_copy_empty_file() -> Result<()> {
        let tmp = tempdir()?;
        let src = tmp.path().join("empty.txt");
        let dst = tmp.path().join("empty_copy.txt");
        fs::write(&src, "")?;
        let bytes = stream_copy(&src, &dst)?;
        assert_eq!(bytes, 0);
        Ok(())
    }

    // Same file hashed twice in one process must produce the same digest.
    #[test]
    fn test_stream_hash_deterministic() -> Result<()> {
        let tmp = tempdir()?;
        let path = tmp.path().join("test.txt");
        fs::write(&path, "consistent content")?;
        let h1 = stream_hash(&path)?;
        let h2 = stream_hash(&path)?;
        assert_eq!(h1, h2);
        assert_eq!(h1.len(), 16);
        Ok(())
    }

    #[test]
    fn test_stream_hash_differs_for_different_content() -> Result<()> {
        let tmp = tempdir()?;
        let a = tmp.path().join("a.txt");
        let b = tmp.path().join("b.txt");
        fs::write(&a, "content a")?;
        fs::write(&b, "content b")?;
        assert_ne!(stream_hash(&a)?, stream_hash(&b)?);
        Ok(())
    }

    // Input spanning multiple buffer fills still yields a 16-hex digest.
    #[test]
    fn test_stream_hash_large_file() -> Result<()> {
        let tmp = tempdir()?;
        let path = tmp.path().join("big.bin");
        fs::write(&path, vec![0u8; 100_000])?;
        let hash = stream_hash(&path)?;
        assert_eq!(hash.len(), 16);
        Ok(())
    }

    #[test]
    fn test_process_batch_copies_files() -> Result<()> {
        let tmp = tempdir()?;
        let src = tmp.path().join("src");
        let dst = tmp.path().join("dst");
        fs::create_dir_all(&src)?;
        for i in 0..10 {
            fs::write(src.join(format!("f{i}.txt")), format!("data {i}"))?;
        }
        let result = process_batch(&src, &dst, |s, d| stream_copy(s, d))?;
        assert_eq!(result.files_processed, 10);
        assert!(result.bytes_written > 0);
        assert!(result.throughput > 0.0);
        Ok(())
    }

    #[test]
    fn test_process_batch_empty_directory() -> Result<()> {
        let tmp = tempdir()?;
        let src = tmp.path().join("src");
        let dst = tmp.path().join("dst");
        fs::create_dir_all(&src)?;
        let result = process_batch(&src, &dst, |s, d| stream_copy(s, d))?;
        assert_eq!(result.files_processed, 0);
        Ok(())
    }

    // Nested directories must be mirrored into the destination tree.
    #[test]
    fn test_process_batch_nested_dirs() -> Result<()> {
        let tmp = tempdir()?;
        let src = tmp.path().join("src");
        let dst = tmp.path().join("dst");
        fs::create_dir_all(src.join("sub/deep"))?;
        fs::write(src.join("root.txt"), "root")?;
        fs::write(src.join("sub/mid.txt"), "mid")?;
        fs::write(src.join("sub/deep/leaf.txt"), "leaf")?;
        let result = process_batch(&src, &dst, |s, d| stream_copy(s, d))?;
        assert_eq!(result.files_processed, 3);
        assert_eq!(fs::read_to_string(dst.join("sub/deep/leaf.txt"))?, "leaf");
        Ok(())
    }

    #[test]
    fn test_stream_lines_counts_correctly() -> Result<()> {
        let tmp = tempdir()?;
        let path = tmp.path().join("lines.txt");
        fs::write(&path, "line1\nline2\nline3\n")?;
        let count = stream_lines(&path, |_i, _line| Ok(()))?;
        assert_eq!(count, 3);
        Ok(())
    }

    #[test]
    fn test_stream_lines_provides_content() -> Result<()> {
        let tmp = tempdir()?;
        let path = tmp.path().join("data.txt");
        fs::write(&path, "alpha\nbeta\ngamma")?;
        let mut collected = Vec::new();
        stream_lines(&path, |_i, line| {
            collected.push(line.to_string());
            Ok(())
        })?;
        assert_eq!(collected, vec!["alpha", "beta", "gamma"]);
        Ok(())
    }

    // Well under MAX_BATCH_SIZE: every file must be collected.
    #[test]
    fn test_collect_files_bounded_respects_limit() -> Result<()> {
        let tmp = tempdir()?;
        for i in 0..50 {
            fs::write(tmp.path().join(format!("f{i}.txt")), "x")?;
        }
        let files = collect_files_bounded(tmp.path())?;
        assert_eq!(files.len(), 50);
        Ok(())
    }

    #[test]
    fn test_benchmark_throughput_runs() -> Result<()> {
        let result = benchmark_throughput(100)?;
        assert_eq!(result.files_processed, 100);
        // FIX: the previous `throughput > 1000.0` assertion encoded a
        // machine-dependent performance threshold and was flaky on slow
        // or heavily loaded hosts. Assert structural sanity instead:
        // throughput is positive and consistent with the measurement.
        assert!(
            result.throughput > 0.0,
            "throughput must be positive: {}",
            result.throughput
        );
        assert!(
            result.throughput.is_finite() || result.duration_ms == 0.0,
            "infinite throughput is only valid for a zero-length measurement"
        );
        println!(
            "Benchmark: {} files in {:.2} ms ({:.0} files/sec)",
            result.files_processed, result.duration_ms, result.throughput
        );
        Ok(())
    }

    // Plain construction smoke test for the public result struct.
    #[test]
    fn test_batch_result_fields() {
        let r = BatchResult {
            files_processed: 10,
            bytes_read: 1000,
            bytes_written: 900,
            duration_ms: 1.5,
            throughput: 6666.0,
        };
        assert_eq!(r.files_processed, 10);
        assert!(r.throughput > 0.0);
    }

    // The open error is hit before the destination is ever created.
    #[test]
    fn test_stream_copy_nonexistent_source() {
        let result = stream_copy(Path::new("/nonexistent"), Path::new("/tmp/out"));
        assert!(result.is_err());
    }

    #[test]
    fn test_stream_hash_nonexistent() {
        let result = stream_hash(Path::new("/nonexistent"));
        assert!(result.is_err());
    }

    #[test]
    fn test_stream_lines_empty_file() -> Result<()> {
        let tmp = tempdir()?;
        let path = tmp.path().join("empty.txt");
        fs::write(&path, "")?;
        let count = stream_lines(&path, |_i, _line| Ok(()))?;
        assert_eq!(count, 0);
        Ok(())
    }

    // Boundary case: file length exactly equals the stream buffer size,
    // so the loop sees one full read followed by a zero-length read.
    #[test]
    fn stream_copy_exact_buffer_size_file() -> Result<()> {
        let tmp = tempdir()?;
        let src = tmp.path().join("exact.bin");
        let dst = tmp.path().join("exact_copy.bin");
        let data = vec![0xCDu8; STREAM_BUFFER_SIZE];
        fs::write(&src, &data)?;
        let bytes = stream_copy(&src, &dst)?;
        assert_eq!(bytes, STREAM_BUFFER_SIZE as u64);
        assert_eq!(fs::read(&dst)?, data);
        Ok(())
    }

    #[test]
    fn stream_hash_empty_file() -> Result<()> {
        let tmp = tempdir()?;
        let path = tmp.path().join("empty.bin");
        fs::write(&path, b"")?;
        let h1 = stream_hash(&path)?;
        let h2 = stream_hash(&path)?;
        assert_eq!(h1, h2, "hash of empty file must be deterministic");
        assert_eq!(h1.len(), 16);
        Ok(())
    }

    #[test]
    fn stream_hash_same_content_same_hash() -> Result<()> {
        let tmp = tempdir()?;
        let a = tmp.path().join("file_a.txt");
        let b = tmp.path().join("file_b.txt");
        let content = "identical content in both files";
        fs::write(&a, content)?;
        fs::write(&b, content)?;
        let hash_a = stream_hash(&a)?;
        let hash_b = stream_hash(&b)?;
        assert_eq!(hash_a, hash_b, "same content must produce same hash");
        Ok(())
    }

    // A file with no trailing newline still yields exactly one line.
    #[test]
    fn stream_lines_binary_content() -> Result<()> {
        let tmp = tempdir()?;
        let path = tmp.path().join("binary.bin");
        fs::write(&path, "no-newlines-here")?;
        let mut lines_seen = Vec::new();
        let count = stream_lines(&path, |_i, line| {
            lines_seen.push(line.to_string());
            Ok(())
        })?;
        assert_eq!(count, 1);
        assert_eq!(lines_seen, vec!["no-newlines-here"]);
        Ok(())
    }

    // Empty batch: all counters must be zero.
    #[test]
    fn process_batch_empty_directory() -> Result<()> {
        let tmp = tempdir()?;
        let src = tmp.path().join("empty_src");
        let dst = tmp.path().join("empty_dst");
        fs::create_dir_all(&src)?;
        let result = process_batch(&src, &dst, |s, d| stream_copy(s, d))?;
        assert_eq!(result.files_processed, 0);
        assert_eq!(result.bytes_read, 0);
        assert_eq!(result.bytes_written, 0);
        Ok(())
    }
}