mod ancestors;
pub(crate) mod checkpoint;
pub(crate) mod columnar;
mod config;
mod contracts;
pub mod cursor;
mod error;
pub mod index;
mod index_rebuild;
pub(crate) mod interner;
mod maintenance;
pub mod projection;
mod projection_flow;
pub mod reader;
#[cfg(test)]
mod runtime_contracts;
pub mod segment;
pub(crate) mod sidx;
pub mod stats;
pub mod subscription;
#[cfg(feature = "test-support")]
mod test_support;
pub mod writer;
pub use config::{IndexLayout, StoreConfig, SyncMode};
pub use contracts::{
AppendOptions, AppendReceipt, CompactionConfig, CompactionStrategy, RetentionPredicate,
};
pub use cursor::Cursor;
pub use error::StoreError;
pub use index::{ClockKey, DiskPos, IndexEntry};
#[cfg(feature = "lmdb")]
pub use projection::LmdbCache;
#[cfg(feature = "redb")]
pub use projection::RedbCache;
pub use projection::{CacheCapabilities, CacheMeta, Freshness, NoCache, ProjectionCache};
pub use stats::{StoreDiagnostics, StoreStats};
pub use subscription::Subscription;
pub use writer::{Notification, RestartPolicy};
use crate::coordinate::{Coordinate, KindFilter, Region};
use crate::event::{Event, EventHeader, EventKind, EventSourced, StoredEvent};
#[cfg(test)]
pub(crate) use config::now_us;
use contracts::checked_payload_len;
use index::StoreIndex;
use reader::Reader;
use serde::Serialize;
use std::sync::Arc;
use writer::{AppendGuards, SubscriberList, WriterCommand, WriterHandle};
// Compile-time guard: building with an `async-store` feature is forbidden.
// The Store API is deliberately synchronous ("INVARIANT 2"); async callers
// are expected to wrap calls in spawn_blocking or use flume's recv_async.
// `unexpected_cfgs` is allowed because the feature may not be declared at all.
#[allow(unexpected_cfgs)]
#[cfg(feature = "async-store")]
compile_error!("INVARIANT 2: Store API is sync. Use spawn_blocking or flume recv_async.");
/// Synchronous, append-only event store.
///
/// Reads are served from the in-memory index plus on-disk segment files;
/// all writes are funneled through a single background writer thread
/// (see [`writer::WriterHandle`]). Dropping the store requests a graceful
/// writer shutdown.
pub struct Store {
    // In-memory index over all stored events (lookup by id, entity, scope, kind).
    index: Arc<StoreIndex>,
    // Reads event payloads back from segment files under `config.data_dir`.
    reader: Arc<Reader>,
    // Pluggable projection cache backend (NoCache, or redb/LMDB behind features).
    cache: Box<dyn ProjectionCache>,
    // Handle to the background writer thread; every append goes through its channel.
    writer: WriterHandle,
    // Validated configuration, shared with the writer and reader.
    config: Arc<StoreConfig>,
}
impl Store {
    /// Open a store rooted at `config.data_dir` with no projection cache
    /// (`NoCache`).
    ///
    /// # Errors
    /// See [`Store::open_with_cache`].
    pub fn open(config: StoreConfig) -> Result<Self, StoreError> {
        Self::open_with_cache(config, Box::new(NoCache))
    }

    /// Open a store whose projection cache is a redb database at `cache_path`.
    ///
    /// # Errors
    /// Fails if the redb cache cannot be opened, or for any reason
    /// [`Store::open_with_cache`] fails.
    #[cfg(feature = "redb")]
    pub fn open_with_redb_cache(
        config: StoreConfig,
        cache_path: impl AsRef<std::path::Path>,
    ) -> Result<Self, StoreError> {
        Self::open_with_cache(config, Box::new(RedbCache::open(cache_path)?))
    }

