use crate::event::EventPayloadValidation;
use crate::store::cold_start::rebuild::OpenIndexReport;
use crate::store::cold_start::ColdStartPolicy;
pub(crate) use crate::store::platform::clock::{
now_mono_ns, now_us, process_boot_ns, wall_ms_from_timestamp_us, MonotonicClock,
};
use crate::store::signing::{ReceiptSigningRegistry, SigningKey};
use crate::store::RestartPolicy;
use std::path::PathBuf;
use std::sync::Arc;
#[cfg(feature = "dangerous-test-hooks")]
use crate::store::fault::FaultInjector;
/// Shared, thread-safe callback that is handed a borrowed [`OpenIndexReport`].
///
/// Stored in `StoreConfig::open_report_observer`. The code that invokes the
/// callback is not part of this file.
pub type OpenReportObserver = Arc<dyn Fn(&OpenIndexReport) + Send + Sync>;
/// Sync mode selector carried by `SyncConfig::mode`.
///
/// NOTE(review): the variant names suggest `File::sync_all` versus
/// `File::sync_data` semantics, but the code that interprets them lives outside
/// this file — confirm before relying on that.
#[derive(Clone, Debug, Default)]
pub enum SyncMode {
/// The default mode.
#[default]
SyncAll,
/// The non-default mode; see the review note on the enum.
SyncData,
}
/// Set of independently toggled index overlays.
///
/// The flags are private: presets and the `with_*` builders are the only way to
/// set them, and the crate-internal `*_enabled` accessors are the only way to
/// read them.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct IndexTopology {
    soa: bool,
    entity_groups: bool,
    tiles64: bool,
    tiles64_simd: bool,
}

impl IndexTopology {
    /// Every overlay switched off; each preset below is expressed as a delta from this.
    const BASE: Self = Self {
        soa: false,
        entity_groups: false,
        tiles64: false,
        tiles64_simd: false,
    };

    /// Preset with no overlay enabled. This is also the `Default`.
    pub fn aos() -> Self {
        Self::BASE
    }

    /// Preset with only the `soa` overlay enabled.
    pub fn scan() -> Self {
        Self {
            soa: true,
            ..Self::BASE
        }
    }

    /// Preset with only the `entity_groups` overlay enabled.
    pub fn entity_local() -> Self {
        Self {
            entity_groups: true,
            ..Self::BASE
        }
    }

    /// Preset with only the `tiles64` overlay enabled.
    pub fn tiled() -> Self {
        Self {
            tiles64: true,
            ..Self::BASE
        }
    }

    /// Preset with only the `tiles64_simd` overlay enabled (`tiles64` itself stays off).
    pub fn tiled_simd() -> Self {
        Self {
            tiles64_simd: true,
            ..Self::BASE
        }
    }

    /// Preset enabling `soa`, `entity_groups` and `tiles64`.
    ///
    /// `tiles64_simd` is left off by this preset; opt in with
    /// [`with_tiles64_simd`](Self::with_tiles64_simd).
    pub fn all() -> Self {
        Self {
            soa: true,
            entity_groups: true,
            tiles64: true,
            ..Self::BASE
        }
    }

    /// Returns the topology with the `soa` flag replaced; other flags are kept.
    pub fn with_soa(self, enabled: bool) -> Self {
        Self {
            soa: enabled,
            ..self
        }
    }

    /// Returns the topology with the `entity_groups` flag replaced; other flags are kept.
    pub fn with_entity_groups(self, enabled: bool) -> Self {
        Self {
            entity_groups: enabled,
            ..self
        }
    }

    /// Returns the topology with the `tiles64` flag replaced; other flags are kept.
    pub fn with_tiles64(self, enabled: bool) -> Self {
        Self {
            tiles64: enabled,
            ..self
        }
    }

    /// Returns the topology with the `tiles64_simd` flag replaced; other flags are kept.
    pub fn with_tiles64_simd(self, enabled: bool) -> Self {
        Self {
            tiles64_simd: enabled,
            ..self
        }
    }

    /// Whether the `soa` overlay is switched on.
    pub(crate) fn soa_enabled(&self) -> bool {
        self.soa
    }

    /// Whether the `entity_groups` overlay is switched on.
    pub(crate) fn entity_groups_enabled(&self) -> bool {
        self.entity_groups
    }

    /// Whether the `tiles64` overlay is switched on.
    pub(crate) fn tiles64_enabled(&self) -> bool {
        self.tiles64
    }

