1#[cfg(feature = "cqrs-allsource")]
7use std::sync::Arc;
8
9#[cfg(feature = "cqrs-allsource")]
10use async_trait::async_trait;
11
12#[cfg(feature = "cqrs-allsource")]
13use super::backend::{BackendStats, EventStoreBackend};
14#[cfg(feature = "cqrs-allsource")]
15use super::Event;
16
/// Event store backend that delegates storage to an `allsource_core::EventStore`.
///
/// The type parameter `E` is the event type this backend serializes and
/// deserializes; no value of `E` is stored in the struct itself.
#[cfg(feature = "cqrs-allsource")]
#[derive(Clone)]
pub struct AllSourceBackend<E: Event> {
    /// Shared handle to the underlying store; cloning the backend clones this `Arc`.
    store: Arc<allsource_core::EventStore>,
    /// Zero-sized marker that ties the backend to the event type `E`.
    _phantom: std::marker::PhantomData<E>,
}
24
#[cfg(feature = "cqrs-allsource")]
impl<E: Event> AllSourceBackend<E> {
    /// Creates a backend around `allsource_core::EventStore::default()`.
    ///
    /// # Errors
    ///
    /// This implementation always returns `Ok`; the `Result` return type is
    /// part of the existing public signature.
    pub fn new() -> Result<Self, String> {
        Ok(Self {
            store: Arc::new(allsource_core::EventStore::default()),
            _phantom: std::marker::PhantomData,
        })
    }

    /// Creates a backend whose store is configured from `config`.
    ///
    /// When `config.enable_persistence` is set, the store is built with
    /// `EventStoreConfig::with_persistence`, using `config.persistence_path`
    /// or `"./allsource_data"` when no path was given. Otherwise the default
    /// store configuration is used and `persistence_path` is not consulted.
    ///
    /// # Errors
    ///
    /// This implementation always returns `Ok`; the `Result` return type is
    /// part of the existing public signature.
    pub fn with_config(config: AllSourceConfig) -> Result<Self, String> {
        // NOTE(review): `config.enable_wal` and `config.wal_path` are not read
        // here, so requesting a WAL currently has no effect on the store
        // configuration. Confirm against the allsource_core configuration API
        // before relying on WAL behaviour.
        let store_config = if config.enable_persistence {
            let path = config
                .persistence_path
                .unwrap_or_else(|| "./allsource_data".to_string());
            allsource_core::store::EventStoreConfig::with_persistence(path)
        } else {
            allsource_core::store::EventStoreConfig::default()
        };

        Ok(Self {
            store: Arc::new(allsource_core::EventStore::with_config(store_config)),
            _phantom: std::marker::PhantomData,
        })
    }

