Skip to main content

allframe_core/cqrs/
allsource_backend.rs

//! AllSource Core event store backend
//!
//! This backend integrates with AllSource Core to provide production-grade
//! event sourcing with persistence, performance, and advanced features.
5
6#[cfg(feature = "cqrs-allsource")]
7use std::sync::Arc;
8
9#[cfg(feature = "cqrs-allsource")]
10use async_trait::async_trait;
11
12#[cfg(feature = "cqrs-allsource")]
13use super::backend::{BackendStats, EventStoreBackend};
14#[cfg(feature = "cqrs-allsource")]
15use super::Event;
16
/// Event store backend that delegates to an AllSource Core `EventStore`.
///
/// The store is held behind an [`Arc`], so cloning the backend yields another
/// handle to the same underlying store rather than a copy of it. `E` is the
/// AllFrame event type handled by this backend; it is carried purely as a
/// type-level marker.
#[cfg(feature = "cqrs-allsource")]
#[derive(Clone)]
pub struct AllSourceBackend<E: Event> {
    /// Shared handle to the AllSource Core event store.
    store: Arc<allsource_core::EventStore>,
    /// Zero-sized marker binding the backend to its event type `E`.
    _phantom: std::marker::PhantomData<E>,
}
24
#[cfg(feature = "cqrs-allsource")]
impl<E: Event> AllSourceBackend<E> {
    /// Wrap an already-constructed AllSource store in a backend handle.
    fn from_store(store: allsource_core::EventStore) -> Self {
        Self {
            store: Arc::new(store),
            _phantom: std::marker::PhantomData,
        }
    }

    /// Build a backend around `allsource_core::EventStore::default()`.
    ///
    /// The `Result` is part of the public signature; this constructor itself
    /// always returns `Ok`.
    pub fn new() -> Result<Self, String> {
        Ok(Self::from_store(allsource_core::EventStore::default()))
    }

    /// Build a backend from an [`AllSourceConfig`].
    ///
    /// When `enable_persistence` is set, the store is configured through
    /// `EventStoreConfig::with_persistence`, using `persistence_path` or
    /// `"./allsource_data"` when no path was supplied. Otherwise the default
    /// store configuration is used.
    ///
    /// NOTE(review): `enable_wal` and `wal_path` are accepted on the config but
    /// are not read by this constructor.
    pub fn with_config(config: AllSourceConfig) -> Result<Self, String> {
        let AllSourceConfig {
            enable_persistence,
            persistence_path,
            ..
        } = config;

        let store_config = if enable_persistence {
            let path = persistence_path.unwrap_or_else(|| "./allsource_data".to_string());
            allsource_core::store::EventStoreConfig::with_persistence(path)
        } else {
            allsource_core::store::EventStoreConfig::default()
        };

        Ok(Self::from_store(allsource_core::EventStore::with_config(
            store_config,
        )))
    }

    /// Build a backend intended for production use, rooted at `data_path`.
    ///
    /// Requests persistence at `data_path` and a WAL at `{data_path}/wal`, then
    /// defers to [`AllSourceBackend::with_config`]; see the note there about
    /// which of these settings that constructor actually applies.
    pub fn production(data_path: &str) -> Result<Self, String> {
        let config = AllSourceConfig {
            enable_persistence: true,
            enable_wal: true,
            persistence_path: Some(data_path.to_string()),
            wal_path: Some(format!("{}/wal", data_path)),
        };
        Self::with_config(config)
    }

    /// Handle to the store's `ExactlyOnceRegistry` (idempotency / dedup of
    /// event processing).
    pub fn exactly_once(&self) -> Arc<allsource_core::ExactlyOnceRegistry> {
        self.store.exactly_once()
    }

    /// Handle to the store's `SchemaRegistry` (JSON Schema validation of events).
    pub fn schema_registry(&self) -> Arc<allsource_core::SchemaRegistry> {
        self.store.schema_registry()
    }