    /// Whether the `tiles64_simd` overlay is switched on.
    pub(crate) fn tiles64_simd_enabled(&self) -> bool {
        self.tiles64_simd
    }
}

impl Default for IndexTopology {
    /// The default topology is the [`IndexTopology::aos`] preset.
    fn default() -> Self {
        IndexTopology::aos()
    }
}
/// Batch sizing limits, stored in `StoreConfig::batch`.
#[derive(Clone, Debug)]
pub struct BatchConfig {
    /// Upper bound on batch size; `StoreConfig::validated` accepts `1..=4096`.
    pub max_size: u32,
    /// Upper bound on batch bytes; `StoreConfig::validated` accepts `1..=16MB`.
    pub max_bytes: u32,
    /// Group-commit batch limit. `StoreConfig::validated` turns it into a drain
    /// budget: `0` becomes `u32::MAX`, any other value `n` becomes `n - 1`.
    /// Values above `1` also switch on `require_idempotency_keys`.
    pub group_commit_max_batch: u32,
}

impl Default for BatchConfig {
    /// Defaults: 256 for `max_size`, 1 MiB for `max_bytes`, group-commit limit of 1.
    fn default() -> Self {
        const ONE_MIB: u32 = 1 << 20;

        let max_size = 256;
        let max_bytes = ONE_MIB;
        let group_commit_max_batch = 1;
        Self {
            max_size,
            max_bytes,
            group_commit_max_batch,
        }
    }
}
/// Writer settings, stored in `StoreConfig::writer`.
#[derive(Clone, Debug)]
pub struct WriterConfig {
    /// Writer channel capacity. `StoreConfig::validated` rejects `0`.
    pub channel_capacity: usize,
    /// Percentage of `channel_capacity` used to derive the pressure retry
    /// threshold; `StoreConfig::validated` accepts `1..=100`.
    pub pressure_retry_threshold_pct: u8,
    /// Optional stack size for the writer.
    /// NOTE(review): presumably the writer thread's stack size in bytes — confirm at the spawn site.
    pub stack_size: Option<usize>,
    /// Restart policy for the writer; interpreted outside this file.
    pub restart_policy: RestartPolicy,
    /// Drain limit applied at shutdown; copied verbatim into the validated config.
    pub shutdown_drain_limit: usize,
}

impl Default for WriterConfig {
    /// Defaults: capacity 4096, threshold 75 %, no explicit stack size, the
    /// default restart policy, and a shutdown drain limit of 1024.
    fn default() -> Self {
        let restart_policy = RestartPolicy::default();
        Self {
            restart_policy,
            stack_size: None,
            channel_capacity: 4_096,
            pressure_retry_threshold_pct: 75,
            shutdown_drain_limit: 1_024,
        }
    }
}
/// Sync settings, stored in `StoreConfig::sync`.
#[derive(Clone, Debug)]
pub struct SyncConfig {
    /// Which [`SyncMode`] to use.
    pub mode: SyncMode,
    /// Event-count interval for syncing. Not range-checked in this file.
    /// NOTE(review): presumably "sync after every N events" — confirm in the writer.
    pub every_n_events: u32,
}

impl Default for SyncConfig {
    /// Defaults: the default [`SyncMode`] and an interval of 1000 events.
    fn default() -> Self {
        let mode = SyncMode::default();
        Self {
            mode,
            every_n_events: 1_000,
        }
    }
}
/// Index settings, stored in `StoreConfig::index`.
#[derive(Clone, Debug)]
pub struct IndexConfig {
    /// Which index overlays are enabled.
    pub topology: IndexTopology,
    /// Copied verbatim into the validated config's `incremental_projection`.
    pub incremental_projection: bool,
    /// First argument handed to `ColdStartPolicy::new` during validation.
    pub enable_checkpoint: bool,
    /// Second argument handed to `ColdStartPolicy::new` during validation.
    pub enable_mmap_index: bool,
}