    /// Open a store whose projection cache is an LMDB environment at
    /// `cache_path`, sized by `config.cache_map_size_bytes`.
    ///
    /// # Errors
    /// Fails if the LMDB cache cannot be opened, or for any reason
    /// [`Store::open_with_cache`] fails.
    #[cfg(feature = "lmdb")]
    pub fn open_with_lmdb_cache(
        config: StoreConfig,
        cache_path: impl AsRef<std::path::Path>,
    ) -> Result<Self, StoreError> {
        // Read the map size before `config` is moved into open_with_cache.
        let map_size = config.cache_map_size_bytes;
        Self::open_with_cache(config, Box::new(LmdbCache::open(cache_path, map_size)?))
    }

    /// Open a store with a caller-supplied projection cache.
    ///
    /// Validates the config, ensures the data directory exists, rebuilds the
    /// in-memory index from the on-disk segments (optionally via checkpoint),
    /// then spawns the background writer thread.
    ///
    /// # Errors
    /// Returns any config-validation, I/O, index-rebuild, or writer-spawn
    /// failure.
    pub fn open_with_cache(
        config: StoreConfig,
        cache: Box<dyn ProjectionCache>,
    ) -> Result<Self, StoreError> {
        config.validate()?;
        std::fs::create_dir_all(&config.data_dir)?;
        let config = Arc::new(config);
        let index = Arc::new(StoreIndex::with_layout(&config.index_layout));
        let reader = Arc::new(Reader::new(config.data_dir.clone(), config.fd_budget));
        index_rebuild::open_index(&index, &reader, &config.data_dir, config.enable_checkpoint)?;
        // The active segment is one past the newest segment found on disk
        // (segment 1 for an empty directory). The reader must learn it before
        // the writer thread starts producing into it.
        let active_seg_id = writer::find_latest_segment_id(&config.data_dir).unwrap_or(0) + 1;
        reader.set_active_segment(active_seg_id);
        let subscribers = Arc::new(SubscriberList::new());
        let writer = WriterHandle::spawn(&config, &index, &subscribers, &reader)?;
        Ok(Self {
            index,
            reader,
            cache,
            writer,
            config,
        })
    }

    /// Append a new root event at `coord`.
    ///
    /// A fresh v7 id is generated and used as both the event id and its
    /// correlation id; there is no causation link.
    ///
    /// # Errors
    /// Fails on payload serialization errors or if the writer thread is gone;
    /// see [`Store::do_append`] internals for the full set.
    pub fn append(
        &self,
        coord: &Coordinate,
        kind: EventKind,
        payload: &impl Serialize,
    ) -> Result<AppendReceipt, StoreError> {
        tracing::debug!(
            target: "batpak::flow",
            flow = "append",
            entity = coord.entity(),
            scope = coord.scope(),
            event_kind = kind.type_id()
        );
        let event_id = crate::id::generate_v7_id();
        self.do_append(
            coord, kind, payload, event_id, event_id, None, None, None, 0,
        )
    }

    /// Append an event caused by another event, carrying the caller-supplied
    /// `correlation_id` and `causation_id` instead of fresh ones.
    ///
    /// # Errors
    /// Same failure modes as [`Store::append`].
    pub fn append_reaction(
        &self,
        coord: &Coordinate,
        kind: EventKind,
        payload: &impl Serialize,
        correlation_id: u128,
        causation_id: u128,
    ) -> Result<AppendReceipt, StoreError> {
        tracing::debug!(
            target: "batpak::flow",
            flow = "append_reaction",
            entity = coord.entity(),
            scope = coord.scope(),
            correlation_id = format_args!("{correlation_id:032x}"),
            causation_id = format_args!("{causation_id:032x}")
        );
        let event_id = crate::id::generate_v7_id();
        self.do_append(
            coord,
            kind,
            payload,
            event_id,
            correlation_id,
            Some(causation_id),
            None,
            None,
            0,
        )
    }

    /// Load a stored event by id, reading its payload back from disk.
    ///
    /// # Errors
    /// `StoreError::NotFound` if the id is not in the index; otherwise any
    /// read error from the segment file.
    pub fn get(&self, event_id: u128) -> Result<StoredEvent<serde_json::Value>, StoreError> {
        let entry = self
            .index
            .get_by_id(event_id)
            .ok_or(StoreError::NotFound(event_id))?;
        self.reader.read_entry(&entry.disk_pos)
    }

