use crate::handle::Handle;
use crate::method::Method;
use crate::observer::FsysObserver;
use crate::path::Mode;
use crate::pipeline::{Pipeline, PipelineConfig};
use crate::{Error, Result};
use std::path::PathBuf;
use std::sync::Arc;
/// Fluent configuration collected before a [`Handle`] is constructed.
///
/// Every field is filled with a default by `Builder::new` and can be
/// overridden through the setter of the same name; `Builder::build`
/// consumes the value and produces the handle.
pub struct Builder {
/// Requested I/O method; `build` rejects it when `Method::is_reserved` is true.
method: Method,
/// Optional root path; `build` canonicalises it when present.
root: Option<PathBuf>,
/// Requested path mode, resolved via `Mode::resolve` during `build`.
mode: Mode,
/// Batch pipeline settings handed to `Pipeline::new` during `build`.
pipeline_config: PipelineConfig,
/// Passed to the handle's buffer-pool configuration as its `capacity`.
buffer_pool_count: usize,
/// Requested buffer block size in bytes; `build` rounds it up to a multiple
/// of the probed sector size.
buffer_pool_block_size: usize,
/// Forwarded unchanged to `Handle::new_raw`.
io_uring_queue_depth: u32,
/// Optional observer forwarded unchanged to `Handle::new_raw`.
observer: Option<Arc<dyn FsysObserver>>,
}
impl Builder {
    /// Buffer-pool capacity used by [`Builder::new`] and [`Workload::Default`].
    const BASELINE_POOL_COUNT: usize = 64;
    /// Buffer-pool block size (bytes) used by [`Builder::new`] and [`Workload::Default`].
    const BASELINE_POOL_BLOCK_SIZE: usize = 4096;
    /// io_uring queue depth used by [`Builder::new`] and [`Workload::Default`].
    const BASELINE_IO_URING_QUEUE_DEPTH: u32 = 128;
    /// `batch_queue_max` value restored by [`Workload::Default`].
    const BASELINE_BATCH_QUEUE_MAX: usize = 1024;

    /// Creates a builder populated with the baseline configuration:
    /// `Method::Auto`, no root, `Mode::Auto`, `PipelineConfig::DEFAULT`,
    /// a 64 x 4096-byte buffer pool, an io_uring queue depth of 128 and no
    /// observer.
    #[must_use]
    pub fn new() -> Self {
        Self {
            method: Method::Auto,
            root: None,
            mode: Mode::Auto,
            pipeline_config: PipelineConfig::DEFAULT,
            buffer_pool_count: Self::BASELINE_POOL_COUNT,
            buffer_pool_block_size: Self::BASELINE_POOL_BLOCK_SIZE,
            io_uring_queue_depth: Self::BASELINE_IO_URING_QUEUE_DEPTH,
            observer: None,
        }
    }

    /// Sets the requested I/O method.
    ///
    /// [`Builder::build`] fails with [`Error::UnsupportedMethod`] when the
    /// method reports `is_reserved()`.
    #[must_use]
    pub fn method(mut self, method: Method) -> Self {
        self.method = method;
        self
    }

    /// Sets the root path for the handle.
    ///
    /// The path is only stored here; [`Builder::build`] canonicalises it and
    /// requires it to be an existing directory.
    #[must_use]
    pub fn root<P: Into<PathBuf>>(mut self, root: P) -> Self {
        self.root = Some(root.into());
        self
    }

    /// Sets the requested path mode (resolved during [`Builder::build`]).
    #[must_use]
    pub fn mode(mut self, mode: Mode) -> Self {
        self.mode = mode;
        self
    }

    /// Overrides `batch_window_ms` in the pipeline configuration.
    ///
    /// The value is stored as-is; no range validation happens here.
    #[must_use]
    pub fn batch_window_ms(mut self, ms: u64) -> Self {
        self.pipeline_config.batch_window_ms = ms;
        self
    }

