mod ancestors;
pub(crate) mod checkpoint;
pub(crate) mod columnar;
mod config;
mod contracts;
pub mod cursor;
mod error;
#[cfg(feature = "dangerous-test-hooks")]
pub mod fault;
pub mod index;
mod index_rebuild;
pub(crate) mod interner;
mod maintenance;
pub mod projection;
mod projection_flow;
pub mod reader;
#[cfg(test)]
mod runtime_contracts;
pub mod segment;
pub(crate) mod sidx;
pub mod stats;
pub mod subscription;
#[cfg(feature = "dangerous-test-hooks")]
mod test_support;
pub mod writer;
pub use config::{
BatchConfig, IndexConfig, IndexLayout, StoreConfig, SyncConfig, SyncMode, WriterConfig,
};
pub use contracts::{
AppendOptions, AppendReceipt, BatchAppendItem, CausationRef, CompactionConfig,
CompactionStrategy, RetentionPredicate,
};
pub use cursor::Cursor;
pub use error::BatchStage;
pub use error::StoreError;
#[cfg(feature = "dangerous-test-hooks")]
pub use fault::{
CountdownAction, CountdownInjector, FaultInjector, InjectionPoint, ProbabilisticInjector,
};
pub use index::{ClockKey, DiskPos, IndexEntry};
pub use projection::{
CacheCapabilities, CacheMeta, Freshness, NativeCache, NoCache, ProjectionCache,
};
pub use stats::{StoreDiagnostics, StoreStats};
pub use subscription::Subscription;
pub use writer::{Notification, RestartPolicy};
use crate::coordinate::{Coordinate, KindFilter, Region};
use crate::event::{Event, EventHeader, EventKind, EventSourced, StoredEvent};
#[cfg(test)]
pub(crate) use config::now_us;
use contracts::checked_payload_len;
use index::StoreIndex;
use reader::Reader;
use serde::Serialize;
use std::sync::Arc;
use writer::{AppendGuards, ReactorSubscriberList, SubscriberList, WriterCommand, WriterHandle};
// Build-time guard: the store's public API is deliberately synchronous
// (INVARIANT 2). Enabling an `async-store` feature is a configuration error,
// not a missing implementation — callers should wrap sync calls in
// spawn_blocking or use flume's recv_async instead.
// `unexpected_cfgs` is allowed because the feature is intentionally undeclared.
#[allow(unexpected_cfgs)]
#[cfg(feature = "async-store")]
compile_error!("INVARIANT 2: Store API is sync. Use spawn_blocking or flume recv_async.");
/// Typestate marker: the store is open and accepting operations.
pub struct Open;
/// Typestate marker: the store has been shut down (returned by `close()`).
pub struct Closed;
/// Handle to an event store, parameterized by a typestate marker
/// (`Open` by default; see [`Open`] / [`Closed`]).
///
/// Shared internals (`index`, `reader`, `config`) live behind `Arc` so they
/// can be handed to background threads; all writes go through the `writer`
/// handle's command channel.
pub struct Store<State = Open> {
    /// In-memory index over persisted events, rebuilt from disk at open time.
    pub(crate) index: Arc<StoreIndex>,
    /// Read path for segment files on disk.
    pub(crate) reader: Arc<Reader>,
    /// Pluggable projection cache (`NoCache` when none was requested).
    pub(crate) cache: Box<dyn ProjectionCache>,
    /// Channel-based handle to the dedicated writer thread.
    pub(crate) writer: WriterHandle,
    /// Immutable configuration captured at open time.
    pub(crate) config: Arc<StoreConfig>,
    /// When true, `Drop` sends a best-effort `Shutdown` to the writer.
    /// Cleared by an explicit, orderly shutdown path.
    pub(crate) should_shutdown_on_drop: bool,
    /// Zero-sized typestate tag; carries no runtime data.
    pub(crate) _state: std::marker::PhantomData<State>,
}
impl Store<Open> {
    /// Opens a store at `config.data_dir` with no projection cache.
    ///
    /// # Errors
    /// See [`Store::open_with_cache`].
    pub fn open(config: StoreConfig) -> Result<Self, StoreError> {
        Self::open_with_cache(config, Box::new(NoCache))
    }

