use crate::handle::Handle;
use crate::method::Method;
use crate::path::Mode;
use crate::pipeline::{Pipeline, PipelineConfig};
use crate::{Error, Result};
use std::path::PathBuf;
/// Fluent configuration for constructing a [`Handle`].
///
/// Create one with [`Builder::new`], chain the setter methods, and finish
/// with [`Builder::build`].
pub struct Builder {
    /// Requested method; `build` rejects it if reserved and otherwise
    /// resolves it before handing both values to the handle.
    method: Method,
    /// Optional root path. When `None`, `build` probes the sector size
    /// at `"."` instead.
    root: Option<PathBuf>,
    /// Requested path mode; resolved during `build`.
    mode: Mode,
    /// Batch pipeline settings passed to `Pipeline::new` during `build`.
    pipeline_config: PipelineConfig,
    /// Becomes the `capacity` of the handle's buffer pool configuration.
    buffer_pool_size: usize,
    /// Requested pool block size in bytes; `build` rounds it up to the
    /// probed sector size.
    buffer_pool_block: usize,
    /// Forwarded unchanged to `Handle::new_raw`.
    io_uring_queue_depth: u32,
}
impl Builder {
    /// Creates a builder holding the default settings.
    ///
    /// Defaults: [`Method::Auto`], no root, [`Mode::Auto`],
    /// [`PipelineConfig::DEFAULT`], buffer pool size 64, buffer pool block
    /// 4096 bytes, and io_uring queue depth 128.
    #[must_use]
    pub fn new() -> Self {
        let (buffer_pool_size, buffer_pool_block, io_uring_queue_depth) = (64, 4096, 128);
        Self {
            method: Method::Auto,
            root: None,
            mode: Mode::Auto,
            pipeline_config: PipelineConfig::DEFAULT,
            buffer_pool_size,
            buffer_pool_block,
            io_uring_queue_depth,
        }
    }

    /// Replaces the requested method.
    #[must_use]
    pub fn method(self, method: Method) -> Self {
        Self { method, ..self }
    }

    /// Sets the root path, converting `root` into a `PathBuf`.
    #[must_use]
    pub fn root<P>(self, root: P) -> Self
    where
        P: Into<PathBuf>,
    {
        Self {
            root: Some(root.into()),
            ..self
        }
    }

    /// Replaces the requested path mode.
    #[must_use]
    pub fn mode(self, mode: Mode) -> Self {
        Self { mode, ..self }
    }

    /// Overrides `batch_window_ms` in the pipeline configuration.
    #[must_use]
    pub fn batch_window_ms(self, ms: u64) -> Self {
        let mut pipeline_config = self.pipeline_config;
        pipeline_config.batch_window_ms = ms;
        Self {
            pipeline_config,
            ..self
        }
    }

    /// Overrides `batch_size_max` in the pipeline configuration.
    #[must_use]
    pub fn batch_size_max(self, n: usize) -> Self {
        let mut pipeline_config = self.pipeline_config;
        pipeline_config.batch_size_max = n;
        Self {
            pipeline_config,
            ..self
        }
    }

    /// Overrides `batch_queue_max` in the pipeline configuration.
    #[must_use]
    pub fn batch_queue_max(self, n: usize) -> Self {
        let mut pipeline_config = self.pipeline_config;
        pipeline_config.batch_queue_max = n;
        Self {
            pipeline_config,
            ..self
        }
    }

    /// Sets the buffer pool size (used as the pool's `capacity` on build).
    #[must_use]
    pub fn buffer_pool_size(self, n: usize) -> Self {
        Self {
            buffer_pool_size: n,
            ..self
        }
    }

    /// Sets the requested pool block size in bytes.
    ///
    /// `build` rounds this value up to the probed sector size.
    #[must_use]
    pub fn buffer_pool_block(self, bytes: usize) -> Self {
        Self {
            buffer_pool_block: bytes,
            ..self
        }
    }

    /// Sets the io_uring queue depth forwarded to the handle.
    #[must_use]
    pub fn io_uring_queue_depth(self, depth: u32) -> Self {
        Self {
            io_uring_queue_depth: depth,
            ..self
        }
    }

