use std::sync::Mutex;
use std::thread::{self, JoinHandle};
use std::time::Duration;
use crossbeam_channel::{bounded, Receiver, Sender};
use crate::error::BatchError;
mod dispatch;
mod group;
mod solo;
use group::BatchJob;
pub(crate) use group::{BatchOp, HandleSnapshot};
/// Tuning parameters for the batching pipeline.
///
/// The type is `Copy`; a copy is handed to every dispatcher thread when the
/// fleet is spawned.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub(crate) struct PipelineConfig {
    /// Batching window in milliseconds, forwarded to the dispatcher loop.
    /// NOTE(review): the exact coalescing semantics live in `group::run_dispatcher`.
    pub batch_window_ms: u64,
    /// Maximum ops per batch, forwarded to the dispatcher loop.
    /// NOTE(review): enforced in `group::run_dispatcher`, not in this module.
    pub batch_size_max: usize,
    /// Capacity of each shard's bounded job queue. Once it is full, synchronous
    /// submitters block and asynchronous submitters yield and retry.
    pub batch_queue_max: usize,
    /// Number of dispatcher threads (shards); values below 1 are treated as 1.
    pub dispatcher_shards: usize,
}

impl Default for PipelineConfig {
    /// Identical to `PipelineConfig::DEFAULT`.
    fn default() -> Self {
        PipelineConfig::DEFAULT
    }
}