impl Default for IndexConfig {
    /// Defaults: default topology, incremental projection off, checkpoint and
    /// mmap index both on.
    fn default() -> Self {
        let topology = IndexTopology::default();
        Self {
            topology,
            incremental_projection: false,
            enable_checkpoint: true,
            enable_mmap_index: true,
        }
    }
}
/// User-facing store configuration.
///
/// Built with [`StoreConfig::new`] and adjusted either through the `with_*`
/// builders or by assigning the public fields directly. Range checks happen
/// later, in the crate-internal `validated` step, not at assignment time.
pub struct StoreConfig {
/// Data directory, taken from the argument to [`StoreConfig::new`].
pub data_dir: PathBuf,
/// Segment size limit in bytes; validation rejects `0`.
pub segment_max_bytes: u64,
/// File-descriptor budget; validation rejects `0`.
pub fd_budget: usize,
/// Broadcast channel capacity; validation rejects `0`.
pub broadcast_capacity: usize,
/// Size limit for a single append; validation accepts `1..=64MB`.
pub single_append_max_bytes: u32,
/// Batch sizing limits.
pub batch: BatchConfig,
/// Writer settings.
pub writer: WriterConfig,
/// Sync settings.
pub sync: SyncConfig,
/// Index settings.
pub index: IndexConfig,
/// Optional custom clock. Validation wraps it with `MonotonicClock::wrap`;
/// when absent, the platform `now_us` function is used instead.
pub clock: Option<Arc<dyn Fn() -> i64 + Send + Sync>>,
/// Optional callback that receives an `OpenIndexReport`.
pub open_report_observer: Option<OpenReportObserver>,
/// Optional platform profile path; consumed outside this file.
pub platform_profile_path: Option<PathBuf>,
/// Signing keys handed to `ReceiptSigningRegistry::from_keys` during validation.
/// A non-empty list is rejected when the `blake3` feature is disabled.
pub signing_keys: Vec<SigningKey>,
/// Event payload validation setting; consumed outside this file.
pub event_payload_validation: EventPayloadValidation,
/// Optional fault injector, present only with the `dangerous-test-hooks` feature.
#[cfg(feature = "dangerous-test-hooks")]
#[cfg_attr(
all(docsrs, not(batpak_stable_docs)),
doc(cfg(feature = "dangerous-test-hooks"))
)]
pub fault_injector: Option<Arc<dyn FaultInjector>>,
}
/// Runtime policy values derived from a [`StoreConfig`] by `StoreConfig::validated`.
#[derive(Clone)]
pub(crate) struct ValidatedStoreConfig {
/// Absolute threshold derived from `writer.channel_capacity` and
/// `writer.pressure_retry_threshold_pct` (rounded up, at least 1).
pub(crate) pressure_retry_threshold: usize,
/// `true` when `batch.group_commit_max_batch` is greater than 1.
pub(crate) require_idempotency_keys: bool,
/// Copy of `index.incremental_projection`.
pub(crate) incremental_projection: bool,
/// Built from `index.enable_checkpoint` and `index.enable_mmap_index`.
pub(crate) cold_start: ColdStartPolicy,
/// Copy of `writer.shutdown_drain_limit`.
pub(crate) shutdown_drain_limit: usize,
/// Derived from `batch.group_commit_max_batch`: `0` maps to `u32::MAX`,
/// any other value `n` maps to `n - 1`.
pub(crate) group_commit_drain_budget: u32,
/// Registry built from the configured signing keys.
pub(crate) signing_registry: ReceiptSigningRegistry,
// Wrapped custom clock, if one was configured; read through `now_us`.
clock: Option<MonotonicClock>,
}
impl StoreConfig {
    /// Builds a configuration for `data_dir` with every other field at its default.
    ///
    /// Defaults set here: 256 MiB segments, an fd budget of 64, a broadcast
    /// capacity of 8192, a 16 MiB single-append limit, no custom clock, no
    /// observer, no platform profile path and no signing keys. The nested
    /// sections use their own `Default` implementations.
    pub fn new(data_dir: impl Into<PathBuf>) -> Self {
        Self {
            data_dir: data_dir.into(),
            segment_max_bytes: 256 * 1024 * 1024,
            fd_budget: 64,
            broadcast_capacity: 8192,
            single_append_max_bytes: 16 * 1024 * 1024,
            batch: BatchConfig::default(),
            writer: WriterConfig::default(),
            sync: SyncConfig::default(),
            index: IndexConfig::default(),
            clock: None,
            open_report_observer: None,
            platform_profile_path: None,
            signing_keys: Vec::new(),
            event_payload_validation: EventPayloadValidation::default(),
            #[cfg(feature = "dangerous-test-hooks")]
            fault_injector: None,
        }
    }

