use std::panic::AssertUnwindSafe;
use std::path::Path;
use std::path::PathBuf;
use std::time::{Duration, Instant};
use crossbeam_channel::{select, Receiver, Sender};
use crate::error::BatchError;
use crate::handle::Handle;
use crate::method::Method;
use crate::platform;
use crate::{Error, Result};
use super::PipelineConfig;
/// A single filesystem operation carried inside a [`BatchJob`].
///
/// Each variant is executed by `execute_op`, which forwards to the matching
/// `execute_*` helper in this file.
#[derive(Debug)]
pub(crate) enum BatchOp {
/// Write `data` to a temporary file and rename it over `path`.
Write {
/// Destination file path.
path: PathBuf,
/// Complete payload for the destination file.
data: Vec<u8>,
},
/// Remove the file at `path`; a file that is already missing counts as success.
Delete {
/// File to remove.
path: PathBuf,
},
/// Read all of `src` into memory and write it to `dst` the same way a
/// `Write` would.
Copy {
/// File whose contents are read.
src: PathBuf,
/// Destination file path.
dst: PathBuf,
},
}
/// Copyable settings that travel with a [`BatchJob`] and are handed to every
/// op executor.
// NOTE(review): presumably captured from the submitting `Handle` at enqueue
// time — confirm against the caller.
#[derive(Debug, Clone, Copy)]
pub(crate) struct HandleSnapshot {
/// Selects the flush strategy in `flush_for_method`.
pub method: Method,
/// Passed to `platform::write_all_direct` when the direct path is used.
pub sector_size: u32,
/// Passed to `platform::open_write_new` to request a direct-I/O open.
pub use_direct: bool,
}
/// Reply channel used to report the outcome of one [`BatchJob`].
pub(crate) enum BatchResponse {
/// Reply over a crossbeam channel sender.
Sync(Sender<std::result::Result<(), BatchError>>),
/// Reply over a tokio oneshot sender (only with the `async` feature).
#[cfg(feature = "async")]
Async(tokio::sync::oneshot::Sender<std::result::Result<(), BatchError>>),
}
impl BatchResponse {
pub(crate) fn send(self, result: std::result::Result<(), BatchError>) {
match self {
BatchResponse::Sync(tx) => {
let _ = tx.send(result);
}
#[cfg(feature = "async")]
BatchResponse::Async(tx) => {
let _ = tx.send(result);
}
}
}
}
/// One unit of work submitted to the dispatcher: an ordered list of ops plus
/// the channel on which a single result for the whole list is reported.
pub(crate) struct BatchJob {
/// Operations executed in order; execution stops at the first failure.
pub ops: Vec<BatchOp>,
/// Settings passed to every op executor for this job.
pub snapshot: HandleSnapshot,
/// Where the job's single `Ok(())` / `Err(BatchError)` is sent.
pub response: BatchResponse,
/// When `true`, writes and copies skip their per-op parent-directory sync;
/// instead each distinct parent directory is synced once after every op in
/// the job has succeeded.
pub grouped: bool,
}
/// Dispatcher loop: receives jobs, coalesces them into batches and executes
/// them on the calling thread until shutdown or until `job_rx` disconnects.
///
/// Each iteration works in phases:
///
/// 1. Block until a job arrives or shutdown fires. On shutdown, everything
///    still queued is processed and the loop ends.
/// 2. Greedily pull already-queued jobs without blocking, until the op count
///    reaches `config.batch_size_max` (the last job pulled may overshoot it).
/// 3. Dispatch immediately if the batch is full, if batching is disabled
///    (`batch_window_ms == 0`), or if only a single job was found.
/// 4. Otherwise keep accepting jobs for up to `batch_window_ms` milliseconds,
///    then dispatch. A shutdown during the window drains the queue, processes
///    the batch and ends the loop.
///
/// A `()` is sent on `done_tx` once the loop has exited; a failure to send is
/// ignored.
pub(super) fn run_dispatcher(
    config: PipelineConfig,
    job_rx: Receiver<BatchJob>,
    shutdown_rx: Receiver<()>,
    done_tx: Sender<()>,
) {
    let op_limit = config.batch_size_max;
    let window = Duration::from_millis(config.batch_window_ms);

    'dispatch: loop {
        // Phase 1: wait for the first job of the next batch.
        let head = select! {
            recv(job_rx) -> msg => match msg {
                Ok(job) => job,
                // All job senders are gone: nothing more can arrive.
                Err(_) => break 'dispatch,
            },
            recv(shutdown_rx) -> _ => {
                drain_remaining(&job_rx);
                break 'dispatch;
            }
        };

        let mut op_count: usize = head.ops.len();
        let mut batch: Vec<BatchJob> = Vec::with_capacity(8);
        batch.push(head);

        // Phase 2: take whatever is already queued, without blocking.
        while op_count < op_limit {
            let Ok(job) = job_rx.try_recv() else { break };
            op_count += job.ops.len();
            batch.push(job);
        }

        // Phase 3: only linger when there is room left, a window is
        // configured, and more than one job was already waiting.
        let should_linger = op_count < op_limit && !window.is_zero() && batch.len() >= 2;
        if !should_linger {
            process_jobs(batch);
            continue 'dispatch;
        }

        // Phase 4: accept more jobs until the window closes or the batch fills.
        let deadline = Instant::now() + window;
        while op_count < op_limit {
            let remaining = deadline.saturating_duration_since(Instant::now());
            if remaining.is_zero() {
                break;
            }
            select! {
                recv(job_rx) -> msg => match msg {
                    Ok(job) => {
                        op_count += job.ops.len();
                        batch.push(job);
                    }
                    Err(_) => break,
                },
                recv(shutdown_rx) -> _ => {
                    // Shutdown mid-window: sweep up the queue and finish.
                    batch.extend(job_rx.try_iter());
                    process_jobs(batch);
                    break 'dispatch;
                },
                default(remaining) => break,
            }
        }
        process_jobs(batch);
    }

    let _ = done_tx.send(());
}
/// Processes every job that is currently queued on `job_rx` without blocking.
///
/// Does nothing when the queue is empty.
fn drain_remaining(job_rx: &Receiver<BatchJob>) {
    let leftovers: Vec<BatchJob> = job_rx.try_iter().collect();
    if leftovers.is_empty() {
        return;
    }
    process_jobs(leftovers);
}
/// Runs `jobs` with the real executor, `execute_op`.
///
/// `process_jobs_with` takes the executor as a parameter so that tests can
/// substitute their own.
fn process_jobs(jobs: Vec<BatchJob>) {
    let real_executor = execute_op;
    process_jobs_with(jobs, real_executor);
}
fn process_jobs_with<F>(jobs: Vec<BatchJob>, executor: F)
where
F: Fn(BatchOp, &HandleSnapshot, bool) -> Result<()>,
{
use std::collections::BTreeMap;
for job in jobs {
let response = job.response;
let snapshot = job.snapshot;
let ops = job.ops;
let grouped = job.grouped;
let mut completed: usize = 0;
let mut failure: Option<(usize, Error)> = None;
let mut grouped_parents: BTreeMap<PathBuf, PathBuf> = BTreeMap::new();
for (idx, op) in ops.into_iter().enumerate() {
let parent_repr: Option<(PathBuf, PathBuf)> = if grouped {
match &op {
BatchOp::Write { path, .. } => {
path.parent().map(|p| (PathBuf::from(p), path.clone()))
}
BatchOp::Copy { dst, .. } => {
dst.parent().map(|p| (PathBuf::from(p), dst.clone()))
}
BatchOp::Delete { .. } => None,
}
} else {
None
};
let res =
std::panic::catch_unwind(AssertUnwindSafe(|| executor(op, &snapshot, grouped)));
match res {
Ok(Ok(())) => {
completed += 1;
if let Some((parent, repr_file)) = parent_repr {
let _ = grouped_parents.entry(parent).or_insert(repr_file);
}
}
Ok(Err(e)) => {
failure = Some((idx, e));
break;
}
Err(_panic_payload) => {
failure = Some((idx, Error::Io(std::io::Error::other("batch op panicked"))));
break;
}
}
}
if grouped && failure.is_none() {
for repr in grouped_parents.values() {
let _ = platform::sync_parent_dir(repr);
}
}
let result = match failure {
None => Ok(()),
Some((failed_at, e)) => Err(BatchError {
failed_at,
completed,
source: Box::new(e),
}),
};
response.send(result);
}
}
/// Production executor: routes one [`BatchOp`] to its `execute_*` helper.
///
/// `snapshot` and `grouped` are forwarded to writes and copies; deletes use
/// neither.
fn execute_op(op: BatchOp, snapshot: &HandleSnapshot, grouped: bool) -> Result<()> {
    match op {
        BatchOp::Write {
            path: target,
            data: payload,
        } => execute_write(target.as_path(), payload.as_slice(), snapshot, grouped),
        BatchOp::Delete { path: target } => execute_delete(target.as_path()),
        BatchOp::Copy { src: from, dst: to } => {
            execute_copy(from.as_path(), to.as_path(), snapshot, grouped)
        }
    }
}
fn execute_write(path: &Path, data: &[u8], snapshot: &HandleSnapshot, grouped: bool) -> Result<()> {
let temp = Handle::gen_temp_path(path);
let (file, direct_ok) = platform::open_write_new(&temp, snapshot.use_direct).map_err(|e| {
Error::AtomicReplaceFailed {
step: "open_temp",
source: as_io_error(e),
}
})?;
let write_result = if direct_ok {
platform::write_all_direct(&file, data, snapshot.sector_size)
} else {
platform::write_all(&file, data)
};
if let Err(e) = write_result {
let _ = std::fs::remove_file(&temp);
return Err(Error::AtomicReplaceFailed {
step: "write",
source: as_io_error(e),
});
}
if direct_ok {
drop(file);
if let Err(e) = std::fs::OpenOptions::new()
.write(true)
.open(&temp)
.and_then(|f| f.set_len(data.len() as u64))
{
let _ = std::fs::remove_file(&temp);
return Err(Error::AtomicReplaceFailed {
step: "truncate",
source: e,
});
}
} else {
let flush_result = flush_for_method(&file, snapshot.method);
if let Err(e) = flush_result {
let _ = std::fs::remove_file(&temp);
return Err(Error::AtomicReplaceFailed {
step: "flush",
source: as_io_error(e),
});
}
drop(file);
}
if let Err(e) = platform::atomic_rename(&temp, path) {
let _ = std::fs::remove_file(&temp);
return Err(Error::AtomicReplaceFailed {
step: "rename",
source: as_io_error(e),
});
}
if !grouped {
let _ = platform::sync_parent_dir(path);
}
Ok(())
}
fn execute_delete(path: &Path) -> Result<()> {
match std::fs::remove_file(path) {
Ok(()) => Ok(()),
Err(e) if e.kind() == std::io::ErrorKind::NotFound => Ok(()),
Err(e) => Err(Error::Io(e)),
}
}
fn execute_copy(src: &Path, dst: &Path, snapshot: &HandleSnapshot, grouped: bool) -> Result<()> {
let data = std::fs::read(src).map_err(Error::Io)?;
execute_write(dst, &data, snapshot, grouped)
}
/// Flushes `file` using the strategy selected by `method`.
///
/// * `Method::Direct` — no flush on Windows; `platform::sync_data` elsewhere.
/// * `Method::Data` — `platform::sync_data`.
/// * every other method — `platform::sync_full`.
///
// NOTE(review): the reason Windows skips the flush for `Direct` is not visible
// here — presumably the handle is opened write-through; confirm in `platform`.
fn flush_for_method(file: &std::fs::File, method: Method) -> Result<()> {
    match method {
        Method::Direct if cfg!(target_os = "windows") => Ok(()),
        Method::Direct | Method::Data => platform::sync_data(file),
        _ => platform::sync_full(file),
    }
}
fn as_io_error(e: Error) -> std::io::Error {
match e {
Error::Io(io_err) => io_err,
other => std::io::Error::other(other.to_string()),
}
}
// Unit tests for the op executors and for the panic isolation in
// `process_jobs_with`. The executor tests touch the real filesystem under the
// system temp directory.
#[cfg(test)]
mod tests {
use super::*;
use std::sync::atomic::{AtomicU64, Ordering};
// Process-wide counter so every call to `tmp_path` yields a distinct name.
static TEST_COUNTER: AtomicU64 = AtomicU64::new(0);
// Builds a temp-dir path from the process id, a counter value and `suffix`.
fn tmp_path(suffix: &str) -> PathBuf {
let n = TEST_COUNTER.fetch_add(1, Ordering::Relaxed);
std::env::temp_dir().join(format!(
"fsys_group_test_{}_{}_{}",
std::process::id(),
n,
suffix
))
}
// Snapshot used by all tests: full sync, buffered (non-direct) I/O.
fn snapshot() -> HandleSnapshot {
HandleSnapshot {
method: Method::Sync,
sector_size: 512,
use_direct: false,
}
}
// Guard that removes the wrapped file when dropped (errors ignored).
struct TmpFile(PathBuf);
impl Drop for TmpFile {
fn drop(&mut self) {
let _ = std::fs::remove_file(&self.0);
}
}
// A write to a fresh path creates the file with exactly the payload.
#[test]
fn test_execute_write_creates_file_with_payload() {
let path = tmp_path("write_creates");
let _g = TmpFile(path.clone());
execute_write(&path, b"payload", &snapshot(), false).expect("write");
assert_eq!(std::fs::read(&path).unwrap(), b"payload");
}
// A write over an existing file replaces its contents.
#[test]
fn test_execute_write_replaces_existing_file() {
let path = tmp_path("write_replaces");
let _g = TmpFile(path.clone());
std::fs::write(&path, b"old").unwrap();
execute_write(&path, b"new", &snapshot(), false).expect("replace");
assert_eq!(std::fs::read(&path).unwrap(), b"new");
}
// An empty payload is accepted and produces an empty file.
#[test]
fn test_execute_write_empty_payload_is_valid() {
let path = tmp_path("empty_write");
let _g = TmpFile(path.clone());
execute_write(&path, b"", &snapshot(), false).expect("empty");
assert_eq!(std::fs::read(&path).unwrap(), b"");
}
// Deleting a path that does not exist succeeds.
#[test]
fn test_execute_delete_idempotent_on_missing_file() {
let path = tmp_path("delete_missing");
execute_delete(&path).expect("delete missing should succeed");
}
// Deleting an existing file removes it.
#[test]
fn test_execute_delete_removes_existing_file() {
let path = tmp_path("delete_existing");
std::fs::write(&path, b"x").unwrap();
execute_delete(&path).expect("delete");
assert!(!path.exists());
}
// A copy leaves the destination with the source's bytes.
#[test]
fn test_execute_copy_duplicates_payload() {
let src = tmp_path("copy_src");
let dst = tmp_path("copy_dst");
let _g1 = TmpFile(src.clone());
let _g2 = TmpFile(dst.clone());
std::fs::write(&src, b"copy-payload").unwrap();
execute_copy(&src, &dst, &snapshot(), false).expect("copy");
assert_eq!(std::fs::read(&dst).unwrap(), b"copy-payload");
}
// `execute_op` routes Write, Delete and Copy to the matching executor.
#[test]
fn test_execute_op_dispatches_each_variant() {
let p1 = tmp_path("op_w");
let _g1 = TmpFile(p1.clone());
execute_op(
BatchOp::Write {
path: p1.clone(),
data: b"w".to_vec(),
},
&snapshot(),
false,
)
.expect("write op");
assert_eq!(std::fs::read(&p1).unwrap(), b"w");
execute_op(BatchOp::Delete { path: p1.clone() }, &snapshot(), false).expect("delete op");
assert!(!p1.exists());
let src = tmp_path("op_copy_src");
let dst = tmp_path("op_copy_dst");
let _g2 = TmpFile(src.clone());
let _g3 = TmpFile(dst.clone());
std::fs::write(&src, b"c").unwrap();
execute_op(
BatchOp::Copy {
src: src.clone(),
dst: dst.clone(),
},
&snapshot(),
false,
)
.expect("copy op");
assert_eq!(std::fs::read(&dst).unwrap(), b"c");
}
// Writing onto a path that is a directory must fail with
// `AtomicReplaceFailed`; which step fails is allowed to vary.
#[test]
fn test_execute_write_failure_returns_atomic_replace_error() {
let dir = tmp_path("write_to_dir");
std::fs::create_dir_all(&dir).unwrap();
let result = execute_write(&dir, b"x", &snapshot(), false);
// Clean up before asserting so a failed assertion does not leak the directory.
let _ = std::fs::remove_dir_all(&dir);
assert!(result.is_err());
match result.unwrap_err() {
Error::AtomicReplaceFailed { step, .. } => {
assert!(
matches!(
step,
"open_temp" | "write" | "truncate" | "flush" | "rename"
),
"unexpected step: {step}"
);
}
other => panic!("expected AtomicReplaceFailed, got {:?}", other),
}
}
// `Error::Io` is unwrapped, preserving the original error kind.
#[test]
fn test_as_io_error_passes_through_io_variant() {
let inner = std::io::Error::from(std::io::ErrorKind::PermissionDenied);
let err = Error::Io(inner);
let io = as_io_error(err);
assert_eq!(io.kind(), std::io::ErrorKind::PermissionDenied);
}
// A non-I/O variant is wrapped and its display text is kept.
#[test]
fn test_as_io_error_wraps_non_io_variant() {
let err = Error::HardwareProbeFailed {
detail: "stub".into(),
};
let io = as_io_error(err);
assert!(io.to_string().contains("FS-00003"));
}
// Smoke test: flushing with `Method::Sync` succeeds on a regular file.
#[test]
fn test_flush_for_method_sync_calls_full_sync() {
let path = tmp_path("flush_sync");
let _g = TmpFile(path.clone());
let f = std::fs::File::create(&path).unwrap();
flush_for_method(&f, Method::Sync).expect("sync flush");
}
// Smoke test: flushing with `Method::Data` succeeds on a regular file.
#[test]
fn test_flush_for_method_data_calls_data_sync() {
let path = tmp_path("flush_data");
let _g = TmpFile(path.clone());
let f = std::fs::File::create(&path).unwrap();
flush_for_method(&f, Method::Data).expect("data flush");
}
// Builds a non-grouped job with a sync response channel and returns the
// receiving end so the test can observe the job's result.
fn make_job(
ops: Vec<BatchOp>,
) -> (
BatchJob,
crossbeam_channel::Receiver<std::result::Result<(), BatchError>>,
) {
let (tx, rx) = crossbeam_channel::bounded(1);
let job = BatchJob {
ops,
snapshot: snapshot(),
response: BatchResponse::Sync(tx),
grouped: false,
};
(job, rx)
}
// A panic in the second op is reported as a `BatchError` at index 1 with one
// completed op. The injected executor never touches the filesystem.
#[test]
fn test_process_jobs_with_catches_panic_and_reports_batch_error() {
let (job, rx) = make_job(vec![
BatchOp::Write {
path: PathBuf::from("/tmp/ignored-1"),
data: vec![1],
},
BatchOp::Write {
path: PathBuf::from("/tmp/__panic__"),
data: vec![2],
},
BatchOp::Write {
path: PathBuf::from("/tmp/never-reached"),
data: vec![3],
},
]);
let executor = |op: BatchOp, _snap: &HandleSnapshot, _grouped: bool| -> Result<()> {
if let BatchOp::Write { path, .. } = &op {
if path.to_string_lossy().contains("__panic__") {
panic!("test-induced panic for catch_unwind verification");
}
}
Ok(())
};
process_jobs_with(vec![job], executor);
let result = rx.recv().expect("dispatcher must send a response");
let err = result.expect_err("expected BatchError from panic");
assert_eq!(err.failed_at, 1, "panic happened at op index 1");
assert_eq!(err.completed, 1, "op 0 completed before the panic");
match *err.source {
Error::Io(ref io) => {
assert!(
io.to_string().contains("panicked"),
"expected panic marker in error display, got: {io}"
);
}
ref other => panic!("expected Error::Io, got {:?}", other),
}
}
// A panicking job must not prevent the next job in the same batch from
// running and succeeding.
#[test]
fn test_process_jobs_with_continues_after_panicking_job() {
let (job_panic, rx_panic) = make_job(vec![BatchOp::Write {
path: PathBuf::from("/tmp/__panic__"),
data: vec![],
}]);
let (job_ok, rx_ok) = make_job(vec![BatchOp::Write {
path: PathBuf::from("/tmp/ok"),
data: vec![],
}]);
let executor = |op: BatchOp, _snap: &HandleSnapshot, _grouped: bool| -> Result<()> {
if let BatchOp::Write { path, .. } = &op {
if path.to_string_lossy().contains("__panic__") {
panic!("test-induced panic");
}
}
Ok(())
};
process_jobs_with(vec![job_panic, job_ok], executor);
let r_panic = rx_panic.recv().expect("first response");
assert!(r_panic.is_err(), "first job should fail with BatchError");
let r_ok = rx_ok.recv().expect("second response");
assert!(
r_ok.is_ok(),
"second job should succeed despite first job's panic"
);
}
// With an executor that always succeeds, the job reports `Ok(())`.
#[test]
fn test_process_jobs_with_passes_through_non_panicking_executor() {
let (job, rx) = make_job(vec![
BatchOp::Write {
path: PathBuf::from("/tmp/x1"),
data: vec![],
},
BatchOp::Write {
path: PathBuf::from("/tmp/x2"),
data: vec![],
},
]);
let executor =
|_op: BatchOp, _snap: &HandleSnapshot, _grouped: bool| -> Result<()> { Ok(()) };
process_jobs_with(vec![job], executor);
let result = rx.recv().expect("response");
assert!(result.is_ok());
}
}