impl PipelineConfig {
    /// Baseline configuration: 1 ms window, 128-op batches, 1024-deep job
    /// queues and a single dispatcher shard.
    pub const DEFAULT: Self = Self {
        batch_window_ms: 1,
        batch_size_max: 128,
        batch_queue_max: 1024,
        dispatcher_shards: 1,
    };
}
/// Front-end of the batching pipeline: routes each submitted batch to one of a
/// lazily spawned fleet of dispatcher threads (one bounded job queue per shard).
pub(crate) struct Pipeline {
    // Configuration given to `new`; a copy is passed to every dispatcher at spawn time.
    config: PipelineConfig,
    // `None` until the first submission spawns the fleet (see `dispatcher_sender`);
    // taken back out by `Drop` during teardown.
    inner: Mutex<Option<Vec<DispatcherInner>>>,
}
/// Handles the pipeline keeps for a single dispatcher shard.
struct DispatcherInner {
    // Producer side of this shard's bounded job queue (capacity `batch_queue_max`).
    job_tx: Sender<BatchJob>,
    // `Pipeline::drop` sends a single `()` here to ask the dispatcher to stop.
    shutdown_tx: Sender<()>,
    // `Pipeline::drop` waits (with a timeout) for a message here before joining the
    // thread. NOTE(review): presumably sent by `group::run_dispatcher` once it has
    // drained; confirm there.
    done_rx: Receiver<()>,
    // Join handle of the dispatcher thread; `None` if the thread failed to spawn.
    thread: Option<JoinHandle<()>>,
}
impl Pipeline {
pub(crate) fn new(config: PipelineConfig) -> Self {
Self {
config,
inner: Mutex::new(None),
}
}
#[allow(dead_code)] pub(crate) fn config(&self) -> PipelineConfig {
self.config
}
pub(crate) fn submit(
&self,
ops: Vec<BatchOp>,
snapshot: HandleSnapshot,
grouped: bool,
) -> std::result::Result<(), BatchError> {
let shard = pick_shard(&ops, self.config.dispatcher_shards);
let job_tx = match self.dispatcher_sender(shard) {
Some(tx) => tx,
None => return Err(shutdown_err()),
};
let (response_tx, response_rx) = bounded(1);
let job = BatchJob {
ops,
snapshot,
response: crate::pipeline::group::BatchResponse::Sync(response_tx),
grouped,
};
if job_tx.send(job).is_err() {
return Err(shutdown_err());
}
match response_rx.recv() {
Ok(result) => result,
Err(_) => Err(shutdown_err()),
}
}
#[cfg(feature = "async")]
pub(crate) async fn submit_async(
&self,
ops: Vec<BatchOp>,
snapshot: HandleSnapshot,
grouped: bool,
) -> std::result::Result<(), BatchError> {
let shard = pick_shard(&ops, self.config.dispatcher_shards);
let job_tx = match self.dispatcher_sender(shard) {
Some(tx) => tx,
None => return Err(shutdown_err()),
};
let (response_tx, response_rx) = tokio::sync::oneshot::channel();
let mut job = BatchJob {
ops,
snapshot,
response: crate::pipeline::group::BatchResponse::Async(response_tx),
grouped,
};
loop {
match job_tx.try_send(job) {
Ok(()) => break,
Err(crossbeam_channel::TrySendError::Full(returned)) => {
job = returned;
tokio::task::yield_now().await;
}
Err(crossbeam_channel::TrySendError::Disconnected(_)) => {
return Err(shutdown_err());
}
}
}
match response_rx.await {
Ok(result) => result,
Err(_) => Err(shutdown_err()),
}
}
fn dispatcher_sender(&self, shard: usize) -> Option<Sender<BatchJob>> {
let mut guard = match self.inner.lock() {
Ok(g) => g,
Err(p) => p.into_inner(),
};
if guard.is_none() {
*guard = Some(spawn_dispatcher_fleet(self.config));
}
guard
.as_ref()
.and_then(|fleet| fleet.get(shard).map(|d| d.job_tx.clone()))
}
}
/// Chooses the dispatcher shard for a batch by hashing the path of its first op.
///
/// - `Write` and `Delete` route by `path`; `Copy` routes by `src`.
/// - Returns 0 when unsharded (`n_shards <= 1`) or when the batch is empty.
///
/// Batches whose first op targets the same path land on the same shard. The
/// mapping is stable within a process. `DefaultHasher`'s algorithm is unspecified
/// across Rust releases, so shard numbers must not be persisted.
fn pick_shard(ops: &[BatchOp], n_shards: usize) -> usize {
    use std::collections::hash_map::DefaultHasher;
    use std::hash::{Hash, Hasher};

    if n_shards < 2 {
        return 0;
    }

    let routing_path: &std::path::Path = match ops.first() {
        None => return 0,
        Some(BatchOp::Write { path, .. }) | Some(BatchOp::Delete { path }) => path,
        Some(BatchOp::Copy { src, .. }) => src,
    };

    let mut hasher = DefaultHasher::new();
    routing_path.hash(&mut hasher);
    let bucket = hasher.finish() % n_shards as u64;
    bucket as usize
}
fn shutdown_err() -> BatchError {
BatchError {
failed_at: 0,
completed: 0,
source: Box::new(crate::Error::ShutdownInProgress),
}
}
/// Spawns `max(1, config.dispatcher_shards)` dispatcher threads.
///
/// Each thread gets:
/// - a bounded job queue of capacity `batch_queue_max`;
/// - a 1-slot shutdown channel;
/// - a 1-slot done channel.
///
/// Threads are named `fsys-dispatcher` when there is one shard and
/// `fsys-dispatcher-<idx>` otherwise.
///
/// If the OS refuses to spawn a thread, that shard's `thread` is `None`. The
/// rejected closure (and the job receiver it owned) is dropped, so sends to that
/// shard fail and callers see a shutdown error.
fn spawn_dispatcher_fleet(config: PipelineConfig) -> Vec<DispatcherInner> {
    let shard_count = config.dispatcher_shards.max(1);
    (0..shard_count)
        .map(move |idx| {
            let thread_name = match shard_count {
                1 => String::from("fsys-dispatcher"),
                _ => format!("fsys-dispatcher-{idx}"),
            };
            let (job_tx, job_rx) = bounded::<BatchJob>(config.batch_queue_max);
            let (shutdown_tx, shutdown_rx) = bounded::<()>(1);
            let (done_tx, done_rx) = bounded::<()>(1);
            let thread = thread::Builder::new()
                .name(thread_name)
                .spawn(move || {
                    group::run_dispatcher(config, job_rx, shutdown_rx, done_tx);
                })
                .ok();
            DispatcherInner {
                job_tx,
                shutdown_tx,
                done_rx,
                thread,
            }
        })
        .collect()
}
impl Drop for Pipeline {
    /// Stops every dispatcher shard, waiting a bounded time for them to finish.
    ///
    /// Teardown runs in three phases so that all shards drain concurrently:
    /// 1. send the shutdown signal to every shard;
    /// 2. drop this side's job and shutdown senders for every shard;
    /// 3. wait for each shard's done acknowledgement against ONE shared
    ///    deadline. Threads that acknowledged are joined; the rest are detached.
    ///
    /// The shared deadline caps the total time spent here at `SHUTDOWN_GRACE`
    /// regardless of shard count. A per-shard timeout would let N unresponsive
    /// shards stall drop for N × `SHUTDOWN_GRACE`.
    fn drop(&mut self) {
        const SHUTDOWN_GRACE: Duration = Duration::from_secs(5);

        // `&mut self` is exclusive, so no locking is needed. A poisoned mutex still
        // holds a usable value; recover it rather than skip teardown.
        let fleet = self
            .inner
            .get_mut()
            .unwrap_or_else(|poisoned| poisoned.into_inner())
            .take();
        let Some(fleet) = fleet else {
            // Nothing was ever submitted, so no dispatcher threads exist.
            return;
        };

        // Phase 1: signal every shard. A send error only means that shard's receiver
        // is already gone, so there is nothing left to signal.
        for shard in &fleet {
            let _ = shard.shutdown_tx.send(());
        }

        // Phase 2: release our senders for every shard before waiting on any of them.
        // NOTE(review): matters if `group::run_dispatcher` also watches for its queue
        // disconnecting; confirm there.
        let mut pending: Vec<(Receiver<()>, Option<JoinHandle<()>>)> =
            Vec::with_capacity(fleet.len());
        for DispatcherInner {
            job_tx,
            shutdown_tx,
            done_rx,
            thread,
        } in fleet
        {
            drop(job_tx);
            drop(shutdown_tx);
            pending.push((done_rx, thread));
        }

        // Phase 3: wait for acknowledgements against a single deadline.
        let deadline = std::time::Instant::now() + SHUTDOWN_GRACE;
        for (done_rx, thread) in pending {
            let remaining = deadline.saturating_duration_since(std::time::Instant::now());
            let acked = done_rx.recv_timeout(remaining).is_ok();
            match thread {
                // Acknowledged: presumably the dispatcher is about to return, so the
                // join should be quick. A dispatcher panic is deliberately ignored.
                Some(handle) if acked => {
                    let _ = handle.join();
                }
                // No ack in time (or the done sender vanished without one): detach.
                // Dropping a JoinHandle detaches the thread and releases the handle's
                // resources. `mem::forget` would leak them and, on Unix, leave the
                // pthread joinable forever.
                Some(handle) => drop(handle),
                // The thread never spawned; there is nothing to join.
                None => {}
            }
        }
    }
}
const _: () = {
fn assert_send<T: Send>() {}
fn assert_sync<T: Sync>() {}
fn check() {
assert_send::<Pipeline>();
assert_sync::<Pipeline>();
}
let _ = check;
};
#[cfg(test)]
mod tests {
    use super::*;
    use std::path::PathBuf;
    use std::sync::atomic::{AtomicU64, Ordering};
    use std::time::Instant;