    /// Return the index entries matching `region` (no payloads are read).
    #[must_use]
    pub fn query(&self, region: &Region) -> Vec<IndexEntry> {
        self.index.query(region)
    }

    /// Walk the causation chain starting from `event_id`, returning up to
    /// `limit` ancestor events (see the `ancestors` module).
    pub fn walk_ancestors(
        &self,
        event_id: u128,
        limit: usize,
    ) -> Vec<StoredEvent<serde_json::Value>> {
        ancestors::walk_ancestors(self, event_id, limit)
    }

    /// Project `entity`'s event stream into a `T`, honoring the requested
    /// cache `freshness` (see the `projection_flow` module).
    ///
    /// # Errors
    /// Propagates any read, deserialization, or cache error from the
    /// projection flow.
    pub fn project<T>(&self, entity: &str, freshness: &Freshness) -> Result<Option<T>, StoreError>
    where
        T: EventSourced<serde_json::Value>
            + serde::Serialize
            + serde::de::DeserializeOwned
            + 'static,
    {
        projection_flow::project(self, entity, freshness)
    }

    /// Subscribe to notifications for events appended within `region`.
    ///
    /// The channel capacity comes from `config.broadcast_capacity`.
    pub fn subscribe(&self, region: &Region) -> Subscription {
        let rx = self
            .writer
            .subscribers
            .subscribe(self.config.broadcast_capacity);
        Subscription::new(rx, region.clone())
    }

    /// Create a pull-based cursor over `region`, backed by the shared index.
    pub fn cursor(&self, region: &Region) -> Cursor {
        Cursor::new(region.clone(), Arc::clone(&self.index))
    }

    /// Return the index entries for a single entity's stream.
    #[must_use]
    pub fn stream(&self, entity: &str) -> Vec<IndexEntry> {
        self.index.stream(entity)
    }

    /// Convenience: all index entries in the given scope.
    #[must_use]
    pub fn by_scope(&self, scope: &str) -> Vec<IndexEntry> {
        self.query(&Region::scope(scope))
    }

    /// Convenience: all index entries with exactly the given event kind.
    #[must_use]
    pub fn by_fact(&self, kind: EventKind) -> Vec<IndexEntry> {
        self.query(&Region::all().with_fact(KindFilter::Exact(kind)))
    }

    /// Spawn a background thread ("batpak-reactor") that feeds every event
    /// notified within `region` to `reactor` and appends its reactions.
    ///
    /// Each reaction is appended with the triggering event's correlation id
    /// and with `causation_id` set to the triggering event's id. Failures to
    /// load an event or append a reaction are logged at `warn` and skipped;
    /// the loop exits when the subscription channel closes.
    ///
    /// # Errors
    /// `StoreError::Io` if the OS thread cannot be spawned.
    pub fn react_loop<R>(
        self: &Arc<Self>,
        region: &Region,
        reactor: R,
    ) -> Result<std::thread::JoinHandle<()>, StoreError>
    where
        R: crate::event::sourcing::Reactive<serde_json::Value> + Send + 'static,
    {
        let store = Arc::clone(self);
        let sub = self.subscribe(region);
        std::thread::Builder::new()
            .name("batpak-reactor".into())
            .spawn(move || {
                while let Some(notif) = sub.recv() {
                    let stored = match store.get(notif.event_id) {
                        Ok(s) => s,
                        Err(e) => {
                            tracing::warn!(
                                "react_loop: failed to get event {}: {e}",
                                notif.event_id
                            );
                            continue;
                        }
                    };
                    for (coord, kind, payload) in reactor.react(&stored.event) {
                        if let Err(e) = store.append_reaction(
                            &coord,
                            kind,
                            &payload,
                            notif.correlation_id,
                            notif.event_id,
                        ) {
                            tracing::warn!("react_loop: failed to append reaction: {e}");
                        }
                    }
                }
            })
            .map_err(StoreError::Io)
    }

