mod error;
mod event;
#[doc(hidden)]
pub mod lease;
mod projection;
mod snapshot;
mod store;
mod stream;
mod subscription;
pub use lease::LeaseStatus;
pub use error::{DbErrorKind, EventStoreError};
pub use event::{Event, EventData, EventMetadata, RecordedEvent};
#[cfg(feature = "derive")]
pub use mire_derive::EventData;
pub use projection::{EventHandler, HandledEvent, ProjectionRunner, ProjectionRunnerBuilder};
pub use snapshot::Snapshot;
pub use store::{CommittedEvents, EventStore, TransactionScope};
pub use stream::{ExpectedVersion, ReadDirection, StreamQuery};
pub use subscription::Subscription;
use serde::de::DeserializeOwned;
pub trait Aggregate: Default + Send + Sync {
type Event: EventData;
fn stream_category() -> &'static str;
fn apply(&mut self, event: &Self::Event);
}
pub struct AggregateRoot<A: Aggregate> {
pub state: A,
pub version: i64,
pub stream_id: String,
pending_events: Vec<A::Event>,
pub metadata: EventMetadata,
}
impl<A: Aggregate> AggregateRoot<A> {
pub fn new(id: &str) -> Self {
let stream_id = format!("{}-{}", A::stream_category(), id);
Self {
state: A::default(),
version: 0,
stream_id,
pending_events: Vec::new(),
metadata: EventMetadata::default(),
}
}
pub fn set_metadata(&mut self, metadata: EventMetadata) {
self.metadata = metadata;
}
pub fn from_snapshot(stream_id: String, state: A, version: i64) -> Self {
Self {
state,
version,
stream_id,
pending_events: Vec::new(),
metadata: EventMetadata::default(),
}
}
pub fn hydrate(
stream_id: String,
events: &[RecordedEvent],
version: i64,
) -> Result<Self, EventStoreError>
where
A::Event: DeserializeOwned,
{
let mut state = A::default();
for recorded in events {
let event =
serde_json::from_value::<A::Event>(recorded.data.clone()).map_err(|source| {
EventStoreError::Deserialization {
stream_id: recorded.stream_id.clone(),
global_position: recorded.global_position,
event_type: recorded.event_type.clone(),
source,
}
})?;
state.apply(&event);
}
Ok(Self {
state,
version,
stream_id,
pending_events: Vec::new(),
metadata: EventMetadata::default(),
})
}
pub fn record(&mut self, event: A::Event) {
self.state.apply(&event);
self.pending_events.push(event);
}
pub fn record_many<I>(&mut self, events: I)
where
I: IntoIterator<Item = A::Event>,
{
for event in events {
self.record(event);
}
}
pub fn take_pending(&mut self) -> Vec<A::Event> {
std::mem::take(&mut self.pending_events)
}
pub fn has_pending(&self) -> bool {
!self.pending_events.is_empty()
}
pub fn pending_count(&self) -> usize {
self.pending_events.len()
}
}