    /// Range-checks the configuration and derives the runtime policy values.
    ///
    /// Checks run in a fixed order and the first failure is reported.
    ///
    /// # Errors
    ///
    /// Returns `StoreError::Configuration` when:
    /// - `segment_max_bytes`, `writer.channel_capacity`, `fd_budget` or
    ///   `broadcast_capacity` is `0`;
    /// - `writer.pressure_retry_threshold_pct` is outside `1..=100`;
    /// - `single_append_max_bytes` is outside `1..=64MB`;
    /// - `batch.max_size` is outside `1..=4096`;
    /// - `batch.max_bytes` is outside `1..=16MB`;
    /// - signing keys are configured while the `blake3` feature is disabled.
    pub(crate) fn validated(&self) -> Result<ValidatedStoreConfig, crate::store::StoreError> {
        const MAX_SINGLE_APPEND_BYTES: u32 = 64 * 1024 * 1024;
        const MAX_BATCH_SIZE: u32 = 4096;
        const MAX_BATCH_BYTES: u32 = 16 * 1024 * 1024;

        // Every rejection uses the same error variant; only the message differs.
        fn reject<T>(message: &'static str) -> Result<T, crate::store::StoreError> {
            Err(crate::store::StoreError::Configuration(message.into()))
        }

        if self.segment_max_bytes == 0 {
            return reject("segment_max_bytes must be > 0");
        }
        if self.writer.channel_capacity == 0 {
            return reject(
                "writer.channel_capacity must be > 0 (0 creates a rendezvous channel that deadlocks)",
            );
        }
        if !(1..=100).contains(&self.writer.pressure_retry_threshold_pct) {
            return reject("writer.pressure_retry_threshold_pct must be 1..=100");
        }
        if self.fd_budget == 0 {
            return reject("fd_budget must be > 0");
        }
        if self.broadcast_capacity == 0 {
            return reject(
                "broadcast_capacity must be > 0 (0 creates rendezvous channels that starve subscribers)",
            );
        }
        if !(1..=MAX_SINGLE_APPEND_BYTES).contains(&self.single_append_max_bytes) {
            return reject("single_append_max_bytes must be 1..=64MB");
        }
        if !(1..=MAX_BATCH_SIZE).contains(&self.batch.max_size) {
            return reject("batch.max_size must be 1..=4096");
        }
        if !(1..=MAX_BATCH_BYTES).contains(&self.batch.max_bytes) {
            return reject("batch.max_bytes must be 1..=16MB");
        }
        #[cfg(not(feature = "blake3"))]
        if !self.signing_keys.is_empty() {
            return reject("receipt signing requires the blake3 feature");
        }

        // ceil(capacity * pct / 100), computed from quotient and remainder so the
        // intermediate product cannot overflow. A saturating multiply would
        // silently under-report the threshold for very large capacities. Because
        // `pct <= 100` was checked above, the result never exceeds `capacity`.
        let capacity = self.writer.channel_capacity;
        let pct = usize::from(self.writer.pressure_retry_threshold_pct);
        let whole_hundreds = (capacity / 100) * pct;
        let remainder_share = ((capacity % 100) * pct).div_ceil(100);
        let pressure_retry_threshold = (whole_hundreds + remainder_share).max(1);

        // 0 means "no limit"; any other limit `n` leaves `n - 1` for draining,
        // so a limit of 1 yields a budget of 0.
        let group_commit_drain_budget = match self.batch.group_commit_max_batch {
            0 => u32::MAX,
            limit => limit - 1,
        };

        // NOTE(review): a limit of 0 gets an unbounded drain budget but does not
        // require idempotency keys, unlike limits above 1 — confirm this is intended.
        let require_idempotency_keys = self.batch.group_commit_max_batch > 1;

        Ok(ValidatedStoreConfig {
            pressure_retry_threshold,
            require_idempotency_keys,
            incremental_projection: self.index.incremental_projection,
            cold_start: ColdStartPolicy::new(
                self.index.enable_checkpoint,
                self.index.enable_mmap_index,
            ),
            shutdown_drain_limit: self.writer.shutdown_drain_limit,
            group_commit_drain_budget,
            signing_registry: ReceiptSigningRegistry::from_keys(&self.signing_keys),
            clock: self.clock.clone().map(MonotonicClock::wrap),
        })
    }

    /// Sets `segment_max_bytes`. Validation later rejects `0`.
    pub fn with_segment_max_bytes(mut self, segment_max_bytes: u64) -> Self {
        self.segment_max_bytes = segment_max_bytes;
        self
    }

    /// Sets `sync.every_n_events`.
    pub fn with_sync_every_n_events(mut self, sync_every_n_events: u32) -> Self {
        self.sync.every_n_events = sync_every_n_events;
        self
    }

    /// Sets `fd_budget`. Validation later rejects `0`.
    pub fn with_fd_budget(mut self, fd_budget: usize) -> Self {
        self.fd_budget = fd_budget;
        self
    }

