use anyhow::Result;
use std::fs;
use std::os::unix::fs::MetadataExt;
use std::path::{Path, PathBuf};
#[cfg(target_os = "linux")]
use io_uring::{opcode, types, IoUring};
const SMALL_FILE_THRESHOLD: usize = 64 * 1024;
pub const IO_URING_BATCH_SIZE: usize = 256;
pub struct SmallFileBuffer {
buffers: Vec<Vec<u8>>,
next_buffer: usize,
}
impl SmallFileBuffer {
pub fn new(count: usize) -> Self {
let mut buffers = Vec::with_capacity(count);
for _ in 0..count {
buffers.push(vec![0u8; SMALL_FILE_THRESHOLD]);
}
Self {
buffers,
next_buffer: 0,
}
}
pub fn get_buffer(&mut self) -> &mut [u8] {
let current = self.next_buffer;
let total = self.buffers.len();
self.next_buffer = (current + 1) % total;
&mut self.buffers[current]
}
}
#[cfg(target_os = "linux")]
pub fn copy_small_files_batch(files: &[(PathBuf, PathBuf)]) -> Result<u64> {
let mut total_bytes = 0u64;
let mut ring = IoUring::builder()
.setup_sqpoll(1000) .build(IO_URING_BATCH_SIZE as u32)?;
for batch in files.chunks(IO_URING_BATCH_SIZE) {
let batch_bytes = submit_batch_copy(&mut ring, batch)?;
total_bytes += batch_bytes;
}
Ok(total_bytes)
}
#[cfg(target_os = "linux")]
fn submit_batch_copy(ring: &mut IoUring, files: &[(PathBuf, PathBuf)]) -> Result<u64> {
use std::os::unix::io::AsRawFd;
let mut total_bytes = 0u64;
let mut file_handles = Vec::new();
let mut buffers = SmallFileBuffer::new(files.len());
for (idx, (src, dst)) in files.iter().enumerate() {
let src_file = match fs::File::open(src) {
Ok(f) => f,
Err(e) => {
eprintln!("Failed to open source {src:?}: {e}");
continue;
}
};
let metadata = match src_file.metadata() {
Ok(m) => m,
Err(e) => {
eprintln!("Failed to get metadata for {src:?}: {e}");
continue;
}
};
let file_size = metadata.len();
if file_size > SMALL_FILE_THRESHOLD as u64 {
match fs::copy(src, dst) {
Ok(bytes) => total_bytes += bytes,
Err(e) => eprintln!("Failed to copy large file {src:?}: {e}"),
}
continue;
}
if let Some(parent) = dst.parent() {
let _ = fs::create_dir_all(parent);
}
let dst_file = match fs::File::create(dst) {
Ok(f) => f,
Err(e) => {
eprintln!("Failed to create destination {dst:?}: {e}");
continue;
}
};
let src_fd = src_file.as_raw_fd();
let dst_fd = dst_file.as_raw_fd();
let buffer = buffers.get_buffer();
let buffer_ptr = buffer.as_mut_ptr();
let read_op = opcode::Read::new(types::Fd(src_fd), buffer_ptr, file_size as u32)
.offset(0)
.build()
.user_data(idx as u64 * 2);
unsafe {
ring.submission()
.push(&read_op)
.map_err(|e| anyhow::anyhow!("Failed to submit read: {}", e))?;
}
file_handles.push((
src_file,
dst_file,
file_size,
dst_fd,
buffer_ptr,
metadata.mode(),
));
}
ring.submit_and_wait(file_handles.len())
.map_err(|e| anyhow::anyhow!("Failed to submit batch: {}", e))?;
let mut completed_reads = Vec::new();
for _ in 0..file_handles.len() {
let cqe = ring.completion().next().expect("completion queue entry");
let idx = (cqe.user_data() / 2) as usize;
if cqe.result() < 0 {
eprintln!("Read failed for file {}: {}", idx, cqe.result());
continue;
}
completed_reads.push((idx, cqe.result() as u32));
}
let num_writes = completed_reads.len();
for (idx, bytes_read) in completed_reads {
if let Some((_, _, _, dst_fd, buffer_ptr, _)) = file_handles.get(idx) {
let write_op = opcode::Write::new(types::Fd(*dst_fd), *buffer_ptr, bytes_read)
.offset(0)
.build()
.user_data(idx as u64 * 2 + 1);
unsafe {
ring.submission()
.push(&write_op)
.map_err(|e| anyhow::anyhow!("Failed to submit write: {}", e))?;
}
}
}
ring.submit_and_wait(num_writes)
.map_err(|e| anyhow::anyhow!("Failed to submit writes: {}", e))?;
for _ in 0..num_writes {
let cqe = ring.completion().next().expect("completion queue entry");
let idx = ((cqe.user_data() - 1) / 2) as usize;
if cqe.result() < 0 {
eprintln!("Write failed for file {}: {}", idx, cqe.result());
continue;
}
total_bytes += cqe.result() as u64;
if let Some((_, dst_file, _, _, _, mode)) = file_handles.get(idx) {
#[cfg(unix)]
{
use std::os::unix::fs::PermissionsExt;
let _ = dst_file.set_permissions(fs::Permissions::from_mode(*mode));
}
}
}
Ok(total_bytes)
}
pub fn mmap_copy_small_file(src: &Path, dst: &Path) -> Result<u64> {
use memmap2::MmapOptions;
let file = fs::File::open(src)?;
let metadata = file.metadata()?;
let len = metadata.len() as usize;
if len > SMALL_FILE_THRESHOLD {
return Ok(fs::copy(src, dst)?);
}
let mmap = unsafe { MmapOptions::new().map(&file)? };
fs::write(dst, &mmap[..])?;
let dst_file = fs::OpenOptions::new().write(true).open(dst)?;
#[cfg(unix)]
{
use std::os::unix::fs::PermissionsExt;
dst_file.set_permissions(fs::Permissions::from_mode(metadata.mode()))?;
}
Ok(len as u64)
}
pub fn scan_directory_parallel(path: &Path) -> Result<Vec<PathBuf>> {
use jwalk::WalkDir;
use rayon::prelude::*;
let entries: Vec<PathBuf> = WalkDir::new(path)
.parallelism(jwalk::Parallelism::RayonNewPool(num_cpus::get()))
.into_iter()
.par_bridge()
.filter_map(|entry| match entry {
Ok(e) => {
let file_type = e.file_type();
if file_type.is_file() {
Some(e.path().to_owned())
} else {
None
}
}
_ => None,
})
.collect();
Ok(entries)
}
pub fn batch_copy_files(operations: Vec<(PathBuf, PathBuf)>) -> Result<BatchCopyStats> {
use rayon::prelude::*;
use std::sync::atomic::{AtomicU64, Ordering};
let total_files = operations.len();
let files_copied = AtomicU64::new(0);
let bytes_copied = AtomicU64::new(0);
let start = std::time::Instant::now();
let (small_files, large_files): (Vec<_>, Vec<_>) =
operations.into_par_iter().partition(|(src, _)| {
fs::metadata(src)
.map(|m| m.len() < SMALL_FILE_THRESHOLD as u64)
.unwrap_or(false)
});
small_files.par_chunks(100).for_each(|chunk| {
for (src, dst) in chunk {
if let Some(parent) = dst.parent() {
let _ = fs::create_dir_all(parent);
}
match mmap_copy_small_file(src, dst) {
Ok(bytes) => {
files_copied.fetch_add(1, Ordering::Relaxed);
bytes_copied.fetch_add(bytes, Ordering::Relaxed);
}
Err(e) => eprintln!("Error copying {src:?}: {e}"),
}
}
});
large_files.par_iter().for_each(|(src, dst)| {
if let Some(parent) = dst.parent() {
let _ = fs::create_dir_all(parent);
}
match fs::copy(src, dst) {
Ok(bytes) => {
files_copied.fetch_add(1, Ordering::Relaxed);
bytes_copied.fetch_add(bytes, Ordering::Relaxed);
}
Err(e) => eprintln!("Error copying {src:?}: {e}"),
}
});
let elapsed = start.elapsed();
Ok(BatchCopyStats {
total_files,
files_copied: files_copied.load(Ordering::Relaxed),
bytes_copied: bytes_copied.load(Ordering::Relaxed),
elapsed,
})
}
#[derive(Debug)]
pub struct BatchCopyStats {
pub total_files: usize,
pub files_copied: u64,
pub bytes_copied: u64,
pub elapsed: std::time::Duration,
}
impl BatchCopyStats {
pub fn files_per_second(&self) -> f64 {
self.files_copied as f64 / self.elapsed.as_secs_f64()
}
pub fn throughput_mb_per_sec(&self) -> f64 {
(self.bytes_copied as f64 / 1_000_000.0) / self.elapsed.as_secs_f64()
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_small_file_buffer() {
let mut buffer = SmallFileBuffer::new(4);
let buf1 = buffer.get_buffer();
assert_eq!(buf1.len(), SMALL_FILE_THRESHOLD);
}
}