    /// Opens a store backed by a [`NativeCache`] persisted at `cache_path`.
    ///
    /// # Errors
    /// Fails if the cache cannot be opened, plus everything
    /// [`Store::open_with_cache`] can fail with.
    pub fn open_with_native_cache(
        config: StoreConfig,
        cache_path: impl AsRef<std::path::Path>,
    ) -> Result<Self, StoreError> {
        Self::open_with_cache(config, Box::new(NativeCache::open(cache_path)?))
    }

    /// Opens a store with a caller-supplied projection cache.
    ///
    /// Validates `config`, creates the data directory, rebuilds the in-memory
    /// index from the segments on disk, then spawns the writer thread with its
    /// lossy and reactor subscriber lists.
    ///
    /// # Errors
    /// Returns an error for invalid configuration, I/O failures while creating
    /// the directory or rebuilding the index, or if the writer thread fails to
    /// spawn.
    pub fn open_with_cache(
        config: StoreConfig,
        cache: Box<dyn ProjectionCache>,
    ) -> Result<Self, StoreError> {
        config.validate()?;
        std::fs::create_dir_all(&config.data_dir)?;
        let config = Arc::new(config);
        let index = Arc::new(StoreIndex::with_layout(&config.index.layout));
        let reader = Arc::new(Reader::new(config.data_dir.clone(), config.fd_budget));
        index_rebuild::open_index(
            &index,
            &reader,
            &config.data_dir,
            config.index.enable_checkpoint,
        )?;
        // New writes go to a fresh segment strictly after anything already on
        // disk; an empty directory starts at segment 1.
        let active_seg_id = writer::find_latest_segment_id(&config.data_dir).unwrap_or(0) + 1;
        reader.set_active_segment(active_seg_id);
        let subscribers = Arc::new(SubscriberList::new());
        let reactor_subscribers = Arc::new(ReactorSubscriberList::new());
        let writer =
            WriterHandle::spawn(&config, &index, &subscribers, &reactor_subscribers, &reader)?;
        Ok(Self {
            index,
            reader,
            cache,
            writer,
            config,
            // An un-`close()`d drop triggers a bounded best-effort drain.
            should_shutdown_on_drop: true,
            _state: std::marker::PhantomData,
        })
    }

    /// Appends a single event at `coord`.
    ///
    /// A fresh v7 id is generated and doubles as its own correlation id
    /// (this event starts a new correlation chain; no causation).
    ///
    /// # Errors
    /// See [`Store::do_append`]'s failure modes (serialization, size limit,
    /// writer crash, and the group-commit idempotency requirement).
    pub fn append(
        &self,
        coord: &Coordinate,
        kind: EventKind,
        payload: &impl Serialize,
    ) -> Result<AppendReceipt, StoreError> {
        tracing::debug!(
            target: "batpak::flow",
            flow = "append",
            entity = coord.entity(),
            scope = coord.scope(),
            event_kind = kind.type_id()
        );
        let event_id = crate::id::generate_v7_id();
        self.do_append(
            coord, kind, payload, event_id, event_id, None, None, None, 0,
        )
    }

    /// Appends an event caused by another event, propagating the caller's
    /// `correlation_id` and recording `causation_id` as the direct cause.
    pub fn append_reaction(
        &self,
        coord: &Coordinate,
        kind: EventKind,
        payload: &impl Serialize,
        correlation_id: u128,
        causation_id: u128,
    ) -> Result<AppendReceipt, StoreError> {
        tracing::debug!(
            target: "batpak::flow",
            flow = "append_reaction",
            entity = coord.entity(),
            scope = coord.scope(),
            correlation_id = format_args!("{correlation_id:032x}"),
            causation_id = format_args!("{causation_id:032x}")
        );
        let event_id = crate::id::generate_v7_id();
        self.do_append(
            coord,
            kind,
            payload,
            event_id,
            correlation_id,
            Some(causation_id),
            None,
            None,
            0,
        )
    }

    /// Appends a batch of items in one writer command and waits for the
    /// receipts.
    ///
    /// # Errors
    /// `StoreError::WriterCrashed` if the writer channel is closed on either
    /// side; otherwise whatever error the writer reports for the batch.
    pub fn append_batch(
        &self,
        items: Vec<crate::store::contracts::BatchAppendItem>,
    ) -> Result<Vec<AppendReceipt>, StoreError> {
        let (tx, rx) = flume::bounded(1);
        self.writer
            .tx
            .send(WriterCommand::AppendBatch { items, respond: tx })
            .map_err(|_| StoreError::WriterCrashed)?;
        rx.recv().map_err(|_| StoreError::WriterCrashed)?
    }

