1#[cfg(feature = "cqrs-allsource")]
7use std::sync::Arc;
8
9#[cfg(feature = "cqrs-allsource")]
10use async_trait::async_trait;
11
12#[cfg(feature = "cqrs-allsource")]
13use super::backend::{BackendStats, EventStoreBackend};
14#[cfg(feature = "cqrs-allsource")]
15use super::Event;
16
#[cfg(feature = "cqrs-allsource")]
/// CQRS event-store backend implemented on top of `allsource_core::EventStore`.
///
/// `E` is the application event type; it only appears at the JSON
/// (de)serialization boundary, so no `E` value is ever stored here.
#[derive(Clone)]
pub struct AllSourceBackend<E: Event> {
    /// Shared handle to the underlying store; `Arc` makes `Clone` cheap.
    store: Arc<allsource_core::EventStore>,
    /// Pins the logical event type parameter without storing a value.
    _phantom: std::marker::PhantomData<E>,
}
24
#[cfg(feature = "cqrs-allsource")]
impl<E: Event> AllSourceBackend<E> {
    /// Creates a backend over a default in-memory `EventStore`.
    ///
    /// Infallible in the current implementation; the `Result` return keeps
    /// the signature uniform with the other constructors.
    pub fn new() -> Result<Self, String> {
        let store = allsource_core::EventStore::default();
        Ok(Self {
            store: Arc::new(store),
            _phantom: std::marker::PhantomData,
        })
    }

    /// Builds a backend from `config`.
    ///
    /// When `enable_persistence` is set, the store persists to
    /// `persistence_path` (defaulting to `./allsource_data`).
    ///
    /// NOTE(review): `enable_wal` and `wal_path` are never read here —
    /// confirm whether `allsource_core` exposes a WAL option these fields
    /// should be wired to.
    pub fn with_config(config: AllSourceConfig) -> Result<Self, String> {
        let mut store_config = allsource_core::store::EventStoreConfig::default();

        if config.enable_persistence {
            store_config = allsource_core::store::EventStoreConfig::with_persistence(
                config
                    .persistence_path
                    .unwrap_or_else(|| "./allsource_data".to_string()),
            );
        }

        let store = allsource_core::EventStore::with_config(store_config);

        Ok(Self {
            store: Arc::new(store),
            _phantom: std::marker::PhantomData,
        })
    }

    /// Convenience constructor for production use: persistence and WAL are
    /// requested, with the WAL path placed under `<data_path>/wal`.
    pub fn production(data_path: &str) -> Result<Self, String> {
        Self::with_config(AllSourceConfig {
            enable_persistence: true,
            enable_wal: true,
            persistence_path: Some(data_path.to_string()),
            wal_path: Some(format!("{}/wal", data_path)),
        })
    }

    /// Exposes the store's exactly-once delivery registry.
    pub fn exactly_once(&self) -> Arc<allsource_core::ExactlyOnceRegistry> {
        self.store.exactly_once()
    }

    /// Exposes the store's schema registry.
    pub fn schema_registry(&self) -> Arc<allsource_core::SchemaRegistry> {
        self.store.schema_registry()
    }