    /// Build a [`ProjectionWatcher`] that re-projects `entity` into a `T`
    /// (with the given `freshness`) each time a notification for that entity
    /// arrives.
    pub fn watch_projection<T>(
        self: &Arc<Self>,
        entity: &str,
        freshness: Freshness,
    ) -> ProjectionWatcher<T>
    where
        T: EventSourced<serde_json::Value>
            + serde::Serialize
            + serde::de::DeserializeOwned
            + Send
            + 'static,
    {
        let sub = self.subscribe(&Region::entity(entity));
        let store = Arc::clone(self);
        let entity_owned = entity.to_owned();
        ProjectionWatcher {
            sub,
            store,
            entity: entity_owned,
            freshness,
            _phantom: std::marker::PhantomData,
        }
    }

    /// Append with explicit options: idempotency key, CAS on the expected
    /// sequence, correlation/causation overrides, and header flags.
    ///
    /// When `opts.idempotency_key` is set it doubles as the event id; when
    /// `opts.correlation_id` is absent it defaults to the event id.
    ///
    /// # Errors
    /// Same failure modes as [`Store::append`], plus whatever guard failures
    /// the writer reports (e.g. a CAS mismatch surfaced via the receipt).
    pub fn append_with_options(
        &self,
        coord: &Coordinate,
        kind: EventKind,
        payload: &impl Serialize,
        opts: AppendOptions,
    ) -> Result<AppendReceipt, StoreError> {
        tracing::debug!(
            target: "batpak::flow",
            flow = "append_with_options",
            entity = coord.entity(),
            scope = coord.scope(),
            has_cas = opts.expected_sequence.is_some(),
            has_idempotency = opts.idempotency_key.is_some()
        );
        let event_id = opts
            .idempotency_key
            .unwrap_or_else(crate::id::generate_v7_id);
        let correlation_id = opts.correlation_id.unwrap_or(event_id);
        self.do_append(
            coord,
            kind,
            payload,
            event_id,
            correlation_id,
            opts.causation_id,
            opts.expected_sequence,
            opts.idempotency_key,
            opts.flags,
        )
    }

    /// Shared append path used by every public append variant.
    ///
    /// Serializes the payload as MessagePack (named-field encoding), builds
    /// the event header, and sends the event to the writer thread, blocking
    /// on a one-shot channel until the writer replies with a receipt.
    ///
    /// # Errors
    /// - `IdempotencyRequired` when group commit is enabled
    ///   (`group_commit_max_batch > 1`) but no idempotency key was given.
    /// - `Serialization` if the payload cannot be MessagePack-encoded.
    /// - `WriterCrashed` if the writer channel is closed in either direction.
    /// - Whatever error the writer itself reports in its response.
    #[allow(clippy::too_many_arguments)]
    fn do_append(
        &self,
        coord: &Coordinate,
        kind: EventKind,
        payload: &impl Serialize,
        event_id: u128,
        correlation_id: u128,
        causation_id: Option<u128>,
        expected_sequence: Option<u32>,
        idempotency_key: Option<u128>,
        flags: u8,
    ) -> Result<AppendReceipt, StoreError> {
        if self.config.group_commit_max_batch > 1 && idempotency_key.is_none() {
            return Err(StoreError::IdempotencyRequired);
        }
        let payload_bytes =
            rmp_serde::to_vec_named(payload).map_err(|e| StoreError::Serialization(Box::new(e)))?;
        let payload_len = checked_payload_len(&payload_bytes)?;
        let mut header = EventHeader::new(
            event_id,
            correlation_id,
            causation_id,
            self.config.now_us(),
            crate::coordinate::DagPosition::root(),
            payload_len,
            kind,
        );
        // Leave the header's default flag byte untouched unless the caller
        // explicitly requested flags.
        if flags != 0 {
            header = header.with_flags(flags);
        }
        let event = Event::new(header, payload_bytes);
        let (tx, rx) = flume::bounded(1);
        self.writer
            .tx
            .send(WriterCommand::Append {
                entity: coord.entity_arc(),
                scope: coord.scope_arc(),
                event: Box::new(event),
                kind,
                guards: AppendGuards {
                    correlation_id,
                    causation_id,
                    expected_sequence,
                    idempotency_key,
                },
                respond: tx,
            })
            .map_err(|_| StoreError::WriterCrashed)?;
        // Outer error = writer dropped the response channel (crashed);
        // inner Result = the writer's actual append outcome.
        rx.recv().map_err(|_| StoreError::WriterCrashed)?
    }

