1use crate::cdc::{CdcConfig, ChangeEvent};
9use crate::channel::{Channel, ChannelConfig, ChannelError, ChannelId, ChannelReceiver};
10use crate::event::{Event, EventFilter};
11use crate::subscriber::{Subscriber, SubscriberId, Subscription};
12use std::collections::HashMap;
13use std::sync::RwLock;
14
/// Tunable limits and defaults for a [`StreamingEngine`].
#[derive(Debug, Clone)]
pub struct EngineConfig {
    /// Maximum number of channels that may exist at once; channel creation
    /// returns [`EngineError::TooManyChannels`] once this many are registered.
    pub max_channels: usize,
    /// Intended upper bound on the number of registered subscribers.
    pub max_subscribers: usize,
    /// Configuration applied to channels created through
    /// `StreamingEngine::create_channel` (the variant without an explicit config).
    pub default_channel_config: ChannelConfig,
    /// Change-data-capture settings. Stored with the engine; not consulted by
    /// the engine code in this file.
    pub cdc_config: CdcConfig,
}
27
28impl Default for EngineConfig {
29 fn default() -> Self {
30 Self {
31 max_channels: 1000,
32 max_subscribers: 10000,
33 default_channel_config: ChannelConfig::default(),
34 cdc_config: CdcConfig::default(),
35 }
36 }
37}
38
/// In-process publish/subscribe engine.
///
/// All state sits behind `RwLock`s, so every method takes `&self`.
pub struct StreamingEngine {
    /// Limits and defaults fixed at construction time.
    config: EngineConfig,
    /// Registered channels, keyed by channel id.
    channels: RwLock<HashMap<ChannelId, Channel>>,
    /// Subscriber records, keyed by subscriber id.
    subscribers: RwLock<HashMap<SubscriberId, Subscriber>>,
    /// Engine-wide counters, returned as a snapshot by `stats()`.
    stats: RwLock<EngineStats>,
}
50
51impl StreamingEngine {
52 pub fn new() -> Self {
54 Self::with_config(EngineConfig::default())
55 }
56
57 pub fn with_config(config: EngineConfig) -> Self {
59 Self {
60 config,
61 channels: RwLock::new(HashMap::new()),
62 subscribers: RwLock::new(HashMap::new()),
63 stats: RwLock::new(EngineStats::default()),
64 }
65 }
66
67 pub fn create_channel(&self, id: impl Into<ChannelId>) -> Result<(), EngineError> {
73 let id = id.into();
74 let mut channels = self
75 .channels
76 .write()
77 .expect("channels RwLock poisoned in create_channel");
78
79 if channels.len() >= self.config.max_channels {
80 return Err(EngineError::TooManyChannels);
81 }
82
83 if channels.contains_key(&id) {
84 return Err(EngineError::ChannelExists(id));
85 }
86
87 let channel = Channel::with_config(id.clone(), self.config.default_channel_config.clone());
88 channels.insert(id, channel);
89
90 Ok(())
91 }
92
93 pub fn create_channel_with_config(
95 &self,
96 id: impl Into<ChannelId>,
97 config: ChannelConfig,
98 ) -> Result<(), EngineError> {
99 let id = id.into();
100 let mut channels = self
101 .channels
102 .write()
103 .expect("channels RwLock poisoned in create_channel_with_config");
104
105 if channels.len() >= self.config.max_channels {
106 return Err(EngineError::TooManyChannels);
107 }
108
109 if channels.contains_key(&id) {
110 return Err(EngineError::ChannelExists(id));
111 }
112
113 let channel = Channel::with_config(id.clone(), config);
114 channels.insert(id, channel);
115
116 Ok(())
117 }
118
119 pub fn delete_channel(&self, id: &ChannelId) -> Result<(), EngineError> {
121 let mut channels = self
122 .channels
123 .write()
124 .expect("channels RwLock poisoned in delete_channel");
125
126 if channels.remove(id).is_none() {
127 return Err(EngineError::ChannelNotFound(id.clone()));
128 }
129
130 Ok(())
131 }
132
133 pub fn list_channels(&self) -> Vec<ChannelId> {
135 let channels = self
136 .channels
137 .read()
138 .expect("channels RwLock poisoned in list_channels");
139 channels.keys().cloned().collect()
140 }
141
142 pub fn channel_exists(&self, id: &ChannelId) -> bool {
144 let channels = self
145 .channels
146 .read()
147 .expect("channels RwLock poisoned in channel_exists");
148 channels.contains_key(id)
149 }
150
151 pub fn publish(&self, channel_id: &ChannelId, event: Event) -> Result<usize, EngineError> {
157 let channels = self
158 .channels
159 .read()
160 .expect("channels RwLock poisoned in publish");
161 let channel = channels
162 .get(channel_id)
163 .ok_or_else(|| EngineError::ChannelNotFound(channel_id.clone()))?;
164
165 let receivers = channel.publish(event).map_err(EngineError::Channel)?;
166
167 drop(channels);
168
169 {
170 let mut stats = self
171 .stats
172 .write()
173 .expect("stats RwLock poisoned in publish");
174 stats.events_published += 1;
175 }
176
177 Ok(receivers)
178 }
179
180 pub fn publish_change(&self, channel_id: &ChannelId, change: ChangeEvent) -> Result<usize, EngineError> {
182 let event = change.to_event();
183 self.publish(channel_id, event)
184 }
185
186 pub fn publish_to_many(
188 &self,
189 channel_ids: &[ChannelId],
190 event: Event,
191 ) -> HashMap<ChannelId, Result<usize, EngineError>> {
192 let mut results = HashMap::new();
193
194 for id in channel_ids {
195 results.insert(id.clone(), self.publish(id, event.clone()));
196 }
197
198 results
199 }
200
201 pub fn subscribe(
207 &self,
208 channel_id: &ChannelId,
209 subscriber_id: impl Into<SubscriberId>,
210 ) -> Result<ChannelReceiver, EngineError> {
211 let subscriber_id = subscriber_id.into();
212 let channels = self
213 .channels
214 .read()
215 .expect("channels RwLock poisoned in subscribe");
216 let channel = channels
217 .get(channel_id)
218 .ok_or_else(|| EngineError::ChannelNotFound(channel_id.clone()))?;
219
220 let receiver = channel
221 .subscribe(subscriber_id.clone())
222 .map_err(EngineError::Channel)?;
223
224 drop(channels);
225
226 self.ensure_subscriber(&subscriber_id, channel_id);
227
228 {
229 let mut stats = self
230 .stats
231 .write()
232 .expect("stats RwLock poisoned in subscribe");
233 stats.active_subscriptions += 1;
234 }
235
236 Ok(receiver)
237 }
238
239 pub fn subscribe_with_filter(
241 &self,
242 channel_id: &ChannelId,
243 subscriber_id: impl Into<SubscriberId>,
244 filter: EventFilter,
245 ) -> Result<ChannelReceiver, EngineError> {
246 let subscriber_id = subscriber_id.into();
247 let channels = self
248 .channels
249 .read()
250 .expect("channels RwLock poisoned in subscribe_with_filter");
251 let channel = channels
252 .get(channel_id)
253 .ok_or_else(|| EngineError::ChannelNotFound(channel_id.clone()))?;
254
255 let receiver = channel
256 .subscribe_with_filter(subscriber_id.clone(), filter)
257 .map_err(EngineError::Channel)?;
258
259 drop(channels);
260
261 self.ensure_subscriber(&subscriber_id, channel_id);
262
263 {
264 let mut stats = self
265 .stats
266 .write()
267 .expect("stats RwLock poisoned in subscribe_with_filter");
268 stats.active_subscriptions += 1;
269 }
270
271 Ok(receiver)
272 }
273
274 pub fn unsubscribe(&self, channel_id: &ChannelId, subscriber_id: &SubscriberId) {
276 let channels = self
277 .channels
278 .read()
279 .expect("channels RwLock poisoned in unsubscribe");
280 if let Some(channel) = channels.get(channel_id) {
281 channel.unsubscribe(subscriber_id);
282 }
283
284 let mut stats = self
285 .stats
286 .write()
287 .expect("stats RwLock poisoned in unsubscribe");
288 stats.active_subscriptions = stats.active_subscriptions.saturating_sub(1);
289 }
290
291 fn ensure_subscriber(&self, subscriber_id: &SubscriberId, channel_id: &ChannelId) {
292 let mut subscribers = self
293 .subscribers
294 .write()
295 .expect("subscribers RwLock poisoned in ensure_subscriber");
296
297 let subscriber = subscribers
298 .entry(subscriber_id.clone())
299 .or_insert_with(|| Subscriber::new(subscriber_id.clone()));
300
301 let mut subscription = Subscription::new(subscriber_id.clone());
302 subscription.add_channel(channel_id.clone());
303 subscriber.add_subscription(subscription);
304 }
305
306 pub fn get_subscriber(&self, id: &SubscriberId) -> Option<Subscriber> {
312 let subscribers = self
313 .subscribers
314 .read()
315 .expect("subscribers RwLock poisoned in get_subscriber");
316 subscribers.get(id).cloned()
317 }
318
319 pub fn list_subscribers(&self) -> Vec<SubscriberId> {
321 let subscribers = self
322 .subscribers
323 .read()
324 .expect("subscribers RwLock poisoned in list_subscribers");
325 subscribers.keys().cloned().collect()
326 }
327
328 pub fn remove_subscriber(&self, id: &SubscriberId) {
330 let mut subscribers = self
331 .subscribers
332 .write()
333 .expect("subscribers RwLock poisoned in remove_subscriber");
334 subscribers.remove(id);
335 }
336
337 pub fn get_history(&self, channel_id: &ChannelId, count: usize) -> Result<Vec<Event>, EngineError> {
343 let channels = self
344 .channels
345 .read()
346 .expect("channels RwLock poisoned in get_history");
347 let channel = channels
348 .get(channel_id)
349 .ok_or_else(|| EngineError::ChannelNotFound(channel_id.clone()))?;
350
351 Ok(channel.get_history(count))
352 }
353
354 pub fn get_history_after(
356 &self,
357 channel_id: &ChannelId,
358 timestamp: u64,
359 ) -> Result<Vec<Event>, EngineError> {
360 let channels = self
361 .channels
362 .read()
363 .expect("channels RwLock poisoned in get_history_after");
364 let channel = channels
365 .get(channel_id)
366 .ok_or_else(|| EngineError::ChannelNotFound(channel_id.clone()))?;
367
368 Ok(channel.get_history_after(timestamp))
369 }
370
371 pub fn stats(&self) -> EngineStats {
377 let stats = self
378 .stats
379 .read()
380 .expect("stats RwLock poisoned in stats");
381 stats.clone()
382 }
383
384 pub fn reset_stats(&self) {
386 let mut stats = self
387 .stats
388 .write()
389 .expect("stats RwLock poisoned in reset_stats");
390 *stats = EngineStats::default();
391 }
392
393 pub fn channel_stats(&self, id: &ChannelId) -> Option<crate::channel::ChannelStats> {
395 let channels = self
396 .channels
397 .read()
398 .expect("channels RwLock poisoned in channel_stats");
399 channels.get(id).map(|c| c.stats())
400 }
401}
402
403impl Default for StreamingEngine {
404 fn default() -> Self {
405 Self::new()
406 }
407}
408
/// Engine-wide counters; `StreamingEngine::stats` returns a cloned snapshot.
#[derive(Debug, Clone, Default)]
pub struct EngineStats {
    /// Incremented by one for every successful `publish` call.
    pub events_published: u64,
    /// Incremented on each successful subscribe and decremented (saturating
    /// at zero) on each `unsubscribe` call.
    pub active_subscriptions: usize,
    /// Counter for channels created.
    pub channels_created: usize,
}
420
/// Errors returned by [`StreamingEngine`] operations.
#[derive(Debug, Clone)]
pub enum EngineError {
    /// A channel with this id is already registered.
    ChannelExists(ChannelId),
    /// No channel with this id is registered.
    ChannelNotFound(ChannelId),
    /// The configured channel limit has been reached.
    TooManyChannels,
    /// The configured subscriber limit has been reached.
    TooManySubscribers,
    /// An error reported by the underlying channel, passed through unchanged.
    Channel(ChannelError),
}
434
435impl std::fmt::Display for EngineError {
436 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
437 match self {
438 Self::ChannelExists(id) => write!(f, "Channel already exists: {}", id),
439 Self::ChannelNotFound(id) => write!(f, "Channel not found: {}", id),
440 Self::TooManyChannels => write!(f, "Maximum channels reached"),
441 Self::TooManySubscribers => write!(f, "Maximum subscribers reached"),
442 Self::Channel(err) => write!(f, "Channel error: {}", err),
443 }
444 }
445}
446
// Marker impl so `EngineError` works with `?` and `Box<dyn Error>`; the
// default `source()` is used, so a wrapped `ChannelError` is not exposed as
// the error source.
impl std::error::Error for EngineError {}
448
// Unit tests for the public `StreamingEngine` API.
#[cfg(test)]
mod tests {
    use super::*;
    use crate::event::{EventData, EventType};