    /// Creates a backend with persistence and WAL both requested, rooted at
    /// `data_path`.
    ///
    /// The WAL path is the `wal` entry inside `data_path`. It is built with
    /// `Path::join` rather than string concatenation so that a trailing
    /// separator or an empty `data_path` does not produce a malformed or
    /// root-relative path.
    ///
    /// # Errors
    ///
    /// Returns whatever [`Self::with_config`] returns.
    pub fn production(data_path: &str) -> Result<Self, String> {
        // `data_path` is valid UTF-8, so the lossy conversion never replaces
        // any characters.
        let wal_path = std::path::Path::new(data_path)
            .join("wal")
            .to_string_lossy()
            .into_owned();

        Self::with_config(AllSourceConfig {
            enable_persistence: true,
            enable_wal: true,
            persistence_path: Some(data_path.to_string()),
            wal_path: Some(wal_path),
        })
    }
}
66
#[cfg(feature = "cqrs-allsource")]
impl<E: Event> Default for AllSourceBackend<E> {
    /// Builds a backend via [`AllSourceBackend::new`].
    ///
    /// # Panics
    ///
    /// Panics with the message "Failed to create default AllSource backend"
    /// if `new` returns an error.
    fn default() -> Self {
        let created: Result<Self, String> = Self::new();
        created.expect("Failed to create default AllSource backend")
    }
}
73
74#[cfg(feature = "cqrs-allsource")]
/// Options used by `AllSourceBackend::with_config` to build the underlying store.
///
/// The derived `Default` disables persistence and WAL and leaves both paths
/// unset.
#[derive(Debug, Clone, Default)]
pub struct AllSourceConfig {
    /// When `true`, the store is created with on-disk persistence.
    pub enable_persistence: bool,
    /// Requests a write-ahead log.
    pub enable_wal: bool,
    /// Directory for persisted data; a built-in default path is used when
    /// this is `None` and persistence is enabled.
    pub persistence_path: Option<String>,
    /// Location of the write-ahead log, if any.
    pub wal_path: Option<String>,
}
98
#[cfg(feature = "cqrs-allsource")]
impl<E: Event> AllSourceBackend<E> {
    /// Queries the store, filtering only by `entity_id` (no filter when
    /// `None`), and deserializes every returned payload into `E`.
    ///
    /// `query_error` is the prefix of the error message produced when the
    /// query itself fails, so each caller keeps its own wording.
    fn query_and_decode(
        &self,
        entity_id: Option<String>,
        query_error: &str,
    ) -> Result<Vec<E>, String> {
        let request = allsource_core::QueryEventsRequest {
            entity_id,
            event_type: None,
            tenant_id: None,
            as_of: None,
            since: None,
            until: None,
            limit: None,
        };

        let stored = self
            .store
            .query(request)
            .map_err(|e| format!("{}: {:?}", query_error, e))?;

        // The payload is cloned because the element type returned by `query`
        // is not visible here and may not allow moving the field out.
        stored
            .into_iter()
            .map(|stored_event| {
                serde_json::from_value::<E>(stored_event.payload.clone())
                    .map_err(|e| format!("Failed to deserialize event: {}", e))
            })
            .collect()
    }
}

#[cfg(feature = "cqrs-allsource")]
#[async_trait]
impl<E: Event> EventStoreBackend<E> for AllSourceBackend<E> {
    /// Serializes each event to JSON and ingests it into the store, in order.
    ///
    /// Every event is stored under the type label `allframe.<Name>`, where
    /// `<Name>` is the text after the last `::` in `std::any::type_name::<E>()`.
    ///
    /// # Errors
    ///
    /// Returns on the first serialization, construction or ingestion failure.
    /// Events earlier in `events` have already been passed to `ingest` at that
    /// point; nothing in this method undoes them.
    async fn append(&self, aggregate_id: &str, events: Vec<E>) -> Result<(), String> {
        // The label depends only on `E`, so compute it once for the batch.
        let short_name = std::any::type_name::<E>()
            .rsplit("::")
            .next()
            .unwrap_or("event");
        let event_type = format!("allframe.{}", short_name);

        for event in events {
            let payload = serde_json::to_value(&event)
                .map_err(|e| format!("Failed to serialize event: {}", e))?;

            // NOTE(review): the third argument ("default") and the trailing
            // `None` are presumably the tenant id and optional metadata —
            // confirm against `allsource_core::Event::from_strings`.
            let allsource_event = allsource_core::Event::from_strings(
                event_type.clone(),
                aggregate_id.to_string(),
                "default".to_string(),
                payload,
                None,
            )
            .map_err(|e| format!("Failed to create event: {:?}", e))?;

            self.store
                .ingest(allsource_event)
                .map_err(|e| format!("Failed to ingest event: {:?}", e))?;
        }

        Ok(())
    }

    /// Returns every event stored for `aggregate_id`, deserialized into `E`.
    ///
    /// # Errors
    ///
    /// Fails if the query fails or any payload cannot be deserialized.
    async fn get_events(&self, aggregate_id: &str) -> Result<Vec<E>, String> {
        self.query_and_decode(Some(aggregate_id.to_string()), "Failed to query events")
    }

    /// Returns every event the store yields for an unfiltered query,
    /// deserialized into `E`.
    ///
    /// # Errors
    ///
    /// Fails if the query fails or any payload cannot be deserialized.
    async fn get_all_events(&self) -> Result<Vec<E>, String> {
        self.query_and_decode(None, "Failed to query all events")
    }

