Skip to main content

allframe_core/cqrs/
mod.rs

1//! CQRS + Event Sourcing implementation
2//!
3//! This module provides the core CQRS (Command Query Responsibility
4//! Segregation) and Event Sourcing infrastructure for AllFrame.
5
6// Declare submodules
7pub mod allsource_backend;
8pub mod backend;
9pub mod command_bus;
10pub mod event_versioning;
11pub mod query_bus;
12pub mod memory_backend;
13pub mod projection_registry;
14pub mod saga;
15pub mod saga_orchestrator;
16pub mod sqlite_backend;
17pub mod sync;
18
/// Trait for resolving the event type name used in AllSource storage.
///
/// The default implementation derives the name from `std::any::type_name`:
/// generic arguments are dropped first, then the last `::`-separated path
/// segment is returned (e.g. `my_crate::events::UserCreated` → `UserCreated`,
/// `my_crate::Envelope<my_crate::Payload>` → `Envelope`).
///
/// Note that every instantiation of a generic type therefore shares one
/// default name, and that the output of `std::any::type_name` is not
/// guaranteed to be stable across compiler versions. Override this method to
/// provide a stable, user-controlled name.
pub trait EventTypeName {
    /// Returns the event type name for this type.
    fn event_type_name() -> &'static str {
        let full = std::any::type_name::<Self>();
        // Cut off generic arguments before looking for the last path
        // separator; otherwise the `::` inside the arguments would be picked
        // and yield fragments such as `String>`.
        let path = full.find('<').map_or(full, |idx| &full[..idx]);
        path.rsplit("::")
            .next()
            .filter(|segment| !segment.is_empty())
            .unwrap_or("event")
    }
}
33
34/// Trait for Events - immutable facts that represent state changes
35pub trait Event:
36    Clone + Send + Sync + serde::Serialize + serde::de::DeserializeOwned + EventTypeName + 'static
37{
38}
39
40/// Trait for Aggregates - domain objects rebuilt from events
41pub trait Aggregate: Default + Send + Sync {
42    /// The event type this aggregate handles
43    type Event: Event;
44
45    /// Apply an event to the aggregate
46    fn apply_event(&mut self, event: &Self::Event);
47}
48
49/// Trait for Projections - read models built from events
50pub trait Projection: Send + Sync {
51    /// The event type this projection handles
52    type Event: Event;
53
54    /// Apply an event to update the projection state
55    fn apply(&mut self, event: &Self::Event);
56}
57
58/// Event Store - append-only log of domain events
59///
60/// The EventStore uses a pluggable backend architecture:
61/// - Default: InMemoryBackend (for testing/MVP)
62/// - Production: AllSourceBackend (requires cqrs-allsource feature)
63#[derive(Clone)]
64pub struct EventStore<E: Event, B: EventStoreBackend<E> = InMemoryBackend<E>> {
65    backend: std::sync::Arc<B>,
66    subscribers: std::sync::Arc<tokio::sync::RwLock<Vec<tokio::sync::mpsc::Sender<E>>>>,
67    _phantom: std::marker::PhantomData<E>,
68}
69
70impl<E: Event> Default for EventStore<E, InMemoryBackend<E>> {
71    fn default() -> Self {
72        Self::new()
73    }
74}
75
76impl<E: Event> EventStore<E, InMemoryBackend<E>> {
77    /// Create a new event store with in-memory backend
78    pub fn new() -> Self {
79        Self::with_backend(InMemoryBackend::new())
80    }
81}
82
83impl<E: Event, B: EventStoreBackend<E>> EventStore<E, B> {
84    /// Create a new event store with a custom backend
85    pub fn with_backend(backend: B) -> Self {
86        Self {
87            backend: std::sync::Arc::new(backend),
88            subscribers: std::sync::Arc::new(tokio::sync::RwLock::new(Vec::new())),
89            _phantom: std::marker::PhantomData,
90        }
91    }
92
93    /// Get a reference to the backend
94    pub fn backend(&self) -> &B {
95        &self.backend
96    }
97
98    /// Append events to an aggregate's event stream
99    pub async fn append(&self, aggregate_id: &str, events: Vec<E>) -> Result<(), String> {
100        // Notify subscribers before appending
101        let subscribers = self.subscribers.read().await;
102        for event in &events {
103            for subscriber in subscribers.iter() {
104                let _ = subscriber.send(event.clone()).await;
105            }
106        }
107
108        // Delegate to backend
109        self.backend.append(aggregate_id, events).await
110    }
111
112    /// Get all events for an aggregate
113    pub async fn get_events(&self, aggregate_id: &str) -> Result<Vec<E>, String> {
114        self.backend.get_events(aggregate_id).await
115    }
116
117    /// Get events after a specific version for an aggregate
118    pub async fn get_events_after(
119        &self,
120        aggregate_id: &str,
121        version: u64,
122    ) -> Result<Vec<E>, String> {
123        self.backend.get_events_after(aggregate_id, version).await
124    }
125
126    /// Get all events from all aggregates (for projection rebuild)
127    pub async fn get_all_events(&self) -> Result<Vec<E>, String> {
128        self.backend.get_all_events().await
129    }
130
131    /// Subscribe to event stream
132    pub async fn subscribe(&self, tx: tokio::sync::mpsc::Sender<E>) {
133        let mut subscribers = self.subscribers.write().await;
134        subscribers.push(tx);
135    }
136}
137
138/// Snapshot for aggregate optimization
139pub struct Snapshot<A: Aggregate> {
140    /// The aggregate state at this version
141    pub aggregate: A,
142    /// The version number of this snapshot
143    pub version: u64,
144}
145
146impl<A: Aggregate> Snapshot<A> {
147    /// Create a snapshot
148    pub fn create(aggregate: A, version: u64) -> Self {
149        Self { aggregate, version }
150    }
151
152    /// Convert snapshot back to aggregate
153    pub fn into_aggregate(self) -> A {
154        self.aggregate
155    }
156}
157
158// Re-export all CQRS types for convenience
159pub use allsource_backend::*;
160// Re-export AllSource v0.10.3 services behind the cqrs-allsource feature
161#[cfg(feature = "cqrs-allsource")]
162pub use allsource_core::ExactlyOnceRegistry;
163#[cfg(feature = "cqrs-allsource")]
164pub use allsource_core::PipelineManager;
165#[cfg(feature = "cqrs-allsource")]
166pub use allsource_core::SchemaRegistry;
167pub use backend::*;
168pub use command_bus::*;
169pub use event_versioning::*;
170pub use query_bus::*;
171pub use memory_backend::*;
172pub use projection_registry::*;
173pub use saga::{
174    CompensationResult, MacroSagaOrchestrator, Saga, SagaContext, SagaError,
175    SagaStep as MacroSagaStep, StepExecutionResult, StepOutput,
176};
177// SagaError is re-exported from `saga` (above); `saga_orchestrator` is re-exported as an explicit list that omits SagaError
178pub use saga::{CompensationStrategy, FileSnapshot, WriteFileStep};
179#[cfg(feature = "cqrs-sqlite")]
180pub use saga::SqliteSavepoint;
181pub use saga_orchestrator::{
182    SagaDefinition, SagaMetadata, SagaOrchestrator, SagaResult, SagaStatus,
183    SagaStep as OrchestratorSagaStep,
184};
185pub use sqlite_backend::*;
186pub use sync::*;