allframe_core/cqrs/
allsource_backend.rs

1//! AllSource Core event store backend
2//!
3//! This backend integrates with AllSource Core to provide production-grade
4//! event sourcing with persistence, performance, and advanced features.
5
6#[cfg(feature = "cqrs-allsource")]
7use std::sync::Arc;
8
9#[cfg(feature = "cqrs-allsource")]
10use async_trait::async_trait;
11
12#[cfg(feature = "cqrs-allsource")]
13use super::backend::{BackendStats, EventStoreBackend};
14#[cfg(feature = "cqrs-allsource")]
15use super::Event;
16
17#[cfg(feature = "cqrs-allsource")]
18/// AllSource Core backend for production event sourcing
19#[derive(Clone)]
20pub struct AllSourceBackend<E: Event> {
21    store: Arc<allsource_core::EventStore>,
22    _phantom: std::marker::PhantomData<E>,
23}
24
25#[cfg(feature = "cqrs-allsource")]
26impl<E: Event> AllSourceBackend<E> {
27    /// Create a new AllSource backend with default configuration
28    pub fn new() -> Result<Self, String> {
29        let store = allsource_core::EventStore::default();
30        Ok(Self {
31            store: Arc::new(store),
32            _phantom: std::marker::PhantomData,
33        })
34    }
35
36    /// Create a new AllSource backend with custom configuration
37    pub fn with_config(config: AllSourceConfig) -> Result<Self, String> {
38        let mut store_config = allsource_core::store::EventStoreConfig::default();
39
40        if config.enable_persistence {
41            store_config = allsource_core::store::EventStoreConfig::with_persistence(
42                config
43                    .persistence_path
44                    .unwrap_or_else(|| "./allsource_data".to_string()),
45            );
46        }
47
48        let store = allsource_core::EventStore::with_config(store_config);
49
50        Ok(Self {
51            store: Arc::new(store),
52            _phantom: std::marker::PhantomData,
53        })
54    }
55
56    /// Create a production-ready configuration (persistence + WAL)
57    pub fn production(data_path: &str) -> Result<Self, String> {
58        Self::with_config(AllSourceConfig {
59            enable_persistence: true,
60            enable_wal: true,
61            persistence_path: Some(data_path.to_string()),
62            wal_path: Some(format!("{}/wal", data_path)),
63        })
64    }
65}
66
67#[cfg(feature = "cqrs-allsource")]
68impl<E: Event> Default for AllSourceBackend<E> {
69    fn default() -> Self {
70        Self::new().expect("Failed to create default AllSource backend")
71    }
72}
73
74#[cfg(feature = "cqrs-allsource")]
75/// Configuration for AllSource backend
76pub struct AllSourceConfig {
77    /// Enable Parquet persistence
78    pub enable_persistence: bool,
79    /// Enable Write-Ahead Log
80    pub enable_wal: bool,
81    /// Persistence path (default: "./allsource_data")
82    pub persistence_path: Option<String>,
83    /// WAL path (default: "./allsource_wal")
84    pub wal_path: Option<String>,
85}
86
87#[cfg(feature = "cqrs-allsource")]
88impl Default for AllSourceConfig {
89    fn default() -> Self {
90        Self {
91            enable_persistence: false,
92            enable_wal: false,
93            persistence_path: None,
94            wal_path: None,
95        }
96    }
97}
98
99#[cfg(feature = "cqrs-allsource")]
100#[async_trait]
101impl<E: Event> EventStoreBackend<E> for AllSourceBackend<E> {
102    async fn append(&self, aggregate_id: &str, events: Vec<E>) -> Result<(), String> {
103        // Convert AllFrame events to AllSource events using the new 0.7.0 API
104        for event in events {
105            let payload = serde_json::to_value(&event)
106                .map_err(|e| format!("Failed to serialize event: {}", e))?;
107
108            // Use from_strings which validates and creates proper value objects
109            let allsource_event = allsource_core::Event::from_strings(
110                format!(
111                    "allframe.{}",
112                    std::any::type_name::<E>()
113                        .split("::")
114                        .last()
115                        .unwrap_or("event")
116                ),
117                aggregate_id.to_string(),
118                "default".to_string(),
119                payload,
120                None,
121            )
122            .map_err(|e| format!("Failed to create event: {:?}", e))?;
123
124            self.store
125                .ingest(allsource_event)
126                .map_err(|e| format!("Failed to ingest event: {:?}", e))?;
127        }
128
129        Ok(())
130    }
131
132    async fn get_events(&self, aggregate_id: &str) -> Result<Vec<E>, String> {
133        let request = allsource_core::QueryEventsRequest {
134            entity_id: Some(aggregate_id.to_string()),
135            event_type: None,
136            tenant_id: None,
137            as_of: None,
138            since: None,
139            until: None,
140            limit: None,
141        };
142
143        let allsource_events = self
144            .store
145            .query(request)
146            .map_err(|e| format!("Failed to query events: {:?}", e))?;
147
148        // Convert AllSource events back to AllFrame events
149        let mut events = Vec::new();
150        for allsource_event in allsource_events {
151            let event: E = serde_json::from_value(allsource_event.payload.clone())
152                .map_err(|e| format!("Failed to deserialize event: {}", e))?;
153            events.push(event);
154        }
155
156        Ok(events)
157    }
158
159    async fn get_all_events(&self) -> Result<Vec<E>, String> {
160        let request = allsource_core::QueryEventsRequest {
161            entity_id: None,
162            event_type: None,
163            tenant_id: None,
164            as_of: None,
165            since: None,
166            until: None,
167            limit: None,
168        };
169
170        let allsource_events = self
171            .store
172            .query(request)
173            .map_err(|e| format!("Failed to query all events: {:?}", e))?;
174
175        let mut events = Vec::new();
176        for allsource_event in allsource_events {
177            let event: E = serde_json::from_value(allsource_event.payload.clone())
178                .map_err(|e| format!("Failed to deserialize event: {}", e))?;
179            events.push(event);
180        }
181
182        Ok(events)
183    }
184
185    async fn get_events_after(&self, aggregate_id: &str, version: u64) -> Result<Vec<E>, String> {
186        let all_events = self.get_events(aggregate_id).await?;
187        Ok(all_events.into_iter().skip(version as usize).collect())
188    }
189
190    async fn save_snapshot(
191        &self,
192        aggregate_id: &str,
193        _snapshot_data: Vec<u8>,
194        _version: u64,
195    ) -> Result<(), String> {
196        // AllSource 0.7.0 handles snapshots internally
197        self.store
198            .create_snapshot(aggregate_id)
199            .map_err(|e| format!("Failed to create snapshot: {:?}", e))?;
200
201        Ok(())
202    }
203
204    async fn get_latest_snapshot(&self, aggregate_id: &str) -> Result<(Vec<u8>, u64), String> {
205        let snapshot_json = self
206            .store
207            .get_snapshot(aggregate_id)
208            .map_err(|e| format!("Failed to get snapshot: {:?}", e))?;
209
210        // Convert snapshot JSON to bytes
211        let snapshot_bytes = serde_json::to_vec(&snapshot_json)
212            .map_err(|e| format!("Failed to serialize snapshot: {}", e))?;
213
214        Ok((snapshot_bytes, 0))
215    }
216
217    async fn flush(&self) -> Result<(), String> {
218        self.store
219            .flush_storage()
220            .map_err(|e| format!("Failed to flush storage: {:?}", e))?;
221        Ok(())
222    }
223
224    async fn stats(&self) -> BackendStats {
225        let allsource_stats = self.store.stats();
226
227        let mut backend_specific = std::collections::HashMap::new();
228        backend_specific.insert("backend_type".to_string(), "allsource-core".to_string());
229        backend_specific.insert(
230            "total_ingested".to_string(),
231            allsource_stats.total_ingested.to_string(),
232        );
233        backend_specific.insert(
234            "total_entities".to_string(),
235            allsource_stats.total_entities.to_string(),
236        );
237        backend_specific.insert(
238            "total_event_types".to_string(),
239            allsource_stats.total_event_types.to_string(),
240        );
241
242        BackendStats {
243            total_events: allsource_stats.total_ingested,
244            total_aggregates: allsource_stats.total_entities as u64,
245            total_snapshots: 0,
246            backend_specific,
247        }
248    }
249}
250
251// Placeholder types when the feature is not enabled
252#[cfg(not(feature = "cqrs-allsource"))]
253/// Placeholder - requires cqrs-allsource feature
254pub struct AllSourceBackend<E> {
255    _phantom: std::marker::PhantomData<E>,
256}
257
258#[cfg(not(feature = "cqrs-allsource"))]
259/// Placeholder - requires cqrs-allsource feature
260pub struct AllSourceConfig;