    /// Batch variant of [`Store::append_reaction`]: stamps every item with
    /// `correlation_id`, and sets `causation_id` only on items that did not
    /// already declare a causation of their own.
    pub fn append_reaction_batch(
        &self,
        correlation_id: u128,
        causation_id: u128,
        items: Vec<crate::store::contracts::BatchAppendItem>,
    ) -> Result<Vec<AppendReceipt>, StoreError> {
        let items: Vec<_> = items
            .into_iter()
            .map(|mut item| {
                item.options.correlation_id = Some(correlation_id);
                // Only fill in causation when the item left it unspecified.
                if matches!(item.causation, crate::store::contracts::CausationRef::None) {
                    item.options.causation_id = Some(causation_id);
                }
                item
            })
            .collect();
        self.append_batch(items)
    }

    /// Fetches a single stored event by id.
    ///
    /// # Errors
    /// `StoreError::NotFound` if the id is absent from the index, or a read
    /// error from disk.
    pub fn get(&self, event_id: u128) -> Result<StoredEvent<serde_json::Value>, StoreError> {
        let entry = self
            .index
            .get_by_id(event_id)
            .ok_or(StoreError::NotFound(event_id))?;
        self.reader.read_entry(&entry.disk_pos)
    }

    /// Returns index entries matching `region` (no payloads are read).
    #[must_use]
    pub fn query(&self, region: &Region) -> Vec<IndexEntry> {
        self.index.query(region)
    }

    /// Walks the causation chain upward from `event_id`, returning at most
    /// `limit` ancestor events.
    pub fn walk_ancestors(
        &self,
        event_id: u128,
        limit: usize,
    ) -> Vec<StoredEvent<serde_json::Value>> {
        ancestors::walk_ancestors(self, event_id, limit)
    }

    /// Projects the current state of `entity` by folding its events,
    /// honoring the requested `freshness`. Delegates to `projection_flow`.
    pub fn project<T>(&self, entity: &str, freshness: &Freshness) -> Result<Option<T>, StoreError>
    where
        T: EventSourced<serde_json::Value>
            + serde::Serialize
            + serde::de::DeserializeOwned
            + 'static,
    {
        projection_flow::project(self, entity, freshness)
    }

    /// Subscribes to live notifications for `region` over a bounded broadcast
    /// channel; events may be dropped under backpressure (hence "lossy").
    pub fn subscribe_lossy(&self, region: &Region) -> Subscription {
        let rx = self
            .writer
            .subscribers
            .subscribe(self.config.broadcast_capacity);
        Subscription::new(rx, region.clone())
    }

    /// Creates a cursor over `region` backed by the index; unlike
    /// [`Store::subscribe_lossy`] it does not drop events.
    pub fn cursor_guaranteed(&self, region: &Region) -> Cursor {
        Cursor::new(region.clone(), Arc::clone(&self.index))
    }

    /// Returns the full index stream for one entity, in order.
    #[must_use]
    pub fn stream(&self, entity: &str) -> Vec<IndexEntry> {
        self.index.stream(entity)
    }

    /// Convenience query: all entries in a scope.
    #[must_use]
    pub fn by_scope(&self, scope: &str) -> Vec<IndexEntry> {
        self.query(&Region::scope(scope))
    }

    /// Convenience query: all entries of one exact event kind.
    #[must_use]
    pub fn by_fact(&self, kind: EventKind) -> Vec<IndexEntry> {
        self.query(&Region::all().with_fact(KindFilter::Exact(kind)))
    }

    /// Spawns a background thread that feeds events matching `region` to
    /// `reactor` and appends whatever it emits as reactions.
    ///
    /// Reactions inherit the triggering event's correlation id and use its
    /// event id as their causation. Append failures are logged and skipped so
    /// the loop keeps running; the thread exits when the reactor subscription
    /// channel closes.
    ///
    /// # Errors
    /// `StoreError::Io` if the thread cannot be spawned.
    pub fn react_loop<R>(
        self: &Arc<Self>,
        region: &Region,
        reactor: R,
    ) -> Result<std::thread::JoinHandle<()>, StoreError>
    where
        R: crate::event::sourcing::Reactive<serde_json::Value> + Send + 'static,
    {
        let store = Arc::clone(self);
        let region = region.clone();
        let sub = self
            .writer
            .reactor_subscribers
            .subscribe(self.config.broadcast_capacity);
        std::thread::Builder::new()
            .name("batpak-reactor".into())
            .spawn(move || {
                while let Ok(envelope) = sub.recv() {
                    let notif = envelope.notification;
                    // Region filtering happens here, not in the subscription.
                    if !region.matches_event(notif.coord.entity(), notif.coord.scope(), notif.kind)
                    {
                        continue;
                    }
                    for (coord, kind, payload) in reactor.react(&envelope.stored.event) {
                        if let Err(e) = store.append_reaction(
                            &coord,
                            kind,
                            &payload,
                            notif.correlation_id,
                            notif.event_id,
                        ) {
                            tracing::warn!("react_loop: failed to append reaction: {e}");
                        }
                    }
                }
            })
            .map_err(StoreError::Io)
    }