    // Combined with the process id, this counter keeps temp paths unique across
    // tests running in parallel inside one test binary.
    static TEST_COUNTER: AtomicU64 = AtomicU64::new(0);

    /// Returns a fresh, unique path under the system temp directory.
    fn tmp_path(suffix: &str) -> PathBuf {
        let n = TEST_COUNTER.fetch_add(1, Ordering::Relaxed);
        std::env::temp_dir().join(format!(
            "fsys_pipeline_test_{}_{}_{}",
            std::process::id(),
            n,
            suffix
        ))
    }

    /// Handle snapshot shared by the tests: `Method::Sync`, 512-byte sectors,
    /// no direct I/O.
    fn snapshot_default() -> HandleSnapshot {
        HandleSnapshot {
            method: crate::Method::Sync,
            sector_size: 512,
            use_direct: false,
        }
    }

    #[test]
    fn test_pipeline_config_default_matches_prompt() {
        let c = PipelineConfig::default();
        assert_eq!(c.batch_window_ms, 1);
        assert_eq!(c.batch_size_max, 128);
        assert_eq!(c.batch_queue_max, 1024);
        assert_eq!(c.dispatcher_shards, 1);
    }

    #[test]
    fn test_pick_shard_returns_zero_when_unsharded() {
        assert_eq!(pick_shard(&[], 1), 0);
        let ops = vec![BatchOp::Write {
            path: PathBuf::from("/x"),
            data: vec![],
        }];
        assert_eq!(pick_shard(&ops, 1), 0);
    }

