mod error;
mod event;
#[doc(hidden)]
pub mod lease;
mod projection;
mod snapshot;
mod store;
mod stream;
mod subscription;
pub use lease::LeaseStatus;
pub use error::{DbErrorKind, EventStoreError};
pub use event::{
Event, EventData, EventMetadata, EventTypeStatic, IntoResponse, RecordedEvent, ResponseValue,
};
#[cfg(feature = "derive")]
pub use mire_derive::{EventData, IntoResponse};
pub use projection::{
EventHandler, HandledEvent, ProjectionRunner, ProjectionRunnerBuilder,
TransactionalEventHandler,
};
pub use snapshot::Snapshot;
pub use store::{CommittedEvents, EventStore, TransactionScope};
pub use stream::{ExpectedVersion, ReadDirection, StreamQuery};
#[doc(hidden)]
pub use subscription::Subscription;
use serde::de::DeserializeOwned;
/// A domain aggregate whose state is built by folding events into it.
///
/// Implementors must be `Default` (the state before any event has been
/// applied) and `Send + Sync`.
pub trait Aggregate: Default + Send + Sync {
/// The event type this aggregate folds over.
type Event: EventData;
/// Category prefix for this aggregate's streams.
///
/// [`AggregateRoot::new`] builds stream ids as `"{category}-{id}"`.
fn stream_category() -> &'static str;
/// Folds a single event into the aggregate's state.
///
/// [`AggregateRoot`] calls this both when replaying recorded events and
/// when a new event is recorded.
fn apply(&mut self, event: &Self::Event);
}
/// An [`Aggregate`] bundled with its stream identity, its version, and the
/// events recorded against it that the caller has not yet taken.
pub struct AggregateRoot<A: Aggregate> {
/// Current aggregate state, with every replayed and recorded event applied.
pub state: A,
/// Stream version this root was built at: `0` for a root created with
/// `new`, otherwise the value passed to `from_snapshot` / `hydrate`.
/// Recording events does not change it.
pub version: i64,
/// Full stream id (for roots created with `new`: `"{category}-{id}"`).
pub stream_id: String,
// Events recorded via `record` that have not yet been handed out by
// `take_pending`, oldest first.
pending_events: Vec<A::Event>,
/// Metadata carried alongside the root; defaults to `EventMetadata::default()`
/// and can be replaced with `set_metadata`.
// NOTE(review): presumably attached to the pending events when they are
// appended — the consumer is not visible in this file, confirm in `store`.
pub metadata: EventMetadata,
}
impl<A: Aggregate> AggregateRoot<A> {
pub fn new(id: &str) -> Self {
let stream_id = format!("{}-{}", A::stream_category(), id);
Self {
state: A::default(),
version: 0,
stream_id,
pending_events: Vec::new(),
metadata: EventMetadata::default(),
}
}
pub fn set_metadata(&mut self, metadata: EventMetadata) {
self.metadata = metadata;
}
pub fn from_snapshot(stream_id: String, state: A, version: i64) -> Self {
Self {
state,
version,
stream_id,
pending_events: Vec::new(),
metadata: EventMetadata::default(),
}
}
pub fn hydrate(
stream_id: String,
events: &[RecordedEvent],
version: i64,
) -> Result<Self, EventStoreError>
where
A::Event: DeserializeOwned,
{
Self::check_contiguous(&stream_id, events, version)?;
let mut state = A::default();
for recorded in events {
let event =
serde_json::from_value::<A::Event>(recorded.data.clone()).map_err(|source| {
EventStoreError::Deserialization {
stream_id: recorded.stream_id.clone(),
global_position: recorded.global_position,
event_type: recorded.event_type.clone(),
source,
}
})?;
state.apply(&event);
}
Ok(Self {
state,
version,
stream_id,
pending_events: Vec::new(),
metadata: EventMetadata::default(),
})
}
fn check_contiguous(
stream_id: &str,
events: &[RecordedEvent],
version: i64,
) -> Result<(), EventStoreError> {
let corruption = |detail: String| EventStoreError::StreamCorruption {
stream_id: stream_id.to_string(),
recorded_version: version,
read_count: events.len() as i64,
detail,
};
let Some(last) = events.last() else {
if version != 0 {
return Err(corruption("read no events".to_string()));
}
return Ok(());
};
if events[0].stream_version != 1 {
return Err(corruption(format!(
"first event is version {} (expected 1 — missing prefix)",
events[0].stream_version
)));
}
if last.stream_version < version {
return Err(corruption(format!(
"highest event is version {} but recorded version is {version} (truncated read)",
last.stream_version
)));
}
for pair in events.windows(2) {
if pair[1].stream_version != pair[0].stream_version + 1 {
return Err(corruption(format!(
"non-contiguous versions {} then {}",
pair[0].stream_version, pair[1].stream_version
)));
}
}
Ok(())
}
pub fn record(&mut self, event: A::Event) {
self.state.apply(&event);
self.pending_events.push(event);
}
pub fn record_many<I>(&mut self, events: I)
where
I: IntoIterator<Item = A::Event>,
{
for event in events {
self.record(event);
}
}
pub fn take_pending(&mut self) -> Vec<A::Event> {
std::mem::take(&mut self.pending_events)
}
pub fn restore_pending(&mut self, mut events: Vec<A::Event>) {
events.append(&mut self.pending_events);
self.pending_events = events;
}
pub fn has_pending(&self) -> bool {
!self.pending_events.is_empty()
}
pub fn pending_count(&self) -> usize {
self.pending_events.len()
}
}