    // A freshly constructed engine has no channels.
    #[test]
    fn test_engine_creation() {
        let engine = StreamingEngine::new();
        assert!(engine.list_channels().is_empty());
    }

    // Create -> exists -> list -> delete round trip for a single channel.
    #[test]
    fn test_channel_management() {
        let engine = StreamingEngine::new();

        engine.create_channel("events").unwrap();
        assert!(engine.channel_exists(&ChannelId::new("events")));

        let channels = engine.list_channels();
        assert_eq!(channels.len(), 1);

        engine.delete_channel(&ChannelId::new("events")).unwrap();
        assert!(!engine.channel_exists(&ChannelId::new("events")));
    }

    // An event published after subscribing is delivered to the receiver.
    #[tokio::test]
    async fn test_publish_subscribe() {
        let engine = StreamingEngine::new();
        engine.create_channel("test").unwrap();

        let channel_id = ChannelId::new("test");
        let mut receiver = engine.subscribe(&channel_id, "sub1").unwrap();

        let event = Event::new(EventType::Created, "source", EventData::String("hello".to_string()));
        engine.publish(&channel_id, event).unwrap();

        let received = receiver.recv().await.unwrap();
        assert_eq!(received.source, "source");
    }

    // Creating the same channel id twice is rejected with `ChannelExists`.
    #[test]
    fn test_duplicate_channel() {
        let engine = StreamingEngine::new();

        engine.create_channel("test").unwrap();
        let result = engine.create_channel("test");

        assert!(matches!(result, Err(EngineError::ChannelExists(_))));
    }