    /// Exposes the store's pipeline manager.
    pub fn pipeline_manager(&self) -> Arc<allsource_core::PipelineManager> {
        self.store.pipeline_manager()
    }
}
81
#[cfg(feature = "cqrs-allsource")]
impl<E: Event> Default for AllSourceBackend<E> {
    /// Delegates to [`AllSourceBackend::new`]; panics if construction fails
    /// (which it currently cannot).
    fn default() -> Self {
        let backend = Self::new();
        backend.expect("Failed to create default AllSource backend")
    }
}
88
#[cfg(feature = "cqrs-allsource")]
/// Configuration for [`AllSourceBackend::with_config`].
pub struct AllSourceConfig {
    /// Persist events to disk instead of keeping them only in memory.
    pub enable_persistence: bool,
    /// Request a write-ahead log.
    /// NOTE(review): currently only consulted by `production()`; see
    /// `with_config` — confirm intended WAL wiring.
    pub enable_wal: bool,
    /// Directory for persisted data; defaults to `./allsource_data` when
    /// persistence is enabled but no path is given.
    pub persistence_path: Option<String>,
    /// Directory for the write-ahead log.
    pub wal_path: Option<String>,
}
101
#[cfg(feature = "cqrs-allsource")]
impl Default for AllSourceConfig {
    /// In-memory defaults: persistence and WAL disabled, no paths set.
    fn default() -> Self {
        AllSourceConfig {
            persistence_path: None,
            wal_path: None,
            enable_persistence: false,
            enable_wal: false,
        }
    }
}
113
/// Decodes a batch of raw store events back into application events.
///
/// Consumes the store events so each JSON payload is *moved* into the
/// deserializer (the previous code cloned every payload), and
/// short-circuits on the first payload that fails to decode.
#[cfg(feature = "cqrs-allsource")]
fn decode_payloads<E, I>(allsource_events: I) -> Result<Vec<E>, String>
where
    E: Event,
    I: IntoIterator<Item = allsource_core::Event>,
{
    allsource_events
        .into_iter()
        .map(|allsource_event| {
            serde_json::from_value(allsource_event.payload)
                .map_err(|e| format!("Failed to deserialize event: {}", e))
        })
        .collect()
}

#[cfg(feature = "cqrs-allsource")]
#[async_trait]
impl<E: Event> EventStoreBackend<E> for AllSourceBackend<E> {
    /// Serializes each event to JSON and ingests it under `aggregate_id`
    /// in the `"default"` tenant, typed as `allframe.<event_type_name>`.
    ///
    /// Events are ingested one at a time; an error aborts the remainder
    /// of the batch (events already ingested are not rolled back).
    async fn append(&self, aggregate_id: &str, events: Vec<E>) -> Result<(), String> {
        for event in events {
            let payload = serde_json::to_value(&event)
                .map_err(|e| format!("Failed to serialize event: {}", e))?;

            let allsource_event = allsource_core::Event::from_strings(
                format!("allframe.{}", E::event_type_name()),
                aggregate_id.to_string(),
                "default".to_string(),
                payload,
                None,
            )
            .map_err(|e| format!("Failed to create event: {:?}", e))?;

            self.store
                .ingest(allsource_event)
                .map_err(|e| format!("Failed to ingest event: {:?}", e))?;
        }

        Ok(())
    }

    /// Returns every event recorded for `aggregate_id`, in store order.
    ///
    /// NOTE(review): the query does not filter on event type, so any event
    /// stored under this entity with a different payload shape would make
    /// deserialization fail — confirm whether filtering on
    /// `allframe.<event_type_name>` (as written by `append`) is desirable.
    async fn get_events(&self, aggregate_id: &str) -> Result<Vec<E>, String> {
        let request = allsource_core::QueryEventsRequest {
            entity_id: Some(aggregate_id.to_string()),
            event_type: None,
            tenant_id: None,
            as_of: None,
            since: None,
            until: None,
            limit: None,
        };

        let allsource_events = self
            .store
            .query(request)
            .map_err(|e| format!("Failed to query events: {:?}", e))?;

        decode_payloads(allsource_events)
    }

    /// Returns every event in the store, across all aggregates.
    ///
    /// NOTE(review): like `get_events`, this does not filter on event type;
    /// foreign event types under other entities would fail to deserialize
    /// as `E` — confirm intended scope.
    async fn get_all_events(&self) -> Result<Vec<E>, String> {
        let request = allsource_core::QueryEventsRequest {
            entity_id: None,
            event_type: None,
            tenant_id: None,
            as_of: None,
            since: None,
            until: None,
            limit: None,
        };

        let allsource_events = self
            .store
            .query(request)
            .map_err(|e| format!("Failed to query all events: {:?}", e))?;

        decode_payloads(allsource_events)
    }