    /// Sets `writer.channel_capacity`. Validation later rejects `0`.
    pub fn with_writer_channel_capacity(mut self, writer_channel_capacity: usize) -> Self {
        self.writer.channel_capacity = writer_channel_capacity;
        self
    }

    /// Sets `writer.pressure_retry_threshold_pct`. Validation later requires `1..=100`.
    pub fn with_writer_pressure_retry_threshold_pct(
        mut self,
        pressure_retry_threshold_pct: u8,
    ) -> Self {
        self.writer.pressure_retry_threshold_pct = pressure_retry_threshold_pct;
        self
    }

    /// Sets `broadcast_capacity`. Validation later rejects `0`.
    pub fn with_broadcast_capacity(mut self, broadcast_capacity: usize) -> Self {
        self.broadcast_capacity = broadcast_capacity;
        self
    }

    /// Sets `single_append_max_bytes`. Validation later requires `1..=64MB`.
    pub fn with_single_append_max_bytes(mut self, single_append_max_bytes: u32) -> Self {
        self.single_append_max_bytes = single_append_max_bytes;
        self
    }

    /// Sets `writer.restart_policy`.
    pub fn with_restart_policy(mut self, restart_policy: RestartPolicy) -> Self {
        self.writer.restart_policy = restart_policy;
        self
    }

    /// Sets `writer.shutdown_drain_limit`.
    pub fn with_shutdown_drain_limit(mut self, shutdown_drain_limit: usize) -> Self {
        self.writer.shutdown_drain_limit = shutdown_drain_limit;
        self
    }

    /// Sets `writer.stack_size`; `None` clears any explicit size.
    pub fn with_writer_stack_size(mut self, writer_stack_size: Option<usize>) -> Self {
        self.writer.stack_size = writer_stack_size;
        self
    }

    /// Installs a custom clock, or removes it when `clock` is `None`.
    ///
    /// Validation wraps the closure with `MonotonicClock::wrap` before use.
    pub fn with_clock(mut self, clock: Option<Arc<dyn Fn() -> i64 + Send + Sync>>) -> Self {
        self.clock = clock;
        self
    }

    /// Installs the open-report observer, or removes it when `observer` is `None`.
    pub fn with_open_report_observer(mut self, observer: Option<OpenReportObserver>) -> Self {
        self.open_report_observer = observer;
        self
    }

    /// Sets `platform_profile_path` to `Some(path)`.
    pub fn with_platform_profile_path(mut self, path: impl Into<PathBuf>) -> Self {
        self.platform_profile_path = Some(path.into());
        self
    }

    /// Resets `platform_profile_path` to `None`.
    pub fn without_platform_profile_path(mut self) -> Self {
        self.platform_profile_path = None;
        self
    }

    /// Appends one signing key; earlier keys are kept.
    pub fn with_signing_key(mut self, signing_key: SigningKey) -> Self {
        self.signing_keys.push(signing_key);
        self
    }

    /// Sets `event_payload_validation`.
    pub fn with_event_payload_validation(mut self, validation: EventPayloadValidation) -> Self {
        self.event_payload_validation = validation;
        self
    }

    /// Sets `sync.mode`.
    pub fn with_sync_mode(mut self, sync_mode: SyncMode) -> Self {
        self.sync.mode = sync_mode;
        self
    }

    /// Sets `batch.group_commit_max_batch`.
    ///
    /// During validation `0` yields an unlimited drain budget, and values above
    /// `1` also switch on `require_idempotency_keys`.
    pub fn with_group_commit_max_batch(mut self, group_commit_max_batch: u32) -> Self {
        self.batch.group_commit_max_batch = group_commit_max_batch;
        self
    }

    /// Sets `index.topology`.
    pub fn with_index_topology(mut self, index_topology: IndexTopology) -> Self {
        self.index.topology = index_topology;
        self
    }

    /// Sets `index.incremental_projection`.
    pub fn with_incremental_projection(mut self, incremental_projection: bool) -> Self {
        self.index.incremental_projection = incremental_projection;
        self
    }

    /// Sets `index.enable_checkpoint`.
    pub fn with_enable_checkpoint(mut self, enable_checkpoint: bool) -> Self {
        self.index.enable_checkpoint = enable_checkpoint;
        self
    }

    /// Sets `index.enable_mmap_index`.
    pub fn with_enable_mmap_index(mut self, enable_mmap_index: bool) -> Self {
        self.index.enable_mmap_index = enable_mmap_index;
        self
    }

