aegis_streaming/
engine.rs

1//! Aegis Streaming Engine
2//!
3//! Core engine for real-time event streaming.
4//!
5//! @version 0.1.0
6//! @author AutomataNexus Development Team
7
8use crate::cdc::{CdcConfig, ChangeEvent};
9use crate::channel::{Channel, ChannelConfig, ChannelError, ChannelId, ChannelReceiver};
10use crate::event::{Event, EventFilter};
11use crate::subscriber::{Subscriber, SubscriberId, Subscription};
12use std::collections::HashMap;
13use std::sync::RwLock;
14
15// =============================================================================
16// Engine Configuration
17// =============================================================================
18
19/// Configuration for the streaming engine.
20#[derive(Debug, Clone)]
21pub struct EngineConfig {
22    pub max_channels: usize,
23    pub max_subscribers: usize,
24    pub default_channel_config: ChannelConfig,
25    pub cdc_config: CdcConfig,
26}
27
28impl Default for EngineConfig {
29    fn default() -> Self {
30        Self {
31            max_channels: 1000,
32            max_subscribers: 10000,
33            default_channel_config: ChannelConfig::default(),
34            cdc_config: CdcConfig::default(),
35        }
36    }
37}
38
39// =============================================================================
40// Streaming Engine
41// =============================================================================
42
43/// The main streaming engine for pub/sub and CDC.
44pub struct StreamingEngine {
45    config: EngineConfig,
46    channels: RwLock<HashMap<ChannelId, Channel>>,
47    subscribers: RwLock<HashMap<SubscriberId, Subscriber>>,
48    stats: RwLock<EngineStats>,
49}
50
51impl StreamingEngine {
52    /// Create a new streaming engine.
53    pub fn new() -> Self {
54        Self::with_config(EngineConfig::default())
55    }
56
57    /// Create an engine with custom configuration.
58    pub fn with_config(config: EngineConfig) -> Self {
59        Self {
60            config,
61            channels: RwLock::new(HashMap::new()),
62            subscribers: RwLock::new(HashMap::new()),
63            stats: RwLock::new(EngineStats::default()),
64        }
65    }
66
67    // -------------------------------------------------------------------------
68    // Channel Management
69    // -------------------------------------------------------------------------
70
71    /// Create a new channel.
72    pub fn create_channel(&self, id: impl Into<ChannelId>) -> Result<(), EngineError> {
73        let id = id.into();
74        let mut channels = self.channels.write().unwrap();
75
76        if channels.len() >= self.config.max_channels {
77            return Err(EngineError::TooManyChannels);
78        }
79
80        if channels.contains_key(&id) {
81            return Err(EngineError::ChannelExists(id));
82        }
83
84        let channel = Channel::with_config(id.clone(), self.config.default_channel_config.clone());
85        channels.insert(id, channel);
86
87        Ok(())
88    }
89
90    /// Create a channel with custom configuration.
91    pub fn create_channel_with_config(
92        &self,
93        id: impl Into<ChannelId>,
94        config: ChannelConfig,
95    ) -> Result<(), EngineError> {
96        let id = id.into();
97        let mut channels = self.channels.write().unwrap();
98
99        if channels.len() >= self.config.max_channels {
100            return Err(EngineError::TooManyChannels);
101        }
102
103        if channels.contains_key(&id) {
104            return Err(EngineError::ChannelExists(id));
105        }
106
107        let channel = Channel::with_config(id.clone(), config);
108        channels.insert(id, channel);
109
110        Ok(())
111    }
112
113    /// Delete a channel.
114    pub fn delete_channel(&self, id: &ChannelId) -> Result<(), EngineError> {
115        let mut channels = self.channels.write().unwrap();
116
117        if channels.remove(id).is_none() {
118            return Err(EngineError::ChannelNotFound(id.clone()));
119        }
120
121        Ok(())
122    }
123
124    /// List all channels.
125    pub fn list_channels(&self) -> Vec<ChannelId> {
126        let channels = self.channels.read().unwrap();
127        channels.keys().cloned().collect()
128    }
129
130    /// Check if a channel exists.
131    pub fn channel_exists(&self, id: &ChannelId) -> bool {
132        let channels = self.channels.read().unwrap();
133        channels.contains_key(id)
134    }
135
136    // -------------------------------------------------------------------------
137    // Publishing
138    // -------------------------------------------------------------------------
139
140    /// Publish an event to a channel.
141    pub fn publish(&self, channel_id: &ChannelId, event: Event) -> Result<usize, EngineError> {
142        let channels = self.channels.read().unwrap();
143        let channel = channels
144            .get(channel_id)
145            .ok_or_else(|| EngineError::ChannelNotFound(channel_id.clone()))?;
146
147        let receivers = channel.publish(event).map_err(EngineError::Channel)?;
148
149        drop(channels);
150
151        {
152            let mut stats = self.stats.write().unwrap();
153            stats.events_published += 1;
154        }
155
156        Ok(receivers)
157    }
158
159    /// Publish a CDC change event.
160    pub fn publish_change(&self, channel_id: &ChannelId, change: ChangeEvent) -> Result<usize, EngineError> {
161        let event = change.to_event();
162        self.publish(channel_id, event)
163    }
164
165    /// Publish to multiple channels.
166    pub fn publish_to_many(
167        &self,
168        channel_ids: &[ChannelId],
169        event: Event,
170    ) -> HashMap<ChannelId, Result<usize, EngineError>> {
171        let mut results = HashMap::new();
172
173        for id in channel_ids {
174            results.insert(id.clone(), self.publish(id, event.clone()));
175        }
176
177        results
178    }
179
180    // -------------------------------------------------------------------------
181    // Subscribing
182    // -------------------------------------------------------------------------
183
184    /// Subscribe to a channel.
185    pub fn subscribe(
186        &self,
187        channel_id: &ChannelId,
188        subscriber_id: impl Into<SubscriberId>,
189    ) -> Result<ChannelReceiver, EngineError> {
190        let subscriber_id = subscriber_id.into();
191        let channels = self.channels.read().unwrap();
192        let channel = channels
193            .get(channel_id)
194            .ok_or_else(|| EngineError::ChannelNotFound(channel_id.clone()))?;
195
196        let receiver = channel
197            .subscribe(subscriber_id.clone())
198            .map_err(EngineError::Channel)?;
199
200        drop(channels);
201
202        self.ensure_subscriber(&subscriber_id, channel_id);
203
204        {
205            let mut stats = self.stats.write().unwrap();
206            stats.active_subscriptions += 1;
207        }
208
209        Ok(receiver)
210    }
211
212    /// Subscribe with a filter.
213    pub fn subscribe_with_filter(
214        &self,
215        channel_id: &ChannelId,
216        subscriber_id: impl Into<SubscriberId>,
217        filter: EventFilter,
218    ) -> Result<ChannelReceiver, EngineError> {
219        let subscriber_id = subscriber_id.into();
220        let channels = self.channels.read().unwrap();
221        let channel = channels
222            .get(channel_id)
223            .ok_or_else(|| EngineError::ChannelNotFound(channel_id.clone()))?;
224
225        let receiver = channel
226            .subscribe_with_filter(subscriber_id.clone(), filter)
227            .map_err(EngineError::Channel)?;
228
229        drop(channels);
230
231        self.ensure_subscriber(&subscriber_id, channel_id);
232
233        {
234            let mut stats = self.stats.write().unwrap();
235            stats.active_subscriptions += 1;
236        }
237
238        Ok(receiver)
239    }
240
241    /// Unsubscribe from a channel.
242    pub fn unsubscribe(&self, channel_id: &ChannelId, subscriber_id: &SubscriberId) {
243        let channels = self.channels.read().unwrap();
244        if let Some(channel) = channels.get(channel_id) {
245            channel.unsubscribe(subscriber_id);
246        }
247
248        let mut stats = self.stats.write().unwrap();
249        stats.active_subscriptions = stats.active_subscriptions.saturating_sub(1);
250    }
251
252    fn ensure_subscriber(&self, subscriber_id: &SubscriberId, channel_id: &ChannelId) {
253        let mut subscribers = self.subscribers.write().unwrap();
254
255        let subscriber = subscribers
256            .entry(subscriber_id.clone())
257            .or_insert_with(|| Subscriber::new(subscriber_id.clone()));
258
259        let mut subscription = Subscription::new(subscriber_id.clone());
260        subscription.add_channel(channel_id.clone());
261        subscriber.add_subscription(subscription);
262    }
263
264    // -------------------------------------------------------------------------
265    // Subscriber Management
266    // -------------------------------------------------------------------------
267
268    /// Get a subscriber.
269    pub fn get_subscriber(&self, id: &SubscriberId) -> Option<Subscriber> {
270        let subscribers = self.subscribers.read().unwrap();
271        subscribers.get(id).cloned()
272    }
273
274    /// List all subscribers.
275    pub fn list_subscribers(&self) -> Vec<SubscriberId> {
276        let subscribers = self.subscribers.read().unwrap();
277        subscribers.keys().cloned().collect()
278    }
279
280    /// Remove a subscriber.
281    pub fn remove_subscriber(&self, id: &SubscriberId) {
282        let mut subscribers = self.subscribers.write().unwrap();
283        subscribers.remove(id);
284    }
285
286    // -------------------------------------------------------------------------
287    // History and Replay
288    // -------------------------------------------------------------------------
289
290    /// Get recent events from a channel.
291    pub fn get_history(&self, channel_id: &ChannelId, count: usize) -> Result<Vec<Event>, EngineError> {
292        let channels = self.channels.read().unwrap();
293        let channel = channels
294            .get(channel_id)
295            .ok_or_else(|| EngineError::ChannelNotFound(channel_id.clone()))?;
296
297        Ok(channel.get_history(count))
298    }
299
300    /// Get events after a timestamp.
301    pub fn get_history_after(
302        &self,
303        channel_id: &ChannelId,
304        timestamp: u64,
305    ) -> Result<Vec<Event>, EngineError> {
306        let channels = self.channels.read().unwrap();
307        let channel = channels
308            .get(channel_id)
309            .ok_or_else(|| EngineError::ChannelNotFound(channel_id.clone()))?;
310
311        Ok(channel.get_history_after(timestamp))
312    }
313
314    // -------------------------------------------------------------------------
315    // Statistics
316    // -------------------------------------------------------------------------
317
318    /// Get engine statistics.
319    pub fn stats(&self) -> EngineStats {
320        let stats = self.stats.read().unwrap();
321        stats.clone()
322    }
323
324    /// Reset statistics.
325    pub fn reset_stats(&self) {
326        let mut stats = self.stats.write().unwrap();
327        *stats = EngineStats::default();
328    }
329
330    /// Get channel statistics.
331    pub fn channel_stats(&self, id: &ChannelId) -> Option<crate::channel::ChannelStats> {
332        let channels = self.channels.read().unwrap();
333        channels.get(id).map(|c| c.stats())
334    }
335}
336
337impl Default for StreamingEngine {
338    fn default() -> Self {
339        Self::new()
340    }
341}
342
343// =============================================================================
344// Engine Statistics
345// =============================================================================
346
/// Counters describing engine activity.
///
/// All fields are plain integers, so snapshots can be compared with `==`
/// (for example before and after an operation).
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct EngineStats {
    /// Number of events successfully published through the engine.
    pub events_published: u64,
    /// Number of subscriptions currently counted as active.
    pub active_subscriptions: usize,
    /// Number of channels created.
    pub channels_created: usize,
}
354
355// =============================================================================
356// Engine Error
357// =============================================================================
358
359/// Errors that can occur in the streaming engine.
360#[derive(Debug, Clone)]
361pub enum EngineError {
362    ChannelExists(ChannelId),
363    ChannelNotFound(ChannelId),
364    TooManyChannels,
365    TooManySubscribers,
366    Channel(ChannelError),
367}
368
369impl std::fmt::Display for EngineError {
370    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
371        match self {
372            Self::ChannelExists(id) => write!(f, "Channel already exists: {}", id),
373            Self::ChannelNotFound(id) => write!(f, "Channel not found: {}", id),
374            Self::TooManyChannels => write!(f, "Maximum channels reached"),
375            Self::TooManySubscribers => write!(f, "Maximum subscribers reached"),
376            Self::Channel(err) => write!(f, "Channel error: {}", err),
377        }
378    }
379}
380
381impl std::error::Error for EngineError {}
382
383// =============================================================================
384// Tests
385// =============================================================================
386
#[cfg(test)]
mod tests {
    use super::*;
    use crate::event::{EventData, EventType};

