allframe_core/
cqrs.rs

1//! CQRS + Event Sourcing implementation
2//!
3//! This module provides the core CQRS (Command Query Responsibility
4//! Segregation) and Event Sourcing infrastructure for AllFrame.
5
6use std::sync::Arc;
7
8// Re-export macros
9#[cfg(feature = "di")]
10pub use allframe_macros::{command, command_handler, event, query, query_handler};
11use tokio::sync::{mpsc, RwLock};
12
13// Backend abstraction
14mod backend;
15mod memory_backend;
16
17#[cfg(feature = "cqrs-allsource")]
18mod allsource_backend;
19
20// Command bus infrastructure
21mod command_bus;
22
23// Projection registry infrastructure
24mod projection_registry;
25
26// Event versioning infrastructure
27mod event_versioning;
28
29// Saga orchestration infrastructure
30mod saga_orchestrator;
31
32// Re-export backend types
33#[cfg(feature = "cqrs-allsource")]
34pub use allsource_backend::{AllSourceBackend, AllSourceConfig};
35pub use backend::{BackendStats, EventStoreBackend};
36// Re-export command bus types
37pub use command_bus::{
38    Command, CommandBus, CommandError, CommandHandler, CommandResult, ValidationError,
39};
40// Re-export event versioning types
41pub use event_versioning::{
42    AutoUpcaster, MigrationPath, Upcaster, VersionRegistry, VersionedEvent,
43};
44pub use memory_backend::InMemoryBackend;
45// Re-export projection registry types
46pub use projection_registry::{ProjectionMetadata, ProjectionPosition, ProjectionRegistry};
47// Re-export saga orchestration types
48pub use saga_orchestrator::{
49    SagaDefinition, SagaError, SagaMetadata, SagaOrchestrator, SagaResult, SagaStatus, SagaStep,
50};
51
52/// Trait for Events - immutable facts that represent state changes
53pub trait Event:
54    Clone + Send + Sync + serde::Serialize + serde::de::DeserializeOwned + 'static
55{
56}
57
58/// Trait for Projections - read models built from events
59pub trait Projection: Send + Sync {
60    /// The event type this projection handles
61    type Event: Event;
62
63    /// Apply an event to update the projection state
64    fn apply(&mut self, event: &Self::Event);
65}
66
67/// Trait for Aggregates - domain objects rebuilt from events
68pub trait Aggregate: Default + Send + Sync {
69    /// The event type this aggregate handles
70    type Event: Event;
71
72    /// Apply an event to the aggregate
73    fn apply_event(&mut self, event: &Self::Event);
74}
75
76/// Event Store - append-only log of domain events
77///
78/// The EventStore uses a pluggable backend architecture:
79/// - Default: InMemoryBackend (for testing/MVP)
80/// - Production: AllSourceBackend (requires cqrs-allsource feature)
81#[derive(Clone)]
82pub struct EventStore<E: Event, B: EventStoreBackend<E> = InMemoryBackend<E>> {
83    backend: Arc<B>,
84    subscribers: Arc<RwLock<Vec<mpsc::Sender<E>>>>,
85    _phantom: std::marker::PhantomData<E>,
86}
87
88impl<E: Event> EventStore<E, InMemoryBackend<E>> {
89    /// Create a new event store with in-memory backend
90    pub fn new() -> Self {
91        Self::with_backend(InMemoryBackend::new())
92    }
93}
94
95impl<E: Event, B: EventStoreBackend<E>> EventStore<E, B> {
96    /// Create a new event store with a custom backend
97    pub fn with_backend(backend: B) -> Self {
98        Self {
99            backend: Arc::new(backend),
100            subscribers: Arc::new(RwLock::new(Vec::new())),
101            _phantom: std::marker::PhantomData,
102        }
103    }
104
105    /// Get a reference to the backend
106    pub fn backend(&self) -> &B {
107        &self.backend
108    }
109
110    /// Append events to an aggregate's event stream
111    pub async fn append(&self, aggregate_id: &str, events: Vec<E>) -> Result<(), String> {
112        // Notify subscribers before appending
113        let subscribers = self.subscribers.read().await;
114        for event in &events {
115            for subscriber in subscribers.iter() {
116                let _ = subscriber.send(event.clone()).await;
117            }
118        }
119
120        // Delegate to backend
121        self.backend.append(aggregate_id, events).await
122    }
123
124    /// Get all events for an aggregate
125    pub async fn get_events(&self, aggregate_id: &str) -> Result<Vec<E>, String> {
126        self.backend.get_events(aggregate_id).await
127    }
128
129    /// Get all events from all aggregates (for projection rebuild)
130    pub async fn get_all_events(&self) -> Result<Vec<E>, String> {
131        self.backend.get_all_events().await
132    }
133
134    /// Get events after a specific version (for snapshot optimization)
135    pub async fn get_events_after(
136        &self,
137        aggregate_id: &str,
138        version: u64,
139    ) -> Result<Vec<E>, String> {
140        self.backend.get_events_after(aggregate_id, version).await
141    }
142
143    /// Subscribe to event stream
144    pub async fn subscribe(&self, tx: mpsc::Sender<E>) {
145        let mut subscribers = self.subscribers.write().await;
146        subscribers.push(tx);
147    }
148
149    /// Save a snapshot
150    pub async fn save_snapshot<A>(
151        &self,
152        aggregate_id: &str,
153        snapshot: Snapshot<A>,
154    ) -> Result<(), String>
155    where
156        A: Aggregate<Event = E> + serde::Serialize,
157    {
158        let snapshot_data = serde_json::to_vec(&snapshot.aggregate)
159            .map_err(|e| format!("Failed to serialize snapshot: {}", e))?;
160
161        self.backend
162            .save_snapshot(aggregate_id, snapshot_data, snapshot.version)
163            .await
164    }
165
166    /// Get latest snapshot
167    pub async fn get_latest_snapshot<A>(&self, aggregate_id: &str) -> Result<Snapshot<A>, String>
168    where
169        A: Aggregate<Event = E> + serde::de::DeserializeOwned,
170    {
171        let (snapshot_data, version) = self.backend.get_latest_snapshot(aggregate_id).await?;
172
173        let aggregate: A = serde_json::from_slice(&snapshot_data)
174            .map_err(|e| format!("Failed to deserialize snapshot: {}", e))?;
175
176        Ok(Snapshot { aggregate, version })
177    }
178
179    /// Flush pending writes to storage (useful with WAL or batching backends)
180    pub async fn flush(&self) -> Result<(), String> {
181        self.backend.flush().await
182    }
183
184    /// Get backend statistics
185    pub async fn stats(&self) -> BackendStats {
186        self.backend.stats().await
187    }
188}
189
190impl<E: Event> Default for EventStore<E, InMemoryBackend<E>> {
191    fn default() -> Self {
192        Self::new()
193    }
194}
195
196/// Snapshot for aggregate optimization
197pub struct Snapshot<A: Aggregate> {
198    /// The aggregate state at this version
199    pub aggregate: A,
200    /// The version number of this snapshot
201    pub version: u64,
202}
203
204impl<A: Aggregate> Snapshot<A> {
205    /// Create a snapshot
206    pub fn create(aggregate: A, version: u64) -> Self {
207        Self { aggregate, version }
208    }
209
210    /// Convert snapshot back to aggregate
211    pub fn into_aggregate(self) -> A {
212        self.aggregate
213    }
214}
215
216// Old CommandBus removed - use command_bus::CommandBus<E> instead
217
218/// Query Bus for dispatching queries
219pub struct QueryBus;
220
221impl QueryBus {
222    /// Create a new query bus
223    pub fn new() -> Self {
224        Self
225    }
226}
227
228impl Default for QueryBus {
229    fn default() -> Self {
230        Self::new()
231    }
232}
233
234// Old Saga types removed - use saga_orchestrator module instead
235// The new saga system provides:
236// - SagaStep trait for defining steps
237// - SagaDefinition for building sagas
238// - SagaOrchestrator for execution with automatic compensation
239
240#[cfg(test)]
241mod tests {
242    use super::*;
243
244    #[derive(Clone, Debug, PartialEq, serde::Serialize, serde::Deserialize)]
245    struct TestEvent {
246        id: String,
247    }
248
249    impl Event for TestEvent {}
250
251    #[tokio::test]
252    async fn test_event_store_append_and_retrieve() {
253        let store = EventStore::new();
254
255        let events = vec![TestEvent {
256            id: "1".to_string(),
257        }];
258        store.append("test-aggregate", events).await.unwrap();
259
260        let retrieved = store.get_events("test-aggregate").await.unwrap();
261        assert_eq!(retrieved.len(), 1);
262        assert_eq!(retrieved[0].id, "1");
263    }
264
265    #[tokio::test]
266    async fn test_event_store_multiple_aggregates() {
267        let store = EventStore::new();
268
269        store
270            .append(
271                "agg1",
272                vec![TestEvent {
273                    id: "1".to_string(),
274                }],
275            )
276            .await
277            .unwrap();
278        store
279            .append(
280                "agg2",
281                vec![TestEvent {
282                    id: "2".to_string(),
283                }],
284            )
285            .await
286            .unwrap();
287
288        let agg1_events = store.get_events("agg1").await.unwrap();
289        let agg2_events = store.get_events("agg2").await.unwrap();
290
291        assert_eq!(agg1_events.len(), 1);
292        assert_eq!(agg2_events.len(), 1);
293        assert_eq!(agg1_events[0].id, "1");
294        assert_eq!(agg2_events[0].id, "2");
295    }
296
297    #[tokio::test]
298    async fn test_command_bus_handlers() {
299        use crate::cqrs::{Command, CommandBus, CommandHandler, CommandResult};
300
301        #[derive(Clone)]
302        struct TestCommand1;
303        impl Command for TestCommand1 {}
304
305        #[derive(Clone)]
306        struct TestCommand2;
307        impl Command for TestCommand2 {}
308
309        struct Handler1;
310        #[async_trait::async_trait]
311        impl CommandHandler<TestCommand1, TestEvent> for Handler1 {
312            async fn handle(&self, _cmd: TestCommand1) -> CommandResult<TestEvent> {
313                Ok(vec![])
314            }
315        }
316
317        struct Handler2;
318        #[async_trait::async_trait]
319        impl CommandHandler<TestCommand2, TestEvent> for Handler2 {
320            async fn handle(&self, _cmd: TestCommand2) -> CommandResult<TestEvent> {
321                Ok(vec![])
322            }
323        }
324
325        let bus: CommandBus<TestEvent> = CommandBus::new();
326        bus.register(Handler1).await;
327        bus.register(Handler2).await;
328
329        assert_eq!(bus.handlers_count().await, 2);
330    }
331}