mod ancestors;
pub(crate) mod checkpoint;
pub(crate) mod columnar;
mod config;
mod contracts;
pub mod cursor;
mod error;
#[cfg(feature = "dangerous-test-hooks")]
pub mod fault;
pub mod index;
mod index_rebuild;
pub(crate) mod interner;
mod maintenance;
pub(crate) mod mmap_index;
pub mod projection;
mod projection_flow;
pub mod reader;
#[cfg(test)]
mod runtime_contracts;
pub mod segment;
pub(crate) mod sidx;
pub mod stats;
pub mod subscription;
#[cfg(feature = "dangerous-test-hooks")]
mod test_support;
pub(crate) mod visibility_ranges;
pub mod writer;
pub use config::{
BatchConfig, IndexConfig, IndexLayout, StoreConfig, SyncConfig, SyncMode, ViewConfig,
WriterConfig,
};
pub use contracts::{
AppendOptions, AppendReceipt, BatchAppendItem, CausationRef, CompactionConfig,
CompactionStrategy, RetentionPredicate,
};
pub use cursor::{Cursor, CursorWorkerAction, CursorWorkerConfig, CursorWorkerHandle};
pub use error::BatchStage;
pub use error::StoreError;
#[cfg(feature = "dangerous-test-hooks")]
pub use fault::{
CountdownAction, CountdownInjector, FaultInjector, InjectionPoint, ProbabilisticInjector,
};
pub use index::{ClockKey, DiskPos, IndexEntry};
pub use index_rebuild::{OpenIndexPath, OpenIndexReport};
pub use projection::{
CacheCapabilities, CacheMeta, Freshness, NativeCache, NoCache, ProjectionCache,
};
pub use stats::{StoreDiagnostics, StoreStats, WriterPressure};
pub use subscription::Subscription;
pub use writer::{Notification, RestartPolicy};
use crate::coordinate::{Coordinate, KindFilter, Region};
use crate::event::{Event, EventHeader, EventKind, EventSourced, StoredEvent};
#[cfg(test)]
pub(crate) use config::now_us;
use contracts::checked_payload_len;
use index::StoreIndex;
use reader::Reader;
use serde::Serialize;
use std::sync::Arc;
use writer::{AppendGuards, ReactorSubscriberList, SubscriberList, WriterCommand, WriterHandle};
// Guard rail: the public Store API is deliberately synchronous (INVARIANT 2).
// If anyone tries to build with an `async-store` feature, fail the build.
// `unexpected_cfgs` is allowed because `async-store` is intentionally not a
// declared feature of this crate.
#[allow(unexpected_cfgs)]
#[cfg(feature = "async-store")]
compile_error!("INVARIANT 2: Store API is sync. Use spawn_blocking or flume recv_async.");
/// Typestate marker: a store with a live writer thread; appends are allowed.
pub struct Open;
/// Returned by [`Store::close`] once the store has been shut down.
pub struct Closed;
/// Typestate marker: a store opened without a writer; queries only.
pub struct ReadOnly;
/// An event store parameterized by a typestate marker (`Open` by default)
/// that controls which operations are available at compile time.
pub struct Store<State = Open> {
    // Shared in-memory index over persisted events.
    pub(crate) index: Arc<StoreIndex>,
    // Read path for event payloads in segment files.
    pub(crate) reader: Arc<Reader>,
    // Pluggable projection cache (`NoCache`, `NativeCache`, or custom).
    pub(crate) cache: Box<dyn ProjectionCache>,
    // Handle to the writer thread; `None` for read-only stores.
    pub(crate) writer: Option<WriterHandle>,
    pub(crate) config: Arc<StoreConfig>,
    // Whether `Drop` should attempt a bounded best-effort writer shutdown.
    pub(crate) should_shutdown_on_drop: bool,
    // Report produced while (re)building the index at open time.
    pub(crate) open_report: Option<index_rebuild::OpenIndexReport>,
    // Zero-sized typestate marker.
    pub(crate) _state: std::marker::PhantomData<State>,
}
/// Writer reply for a single append: the receipt, or why the append failed.
type AppendReply = Result<AppendReceipt, StoreError>;
/// Writer reply for a batch append: one receipt per item, or a single error.
type BatchAppendReply = Result<Vec<AppendReceipt>, StoreError>;
/// Handle to one in-flight append; redeem with [`AppendTicket::wait`] or
/// poll with [`AppendTicket::try_check`].
pub struct AppendTicket {
    // One-shot reply channel filled in by the writer thread.
    rx: flume::Receiver<AppendReply>,
}
impl AppendTicket {
    /// Block until the writer replies with this append's receipt.
    ///
    /// A disconnected reply channel means the writer thread died before
    /// answering, surfaced as [`StoreError::WriterCrashed`].
    pub fn wait(self) -> AppendReply {
        match self.rx.recv() {
            Ok(reply) => reply,
            Err(_) => Err(StoreError::WriterCrashed),
        }
    }
    /// Non-blocking poll: `None` while the writer has not answered yet.
    pub fn try_check(&self) -> Option<AppendReply> {
        use flume::TryRecvError::{Disconnected, Empty};
        match self.rx.try_recv() {
            Err(Empty) => None,
            Err(Disconnected) => Some(Err(StoreError::WriterCrashed)),
            Ok(reply) => Some(reply),
        }
    }
    /// Borrow the underlying reply channel (e.g. for external selection).
    pub fn receiver(&self) -> &flume::Receiver<AppendReply> {
        &self.rx
    }
}
/// Handle to one in-flight batch append; redeem with
/// [`BatchAppendTicket::wait`] or poll with [`BatchAppendTicket::try_check`].
pub struct BatchAppendTicket {
    // One-shot reply channel filled in by the writer thread.
    rx: flume::Receiver<BatchAppendReply>,
}
impl BatchAppendTicket {
    /// Block until the writer replies with the batch receipts.
    ///
    /// A disconnected reply channel means the writer thread died before
    /// answering, surfaced as [`StoreError::WriterCrashed`].
    pub fn wait(self) -> BatchAppendReply {
        match self.rx.recv() {
            Ok(reply) => reply,
            Err(_) => Err(StoreError::WriterCrashed),
        }
    }
    /// Non-blocking poll: `None` while the writer has not answered yet.
    pub fn try_check(&self) -> Option<BatchAppendReply> {
        use flume::TryRecvError::{Disconnected, Empty};
        match self.rx.try_recv() {
            Err(Empty) => None,
            Err(Disconnected) => Some(Err(StoreError::WriterCrashed)),
            Ok(reply) => Some(reply),
        }
    }
    /// Borrow the underlying reply channel (e.g. for external selection).
    pub fn receiver(&self) -> &flume::Receiver<BatchAppendReply> {
        &self.rx
    }
}
/// Buffer of staged append items flushed to the store as one batch,
/// optionally scoped to a visibility fence token.
pub struct Outbox<'a> {
    store: &'a Store<Open>,
    // When `Some`, flushes go through the fenced batch path.
    fence_token: Option<u64>,
    items: Vec<BatchAppendItem>,
}
impl<'a> Outbox<'a> {
fn new(store: &'a Store<Open>, fence_token: Option<u64>) -> Self {
Self {
store,
fence_token,
items: Vec::new(),
}
}
pub fn stage(
&mut self,
coord: Coordinate,
kind: EventKind,
payload: &impl Serialize,
) -> Result<&mut Self, StoreError> {
self.stage_with_options(coord, kind, payload, AppendOptions::default())
}
pub fn stage_with_options(
&mut self,
coord: Coordinate,
kind: EventKind,
payload: &impl Serialize,
options: AppendOptions,
) -> Result<&mut Self, StoreError> {
let item = BatchAppendItem::new(coord, kind, payload, options, CausationRef::None)?;
self.items.push(item);
Ok(self)
}
pub fn push_item(&mut self, item: BatchAppendItem) -> &mut Self {
self.items.push(item);
self
}
pub fn flush(&mut self) -> Result<Vec<AppendReceipt>, StoreError> {
let items = std::mem::take(&mut self.items);
match self.fence_token {
Some(token) => self.store.submit_batch_with_fence(items, token)?.wait(),
None => self.store.append_batch(items),
}
}
pub fn submit_flush(&mut self) -> Result<BatchAppendTicket, StoreError> {
let items = std::mem::take(&mut self.items);
match self.fence_token {
Some(token) => self.store.submit_batch_with_fence(items, token),
None => self.store.submit_batch(items),
}
}
pub fn len(&self) -> usize {
self.items.len()
}
pub fn is_empty(&self) -> bool {
self.items.is_empty()
}
}
/// Scope under which appended events stay invisible to readers until the
/// fence is committed; dropping an open fence issues a best-effort cancel.
pub struct VisibilityFence<'a> {
    store: &'a Store<Open>,
    // Token allocated by the index in `begin_visibility_fence`.
    token: u64,
    // Set once commit/cancel was delivered, so `Drop` does nothing.
    closed: bool,
}
impl<'a> VisibilityFence<'a> {
pub fn submit(
&self,
coord: &Coordinate,
kind: EventKind,
payload: &impl Serialize,
) -> Result<AppendTicket, StoreError> {
self.store
.submit_with_fence(coord, kind, payload, self.token)
}
pub fn submit_reaction(
&self,
coord: &Coordinate,
kind: EventKind,
payload: &impl Serialize,
correlation_id: u128,
causation_id: u128,
) -> Result<AppendTicket, StoreError> {
self.store.submit_reaction_with_fence(
coord,
kind,
payload,
correlation_id,
causation_id,
self.token,
)
}
pub fn submit_batch(
&self,
items: Vec<crate::store::contracts::BatchAppendItem>,
) -> Result<BatchAppendTicket, StoreError> {
self.store.submit_batch_with_fence(items, self.token)
}
pub fn outbox(&self) -> Outbox<'_> {
Outbox::new(self.store, Some(self.token))
}
pub fn commit(mut self) -> Result<(), StoreError> {
let (tx, rx) = flume::bounded(1);
self.store
.writer_handle()?
.tx
.send(WriterCommand::CommitVisibilityFence {
token: self.token,
respond: tx,
})
.map_err(|_| StoreError::WriterCrashed)?;
self.closed = true;
rx.recv().map_err(|_| StoreError::WriterCrashed)?
}
pub fn cancel(mut self) -> Result<(), StoreError> {
let (tx, rx) = flume::bounded(1);
self.store
.writer_handle()?
.tx
.send(WriterCommand::CancelVisibilityFence {
token: self.token,
respond: tx,
})
.map_err(|_| StoreError::WriterCrashed)?;
self.closed = true;
rx.recv().map_err(|_| StoreError::WriterCrashed)?
}
}
impl Drop for VisibilityFence<'_> {
    /// Best-effort cancel for fences dropped without an explicit
    /// commit/cancel. The acknowledgement channel is deliberately ignored —
    /// `drop` must never block or fail.
    fn drop(&mut self) {
        if !self.closed {
            if let Some(writer) = self.store.writer.as_ref() {
                let (respond, _ignored) = flume::bounded(1);
                let _ = writer.tx.send(WriterCommand::CancelVisibilityFence {
                    token: self.token,
                    respond,
                });
            }
        }
    }
}
impl Store<Open> {
    /// Open a writable store with no projection cache.
    pub fn open(config: StoreConfig) -> Result<Self, StoreError> {
        Self::open_with_cache(config, Box::new(NoCache))
    }
    /// Open a writable store whose projections are cached in a
    /// [`NativeCache`] persisted at `cache_path`.
    pub fn open_with_native_cache(
        config: StoreConfig,
        cache_path: impl AsRef<std::path::Path>,
    ) -> Result<Self, StoreError> {
        Self::open_with_cache(config, Box::new(NativeCache::open(cache_path)?))
    }
    /// Open a writable store with a caller-supplied projection cache.
    ///
    /// Validates the configuration, (re)builds the in-memory index from the
    /// data directory, points the reader at a fresh active segment, and
    /// spawns the writer thread before returning.
    pub fn open_with_cache(
        config: StoreConfig,
        cache: Box<dyn ProjectionCache>,
    ) -> Result<Self, StoreError> {
        config.validate()?;
        std::fs::create_dir_all(&config.data_dir)?;
        let config = Arc::new(config);
        let index = Arc::new(StoreIndex::with_config(&config.index));
        let reader = Arc::new(Reader::new(config.data_dir.clone(), config.fd_budget));
        let open_report = index_rebuild::open_index(
            &index,
            &reader,
            &config.data_dir,
            config.index.enable_checkpoint,
            config.index.enable_mmap_index,
        )?;
        // The writer appends to a brand-new segment: one past the highest
        // segment id on disk (0 when the directory is empty).
        let active_seg_id = writer::find_latest_segment_id(&config.data_dir).unwrap_or(0) + 1;
        reader.set_active_segment(active_seg_id);
        let subscribers = Arc::new(SubscriberList::new());
        let reactor_subscribers = Arc::new(ReactorSubscriberList::new());
        let writer =
            WriterHandle::spawn(&config, &index, &subscribers, &reactor_subscribers, &reader)?;
        Ok(Self {
            index,
            reader,
            cache,
            writer: Some(writer),
            config,
            should_shutdown_on_drop: true,
            open_report: Some(open_report),
            _state: std::marker::PhantomData,
        })
    }
    /// Create an unfenced [`Outbox`] for staging a batch of appends.
    pub fn outbox(&self) -> Outbox<'_> {
        Outbox::new(self, None)
    }
    /// Start a visibility fence: events appended through the returned handle
    /// stay invisible to readers until the fence is committed.
    pub fn begin_visibility_fence(&self) -> Result<VisibilityFence<'_>, StoreError> {
        let token = self.index.begin_visibility_fence()?;
        let (tx, rx) = flume::bounded(1);
        let send_result = self
            .writer_handle()?
            .tx
            .send(WriterCommand::BeginVisibilityFence { token, respond: tx });
        if send_result.is_err() {
            // Roll back the index-side fence registration if the writer
            // never received the begin command.
            let _ = self.index.cancel_visibility_fence(token);
            return Err(StoreError::WriterCrashed);
        }
        rx.recv().map_err(|_| StoreError::WriterCrashed)??;
        Ok(VisibilityFence {
            store: self,
            token,
            closed: false,
        })
    }
    /// Snapshot of the writer mailbox depth versus its configured capacity.
    pub fn writer_pressure(&self) -> WriterPressure {
        let writer = self
            .writer
            .as_ref()
            .expect("open store always has a writer handle");
        WriterPressure {
            queue_len: writer.tx.len(),
            capacity: self.config.writer.channel_capacity,
        }
    }
    /// Queue a single append without blocking; returns a ticket for the
    /// receipt. Rejected while a public visibility fence is active.
    pub fn submit(
        &self,
        coord: &Coordinate,
        kind: EventKind,
        payload: &impl Serialize,
    ) -> Result<AppendTicket, StoreError> {
        self.ensure_no_active_public_fence()?;
        let event_id = crate::id::generate_v7_id();
        // A fresh event starts its own correlation chain: correlation_id ==
        // event_id, no causation.
        self.submit_inner(
            coord, kind, payload, event_id, event_id, None, None, None, 0, None,
        )
    }
    /// Queue a reaction append that carries the triggering event's
    /// correlation and causation ids.
    pub fn submit_reaction(
        &self,
        coord: &Coordinate,
        kind: EventKind,
        payload: &impl Serialize,
        correlation_id: u128,
        causation_id: u128,
    ) -> Result<AppendTicket, StoreError> {
        self.ensure_no_active_public_fence()?;
        let event_id = crate::id::generate_v7_id();
        self.submit_inner(
            coord,
            kind,
            payload,
            event_id,
            correlation_id,
            Some(causation_id),
            None,
            None,
            0,
            None,
        )
    }
    /// Queue a batch append without blocking. Rejected while a public
    /// visibility fence is active.
    pub fn submit_batch(
        &self,
        items: Vec<crate::store::contracts::BatchAppendItem>,
    ) -> Result<BatchAppendTicket, StoreError> {
        self.ensure_no_active_public_fence()?;
        self.submit_batch_with_fence_impl(items, None)
    }
    // Fence-scoped batch submit; intentionally skips the public-fence check
    // because the caller already holds the fence.
    fn submit_batch_with_fence(
        &self,
        items: Vec<crate::store::contracts::BatchAppendItem>,
        token: u64,
    ) -> Result<BatchAppendTicket, StoreError> {
        self.submit_batch_with_fence_impl(items, Some(token))
    }
    // Common tail of the fenced/unfenced batch paths: build the writer
    // command and hand back the reply ticket.
    fn submit_batch_with_fence_impl(
        &self,
        items: Vec<crate::store::contracts::BatchAppendItem>,
        token: Option<u64>,
    ) -> Result<BatchAppendTicket, StoreError> {
        let (tx, rx) = flume::bounded(1);
        let command = match token {
            Some(token) => WriterCommand::FenceAppendBatch {
                token,
                items,
                respond: tx,
            },
            None => WriterCommand::AppendBatch { items, respond: tx },
        };
        self.writer_handle()?
            .tx
            .send(command)
            .map_err(|_| StoreError::WriterCrashed)?;
        Ok(BatchAppendTicket { rx })
    }
    /// Like [`submit`](Self::submit), but instead of erroring it reports an
    /// [`Outcome`](crate::outcome::Outcome) of `cancelled` while a fence is
    /// active, or `retry` when the writer mailbox is over the pressure
    /// threshold.
    pub fn try_submit(
        &self,
        coord: &Coordinate,
        kind: EventKind,
        payload: &impl Serialize,
    ) -> Result<crate::outcome::Outcome<AppendTicket>, StoreError> {
        if self.index.active_visibility_fence().is_some() {
            return Ok(crate::outcome::Outcome::cancelled(
                "visibility fence is active; submit through the fence",
            ));
        }
        if let Some(outcome) = self.submit_pressure_gate() {
            return Ok(outcome);
        }
        self.submit(coord, kind, payload)
            .map(crate::outcome::Outcome::ok)
    }
    /// Outcome-reporting variant of [`submit_reaction`](Self::submit_reaction);
    /// see [`try_submit`](Self::try_submit) for the gating rules.
    pub fn try_submit_reaction(
        &self,
        coord: &Coordinate,
        kind: EventKind,
        payload: &impl Serialize,
        correlation_id: u128,
        causation_id: u128,
    ) -> Result<crate::outcome::Outcome<AppendTicket>, StoreError> {
        if self.index.active_visibility_fence().is_some() {
            return Ok(crate::outcome::Outcome::cancelled(
                "visibility fence is active; submit through the fence",
            ));
        }
        if let Some(outcome) = self.submit_pressure_gate() {
            return Ok(outcome);
        }
        self.submit_reaction(coord, kind, payload, correlation_id, causation_id)
            .map(crate::outcome::Outcome::ok)
    }
    /// Outcome-reporting variant of [`submit_batch`](Self::submit_batch);
    /// see [`try_submit`](Self::try_submit) for the gating rules.
    pub fn try_submit_batch(
        &self,
        items: Vec<crate::store::contracts::BatchAppendItem>,
    ) -> Result<crate::outcome::Outcome<BatchAppendTicket>, StoreError> {
        if self.index.active_visibility_fence().is_some() {
            return Ok(crate::outcome::Outcome::cancelled(
                "visibility fence is active; submit through the fence",
            ));
        }
        if let Some(outcome) = self.submit_pressure_gate_batch() {
            return Ok(outcome);
        }
        self.submit_batch(items).map(crate::outcome::Outcome::ok)
    }
    /// Blocking append: submit one event and wait for its receipt.
    pub fn append(
        &self,
        coord: &Coordinate,
        kind: EventKind,
        payload: &impl Serialize,
    ) -> Result<AppendReceipt, StoreError> {
        tracing::debug!(
            target: "batpak::flow",
            flow = "append",
            entity = coord.entity(),
            scope = coord.scope(),
            event_kind = kind.type_id()
        );
        self.submit(coord, kind, payload)?.wait()
    }
    /// Blocking reaction append: submit and wait for the receipt.
    pub fn append_reaction(
        &self,
        coord: &Coordinate,
        kind: EventKind,
        payload: &impl Serialize,
        correlation_id: u128,
        causation_id: u128,
    ) -> Result<AppendReceipt, StoreError> {
        tracing::debug!(
            target: "batpak::flow",
            flow = "append_reaction",
            entity = coord.entity(),
            scope = coord.scope(),
            correlation_id = format_args!("{correlation_id:032x}"),
            causation_id = format_args!("{causation_id:032x}")
        );
        self.submit_reaction(coord, kind, payload, correlation_id, causation_id)?
            .wait()
    }
    /// Blocking batch append: submit and wait for all receipts.
    pub fn append_batch(
        &self,
        items: Vec<crate::store::contracts::BatchAppendItem>,
    ) -> Result<Vec<AppendReceipt>, StoreError> {
        self.submit_batch(items)?.wait()
    }
    /// Blocking batch append where every item shares `correlation_id`; the
    /// `causation_id` is only applied to items that did not set their own
    /// causation reference.
    pub fn append_reaction_batch(
        &self,
        correlation_id: u128,
        causation_id: u128,
        items: Vec<crate::store::contracts::BatchAppendItem>,
    ) -> Result<Vec<AppendReceipt>, StoreError> {
        let items: Vec<_> = items
            .into_iter()
            .map(|mut item| {
                item.options.correlation_id = Some(correlation_id);
                if matches!(item.causation, crate::store::contracts::CausationRef::None) {
                    item.options.causation_id = Some(causation_id);
                }
                item
            })
            .collect();
        self.append_batch(items)
    }
    /// Subscribe to notifications for `region`. "Lossy" because the channel
    /// is bounded by `broadcast_capacity`.
    pub fn subscribe_lossy(&self, region: &Region) -> Subscription {
        let rx = self
            .writer
            .as_ref()
            .expect("open store has writer")
            .subscribers
            .subscribe(self.config.broadcast_capacity);
        Subscription::new(rx, region.clone())
    }
    /// Spawn a background thread that runs `reactor` over every stored event
    /// matching `region`, appending each produced event as a reaction whose
    /// causation is the triggering event's id. The loop ends when the
    /// reactor subscription channel closes.
    pub fn react_loop<R>(
        self: &Arc<Self>,
        region: &Region,
        reactor: R,
    ) -> Result<std::thread::JoinHandle<()>, StoreError>
    where
        R: crate::event::sourcing::Reactive<serde_json::Value> + Send + 'static,
    {
        let store = Arc::clone(self);
        let region = region.clone();
        let sub = self
            .writer
            .as_ref()
            .expect("open store has writer")
            .reactor_subscribers
            .subscribe(self.config.broadcast_capacity);
        std::thread::Builder::new()
            .name("batpak-reactor".into())
            .spawn(move || {
                while let Ok(envelope) = sub.recv() {
                    let notif = envelope.notification;
                    if !region.matches_event(notif.coord.entity(), notif.coord.scope(), notif.kind)
                    {
                        continue;
                    }
                    for (coord, kind, payload) in reactor.react(&envelope.stored.event) {
                        // A failed reaction append is logged, not fatal: the
                        // loop keeps consuming notifications.
                        if let Err(e) = store.append_reaction(
                            &coord,
                            kind,
                            &payload,
                            notif.correlation_id,
                            notif.event_id,
                        ) {
                            tracing::warn!("react_loop: failed to append reaction: {e}");
                        }
                    }
                }
            })
            .map_err(StoreError::Io)
    }
    /// Build a [`ProjectionWatcher`] that re-projects `entity` whenever a
    /// notification for it arrives.
    pub fn watch_projection<T>(
        self: &Arc<Self>,
        entity: &str,
        freshness: Freshness,
    ) -> ProjectionWatcher<T>
    where
        T: EventSourced<serde_json::Value>
            + serde::Serialize
            + serde::de::DeserializeOwned
            + Send
            + 'static,
    {
        let sub = self.subscribe_lossy(&Region::entity(entity));
        let store = Arc::clone(self);
        let entity_owned = entity.to_owned();
        ProjectionWatcher {
            sub,
            store,
            entity: entity_owned,
            freshness,
            cached_state: None,
            watermark: None,
            _phantom: std::marker::PhantomData,
        }
    }
    /// Blocking append honoring [`AppendOptions`]: idempotency key (reused as
    /// the event id when present), CAS on expected sequence, explicit
    /// correlation/causation ids, and header flags.
    pub fn append_with_options(
        &self,
        coord: &Coordinate,
        kind: EventKind,
        payload: &impl Serialize,
        opts: AppendOptions,
    ) -> Result<AppendReceipt, StoreError> {
        tracing::debug!(
            target: "batpak::flow",
            flow = "append_with_options",
            entity = coord.entity(),
            scope = coord.scope(),
            has_cas = opts.expected_sequence.is_some(),
            has_idempotency = opts.idempotency_key.is_some()
        );
        // The idempotency key doubles as the event id so retries dedupe.
        let event_id = opts
            .idempotency_key
            .unwrap_or_else(crate::id::generate_v7_id);
        let correlation_id = opts.correlation_id.unwrap_or(event_id);
        self.submit_inner(
            coord,
            kind,
            payload,
            event_id,
            correlation_id,
            opts.causation_id,
            opts.expected_sequence,
            opts.idempotency_key,
            opts.flags,
            None,
        )?
        .wait()
    }
    // Common single-append path: serialize the payload, build the event
    // header, and hand the writer either a fenced or an unfenced Append
    // command. `fence_token == None` re-checks the public fence guard.
    #[allow(clippy::too_many_arguments)] fn submit_inner(
        &self,
        coord: &Coordinate,
        kind: EventKind,
        payload: &impl Serialize,
        event_id: u128,
        correlation_id: u128,
        causation_id: Option<u128>,
        expected_sequence: Option<u32>,
        idempotency_key: Option<u128>,
        flags: u8,
        fence_token: Option<u64>,
    ) -> Result<AppendTicket, StoreError> {
        if fence_token.is_none() {
            self.ensure_no_active_public_fence()?;
        }
        // With group commit enabled, appends without an idempotency key are
        // rejected outright.
        if self.config.batch.group_commit_max_batch > 1 && idempotency_key.is_none() {
            return Err(StoreError::IdempotencyRequired);
        }
        let payload_bytes =
            rmp_serde::to_vec_named(payload).map_err(|e| StoreError::Serialization(Box::new(e)))?;
        if payload_bytes.len() > self.config.single_append_max_bytes as usize {
            return Err(StoreError::Configuration(format!(
                "single append bytes {} exceeds max {}",
                payload_bytes.len(),
                self.config.single_append_max_bytes
            )));
        }
        let payload_len = checked_payload_len(&payload_bytes)?;
        let mut header = EventHeader::new(
            event_id,
            correlation_id,
            causation_id,
            self.config.now_us(),
            crate::coordinate::DagPosition::root(),
            payload_len,
            kind,
        );
        if flags != 0 {
            header = header.with_flags(flags);
        }
        let event = Event::new(header, payload_bytes);
        let (tx, rx) = flume::bounded(1);
        let command = match fence_token {
            Some(token) => WriterCommand::FenceAppend {
                token,
                coord: coord.clone(),
                event: Box::new(event),
                kind,
                guards: AppendGuards {
                    correlation_id,
                    causation_id,
                    expected_sequence,
                    idempotency_key,
                },
                respond: tx,
            },
            None => WriterCommand::Append {
                coord: coord.clone(),
                event: Box::new(event),
                kind,
                guards: AppendGuards {
                    correlation_id,
                    causation_id,
                    expected_sequence,
                    idempotency_key,
                },
                respond: tx,
            },
        };
        self.writer_handle()?
            .tx
            .send(command)
            .map_err(|_| StoreError::WriterCrashed)?;
        Ok(AppendTicket { rx })
    }
    // The writer handle, or WriterCrashed if it is gone.
    fn writer_handle(&self) -> Result<&WriterHandle, StoreError> {
        self.writer.as_ref().ok_or(StoreError::WriterCrashed)
    }
    // Unfenced appends are rejected while any visibility fence is active.
    fn ensure_no_active_public_fence(&self) -> Result<(), StoreError> {
        if self.index.active_visibility_fence().is_some() {
            return Err(StoreError::VisibilityFenceActive);
        }
        Ok(())
    }
    // Fence-scoped single append; called by `VisibilityFence::submit`.
    fn submit_with_fence(
        &self,
        coord: &Coordinate,
        kind: EventKind,
        payload: &impl Serialize,
        token: u64,
    ) -> Result<AppendTicket, StoreError> {
        let event_id = crate::id::generate_v7_id();
        self.submit_inner(
            coord,
            kind,
            payload,
            event_id,
            event_id,
            None,
            None,
            None,
            0,
            Some(token),
        )
    }
    // Fence-scoped reaction append; called by `VisibilityFence::submit_reaction`.
    fn submit_reaction_with_fence(
        &self,
        coord: &Coordinate,
        kind: EventKind,
        payload: &impl Serialize,
        correlation_id: u128,
        causation_id: u128,
        token: u64,
    ) -> Result<AppendTicket, StoreError> {
        let event_id = crate::id::generate_v7_id();
        self.submit_inner(
            coord,
            kind,
            payload,
            event_id,
            correlation_id,
            Some(causation_id),
            None,
            None,
            0,
            Some(token),
        )
    }
    // Backpressure gate for try_submit/try_submit_reaction: returns a retry
    // Outcome when the writer mailbox is at/over the configured threshold.
    fn submit_pressure_gate(&self) -> Option<crate::outcome::Outcome<AppendTicket>> {
        let writer = self.writer.as_ref()?;
        let queued = writer.tx.len();
        let threshold = self.pressure_retry_threshold();
        if queued >= threshold {
            return Some(crate::outcome::Outcome::retry(
                10,
                1,
                1,
                format!(
                    "writer mailbox at {queued}/{} queued commands",
                    self.config.writer.channel_capacity
                ),
            ));
        }
        None
    }
    // Same gate as `submit_pressure_gate`, typed for batch tickets.
    fn submit_pressure_gate_batch(&self) -> Option<crate::outcome::Outcome<BatchAppendTicket>> {
        let writer = self.writer.as_ref()?;
        let queued = writer.tx.len();
        let threshold = self.pressure_retry_threshold();
        if queued >= threshold {
            return Some(crate::outcome::Outcome::retry(
                10,
                1,
                1,
                format!(
                    "writer mailbox at {queued}/{} queued commands",
                    self.config.writer.channel_capacity
                ),
            ));
        }
        None
    }
    // Queue-depth threshold as a percentage of channel capacity, rounded up,
    // never below 1 command.
    fn pressure_retry_threshold(&self) -> usize {
        let capacity = self.config.writer.channel_capacity;
        let pct = usize::from(self.config.writer.pressure_retry_threshold_pct);
        capacity.saturating_mul(pct).div_ceil(100).max(1)
    }
    /// Append the event described by a typestate transition.
    pub fn apply_transition<From, To, P: Serialize>(
        &self,
        coord: &Coordinate,
        transition: crate::typestate::transition::Transition<From, To, P>,
    ) -> Result<AppendReceipt, StoreError> {
        let kind = transition.kind();
        let payload = transition.into_payload();
        self.append(coord, kind, &payload)
    }
    /// Force durability of buffered writes; delegates to [`maintenance`].
    pub fn sync(&self) -> Result<(), StoreError> {
        maintenance::sync(self)
    }
    /// Write a snapshot of the store to `dest`; delegates to [`maintenance`].
    pub fn snapshot(&self, dest: &std::path::Path) -> Result<(), StoreError> {
        maintenance::snapshot(self, dest)
    }
    /// Compact segments under `config`; delegates to [`maintenance`].
    pub fn compact(
        &self,
        config: &CompactionConfig,
    ) -> Result<segment::CompactionResult, StoreError> {
        maintenance::compact(self, config)
    }
    /// Shut the store down cleanly, consuming it.
    pub fn close(self) -> Result<Closed, StoreError> {
        maintenance::close(self)
    }
}
impl Store<ReadOnly> {
    /// Open a read-only store with no projection cache.
    pub fn open_read_only(config: StoreConfig) -> Result<Self, StoreError> {
        Self::open_read_only_with_cache(config, Box::new(NoCache))
    }
    /// Open a read-only store whose projections are cached in a
    /// [`NativeCache`] persisted at `cache_path`.
    pub fn open_read_only_with_native_cache(
        config: StoreConfig,
        cache_path: impl AsRef<std::path::Path>,
    ) -> Result<Self, StoreError> {
        let cache = NativeCache::open(cache_path)?;
        Self::open_read_only_with_cache(config, Box::new(cache))
    }
    /// Open a read-only store with a caller-supplied projection cache.
    ///
    /// Mirrors the writable open path — validate, rebuild the index, point
    /// the reader at a fresh segment — but spawns no writer and performs no
    /// shutdown work on drop.
    pub fn open_read_only_with_cache(
        config: StoreConfig,
        cache: Box<dyn ProjectionCache>,
    ) -> Result<Self, StoreError> {
        config.validate()?;
        std::fs::create_dir_all(&config.data_dir)?;
        let config = Arc::new(config);
        let index = Arc::new(StoreIndex::with_config(&config.index));
        let reader = Arc::new(Reader::new(config.data_dir.clone(), config.fd_budget));
        let report = index_rebuild::open_index(
            &index,
            &reader,
            &config.data_dir,
            config.index.enable_checkpoint,
            config.index.enable_mmap_index,
        )?;
        // One past the highest segment id on disk (0 when empty).
        let next_segment = writer::find_latest_segment_id(&config.data_dir).unwrap_or(0) + 1;
        reader.set_active_segment(next_segment);
        Ok(Self {
            index,
            reader,
            cache,
            writer: None,
            config,
            should_shutdown_on_drop: false,
            open_report: Some(report),
            _state: std::marker::PhantomData,
        })
    }
}
impl<State> Store<State> {
    /// Fetch one stored event by id, or `NotFound` if the index has no entry.
    pub fn get(&self, event_id: u128) -> Result<StoredEvent<serde_json::Value>, StoreError> {
        match self.index.get_by_id(event_id) {
            Some(entry) => self.reader.read_entry(&entry.disk_pos),
            None => Err(StoreError::NotFound(event_id)),
        }
    }
    /// Index entries matching `region`.
    #[must_use]
    pub fn query(&self, region: &Region) -> Vec<IndexEntry> {
        self.index.query(region)
    }
    /// Collect up to `limit` ancestor events of `event_id` by following
    /// causation links.
    pub fn walk_ancestors(
        &self,
        event_id: u128,
        limit: usize,
    ) -> Vec<StoredEvent<serde_json::Value>> {
        ancestors::walk_ancestors(self, event_id, limit)
    }
    /// Project the state of `entity` at the requested freshness.
    pub fn project<T>(&self, entity: &str, freshness: &Freshness) -> Result<Option<T>, StoreError>
    where
        T: EventSourced<serde_json::Value>
            + serde::Serialize
            + serde::de::DeserializeOwned
            + 'static,
    {
        projection_flow::project(self, entity, freshness)
    }
    /// Current generation counter for `entity`, if the index knows it.
    pub fn entity_generation(&self, entity: &str) -> Option<u64> {
        self.index.entity_generation(entity)
    }
    /// Project `entity` only if its generation moved past
    /// `last_seen_generation`; returns `None` when nothing changed, otherwise
    /// the new generation paired with the projection.
    pub fn project_if_changed<T>(
        &self,
        entity: &str,
        last_seen_generation: u64,
        freshness: &Freshness,
    ) -> Result<Option<(u64, Option<T>)>, StoreError>
    where
        T: EventSourced<serde_json::Value>
            + serde::Serialize
            + serde::de::DeserializeOwned
            + 'static,
    {
        let generation = self.entity_generation(entity).unwrap_or(0);
        if generation != last_seen_generation {
            let projected = self.project(entity, freshness)?;
            Ok(Some((generation, projected)))
        } else {
            Ok(None)
        }
    }
    /// All index entries for one entity's stream.
    #[must_use]
    pub fn stream(&self, entity: &str) -> Vec<IndexEntry> {
        self.index.stream(entity)
    }
    /// All index entries within one scope.
    #[must_use]
    pub fn by_scope(&self, scope: &str) -> Vec<IndexEntry> {
        let region = Region::scope(scope);
        self.query(&region)
    }
    /// All index entries with exactly this event kind.
    #[must_use]
    pub fn by_fact(&self, kind: EventKind) -> Vec<IndexEntry> {
        let region = Region::all().with_fact(KindFilter::Exact(kind));
        self.query(&region)
    }
    /// Build a cursor over `region` backed by the shared index.
    pub fn cursor_guaranteed(&self, region: &Region) -> Cursor {
        let index = Arc::clone(&self.index);
        Cursor::new(region.clone(), index)
    }
    /// Aggregate store statistics.
    pub fn stats(&self) -> StoreStats {
        maintenance::stats(self)
    }
    /// Detailed store diagnostics.
    pub fn diagnostics(&self) -> StoreDiagnostics {
        maintenance::diagnostics(self)
    }
}
impl<State> Drop for Store<State> {
    /// Best-effort writer shutdown for stores dropped without `close()`:
    /// send `Shutdown` and wait at most 100ms for the acknowledgement.
    fn drop(&mut self) {
        if !self.should_shutdown_on_drop {
            return;
        }
        if let Some(writer) = self.writer.as_ref() {
            tracing::warn!(
                "Store dropped without explicit close(); only a bounded best-effort drain will run"
            );
            let (tx, rx) = flume::bounded(1);
            let sent = writer.tx.send(WriterCommand::Shutdown { respond: tx });
            if sent.is_ok() {
                // Bounded wait: never hang a destructor on a stuck writer.
                let _ = rx.recv_timeout(std::time::Duration::from_millis(100));
            }
        }
    }
}
/// Follows an entity's notifications and serves its projection, keeping a
/// serialized copy of the last state plus a sequence watermark so later
/// updates can be applied incrementally when the projection supports it.
pub struct ProjectionWatcher<T> {
    // Notification subscription scoped to the watched entity.
    sub: Subscription,
    store: Arc<Store<Open>>,
    entity: String,
    freshness: Freshness,
    // JSON-serialized last projected state, if any.
    cached_state: Option<Vec<u8>>,
    // Global sequence of the last entry folded into `cached_state`.
    watermark: Option<u64>,
    _phantom: std::marker::PhantomData<T>,
}
impl<T> ProjectionWatcher<T>
where
    T: EventSourced<serde_json::Value> + serde::Serialize + serde::de::DeserializeOwned + 'static,
{
    /// Wait for the next notification about the watched entity and return
    /// the refreshed projection (`Ok(None)` once the subscription closes).
    ///
    /// Fast path: when incremental apply is supported and enabled and a
    /// cached state with a watermark exists, only the entries past the
    /// watermark are read and folded into the cached state. Any gap in the
    /// preconditions falls back to a full re-projection.
    pub fn recv(&mut self) -> Result<Option<T>, StoreError> {
        if self.sub.recv().is_none() {
            return Ok(None); }
        // Full projection when there is no cache, the type cannot apply
        // deltas, or the feature is disabled in config.
        if self.cached_state.is_none() || !T::supports_incremental_apply() || !self.store.config.index.incremental_projection
        {
            return self.refresh_from_full_projection();
        }
        let Some(watermark) = self.watermark else {
            return self.refresh_from_full_projection();
        };
        let mut delta_entries = self.store.index.stream_since(&self.entity, watermark);
        // An empty relevant-kinds list means "all kinds are relevant".
        let relevant_kinds = T::relevant_event_kinds();
        if !relevant_kinds.is_empty() {
            delta_entries.retain(|entry| relevant_kinds.contains(&entry.kind));
        }
        if delta_entries.is_empty() {
            // Nothing relevant happened; serve the cached state as-is.
            return self.deserialize_cached_state().map(Some);
        }
        let Some(bytes) = self.cached_state.as_ref() else {
            return self.refresh_from_full_projection();
        };
        // A corrupt/incompatible cached blob also falls back to a full pass.
        let mut state = match serde_json::from_slice::<T>(bytes) {
            Ok(value) => value,
            Err(_) => return self.refresh_from_full_projection(),
        };
        let positions: Vec<&crate::store::DiskPos> =
            delta_entries.iter().map(|entry| &entry.disk_pos).collect();
        let stored_events = self.store.reader.read_entries_batch(&positions)?;
        for stored in stored_events {
            state.apply_event(&stored.event);
        }
        let new_watermark = delta_entries
            .last()
            .map(|entry| entry.global_sequence)
            .unwrap_or(watermark);
        let encoded =
            serde_json::to_vec(&state).map_err(|e| StoreError::Serialization(Box::new(e)))?;
        self.cached_state = Some(encoded);
        self.watermark = Some(new_watermark);
        Ok(Some(state))
    }
    #[doc(hidden)]
    pub fn subscription(&self) -> &Subscription {
        &self.sub
    }
    // Slow path: run a full projection, then reset the cached state and the
    // watermark from the entity's current stream tail.
    fn refresh_from_full_projection(&mut self) -> Result<Option<T>, StoreError> {
        let result = self.store.project::<T>(&self.entity, &self.freshness)?;
        if let Some(ref value) = result {
            self.cached_state = Some(
                serde_json::to_vec(value).map_err(|e| StoreError::Serialization(Box::new(e)))?,
            );
            self.watermark = self
                .store
                .index
                .stream(&self.entity)
                .last()
                .map(|entry| entry.global_sequence);
        } else {
            self.cached_state = None;
            self.watermark = None;
        }
        Ok(result)
    }
    // Decode the cached JSON state; only called after `cached_state` was
    // checked to exist, so a missing blob is reported as a configuration bug.
    fn deserialize_cached_state(&self) -> Result<T, StoreError> {
        let bytes = self
            .cached_state
            .as_ref()
            .ok_or_else(|| StoreError::Configuration("projection watcher state missing".into()))?;
        serde_json::from_slice(bytes).map_err(|e| StoreError::Serialization(Box::new(e)))
    }
}