    /// A freshly built engine has no channels.
    #[test]
    fn test_engine_creation() {
        assert!(StreamingEngine::new().list_channels().is_empty());
    }

    /// Channels can be created, listed and deleted.
    #[test]
    fn test_channel_management() {
        let engine = StreamingEngine::new();
        let events_id = ChannelId::new("events");

        engine.create_channel("events").unwrap();
        assert!(engine.channel_exists(&events_id));
        assert_eq!(engine.list_channels().len(), 1);

        engine.delete_channel(&events_id).unwrap();
        assert!(!engine.channel_exists(&events_id));
    }

    /// An event published after subscribing reaches the receiver.
    #[tokio::test]
    async fn test_publish_subscribe() {
        let engine = StreamingEngine::new();
        let channel_id = ChannelId::new("test");
        engine.create_channel("test").unwrap();

        let mut receiver = engine.subscribe(&channel_id, "sub1").unwrap();

        let payload = EventData::String("hello".to_string());
        engine
            .publish(&channel_id, Event::new(EventType::Created, "source", payload))
            .unwrap();

        let received = receiver.recv().await.unwrap();
        assert_eq!(received.source, "source");
    }

    /// Creating the same channel twice is rejected.
    #[test]
    fn test_duplicate_channel() {
        let engine = StreamingEngine::new();
        engine.create_channel("test").unwrap();

        let second_attempt = engine.create_channel("test");

        assert!(matches!(second_attempt, Err(EngineError::ChannelExists(_))));
    }

