allframe_core/cqrs/
mod.rs1pub mod allsource_backend;
8pub mod backend;
9pub mod command_bus;
10pub mod event_versioning;
11pub mod query_bus;
12pub mod memory_backend;
13pub mod projection_registry;
14pub mod saga;
15pub mod saga_orchestrator;
16pub mod sqlite_backend;
17pub mod sync;
18
/// Supplies a short, human-readable name for an event type.
///
/// The provided implementation derives the name from [`std::any::type_name`] and keeps
/// only the last path segment of the type itself, e.g. `my_crate::events::UserCreated`
/// becomes `UserCreated`.
///
/// Generic arguments are not part of the returned name: `my_crate::Envelope<my_crate::Foo>`
/// yields `Envelope`. All instantiations of a generic event therefore share one name;
/// override [`EventTypeName::event_type_name`] when they must be told apart.
pub trait EventTypeName {
    /// Returns the unqualified name of the implementing type.
    ///
    /// Falls back to `"event"` if no path segment can be extracted.
    fn event_type_name() -> &'static str {
        let full = std::any::type_name::<Self>();
        // Drop generic arguments before looking for the last `::`. Otherwise the paths
        // inside the angle brackets win and the result is a fragment such as `Foo>`.
        let base = full.split('<').next().unwrap_or(full);
        base.rsplit("::").next().unwrap_or("event")
    }
}
33
34pub trait Event:
36 Clone + Send + Sync + serde::Serialize + serde::de::DeserializeOwned + EventTypeName + 'static
37{
38}
39
40pub trait Aggregate: Default + Send + Sync {
42 type Event: Event;
44
45 fn apply_event(&mut self, event: &Self::Event);
47}
48
49pub trait Projection: Send + Sync {
51 type Event: Event;
53
54 fn apply(&mut self, event: &Self::Event);
56}
57
58#[derive(Clone)]
64pub struct EventStore<E: Event, B: EventStoreBackend<E> = InMemoryBackend<E>> {
65 backend: std::sync::Arc<B>,
66 subscribers: std::sync::Arc<tokio::sync::RwLock<Vec<tokio::sync::mpsc::Sender<E>>>>,
67 _phantom: std::marker::PhantomData<E>,
68}
69
70impl<E: Event> Default for EventStore<E, InMemoryBackend<E>> {
71 fn default() -> Self {
72 Self::new()
73 }
74}
75
76impl<E: Event> EventStore<E, InMemoryBackend<E>> {
77 pub fn new() -> Self {
79 Self::with_backend(InMemoryBackend::new())
80 }
81}
82
83impl<E: Event, B: EventStoreBackend<E>> EventStore<E, B> {
84 pub fn with_backend(backend: B) -> Self {
86 Self {
87 backend: std::sync::Arc::new(backend),
88 subscribers: std::sync::Arc::new(tokio::sync::RwLock::new(Vec::new())),
89 _phantom: std::marker::PhantomData,
90 }
91 }
92
93 pub fn backend(&self) -> &B {
95 &self.backend
96 }
97
98 pub async fn append(&self, aggregate_id: &str, events: Vec<E>) -> Result<(), String> {
100 let subscribers = self.subscribers.read().await;
102 for event in &events {
103 for subscriber in subscribers.iter() {
104 let _ = subscriber.send(event.clone()).await;
105 }
106 }
107
108 self.backend.append(aggregate_id, events).await
110 }
111
112 pub async fn get_events(&self, aggregate_id: &str) -> Result<Vec<E>, String> {
114 self.backend.get_events(aggregate_id).await
115 }
116
117 pub async fn get_events_after(
119 &self,
120 aggregate_id: &str,
121 version: u64,
122 ) -> Result<Vec<E>, String> {
123 self.backend.get_events_after(aggregate_id, version).await
124 }
125
126 pub async fn get_all_events(&self) -> Result<Vec<E>, String> {
128 self.backend.get_all_events().await
129 }
130
131 pub async fn subscribe(&self, tx: tokio::sync::mpsc::Sender<E>) {
133 let mut subscribers = self.subscribers.write().await;
134 subscribers.push(tx);
135 }
136}
137
138pub struct Snapshot<A: Aggregate> {
140 pub aggregate: A,
142 pub version: u64,
144}
145
146impl<A: Aggregate> Snapshot<A> {
147 pub fn create(aggregate: A, version: u64) -> Self {
149 Self { aggregate, version }
150 }
151
152 pub fn into_aggregate(self) -> A {
154 self.aggregate
155 }
156}
157
158pub use allsource_backend::*;
160#[cfg(feature = "cqrs-allsource")]
162pub use allsource_core::ExactlyOnceRegistry;
163#[cfg(feature = "cqrs-allsource")]
164pub use allsource_core::PipelineManager;
165#[cfg(feature = "cqrs-allsource")]
166pub use allsource_core::SchemaRegistry;
167pub use backend::*;
168pub use command_bus::*;
169pub use event_versioning::*;
170pub use query_bus::*;
171pub use memory_backend::*;
172pub use projection_registry::*;
173pub use saga::{
174 CompensationResult, MacroSagaOrchestrator, Saga, SagaContext, SagaError,
175 SagaStep as MacroSagaStep, StepExecutionResult, StepOutput,
176};
177pub use saga::{CompensationStrategy, FileSnapshot, WriteFileStep};
179#[cfg(feature = "cqrs-sqlite")]
180pub use saga::SqliteSavepoint;
181pub use saga_orchestrator::{
182 SagaDefinition, SagaMetadata, SagaOrchestrator, SagaResult, SagaStatus,
183 SagaStep as OrchestratorSagaStep,
184};
185pub use sqlite_backend::*;
186pub use sync::*;