    /// Builds a [`ProjectionWatcher`] for `entity`: a lossy subscription plus
    /// cached projection state that is refreshed on each received event.
    pub fn watch_projection<T>(
        self: &Arc<Self>,
        entity: &str,
        freshness: Freshness,
    ) -> ProjectionWatcher<T>
    where
        T: EventSourced<serde_json::Value>
            + serde::Serialize
            + serde::de::DeserializeOwned
            + Send
            + 'static,
    {
        let sub = self.subscribe_lossy(&Region::entity(entity));
        let store = Arc::clone(self);
        let entity_owned = entity.to_owned();
        ProjectionWatcher {
            sub,
            store,
            entity: entity_owned,
            freshness,
            // Start cold: the first recv() does a full projection.
            cached_state: None,
            watermark: None,
            _phantom: std::marker::PhantomData,
        }
    }

    /// Appends with explicit [`AppendOptions`]: idempotency key (reused as the
    /// event id when present), compare-and-swap on `expected_sequence`,
    /// explicit correlation/causation, and header flags.
    pub fn append_with_options(
        &self,
        coord: &Coordinate,
        kind: EventKind,
        payload: &impl Serialize,
        opts: AppendOptions,
    ) -> Result<AppendReceipt, StoreError> {
        tracing::debug!(
            target: "batpak::flow",
            flow = "append_with_options",
            entity = coord.entity(),
            scope = coord.scope(),
            has_cas = opts.expected_sequence.is_some(),
            has_idempotency = opts.idempotency_key.is_some()
        );
        // The idempotency key, when supplied, *is* the event id, so retries
        // of the same logical append produce the same id.
        let event_id = opts
            .idempotency_key
            .unwrap_or_else(crate::id::generate_v7_id);
        let correlation_id = opts.correlation_id.unwrap_or(event_id);
        self.do_append(
            coord,
            kind,
            payload,
            event_id,
            correlation_id,
            opts.causation_id,
            opts.expected_sequence,
            opts.idempotency_key,
            opts.flags,
        )
    }

    /// Shared append path: serializes the payload (MessagePack, named fields),
    /// enforces size and group-commit constraints, builds the event header,
    /// and sends a single `Append` command to the writer, blocking for the
    /// receipt.
    ///
    /// # Errors
    /// - `IdempotencyRequired` when group commit batches >1 event and no
    ///   idempotency key was provided (retries would otherwise duplicate).
    /// - `Serialization` if the payload fails to encode.
    /// - `Configuration` if the encoded payload exceeds
    ///   `single_append_max_bytes`.
    /// - `WriterCrashed` if the writer channel is closed on either side.
    #[allow(clippy::too_many_arguments)]
    fn do_append(
        &self,
        coord: &Coordinate,
        kind: EventKind,
        payload: &impl Serialize,
        event_id: u128,
        correlation_id: u128,
        causation_id: Option<u128>,
        expected_sequence: Option<u32>,
        idempotency_key: Option<u128>,
        flags: u8,
    ) -> Result<AppendReceipt, StoreError> {
        if self.config.batch.group_commit_max_batch > 1 && idempotency_key.is_none() {
            return Err(StoreError::IdempotencyRequired);
        }
        let payload_bytes =
            rmp_serde::to_vec_named(payload).map_err(|e| StoreError::Serialization(Box::new(e)))?;
        if payload_bytes.len() > self.config.single_append_max_bytes as usize {
            return Err(StoreError::Configuration(format!(
                "single append bytes {} exceeds max {}",
                payload_bytes.len(),
                self.config.single_append_max_bytes
            )));
        }
        let payload_len = checked_payload_len(&payload_bytes)?;
        let mut header = EventHeader::new(
            event_id,
            correlation_id,
            causation_id,
            self.config.now_us(),
            crate::coordinate::DagPosition::root(),
            payload_len,
            kind,
        );
        if flags != 0 {
            header = header.with_flags(flags);
        }
        let event = Event::new(header, payload_bytes);
        let (tx, rx) = flume::bounded(1);
        self.writer
            .tx
            .send(WriterCommand::Append {
                coord: coord.clone(),
                event: Box::new(event),
                kind,
                guards: AppendGuards {
                    correlation_id,
                    causation_id,
                    expected_sequence,
                    idempotency_key,
                },
                respond: tx,
            })
            .map_err(|_| StoreError::WriterCrashed)?;
        // The writer replies with Result<AppendReceipt, _>; a recv error means
        // the writer died before responding.
        rx.recv().map_err(|_| StoreError::WriterCrashed)?
    }

