pub mod allsource_backend;
pub mod backend;
pub mod command_bus;
pub mod event_versioning;
pub mod query_bus;
pub mod memory_backend;
pub mod projection_registry;
pub mod saga;
pub mod saga_orchestrator;
pub mod sqlite_backend;
pub mod sync;
/// Provides a short, human-readable name for an event type.
///
/// The default implementation derives the name from [`std::any::type_name`],
/// keeping only the final path segment of the type itself
/// (e.g. `my_crate::events::UserCreated` becomes `"UserCreated"`).
///
/// Generic arguments are dropped before the path is split, so
/// `Envelope<my_crate::Payload>` yields `"Envelope"` rather than a fragment of
/// the argument's path such as `"Payload>"`. Implementors that need distinct
/// names per instantiation should override this method.
///
/// Note that the output of `type_name` is not guaranteed to be stable across
/// compiler versions; override this method when a fixed name is required.
pub trait EventTypeName {
    /// Returns the unqualified name of the implementing type, or `"event"`
    /// if no non-empty name can be extracted.
    fn event_type_name() -> &'static str {
        let full = std::any::type_name::<Self>();
        // Cut at the first `<` so `::` separators inside generic arguments
        // are not mistaken for the type's own module path.
        let base = full.split('<').next().unwrap_or(full);
        match base.rsplit("::").next() {
            Some(name) if !name.is_empty() => name,
            _ => "event",
        }
    }
}
/// Marker trait for domain events handled by this module.
///
/// An event must be cloneable, shareable across threads, serializable and
/// deserializable through serde, own all of its data (`'static`), and expose
/// a type name via [`EventTypeName`]. The trait declares no methods of its own.
pub trait Event
where
    Self: EventTypeName
        + serde::Serialize
        + serde::de::DeserializeOwned
        + Clone
        + Send
        + Sync
        + 'static,
{
}
/// State that is updated by applying events of a single associated type.
///
/// Implementors must be constructible via [`Default`] and safe to share
/// across threads.
pub trait Aggregate
where
    Self: Default + Send + Sync,
{
    /// The event type this aggregate consumes.
    type Event: Event;

    /// Updates the aggregate in place from `event`.
    fn apply_event(&mut self, event: &Self::Event);
}
/// A thread-safe consumer that updates itself from events of a single
/// associated type.
pub trait Projection
where
    Self: Send + Sync,
{
    /// The event type this projection consumes.
    type Event: Event;

    /// Updates the projection in place from `event`.
    fn apply(&mut self, event: &Self::Event);
}
/// Event store that delegates persistence to a pluggable backend and fans
/// appended events out to registered channel subscribers.
///
/// The backend defaults to `InMemoryBackend<E>`. Both the backend and the
/// subscriber list live behind `Arc`s, so clones of a store share them.
pub struct EventStore<E: Event, B: EventStoreBackend<E> = InMemoryBackend<E>> {
    /// Shared handle to the storage backend.
    backend: std::sync::Arc<B>,
    /// Channel senders that receive a copy of every appended event.
    subscribers: std::sync::Arc<tokio::sync::RwLock<Vec<tokio::sync::mpsc::Sender<E>>>>,
    /// Ties the store to its event type `E`.
    _phantom: std::marker::PhantomData<E>,
}

// Implemented by hand rather than derived: `#[derive(Clone)]` would require
// `B: Clone`, although cloning only duplicates the `Arc` handles and never the
// backend itself.
impl<E: Event, B: EventStoreBackend<E>> Clone for EventStore<E, B> {
    /// Returns a new handle sharing the same backend and subscriber list.
    fn clone(&self) -> Self {
        Self {
            backend: std::sync::Arc::clone(&self.backend),
            subscribers: std::sync::Arc::clone(&self.subscribers),
            _phantom: std::marker::PhantomData,
        }
    }
}
impl<E: Event> Default for EventStore<E, InMemoryBackend<E>> {
fn default() -> Self {
Self::new()
}
}
impl<E> EventStore<E, InMemoryBackend<E>>
where
    E: Event,
{
    /// Creates a store backed by a new `InMemoryBackend` with no subscribers.
    pub fn new() -> Self {
        let backend = InMemoryBackend::new();
        Self::with_backend(backend)
    }
}
impl<E: Event, B: EventStoreBackend<E>> EventStore<E, B> {
    /// Creates a store that persists through `backend` and starts with no
    /// subscribers.
    pub fn with_backend(backend: B) -> Self {
        Self {
            backend: std::sync::Arc::new(backend),
            subscribers: std::sync::Arc::new(tokio::sync::RwLock::new(Vec::new())),
            _phantom: std::marker::PhantomData,
        }
    }

