use crate::store::RestartPolicy;
use std::path::PathBuf;
use std::sync::Arc;
#[cfg(feature = "dangerous-test-hooks")]
use crate::store::fault::FaultInjector;
#[derive(Clone, Debug, Default)]
pub enum SyncMode {
#[default]
SyncAll,
SyncData,
}
#[derive(Clone, Debug, Default)]
pub enum IndexLayout {
#[default]
AoS,
SoA,
AoSoA8,
AoSoA16,
AoSoA64,
SoAoS,
}
#[derive(Clone, Debug)]
pub struct BatchConfig {
pub max_size: u32,
pub max_bytes: u32,
pub group_commit_max_batch: u32,
}
impl Default for BatchConfig {
fn default() -> Self {
Self {
max_size: 256,
max_bytes: 1024 * 1024,
group_commit_max_batch: 1,
}
}
}
#[derive(Clone, Debug)]
pub struct WriterConfig {
pub channel_capacity: usize,
pub stack_size: Option<usize>,
pub restart_policy: RestartPolicy,
pub shutdown_drain_limit: usize,
}
impl Default for WriterConfig {
fn default() -> Self {
Self {
channel_capacity: 4096,
stack_size: None,
restart_policy: RestartPolicy::default(),
shutdown_drain_limit: 1024,
}
}
}
#[derive(Clone, Debug)]
pub struct SyncConfig {
pub mode: SyncMode,
pub every_n_events: u32,
}
impl Default for SyncConfig {
fn default() -> Self {
Self {
mode: SyncMode::default(),
every_n_events: 1000,
}
}
}
#[derive(Clone, Debug)]
pub struct IndexConfig {
pub layout: IndexLayout,
pub incremental_projection: bool,
pub enable_checkpoint: bool,
}
impl Default for IndexConfig {
fn default() -> Self {
Self {
layout: IndexLayout::default(),
incremental_projection: false,
enable_checkpoint: true,
}
}
}
pub struct StoreConfig {
pub data_dir: PathBuf,
pub segment_max_bytes: u64,
pub fd_budget: usize,
pub broadcast_capacity: usize,
pub single_append_max_bytes: u32,
pub batch: BatchConfig,
pub writer: WriterConfig,
pub sync: SyncConfig,
pub index: IndexConfig,
pub clock: Option<Arc<dyn Fn() -> i64 + Send + Sync>>,
#[cfg(feature = "dangerous-test-hooks")]
pub fault_injector: Option<Arc<dyn FaultInjector>>,
}
impl StoreConfig {
pub fn new(data_dir: impl Into<PathBuf>) -> Self {
Self {
data_dir: data_dir.into(),
segment_max_bytes: 256 * 1024 * 1024,
fd_budget: 64,
broadcast_capacity: 8192,
single_append_max_bytes: 16 * 1024 * 1024,
batch: BatchConfig::default(),
writer: WriterConfig::default(),
sync: SyncConfig::default(),
index: IndexConfig::default(),
clock: None,
#[cfg(feature = "dangerous-test-hooks")]
fault_injector: None,
}
}
pub(crate) fn validate(&self) -> Result<(), crate::store::StoreError> {
if self.segment_max_bytes == 0 {
return Err(crate::store::StoreError::Configuration(
"segment_max_bytes must be > 0".into(),
));
}
if self.writer.channel_capacity == 0 {
return Err(crate::store::StoreError::Configuration(
"writer.channel_capacity must be > 0 (0 creates a rendezvous channel that deadlocks)".into(),
));
}
if self.fd_budget == 0 {
return Err(crate::store::StoreError::Configuration(
"fd_budget must be > 0".into(),
));
}
if self.broadcast_capacity == 0 {
return Err(crate::store::StoreError::Configuration(
"broadcast_capacity must be > 0 (0 creates rendezvous channels that starve subscribers)".into(),
));
}
if self.single_append_max_bytes == 0 || self.single_append_max_bytes > 64 * 1024 * 1024 {
return Err(crate::store::StoreError::Configuration(
"single_append_max_bytes must be 1..=64MB".into(),
));
}
if self.batch.max_size == 0 || self.batch.max_size > 4096 {
return Err(crate::store::StoreError::Configuration(
"batch.max_size must be 1..=4096".into(),
));
}
if self.batch.max_bytes == 0 || self.batch.max_bytes > 16 * 1024 * 1024 {
return Err(crate::store::StoreError::Configuration(
"batch.max_bytes must be 1..=16MB".into(),
));
}
Ok(())
}
pub fn with_segment_max_bytes(mut self, segment_max_bytes: u64) -> Self {
self.segment_max_bytes = segment_max_bytes;
self
}
pub fn with_sync_every_n_events(mut self, sync_every_n_events: u32) -> Self {
self.sync.every_n_events = sync_every_n_events;
self
}
pub fn with_fd_budget(mut self, fd_budget: usize) -> Self {
self.fd_budget = fd_budget;
self
}
pub fn with_writer_channel_capacity(mut self, writer_channel_capacity: usize) -> Self {
self.writer.channel_capacity = writer_channel_capacity;
self
}
pub fn with_broadcast_capacity(mut self, broadcast_capacity: usize) -> Self {
self.broadcast_capacity = broadcast_capacity;
self
}
pub fn with_single_append_max_bytes(mut self, single_append_max_bytes: u32) -> Self {
self.single_append_max_bytes = single_append_max_bytes;
self
}
pub fn with_restart_policy(mut self, restart_policy: RestartPolicy) -> Self {
self.writer.restart_policy = restart_policy;
self
}
pub fn with_shutdown_drain_limit(mut self, shutdown_drain_limit: usize) -> Self {
self.writer.shutdown_drain_limit = shutdown_drain_limit;
self
}
pub fn with_writer_stack_size(mut self, writer_stack_size: Option<usize>) -> Self {
self.writer.stack_size = writer_stack_size;
self
}
pub fn with_clock(mut self, clock: Option<Arc<dyn Fn() -> i64 + Send + Sync>>) -> Self {
self.clock = clock;
self
}
pub fn with_sync_mode(mut self, sync_mode: SyncMode) -> Self {
self.sync.mode = sync_mode;
self
}
pub fn with_group_commit_max_batch(mut self, group_commit_max_batch: u32) -> Self {
self.batch.group_commit_max_batch = group_commit_max_batch;
self
}
pub fn with_index_layout(mut self, index_layout: IndexLayout) -> Self {
self.index.layout = index_layout;
self
}
pub fn with_incremental_projection(mut self, incremental_projection: bool) -> Self {
self.index.incremental_projection = incremental_projection;
self
}
pub fn with_enable_checkpoint(mut self, enable_checkpoint: bool) -> Self {
self.index.enable_checkpoint = enable_checkpoint;
self
}
pub fn with_batch_max_size(mut self, batch_max_size: u32) -> Self {
self.batch.max_size = batch_max_size;
self
}
pub fn with_batch_max_bytes(mut self, batch_max_bytes: u32) -> Self {
self.batch.max_bytes = batch_max_bytes;
self
}
pub(crate) fn now_us(&self) -> i64 {
match &self.clock {
Some(f) => f(),
None => now_us(),
}
}
}
impl Clone for StoreConfig {
fn clone(&self) -> Self {
Self {
data_dir: self.data_dir.clone(),
segment_max_bytes: self.segment_max_bytes,
fd_budget: self.fd_budget,
broadcast_capacity: self.broadcast_capacity,
single_append_max_bytes: self.single_append_max_bytes,
batch: self.batch.clone(),
writer: self.writer.clone(),
sync: self.sync.clone(),
index: self.index.clone(),
clock: self.clock.clone(),
#[cfg(feature = "dangerous-test-hooks")]
fault_injector: self.fault_injector.clone(),
}
}
}
impl std::fmt::Debug for StoreConfig {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("StoreConfig")
.field("data_dir", &self.data_dir)
.field("segment_max_bytes", &self.segment_max_bytes)
.field("fd_budget", &self.fd_budget)
.field("broadcast_capacity", &self.broadcast_capacity)
.field("single_append_max_bytes", &self.single_append_max_bytes)
.field("batch", &self.batch)
.field("writer", &self.writer)
.field("sync", &self.sync)
.field("index", &self.index)
.field("clock", &self.clock.as_ref().map(|_| "<fn>"))
.finish()
}
}
pub(crate) fn now_us() -> i64 {
#[allow(clippy::cast_possible_truncation)]
{
std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap_or_default()
.as_micros() as i64
}
}