    /// Append the event produced by a typestate transition: the transition
    /// supplies both the event kind and the payload.
    ///
    /// # Errors
    /// Same failure modes as [`Store::append`].
    pub fn apply_transition<From, To, P: Serialize>(
        &self,
        coord: &Coordinate,
        transition: crate::typestate::transition::Transition<From, To, P>,
    ) -> Result<AppendReceipt, StoreError> {
        let kind = transition.kind();
        let payload = transition.into_payload();
        self.append(coord, kind, &payload)
    }

    /// Force durability of pending writes (see `maintenance::sync`).
    ///
    /// # Errors
    /// Propagates any error from the maintenance sync path.
    pub fn sync(&self) -> Result<(), StoreError> {
        maintenance::sync(self)
    }

    /// Write a consistent copy of the store to `dest`.
    ///
    /// # Errors
    /// Propagates any error from the maintenance snapshot path.
    pub fn snapshot(&self, dest: &std::path::Path) -> Result<(), StoreError> {
        maintenance::snapshot(self, dest)
    }

    /// Compact segments according to `config` (see `maintenance::compact`).
    ///
    /// # Errors
    /// Propagates any error from the compaction path.
    pub fn compact(
        &self,
        config: &CompactionConfig,
    ) -> Result<segment::CompactionResult, StoreError> {
        maintenance::compact(self, config)
    }

    /// Consume and shut the store down explicitly, surfacing shutdown errors
    /// that `Drop` would have to swallow.
    ///
    /// # Errors
    /// Propagates any error from the maintenance close path.
    pub fn close(self) -> Result<(), StoreError> {
        maintenance::close(self)
    }

    /// Lightweight runtime statistics (see `maintenance::stats`).
    pub fn stats(&self) -> StoreStats {
        maintenance::stats(self)
    }

    /// Detailed diagnostics (see `maintenance::diagnostics`).
    pub fn diagnostics(&self) -> StoreDiagnostics {
        maintenance::diagnostics(self)
    }
}
impl Drop for Store {
    /// Best-effort graceful shutdown: ask the writer thread to stop, then
    /// wait a bounded window for its acknowledgement so `drop` never hangs.
    fn drop(&mut self) {
        let (ack_tx, ack_rx) = flume::bounded(1);
        let request = WriterCommand::Shutdown { respond: ack_tx };
        // If the writer is already gone the send fails; nothing to wait for.
        if self.writer.tx.send(request).is_ok() {
            // Ignore the outcome — drop cannot report errors anyway.
            let _ = ack_rx.recv_timeout(std::time::Duration::from_millis(100));
        }
    }
}
/// Blocking watcher that re-projects one entity into a `T` every time a
/// matching notification arrives on its subscription.
pub struct ProjectionWatcher<T> {
    // Store used to run the projection on each wake-up.
    store: Arc<Store>,
    // Entity whose stream is projected.
    entity: String,
    // Cache-freshness policy forwarded to `Store::project`.
    freshness: Freshness,
    // Notification source for the watched entity's region.
    sub: Subscription,
    // Records the projected type without storing a value of it.
    _phantom: std::marker::PhantomData<T>,
}
impl<T> ProjectionWatcher<T>
where
    T: EventSourced<serde_json::Value> + serde::Serialize + serde::de::DeserializeOwned + 'static,
{
    /// Block until the next notification for the watched entity, then return
    /// the freshly projected state.
    ///
    /// Returns `Ok(None)` once the subscription channel has closed.
    ///
    /// # Errors
    /// Propagates any error from the underlying projection flow.
    pub fn recv(&self) -> Result<Option<T>, StoreError> {
        match self.sub.recv() {
            None => Ok(None),
            Some(_) => self.store.project::<T>(&self.entity, &self.freshness),
        }
    }

    /// Access the underlying subscription (internal/testing hook).
    #[doc(hidden)]
    pub fn subscription(&self) -> &Subscription {
        &self.sub
    }
}