    /// Appends the event produced by a typestate transition at `coord`.
    pub fn apply_transition<From, To, P: Serialize>(
        &self,
        coord: &Coordinate,
        transition: crate::typestate::transition::Transition<From, To, P>,
    ) -> Result<AppendReceipt, StoreError> {
        let kind = transition.kind();
        let payload = transition.into_payload();
        self.append(coord, kind, &payload)
    }

    /// Forces pending writes to durable storage. See `maintenance::sync`.
    pub fn sync(&self) -> Result<(), StoreError> {
        maintenance::sync(self)
    }

    /// Writes a consistent snapshot of the store to `dest`.
    pub fn snapshot(&self, dest: &std::path::Path) -> Result<(), StoreError> {
        maintenance::snapshot(self, dest)
    }

    /// Runs segment compaction according to `config`.
    pub fn compact(
        &self,
        config: &CompactionConfig,
    ) -> Result<segment::CompactionResult, StoreError> {
        maintenance::compact(self, config)
    }

    /// Consumes the store and performs an orderly shutdown, returning the
    /// `Closed` typestate token on success.
    pub fn close(self) -> Result<Closed, StoreError> {
        maintenance::close(self)
    }

    /// Returns lightweight operational statistics.
    pub fn stats(&self) -> StoreStats {
        maintenance::stats(self)
    }

