eventuali_core/
streaming.rs

1//! Real-time event streaming and projections.
2//!
3//! This module provides real-time event streaming capabilities for reactive
4//! event sourcing architectures:
5//! - [`EventStreamer`] trait for streaming implementations
6//! - [`InMemoryEventStreamer`] for development and testing
7//! - [`Projection`] trait for building read models
8//! - Subscription management and filtering
9//!
10//! # Example
11//!
//! ```rust
//! use eventuali_core::{EventStreamer, InMemoryEventStreamer, SubscriptionBuilder};
//!
//! # async fn example() -> Result<(), Box<dyn std::error::Error>> {
//! // The argument is the capacity of the underlying broadcast channel.
//! let streamer = InMemoryEventStreamer::new(1024);
//!
//! // Subscribe to user events
//! let subscription = SubscriptionBuilder::new()
//!     .filter_by_aggregate_type("User".to_string())
//!     .build();
//!
//! let mut receiver = streamer.subscribe(subscription).await?;
//!
//! // Process events as they arrive
//! while let Ok(stream_event) = receiver.recv().await {
//!     println!("Received event: {}", stream_event.event.event_type);
//! }
//! # Ok(())
//! # }
//! ```
32
33use crate::{Event, Result, EventualiError};
34use async_trait::async_trait;
35use tokio::sync::broadcast;
36use std::collections::HashMap;
37use std::sync::{Arc, Mutex};
38use uuid::Uuid;
39
/// Event stream subscription configuration.
///
/// A subscription describes which events a consumer wants to receive from the
/// event stream. It carries an identifier plus optional filters for aggregate
/// type, event type, and a starting timestamp.
///
/// The struct only stores the filter criteria; evaluating them is left to the
/// [`EventStreamer`] implementation the subscription is handed to.
///
/// # Example
///
/// ```rust
/// use eventuali_core::SubscriptionBuilder;
///
/// let subscription = SubscriptionBuilder::new()
///     .filter_by_aggregate_type("User".to_string())
///     .filter_by_event_type("UserRegistered".to_string())
///     .build();
/// ```
#[derive(Debug, Clone)]
pub struct Subscription {
    /// Identifier of the subscription. [`SubscriptionBuilder::new`] fills this
    /// with a random UUID v4 string unless it is overridden via `with_id`.
    pub id: String,
    /// Aggregate type the consumer is interested in, if any.
    /// NOTE(review): `None` presumably means "all aggregate types" — confirm
    /// with the streamer implementation in use.
    pub aggregate_type_filter: Option<String>,
    /// Event type the consumer is interested in, if any.
    /// NOTE(review): `None` presumably means "all event types" — confirm.
    pub event_type_filter: Option<String>,
    /// Optional UTC timestamp attached to the subscription.
    /// NOTE(review): presumably a lower bound on event time — confirm.
    pub from_timestamp: Option<chrono::DateTime<chrono::Utc>>,
}
62
/// Event stream message containing an event and its position information.
///
/// `StreamEvent` wraps an event with the two positions that were supplied when
/// it was published, so that consumers can process events in order and keep
/// checkpoints.
#[derive(Debug, Clone)]
pub struct StreamEvent {
    /// The event being delivered.
    pub event: Event,
    /// Position of the event within its aggregate stream.
    pub stream_position: u64,
    /// Position of the event across all events.
    pub global_position: u64,
}
76
/// Trait for event streaming implementations.
///
/// EventStreamer provides the core streaming interface for real-time event processing.
/// Implementations can be in-memory (for testing), Redis-based, or other message brokers.
///
/// # Example Implementation
///
/// ```rust
/// use eventuali_core::{EventStreamer, Subscription, EventStreamReceiver, Event, Result};
/// use async_trait::async_trait;
///
/// struct MyStreamer;
///
/// #[async_trait]
/// impl EventStreamer for MyStreamer {
///     async fn subscribe(&self, subscription: Subscription) -> Result<EventStreamReceiver> {
///         // Implementation here
///         todo!()
///     }
///
///     async fn unsubscribe(&self, subscription_id: &str) -> Result<()> {
///         // Implementation here
///         todo!()
///     }
///
///     async fn publish_event(&self, event: Event, stream_position: u64, global_position: u64) -> Result<()> {
///         // Implementation here
///         todo!()
///     }
///
///     async fn get_stream_position(&self, stream_id: &str) -> Result<Option<u64>> {
///         // Implementation here
///         todo!()
///     }
///
///     async fn get_global_position(&self) -> Result<u64> {
///         // Implementation here
///         todo!()
///     }
/// }
/// ```
#[async_trait]
pub trait EventStreamer {
    /// Registers `subscription` and returns a receiver on which stream events
    /// are delivered.
    async fn subscribe(&self, subscription: Subscription) -> Result<EventStreamReceiver>;
    /// Removes the subscription identified by `subscription_id`.
    async fn unsubscribe(&self, subscription_id: &str) -> Result<()>;
    /// Publishes `event` together with its position in its own stream
    /// (`stream_position`) and its position across all events
    /// (`global_position`).
    async fn publish_event(&self, event: Event, stream_position: u64, global_position: u64) -> Result<()>;
    /// Returns the last known position of the stream identified by
    /// `stream_id`, or `None` when the implementation has no position for it.
    async fn get_stream_position(&self, stream_id: &str) -> Result<Option<u64>>;
    /// Returns the last known global position.
    async fn get_global_position(&self) -> Result<u64>;
}
126
/// Receiving half handed out by [`EventStreamer::subscribe`].
///
/// This is an alias for a Tokio broadcast receiver of [`StreamEvent`] values.
pub type EventStreamReceiver = tokio::sync::broadcast::Receiver<StreamEvent>;
129
/// In-memory event streamer implementation for development and testing.
///
/// InMemoryEventStreamer provides a simple, fast event streaming implementation
/// that runs entirely in memory. It's perfect for:
/// - Development and testing
/// - Single-process applications
/// - Prototyping event-driven architectures
///
/// For production use with multiple processes, consider a distributed streaming
/// solution like Redis Streams or Apache Kafka.
///
/// Note that this implementation records subscriptions but does not evaluate
/// their filters: every receiver obtained from `subscribe` is attached to the
/// same broadcast channel and sees every published event.
///
/// # Example
///
/// ```rust
/// use eventuali_core::{EventStreamer, InMemoryEventStreamer, SubscriptionBuilder};
///
/// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
/// // The argument is the capacity of the underlying broadcast channel.
/// let streamer = InMemoryEventStreamer::new(1024);
///
/// let subscription = SubscriptionBuilder::new().build();
/// let mut receiver = streamer.subscribe(subscription).await?;
/// # Ok(())
/// # }
/// ```
pub struct InMemoryEventStreamer {
    /// Broadcast sender every published event goes through; receivers are
    /// created from it on `subscribe`.
    sender: broadcast::Sender<StreamEvent>,
    /// Registered subscriptions, keyed by subscription id.
    subscriptions: Arc<Mutex<HashMap<String, Subscription>>>,
    /// Most recently published stream position, keyed by the aggregate id of
    /// the published event.
    stream_positions: Arc<Mutex<HashMap<String, u64>>>,
    /// Most recently published global position; starts at 0.
    global_position: Arc<Mutex<u64>>,
}
160
161impl InMemoryEventStreamer {
162    pub fn new(capacity: usize) -> Self {
163        let (sender, _) = broadcast::channel(capacity);
164        
165        Self {
166            sender,
167            subscriptions: Arc::new(Mutex::new(HashMap::new())),
168            stream_positions: Arc::new(Mutex::new(HashMap::new())),
169            global_position: Arc::new(Mutex::new(0)),
170        }
171    }
172}
173
174#[async_trait]
175impl EventStreamer for InMemoryEventStreamer {
176    async fn subscribe(&self, subscription: Subscription) -> Result<EventStreamReceiver> {
177        let mut subscriptions = self.subscriptions.lock()
178            .map_err(|_| EventualiError::Configuration("Failed to acquire subscriptions lock".to_string()))?;
179        
180        subscriptions.insert(subscription.id.clone(), subscription);
181        
182        Ok(self.sender.subscribe())
183    }
184
185    async fn unsubscribe(&self, subscription_id: &str) -> Result<()> {
186        let mut subscriptions = self.subscriptions.lock()
187            .map_err(|_| EventualiError::Configuration("Failed to acquire subscriptions lock".to_string()))?;
188        
189        subscriptions.remove(subscription_id);
190        Ok(())
191    }
192
193    async fn publish_event(&self, event: Event, stream_position: u64, global_position: u64) -> Result<()> {
194        // Update positions
195        {
196            let mut positions = self.stream_positions.lock()
197                .map_err(|_| EventualiError::Configuration("Failed to acquire stream positions lock".to_string()))?;
198            positions.insert(event.aggregate_id.clone(), stream_position);
199        }
200        
201        {
202            let mut global_pos = self.global_position.lock()
203                .map_err(|_| EventualiError::Configuration("Failed to acquire global position lock".to_string()))?;
204            *global_pos = global_position;
205        }
206
207        let stream_event = StreamEvent {
208            event,
209            stream_position,
210            global_position,
211        };
212
213        // Send to all subscribers (ignore errors for disconnected receivers)
214        let _ = self.sender.send(stream_event);
215        
216        Ok(())
217    }
218
219    async fn get_stream_position(&self, stream_id: &str) -> Result<Option<u64>> {
220        let positions = self.stream_positions.lock()
221            .map_err(|_| EventualiError::Configuration("Failed to acquire stream positions lock".to_string()))?;
222        
223        Ok(positions.get(stream_id).copied())
224    }
225
226    async fn get_global_position(&self) -> Result<u64> {
227        let global_pos = self.global_position.lock()
228            .map_err(|_| EventualiError::Configuration("Failed to acquire global position lock".to_string()))?;
229        
230        Ok(*global_pos)
231    }
232}
233
/// Event stream processor for handling events as they arrive.
///
/// Implementors receive one [`StreamEvent`] at a time and report success or
/// failure through the returned `Result`.
#[async_trait]
pub trait EventStreamProcessor {
    /// Handles a single stream event.
    async fn process_event(&self, event: &StreamEvent) -> Result<()>;
}
239
// Built-in processors