    /// Sets `batch.max_size`. Validation later requires `1..=4096`.
    pub fn with_batch_max_size(mut self, batch_max_size: u32) -> Self {
        self.batch.max_size = batch_max_size;
        self
    }

    /// Sets `batch.max_bytes`. Validation later requires `1..=16MB`.
    pub fn with_batch_max_bytes(mut self, batch_max_bytes: u32) -> Self {
        self.batch.max_bytes = batch_max_bytes;
        self
    }
}
impl Clone for StoreConfig {
    /// Field-by-field clone. The clock, observer and (when compiled in) fault
    /// injector are `Arc`s, so the copy shares them with the original.
    fn clone(&self) -> Self {
        // Exhaustive destructuring: adding a field to the struct without
        // handling it here becomes a compile error.
        let Self {
            data_dir,
            segment_max_bytes,
            fd_budget,
            broadcast_capacity,
            single_append_max_bytes,
            batch,
            writer,
            sync,
            index,
            clock,
            open_report_observer,
            platform_profile_path,
            signing_keys,
            event_payload_validation,
            #[cfg(feature = "dangerous-test-hooks")]
            fault_injector,
        } = self;

        Self {
            data_dir: data_dir.clone(),
            segment_max_bytes: *segment_max_bytes,
            fd_budget: *fd_budget,
            broadcast_capacity: *broadcast_capacity,
            single_append_max_bytes: *single_append_max_bytes,
            batch: batch.clone(),
            writer: writer.clone(),
            sync: sync.clone(),
            index: index.clone(),
            clock: clock.clone(),
            open_report_observer: open_report_observer.clone(),
            platform_profile_path: platform_profile_path.clone(),
            signing_keys: signing_keys.clone(),
            event_payload_validation: *event_payload_validation,
            #[cfg(feature = "dangerous-test-hooks")]
            fault_injector: fault_injector.clone(),
        }
    }
}
impl std::fmt::Debug for StoreConfig {
    /// Formats the configuration for diagnostics.
    ///
    /// Closures and trait objects are shown as placeholders (`"<fn>"`,
    /// `"<observer>"`, `"<fault-injector>"`) wrapped in `Some`/`None`, and the
    /// signing keys are reduced to their count.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut out = f.debug_struct("StoreConfig");
        out.field("data_dir", &self.data_dir)
            .field("segment_max_bytes", &self.segment_max_bytes)
            .field("fd_budget", &self.fd_budget)
            .field("broadcast_capacity", &self.broadcast_capacity)
            .field("single_append_max_bytes", &self.single_append_max_bytes)
            .field("batch", &self.batch)
            .field("writer", &self.writer)
            .field("sync", &self.sync)
            .field("index", &self.index)
            .field("clock", &self.clock.as_ref().map(|_| "<fn>"))
            .field(
                "open_report_observer",
                &self.open_report_observer.as_ref().map(|_| "<observer>"),
            )
            .field("platform_profile_path", &self.platform_profile_path)
            .field("signing_keys", &self.signing_keys.len())
            .field("event_payload_validation", &self.event_payload_validation);
        // Without this, a config carrying an injected fault is indistinguishable
        // from a clean one in debug output.
        #[cfg(feature = "dangerous-test-hooks")]
        out.field(
            "fault_injector",
            &self.fault_injector.as_ref().map(|_| "<fault-injector>"),
        );
        out.finish()
    }
}
impl std::fmt::Debug for ValidatedStoreConfig {
    /// Formats the runtime policy for diagnostics. The signing registry is
    /// always shown as `"<registry>"` and a configured clock as `"<monotonic>"`.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let Self {
            pressure_retry_threshold,
            require_idempotency_keys,
            incremental_projection,
            cold_start,
            shutdown_drain_limit,
            group_commit_drain_budget,
            signing_registry: _,
            clock,
        } = self;
        let clock_marker = clock.as_ref().map(|_| "<monotonic>");

        let mut out = f.debug_struct("ValidatedStoreConfig");
        out.field("pressure_retry_threshold", pressure_retry_threshold);
        out.field("require_idempotency_keys", require_idempotency_keys);
        out.field("incremental_projection", incremental_projection);
        out.field("cold_start", cold_start);
        out.field("shutdown_drain_limit", shutdown_drain_limit);
        out.field("group_commit_drain_budget", group_commit_drain_budget);
        out.field("signing_registry", &"<registry>");
        out.field("clock", &clock_marker);
        out.finish()
    }
}
impl ValidatedStoreConfig {
    /// Current time in microseconds: from the wrapped custom clock when one is
    /// configured, otherwise from the platform `now_us` function.
    pub(crate) fn now_us(&self) -> i64 {
        if let Some(clock) = &self.clock {
            clock.now_us()
        } else {
            now_us()
        }
    }