    /// Overrides `batch_size_max` in the pipeline configuration.
    ///
    /// The value is stored as-is; no range validation happens here.
    #[must_use]
    pub fn batch_size_max(mut self, n: usize) -> Self {
        self.pipeline_config.batch_size_max = n;
        self
    }

    /// Overrides `batch_queue_max` in the pipeline configuration.
    ///
    /// The value is stored as-is; no range validation happens here.
    #[must_use]
    pub fn batch_queue_max(mut self, n: usize) -> Self {
        self.pipeline_config.batch_queue_max = n;
        self
    }

    /// Sets the buffer-pool capacity handed to the handle.
    #[must_use]
    pub fn buffer_pool_count(mut self, n: usize) -> Self {
        self.buffer_pool_count = n;
        self
    }

    /// Sets the requested buffer-pool block size in bytes.
    ///
    /// [`Builder::build`] rounds this up to a multiple of the probed sector
    /// size, so the effective block size may be larger.
    #[must_use]
    pub fn buffer_pool_block_size(mut self, bytes: usize) -> Self {
        self.buffer_pool_block_size = bytes;
        self
    }

    /// Sets the io_uring queue depth forwarded to the handle.
    #[must_use]
    pub fn io_uring_queue_depth(mut self, depth: u32) -> Self {
        self.io_uring_queue_depth = depth;
        self
    }

    /// Sets the dispatcher shard count, clamped to the range `1..=64`.
    #[must_use]
    pub fn dispatcher_shards(mut self, shards: usize) -> Self {
        self.pipeline_config.dispatcher_shards = shards.clamp(1, 64);
        self
    }

    /// Applies a coordinated preset to the buffer-pool count, buffer-pool
    /// block size, io_uring queue depth and `batch_queue_max`.
    ///
    /// The preset overwrites whatever those four knobs held before; call the
    /// individual setters *after* `tune_for` to refine a preset.
    #[must_use]
    pub fn tune_for(mut self, workload: Workload) -> Self {
        let (pool_count, pool_block_size, queue_depth, batch_queue_max) = match workload {
            Workload::Default => (
                Self::BASELINE_POOL_COUNT,
                Self::BASELINE_POOL_BLOCK_SIZE,
                Self::BASELINE_IO_URING_QUEUE_DEPTH,
                Self::BASELINE_BATCH_QUEUE_MAX,
            ),
            Workload::Database => (1024, 8192, 256, 4096),
        };
        self.buffer_pool_count = pool_count;
        self.buffer_pool_block_size = pool_block_size;
        self.io_uring_queue_depth = queue_depth;
        self.pipeline_config.batch_queue_max = batch_queue_max;
        self
    }

    /// Installs an observer that is forwarded to the handle.
    #[must_use]
    pub fn observer(mut self, observer: Arc<dyn FsysObserver>) -> Self {
        self.observer = Some(observer);
        self
    }