    /// Returns the events for `aggregate_id` after skipping the first
    /// `version` of them.
    ///
    /// All events for the aggregate are loaded first and the prefix is
    /// dropped in memory.
    ///
    /// # Errors
    ///
    /// Propagates any error from [`Self::get_events`].
    async fn get_events_after(&self, aggregate_id: &str, version: u64) -> Result<Vec<E>, String> {
        let all_events = self.get_events(aggregate_id).await?;

        // Saturate instead of truncating: on targets where `usize` is narrower
        // than `u64`, an oversized version must skip everything rather than
        // wrap around to a small count.
        let skip_count =
            <usize as std::convert::TryFrom<u64>>::try_from(version).unwrap_or(usize::MAX);

        Ok(all_events.into_iter().skip(skip_count).collect())
    }

    /// Asks the store to create a snapshot for `aggregate_id`.
    ///
    /// `_snapshot_data` and `_version` are accepted to satisfy the trait but
    /// are not used; only `create_snapshot(aggregate_id)` is invoked.
    ///
    /// # Errors
    ///
    /// Fails if the store reports an error while creating the snapshot.
    async fn save_snapshot(
        &self,
        aggregate_id: &str,
        _snapshot_data: Vec<u8>,
        _version: u64,
    ) -> Result<(), String> {
        self.store
            .create_snapshot(aggregate_id)
            .map_err(|e| format!("Failed to create snapshot: {:?}", e))?;

        Ok(())
    }

    /// Fetches the store's snapshot for `aggregate_id` and returns it
    /// serialized as JSON bytes.
    ///
    /// The version component of the returned tuple is always `0`; this method
    /// does not obtain a version from the store.
    ///
    /// # Errors
    ///
    /// Fails if the snapshot cannot be fetched or cannot be serialized.
    async fn get_latest_snapshot(&self, aggregate_id: &str) -> Result<(Vec<u8>, u64), String> {
        let snapshot_json = self
            .store
            .get_snapshot(aggregate_id)
            .map_err(|e| format!("Failed to get snapshot: {:?}", e))?;

        let snapshot_bytes = serde_json::to_vec(&snapshot_json)
            .map_err(|e| format!("Failed to serialize snapshot: {}", e))?;

        Ok((snapshot_bytes, 0))
    }

    /// Flushes the underlying store by calling `flush_storage`.
    ///
    /// # Errors
    ///
    /// Fails if the store reports an error while flushing.
    async fn flush(&self) -> Result<(), String> {
        self.store
            .flush_storage()
            .map_err(|e| format!("Failed to flush storage: {:?}", e))?;
        Ok(())
    }

    /// Reports store statistics in the backend-agnostic `BackendStats` shape.
    ///
    /// `total_snapshots` is always reported as `0`. The ingested, entity and
    /// event-type counters are also exposed as strings in `backend_specific`,
    /// together with a `backend_type` of `"allsource-core"`.
    async fn stats(&self) -> BackendStats {
        let allsource_stats = self.store.stats();

        let mut backend_specific = std::collections::HashMap::new();
        backend_specific.insert("backend_type".to_string(), "allsource-core".to_string());
        backend_specific.insert(
            "total_ingested".to_string(),
            allsource_stats.total_ingested.to_string(),
        );
        backend_specific.insert(
            "total_entities".to_string(),
            allsource_stats.total_entities.to_string(),
        );
        backend_specific.insert(
            "total_event_types".to_string(),
            allsource_stats.total_event_types.to_string(),
        );

        BackendStats {
            total_events: allsource_stats.total_ingested,
            total_aggregates: allsource_stats.total_entities as u64,
            total_snapshots: 0,
            backend_specific,
        }
    }
}
250
/// Placeholder for `AllSourceBackend` compiled when the `cqrs-allsource`
/// feature is disabled.
///
/// It holds only a type marker; no constructor or trait implementation for it
/// is visible in this part of the file.
#[cfg(not(feature = "cqrs-allsource"))]
pub struct AllSourceBackend<E> {
    /// Zero-sized marker so the stub keeps the same type parameter.
    _phantom: std::marker::PhantomData<E>,
}
257
/// Placeholder for `AllSourceConfig` compiled when the `cqrs-allsource`
/// feature is disabled; it carries no fields.
#[cfg(not(feature = "cqrs-allsource"))]
pub struct AllSourceConfig;