    /// Returns the events for `aggregate_id` after the first `version`
    /// events.
    ///
    /// NOTE(review): this assumes the store returns events in append order
    /// and that `version` equals the count of events already applied; the
    /// offset is applied client-side (no server-side cursor).
    async fn get_events_after(&self, aggregate_id: &str, version: u64) -> Result<Vec<E>, String> {
        let request = allsource_core::QueryEventsRequest {
            entity_id: Some(aggregate_id.to_string()),
            event_type: None,
            tenant_id: None,
            as_of: None,
            since: None,
            until: None,
            limit: None,
        };

        let allsource_events = self
            .store
            .query(request)
            .map_err(|e| format!("Failed to query events: {:?}", e))?;

        // Skip lazily and decode only the remainder — no intermediate Vec.
        decode_payloads(allsource_events.into_iter().skip(version as usize))
    }

    /// Stores a manual snapshot for `aggregate_id` at `version`.
    ///
    /// `snapshot_data` must be JSON bytes; it is parsed back into a
    /// `serde_json::Value` because the snapshot manager stores JSON state.
    async fn save_snapshot(
        &self,
        aggregate_id: &str,
        snapshot_data: Vec<u8>,
        version: u64,
    ) -> Result<(), String> {
        let state: serde_json::Value = serde_json::from_slice(&snapshot_data)
            .map_err(|e| format!("Failed to deserialize snapshot data: {}", e))?;

        self.store
            .snapshot_manager()
            .create_snapshot(
                aggregate_id.to_string(),
                state,
                chrono::Utc::now(),
                version as usize,
                allsource_core::infrastructure::persistence::SnapshotType::Manual,
            )
            .map_err(|e| format!("Failed to create snapshot: {:?}", e))?;

        Ok(())
    }

    /// Fetches the latest snapshot for `aggregate_id` as (JSON bytes,
    /// event count); errors when no snapshot exists.
    async fn get_latest_snapshot(&self, aggregate_id: &str) -> Result<(Vec<u8>, u64), String> {
        let snapshot = self
            .store
            .snapshot_manager()
            .get_latest_snapshot(aggregate_id)
            .ok_or_else(|| format!("No snapshot found for aggregate: {}", aggregate_id))?;

        let snapshot_bytes = serde_json::to_vec(&snapshot.state)
            .map_err(|e| format!("Failed to serialize snapshot: {}", e))?;

        Ok((snapshot_bytes, snapshot.event_count as u64))
    }

    /// Flushes buffered storage to its backing medium.
    async fn flush(&self) -> Result<(), String> {
        self.store
            .flush_storage()
            .map_err(|e| format!("Failed to flush storage: {:?}", e))?;
        Ok(())
    }

    /// Aggregates store and snapshot-manager counters into [`BackendStats`],
    /// exposing the raw allsource counters via `backend_specific`.
    async fn stats(&self) -> BackendStats {
        let allsource_stats = self.store.stats();

        let mut backend_specific = std::collections::HashMap::new();
        backend_specific.insert("backend_type".to_string(), "allsource-core".to_string());
        backend_specific.insert(
            "total_ingested".to_string(),
            allsource_stats.total_ingested.to_string(),
        );
        backend_specific.insert(
            "total_entities".to_string(),
            allsource_stats.total_entities.to_string(),
        );
        backend_specific.insert(
            "total_event_types".to_string(),
            allsource_stats.total_event_types.to_string(),
        );

        let snapshot_stats = self.store.snapshot_manager().stats();

        BackendStats {
            total_events: allsource_stats.total_ingested,
            total_aggregates: allsource_stats.total_entities as u64,
            total_snapshots: snapshot_stats.total_snapshots as u64,
            backend_specific,
        }
    }
}
295
#[cfg(not(feature = "cqrs-allsource"))]
/// Stub emitted when the `cqrs-allsource` feature is disabled, so code can
/// still name the type without the real backend being compiled in.
pub struct AllSourceBackend<E> {
    /// Keeps the type parameter without storing a value.
    _phantom: std::marker::PhantomData<E>,
}
302
#[cfg(not(feature = "cqrs-allsource"))]
/// Stub config for builds without the `cqrs-allsource` feature.
pub struct AllSourceConfig;