use crate::handle::Handle;
use crate::method::Method;
use crate::observer::FsysObserver;
use crate::path::Mode;
use crate::pipeline::{Pipeline, PipelineConfig};
use crate::{Error, Result};
use std::path::PathBuf;
use std::sync::Arc;
pub struct Builder {
method: Method,
root: Option<PathBuf>,
mode: Mode,
pipeline_config: PipelineConfig,
buffer_pool_count: usize,
buffer_pool_block_size: usize,
io_uring_queue_depth: u32,
iouring_sqpoll_idle_ms: Option<u32>,
observer: Option<Arc<dyn FsysObserver>>,
spdk: SpdkConfig,
}
#[derive(Debug, Clone)]
pub struct SpdkConfig {
pub device: Option<crate::capability::PciAddress>,
pub queue_depth: u32,
pub polling_threads: Option<usize>,
pub hugepage_size_mb: u64,
}
impl Default for SpdkConfig {
fn default() -> Self {
Self {
device: None,
queue_depth: 256,
polling_threads: None,
hugepage_size_mb: 1024,
}
}
}
impl Builder {
#[must_use]
pub fn new() -> Self {
Self {
method: Method::Auto,
root: None,
mode: Mode::Auto,
pipeline_config: PipelineConfig::DEFAULT,
buffer_pool_count: 64,
buffer_pool_block_size: 4096,
io_uring_queue_depth: 128,
iouring_sqpoll_idle_ms: None,
observer: None,
spdk: SpdkConfig::default(),
}
}
#[must_use]
pub fn spdk_device(mut self, pci_addr: &str) -> Self {
if let Some(addr) = crate::capability::PciAddress::parse(pci_addr) {
self.spdk.device = Some(addr);
}
self
}
#[must_use]
pub fn spdk_queue_depth(mut self, depth: u32) -> Self {
self.spdk.queue_depth = depth;
self
}
#[must_use]
pub fn spdk_polling_threads(mut self, n: usize) -> Self {
self.spdk.polling_threads = if n == 0 { None } else { Some(n) };
self
}
#[must_use]
pub fn spdk_hugepage_size_mb(mut self, mb: u64) -> Self {
self.spdk.hugepage_size_mb = mb;
self
}
#[must_use]
pub fn method(mut self, method: Method) -> Self {
self.method = method;
self
}
#[must_use]
pub fn root<P: Into<PathBuf>>(mut self, root: P) -> Self {
self.root = Some(root.into());
self
}
#[must_use]
pub fn mode(mut self, mode: Mode) -> Self {
self.mode = mode;
self
}
#[must_use]
pub fn batch_window_ms(mut self, ms: u64) -> Self {
self.pipeline_config.batch_window_ms = ms;
self
}
#[must_use]
pub fn batch_size_max(mut self, n: usize) -> Self {
self.pipeline_config.batch_size_max = n;
self
}
#[must_use]
pub fn batch_queue_max(mut self, n: usize) -> Self {
self.pipeline_config.batch_queue_max = n;
self
}
#[must_use]
pub fn buffer_pool_count(mut self, n: usize) -> Self {
self.buffer_pool_count = n;
self
}
#[must_use]
pub fn buffer_pool_block_size(mut self, bytes: usize) -> Self {
self.buffer_pool_block_size = bytes;
self
}
#[must_use]
pub fn io_uring_queue_depth(mut self, depth: u32) -> Self {
self.io_uring_queue_depth = depth;
self
}
#[must_use]
pub fn sqpoll(mut self, idle_ms: u32) -> Self {
self.iouring_sqpoll_idle_ms = Some(idle_ms);
self
}
#[must_use]
pub fn dispatcher_shards(mut self, shards: usize) -> Self {
self.pipeline_config.dispatcher_shards = shards.clamp(1, 64);
self
}
#[must_use]
pub fn tune_for(mut self, workload: Workload) -> Self {
match workload {
Workload::Default => {
self.buffer_pool_count = 64;
self.buffer_pool_block_size = 4096;
self.io_uring_queue_depth = 128;
self.pipeline_config.batch_queue_max = 1024;
}
Workload::Database => {
self.buffer_pool_count = 1024;
self.buffer_pool_block_size = 8192;
self.io_uring_queue_depth = 256;
self.pipeline_config.batch_queue_max = 4096;
}
}
self
}
#[must_use]
pub fn observer(mut self, observer: Arc<dyn FsysObserver>) -> Self {
self.observer = Some(observer);
self
}
pub fn build(self) -> Result<Handle> {
if self.method.is_reserved() {
return Err(Error::UnsupportedMethod {
method: self.method.as_str(),
});
}
if self.method == Method::Spdk {
#[cfg(not(feature = "spdk"))]
{
return Err(Error::FeatureNotEnabled { feature: "spdk" });
}
#[cfg(feature = "spdk")]
{
let caps = crate::capability::capabilities();
if !caps.spdk_eligible {
let reason = caps
.first_spdk_skip_reason()
.cloned()
.unwrap_or(crate::capability::SpdkSkipReason::NotLinux);
return Err(Error::SpdkUnavailable { reason });
}
return Err(Error::SpdkUnavailable {
reason: crate::capability::SpdkSkipReason::SpdkLibraryNotFound,
});
}
}
let resolved_method = self.method.resolve();
let mode = self.mode.resolve();
let canonical_root = match self.root {
None => None,
Some(r) => match std::fs::canonicalize(&r) {
Ok(canon) => Some(canon),
Err(e) => {
return Err(Error::InvalidPath {
path: r,
reason: format!(
"Builder::root canonicalisation failed (path must exist and be a directory): {e}"
),
})
}
},
};
let probe_path = canonical_root
.as_deref()
.unwrap_or_else(|| std::path::Path::new("."));
let sector_size = crate::platform::probe_sector_size(probe_path);
let pipeline = Pipeline::new(self.pipeline_config);
let pool_block = align_up(self.buffer_pool_block_size, sector_size as usize);
let pool_config = crate::handle::HandleBufferPoolConfig {
capacity: self.buffer_pool_count,
block_size: pool_block,
block_align: sector_size as usize,
};
Ok(Handle::new_raw(
self.method,
resolved_method,
canonical_root,
mode,
sector_size,
pipeline,
pool_config,
self.io_uring_queue_depth,
self.iouring_sqpoll_idle_ms,
self.observer,
))
}
}
impl Default for Builder {
fn default() -> Self {
Self::new()
}
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[non_exhaustive]
pub enum Workload {
Default,
Database,
}
fn align_up(n: usize, align: usize) -> usize {
if align == 0 {
return n;
}
n.div_ceil(align).saturating_mul(align)
}
#[cfg(test)]
mod tests {
use super::*;
use crate::method::Method;
#[test]
fn test_default_build_succeeds() {
let h = Builder::new().build().expect("default build");
assert_ne!(h.active_method(), Method::Auto);
}
#[test]
fn test_builder_sets_method() {
let h = Builder::new()
.method(Method::Sync)
.build()
.expect("build with Sync");
assert_eq!(h.method(), Method::Sync);
assert_eq!(h.active_method(), Method::Sync);
}
#[test]
fn test_builder_sqpoll_knob_is_idempotent_on_default_handle() {
let h_default = Builder::new().build().expect("default");
let h_sqpoll = Builder::new().sqpoll(1000).build().expect("sqpoll(1000)");
assert_eq!(h_default.method(), h_sqpoll.method());
assert_eq!(h_default.active_method(), h_sqpoll.active_method());
assert_eq!(h_default.sector_size(), h_sqpoll.sector_size());
}
#[test]
fn test_builder_sets_root() {
let root = std::env::temp_dir();
let canonical_root = std::fs::canonicalize(&root).expect("canonicalize temp");
let h = Builder::new().root(root).build().expect("build with root");
assert_eq!(h.root(), Some(canonical_root.as_path()));
}
#[test]
fn test_builder_rejects_nonexistent_root() {
let bogus = std::env::temp_dir().join("fsys_intentionally_missing_root_xyz_abc");
let _ = std::fs::remove_dir_all(&bogus);
let result = Builder::new().root(&bogus).build();
assert!(matches!(result, Err(Error::InvalidPath { .. })));
}
#[test]
fn test_builder_rejects_reserved_method() {
let err = Builder::new().method(Method::Journal).build();
assert!(err.is_err());
if let Err(Error::UnsupportedMethod { method }) = err {
assert_eq!(method, "journal");
} else {
panic!("expected UnsupportedMethod");
}
}
#[test]
fn test_builder_sector_size_at_least_512() {
let h = Builder::new().build().expect("build");
assert!(h.sector_size() >= 512);
}
#[test]
fn test_builder_default_pipeline_config_matches_prompt() {
let b = Builder::new();
assert_eq!(b.pipeline_config.batch_window_ms, 1);
assert_eq!(b.pipeline_config.batch_size_max, 128);
assert_eq!(b.pipeline_config.batch_queue_max, 1024);
}
#[test]
fn test_builder_batch_window_ms_overrides_default() {
let b = Builder::new().batch_window_ms(5);
assert_eq!(b.pipeline_config.batch_window_ms, 5);
}
#[test]
fn test_builder_batch_size_max_overrides_default() {
let b = Builder::new().batch_size_max(64);
assert_eq!(b.pipeline_config.batch_size_max, 64);
}
#[test]
fn test_builder_batch_queue_max_overrides_default() {
let b = Builder::new().batch_queue_max(256);
assert_eq!(b.pipeline_config.batch_queue_max, 256);
}
#[test]
fn test_builder_batch_knobs_chain() {
let b = Builder::new()
.batch_window_ms(3)
.batch_size_max(200)
.batch_queue_max(2048);
assert_eq!(b.pipeline_config.batch_window_ms, 3);
assert_eq!(b.pipeline_config.batch_size_max, 200);
assert_eq!(b.pipeline_config.batch_queue_max, 2048);
}
#[test]
fn test_builder_batch_knobs_survive_build() {
let h = Builder::new()
.method(Method::Sync)
.batch_window_ms(5)
.batch_size_max(16)
.batch_queue_max(8)
.build()
.expect("build with tuned pipeline");
assert_eq!(h.method(), Method::Sync);
let p = std::env::temp_dir().join(format!(
"fsys_builder_knobs_{}_{}",
std::process::id(),
std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.map(|d| d.as_nanos())
.unwrap_or(0)
));
let _g = scopeguard_remove_file(p.clone());
h.write_batch(&[(p.as_path(), b"x".as_slice())])
.expect("batch flow");
assert_eq!(std::fs::read(&p).unwrap(), b"x");
}
fn scopeguard_remove_file(p: PathBuf) -> impl Drop {
struct Guard(PathBuf);
impl Drop for Guard {
fn drop(&mut self) {
let _ = std::fs::remove_file(&self.0);
}
}
Guard(p)
}
#[test]
fn test_builder_batch_window_zero_is_accepted() {
let b = Builder::new().batch_window_ms(0);
assert_eq!(b.pipeline_config.batch_window_ms, 0);
}
#[test]
fn test_builder_batch_size_zero_is_accepted() {
let b = Builder::new().batch_size_max(0);
assert_eq!(b.pipeline_config.batch_size_max, 0);
}
#[test]
fn test_builder_default_buffer_pool_knobs_match_prompt() {
let b = Builder::new();
assert_eq!(b.buffer_pool_count, 64);
assert_eq!(b.buffer_pool_block_size, 4096);
assert_eq!(b.io_uring_queue_depth, 128);
}
#[test]
fn test_builder_buffer_pool_count_overrides_default() {
let b = Builder::new().buffer_pool_count(16);
assert_eq!(b.buffer_pool_count, 16);
}
#[test]
fn test_builder_buffer_pool_block_size_overrides_default() {
let b = Builder::new().buffer_pool_block_size(65_536);
assert_eq!(b.buffer_pool_block_size, 65_536);
}
#[test]
fn test_builder_io_uring_queue_depth_overrides_default() {
let b = Builder::new().io_uring_queue_depth(256);
assert_eq!(b.io_uring_queue_depth, 256);
}
#[test]
fn test_builder_buffer_pool_knobs_chain() {
let b = Builder::new()
.buffer_pool_count(32)
.buffer_pool_block_size(8192)
.io_uring_queue_depth(64);
assert_eq!(b.buffer_pool_count, 32);
assert_eq!(b.buffer_pool_block_size, 8192);
assert_eq!(b.io_uring_queue_depth, 64);
}
#[test]
fn test_handle_buffer_pool_lazy_init() {
let h = Builder::new()
.buffer_pool_count(8)
.buffer_pool_block_size(4096)
.build()
.expect("build");
let pool = h.buffer_pool().expect("buffer pool");
assert_eq!(pool.capacity(), 8);
assert!(pool.block_size() >= 4096);
}
#[test]
fn test_align_up_known_inputs() {
assert_eq!(align_up(1, 512), 512);
assert_eq!(align_up(512, 512), 512);
assert_eq!(align_up(513, 512), 1024);
assert_eq!(align_up(100, 0), 100);
}
#[test]
fn test_tune_for_database_sets_coordinated_knobs() {
let b = Builder::new().tune_for(Workload::Database);
assert_eq!(b.buffer_pool_count, 1024);
assert_eq!(b.buffer_pool_block_size, 8192);
assert_eq!(b.io_uring_queue_depth, 256);
assert_eq!(b.pipeline_config.batch_queue_max, 4096);
}
#[test]
fn test_tune_for_default_restores_baseline() {
let b = Builder::new()
.tune_for(Workload::Database)
.tune_for(Workload::Default);
assert_eq!(b.buffer_pool_count, 64);
assert_eq!(b.buffer_pool_block_size, 4096);
assert_eq!(b.io_uring_queue_depth, 128);
assert_eq!(b.pipeline_config.batch_queue_max, 1024);
}
#[test]
fn test_tune_for_then_individual_setter_overrides() {
let b = Builder::new()
.tune_for(Workload::Database)
.buffer_pool_count(2048)
.io_uring_queue_depth(512);
assert_eq!(b.buffer_pool_count, 2048);
assert_eq!(b.io_uring_queue_depth, 512);
assert_eq!(b.buffer_pool_block_size, 8192);
assert_eq!(b.pipeline_config.batch_queue_max, 4096);
}
#[test]
fn test_tune_for_database_builds_handle() {
let h = Builder::new()
.tune_for(Workload::Database)
.build()
.expect("database preset build");
let pool = h.buffer_pool().expect("buffer pool");
assert_eq!(pool.capacity(), 1024);
assert!(pool.block_size() >= 8192);
}
#[test]
fn test_spdk_config_defaults_match_specification() {
let b = Builder::new();
assert_eq!(b.spdk.queue_depth, 256);
assert!(b.spdk.device.is_none());
assert!(b.spdk.polling_threads.is_none());
assert_eq!(b.spdk.hugepage_size_mb, 1024);
}
#[test]
fn test_spdk_device_accepts_canonical_pci_address() {
let b = Builder::new().spdk_device("0000:81:00.0");
let addr = b.spdk.device.expect("address parsed");
assert_eq!(addr.domain, 0);
assert_eq!(addr.bus, 0x81);
assert_eq!(addr.device, 0);
assert_eq!(addr.function, 0);
}
#[test]
fn test_spdk_device_rejects_garbage_silently() {
let b = Builder::new().spdk_device("not-a-pci-address");
assert!(b.spdk.device.is_none());
}
#[test]
fn test_spdk_queue_depth_override() {
let b = Builder::new().spdk_queue_depth(512);
assert_eq!(b.spdk.queue_depth, 512);
}
#[test]
fn test_spdk_polling_threads_zero_means_use_default() {
let b = Builder::new().spdk_polling_threads(0);
assert!(b.spdk.polling_threads.is_none());
}
#[test]
fn test_spdk_polling_threads_nonzero_value_persisted() {
let b = Builder::new().spdk_polling_threads(4);
assert_eq!(b.spdk.polling_threads, Some(4));
}
#[test]
fn test_spdk_hugepage_size_override() {
let b = Builder::new().spdk_hugepage_size_mb(2048);
assert_eq!(b.spdk.hugepage_size_mb, 2048);
}
#[test]
fn test_spdk_builder_methods_chain() {
let b = Builder::new()
.spdk_device("0000:01:00.0")
.spdk_queue_depth(128)
.spdk_polling_threads(2)
.spdk_hugepage_size_mb(512);
assert_eq!(b.spdk.queue_depth, 128);
assert_eq!(b.spdk.polling_threads, Some(2));
assert_eq!(b.spdk.hugepage_size_mb, 512);
assert!(b.spdk.device.is_some());
}
#[test]
#[cfg(not(feature = "spdk"))]
fn test_build_with_method_spdk_returns_feature_not_enabled_without_feature() {
match Builder::new().method(Method::Spdk).build() {
Err(Error::FeatureNotEnabled { feature }) => assert_eq!(feature, "spdk"),
Err(other) => panic!("expected FeatureNotEnabled, got {other:?}"),
Ok(_) => panic!("expected build to fail without spdk feature"),
}
}
#[test]
#[cfg(feature = "spdk")]
fn test_build_with_method_spdk_returns_spdk_unavailable_when_feature_on() {
match Builder::new().method(Method::Spdk).build() {
Err(Error::SpdkUnavailable { .. }) => {}
Err(other) => panic!("expected SpdkUnavailable, got {other:?}"),
Ok(_) => panic!("expected build to fail in 1.1.0 with spdk feature on"),
}
}
}