use std::sync::Mutex;
use std::thread::{self, JoinHandle};
use std::time::Duration;
use crossbeam_channel::{bounded, Receiver, Sender};
use crate::error::BatchError;
mod dispatch;
mod group;
mod solo;
use group::BatchJob;
pub(crate) use group::{BatchOp, HandleSnapshot};
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(crate) struct PipelineConfig {
pub batch_window_ms: u64,
pub batch_size_max: usize,
pub batch_queue_max: usize,
}
impl PipelineConfig {
pub const DEFAULT: PipelineConfig = PipelineConfig {
batch_window_ms: 1,
batch_size_max: 128,
batch_queue_max: 1024,
};
}
impl Default for PipelineConfig {
fn default() -> Self {
Self::DEFAULT
}
}
pub(crate) struct Pipeline {
config: PipelineConfig,
inner: Mutex<Option<DispatcherInner>>,
}
struct DispatcherInner {
job_tx: Sender<BatchJob>,
shutdown_tx: Sender<()>,
done_rx: Receiver<()>,
thread: Option<JoinHandle<()>>,
}
impl Pipeline {
pub(crate) fn new(config: PipelineConfig) -> Self {
Self {
config,
inner: Mutex::new(None),
}
}
#[allow(dead_code)] pub(crate) fn config(&self) -> PipelineConfig {
self.config
}
pub(crate) fn submit(
&self,
ops: Vec<BatchOp>,
snapshot: HandleSnapshot,
) -> std::result::Result<(), BatchError> {
let job_tx = match self.dispatcher_sender() {
Some(tx) => tx,
None => return Err(shutdown_err()),
};
let (response_tx, response_rx) = bounded(1);
let job = BatchJob {
ops,
snapshot,
response: crate::pipeline::group::BatchResponse::Sync(response_tx),
};
if job_tx.send(job).is_err() {
return Err(shutdown_err());
}
match response_rx.recv() {
Ok(result) => result,
Err(_) => Err(shutdown_err()),
}
}
#[cfg(feature = "async")]
pub(crate) async fn submit_async(
&self,
ops: Vec<BatchOp>,
snapshot: HandleSnapshot,
) -> std::result::Result<(), BatchError> {
let job_tx = match self.dispatcher_sender() {
Some(tx) => tx,
None => return Err(shutdown_err()),
};
let (response_tx, response_rx) = tokio::sync::oneshot::channel();
let job = BatchJob {
ops,
snapshot,
response: crate::pipeline::group::BatchResponse::Async(response_tx),
};
if job_tx.send(job).is_err() {
return Err(shutdown_err());
}
match response_rx.await {
Ok(result) => result,
Err(_) => Err(shutdown_err()),
}
}
fn dispatcher_sender(&self) -> Option<Sender<BatchJob>> {
let mut guard = match self.inner.lock() {
Ok(g) => g,
Err(p) => p.into_inner(),
};
if guard.is_none() {
*guard = Some(spawn_dispatcher(self.config));
}
guard.as_ref().map(|i| i.job_tx.clone())
}
}
fn shutdown_err() -> BatchError {
BatchError {
failed_at: 0,
completed: 0,
source: Box::new(crate::Error::ShutdownInProgress),
}
}
fn spawn_dispatcher(config: PipelineConfig) -> DispatcherInner {
let (job_tx, job_rx) = bounded::<BatchJob>(config.batch_queue_max);
let (shutdown_tx, shutdown_rx) = bounded::<()>(1);
let (done_tx, done_rx) = bounded::<()>(1);
let thread = thread::Builder::new()
.name("fsys-dispatcher".into())
.spawn(move || {
group::run_dispatcher(config, job_rx, shutdown_rx, done_tx);
})
.ok();
DispatcherInner {
job_tx,
shutdown_tx,
done_rx,
thread,
}
}
impl Drop for Pipeline {
fn drop(&mut self) {
let mut guard = match self.inner.lock() {
Ok(g) => g,
Err(p) => p.into_inner(),
};
let Some(mut inner) = guard.take() else {
return;
};
drop(guard);
let _ = inner.shutdown_tx.send(());
drop(inner.job_tx);
drop(inner.shutdown_tx);
let dispatcher_acked = inner.done_rx.recv_timeout(Duration::from_secs(5)).is_ok();
if let Some(thread) = inner.thread.take() {
if dispatcher_acked {
let _ = thread.join();
} else {
std::mem::forget(thread);
}
}
}
}
const _: () = {
fn assert_send<T: Send>() {}
fn assert_sync<T: Sync>() {}
fn check() {
assert_send::<Pipeline>();
assert_sync::<Pipeline>();
}
let _ = check;
};
#[cfg(test)]
mod tests {
use super::*;
use std::path::PathBuf;
use std::sync::atomic::{AtomicU64, Ordering};
use std::time::Instant;
static TEST_COUNTER: AtomicU64 = AtomicU64::new(0);
fn tmp_path(suffix: &str) -> PathBuf {
let n = TEST_COUNTER.fetch_add(1, Ordering::Relaxed);
std::env::temp_dir().join(format!(
"fsys_pipeline_test_{}_{}_{}",
std::process::id(),
n,
suffix
))
}
fn snapshot_default() -> HandleSnapshot {
HandleSnapshot {
method: crate::Method::Sync,
sector_size: 512,
use_direct: false,
}
}
#[test]
fn test_pipeline_config_default_matches_prompt() {
let c = PipelineConfig::default();
assert_eq!(c.batch_window_ms, 1);
assert_eq!(c.batch_size_max, 128);
assert_eq!(c.batch_queue_max, 1024);
}
#[test]
fn test_pipeline_new_does_not_spawn_dispatcher() {
let p = Pipeline::new(PipelineConfig::DEFAULT);
let guard = p.inner.lock().unwrap();
assert!(guard.is_none(), "lazy spawn: dispatcher must not exist yet");
}
#[test]
fn test_pipeline_submit_spawns_dispatcher_on_first_call() {
let p = Pipeline::new(PipelineConfig::DEFAULT);
let r = p.submit(Vec::new(), snapshot_default());
assert!(r.is_ok(), "empty batch should succeed: {:?}", r);
let guard = p.inner.lock().unwrap();
assert!(guard.is_some(), "dispatcher must exist after first submit");
}
#[test]
fn test_pipeline_submit_executes_a_single_write() {
let p = Pipeline::new(PipelineConfig::DEFAULT);
let path = tmp_path("single_write");
let _g = scopeguard_remove(path.clone());
let ops = vec![BatchOp::Write {
path: path.clone(),
data: b"hello-pipeline".to_vec(),
}];
p.submit(ops, snapshot_default()).expect("submit");
let actual = std::fs::read(&path).expect("read");
assert_eq!(actual, b"hello-pipeline");
}
#[test]
fn test_pipeline_submit_executes_multiple_writes_in_order() {
let p = Pipeline::new(PipelineConfig::DEFAULT);
let path = tmp_path("multi_write");
let _g = scopeguard_remove(path.clone());
let ops = vec![
BatchOp::Write {
path: path.clone(),
data: b"first".to_vec(),
},
BatchOp::Write {
path: path.clone(),
data: b"second".to_vec(),
},
BatchOp::Write {
path: path.clone(),
data: b"third".to_vec(),
},
];
p.submit(ops, snapshot_default()).expect("submit");
assert_eq!(std::fs::read(&path).unwrap(), b"third");
}
#[test]
fn test_pipeline_submit_delete_removes_file() {
let p = Pipeline::new(PipelineConfig::DEFAULT);
let path = tmp_path("delete");
std::fs::write(&path, b"to-be-deleted").unwrap();
let ops = vec![BatchOp::Delete { path: path.clone() }];
p.submit(ops, snapshot_default()).expect("submit");
assert!(!path.exists(), "file should be gone");
}
#[test]
fn test_pipeline_submit_copy_duplicates_content() {
let p = Pipeline::new(PipelineConfig::DEFAULT);
let src = tmp_path("copy_src");
let dst = tmp_path("copy_dst");
let _g1 = scopeguard_remove(src.clone());
let _g2 = scopeguard_remove(dst.clone());
std::fs::write(&src, b"copy-me").unwrap();
let ops = vec![BatchOp::Copy {
src: src.clone(),
dst: dst.clone(),
}];
p.submit(ops, snapshot_default()).expect("submit");
assert_eq!(std::fs::read(&dst).unwrap(), b"copy-me");
}
#[test]
fn test_pipeline_submit_reports_failure_at_correct_index() {
let p = Pipeline::new(PipelineConfig::DEFAULT);
let good = tmp_path("good");
let _g1 = scopeguard_remove(good.clone());
let bad_dir = tmp_path("bad_dir");
std::fs::create_dir_all(&bad_dir).unwrap();
let _g_dir = scopeguard_remove_dir(bad_dir.clone());
let ops = vec![
BatchOp::Write {
path: good.clone(),
data: b"ok".to_vec(),
},
BatchOp::Write {
path: bad_dir.clone(), data: b"fail".to_vec(),
},
BatchOp::Write {
path: good.clone(),
data: b"never".to_vec(),
},
];
let result = p.submit(ops, snapshot_default());
let err = result.expect_err("expected failure on op 1");
assert_eq!(err.failed_at, 1, "failed_at index");
assert_eq!(err.completed, 1, "completed count");
assert_eq!(std::fs::read(&good).unwrap(), b"ok");
}
#[test]
fn test_pipeline_drop_signals_dispatcher_to_exit() {
let start = Instant::now();
{
let p = Pipeline::new(PipelineConfig::DEFAULT);
p.submit(Vec::new(), snapshot_default()).unwrap();
} let elapsed = start.elapsed();
assert!(
elapsed < Duration::from_secs(2),
"drop should not stall — got {:?}",
elapsed
);
}
#[test]
fn test_pipeline_handles_many_concurrent_submitters() {
use std::sync::Arc;
let p = Arc::new(Pipeline::new(PipelineConfig::DEFAULT));
let path_base = tmp_path("concurrent");
let mut handles = Vec::new();
let n_threads = 8;
let writes_per_thread = 4;
for t in 0..n_threads {
let p = Arc::clone(&p);
let pb = path_base.clone();
handles.push(std::thread::spawn(move || {
for w in 0..writes_per_thread {
let path = PathBuf::from(format!("{}_t{}_w{}", pb.display(), t, w));
let ops = vec![BatchOp::Write {
path,
data: format!("t{}w{}", t, w).into_bytes(),
}];
p.submit(ops, snapshot_default()).unwrap();
}
}));
}
for h in handles {
h.join().unwrap();
}
for t in 0..n_threads {
for w in 0..writes_per_thread {
let path = PathBuf::from(format!("{}_t{}_w{}", path_base.display(), t, w));
let expected = format!("t{}w{}", t, w);
let actual = std::fs::read_to_string(&path).unwrap();
assert_eq!(actual, expected);
let _ = std::fs::remove_file(&path);
}
}
}
struct ScopeGuardFile(PathBuf);
impl Drop for ScopeGuardFile {
fn drop(&mut self) {
let _ = std::fs::remove_file(&self.0);
}
}
fn scopeguard_remove(p: PathBuf) -> ScopeGuardFile {
ScopeGuardFile(p)
}
struct ScopeGuardDir(PathBuf);
impl Drop for ScopeGuardDir {
fn drop(&mut self) {
let _ = std::fs::remove_dir_all(&self.0);
}
}
fn scopeguard_remove_dir(p: PathBuf) -> ScopeGuardDir {
ScopeGuardDir(p)
}
}