eventuali_core/
streaming.rs1use crate::{Event, Result, EventualiError};
34use async_trait::async_trait;
35use tokio::sync::broadcast;
36use std::collections::HashMap;
37use std::sync::{Arc, Mutex};
38use uuid::Uuid;
39
40#[derive(Debug, Clone)]
56pub struct Subscription {
57 pub id: String,
58 pub aggregate_type_filter: Option<String>,
59 pub event_type_filter: Option<String>,
60 pub from_timestamp: Option<chrono::DateTime<chrono::Utc>>,
61}
62
63#[derive(Debug, Clone)]
71pub struct StreamEvent {
72 pub event: Event,
73 pub stream_position: u64,
74 pub global_position: u64,
75}
76
77#[async_trait]
119pub trait EventStreamer {
120 async fn subscribe(&self, subscription: Subscription) -> Result<EventStreamReceiver>;
121 async fn unsubscribe(&self, subscription_id: &str) -> Result<()>;
122 async fn publish_event(&self, event: Event, stream_position: u64, global_position: u64) -> Result<()>;
123 async fn get_stream_position(&self, stream_id: &str) -> Result<Option<u64>>;
124 async fn get_global_position(&self) -> Result<u64>;
125}
126
127pub type EventStreamReceiver = tokio::sync::broadcast::Receiver<StreamEvent>;
129
130pub struct InMemoryEventStreamer {
155 sender: broadcast::Sender<StreamEvent>,
156 subscriptions: Arc<Mutex<HashMap<String, Subscription>>>,
157 stream_positions: Arc<Mutex<HashMap<String, u64>>>,
158 global_position: Arc<Mutex<u64>>,
159}
160
161impl InMemoryEventStreamer {
162 pub fn new(capacity: usize) -> Self {
163 let (sender, _) = broadcast::channel(capacity);
164
165 Self {
166 sender,
167 subscriptions: Arc::new(Mutex::new(HashMap::new())),
168 stream_positions: Arc::new(Mutex::new(HashMap::new())),
169 global_position: Arc::new(Mutex::new(0)),
170 }
171 }
172}
173
174#[async_trait]
175impl EventStreamer for InMemoryEventStreamer {
176 async fn subscribe(&self, subscription: Subscription) -> Result<EventStreamReceiver> {
177 let mut subscriptions = self.subscriptions.lock()
178 .map_err(|_| EventualiError::Configuration("Failed to acquire subscriptions lock".to_string()))?;
179
180 subscriptions.insert(subscription.id.clone(), subscription);
181
182 Ok(self.sender.subscribe())
183 }
184
185 async fn unsubscribe(&self, subscription_id: &str) -> Result<()> {
186 let mut subscriptions = self.subscriptions.lock()
187 .map_err(|_| EventualiError::Configuration("Failed to acquire subscriptions lock".to_string()))?;
188
189 subscriptions.remove(subscription_id);
190 Ok(())
191 }
192
193 async fn publish_event(&self, event: Event, stream_position: u64, global_position: u64) -> Result<()> {
194 {
196 let mut positions = self.stream_positions.lock()
197 .map_err(|_| EventualiError::Configuration("Failed to acquire stream positions lock".to_string()))?;
198 positions.insert(event.aggregate_id.clone(), stream_position);
199 }
200
201 {
202 let mut global_pos = self.global_position.lock()
203 .map_err(|_| EventualiError::Configuration("Failed to acquire global position lock".to_string()))?;
204 *global_pos = global_position;
205 }
206
207 let stream_event = StreamEvent {
208 event,
209 stream_position,
210 global_position,
211 };
212
213 let _ = self.sender.send(stream_event);
215
216 Ok(())
217 }
218
219 async fn get_stream_position(&self, stream_id: &str) -> Result<Option<u64>> {
220 let positions = self.stream_positions.lock()
221 .map_err(|_| EventualiError::Configuration("Failed to acquire stream positions lock".to_string()))?;
222
223 Ok(positions.get(stream_id).copied())
224 }
225
226 async fn get_global_position(&self) -> Result<u64> {
227 let global_pos = self.global_position.lock()
228 .map_err(|_| EventualiError::Configuration("Failed to acquire global position lock".to_string()))?;
229
230 Ok(*global_pos)
231 }
232}
233
234#[async_trait]
236pub trait EventStreamProcessor {
237 async fn process_event(&self, event: &StreamEvent) -> Result<()>;
238}
239
240pub struct ProjectionProcessor<P: Projection> {
243 projection: Arc<P>,
244}
245
246impl<P: Projection> ProjectionProcessor<P> {
247 pub fn new(projection: P) -> Self {
248 Self {
249 projection: Arc::new(projection),
250 }
251 }
252}
253
254#[async_trait]
255impl<P: Projection + Send + Sync> EventStreamProcessor for ProjectionProcessor<P> {
256 async fn process_event(&self, event: &StreamEvent) -> Result<()> {
257 self.projection.handle_event(&event.event).await
258 }
259}
260
261#[async_trait]
263pub trait Projection {
264 async fn handle_event(&self, event: &Event) -> Result<()>;
265 async fn reset(&self) -> Result<()>;
266 async fn get_last_processed_position(&self) -> Result<Option<u64>>;
267 async fn set_last_processed_position(&self, position: u64) -> Result<()>;
268}
269
270pub struct SagaProcessor {
272 saga_handlers: HashMap<String, Box<dyn SagaHandler + Send + Sync>>,
273}
274
275impl SagaProcessor {
276 pub fn new() -> Self {
277 Self {
278 saga_handlers: HashMap::new(),
279 }
280 }
281
282 pub fn register_handler<H: SagaHandler + Send + Sync + 'static>(&mut self, event_type: String, handler: H) {
283 self.saga_handlers.insert(event_type, Box::new(handler));
284 }
285}
286
287#[async_trait]
288impl EventStreamProcessor for SagaProcessor {
289 async fn process_event(&self, event: &StreamEvent) -> Result<()> {
290 if let Some(handler) = self.saga_handlers.get(&event.event.event_type) {
291 handler.handle_event(&event.event).await?;
292 }
293 Ok(())
294 }
295}
296
297#[async_trait]
299pub trait SagaHandler {
300 async fn handle_event(&self, event: &Event) -> Result<()>;
301}
302
303pub struct SubscriptionBuilder {
305 subscription: Subscription,
306}
307
308impl SubscriptionBuilder {
309 pub fn new() -> Self {
310 Self {
311 subscription: Subscription {
312 id: Uuid::new_v4().to_string(),
313 aggregate_type_filter: None,
314 event_type_filter: None,
315 from_timestamp: None,
316 },
317 }
318 }
319
320 pub fn with_id(mut self, id: String) -> Self {
321 self.subscription.id = id;
322 self
323 }
324
325 pub fn filter_by_aggregate_type(mut self, aggregate_type: String) -> Self {
326 self.subscription.aggregate_type_filter = Some(aggregate_type);
327 self
328 }
329
330 pub fn filter_by_event_type(mut self, event_type: String) -> Self {
331 self.subscription.event_type_filter = Some(event_type);
332 self
333 }
334
335 pub fn from_timestamp(mut self, timestamp: chrono::DateTime<chrono::Utc>) -> Self {
336 self.subscription.from_timestamp = Some(timestamp);
337 self
338 }
339
340 pub fn build(self) -> Subscription {
341 self.subscription
342 }
343}
344
345impl Default for SubscriptionBuilder {
346 fn default() -> Self {
347 Self::new()
348 }
349}
350
351impl Default for SagaProcessor {
352 fn default() -> Self {
353 Self::new()
354 }
355}