    #[test]
    fn test_pick_shard_empty_batch_maps_to_zero() {
        assert_eq!(pick_shard(&[], 8), 0);
    }

    #[test]
    fn test_pick_shard_is_deterministic_per_path() {
        let ops = vec![BatchOp::Write {
            path: PathBuf::from("/data/segment.00001"),
            data: vec![],
        }];
        let a = pick_shard(&ops, 16);
        let b = pick_shard(&ops, 16);
        assert_eq!(a, b);
        assert!(a < 16);
    }

    #[test]
    fn test_pick_shard_uses_first_op_path() {
        let a = vec![BatchOp::Write {
            path: PathBuf::from("/data/a"),
            data: vec![],
        }];
        let b = vec![BatchOp::Delete {
            path: PathBuf::from("/data/a"),
        }];
        assert_eq!(pick_shard(&a, 8), pick_shard(&b, 8));
    }

    #[test]
    fn test_pick_shard_copy_uses_src_path() {
        let copy = vec![BatchOp::Copy {
            src: PathBuf::from("/data/src"),
            dst: PathBuf::from("/elsewhere/dst"),
        }];
        let write = vec![BatchOp::Write {
            path: PathBuf::from("/data/src"),
            data: vec![],
        }];
        assert_eq!(pick_shard(&copy, 8), pick_shard(&write, 8));
    }

    #[test]
    fn test_pipeline_spawns_n_dispatchers_when_sharded() {
        let p = Pipeline::new(PipelineConfig {
            dispatcher_shards: 4,
            ..PipelineConfig::DEFAULT
        });
        p.submit(Vec::new(), snapshot_default(), false)
            .expect("submit");
        let guard = p.inner.lock().unwrap();
        let fleet = guard.as_ref().expect("fleet must exist");
        assert_eq!(fleet.len(), 4);
        for shard in fleet.iter() {
            assert!(shard.thread.is_some());
        }
    }

    #[test]
    fn test_multi_shard_executes_writes_across_paths() {
        let p = Pipeline::new(PipelineConfig {
            dispatcher_shards: 4,
            ..PipelineConfig::DEFAULT
        });
        let mut paths = Vec::new();
        // Guards remove the files even if an assertion below panics.
        let mut guards = Vec::new();
        for i in 0..16 {
            let path = tmp_path(&format!("multi_shard_{i:02}"));
            paths.push(path.clone());
            guards.push(scopeguard_remove(path.clone()));
            p.submit(
                vec![BatchOp::Write {
                    path,
                    data: format!("payload-{i:02}").into_bytes(),
                }],
                snapshot_default(),
                false,
            )
            .expect("submit");
        }
        for (i, path) in paths.iter().enumerate() {
            let bytes = std::fs::read(path).expect("read");
            assert_eq!(bytes, format!("payload-{i:02}").into_bytes());
        }
    }

    #[test]
    fn test_multi_shard_concurrent_submitters() {
        use std::sync::Arc;
        let p = Arc::new(Pipeline::new(PipelineConfig {
            dispatcher_shards: 4,
            ..PipelineConfig::DEFAULT
        }));
        let start = Instant::now();
        let mut handles = Vec::new();
        for t in 0..8u32 {
            let p = p.clone();
            handles.push(std::thread::spawn(move || {
                let mut written = Vec::new();
                for i in 0..16u32 {
                    let path = tmp_path(&format!("concurrent_t{t:02}_i{i:02}"));
                    p.submit(
                        vec![BatchOp::Write {
                            path: path.clone(),
                            data: format!("t{t}-i{i}").into_bytes(),
                        }],
                        snapshot_default(),
                        false,
                    )
                    .expect("submit");
                    written.push(path);
                }
                written
            }));
        }
        let mut all_paths = Vec::new();
        for h in handles {
            all_paths.extend(h.join().expect("join"));
        }
        assert_eq!(all_paths.len(), 128);
        for path in &all_paths {
            assert!(std::fs::metadata(path).is_ok(), "missing: {}", path.display());
            let _ = std::fs::remove_file(path);
        }
        let elapsed = start.elapsed();
        assert!(
            elapsed < Duration::from_secs(30),
            "multi-shard concurrent test exceeded 30s: {elapsed:?}"
        );
    }