    /// Like [`now_us`](Self::now_us), but never negative.
    ///
    /// A negative reading is logged at error level and replaced by `0`; zero
    /// and positive readings are returned unchanged.
    pub(crate) fn cache_now_us(&self) -> i64 {
        let observed_us = self.now_us();
        if observed_us >= 0 {
            return observed_us;
        }
        tracing::error!(
            raw_us = observed_us,
            "custom clock returned a negative value; clamping projection/cache metadata timestamp to zero"
        );
        0
    }
}
/// Converts `d` to whole microseconds, saturating at `u64::MAX` when the
/// 128-bit microsecond count does not fit in a `u64`.
#[inline]
pub(crate) fn duration_micros(d: std::time::Duration) -> u64 {
    match u64::try_from(d.as_micros()) {
        Ok(micros) => micros,
        Err(_) => u64::MAX,
    }
}
// Unit tests for configuration validation, the runtime clock wrappers and the
// small time helpers re-exported or defined in this module.
#[cfg(test)]
mod tests {
use super::*;
use std::sync::atomic::{AtomicI64, Ordering};
use std::time::Duration;
// A clock assigned directly to the public field (bypassing `with_clock`) must
// still be wrapped by `validated`: after the raw clock steps back from 2000 to
// 1500, the runtime clock keeps reporting 2000.
#[test]
fn validated_runtime_clock_wraps_direct_field_assignment() {
let raw = Arc::new(AtomicI64::new(2_000));
let raw_clock = {
let raw = Arc::clone(&raw);
Arc::new(move || raw.load(Ordering::SeqCst)) as Arc<dyn Fn() -> i64 + Send + Sync>
};
let mut config = StoreConfig::new("target/test-clock-wrap");
config.clock = Some(raw_clock);
let runtime = config.validated().expect("config validates");
assert_eq!(runtime.now_us(), 2_000);
raw.store(1_500, Ordering::SeqCst);
assert_eq!(
runtime.now_us(),
2_000,
"validated runtime clock must clamp direct-field regressions"
);
}
// A custom clock that returns -42 must yield 0 from `cache_now_us`.
#[test]
fn cache_now_us_clamps_negative_custom_clock_values() {
let raw_clock = Arc::new(|| -42_i64) as Arc<dyn Fn() -> i64 + Send + Sync>;
let mut config = StoreConfig::new("target/test-cache-clock-clamp");
config.clock = Some(raw_clock);
let runtime = config.validated().expect("config validates");
assert_eq!(
runtime.cache_now_us(),
0,
"projection/cache metadata clock must not persist negative timestamps"
);
}
// Boundary case: a clock reading of exactly 0 passes through `cache_now_us` as 0.
#[test]
fn cache_now_us_preserves_zero_custom_clock_value() {
let raw_clock = Arc::new(|| 0_i64) as Arc<dyn Fn() -> i64 + Send + Sync>;
let mut config = StoreConfig::new("target/test-cache-clock-zero");
config.clock = Some(raw_clock);
let runtime = config.validated().expect("config validates");
assert_eq!(
runtime.cache_now_us(),
0,
"PROPERTY: zero is a valid cache timestamp boundary, not a negative-clock violation"
);
}
// `with_tiles64_simd(true)` flips only the SIMD flag: the default topology has
// it off, and the other three flags stay equal to their default values.
#[test]
fn index_topology_tiles64_simd_builder_sets_only_simd_overlay() {
let topology = IndexTopology::default().with_tiles64_simd(true);
assert!(
topology.tiles64_simd_enabled(),
"PROPERTY: with_tiles64_simd(true) must enable the SIMD overlay"
);
assert!(
!IndexTopology::default().tiles64_simd_enabled(),
"PROPERTY: default topology keeps the experimental SIMD overlay disabled"
);
assert!(
topology.soa_enabled() == IndexTopology::default().soa_enabled()
&& topology.entity_groups_enabled()
== IndexTopology::default().entity_groups_enabled()
&& topology.tiles64_enabled() == IndexTopology::default().tiles64_enabled(),
"PROPERTY: with_tiles64_simd must not silently reset the rest of the topology"
);
}
// The inclusive upper bounds (threshold 100 %, batch size 4096) must validate.
#[test]
fn validated_accepts_documented_inclusive_upper_bounds() {
let mut config = StoreConfig::new("target/test-config-upper-bounds");
config.writer.pressure_retry_threshold_pct = 100;
config.batch.max_size = 4096;
config
.validated()
.expect("documented inclusive upper bounds should validate");
}
// One step past each upper bound must produce `StoreError::Configuration`.
// Each case uses a fresh config so only one field is out of range at a time.
#[test]
fn validated_rejects_values_above_documented_upper_bounds() {
let mut pressure = StoreConfig::new("target/test-config-pressure-too-high");
pressure.writer.pressure_retry_threshold_pct = 101;
assert!(
matches!(
pressure.validated(),
Err(crate::store::StoreError::Configuration(_))
),
"PROPERTY: pressure retry threshold above 100 must be rejected"
);
let mut batch = StoreConfig::new("target/test-config-batch-too-large");
batch.batch.max_size = 4097;
assert!(
matches!(
batch.validated(),
Err(crate::store::StoreError::Configuration(_))
),
"PROPERTY: batch.max_size above 4096 must be rejected"
);
let mut single_append = StoreConfig::new("target/test-config-single-append-too-large");
single_append.single_append_max_bytes = 64 * 1024 * 1024 + 1;
assert!(
matches!(
single_append.validated(),
Err(crate::store::StoreError::Configuration(_))
),
"PROPERTY: single_append_max_bytes above 64MB must be rejected"
);
let mut batch_bytes = StoreConfig::new("target/test-config-batch-bytes-too-large");
batch_bytes.batch.max_bytes = 16 * 1024 * 1024 + 1;
assert!(
matches!(
batch_bytes.validated(),
Err(crate::store::StoreError::Configuration(_))
),
"PROPERTY: batch.max_bytes above 16MB must be rejected"
);
}
// Zero is below the lower bound for both byte-size limits and must be rejected.
#[test]
fn validated_rejects_zero_payload_size_boundaries() {
let mut single_append = StoreConfig::new("target/test-config-single-append-zero");
single_append.single_append_max_bytes = 0;
assert!(
matches!(
single_append.validated(),
Err(crate::store::StoreError::Configuration(_))
),
"PROPERTY: single_append_max_bytes of zero must be rejected"
);
let mut batch_bytes = StoreConfig::new("target/test-config-batch-bytes-zero");
batch_bytes.batch.max_bytes = 0;
assert!(
matches!(
batch_bytes.validated(),
Err(crate::store::StoreError::Configuration(_))
),
"PROPERTY: batch.max_bytes of zero must be rejected"
);
}
// The hand-written Debug impl must mention the struct name and key field names.
#[test]
fn validated_config_debug_names_runtime_policy_fields() {
let runtime = StoreConfig::new("target/test-validated-debug")
.validated()
.expect("config validates");
let rendered = format!("{runtime:?}");
assert!(
rendered.contains("ValidatedStoreConfig")
&& rendered.contains("pressure_retry_threshold")
&& rendered.contains("group_commit_drain_budget")
&& rendered.contains("signing_registry"),
"PROPERTY: ValidatedStoreConfig Debug must name the runtime policy fields, got: {rendered}"
);
}
// Two consecutive reads of `process_boot_ns` must be non-zero and identical.
#[test]
fn process_boot_ns_is_nonzero_and_stable_in_process() {
let first = process_boot_ns();
let second = process_boot_ns();
assert_ne!(
first, 0,
"PROPERTY: process_boot_ns must expose the captured wall-clock anchor, not zero/default"
);
assert_eq!(
first, second,
"PROPERTY: process_boot_ns must stay stable for the process lifetime"
);
}
// After sleeping 1 ms, `now_mono_ns` must report more than 1 ns, which rules
// out an implementation that returns a fixed placeholder value.
#[test]
fn now_mono_ns_advances_beyond_nonzero_sentinel() {
std::thread::sleep(Duration::from_millis(1));
let elapsed = now_mono_ns();
assert!(
elapsed > 1,
"PROPERTY: now_mono_ns must report elapsed nanoseconds from the process anchor, not a fixed sentinel; got {elapsed}"
);
}
// `duration_micros` must map a zero duration to 0 and one microsecond to 1.
#[test]
fn duration_micros_preserves_zero_and_one_microsecond_boundaries() {
assert_eq!(
duration_micros(Duration::ZERO),
0,
"PROPERTY: zero duration must remain zero, not a default/nonzero sentinel"
);
assert_eq!(
duration_micros(Duration::from_micros(1)),
1,
"PROPERTY: one microsecond must round-trip exactly"
);
}
}