pub mod cursor;
pub mod index;
pub mod projection;
pub mod reader;
pub mod segment;
pub mod subscription;
pub mod writer;
pub use cursor::Cursor;
pub use index::{ClockKey, DiskPos, IndexEntry};
#[cfg(feature = "lmdb")]
pub use projection::LmdbCache;
#[cfg(feature = "redb")]
pub use projection::RedbCache;
pub use projection::{CacheMeta, Freshness, NoCache, ProjectionCache};
pub use subscription::Subscription;
pub use writer::{Notification, RestartPolicy};
use crate::coordinate::{Coordinate, CoordinateError, KindFilter, Region};
use crate::event::{Event, EventHeader, EventKind, EventSourced, StoredEvent};
use index::StoreIndex;
use reader::Reader;
use serde::Serialize;
use std::path::PathBuf;
use std::sync::Arc;
use writer::{AppendGuards, SubscriberList, WriterCommand, WriterHandle};
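// Compile-time guard: the store's public API is deliberately synchronous.
// Turning on an "async-store" feature fails the build and points callers at
// the supported integration patterns (spawn_blocking, or flume's recv_async
// on channels the store hands out). The allow() silences the unexpected_cfgs
// lint for this intentionally unknown feature name.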
#[allow(unexpected_cfgs)]
#[cfg(feature = "async-store")]
compile_error!("INVARIANT 2: Store API is sync. Use spawn_blocking or flume recv_async.");
pub struct Store {
index: Arc<StoreIndex>,
reader: Arc<Reader>,
cache: Box<dyn ProjectionCache>,
writer: WriterHandle,
config: Arc<StoreConfig>,
}
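/// How segment files are flushed to stable storage. The variant names mirror
/// `std::fs::File::sync_all` (data + metadata) and `sync_data` (data only);
/// `SyncAll` is the default and the safer choice.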
#[derive(Clone, Debug, Default)]
pub enum SyncMode {
#[default]
SyncAll,
SyncData,
}
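/// Tuning knobs for a [`Store`]. [`StoreConfig::new`] supplies defaults
/// (256 MiB segments, a sync every 1000 events, a 64-handle fd budget, and
/// so on); override individual fields before passing the config to
/// [`Store::open`].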
pub struct StoreConfig {
pub data_dir: PathBuf,
pub segment_max_bytes: u64,
pub sync_every_n_events: u32,
pub fd_budget: usize,
pub writer_channel_capacity: usize,
pub broadcast_capacity: usize,
pub cache_map_size_bytes: usize,
pub restart_policy: RestartPolicy,
pub shutdown_drain_limit: usize,
pub writer_stack_size: Option<usize>,
pub clock: Option<Arc<dyn Fn() -> i64 + Send + Sync>>,
pub sync_mode: SyncMode,
}
impl StoreConfig {
pub fn new(data_dir: impl Into<PathBuf>) -> Self {
Self {
data_dir: data_dir.into(),
            segment_max_bytes: 256 * 1024 * 1024,
            sync_every_n_events: 1000,
            fd_budget: 64,
            writer_channel_capacity: 4096,
            broadcast_capacity: 8192,
            cache_map_size_bytes: 64 * 1024 * 1024,
            restart_policy: RestartPolicy::default(),
            shutdown_drain_limit: 1024,
            writer_stack_size: None,
            clock: None,
            sync_mode: SyncMode::default(),
        }
}
pub(crate) fn now_us(&self) -> i64 {
match &self.clock {
Some(f) => f(),
            None => now_us(),
        }
}
}
impl Clone for StoreConfig {
fn clone(&self) -> Self {
Self {
data_dir: self.data_dir.clone(),
segment_max_bytes: self.segment_max_bytes,
sync_every_n_events: self.sync_every_n_events,
fd_budget: self.fd_budget,
writer_channel_capacity: self.writer_channel_capacity,
broadcast_capacity: self.broadcast_capacity,
cache_map_size_bytes: self.cache_map_size_bytes,
restart_policy: self.restart_policy.clone(),
shutdown_drain_limit: self.shutdown_drain_limit,
writer_stack_size: self.writer_stack_size,
clock: self.clock.clone(),
sync_mode: self.sync_mode.clone(),
}
}
}
impl std::fmt::Debug for StoreConfig {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("StoreConfig")
.field("data_dir", &self.data_dir)
.field("segment_max_bytes", &self.segment_max_bytes)
.field("sync_every_n_events", &self.sync_every_n_events)
.field("fd_budget", &self.fd_budget)
.field("writer_channel_capacity", &self.writer_channel_capacity)
.field("broadcast_capacity", &self.broadcast_capacity)
.field("cache_map_size_bytes", &self.cache_map_size_bytes)
.field("restart_policy", &self.restart_policy)
.field("shutdown_drain_limit", &self.shutdown_drain_limit)
.field("writer_stack_size", &self.writer_stack_size)
.field("clock", &self.clock.as_ref().map(|_| "<fn>"))
.field("sync_mode", &self.sync_mode)
.finish()
}
}
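/// Everything that can go wrong inside the store, from IO and serialization
/// failures to CRC mismatches, CAS conflicts, and a crashed writer thread.
/// The enum is `#[non_exhaustive]`: match with a wildcard arm so new
/// variants are not a breaking change.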
#[derive(Debug)]
#[non_exhaustive]
pub enum StoreError {
Io(std::io::Error),
Coordinate(CoordinateError),
Serialization(String),
CrcMismatch {
segment_id: u64,
offset: u64,
},
CorruptSegment {
segment_id: u64,
detail: String,
},
NotFound(u128),
SequenceMismatch {
entity: String,
expected: u32,
actual: u32,
},
DuplicateEvent(u128),
WriterCrashed,
ShuttingDown,
CacheFailed(String),
}
impl std::fmt::Display for StoreError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Self::Io(e) => write!(f, "IO error: {e}"),
Self::Coordinate(e) => write!(f, "coordinate error: {e}"),
Self::Serialization(s) => write!(f, "serialization error: {s}"),
Self::CrcMismatch { segment_id, offset } => {
write!(f, "CRC mismatch in segment {segment_id} at offset {offset}")
}
Self::CorruptSegment { segment_id, detail } => {
write!(f, "corrupt segment {segment_id}: {detail}")
}
Self::NotFound(id) => write!(f, "event {id:032x} not found"),
Self::SequenceMismatch {
entity,
expected,
actual,
} => write!(
f,
"CAS failed for {entity}: expected seq {expected}, got {actual}"
),
Self::DuplicateEvent(key) => write!(f, "duplicate idempotency key {key:032x}"),
Self::WriterCrashed => write!(f, "writer thread crashed"),
Self::ShuttingDown => write!(f, "store is shutting down"),
Self::CacheFailed(s) => write!(f, "cache error: {s}"),
}
}
}
impl std::error::Error for StoreError {}
impl From<CoordinateError> for StoreError {
fn from(e: CoordinateError) -> Self {
Self::Coordinate(e)
}
}
impl From<std::io::Error> for StoreError {
fn from(e: std::io::Error) -> Self {
Self::Io(e)
}
}
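/// Acknowledgement returned for every successful append: the event's id, its
/// assigned sequence number, and the on-disk position of its frame.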
#[derive(Clone, Debug)]
pub struct AppendReceipt {
pub event_id: u128,
pub sequence: u64,
pub disk_pos: DiskPos,
}
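/// Optional guards and metadata for [`Store::append_with_options`]:
/// compare-and-set against an expected per-entity sequence, an idempotency
/// key (which doubles as the event id), explicit correlation/causation ids,
/// and raw header flags. Construct with the `with_*` builder methods.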
#[derive(Clone, Copy, Debug, Default)]
pub struct AppendOptions {
pub expected_sequence: Option<u32>,
pub idempotency_key: Option<u128>,
pub correlation_id: Option<u128>,
pub causation_id: Option<u128>,
pub flags: u8,
}
impl AppendOptions {
pub fn new() -> Self {
Self::default()
}
pub fn with_cas(mut self, seq: u32) -> Self {
self.expected_sequence = Some(seq);
self
}
pub fn with_idempotency(mut self, key: u128) -> Self {
self.idempotency_key = Some(key);
self
}
pub fn with_flags(mut self, flags: u8) -> Self {
self.flags = flags;
self
}
pub fn with_correlation(mut self, id: u128) -> Self {
self.correlation_id = Some(id);
self
}
pub fn with_causation(mut self, id: u128) -> Self {
self.causation_id = Some(id);
self
}
}
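/// Predicate over stored events (payloads decoded to `serde_json::Value`)
/// that decides what survives compaction.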
pub type RetentionPredicate = Box<dyn Fn(&StoredEvent<serde_json::Value>) -> bool + Send>;
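/// What [`Store::compact`] does with the events in sealed segments: `Merge`
/// keeps everything, `Retention` drops events the predicate rejects, and
/// `Tombstone` keeps rejected events but rewrites their kind to
/// `EventKind::TOMBSTONE`.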
#[non_exhaustive]
pub enum CompactionStrategy {
Merge,
Retention(RetentionPredicate),
Tombstone(RetentionPredicate),
}
pub struct CompactionConfig {
pub strategy: CompactionStrategy,
pub min_segments: usize,
}
impl Default for CompactionConfig {
fn default() -> Self {
Self {
strategy: CompactionStrategy::Merge,
min_segments: 2,
}
}
}
impl Store {
pub fn open_default() -> Result<Self, StoreError> {
Self::open(StoreConfig::new("./batpak-data"))
}
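    /// Opens (or creates) a store at `config.data_dir` with no projection
    /// cache ([`NoCache`]). See [`Store::open_with_cache`] for the recovery
    /// steps.
    ///
    /// A minimal usage sketch; it assumes this module is reachable as
    /// `batpak::store` (adjust the paths to your crate layout):
    ///
    /// ```no_run
    /// use batpak::store::{Store, StoreConfig};
    ///
    /// let mut config = StoreConfig::new("/tmp/batpak-demo");
    /// config.sync_every_n_events = 1; // trade throughput for durability
    /// let store = Store::open(config)?;
    /// # Ok::<(), batpak::store::StoreError>(())
    /// ```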
pub fn open(config: StoreConfig) -> Result<Self, StoreError> {
Self::open_with_cache(config, Box::new(NoCache))
}
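    /// Like [`Store::open`] but with an explicit projection cache. Creates
    /// `data_dir` if missing, rebuilds the in-memory index by scanning every
    /// segment file in filename order, then spawns the writer thread.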
pub fn open_with_cache(
config: StoreConfig,
cache: Box<dyn ProjectionCache>,
) -> Result<Self, StoreError> {
std::fs::create_dir_all(&config.data_dir)?;
let config = Arc::new(config);
let index = Arc::new(StoreIndex::new());
let reader = Arc::new(Reader::new(config.data_dir.clone(), config.fd_budget));
let mut entries: Vec<std::fs::DirEntry> = std::fs::read_dir(&config.data_dir)?
.filter_map(|e| e.ok())
.filter(|e| {
e.path()
.extension()
.map(|ext| ext == segment::SEGMENT_EXTENSION)
.unwrap_or(false)
})
.collect();
entries.sort_by_key(|e| e.file_name());
for dir_entry in &entries {
let scanned = reader.scan_segment(&dir_entry.path())?;
for se in scanned {
let coord = Coordinate::new(&se.entity, &se.scope)?;
let clock = se.event.header.position.sequence;
let entry = IndexEntry {
event_id: se.event.header.event_id,
correlation_id: se.event.header.correlation_id,
causation_id: se.event.header.causation_id,
coord,
kind: se.event.header.event_kind,
wall_ms: se.event.header.position.wall_ms,
clock,
hash_chain: se.event.hash_chain.clone().unwrap_or_default(),
disk_pos: DiskPos {
segment_id: se.segment_id,
offset: se.offset,
length: se.length,
},
global_sequence: index.global_sequence(),
};
index.insert(entry);
}
}
let subscribers = Arc::new(SubscriberList::new());
let writer = WriterHandle::spawn(&config, &index, &subscribers)?;
Ok(Self {
index,
reader,
cache,
writer,
config,
})
}
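    /// Appends one event and blocks until the writer thread acknowledges it.
    /// The payload is serialized with MessagePack (`rmp_serde`); a fresh v7
    /// id is generated and also used as the correlation id, starting a new
    /// causal chain.
    ///
    /// A sketch of a plain append, under the same path assumptions as the
    /// [`Store::open`] example. `EventKind::TOMBSTONE` is the only kind this
    /// module names, so substitute whatever kinds your schema defines:
    ///
    /// ```no_run
    /// # use batpak::store::{Store, StoreConfig};
    /// # use batpak::coordinate::Coordinate;
    /// # use batpak::event::EventKind;
    /// # let store = Store::open(StoreConfig::new("/tmp/batpak-demo"))?;
    /// let coord = Coordinate::new("order-42", "billing").expect("valid coordinate");
    /// let receipt = store.append(
    ///     &coord,
    ///     EventKind::TOMBSTONE, // stand-in kind for the example
    ///     &serde_json::json!({ "total_cents": 4200 }),
    /// )?;
    /// println!("event {:032x} at seq {}", receipt.event_id, receipt.sequence);
    /// # Ok::<(), batpak::store::StoreError>(())
    /// ```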
pub fn append(
&self,
coord: &Coordinate,
kind: EventKind,
payload: &impl Serialize,
) -> Result<AppendReceipt, StoreError> {
let payload_bytes = rmp_serde::to_vec_named(payload)
.map_err(|e| StoreError::Serialization(e.to_string()))?;
let payload_len = checked_payload_len(&payload_bytes)?;
let event_id = crate::id::generate_v7_id();
let header = EventHeader::new(
event_id,
event_id,
            None,
            self.config.now_us(),
crate::coordinate::DagPosition::root(),
payload_len,
kind,
);
let event = Event::new(header, payload_bytes);
let (tx, rx) = flume::bounded(1);
self.writer
.tx
.send(WriterCommand::Append {
entity: coord.entity_arc(),
scope: coord.scope_arc(),
event: Box::new(event),
kind,
guards: AppendGuards {
correlation_id: event_id,
causation_id: None,
expected_sequence: None,
idempotency_key: None,
},
respond: tx,
})
.map_err(|_| StoreError::WriterCrashed)?;
rx.recv().map_err(|_| StoreError::WriterCrashed)?
}
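    /// Appends an event in reaction to another: the caller threads through
    /// the originating chain's `correlation_id` and names the triggering
    /// event as `causation_id`, both of which are recorded in the header and
    /// carried in the writer guards.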
pub fn append_reaction(
&self,
coord: &Coordinate,
kind: EventKind,
payload: &impl Serialize,
correlation_id: u128,
causation_id: u128,
) -> Result<AppendReceipt, StoreError> {
let payload_bytes = rmp_serde::to_vec_named(payload)
.map_err(|e| StoreError::Serialization(e.to_string()))?;
let payload_len = checked_payload_len(&payload_bytes)?;
let event_id = crate::id::generate_v7_id();
let header = EventHeader::new(
event_id,
correlation_id,
Some(causation_id),
self.config.now_us(),
crate::coordinate::DagPosition::root(),
payload_len,
kind,
);
let event = Event::new(header, payload_bytes);
let (tx, rx) = flume::bounded(1);
self.writer
.tx
.send(WriterCommand::Append {
entity: coord.entity_arc(),
scope: coord.scope_arc(),
event: Box::new(event),
kind,
guards: AppendGuards {
correlation_id,
causation_id: Some(causation_id),
expected_sequence: None,
idempotency_key: None,
},
respond: tx,
})
.map_err(|_| StoreError::WriterCrashed)?;
rx.recv().map_err(|_| StoreError::WriterCrashed)?
}
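    /// Looks up one event by id in the index and reads its frame from disk,
    /// decoding the payload to `serde_json::Value`.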
pub fn get(&self, event_id: u128) -> Result<StoredEvent<serde_json::Value>, StoreError> {
let entry = self
.index
.get_by_id(event_id)
.ok_or(StoreError::NotFound(event_id))?;
self.reader.read_entry(&entry.disk_pos)
}
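    /// Returns index entries matching `region` (metadata only; use
    /// [`Store::get`] to read payloads).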
pub fn query(&self, region: &Region) -> Vec<IndexEntry> {
self.index.query(region)
}
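    /// Walks the causal/hash chain backwards from `event_id`, collecting at
    /// most `limit` events. With the `blake3` feature enabled the walk
    /// follows `prev_hash` links (a linear scan of the entity's stream per
    /// hop); without it, it approximates the chain by replaying the entity's
    /// stream in reverse clock order.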
pub fn walk_ancestors(
&self,
event_id: u128,
limit: usize,
) -> Vec<StoredEvent<serde_json::Value>> {
let mut results = Vec::new();
#[cfg(feature = "blake3")]
{
let mut current_id = Some(event_id);
while let Some(id) = current_id {
if results.len() >= limit {
break;
}
if let Some(entry) = self.index.get_by_id(id) {
if let Ok(stored) = self.reader.read_entry(&entry.disk_pos) {
results.push(stored);
}
let prev = entry.hash_chain.prev_hash;
if prev == [0u8; 32] {
break;
                    }
                    current_id = self
.index
.stream(entry.coord.entity())
.iter()
.find(|e| e.hash_chain.event_hash == prev)
.map(|e| e.event_id);
} else {
break;
}
}
}
#[cfg(not(feature = "blake3"))]
{
let Some(start_entry) = self.index.get_by_id(event_id) else {
return results;
};
let entity = start_entry.coord.entity();
let stream = self.index.stream(entity);
for entry in stream.iter().rev() {
if results.len() >= limit {
break;
}
if entry.clock > start_entry.clock {
continue;
}
if let Ok(stored) = self.reader.read_entry(&entry.disk_pos) {
results.push(stored);
}
}
}
results
}
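    /// Folds an entity's event stream into a projection `T`, consulting the
    /// projection cache first. [`Freshness::Consistent`] demands the cached
    /// watermark match the stream head; [`Freshness::BestEffort`] accepts a
    /// value up to `max_stale_ms` old. Cache failures are logged as warnings
    /// and fall back to a full rebuild, whose result is written back to the
    /// cache. Returns `Ok(None)` for an unknown entity.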
pub fn project<T>(&self, entity: &str, freshness: &Freshness) -> Result<Option<T>, StoreError>
where
T: EventSourced<serde_json::Value> + serde::Serialize + serde::de::DeserializeOwned,
{
let entries = self.index.stream(entity);
if entries.is_empty() {
return Ok(None);
}
let watermark = entries.last().map(|e| e.global_sequence).unwrap_or(0);
let cache_key = entity.as_bytes();
let predicted_meta = projection::CacheMeta {
watermark,
cached_at_us: self.config.now_us(),
};
if let Err(e) = self.cache.prefetch(cache_key, predicted_meta) {
tracing::warn!("cache prefetch failed (non-fatal): {e}");
}
if let Ok(Some((bytes, meta))) = self.cache.get(cache_key) {
let is_fresh = match freshness {
Freshness::Consistent => meta.watermark == watermark,
Freshness::BestEffort { max_stale_ms } => {
let age_us = self
.config
.now_us()
.saturating_sub(meta.cached_at_us)
.max(0);
age_us < (*max_stale_ms as i64) * 1000
}
};
if is_fresh {
if let Ok(t) = serde_json::from_slice::<T>(&bytes) {
return Ok(Some(t));
}
}
}
let mut events = Vec::with_capacity(entries.len());
for entry in &entries {
let stored = self.reader.read_entry(&entry.disk_pos)?;
events.push(stored.event);
}
let result = T::from_events(&events);
if let Some(ref t) = result {
if let Ok(bytes) = serde_json::to_vec(t) {
let meta = projection::CacheMeta {
watermark,
cached_at_us: self.config.now_us(),
};
if let Err(e) = self.cache.put(cache_key, &bytes, meta) {
tracing::warn!("cache put failed (non-fatal): {e}");
}
}
}
Ok(result)
}
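    /// Registers a live subscription. The broadcast channel (bounded by
    /// `broadcast_capacity`) carries all notifications; the `region` filter
    /// travels with the returned [`Subscription`].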
pub fn subscribe(&self, region: &Region) -> Subscription {
let rx = self
.writer
.subscribers
.subscribe(self.config.broadcast_capacity);
Subscription::new(rx, region.clone())
}
pub fn cursor(&self, region: &Region) -> Cursor {
Cursor::new(region.clone(), Arc::clone(&self.index))
}
pub fn stream(&self, entity: &str) -> Vec<IndexEntry> {
self.index.stream(entity)
}
pub fn by_scope(&self, scope: &str) -> Vec<IndexEntry> {
self.query(&Region::scope(scope))
}
pub fn by_fact(&self, kind: EventKind) -> Vec<IndexEntry> {
self.query(&Region::all().with_fact(KindFilter::Exact(kind)))
}
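    /// Spawns a thread ("batpak-reactor") that tails `region` and feeds each
    /// notified event to `reactor`; every reaction it yields is appended
    /// with the original correlation id and the triggering event as its
    /// causation. Read/append failures are logged and skipped; the loop
    /// ends when the subscription channel closes.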
pub fn react_loop<R>(
self: &Arc<Self>,
region: &Region,
reactor: R,
) -> Result<std::thread::JoinHandle<()>, StoreError>
where
R: crate::event::sourcing::Reactive<serde_json::Value> + Send + 'static,
{
let store = Arc::clone(self);
let sub = self.subscribe(region);
std::thread::Builder::new()
.name("batpak-reactor".into())
.spawn(move || {
while let Some(notif) = sub.recv() {
let stored = match store.get(notif.event_id) {
Ok(s) => s,
Err(e) => {
tracing::warn!(
"react_loop: failed to get event {}: {e}",
notif.event_id
);
continue;
}
};
for (coord, kind, payload) in reactor.react(&stored.event) {
if let Err(e) = store.append_reaction(
&coord,
kind,
&payload,
notif.correlation_id,
notif.event_id,
) {
tracing::warn!("react_loop: failed to append reaction: {e}");
}
}
}
})
.map_err(StoreError::Io)
}
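    /// The fully parameterized append. An idempotency key, when supplied, is
    /// used verbatim as the event id; the correlation id defaults to the
    /// event id (a new chain) when absent. CAS and idempotency guards are
    /// checked by the writer thread, surfacing as
    /// [`StoreError::SequenceMismatch`] and [`StoreError::DuplicateEvent`].
    ///
    /// A sketch of an optimistic-concurrency append, under the same path and
    /// kind stand-in assumptions as the [`Store::append`] example:
    ///
    /// ```no_run
    /// # use batpak::store::{AppendOptions, Store, StoreConfig};
    /// # use batpak::coordinate::Coordinate;
    /// # use batpak::event::EventKind;
    /// # let store = Store::open(StoreConfig::new("/tmp/batpak-demo"))?;
    /// # let coord = Coordinate::new("order-42", "billing").expect("valid coordinate");
    /// // Fail with SequenceMismatch if "order-42" has moved past seq 7,
    /// // and dedupe retries under a caller-chosen idempotency key.
    /// let opts = AppendOptions::new().with_cas(7).with_idempotency(0x42);
    /// store.append_with_options(&coord, EventKind::TOMBSTONE, &serde_json::json!({}), opts)?;
    /// # Ok::<(), batpak::store::StoreError>(())
    /// ```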
pub fn append_with_options(
&self,
coord: &Coordinate,
kind: EventKind,
payload: &impl Serialize,
opts: AppendOptions,
) -> Result<AppendReceipt, StoreError> {
let payload_bytes = rmp_serde::to_vec_named(payload)
.map_err(|e| StoreError::Serialization(e.to_string()))?;
let payload_len = checked_payload_len(&payload_bytes)?;
let event_id = opts
.idempotency_key
.unwrap_or_else(crate::id::generate_v7_id);
let correlation_id = opts.correlation_id.unwrap_or(event_id);
let causation_id = opts.causation_id;
let header = EventHeader::new(
event_id,
correlation_id,
causation_id,
self.config.now_us(),
crate::coordinate::DagPosition::root(),
payload_len,
kind,
)
.with_flags(opts.flags);
let event = Event::new(header, payload_bytes);
let (tx, rx) = flume::bounded(1);
self.writer
.tx
.send(WriterCommand::Append {
entity: coord.entity_arc(),
scope: coord.scope_arc(),
event: Box::new(event),
kind,
guards: AppendGuards {
correlation_id,
causation_id,
expected_sequence: opts.expected_sequence,
idempotency_key: opts.idempotency_key,
},
respond: tx,
})
.map_err(|_| StoreError::WriterCrashed)?;
rx.recv().map_err(|_| StoreError::WriterCrashed)?
}
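    /// Appends the event produced by a typestate transition, deriving the
    /// kind and payload from the transition itself.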
pub fn apply_transition<From, To, P: Serialize>(
&self,
coord: &Coordinate,
transition: crate::typestate::transition::Transition<From, To, P>,
) -> Result<AppendReceipt, StoreError> {
let kind = transition.kind();
let payload = transition.into_payload();
self.append(coord, kind, &payload)
}
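    /// Blocks until the writer thread acknowledges that outstanding appends
    /// have been flushed to disk.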
pub fn sync(&self) -> Result<(), StoreError> {
let (tx, rx) = flume::bounded(1);
self.writer
.tx
.send(WriterCommand::Sync { respond: tx })
.map_err(|_| StoreError::WriterCrashed)?;
rx.recv().map_err(|_| StoreError::WriterCrashed)?
}
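    /// Syncs, then copies every segment file into `dest` (created if
    /// missing). Segment files alone are enough to reopen a store elsewhere,
    /// since the index is rebuilt by scanning them.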
pub fn snapshot(&self, dest: &std::path::Path) -> Result<(), StoreError> {
self.sync()?;
std::fs::create_dir_all(dest).map_err(StoreError::Io)?;
let entries = std::fs::read_dir(&self.config.data_dir).map_err(StoreError::Io)?;
for entry in entries.flatten() {
let path = entry.path();
if path
.extension()
.map(|e| e == segment::SEGMENT_EXTENSION)
.unwrap_or(false)
{
let dest_path = dest.join(entry.file_name());
std::fs::copy(&path, &dest_path).map_err(StoreError::Io)?;
}
}
Ok(())
}
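    /// Rewrites sealed segments (every segment below the highest id, which
    /// is treated as active) into a single merged segment according to the
    /// strategy, deletes the originals, and rebuilds the index from what is
    /// left on disk. Returns the number of segments removed and bytes
    /// reclaimed; a no-op result if fewer than `min_segments` segments are
    /// sealed.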
pub fn compact(
&self,
config: &CompactionConfig,
) -> Result<segment::CompactionResult, StoreError> {
self.sync()?;
let active_segment_id = std::fs::read_dir(&self.config.data_dir)
.map_err(StoreError::Io)?
.filter_map(|e| e.ok())
.filter_map(|e| {
let path = e.path();
if path
.extension()
.map(|ext| ext == segment::SEGMENT_EXTENSION)
.unwrap_or(false)
{
path.file_stem()?.to_str()?.parse::<u64>().ok()
} else {
None
}
})
.max()
.unwrap_or(0);
let mut sealed: Vec<(u64, std::path::PathBuf)> = std::fs::read_dir(&self.config.data_dir)
.map_err(StoreError::Io)?
.filter_map(|e| e.ok())
.filter_map(|e| {
let path = e.path();
let ext_ok = path
.extension()
.map(|ext| ext == segment::SEGMENT_EXTENSION)
.unwrap_or(false);
if !ext_ok {
return None;
}
let seg_id = path
.file_stem()
.and_then(|s| s.to_str())
.and_then(|s| s.parse::<u64>().ok())?;
if seg_id >= active_segment_id {
return None;
                }
                Some((seg_id, path))
})
.collect();
sealed.sort_by_key(|(id, _)| *id);
if sealed.len() < config.min_segments {
return Ok(segment::CompactionResult {
segments_removed: 0,
bytes_reclaimed: 0,
});
}
let mut all_events: Vec<reader::ScannedEntry> = Vec::new();
for (_, path) in &sealed {
let scanned = self.reader.scan_segment(path)?;
all_events.extend(scanned);
}
let tombstone_kind = EventKind::TOMBSTONE;
let mut kept_events: Vec<reader::ScannedEntry> = Vec::new();
match &config.strategy {
CompactionStrategy::Merge => {
kept_events = all_events;
}
CompactionStrategy::Retention(predicate) => {
for entry in all_events {
let coord = Coordinate::new(&entry.entity, &entry.scope)?;
let stored = StoredEvent {
coordinate: coord,
event: entry.event.clone(),
};
if predicate(&stored) {
kept_events.push(entry);
}
}
}
CompactionStrategy::Tombstone(predicate) => {
for entry in all_events {
let coord = Coordinate::new(&entry.entity, &entry.scope)?;
let stored = StoredEvent {
coordinate: coord,
event: entry.event.clone(),
};
if predicate(&stored) {
kept_events.push(entry);
} else {
let mut tombstone = entry;
tombstone.event.header.event_kind = tombstone_kind;
kept_events.push(tombstone);
}
}
}
}
let merged_id = sealed[0].0;
let merged_path = self
.config
.data_dir
.join(segment::segment_filename(merged_id));
for (seg_id, _) in &sealed {
self.reader.evict_segment(*seg_id);
}
        let _ = std::fs::remove_file(&merged_path);
        let mut merged_segment =
segment::Segment::<segment::Active>::create(&self.config.data_dir, merged_id)?;
for entry in &kept_events {
let frame_payload = segment::FramePayload {
event: entry.event.clone(),
entity: entry.entity.clone(),
scope: entry.scope.clone(),
};
let frame = segment::frame_encode(&frame_payload)?;
merged_segment.write_frame(&frame)?;
}
merged_segment.sync_with_mode(&self.config.sync_mode)?;
let _sealed_seg = merged_segment.seal();
let mut bytes_reclaimed: u64 = 0;
let mut segments_removed: usize = 0;
for (seg_id, path) in &sealed {
if *seg_id == merged_id {
continue;
            }
            if let Ok(meta) = std::fs::metadata(path) {
bytes_reclaimed += meta.len();
}
std::fs::remove_file(path).map_err(StoreError::Io)?;
segments_removed += 1;
}
self.sync()?;
self.index.clear();
let mut remaining: Vec<std::fs::DirEntry> = std::fs::read_dir(&self.config.data_dir)
.map_err(StoreError::Io)?
.filter_map(|e| e.ok())
.filter(|e| {
e.path()
.extension()
.map(|ext| ext == segment::SEGMENT_EXTENSION)
.unwrap_or(false)
})
.collect();
remaining.sort_by_key(|e| e.file_name());
for dir_entry in &remaining {
let scanned = self.reader.scan_segment(&dir_entry.path())?;
for se in scanned {
let coord = Coordinate::new(&se.entity, &se.scope)?;
let clock = se.event.header.position.sequence;
let entry = IndexEntry {
event_id: se.event.header.event_id,
correlation_id: se.event.header.correlation_id,
causation_id: se.event.header.causation_id,
coord,
kind: se.event.header.event_kind,
wall_ms: se.event.header.position.wall_ms,
clock,
hash_chain: se.event.hash_chain.clone().unwrap_or_default(),
disk_pos: DiskPos {
segment_id: se.segment_id,
offset: se.offset,
length: se.length,
},
global_sequence: self.index.global_sequence(),
};
self.index.insert(entry);
}
}
self.sync()?;
Ok(segment::CompactionResult {
segments_removed,
bytes_reclaimed,
})
}
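    /// Graceful shutdown: asks the writer to drain and stop, blocking until
    /// it acknowledges. Prefer this to relying on `Drop`, which waits at
    /// most 100 ms.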
pub fn close(self) -> Result<(), StoreError> {
let (tx, rx) = flume::bounded(1);
self.writer
.tx
.send(WriterCommand::Shutdown { respond: tx })
.map_err(|_| StoreError::WriterCrashed)?;
rx.recv().map_err(|_| StoreError::WriterCrashed)?
}
pub fn stats(&self) -> StoreStats {
StoreStats {
event_count: self.index.len(),
global_sequence: self.index.global_sequence(),
}
}
pub fn diagnostics(&self) -> StoreDiagnostics {
StoreDiagnostics {
event_count: self.index.len(),
global_sequence: self.index.global_sequence(),
data_dir: self.config.data_dir.clone(),
segment_max_bytes: self.config.segment_max_bytes,
fd_budget: self.config.fd_budget,
restart_policy: self.config.restart_policy.clone(),
}
}
}
#[doc(hidden)]
impl Store {
pub fn panic_writer_for_test(&self) -> Result<(), StoreError> {
let (tx, rx) = flume::bounded(1);
self.writer
.tx
.send(WriterCommand::PanicForTest { respond: tx })
.map_err(|_| StoreError::WriterCrashed)?;
let _ = rx.recv_timeout(std::time::Duration::from_millis(500));
std::thread::sleep(std::time::Duration::from_millis(50));
Ok(())
}
}
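// Best-effort shutdown on drop: request a writer shutdown, but wait at most
// 100 ms for the acknowledgement. Call `Store::close` for a blocking,
// confirmed shutdown.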
impl Drop for Store {
fn drop(&mut self) {
let (tx, rx) = flume::bounded(1);
if self
.writer
.tx
.send(WriterCommand::Shutdown { respond: tx })
.is_ok()
{
let _ = rx.recv_timeout(std::time::Duration::from_millis(100));
}
}
}
fn checked_payload_len(payload_bytes: &[u8]) -> Result<u32, StoreError> {
u32::try_from(payload_bytes.len()).map_err(|_| {
StoreError::Serialization(format!(
"payload size {} exceeds u32::MAX (4GB limit)",
payload_bytes.len()
))
})
}
pub(crate) fn now_us() -> i64 {
    #[allow(clippy::cast_possible_truncation)]
    {
std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap_or_default()
.as_micros() as i64
}
}
#[derive(Clone, Debug)]
pub struct StoreStats {
pub event_count: usize,
pub global_sequence: u64,
}
#[derive(Clone, Debug)]
pub struct StoreDiagnostics {
pub event_count: usize,
pub global_sequence: u64,
pub data_dir: PathBuf,
pub segment_max_bytes: u64,
pub fd_budget: usize,
pub restart_policy: RestartPolicy,
}