/// Projection processor that updates read models.
///
/// Wraps a [`Projection`] so it can be used wherever an
/// [`EventStreamProcessor`] is expected.
pub struct ProjectionProcessor<P: Projection> {
    /// The wrapped projection, held behind an `Arc`.
    projection: Arc<P>,
}
245
246impl<P: Projection> ProjectionProcessor<P> {
247    pub fn new(projection: P) -> Self {
248        Self {
249            projection: Arc::new(projection),
250        }
251    }
252}
253
254#[async_trait]
255impl<P: Projection + Send + Sync> EventStreamProcessor for ProjectionProcessor<P> {
256    async fn process_event(&self, event: &StreamEvent) -> Result<()> {
257        self.projection.handle_event(&event.event).await
258    }
259}
260
/// Projection trait for building read models.
///
/// A projection consumes events one at a time and exposes a "last processed
/// position" that can be read and written.
#[async_trait]
pub trait Projection {
    /// Applies a single event to the projection.
    async fn handle_event(&self, event: &Event) -> Result<()>;
    /// Resets the projection.
    /// NOTE(review): presumably clears the read model so it can be rebuilt —
    /// confirm against implementations.
    async fn reset(&self) -> Result<()>;
    /// Returns the last processed position.
    /// NOTE(review): `None` presumably means nothing has been processed yet.
    async fn get_last_processed_position(&self) -> Result<Option<u64>>;
    /// Stores `position` as the last processed position.
    async fn set_last_processed_position(&self, position: u64) -> Result<()>;
}
269
/// Saga processor for long-running workflows.
///
/// Routes each incoming event to the [`SagaHandler`] registered for the
/// event's type, if there is one.
pub struct SagaProcessor {
    /// Registered handlers, keyed by event type; at most one handler per type.
    saga_handlers: HashMap<String, Box<dyn SagaHandler + Send + Sync>>,
}
274
275impl SagaProcessor {
276    pub fn new() -> Self {
277        Self {
278            saga_handlers: HashMap::new(),
279        }
280    }
281
282    pub fn register_handler<H: SagaHandler + Send + Sync + 'static>(&mut self, event_type: String, handler: H) {
283        self.saga_handlers.insert(event_type, Box::new(handler));
284    }
285}
286
287#[async_trait]
288impl EventStreamProcessor for SagaProcessor {
289    async fn process_event(&self, event: &StreamEvent) -> Result<()> {
290        if let Some(handler) = self.saga_handlers.get(&event.event.event_type) {
291            handler.handle_event(&event.event).await?;
292        }
293        Ok(())
294    }
295}
296
/// Saga handler trait.
///
/// A handler is invoked by [`SagaProcessor`] for events of the type it was
/// registered for.
#[async_trait]
pub trait SagaHandler {
    /// Handles a single event.
    async fn handle_event(&self, event: &Event) -> Result<()>;
}
302
/// Event stream subscription builder.
///
/// Collects the settings of a [`Subscription`] through chained calls and
/// yields the finished value from `build`.
pub struct SubscriptionBuilder {
    /// The subscription being assembled.
    subscription: Subscription,
}
307
308impl SubscriptionBuilder {
309    pub fn new() -> Self {
310        Self {
311            subscription: Subscription {
312                id: Uuid::new_v4().to_string(),
313                aggregate_type_filter: None,
314                event_type_filter: None,
315                from_timestamp: None,
316            },
317        }
318    }
319
320    pub fn with_id(mut self, id: String) -> Self {
321        self.subscription.id = id;
322        self
323    }
324
325    pub fn filter_by_aggregate_type(mut self, aggregate_type: String) -> Self {
326        self.subscription.aggregate_type_filter = Some(aggregate_type);
327        self
328    }
329
330    pub fn filter_by_event_type(mut self, event_type: String) -> Self {
331        self.subscription.event_type_filter = Some(event_type);
332        self
333    }
334
335    pub fn from_timestamp(mut self, timestamp: chrono::DateTime<chrono::Utc>) -> Self {
336        self.subscription.from_timestamp = Some(timestamp);
337        self
338    }
339
340    pub fn build(self) -> Subscription {
341        self.subscription
342    }
343}
344
345impl Default for SubscriptionBuilder {
346    fn default() -> Self {
347        Self::new()
348    }
349}
350
351impl Default for SagaProcessor {
352    fn default() -> Self {
353        Self::new()
354    }
355}