    /// Handle to the store's `PipelineManager` (stream processing pipelines).
    pub fn pipeline_manager(&self) -> Arc<allsource_core::PipelineManager> {
        self.store.pipeline_manager()
    }
}
81
#[cfg(feature = "cqrs-allsource")]
impl<E: Event> Default for AllSourceBackend<E> {
    /// Build a backend through [`AllSourceBackend::new`].
    ///
    /// # Panics
    ///
    /// Panics with "Failed to create default AllSource backend" if `new`
    /// returns an error.
    fn default() -> Self {
        let created: Result<Self, String> = Self::new();
        created.expect("Failed to create default AllSource backend")
    }
}
88
/// Configuration for the AllSource backend.
///
/// Every field is plain data (`bool` / `Option<String>`), so the type derives
/// `Debug` and `Clone`; callers can log a configuration or reuse one for
/// several backends.
#[cfg(feature = "cqrs-allsource")]
#[derive(Debug, Clone)]
pub struct AllSourceConfig {
    /// Enable Parquet persistence.
    pub enable_persistence: bool,
    /// Enable the Write-Ahead Log.
    pub enable_wal: bool,
    /// Persistence path; `AllSourceBackend::with_config` falls back to
    /// `"./allsource_data"` when this is `None` and persistence is enabled.
    pub persistence_path: Option<String>,
    /// WAL path. Previously documented with a default of `"./allsource_wal"`.
    /// NOTE(review): no default for this field is applied anywhere in this
    /// file — confirm where (and whether) it is consumed.
    pub wal_path: Option<String>,
}
101
#[cfg(feature = "cqrs-allsource")]
impl Default for AllSourceConfig {
    /// Configuration with the persistence and WAL flags off and no paths set.
    fn default() -> Self {
        Self {
            persistence_path: None,
            wal_path: None,
            enable_persistence: false,
            enable_wal: false,
        }
    }
}
113
/// Decode one stored JSON payload into the AllFrame event type `E`.
#[cfg(feature = "cqrs-allsource")]
fn decode_payload<E: Event>(payload: serde_json::Value) -> Result<E, String> {
    serde_json::from_value(payload).map_err(|e| format!("Failed to deserialize event: {}", e))
}

/// Build a query that filters by entity id only (or by nothing when `None`).
#[cfg(feature = "cqrs-allsource")]
fn entity_query(entity_id: Option<String>) -> allsource_core::QueryEventsRequest {
    allsource_core::QueryEventsRequest {
        entity_id,
        event_type: None,
        tenant_id: None,
        as_of: None,
        since: None,
        until: None,
        limit: None,
    }
}

#[cfg(feature = "cqrs-allsource")]
#[async_trait]
impl<E: Event> EventStoreBackend<E> for AllSourceBackend<E> {
    /// Append `events` to the stream of `aggregate_id`.
    ///
    /// Each event is serialized to JSON and wrapped in an AllSource event of
    /// type `allframe.<E::event_type_name()>` under the `"default"` tenant.
    /// All events are converted before any is ingested, so a serialization or
    /// validation failure no longer leaves a partially appended batch behind.
    /// An ingest failure part-way through can still leave earlier events stored.
    ///
    /// # Errors
    ///
    /// Returns a message if an event cannot be serialized, rejected by
    /// `allsource_core::Event::from_strings`, or rejected by the store on ingest.
    async fn append(&self, aggregate_id: &str, events: Vec<E>) -> Result<(), String> {
        let event_type = format!("allframe.{}", E::event_type_name());

        // Phase 1: convert everything up front so bad input is rejected before
        // the store is touched.
        let mut converted = Vec::with_capacity(events.len());
        for event in &events {
            let payload = serde_json::to_value(event)
                .map_err(|e| format!("Failed to serialize event: {}", e))?;

            // `from_strings` validates its inputs and builds the value objects.
            let allsource_event = allsource_core::Event::from_strings(
                event_type.clone(),
                aggregate_id.to_string(),
                "default".to_string(),
                payload,
                None,
            )
            .map_err(|e| format!("Failed to create event: {:?}", e))?;

            converted.push(allsource_event);
        }

        // Phase 2: ingest in the original order.
        for allsource_event in converted {
            self.store
                .ingest(allsource_event)
                .map_err(|e| format!("Failed to ingest event: {:?}", e))?;
        }

        Ok(())
    }

    /// Return every event stored for `aggregate_id`, in the order the store
    /// returns them.
    ///
    /// # Errors
    ///
    /// Returns a message if the query fails or any payload cannot be
    /// deserialized into `E`.
    async fn get_events(&self, aggregate_id: &str) -> Result<Vec<E>, String> {
        let stored = self
            .store
            .query(entity_query(Some(aggregate_id.to_string())))
            .map_err(|e| format!("Failed to query events: {:?}", e))?;

        stored
            .into_iter()
            .map(|stored_event| decode_payload::<E>(stored_event.payload.clone()))
            .collect()
    }

    /// Return every event the store yields for an unfiltered query.
    ///
    /// # Errors
    ///
    /// Returns a message if the query fails or any payload cannot be
    /// deserialized into `E`.
    async fn get_all_events(&self) -> Result<Vec<E>, String> {
        let stored = self
            .store
            .query(entity_query(None))
            .map_err(|e| format!("Failed to query all events: {:?}", e))?;

        stored
            .into_iter()
            .map(|stored_event| decode_payload::<E>(stored_event.payload.clone()))
            .collect()
    }