    /// Returns detailed diagnostics (heavier than [`Store::stats`]).
    pub fn diagnostics(&self) -> StoreDiagnostics {
        maintenance::diagnostics(self)
    }
}
impl<State> Drop for Store<State> {
    /// Bounded best-effort shutdown for stores dropped without `close()`.
    ///
    /// Does nothing when `should_shutdown_on_drop` is false (the orderly
    /// shutdown path clears it). Otherwise warns, asks the writer to shut
    /// down, and waits at most 100 ms for its acknowledgement — never blocks
    /// indefinitely, and ignores a writer that has already died.
    fn drop(&mut self) {
        if !self.should_shutdown_on_drop {
            return;
        }
        tracing::warn!(
            "Store dropped without explicit close(); only a bounded best-effort drain will run"
        );
        let (ack_tx, ack_rx) = flume::bounded(1);
        let shutdown_sent = self
            .writer
            .tx
            .send(WriterCommand::Shutdown { respond: ack_tx })
            .is_ok();
        if shutdown_sent {
            // Writer accepted the command; give it a short window to ack.
            let grace = std::time::Duration::from_millis(100);
            let _ = ack_rx.recv_timeout(grace);
        }
    }
}
/// Live projection follower for a single entity: wakes on each subscription
/// event and returns a refreshed projection of type `T`.
///
/// Invariant maintained by its `impl`: `cached_state` and `watermark` are set
/// together — the cached bytes are the serialized projection as of the entry
/// with global sequence `watermark`.
pub struct ProjectionWatcher<T> {
    /// Lossy subscription scoped to the watched entity's region.
    sub: Subscription,
    /// Shared handle back to the store for index reads and projection.
    store: Arc<Store<Open>>,
    /// Entity identifier being watched.
    entity: String,
    /// Freshness requirement forwarded to full projections.
    freshness: Freshness,
    /// JSON-serialized snapshot of the last projected state, if any.
    cached_state: Option<Vec<u8>>,
    /// Global sequence of the last index entry folded into `cached_state`.
    watermark: Option<u64>,
    /// Ties the watcher to its projection type without storing a `T`.
    _phantom: std::marker::PhantomData<T>,
}
impl<T> ProjectionWatcher<T>
where
    T: EventSourced<serde_json::Value> + serde::Serialize + serde::de::DeserializeOwned + 'static,
{
    /// Blocks for the next subscription event, then returns the up-to-date
    /// projection (`Ok(None)` when the subscription channel has closed).
    ///
    /// Fast path: when `T` supports incremental apply, the config enables it,
    /// and a cached state + watermark exist, only the index entries after the
    /// watermark are read from disk and applied on top of the cached state.
    /// Any missing precondition — or a cache deserialization failure — falls
    /// back to a full projection.
    pub fn recv(&mut self) -> Result<Option<T>, StoreError> {
        // `None` from the subscription means the channel is closed; the
        // notification itself is only used as a wake-up signal here.
        if self.sub.recv().is_none() {
            return Ok(None);
        }
        if self.cached_state.is_none()
            || !T::supports_incremental_apply()
            || !self.store.config.index.incremental_projection
        {
            return self.refresh_from_full_projection();
        }
        // Cached bytes without a watermark would be unanchored; rebuild.
        let Some(watermark) = self.watermark else {
            return self.refresh_from_full_projection();
        };
        let mut delta_entries = self.store.index.stream_since(&self.entity, watermark);
        // An empty kind list means "all kinds are relevant" — skip filtering.
        let relevant_kinds = T::relevant_event_kinds();
        if !relevant_kinds.is_empty() {
            delta_entries.retain(|entry| relevant_kinds.contains(&entry.kind));
        }
        if delta_entries.is_empty() {
            // Nothing new to fold in; serve the cached snapshot as-is.
            return self.deserialize_cached_state().map(Some);
        }
        // Defensive re-check; `cached_state` was verified non-None above.
        let Some(bytes) = self.cached_state.as_ref() else {
            return self.refresh_from_full_projection();
        };
        let mut state = match serde_json::from_slice::<T>(bytes) {
            Ok(value) => value,
            // Corrupt/stale cache bytes: rebuild from scratch rather than fail.
            Err(_) => return self.refresh_from_full_projection(),
        };
        let positions: Vec<&crate::store::DiskPos> =
            delta_entries.iter().map(|entry| &entry.disk_pos).collect();
        let stored_events = self.store.reader.read_entries_batch(&positions)?;
        for stored in stored_events {
            state.apply_event(&stored.event);
        }
        // Advance the watermark to the last applied entry (delta_entries is
        // non-empty here, so `last()` is always Some).
        let new_watermark = delta_entries
            .last()
            .map(|entry| entry.global_sequence)
            .unwrap_or(watermark);
        let encoded =
            serde_json::to_vec(&state).map_err(|e| StoreError::Serialization(Box::new(e)))?;
        self.cached_state = Some(encoded);
        self.watermark = Some(new_watermark);
        Ok(Some(state))
    }

    /// Test/diagnostic accessor for the underlying subscription.
    #[doc(hidden)]
    pub fn subscription(&self) -> &Subscription {
        &self.sub
    }

    /// Rebuilds the projection from the full event stream and re-seeds the
    /// cache and watermark (or clears both when the projection is `None`).
    fn refresh_from_full_projection(&mut self) -> Result<Option<T>, StoreError> {
        let result = self.store.project::<T>(&self.entity, &self.freshness)?;
        if let Some(ref value) = result {
            self.cached_state = Some(
                serde_json::to_vec(value).map_err(|e| StoreError::Serialization(Box::new(e)))?,
            );
            // Watermark = global sequence of the entity's latest index entry.
            // NOTE(review): this reads the stream after projecting; an append
            // landing in between would be re-applied on the next delta — OK
            // only if apply_event is idempotent per entry. TODO confirm.
            self.watermark = self
                .store
                .index
                .stream(&self.entity)
                .last()
                .map(|entry| entry.global_sequence);
        } else {
            self.cached_state = None;
            self.watermark = None;
        }
        Ok(result)
    }

    /// Deserializes the cached snapshot; errors if the cache is unexpectedly
    /// empty (callers only invoke this after checking it is populated).
    fn deserialize_cached_state(&self) -> Result<T, StoreError> {
        let bytes = self
            .cached_state
            .as_ref()
            .ok_or_else(|| StoreError::Configuration("projection watcher state missing".into()))?;
        serde_json::from_slice(bytes).map_err(|e| StoreError::Serialization(Box::new(e)))
    }
}