allframe_core/cqrs/
mod.rs1pub mod allsource_backend;
8pub mod backend;
9pub mod command_bus;
10pub mod event_versioning;
11pub mod memory_backend;
12pub mod projection_registry;
13pub mod saga;
14pub mod saga_orchestrator;
15pub mod sqlite_backend;
16pub mod sync;
17
18pub trait EventTypeName {
24 fn event_type_name() -> &'static str {
26 std::any::type_name::<Self>()
27 .split("::")
28 .last()
29 .unwrap_or("event")
30 }
31}
32
33pub trait Event:
35 Clone + Send + Sync + serde::Serialize + serde::de::DeserializeOwned + EventTypeName + 'static
36{
37}
38
39pub trait Aggregate: Default + Send + Sync {
41 type Event: Event;
43
44 fn apply_event(&mut self, event: &Self::Event);
46}
47
48pub trait Projection: Send + Sync {
50 type Event: Event;
52
53 fn apply(&mut self, event: &Self::Event);
55}
56
57#[derive(Clone)]
63pub struct EventStore<E: Event, B: EventStoreBackend<E> = InMemoryBackend<E>> {
64 backend: std::sync::Arc<B>,
65 subscribers: std::sync::Arc<tokio::sync::RwLock<Vec<tokio::sync::mpsc::Sender<E>>>>,
66 _phantom: std::marker::PhantomData<E>,
67}
68
69impl<E: Event> Default for EventStore<E, InMemoryBackend<E>> {
70 fn default() -> Self {
71 Self::new()
72 }
73}
74
75impl<E: Event> EventStore<E, InMemoryBackend<E>> {
76 pub fn new() -> Self {
78 Self::with_backend(InMemoryBackend::new())
79 }
80}
81
82impl<E: Event, B: EventStoreBackend<E>> EventStore<E, B> {
83 pub fn with_backend(backend: B) -> Self {
85 Self {
86 backend: std::sync::Arc::new(backend),
87 subscribers: std::sync::Arc::new(tokio::sync::RwLock::new(Vec::new())),
88 _phantom: std::marker::PhantomData,
89 }
90 }
91
92 pub fn backend(&self) -> &B {
94 &self.backend
95 }
96
97 pub async fn append(&self, aggregate_id: &str, events: Vec<E>) -> Result<(), String> {
99 let subscribers = self.subscribers.read().await;
101 for event in &events {
102 for subscriber in subscribers.iter() {
103 let _ = subscriber.send(event.clone()).await;
104 }
105 }
106
107 self.backend.append(aggregate_id, events).await
109 }
110
111 pub async fn get_events(&self, aggregate_id: &str) -> Result<Vec<E>, String> {
113 self.backend.get_events(aggregate_id).await
114 }
115
116 pub async fn get_events_after(
118 &self,
119 aggregate_id: &str,
120 version: u64,
121 ) -> Result<Vec<E>, String> {
122 self.backend.get_events_after(aggregate_id, version).await
123 }
124
125 pub async fn get_all_events(&self) -> Result<Vec<E>, String> {
127 self.backend.get_all_events().await
128 }
129
130 pub async fn subscribe(&self, tx: tokio::sync::mpsc::Sender<E>) {
132 let mut subscribers = self.subscribers.write().await;
133 subscribers.push(tx);
134 }
135}
136
137pub struct Snapshot<A: Aggregate> {
139 pub aggregate: A,
141 pub version: u64,
143}
144
145impl<A: Aggregate> Snapshot<A> {
146 pub fn create(aggregate: A, version: u64) -> Self {
148 Self { aggregate, version }
149 }
150
151 pub fn into_aggregate(self) -> A {
153 self.aggregate
154 }
155}
156
157pub use allsource_backend::*;
159#[cfg(feature = "cqrs-allsource")]
161pub use allsource_core::ExactlyOnceRegistry;
162#[cfg(feature = "cqrs-allsource")]
163pub use allsource_core::PipelineManager;
164#[cfg(feature = "cqrs-allsource")]
165pub use allsource_core::SchemaRegistry;
166pub use backend::*;
167pub use command_bus::*;
168pub use event_versioning::*;
169pub use memory_backend::*;
170pub use projection_registry::*;
171pub use saga::{
172 CompensationResult, MacroSagaOrchestrator, Saga, SagaContext, SagaError,
173 SagaStep as MacroSagaStep, StepExecutionResult, StepOutput,
174};
175pub use saga::{CompensationStrategy, FileSnapshot, WriteFileStep};
177#[cfg(feature = "cqrs-sqlite")]
178pub use saga::SqliteSavepoint;
179pub use saga_orchestrator::{
180 SagaDefinition, SagaMetadata, SagaOrchestrator, SagaResult, SagaStatus,
181 SagaStep as OrchestratorSagaStep,
182};
183pub use sqlite_backend::*;
184pub use sync::*;