    /// Returns a reference to the underlying backend.
    pub fn backend(&self) -> &B {
        &self.backend
    }

    /// Appends `events` for `aggregate_id` and then publishes them to all
    /// subscribers.
    ///
    /// Events are handed to the backend first; subscribers are notified only
    /// after the backend reports success, so they never observe events that
    /// were not stored.
    ///
    /// # Errors
    ///
    /// Returns the backend's error if the append fails. In that case no
    /// subscriber is notified.
    pub async fn append(&self, aggregate_id: &str, events: Vec<E>) -> Result<(), String> {
        // The backend takes ownership of the batch, so keep a copy to publish.
        let published = events.clone();
        self.backend.append(aggregate_id, events).await?;

        let subscribers = self.subscribers.read().await;
        for event in &published {
            for subscriber in subscribers.iter() {
                // Best-effort delivery: a failed send (e.g. the receiver was
                // dropped) must not fail an append that already succeeded.
                let _ = subscriber.send(event.clone()).await;
            }
        }
        Ok(())
    }

    /// Fetches the events the backend holds for `aggregate_id`.
    ///
    /// # Errors
    ///
    /// Propagates any error reported by the backend.
    pub async fn get_events(&self, aggregate_id: &str) -> Result<Vec<E>, String> {
        self.backend.get_events(aggregate_id).await
    }

    /// Fetches events for `aggregate_id` relative to `version`, as defined by
    /// the backend's `get_events_after`.
    ///
    /// NOTE(review): presumably returns events with a version strictly greater
    /// than `version`; confirm against the backend contract.
    ///
    /// # Errors
    ///
    /// Propagates any error reported by the backend.
    pub async fn get_events_after(
        &self,
        aggregate_id: &str,
        version: u64,
    ) -> Result<Vec<E>, String> {
        self.backend.get_events_after(aggregate_id, version).await
    }

    /// Fetches every event the backend holds, across all aggregates.
    ///
    /// # Errors
    ///
    /// Propagates any error reported by the backend.
    pub async fn get_all_events(&self) -> Result<Vec<E>, String> {
        self.backend.get_all_events().await
    }

    /// Registers `tx` to receive a clone of every event appended from now on.
    pub async fn subscribe(&self, tx: tokio::sync::mpsc::Sender<E>) {
        let mut subscribers = self.subscribers.write().await;
        subscribers.push(tx);
    }
}
/// An aggregate's state paired with a version number.
pub struct Snapshot<A>
where
    A: Aggregate,
{
    /// The captured aggregate state.
    pub aggregate: A,
    /// The version associated with the captured state.
    pub version: u64,
}
impl<A: Aggregate> Snapshot<A> {
pub fn create(aggregate: A, version: u64) -> Self {
Self { aggregate, version }
}
pub fn into_aggregate(self) -> A {
self.aggregate
}
}
pub use allsource_backend::*;
#[cfg(feature = "cqrs-allsource")]
pub use allsource_core::ExactlyOnceRegistry;
#[cfg(feature = "cqrs-allsource")]
pub use allsource_core::PipelineManager;
#[cfg(feature = "cqrs-allsource")]
pub use allsource_core::SchemaRegistry;
pub use backend::*;
pub use command_bus::*;
pub use event_versioning::*;
pub use query_bus::*;
pub use memory_backend::*;
pub use projection_registry::*;
pub use saga::{
CompensationResult, MacroSagaOrchestrator, Saga, SagaContext, SagaError,
SagaStep as MacroSagaStep, StepExecutionResult, StepOutput,
};
pub use saga::{CompensationStrategy, FileSnapshot, WriteFileStep};
#[cfg(feature = "cqrs-sqlite")]
pub use saga::SqliteSavepoint;
pub use saga_orchestrator::{
SagaDefinition, SagaMetadata, SagaOrchestrator, SagaResult, SagaStatus,
SagaStep as OrchestratorSagaStep,
};
pub use sqlite_backend::*;
pub use sync::*;