    /// Publishing and subscribing are reflected in the engine counters.
    #[test]
    fn test_stats() {
        let engine = StreamingEngine::new();
        let channel_id = ChannelId::new("test");
        engine.create_channel("test").unwrap();

        engine.subscribe(&channel_id, "sub1").unwrap();

        engine
            .publish(
                &channel_id,
                Event::new(EventType::Created, "source", EventData::Null),
            )
            .unwrap();

        let snapshot = engine.stats();
        assert_eq!(snapshot.events_published, 1);
        assert_eq!(snapshot.active_subscriptions, 1);
    }

    /// A persistent channel retains published events for later retrieval.
    #[test]
    fn test_history() {
        let channel_config = ChannelConfig {
            persistent: true,
            retention_count: 100,
            ..Default::default()
        };
        let engine = StreamingEngine::with_config(EngineConfig {
            default_channel_config: channel_config,
            ..Default::default()
        });

        let channel_id = ChannelId::new("history");
        engine.create_channel("history").unwrap();

        for i in 0..5 {
            engine
                .publish(
                    &channel_id,
                    Event::new(EventType::Created, "test", EventData::Int(i)),
                )
                .unwrap();
        }

        let history = engine.get_history(&channel_id, 10).unwrap();
        assert_eq!(history.len(), 5);
    }
}
477}