    /// Consumes the builder and constructs a [`Handle`].
    ///
    /// In order: rejects reserved methods, resolves the method and mode,
    /// probes the sector size at the root (or `"."` when no root is set),
    /// creates the pipeline, and rounds the pool block size up to the
    /// sector size.
    ///
    /// # Errors
    ///
    /// Returns [`Error::UnsupportedMethod`] when the configured method
    /// reports itself as reserved.
    pub fn build(self) -> Result<Handle> {
        let Self {
            method,
            root,
            mode,
            pipeline_config,
            buffer_pool_size,
            buffer_pool_block,
            io_uring_queue_depth,
        } = self;

        if method.is_reserved() {
            return Err(Error::UnsupportedMethod {
                method: method.as_str(),
            });
        }

        let active_method = method.resolve();
        let resolved_mode = mode.resolve();

        // Probe wherever the handle is rooted; fall back to the current
        // directory when no root was configured.
        let sector_size = {
            let probe_at = match root.as_deref() {
                Some(dir) => dir,
                None => std::path::Path::new("."),
            };
            crate::platform::probe_sector_size(probe_at)
        };
        let pipeline = Pipeline::new(pipeline_config);

        // The sector size doubles as the block alignment for the pool.
        let block_align = sector_size as usize;
        let pool_config = crate::handle::HandleBufferPoolConfig {
            capacity: buffer_pool_size,
            block_size: align_up(buffer_pool_block, block_align),
            block_align,
        };

        Ok(Handle::new_raw(
            method,
            active_method,
            root,
            resolved_mode,
            sector_size,
            pipeline,
            pool_config,
            io_uring_queue_depth,
        ))
    }
}
impl Default for Builder {
fn default() -> Self {
Self::new()
}
}
/// Rounds `n` up to the nearest multiple of `align`.
///
/// An `align` of zero is treated as "no alignment requirement" and returns
/// `n` unchanged.
///
/// If the rounded value would overflow `usize`, the largest multiple of
/// `align` that fits in `usize` is returned instead. This keeps the result
/// aligned in every case; clamping to `usize::MAX` would not, because
/// `usize::MAX` is not a multiple of any `align` greater than one.
fn align_up(n: usize, align: usize) -> usize {
    if align == 0 {
        return n;
    }
    n.checked_next_multiple_of(align)
        // Overflow: fall back to the greatest representable aligned value.
        .unwrap_or(usize::MAX - usize::MAX % align)
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::method::Method;

    /// Returns a guard that deletes `p` when dropped; removal errors are
    /// ignored so cleanup never fails a test.
    fn scopeguard_remove_file(p: PathBuf) -> impl Drop {
        struct RemoveOnDrop(PathBuf);
        impl Drop for RemoveOnDrop {
            fn drop(&mut self) {
                std::fs::remove_file(&self.0).ok();
            }
        }
        RemoveOnDrop(p)
    }

    // ---- build() behaviour -------------------------------------------

    #[test]
    fn test_default_build_succeeds() {
        let handle = Builder::new().build().expect("default build");
        // `Auto` must have been resolved to something concrete.
        assert_ne!(handle.active_method(), Method::Auto);
    }

    #[test]
    fn test_builder_sets_method() {
        let builder = Builder::new().method(Method::Sync);
        let handle = builder.build().expect("build with Sync");
        assert_eq!(handle.method(), Method::Sync);
        assert_eq!(handle.active_method(), Method::Sync);
    }

    #[test]
    fn test_builder_sets_root() {
        let tmp = std::env::temp_dir();
        let builder = Builder::new().root(tmp.clone());
        let handle = builder.build().expect("build with root");
        assert_eq!(handle.root(), Some(tmp.as_path()));
    }

    #[test]
    fn test_builder_rejects_reserved_method() {
        let outcome = Builder::new().method(Method::Journal).build();
        assert!(outcome.is_err());
        match outcome {
            Err(Error::UnsupportedMethod { method }) => assert_eq!(method, "journal"),
            _ => panic!("expected UnsupportedMethod"),
        }
    }

    #[test]
    fn test_builder_sector_size_at_least_512() {
        let handle = Builder::new().build().expect("build");
        assert!(handle.sector_size() >= 512);
    }

    // ---- pipeline (batch) knobs --------------------------------------

    #[test]
    fn test_builder_default_pipeline_config_matches_prompt() {
        let builder = Builder::new();
        let cfg = &builder.pipeline_config;
        assert_eq!(cfg.batch_window_ms, 1);
        assert_eq!(cfg.batch_size_max, 128);
        assert_eq!(cfg.batch_queue_max, 1024);
    }

    #[test]
    fn test_builder_batch_window_ms_overrides_default() {
        let builder = Builder::new().batch_window_ms(5);
        assert_eq!(builder.pipeline_config.batch_window_ms, 5);
    }

    #[test]
    fn test_builder_batch_size_max_overrides_default() {
        let builder = Builder::new().batch_size_max(64);
        assert_eq!(builder.pipeline_config.batch_size_max, 64);
    }

    #[test]
    fn test_builder_batch_queue_max_overrides_default() {
        let builder = Builder::new().batch_queue_max(256);
        assert_eq!(builder.pipeline_config.batch_queue_max, 256);
    }

    #[test]
    fn test_builder_batch_knobs_chain() {
        let builder = Builder::new()
            .batch_window_ms(3)
            .batch_size_max(200)
            .batch_queue_max(2048);
        let cfg = &builder.pipeline_config;
        assert_eq!(
            (cfg.batch_window_ms, cfg.batch_size_max, cfg.batch_queue_max),
            (3, 200, 2048)
        );
    }

    #[test]
    fn test_builder_batch_knobs_survive_build() {
        let handle = Builder::new()
            .method(Method::Sync)
            .batch_window_ms(5)
            .batch_size_max(16)
            .batch_queue_max(8)
            .build()
            .expect("build with tuned pipeline");
        assert_eq!(handle.method(), Method::Sync);

        // Build a per-process, per-instant file name in the temp directory.
        let pid = std::process::id();
        let nanos = std::time::SystemTime::now()
            .duration_since(std::time::UNIX_EPOCH)
            .map_or(0, |elapsed| elapsed.as_nanos());
        let target = std::env::temp_dir().join(format!("fsys_builder_knobs_{}_{}", pid, nanos));
        let _cleanup = scopeguard_remove_file(target.clone());

        let batch = [(target.as_path(), b"x".as_slice())];
        handle.write_batch(&batch).expect("batch flow");
        assert_eq!(std::fs::read(&target).unwrap(), b"x");
    }

    #[test]
    fn test_builder_batch_window_zero_is_accepted() {
        let builder = Builder::new().batch_window_ms(0);
        assert_eq!(builder.pipeline_config.batch_window_ms, 0);
    }

    #[test]
    fn test_builder_batch_size_zero_is_accepted() {
        let builder = Builder::new().batch_size_max(0);
        assert_eq!(builder.pipeline_config.batch_size_max, 0);
    }

    // ---- buffer pool / io_uring knobs --------------------------------

    #[test]
    fn test_builder_default_buffer_pool_knobs_match_prompt() {
        let builder = Builder::new();
        assert_eq!(builder.buffer_pool_size, 64);
        assert_eq!(builder.buffer_pool_block, 4096);
        assert_eq!(builder.io_uring_queue_depth, 128);
    }

    #[test]
    fn test_builder_buffer_pool_size_overrides_default() {
        let builder = Builder::new().buffer_pool_size(16);
        assert_eq!(builder.buffer_pool_size, 16);
    }

    #[test]
    fn test_builder_buffer_pool_block_overrides_default() {
        let builder = Builder::new().buffer_pool_block(65_536);
        assert_eq!(builder.buffer_pool_block, 65_536);
    }

    #[test]
    fn test_builder_io_uring_queue_depth_overrides_default() {
        let builder = Builder::new().io_uring_queue_depth(256);
        assert_eq!(builder.io_uring_queue_depth, 256);
    }

    #[test]
    fn test_builder_buffer_pool_knobs_chain() {
        let builder = Builder::new()
            .buffer_pool_size(32)
            .buffer_pool_block(8192)
            .io_uring_queue_depth(64);
        assert_eq!(
            (
                builder.buffer_pool_size,
                builder.buffer_pool_block,
                builder.io_uring_queue_depth
            ),
            (32, 8192, 64)
        );
    }

    #[test]
    fn test_handle_buffer_pool_lazy_init() {
        let handle = Builder::new()
            .buffer_pool_size(8)
            .buffer_pool_block(4096)
            .build()
            .expect("build");
        let pool = handle.buffer_pool().expect("buffer pool");
        assert_eq!(pool.capacity(), 8);
        // The block size may have been rounded up to the sector size.
        assert!(pool.block_size() >= 4096);
    }

    // ---- helpers ------------------------------------------------------

    #[test]
    fn test_align_up_known_inputs() {
        // (n, align, expected)
        let cases = [
            (1, 512, 512),
            (512, 512, 512),
            (513, 512, 1024),
            (100, 0, 100),
        ];
        for (n, align, expected) in cases {
            assert_eq!(align_up(n, align), expected);
        }
    }
}