1use std::sync::Arc;
7
8#[cfg(feature = "di")]
10pub use allframe_macros::{command, command_handler, event, query, query_handler};
11use tokio::sync::{mpsc, RwLock};
12
13mod backend;
15mod memory_backend;
16
17#[cfg(feature = "cqrs-allsource")]
18mod allsource_backend;
19
20mod command_bus;
22
23mod projection_registry;
25
26mod event_versioning;
28
29mod saga_orchestrator;
31
32#[cfg(feature = "cqrs-allsource")]
34pub use allsource_backend::{AllSourceBackend, AllSourceConfig};
35pub use backend::{BackendStats, EventStoreBackend};
36pub use command_bus::{
38 Command, CommandBus, CommandError, CommandHandler, CommandResult, ValidationError,
39};
40pub use event_versioning::{
42 AutoUpcaster, MigrationPath, Upcaster, VersionRegistry, VersionedEvent,
43};
44pub use memory_backend::InMemoryBackend;
45pub use projection_registry::{ProjectionMetadata, ProjectionPosition, ProjectionRegistry};
47pub use saga_orchestrator::{
49 SagaDefinition, SagaError, SagaMetadata, SagaOrchestrator, SagaResult, SagaStatus, SagaStep,
50};
51
52pub trait Event:
54 Clone + Send + Sync + serde::Serialize + serde::de::DeserializeOwned + 'static
55{
56}
57
58pub trait Projection: Send + Sync {
60 type Event: Event;
62
63 fn apply(&mut self, event: &Self::Event);
65}
66
67pub trait Aggregate: Default + Send + Sync {
69 type Event: Event;
71
72 fn apply_event(&mut self, event: &Self::Event);
74}
75
76#[derive(Clone)]
82pub struct EventStore<E: Event, B: EventStoreBackend<E> = InMemoryBackend<E>> {
83 backend: Arc<B>,
84 subscribers: Arc<RwLock<Vec<mpsc::Sender<E>>>>,
85 _phantom: std::marker::PhantomData<E>,
86}
87
88impl<E: Event> EventStore<E, InMemoryBackend<E>> {
89 pub fn new() -> Self {
91 Self::with_backend(InMemoryBackend::new())
92 }
93}
94
95impl<E: Event, B: EventStoreBackend<E>> EventStore<E, B> {
96 pub fn with_backend(backend: B) -> Self {
98 Self {
99 backend: Arc::new(backend),
100 subscribers: Arc::new(RwLock::new(Vec::new())),
101 _phantom: std::marker::PhantomData,
102 }
103 }
104
105 pub fn backend(&self) -> &B {
107 &self.backend
108 }
109
110 pub async fn append(&self, aggregate_id: &str, events: Vec<E>) -> Result<(), String> {
112 let subscribers = self.subscribers.read().await;
114 for event in &events {
115 for subscriber in subscribers.iter() {
116 let _ = subscriber.send(event.clone()).await;
117 }
118 }
119
120 self.backend.append(aggregate_id, events).await
122 }
123
124 pub async fn get_events(&self, aggregate_id: &str) -> Result<Vec<E>, String> {
126 self.backend.get_events(aggregate_id).await
127 }
128
129 pub async fn get_all_events(&self) -> Result<Vec<E>, String> {
131 self.backend.get_all_events().await
132 }
133
134 pub async fn get_events_after(
136 &self,
137 aggregate_id: &str,
138 version: u64,
139 ) -> Result<Vec<E>, String> {
140 self.backend.get_events_after(aggregate_id, version).await
141 }
142
143 pub async fn subscribe(&self, tx: mpsc::Sender<E>) {
145 let mut subscribers = self.subscribers.write().await;
146 subscribers.push(tx);
147 }
148
149 pub async fn save_snapshot<A>(
151 &self,
152 aggregate_id: &str,
153 snapshot: Snapshot<A>,
154 ) -> Result<(), String>
155 where
156 A: Aggregate<Event = E> + serde::Serialize,
157 {
158 let snapshot_data = serde_json::to_vec(&snapshot.aggregate)
159 .map_err(|e| format!("Failed to serialize snapshot: {}", e))?;
160
161 self.backend
162 .save_snapshot(aggregate_id, snapshot_data, snapshot.version)
163 .await
164 }
165
166 pub async fn get_latest_snapshot<A>(&self, aggregate_id: &str) -> Result<Snapshot<A>, String>
168 where
169 A: Aggregate<Event = E> + serde::de::DeserializeOwned,
170 {
171 let (snapshot_data, version) = self.backend.get_latest_snapshot(aggregate_id).await?;
172
173 let aggregate: A = serde_json::from_slice(&snapshot_data)
174 .map_err(|e| format!("Failed to deserialize snapshot: {}", e))?;
175
176 Ok(Snapshot { aggregate, version })
177 }
178
179 pub async fn flush(&self) -> Result<(), String> {
181 self.backend.flush().await
182 }
183
184 pub async fn stats(&self) -> BackendStats {
186 self.backend.stats().await
187 }
188}
189
190impl<E: Event> Default for EventStore<E, InMemoryBackend<E>> {
191 fn default() -> Self {
192 Self::new()
193 }
194}
195
196pub struct Snapshot<A: Aggregate> {
198 pub aggregate: A,
200 pub version: u64,
202}
203
204impl<A: Aggregate> Snapshot<A> {
205 pub fn create(aggregate: A, version: u64) -> Self {
207 Self { aggregate, version }
208 }
209
210 pub fn into_aggregate(self) -> A {
212 self.aggregate
213 }
214}
215
216pub struct QueryBus;
220
221impl QueryBus {
222 pub fn new() -> Self {
224 Self
225 }
226}
227
228impl Default for QueryBus {
229 fn default() -> Self {
230 Self::new()
231 }
232}
233
234#[cfg(test)]
241mod tests {
242 use super::*;
243
244 #[derive(Clone, Debug, PartialEq, serde::Serialize, serde::Deserialize)]
245 struct TestEvent {
246 id: String,
247 }
248
249 impl Event for TestEvent {}
250
251 #[tokio::test]
252 async fn test_event_store_append_and_retrieve() {
253 let store = EventStore::new();
254
255 let events = vec![TestEvent {
256 id: "1".to_string(),
257 }];
258 store.append("test-aggregate", events).await.unwrap();
259
260 let retrieved = store.get_events("test-aggregate").await.unwrap();
261 assert_eq!(retrieved.len(), 1);
262 assert_eq!(retrieved[0].id, "1");
263 }
264
265 #[tokio::test]
266 async fn test_event_store_multiple_aggregates() {
267 let store = EventStore::new();
268
269 store
270 .append(
271 "agg1",
272 vec![TestEvent {
273 id: "1".to_string(),
274 }],
275 )
276 .await
277 .unwrap();
278 store
279 .append(
280 "agg2",
281 vec![TestEvent {
282 id: "2".to_string(),
283 }],
284 )
285 .await
286 .unwrap();
287
288 let agg1_events = store.get_events("agg1").await.unwrap();
289 let agg2_events = store.get_events("agg2").await.unwrap();
290
291 assert_eq!(agg1_events.len(), 1);
292 assert_eq!(agg2_events.len(), 1);
293 assert_eq!(agg1_events[0].id, "1");
294 assert_eq!(agg2_events[0].id, "2");
295 }
296
297 #[tokio::test]
298 async fn test_command_bus_handlers() {
299 use crate::cqrs::{Command, CommandBus, CommandHandler, CommandResult};
300
301 #[derive(Clone)]
302 struct TestCommand1;
303 impl Command for TestCommand1 {}
304
305 #[derive(Clone)]
306 struct TestCommand2;
307 impl Command for TestCommand2 {}
308
309 struct Handler1;
310 #[async_trait::async_trait]
311 impl CommandHandler<TestCommand1, TestEvent> for Handler1 {
312 async fn handle(&self, _cmd: TestCommand1) -> CommandResult<TestEvent> {
313 Ok(vec![])
314 }
315 }
316
317 struct Handler2;
318 #[async_trait::async_trait]
319 impl CommandHandler<TestCommand2, TestEvent> for Handler2 {
320 async fn handle(&self, _cmd: TestCommand2) -> CommandResult<TestEvent> {
321 Ok(vec![])
322 }
323 }
324
325 let bus: CommandBus<TestEvent> = CommandBus::new();
326 bus.register(Handler1).await;
327 bus.register(Handler2).await;
328
329 assert_eq!(bus.handlers_count().await, 2);
330 }
331}