    /// Return the events of `aggregate_id` that follow the first `version`
    /// stored events.
    ///
    /// The query carries no version offset, so the skip is applied to the
    /// query result before deserialization; skipped events are never decoded.
    ///
    /// # Errors
    ///
    /// Returns a message if the query fails or any remaining payload cannot be
    /// deserialized into `E`.
    async fn get_events_after(&self, aggregate_id: &str, version: u64) -> Result<Vec<E>, String> {
        let stored = self
            .store
            .query(entity_query(Some(aggregate_id.to_string())))
            .map_err(|e| format!("Failed to query events: {:?}", e))?;

        // A plain `as usize` cast would wrap on targets where `usize` is
        // narrower than `u64` and skip too few events. Saturating is correct:
        // no result set can hold more than `usize::MAX` events.
        let skip_count = usize::try_from(version).unwrap_or(usize::MAX);

        stored
            .into_iter()
            .skip(skip_count)
            .map(|stored_event| decode_payload::<E>(stored_event.payload.clone()))
            .collect()
    }

    /// Store a manual snapshot for `aggregate_id` at `version`.
    ///
    /// `snapshot_data` must be JSON; it is parsed and handed to the store's
    /// snapshot manager together with the current UTC time.
    ///
    /// # Errors
    ///
    /// Returns a message if `snapshot_data` is not valid JSON, if `version`
    /// does not fit in `usize` on this target, or if the snapshot manager
    /// rejects the snapshot.
    async fn save_snapshot(
        &self,
        aggregate_id: &str,
        snapshot_data: Vec<u8>,
        version: u64,
    ) -> Result<(), String> {
        let state: serde_json::Value = serde_json::from_slice(&snapshot_data)
            .map_err(|e| format!("Failed to deserialize snapshot data: {}", e))?;

        // Refuse rather than silently truncate the version on narrow targets.
        let event_count = usize::try_from(version).map_err(|_| {
            format!(
                "Snapshot version {} does not fit in usize on this platform",
                version
            )
        })?;

        self.store
            .snapshot_manager()
            .create_snapshot(
                aggregate_id.to_string(),
                state,
                chrono::Utc::now(),
                event_count,
                allsource_core::infrastructure::persistence::SnapshotType::Manual,
            )
            .map_err(|e| format!("Failed to create snapshot: {:?}", e))?;

        Ok(())
    }

    /// Return the latest snapshot for `aggregate_id` as JSON bytes, together
    /// with the snapshot's recorded event count as the version.
    ///
    /// # Errors
    ///
    /// Returns a message if no snapshot exists for the aggregate or its state
    /// cannot be serialized.
    async fn get_latest_snapshot(&self, aggregate_id: &str) -> Result<(Vec<u8>, u64), String> {
        let snapshot = self
            .store
            .snapshot_manager()
            .get_latest_snapshot(aggregate_id)
            .ok_or_else(|| format!("No snapshot found for aggregate: {}", aggregate_id))?;

        let snapshot_bytes = serde_json::to_vec(&snapshot.state)
            .map_err(|e| format!("Failed to serialize snapshot: {}", e))?;

        Ok((snapshot_bytes, snapshot.event_count as u64))
    }

    /// Ask the store to flush its storage.
    ///
    /// # Errors
    ///
    /// Returns a message if `flush_storage` reports a failure.
    async fn flush(&self) -> Result<(), String> {
        self.store
            .flush_storage()
            .map_err(|e| format!("Failed to flush storage: {:?}", e))?;
        Ok(())
    }

    /// Collect store and snapshot statistics into a [`BackendStats`].
    ///
    /// `backend_specific` carries the backend type tag plus the store's
    /// ingested / entity / event-type counters rendered as strings.
    async fn stats(&self) -> BackendStats {
        let allsource_stats = self.store.stats();
        let snapshot_stats = self.store.snapshot_manager().stats();

        let mut backend_specific = std::collections::HashMap::new();
        backend_specific.insert("backend_type".to_string(), "allsource-core".to_string());
        backend_specific.insert(
            "total_ingested".to_string(),
            allsource_stats.total_ingested.to_string(),
        );
        backend_specific.insert(
            "total_entities".to_string(),
            allsource_stats.total_entities.to_string(),
        );
        backend_specific.insert(
            "total_event_types".to_string(),
            allsource_stats.total_event_types.to_string(),
        );

        BackendStats {
            total_events: allsource_stats.total_ingested,
            total_aggregates: allsource_stats.total_entities as u64,
            total_snapshots: snapshot_stats.total_snapshots as u64,
            backend_specific,
        }
    }
}
295
// Placeholder types exported when the `cqrs-allsource` feature is not enabled
/// Stand-in for the AllSource backend when the `cqrs-allsource` feature is
/// disabled; enable that feature to get the real implementation.
#[cfg(not(feature = "cqrs-allsource"))]
pub struct AllSourceBackend<E> {
    /// Zero-sized marker so the otherwise unused type parameter `E` is accepted.
    _phantom: std::marker::PhantomData<E>,
}
302
/// Stand-in for the AllSource configuration when the `cqrs-allsource` feature
/// is disabled; it carries no settings.
#[cfg(not(feature = "cqrs-allsource"))]
pub struct AllSourceConfig;