Skip to main content

aegis_streaming/
engine.rs

1//! Aegis Streaming Engine
2//!
3//! Core engine for real-time event streaming.
4//!
5//! @version 0.1.0
6//! @author AutomataNexus Development Team
7
8use crate::cdc::{CdcConfig, ChangeEvent};
9use crate::channel::{Channel, ChannelConfig, ChannelError, ChannelId, ChannelReceiver};
10use crate::event::{Event, EventFilter};
11use crate::subscriber::{Subscriber, SubscriberId, Subscription};
12use std::collections::HashMap;
13use std::sync::RwLock;
14
15// =============================================================================
16// Engine Configuration
17// =============================================================================
18
19/// Configuration for the streaming engine.
20#[derive(Debug, Clone)]
21pub struct EngineConfig {
22    pub max_channels: usize,
23    pub max_subscribers: usize,
24    pub default_channel_config: ChannelConfig,
25    pub cdc_config: CdcConfig,
26}
27
28impl Default for EngineConfig {
29    fn default() -> Self {
30        Self {
31            max_channels: 1000,
32            max_subscribers: 10000,
33            default_channel_config: ChannelConfig::default(),
34            cdc_config: CdcConfig::default(),
35        }
36    }
37}
38
39// =============================================================================
40// Streaming Engine
41// =============================================================================
42
43/// The main streaming engine for pub/sub and CDC.
44pub struct StreamingEngine {
45    config: EngineConfig,
46    channels: RwLock<HashMap<ChannelId, Channel>>,
47    subscribers: RwLock<HashMap<SubscriberId, Subscriber>>,
48    stats: RwLock<EngineStats>,
49}
50
51impl StreamingEngine {
52    /// Create a new streaming engine.
53    pub fn new() -> Self {
54        Self::with_config(EngineConfig::default())
55    }
56
57    /// Create an engine with custom configuration.
58    pub fn with_config(config: EngineConfig) -> Self {
59        Self {
60            config,
61            channels: RwLock::new(HashMap::new()),
62            subscribers: RwLock::new(HashMap::new()),
63            stats: RwLock::new(EngineStats::default()),
64        }
65    }
66
67    // -------------------------------------------------------------------------
68    // Channel Management
69    // -------------------------------------------------------------------------
70
71    /// Create a new channel.
72    pub fn create_channel(&self, id: impl Into<ChannelId>) -> Result<(), EngineError> {
73        let id = id.into();
74        let mut channels = self
75            .channels
76            .write()
77            .expect("channels RwLock poisoned in create_channel");
78
79        if channels.len() >= self.config.max_channels {
80            return Err(EngineError::TooManyChannels);
81        }
82
83        if channels.contains_key(&id) {
84            return Err(EngineError::ChannelExists(id));
85        }
86
87        let channel = Channel::with_config(id.clone(), self.config.default_channel_config.clone());
88        channels.insert(id, channel);
89
90        Ok(())
91    }
92
93    /// Create a channel with custom configuration.
94    pub fn create_channel_with_config(
95        &self,
96        id: impl Into<ChannelId>,
97        config: ChannelConfig,
98    ) -> Result<(), EngineError> {
99        let id = id.into();
100        let mut channels = self
101            .channels
102            .write()
103            .expect("channels RwLock poisoned in create_channel_with_config");
104
105        if channels.len() >= self.config.max_channels {
106            return Err(EngineError::TooManyChannels);
107        }
108
109        if channels.contains_key(&id) {
110            return Err(EngineError::ChannelExists(id));
111        }
112
113        let channel = Channel::with_config(id.clone(), config);
114        channels.insert(id, channel);
115
116        Ok(())
117    }
118
119    /// Delete a channel.
120    pub fn delete_channel(&self, id: &ChannelId) -> Result<(), EngineError> {
121        let mut channels = self
122            .channels
123            .write()
124            .expect("channels RwLock poisoned in delete_channel");
125
126        if channels.remove(id).is_none() {
127            return Err(EngineError::ChannelNotFound(id.clone()));
128        }
129
130        Ok(())
131    }
132
133    /// List all channels.
134    pub fn list_channels(&self) -> Vec<ChannelId> {
135        let channels = self
136            .channels
137            .read()
138            .expect("channels RwLock poisoned in list_channels");
139        channels.keys().cloned().collect()
140    }
141
142    /// Check if a channel exists.
143    pub fn channel_exists(&self, id: &ChannelId) -> bool {
144        let channels = self
145            .channels
146            .read()
147            .expect("channels RwLock poisoned in channel_exists");
148        channels.contains_key(id)
149    }
150
151    // -------------------------------------------------------------------------
152    // Publishing
153    // -------------------------------------------------------------------------
154
155    /// Publish an event to a channel.
156    pub fn publish(&self, channel_id: &ChannelId, event: Event) -> Result<usize, EngineError> {
157        let channels = self
158            .channels
159            .read()
160            .expect("channels RwLock poisoned in publish");
161        let channel = channels
162            .get(channel_id)
163            .ok_or_else(|| EngineError::ChannelNotFound(channel_id.clone()))?;
164
165        let receivers = channel.publish(event).map_err(EngineError::Channel)?;
166
167        drop(channels);
168
169        {
170            let mut stats = self
171                .stats
172                .write()
173                .expect("stats RwLock poisoned in publish");
174            stats.events_published += 1;
175        }
176
177        Ok(receivers)
178    }
179
180    /// Publish a CDC change event.
181    pub fn publish_change(&self, channel_id: &ChannelId, change: ChangeEvent) -> Result<usize, EngineError> {
182        let event = change.to_event();
183        self.publish(channel_id, event)
184    }
185
186    /// Publish to multiple channels.
187    pub fn publish_to_many(
188        &self,
189        channel_ids: &[ChannelId],
190        event: Event,
191    ) -> HashMap<ChannelId, Result<usize, EngineError>> {
192        let mut results = HashMap::new();
193
194        for id in channel_ids {
195            results.insert(id.clone(), self.publish(id, event.clone()));
196        }
197
198        results
199    }
200
201    // -------------------------------------------------------------------------
202    // Subscribing
203    // -------------------------------------------------------------------------
204
205    /// Subscribe to a channel.
206    pub fn subscribe(
207        &self,
208        channel_id: &ChannelId,
209        subscriber_id: impl Into<SubscriberId>,
210    ) -> Result<ChannelReceiver, EngineError> {
211        let subscriber_id = subscriber_id.into();
212        let channels = self
213            .channels
214            .read()
215            .expect("channels RwLock poisoned in subscribe");
216        let channel = channels
217            .get(channel_id)
218            .ok_or_else(|| EngineError::ChannelNotFound(channel_id.clone()))?;
219
220        let receiver = channel
221            .subscribe(subscriber_id.clone())
222            .map_err(EngineError::Channel)?;
223
224        drop(channels);
225
226        self.ensure_subscriber(&subscriber_id, channel_id);
227
228        {
229            let mut stats = self
230                .stats
231                .write()
232                .expect("stats RwLock poisoned in subscribe");
233            stats.active_subscriptions += 1;
234        }
235
236        Ok(receiver)
237    }
238
239    /// Subscribe with a filter.
240    pub fn subscribe_with_filter(
241        &self,
242        channel_id: &ChannelId,
243        subscriber_id: impl Into<SubscriberId>,
244        filter: EventFilter,
245    ) -> Result<ChannelReceiver, EngineError> {
246        let subscriber_id = subscriber_id.into();
247        let channels = self
248            .channels
249            .read()
250            .expect("channels RwLock poisoned in subscribe_with_filter");
251        let channel = channels
252            .get(channel_id)
253            .ok_or_else(|| EngineError::ChannelNotFound(channel_id.clone()))?;
254
255        let receiver = channel
256            .subscribe_with_filter(subscriber_id.clone(), filter)
257            .map_err(EngineError::Channel)?;
258
259        drop(channels);
260
261        self.ensure_subscriber(&subscriber_id, channel_id);
262
263        {
264            let mut stats = self
265                .stats
266                .write()
267                .expect("stats RwLock poisoned in subscribe_with_filter");
268            stats.active_subscriptions += 1;
269        }
270
271        Ok(receiver)
272    }
273
274    /// Unsubscribe from a channel.
275    pub fn unsubscribe(&self, channel_id: &ChannelId, subscriber_id: &SubscriberId) {
276        let channels = self
277            .channels
278            .read()
279            .expect("channels RwLock poisoned in unsubscribe");
280        if let Some(channel) = channels.get(channel_id) {
281            channel.unsubscribe(subscriber_id);
282        }
283
284        let mut stats = self
285            .stats
286            .write()
287            .expect("stats RwLock poisoned in unsubscribe");
288        stats.active_subscriptions = stats.active_subscriptions.saturating_sub(1);
289    }
290
291    fn ensure_subscriber(&self, subscriber_id: &SubscriberId, channel_id: &ChannelId) {
292        let mut subscribers = self
293            .subscribers
294            .write()
295            .expect("subscribers RwLock poisoned in ensure_subscriber");
296
297        let subscriber = subscribers
298            .entry(subscriber_id.clone())
299            .or_insert_with(|| Subscriber::new(subscriber_id.clone()));
300
301        let mut subscription = Subscription::new(subscriber_id.clone());
302        subscription.add_channel(channel_id.clone());
303        subscriber.add_subscription(subscription);
304    }
305
306    // -------------------------------------------------------------------------
307    // Subscriber Management
308    // -------------------------------------------------------------------------
309
310    /// Get a subscriber.
311    pub fn get_subscriber(&self, id: &SubscriberId) -> Option<Subscriber> {
312        let subscribers = self
313            .subscribers
314            .read()
315            .expect("subscribers RwLock poisoned in get_subscriber");
316        subscribers.get(id).cloned()
317    }
318
319    /// List all subscribers.
320    pub fn list_subscribers(&self) -> Vec<SubscriberId> {
321        let subscribers = self
322            .subscribers
323            .read()
324            .expect("subscribers RwLock poisoned in list_subscribers");
325        subscribers.keys().cloned().collect()
326    }
327
328    /// Remove a subscriber.
329    pub fn remove_subscriber(&self, id: &SubscriberId) {
330        let mut subscribers = self
331            .subscribers
332            .write()
333            .expect("subscribers RwLock poisoned in remove_subscriber");
334        subscribers.remove(id);
335    }
336
337    // -------------------------------------------------------------------------
338    // History and Replay
339    // -------------------------------------------------------------------------
340
341    /// Get recent events from a channel.
342    pub fn get_history(&self, channel_id: &ChannelId, count: usize) -> Result<Vec<Event>, EngineError> {
343        let channels = self
344            .channels
345            .read()
346            .expect("channels RwLock poisoned in get_history");
347        let channel = channels
348            .get(channel_id)
349            .ok_or_else(|| EngineError::ChannelNotFound(channel_id.clone()))?;
350
351        Ok(channel.get_history(count))
352    }
353
354    /// Get events after a timestamp.
355    pub fn get_history_after(
356        &self,
357        channel_id: &ChannelId,
358        timestamp: u64,
359    ) -> Result<Vec<Event>, EngineError> {
360        let channels = self
361            .channels
362            .read()
363            .expect("channels RwLock poisoned in get_history_after");
364        let channel = channels
365            .get(channel_id)
366            .ok_or_else(|| EngineError::ChannelNotFound(channel_id.clone()))?;
367
368        Ok(channel.get_history_after(timestamp))
369    }
370
371    // -------------------------------------------------------------------------
372    // Statistics
373    // -------------------------------------------------------------------------
374
375    /// Get engine statistics.
376    pub fn stats(&self) -> EngineStats {
377        let stats = self
378            .stats
379            .read()
380            .expect("stats RwLock poisoned in stats");
381        stats.clone()
382    }
383
384    /// Reset statistics.
385    pub fn reset_stats(&self) {
386        let mut stats = self
387            .stats
388            .write()
389            .expect("stats RwLock poisoned in reset_stats");
390        *stats = EngineStats::default();
391    }
392
393    /// Get channel statistics.
394    pub fn channel_stats(&self, id: &ChannelId) -> Option<crate::channel::ChannelStats> {
395        let channels = self
396            .channels
397            .read()
398            .expect("channels RwLock poisoned in channel_stats");
399        channels.get(id).map(|c| c.stats())
400    }
401}
402
403impl Default for StreamingEngine {
404    fn default() -> Self {
405        Self::new()
406    }
407}
408
409// =============================================================================
410// Engine Statistics
411// =============================================================================
412
413/// Statistics for the streaming engine.
414#[derive(Debug, Clone, Default)]
415pub struct EngineStats {
416    pub events_published: u64,
417    pub active_subscriptions: usize,
418    pub channels_created: usize,
419}
420
421// =============================================================================
422// Engine Error
423// =============================================================================
424
425/// Errors that can occur in the streaming engine.
426#[derive(Debug, Clone)]
427pub enum EngineError {
428    ChannelExists(ChannelId),
429    ChannelNotFound(ChannelId),
430    TooManyChannels,
431    TooManySubscribers,
432    Channel(ChannelError),
433}
434
435impl std::fmt::Display for EngineError {
436    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
437        match self {
438            Self::ChannelExists(id) => write!(f, "Channel already exists: {}", id),
439            Self::ChannelNotFound(id) => write!(f, "Channel not found: {}", id),
440            Self::TooManyChannels => write!(f, "Maximum channels reached"),
441            Self::TooManySubscribers => write!(f, "Maximum subscribers reached"),
442            Self::Channel(err) => write!(f, "Channel error: {}", err),
443        }
444    }
445}
446
447impl std::error::Error for EngineError {}
448
449// =============================================================================
450// Tests
451// =============================================================================
452
453#[cfg(test)]
454mod tests {
455    use super::*;
456    use crate::event::{EventData, EventType};
457
458    #[test]
459    fn test_engine_creation() {
460        let engine = StreamingEngine::new();
461        assert!(engine.list_channels().is_empty());
462    }
463
464    #[test]
465    fn test_channel_management() {
466        let engine = StreamingEngine::new();
467
468        engine.create_channel("events").unwrap();
469        assert!(engine.channel_exists(&ChannelId::new("events")));
470
471        let channels = engine.list_channels();
472        assert_eq!(channels.len(), 1);
473
474        engine.delete_channel(&ChannelId::new("events")).unwrap();
475        assert!(!engine.channel_exists(&ChannelId::new("events")));
476    }
477
478    #[tokio::test]
479    async fn test_publish_subscribe() {
480        let engine = StreamingEngine::new();
481        engine.create_channel("test").unwrap();
482
483        let channel_id = ChannelId::new("test");
484        let mut receiver = engine.subscribe(&channel_id, "sub1").unwrap();
485
486        let event = Event::new(EventType::Created, "source", EventData::String("hello".to_string()));
487        engine.publish(&channel_id, event).unwrap();
488
489        let received = receiver.recv().await.unwrap();
490        assert_eq!(received.source, "source");
491    }
492
493    #[test]
494    fn test_duplicate_channel() {
495        let engine = StreamingEngine::new();
496
497        engine.create_channel("test").unwrap();
498        let result = engine.create_channel("test");
499
500        assert!(matches!(result, Err(EngineError::ChannelExists(_))));
501    }
502
503    #[test]
504    fn test_stats() {
505        let engine = StreamingEngine::new();
506        engine.create_channel("test").unwrap();
507
508        let channel_id = ChannelId::new("test");
509        engine.subscribe(&channel_id, "sub1").unwrap();
510
511        let event = Event::new(EventType::Created, "source", EventData::Null);
512        engine.publish(&channel_id, event).unwrap();
513
514        let stats = engine.stats();
515        assert_eq!(stats.events_published, 1);
516        assert_eq!(stats.active_subscriptions, 1);
517    }
518
519    #[test]
520    fn test_history() {
521        let config = EngineConfig {
522            default_channel_config: ChannelConfig {
523                persistent: true,
524                retention_count: 100,
525                ..Default::default()
526            },
527            ..Default::default()
528        };
529
530        let engine = StreamingEngine::with_config(config);
531        engine.create_channel("history").unwrap();
532
533        let channel_id = ChannelId::new("history");
534
535        for i in 0..5 {
536            let event = Event::new(EventType::Created, "test", EventData::Int(i));
537            engine.publish(&channel_id, event).unwrap();
538        }
539
540        let history = engine.get_history(&channel_id, 10).unwrap();
541        assert_eq!(history.len(), 5);
542    }
543}