    // One subscribe plus one publish shows up as 1/1 in the engine counters.
    #[test]
    fn test_stats() {
        let engine = StreamingEngine::new();
        engine.create_channel("test").unwrap();

        let channel_id = ChannelId::new("test");
        engine.subscribe(&channel_id, "sub1").unwrap();

        let event = Event::new(EventType::Created, "source", EventData::Null);
        engine.publish(&channel_id, event).unwrap();

        let stats = engine.stats();
        assert_eq!(stats.events_published, 1);
        assert_eq!(stats.active_subscriptions, 1);
    }

    // With a persistent channel config, all five published events can be
    // read back even when more history is requested than exists.
    #[test]
    fn test_history() {
        // Presumably `persistent`/`retention_count` are what make the channel
        // retain events for `get_history`; confirm against `ChannelConfig`.
        let config = EngineConfig {
            default_channel_config: ChannelConfig {
                persistent: true,
                retention_count: 100,
                ..Default::default()
            },
            ..Default::default()
        };

        let engine = StreamingEngine::with_config(config);
        engine.create_channel("history").unwrap();

        let channel_id = ChannelId::new("history");

        for i in 0..5 {
            let event = Event::new(EventType::Created, "test", EventData::Int(i));
            engine.publish(&channel_id, event).unwrap();
        }

        let history = engine.get_history(&channel_id, 10).unwrap();
        assert_eq!(history.len(), 5);
    }
}