    #[test]
    fn test_multi_shard_clean_shutdown_drains_all_shards() {
        let p = Pipeline::new(PipelineConfig {
            dispatcher_shards: 4,
            ..PipelineConfig::DEFAULT
        });
        for i in 0..16 {
            let path = tmp_path(&format!("drop_shard_{i:02}"));
            p.submit(
                vec![BatchOp::Write {
                    path: path.clone(),
                    data: vec![],
                }],
                snapshot_default(),
                false,
            )
            .expect("submit");
            let _ = std::fs::remove_file(&path);
        }
        let start = Instant::now();
        drop(p);
        let elapsed = start.elapsed();
        assert!(
            elapsed < Duration::from_secs(5),
            "drop must complete within 5s; took {elapsed:?}"
        );
    }

    #[test]
    fn test_pipeline_new_does_not_spawn_dispatcher() {
        let p = Pipeline::new(PipelineConfig::DEFAULT);
        let guard = p.inner.lock().unwrap();
        assert!(guard.is_none(), "lazy spawn: dispatcher must not exist yet");
    }

    #[test]
    fn test_pipeline_submit_spawns_dispatcher_on_first_call() {
        let p = Pipeline::new(PipelineConfig::DEFAULT);
        let r = p.submit(Vec::new(), snapshot_default(), false);
        assert!(r.is_ok(), "empty batch should succeed: {:?}", r);
        let guard = p.inner.lock().unwrap();
        assert!(guard.is_some(), "dispatcher must exist after first submit");
    }

    #[test]
    fn test_pipeline_submit_executes_a_single_write() {
        let p = Pipeline::new(PipelineConfig::DEFAULT);
        let path = tmp_path("single_write");
        let _g = scopeguard_remove(path.clone());
        let ops = vec![BatchOp::Write {
            path: path.clone(),
            data: b"hello-pipeline".to_vec(),
        }];
        p.submit(ops, snapshot_default(), false).expect("submit");
        let actual = std::fs::read(&path).expect("read");
        assert_eq!(actual, b"hello-pipeline");
    }

    #[test]
    fn test_pipeline_submit_executes_multiple_writes_in_order() {
        let p = Pipeline::new(PipelineConfig::DEFAULT);
        let path = tmp_path("multi_write");
        let _g = scopeguard_remove(path.clone());
        let ops = vec![
            BatchOp::Write {
                path: path.clone(),
                data: b"first".to_vec(),
            },
            BatchOp::Write {
                path: path.clone(),
                data: b"second".to_vec(),
            },
            BatchOp::Write {
                path: path.clone(),
                data: b"third".to_vec(),
            },
        ];
        p.submit(ops, snapshot_default(), false).expect("submit");
        assert_eq!(std::fs::read(&path).unwrap(), b"third");
    }

    #[test]
    fn test_pipeline_submit_delete_removes_file() {
        let p = Pipeline::new(PipelineConfig::DEFAULT);
        let path = tmp_path("delete");
        // Cleans up if the delete under test fails to remove the file.
        let _g = scopeguard_remove(path.clone());
        std::fs::write(&path, b"to-be-deleted").unwrap();
        let ops = vec![BatchOp::Delete { path: path.clone() }];
        p.submit(ops, snapshot_default(), false).expect("submit");
        assert!(!path.exists(), "file should be gone");
    }