    /// Consumes the builder and constructs a [`Handle`].
    ///
    /// Steps, in order: reject reserved methods, resolve the method and mode,
    /// canonicalise and validate the root, probe the sector size (using the
    /// canonical root, or `"."` when no root was set), create the pipeline,
    /// and round the buffer block size up to a sector-size multiple.
    ///
    /// # Errors
    ///
    /// * [`Error::UnsupportedMethod`] when the configured method is reserved.
    /// * [`Error::InvalidPath`] when the root cannot be canonicalised (for
    ///   example because it does not exist) or when it resolves to something
    ///   that is not a directory.
    pub fn build(self) -> Result<Handle> {
        if self.method.is_reserved() {
            return Err(Error::UnsupportedMethod {
                method: self.method.as_str(),
            });
        }
        let resolved_method = self.method.resolve();
        let mode = self.mode.resolve();

        let canonical_root = match self.root {
            None => None,
            Some(requested) => {
                let canon = match std::fs::canonicalize(&requested) {
                    Ok(canon) => canon,
                    Err(e) => {
                        return Err(Error::InvalidPath {
                            path: requested,
                            reason: format!(
                                "Builder::root canonicalisation failed (path must exist and be a directory): {e}"
                            ),
                        });
                    }
                };
                // `canonicalize` also succeeds for regular files, so the
                // directory requirement has to be enforced explicitly.
                if !canon.is_dir() {
                    return Err(Error::InvalidPath {
                        path: requested,
                        reason: format!(
                            "Builder::root must be a directory, but {} is not",
                            canon.display()
                        ),
                    });
                }
                Some(canon)
            }
        };

        // Without a root, probe the current working directory instead.
        let probe_path = canonical_root
            .as_deref()
            .unwrap_or_else(|| std::path::Path::new("."));
        let sector_size = crate::platform::probe_sector_size(probe_path);
        let pipeline = Pipeline::new(self.pipeline_config);

        // Blocks are sized and aligned to whole sectors.
        let pool_block = align_up(self.buffer_pool_block_size, sector_size as usize);
        let pool_config = crate::handle::HandleBufferPoolConfig {
            capacity: self.buffer_pool_count,
            block_size: pool_block,
            block_align: sector_size as usize,
        };

        Ok(Handle::new_raw(
            self.method,
            resolved_method,
            canonical_root,
            mode,
            sector_size,
            pipeline,
            pool_config,
            self.io_uring_queue_depth,
            self.observer,
        ))
    }
}
impl Default for Builder {
fn default() -> Self {
Self::new()
}
}
/// Workload presets accepted by `Builder::tune_for`.
///
/// Each variant selects a coordinated set of values for the buffer-pool
/// count, buffer-pool block size, io_uring queue depth and `batch_queue_max`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[non_exhaustive]
pub enum Workload {
/// Baseline preset: 64 buffers of 4096 bytes, queue depth 128,
/// `batch_queue_max` 1024.
Default,
/// Database preset: 1024 buffers of 8192 bytes, queue depth 256,
/// `batch_queue_max` 4096.
Database,
}
/// Rounds `n` up to the next multiple of `align`.
///
/// An `align` of zero means "no alignment" and returns `n` unchanged. When
/// the rounded value would not fit in a `usize` the result saturates at
/// `usize::MAX`.
fn align_up(n: usize, align: usize) -> usize {
    match align {
        0 => n,
        step => match n % step {
            // Already on a boundary.
            0 => n,
            // Advance by the distance to the next boundary.
            rem => n.saturating_add(step - rem),
        },
    }
}
// Unit tests for the builder: defaults, individual setters, presets, and the
// end-to-end `build()` path.
#[cfg(test)]
mod tests {
use super::*;
use crate::method::Method;
// A default build must resolve `Method::Auto` to some concrete method.
#[test]
fn test_default_build_succeeds() {
let h = Builder::new().build().expect("default build");
assert_ne!(h.active_method(), Method::Auto);
}
// An explicit method is reported both as the requested and the active method.
#[test]
fn test_builder_sets_method() {
let h = Builder::new()
.method(Method::Sync)
.build()
.expect("build with Sync");
assert_eq!(h.method(), Method::Sync);
assert_eq!(h.active_method(), Method::Sync);
}
// The handle exposes the canonicalised form of the configured root.
#[test]
fn test_builder_sets_root() {
let root = std::env::temp_dir();
let canonical_root = std::fs::canonicalize(&root).expect("canonicalize temp");
let h = Builder::new().root(root).build().expect("build with root");
assert_eq!(h.root(), Some(canonical_root.as_path()));
}
// A root that does not exist must be rejected with `InvalidPath`.
#[test]
fn test_builder_rejects_nonexistent_root() {
let bogus = std::env::temp_dir().join("fsys_intentionally_missing_root_xyz_abc");
// Best-effort cleanup in case an earlier run left the directory behind.
let _ = std::fs::remove_dir_all(&bogus);
let result = Builder::new().root(&bogus).build();
assert!(matches!(result, Err(Error::InvalidPath { .. })));
}
// `Method::Journal` is expected to be reserved and reported by name.
#[test]
fn test_builder_rejects_reserved_method() {
let err = Builder::new().method(Method::Journal).build();
assert!(err.is_err());
if let Err(Error::UnsupportedMethod { method }) = err {
assert_eq!(method, "journal");
} else {
panic!("expected UnsupportedMethod");
}
}
// The probed sector size is never smaller than 512 bytes.
#[test]
fn test_builder_sector_size_at_least_512() {
let h = Builder::new().build().expect("build");
assert!(h.sector_size() >= 512);
}
// Pins the values of `PipelineConfig::DEFAULT` that the builder starts from.
#[test]
fn test_builder_default_pipeline_config_matches_prompt() {
let b = Builder::new();
assert_eq!(b.pipeline_config.batch_window_ms, 1);
assert_eq!(b.pipeline_config.batch_size_max, 128);
assert_eq!(b.pipeline_config.batch_queue_max, 1024);
}
// Each batch setter writes through to the matching pipeline field.
#[test]
fn test_builder_batch_window_ms_overrides_default() {
let b = Builder::new().batch_window_ms(5);
assert_eq!(b.pipeline_config.batch_window_ms, 5);
}
#[test]
fn test_builder_batch_size_max_overrides_default() {
let b = Builder::new().batch_size_max(64);
assert_eq!(b.pipeline_config.batch_size_max, 64);
}
#[test]
fn test_builder_batch_queue_max_overrides_default() {
let b = Builder::new().batch_queue_max(256);
assert_eq!(b.pipeline_config.batch_queue_max, 256);
}
// Chained batch setters do not clobber one another.
#[test]
fn test_builder_batch_knobs_chain() {
let b = Builder::new()
.batch_window_ms(3)
.batch_size_max(200)
.batch_queue_max(2048);
assert_eq!(b.pipeline_config.batch_window_ms, 3);
assert_eq!(b.pipeline_config.batch_size_max, 200);
assert_eq!(b.pipeline_config.batch_queue_max, 2048);
}
// A handle built with tuned batch knobs can still complete a batch write.
#[test]
fn test_builder_batch_knobs_survive_build() {
let h = Builder::new()
.method(Method::Sync)
.batch_window_ms(5)
.batch_size_max(16)
.batch_queue_max(8)
.build()
.expect("build with tuned pipeline");
assert_eq!(h.method(), Method::Sync);
// Process id plus a nanosecond timestamp keeps the temp file name unique.
let p = std::env::temp_dir().join(format!(
"fsys_builder_knobs_{}_{}",
std::process::id(),
std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.map(|d| d.as_nanos())
.unwrap_or(0)
));
// The guard removes the file when it goes out of scope, even on panic.
let _g = scopeguard_remove_file(p.clone());
h.write_batch(&[(p.as_path(), b"x".as_slice())])
.expect("batch flow");
assert_eq!(std::fs::read(&p).unwrap(), b"x");
}
// Returns a guard whose `Drop` deletes `p`, ignoring any removal error.
fn scopeguard_remove_file(p: PathBuf) -> impl Drop {
struct Guard(PathBuf);
impl Drop for Guard {
fn drop(&mut self) {
let _ = std::fs::remove_file(&self.0);
}
}
Guard(p)
}
// Zero is stored unchanged by the batch setters (no validation at set time).
#[test]
fn test_builder_batch_window_zero_is_accepted() {
let b = Builder::new().batch_window_ms(0);
assert_eq!(b.pipeline_config.batch_window_ms, 0);
}
#[test]
fn test_builder_batch_size_zero_is_accepted() {
let b = Builder::new().batch_size_max(0);
assert_eq!(b.pipeline_config.batch_size_max, 0);
}
// Pins the baseline buffer-pool and io_uring values set by `Builder::new`.
#[test]
fn test_builder_default_buffer_pool_knobs_match_prompt() {
let b = Builder::new();
assert_eq!(b.buffer_pool_count, 64);
assert_eq!(b.buffer_pool_block_size, 4096);
assert_eq!(b.io_uring_queue_depth, 128);
}
// Each buffer-pool / io_uring setter writes through to its own field.
#[test]
fn test_builder_buffer_pool_count_overrides_default() {
let b = Builder::new().buffer_pool_count(16);
assert_eq!(b.buffer_pool_count, 16);
}
#[test]
fn test_builder_buffer_pool_block_size_overrides_default() {
let b = Builder::new().buffer_pool_block_size(65_536);
assert_eq!(b.buffer_pool_block_size, 65_536);
}
#[test]
fn test_builder_io_uring_queue_depth_overrides_default() {
let b = Builder::new().io_uring_queue_depth(256);
assert_eq!(b.io_uring_queue_depth, 256);
}
// Chained buffer-pool setters do not clobber one another.
#[test]
fn test_builder_buffer_pool_knobs_chain() {
let b = Builder::new()
.buffer_pool_count(32)
.buffer_pool_block_size(8192)
.io_uring_queue_depth(64);
assert_eq!(b.buffer_pool_count, 32);
assert_eq!(b.buffer_pool_block_size, 8192);
assert_eq!(b.io_uring_queue_depth, 64);
}
// The handle's buffer pool reflects the configured capacity; the block size
// may be rounded up, hence the `>=` comparison.
#[test]
fn test_handle_buffer_pool_lazy_init() {
let h = Builder::new()
.buffer_pool_count(8)
.buffer_pool_block_size(4096)
.build()
.expect("build");
let pool = h.buffer_pool().expect("buffer pool");
assert_eq!(pool.capacity(), 8);
assert!(pool.block_size() >= 4096);
}
// `align_up` rounds to the next multiple and treats align == 0 as a no-op.
#[test]
fn test_align_up_known_inputs() {
assert_eq!(align_up(1, 512), 512);
assert_eq!(align_up(512, 512), 512);
assert_eq!(align_up(513, 512), 1024);
assert_eq!(align_up(100, 0), 100);
}
// The Database preset sets all four coordinated knobs.
#[test]
fn test_tune_for_database_sets_coordinated_knobs() {
let b = Builder::new().tune_for(Workload::Database);
assert_eq!(b.buffer_pool_count, 1024);
assert_eq!(b.buffer_pool_block_size, 8192);
assert_eq!(b.io_uring_queue_depth, 256);
assert_eq!(b.pipeline_config.batch_queue_max, 4096);
}
// Applying the Default preset after Database restores the baseline values.
#[test]
fn test_tune_for_default_restores_baseline() {
let b = Builder::new()
.tune_for(Workload::Database)
.tune_for(Workload::Default);
assert_eq!(b.buffer_pool_count, 64);
assert_eq!(b.buffer_pool_block_size, 4096);
assert_eq!(b.io_uring_queue_depth, 128);
assert_eq!(b.pipeline_config.batch_queue_max, 1024);
}
// Setters called after a preset override only their own knob.
#[test]
fn test_tune_for_then_individual_setter_overrides() {
let b = Builder::new()
.tune_for(Workload::Database)
.buffer_pool_count(2048)
.io_uring_queue_depth(512);
assert_eq!(b.buffer_pool_count, 2048);
assert_eq!(b.io_uring_queue_depth, 512);
assert_eq!(b.buffer_pool_block_size, 8192);
assert_eq!(b.pipeline_config.batch_queue_max, 4096);
}
// The Database preset survives `build()` and reaches the buffer pool.
#[test]
fn test_tune_for_database_builds_handle() {
let h = Builder::new()
.tune_for(Workload::Database)
.build()
.expect("database preset build");
let pool = h.buffer_pool().expect("buffer pool");
assert_eq!(pool.capacity(), 1024);
assert!(pool.block_size() >= 8192);
}
}