Skip to main content

allframe_core/cqrs/
mod.rs

1//! CQRS + Event Sourcing implementation
2//!
3//! This module provides the core CQRS (Command Query Responsibility
4//! Segregation) and Event Sourcing infrastructure for AllFrame.
5
6// Declare submodules
7pub mod allsource_backend;
8pub mod backend;
9pub mod command_bus;
10pub mod event_versioning;
11pub mod memory_backend;
12pub mod projection_registry;
13pub mod saga;
14pub mod saga_orchestrator;
15pub mod sqlite_backend;
16pub mod sync;
17
18/// Trait for resolving the event type name used in AllSource storage.
19///
20/// The default implementation extracts the last segment from
21/// `std::any::type_name` (e.g. `my_crate::events::UserCreated` →
22/// `UserCreated`). Override this to provide a stable, user-controlled name.
23pub trait EventTypeName {
24    /// Returns the event type name for this type.
25    fn event_type_name() -> &'static str {
26        std::any::type_name::<Self>()
27            .split("::")
28            .last()
29            .unwrap_or("event")
30    }
31}
32
33/// Trait for Events - immutable facts that represent state changes
34pub trait Event:
35    Clone + Send + Sync + serde::Serialize + serde::de::DeserializeOwned + EventTypeName + 'static
36{
37}
38
39/// Trait for Aggregates - domain objects rebuilt from events
40pub trait Aggregate: Default + Send + Sync {
41    /// The event type this aggregate handles
42    type Event: Event;
43
44    /// Apply an event to the aggregate
45    fn apply_event(&mut self, event: &Self::Event);
46}
47
48/// Trait for Projections - read models built from events
49pub trait Projection: Send + Sync {
50    /// The event type this projection handles
51    type Event: Event;
52
53    /// Apply an event to update the projection state
54    fn apply(&mut self, event: &Self::Event);
55}
56
57/// Event Store - append-only log of domain events
58///
59/// The EventStore uses a pluggable backend architecture:
60/// - Default: InMemoryBackend (for testing/MVP)
61/// - Production: AllSourceBackend (requires cqrs-allsource feature)
62#[derive(Clone)]
63pub struct EventStore<E: Event, B: EventStoreBackend<E> = InMemoryBackend<E>> {
64    backend: std::sync::Arc<B>,
65    subscribers: std::sync::Arc<tokio::sync::RwLock<Vec<tokio::sync::mpsc::Sender<E>>>>,
66    _phantom: std::marker::PhantomData<E>,
67}
68
69impl<E: Event> Default for EventStore<E, InMemoryBackend<E>> {
70    fn default() -> Self {
71        Self::new()
72    }
73}
74
75impl<E: Event> EventStore<E, InMemoryBackend<E>> {
76    /// Create a new event store with in-memory backend
77    pub fn new() -> Self {
78        Self::with_backend(InMemoryBackend::new())
79    }
80}
81
82impl<E: Event, B: EventStoreBackend<E>> EventStore<E, B> {
83    /// Create a new event store with a custom backend
84    pub fn with_backend(backend: B) -> Self {
85        Self {
86            backend: std::sync::Arc::new(backend),
87            subscribers: std::sync::Arc::new(tokio::sync::RwLock::new(Vec::new())),
88            _phantom: std::marker::PhantomData,
89        }
90    }
91
92    /// Get a reference to the backend
93    pub fn backend(&self) -> &B {
94        &self.backend
95    }
96
97    /// Append events to an aggregate's event stream
98    pub async fn append(&self, aggregate_id: &str, events: Vec<E>) -> Result<(), String> {
99        // Notify subscribers before appending
100        let subscribers = self.subscribers.read().await;
101        for event in &events {
102            for subscriber in subscribers.iter() {
103                let _ = subscriber.send(event.clone()).await;
104            }
105        }
106
107        // Delegate to backend
108        self.backend.append(aggregate_id, events).await
109    }
110
111    /// Get all events for an aggregate
112    pub async fn get_events(&self, aggregate_id: &str) -> Result<Vec<E>, String> {
113        self.backend.get_events(aggregate_id).await
114    }
115
116    /// Get events after a specific version for an aggregate
117    pub async fn get_events_after(
118        &self,
119        aggregate_id: &str,
120        version: u64,
121    ) -> Result<Vec<E>, String> {
122        self.backend.get_events_after(aggregate_id, version).await
123    }
124
125    /// Get all events from all aggregates (for projection rebuild)
126    pub async fn get_all_events(&self) -> Result<Vec<E>, String> {
127        self.backend.get_all_events().await
128    }
129
130    /// Subscribe to event stream
131    pub async fn subscribe(&self, tx: tokio::sync::mpsc::Sender<E>) {
132        let mut subscribers = self.subscribers.write().await;
133        subscribers.push(tx);
134    }
135}
136
137/// Snapshot for aggregate optimization
138pub struct Snapshot<A: Aggregate> {
139    /// The aggregate state at this version
140    pub aggregate: A,
141    /// The version number of this snapshot
142    pub version: u64,
143}
144
145impl<A: Aggregate> Snapshot<A> {
146    /// Create a snapshot
147    pub fn create(aggregate: A, version: u64) -> Self {
148        Self { aggregate, version }
149    }
150
151    /// Convert snapshot back to aggregate
152    pub fn into_aggregate(self) -> A {
153        self.aggregate
154    }
155}
156
157// Re-export all CQRS types for convenience
158pub use allsource_backend::*;
159// Re-export AllSource v0.10.3 services behind the cqrs-allsource feature
160#[cfg(feature = "cqrs-allsource")]
161pub use allsource_core::ExactlyOnceRegistry;
162#[cfg(feature = "cqrs-allsource")]
163pub use allsource_core::PipelineManager;
164#[cfg(feature = "cqrs-allsource")]
165pub use allsource_core::SchemaRegistry;
166pub use backend::*;
167pub use command_bus::*;
168pub use event_versioning::*;
169pub use memory_backend::*;
170pub use projection_registry::*;
171pub use saga::{
172    CompensationResult, MacroSagaOrchestrator, Saga, SagaContext, SagaError,
173    SagaStep as MacroSagaStep, StepExecutionResult, StepOutput,
174};
175// Resolve SagaError conflict - prefer saga_orchestrator version
176pub use saga::{CompensationStrategy, FileSnapshot, WriteFileStep};
177#[cfg(feature = "cqrs-sqlite")]
178pub use saga::SqliteSavepoint;
179pub use saga_orchestrator::{
180    SagaDefinition, SagaMetadata, SagaOrchestrator, SagaResult, SagaStatus,
181    SagaStep as OrchestratorSagaStep,
182};
183pub use sqlite_backend::*;
184pub use sync::*;