    #[test]
    fn test_pipeline_submit_copy_duplicates_content() {
        let p = Pipeline::new(PipelineConfig::DEFAULT);
        let src = tmp_path("copy_src");
        let dst = tmp_path("copy_dst");
        let _g1 = scopeguard_remove(src.clone());
        let _g2 = scopeguard_remove(dst.clone());
        std::fs::write(&src, b"copy-me").unwrap();
        let ops = vec![BatchOp::Copy {
            src: src.clone(),
            dst: dst.clone(),
        }];
        p.submit(ops, snapshot_default(), false).expect("submit");
        assert_eq!(std::fs::read(&dst).unwrap(), b"copy-me");
    }

    #[test]
    fn test_pipeline_submit_reports_failure_at_correct_index() {
        let p = Pipeline::new(PipelineConfig::DEFAULT);
        let good = tmp_path("good");
        let _g1 = scopeguard_remove(good.clone());
        let bad_dir = tmp_path("bad_dir");
        std::fs::create_dir_all(&bad_dir).unwrap();
        let _g_dir = scopeguard_remove_dir(bad_dir.clone());
        let ops = vec![
            BatchOp::Write {
                path: good.clone(),
                data: b"ok".to_vec(),
            },
            // Writing to a path that is a directory must fail.
            BatchOp::Write {
                path: bad_dir.clone(),
                data: b"fail".to_vec(),
            },
            BatchOp::Write {
                path: good.clone(),
                data: b"never".to_vec(),
            },
        ];
        let result = p.submit(ops, snapshot_default(), false);
        let err = result.expect_err("expected failure on op 1");
        assert_eq!(err.failed_at, 1, "failed_at index");
        assert_eq!(err.completed, 1, "completed count");
        assert_eq!(std::fs::read(&good).unwrap(), b"ok");
    }

    #[test]
    fn test_pipeline_drop_signals_dispatcher_to_exit() {
        let start = Instant::now();
        {
            let p = Pipeline::new(PipelineConfig::DEFAULT);
            p.submit(Vec::new(), snapshot_default(), false).unwrap();
            // `p` is dropped at the end of this scope; the timing covers that drop.
        }
        let elapsed = start.elapsed();
        assert!(
            elapsed < Duration::from_secs(2),
            "drop should not stall — got {:?}",
            elapsed
        );
    }

    #[test]
    fn test_pipeline_handles_many_concurrent_submitters() {
        use std::sync::Arc;
        let p = Arc::new(Pipeline::new(PipelineConfig::DEFAULT));
        let path_base = tmp_path("concurrent");
        let mut handles = Vec::new();
        let n_threads = 8;
        let writes_per_thread = 4;
        for t in 0..n_threads {
            let p = Arc::clone(&p);
            let pb = path_base.clone();
            handles.push(std::thread::spawn(move || {
                for w in 0..writes_per_thread {
                    let path = PathBuf::from(format!("{}_t{}_w{}", pb.display(), t, w));
                    let ops = vec![BatchOp::Write {
                        path,
                        data: format!("t{}w{}", t, w).into_bytes(),
                    }];
                    p.submit(ops, snapshot_default(), false).unwrap();
                }
            }));
        }
        for h in handles {
            h.join().unwrap();
        }
        for t in 0..n_threads {
            for w in 0..writes_per_thread {
                let path = PathBuf::from(format!("{}_t{}_w{}", path_base.display(), t, w));
                let expected = format!("t{}w{}", t, w);
                let actual = std::fs::read_to_string(&path).unwrap();
                assert_eq!(actual, expected);
                let _ = std::fs::remove_file(&path);
            }
        }
    }

    /// Removes the wrapped file when dropped (best effort).
    struct ScopeGuardFile(PathBuf);

    impl Drop for ScopeGuardFile {
        fn drop(&mut self) {
            let _ = std::fs::remove_file(&self.0);
        }
    }

    fn scopeguard_remove(p: PathBuf) -> ScopeGuardFile {
        ScopeGuardFile(p)
    }

    /// Recursively removes the wrapped directory when dropped (best effort).
    struct ScopeGuardDir(PathBuf);

    impl Drop for ScopeGuardDir {
        fn drop(&mut self) {
            let _ = std::fs::remove_dir_all(&self.0);
        }
    }

    fn scopeguard_remove_dir(p: PathBuf) -> ScopeGuardDir {
        ScopeGuardDir(p)
    }
}