1use crate::cdc::{CdcConfig, ChangeEvent};
9use crate::channel::{Channel, ChannelConfig, ChannelError, ChannelId, ChannelReceiver};
10use crate::event::{Event, EventFilter};
11use crate::subscriber::{ConsumerGroup, Subscriber, SubscriberId, Subscription};
12use std::collections::{HashMap, HashSet};
13use std::sync::{Arc, RwLock};
14
15#[derive(Debug, Clone)]
21pub struct EngineConfig {
22 pub max_channels: usize,
23 pub max_subscribers: usize,
24 pub default_channel_config: ChannelConfig,
25 pub cdc_config: CdcConfig,
26}
27
28impl Default for EngineConfig {
29 fn default() -> Self {
30 Self {
31 max_channels: 1000,
32 max_subscribers: 10000,
33 default_channel_config: ChannelConfig::default(),
34 cdc_config: CdcConfig::default(),
35 }
36 }
37}
38
39pub struct StreamingEngine {
45 config: EngineConfig,
46 channels: RwLock<HashMap<ChannelId, Channel>>,
47 subscribers: RwLock<HashMap<SubscriberId, Subscriber>>,
48 consumer_groups: Arc<RwLock<HashMap<String, ConsumerGroup>>>,
49 stats: RwLock<EngineStats>,
50}
51
52impl StreamingEngine {
53 pub fn new() -> Self {
55 Self::with_config(EngineConfig::default())
56 }
57
58 pub fn with_config(config: EngineConfig) -> Self {
60 Self {
61 config,
62 channels: RwLock::new(HashMap::new()),
63 subscribers: RwLock::new(HashMap::new()),
64 consumer_groups: Arc::new(RwLock::new(HashMap::new())),
65 stats: RwLock::new(EngineStats::default()),
66 }
67 }
68
69 pub fn create_channel(&self, id: impl Into<ChannelId>) -> Result<(), EngineError> {
75 let id = id.into();
76 let mut channels = self
77 .channels
78 .write()
79 .expect("channels RwLock poisoned in create_channel");
80
81 if channels.len() >= self.config.max_channels {
82 return Err(EngineError::TooManyChannels);
83 }
84
85 if channels.contains_key(&id) {
86 return Err(EngineError::ChannelExists(id));
87 }
88
89 let channel = Channel::with_config(id.clone(), self.config.default_channel_config.clone());
90 channels.insert(id, channel);
91
92 Ok(())
93 }
94
95 pub fn create_channel_with_config(
97 &self,
98 id: impl Into<ChannelId>,
99 config: ChannelConfig,
100 ) -> Result<(), EngineError> {
101 let id = id.into();
102 let mut channels = self
103 .channels
104 .write()
105 .expect("channels RwLock poisoned in create_channel_with_config");
106
107 if channels.len() >= self.config.max_channels {
108 return Err(EngineError::TooManyChannels);
109 }
110
111 if channels.contains_key(&id) {
112 return Err(EngineError::ChannelExists(id));
113 }
114
115 let channel = Channel::with_config(id.clone(), config);
116 channels.insert(id, channel);
117
118 Ok(())
119 }
120
121 pub fn delete_channel(&self, id: &ChannelId) -> Result<(), EngineError> {
123 let mut channels = self
124 .channels
125 .write()
126 .expect("channels RwLock poisoned in delete_channel");
127
128 if channels.remove(id).is_none() {
129 return Err(EngineError::ChannelNotFound(id.clone()));
130 }
131
132 Ok(())
133 }
134
135 pub fn list_channels(&self) -> Vec<ChannelId> {
137 let channels = self
138 .channels
139 .read()
140 .expect("channels RwLock poisoned in list_channels");
141 channels.keys().cloned().collect()
142 }
143
144 pub fn channel_exists(&self, id: &ChannelId) -> bool {
146 let channels = self
147 .channels
148 .read()
149 .expect("channels RwLock poisoned in channel_exists");
150 channels.contains_key(id)
151 }
152
153 pub fn publish(&self, channel_id: &ChannelId, event: Event) -> Result<usize, EngineError> {
159 let channels = self
160 .channels
161 .read()
162 .expect("channels RwLock poisoned in publish");
163 let channel = channels
164 .get(channel_id)
165 .ok_or_else(|| EngineError::ChannelNotFound(channel_id.clone()))?;
166
167 let receivers = channel.publish(event).map_err(EngineError::Channel)?;
168
169 drop(channels);
170
171 {
172 let mut stats = self
173 .stats
174 .write()
175 .expect("stats RwLock poisoned in publish");
176 stats.events_published += 1;
177 }
178
179 Ok(receivers)
180 }
181
182 pub fn publish_change(
184 &self,
185 channel_id: &ChannelId,
186 change: ChangeEvent,
187 ) -> Result<usize, EngineError> {
188 let event = change.to_event();
189 self.publish(channel_id, event)
190 }
191
192 pub fn publish_to_many(
194 &self,
195 channel_ids: &[ChannelId],
196 event: Event,
197 ) -> HashMap<ChannelId, Result<usize, EngineError>> {
198 let mut results = HashMap::new();
199
200 for id in channel_ids {
201 results.insert(id.clone(), self.publish(id, event.clone()));
202 }
203
204 results
205 }
206
207 pub fn subscribe(
213 &self,
214 channel_id: &ChannelId,
215 subscriber_id: impl Into<SubscriberId>,
216 ) -> Result<ChannelReceiver, EngineError> {
217 let subscriber_id = subscriber_id.into();
218 let channels = self
219 .channels
220 .read()
221 .expect("channels RwLock poisoned in subscribe");
222 let channel = channels
223 .get(channel_id)
224 .ok_or_else(|| EngineError::ChannelNotFound(channel_id.clone()))?;
225
226 let receiver = channel
227 .subscribe(subscriber_id.clone())
228 .map_err(EngineError::Channel)?;
229
230 drop(channels);
231
232 self.ensure_subscriber(&subscriber_id, channel_id);
233
234 {
235 let mut stats = self
236 .stats
237 .write()
238 .expect("stats RwLock poisoned in subscribe");
239 stats.active_subscriptions += 1;
240 }
241
242 Ok(receiver)
243 }
244
245 pub fn subscribe_with_filter(
247 &self,
248 channel_id: &ChannelId,
249 subscriber_id: impl Into<SubscriberId>,
250 filter: EventFilter,
251 ) -> Result<ChannelReceiver, EngineError> {
252 let subscriber_id = subscriber_id.into();
253 let channels = self
254 .channels
255 .read()
256 .expect("channels RwLock poisoned in subscribe_with_filter");
257 let channel = channels
258 .get(channel_id)
259 .ok_or_else(|| EngineError::ChannelNotFound(channel_id.clone()))?;
260
261 let receiver = channel
262 .subscribe_with_filter(subscriber_id.clone(), filter)
263 .map_err(EngineError::Channel)?;
264
265 drop(channels);
266
267 self.ensure_subscriber(&subscriber_id, channel_id);
268
269 {
270 let mut stats = self
271 .stats
272 .write()
273 .expect("stats RwLock poisoned in subscribe_with_filter");
274 stats.active_subscriptions += 1;
275 }
276
277 Ok(receiver)
278 }
279
280 pub fn unsubscribe(&self, channel_id: &ChannelId, subscriber_id: &SubscriberId) {
282 let channels = self
283 .channels
284 .read()
285 .expect("channels RwLock poisoned in unsubscribe");
286 if let Some(channel) = channels.get(channel_id) {
287 channel.unsubscribe(subscriber_id);
288 }
289
290 let mut stats = self
291 .stats
292 .write()
293 .expect("stats RwLock poisoned in unsubscribe");
294 stats.active_subscriptions = stats.active_subscriptions.saturating_sub(1);
295 }
296
297 fn ensure_subscriber(&self, subscriber_id: &SubscriberId, channel_id: &ChannelId) {
298 let mut subscribers = self
299 .subscribers
300 .write()
301 .expect("subscribers RwLock poisoned in ensure_subscriber");
302
303 let subscriber = subscribers
304 .entry(subscriber_id.clone())
305 .or_insert_with(|| Subscriber::new(subscriber_id.clone()));
306
307 let mut subscription = Subscription::new(subscriber_id.clone());
308 subscription.add_channel(channel_id.clone());
309 subscriber.add_subscription(subscription);
310 }
311
312 pub fn get_subscriber(&self, id: &SubscriberId) -> Option<Subscriber> {
318 let subscribers = self
319 .subscribers
320 .read()
321 .expect("subscribers RwLock poisoned in get_subscriber");
322 subscribers.get(id).cloned()
323 }
324
325 pub fn list_subscribers(&self) -> Vec<SubscriberId> {
327 let subscribers = self
328 .subscribers
329 .read()
330 .expect("subscribers RwLock poisoned in list_subscribers");
331 subscribers.keys().cloned().collect()
332 }
333
334 pub fn remove_subscriber(&self, id: &SubscriberId) {
336 let mut subscribers = self
337 .subscribers
338 .write()
339 .expect("subscribers RwLock poisoned in remove_subscriber");
340 subscribers.remove(id);
341 }
342
343 pub fn create_consumer_group(&self, group_id: impl Into<String>) -> Result<(), EngineError> {
349 let group_id = group_id.into();
350 let mut groups = self
351 .consumer_groups
352 .write()
353 .expect("consumer_groups RwLock poisoned in create_consumer_group");
354
355 if groups.contains_key(&group_id) {
356 return Err(EngineError::ConsumerGroupExists(group_id));
357 }
358
359 groups.insert(group_id.clone(), ConsumerGroup::new(group_id));
360 Ok(())
361 }
362
363 pub fn delete_consumer_group(&self, group_id: &str) -> Result<(), EngineError> {
365 let mut groups = self
366 .consumer_groups
367 .write()
368 .expect("consumer_groups RwLock poisoned in delete_consumer_group");
369
370 if groups.remove(group_id).is_none() {
371 return Err(EngineError::ConsumerGroupNotFound(group_id.to_string()));
372 }
373
374 Ok(())
375 }
376
377 pub fn join_consumer_group(
380 &self,
381 group_id: &str,
382 subscriber_id: SubscriberId,
383 channels: HashSet<String>,
384 ) -> Result<(), EngineError> {
385 let mut groups = self
386 .consumer_groups
387 .write()
388 .expect("consumer_groups RwLock poisoned in join_consumer_group");
389
390 let group = groups
391 .get_mut(group_id)
392 .ok_or_else(|| EngineError::ConsumerGroupNotFound(group_id.to_string()))?;
393
394 group.add_member(subscriber_id, channels);
395 Ok(())
396 }
397
398 pub fn leave_consumer_group(
401 &self,
402 group_id: &str,
403 subscriber_id: &SubscriberId,
404 ) -> Result<HashSet<String>, EngineError> {
405 let mut groups = self
406 .consumer_groups
407 .write()
408 .expect("consumer_groups RwLock poisoned in leave_consumer_group");
409
410 let group = groups
411 .get_mut(group_id)
412 .ok_or_else(|| EngineError::ConsumerGroupNotFound(group_id.to_string()))?;
413
414 group.remove_member(subscriber_id).ok_or_else(|| {
415 EngineError::SubscriberNotInGroup(subscriber_id.clone(), group_id.to_string())
416 })
417 }
418
419 pub fn commit_offset(
421 &self,
422 group_id: &str,
423 channel_name: impl Into<String>,
424 offset: u64,
425 ) -> Result<(), EngineError> {
426 let mut groups = self
427 .consumer_groups
428 .write()
429 .expect("consumer_groups RwLock poisoned in commit_offset");
430
431 let group = groups
432 .get_mut(group_id)
433 .ok_or_else(|| EngineError::ConsumerGroupNotFound(group_id.to_string()))?;
434
435 group.commit_offset(channel_name, offset);
436 Ok(())
437 }
438
439 pub fn get_committed_offset(
441 &self,
442 group_id: &str,
443 channel_name: &str,
444 ) -> Result<Option<u64>, EngineError> {
445 let groups = self
446 .consumer_groups
447 .read()
448 .expect("consumer_groups RwLock poisoned in get_committed_offset");
449
450 let group = groups
451 .get(group_id)
452 .ok_or_else(|| EngineError::ConsumerGroupNotFound(group_id.to_string()))?;
453
454 Ok(group.get_offset(channel_name))
455 }
456
457 pub fn list_consumer_groups(&self) -> Vec<String> {
459 let groups = self
460 .consumer_groups
461 .read()
462 .expect("consumer_groups RwLock poisoned in list_consumer_groups");
463 groups.keys().cloned().collect()
464 }
465
466 pub fn get_consumer_group(&self, group_id: &str) -> Option<ConsumerGroup> {
468 let groups = self
469 .consumer_groups
470 .read()
471 .expect("consumer_groups RwLock poisoned in get_consumer_group");
472 groups.get(group_id).cloned()
473 }
474
475 pub fn get_history(
481 &self,
482 channel_id: &ChannelId,
483 count: usize,
484 ) -> Result<Vec<Event>, EngineError> {
485 let channels = self
486 .channels
487 .read()
488 .expect("channels RwLock poisoned in get_history");
489 let channel = channels
490 .get(channel_id)
491 .ok_or_else(|| EngineError::ChannelNotFound(channel_id.clone()))?;
492
493 Ok(channel.get_history(count))
494 }
495
496 pub fn get_history_after(
498 &self,
499 channel_id: &ChannelId,
500 timestamp: u64,
501 ) -> Result<Vec<Event>, EngineError> {
502 let channels = self
503 .channels
504 .read()
505 .expect("channels RwLock poisoned in get_history_after");
506 let channel = channels
507 .get(channel_id)
508 .ok_or_else(|| EngineError::ChannelNotFound(channel_id.clone()))?;
509
510 Ok(channel.get_history_after(timestamp))
511 }
512
513 pub fn stats(&self) -> EngineStats {
519 let stats = self.stats.read().expect("stats RwLock poisoned in stats");
520 stats.clone()
521 }
522
523 pub fn reset_stats(&self) {
525 let mut stats = self
526 .stats
527 .write()
528 .expect("stats RwLock poisoned in reset_stats");
529 *stats = EngineStats::default();
530 }
531
532 pub fn channel_stats(&self, id: &ChannelId) -> Option<crate::channel::ChannelStats> {
534 let channels = self
535 .channels
536 .read()
537 .expect("channels RwLock poisoned in channel_stats");
538 channels.get(id).map(|c| c.stats())
539 }
540}
541
542impl Default for StreamingEngine {
543 fn default() -> Self {
544 Self::new()
545 }
546}
547
548#[derive(Debug, Clone, Default)]
554pub struct EngineStats {
555 pub events_published: u64,
556 pub active_subscriptions: usize,
557 pub channels_created: usize,
558}
559
560#[derive(Debug, Clone)]
566pub enum EngineError {
567 ChannelExists(ChannelId),
568 ChannelNotFound(ChannelId),
569 TooManyChannels,
570 TooManySubscribers,
571 Channel(ChannelError),
572 ConsumerGroupExists(String),
573 ConsumerGroupNotFound(String),
574 SubscriberNotInGroup(SubscriberId, String),
575}
576
577impl std::fmt::Display for EngineError {
578 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
579 match self {
580 Self::ChannelExists(id) => write!(f, "Channel already exists: {}", id),
581 Self::ChannelNotFound(id) => write!(f, "Channel not found: {}", id),
582 Self::TooManyChannels => write!(f, "Maximum channels reached"),
583 Self::TooManySubscribers => write!(f, "Maximum subscribers reached"),
584 Self::Channel(err) => write!(f, "Channel error: {}", err),
585 Self::ConsumerGroupExists(id) => write!(f, "Consumer group already exists: {}", id),
586 Self::ConsumerGroupNotFound(id) => write!(f, "Consumer group not found: {}", id),
587 Self::SubscriberNotInGroup(sub_id, group_id) => {
588 write!(
589 f,
590 "Subscriber {} is not in consumer group {}",
591 sub_id, group_id
592 )
593 }
594 }
595 }
596}
597
598impl std::error::Error for EngineError {}
599
600#[cfg(test)]
605mod tests {
606 use super::*;
607 use crate::event::{EventData, EventType};
608
609 #[test]
610 fn test_engine_creation() {
611 let engine = StreamingEngine::new();
612 assert!(engine.list_channels().is_empty());
613 }
614
615 #[test]
616 fn test_channel_management() {
617 let engine = StreamingEngine::new();
618
619 engine.create_channel("events").unwrap();
620 assert!(engine.channel_exists(&ChannelId::new("events")));
621
622 let channels = engine.list_channels();
623 assert_eq!(channels.len(), 1);
624
625 engine.delete_channel(&ChannelId::new("events")).unwrap();
626 assert!(!engine.channel_exists(&ChannelId::new("events")));
627 }
628
629 #[tokio::test]
630 async fn test_publish_subscribe() {
631 let engine = StreamingEngine::new();
632 engine.create_channel("test").unwrap();
633
634 let channel_id = ChannelId::new("test");
635 let mut receiver = engine.subscribe(&channel_id, "sub1").unwrap();
636
637 let event = Event::new(
638 EventType::Created,
639 "source",
640 EventData::String("hello".to_string()),
641 );
642 engine.publish(&channel_id, event).unwrap();
643
644 let received = receiver.recv().await.unwrap();
645 assert_eq!(received.source, "source");
646 }
647
648 #[test]
649 fn test_duplicate_channel() {
650 let engine = StreamingEngine::new();
651
652 engine.create_channel("test").unwrap();
653 let result = engine.create_channel("test");
654
655 assert!(matches!(result, Err(EngineError::ChannelExists(_))));
656 }
657
658 #[test]
659 fn test_stats() {
660 let engine = StreamingEngine::new();
661 engine.create_channel("test").unwrap();
662
663 let channel_id = ChannelId::new("test");
664 engine.subscribe(&channel_id, "sub1").unwrap();
665
666 let event = Event::new(EventType::Created, "source", EventData::Null);
667 engine.publish(&channel_id, event).unwrap();
668
669 let stats = engine.stats();
670 assert_eq!(stats.events_published, 1);
671 assert_eq!(stats.active_subscriptions, 1);
672 }
673
674 #[test]
675 fn test_history() {
676 let config = EngineConfig {
677 default_channel_config: ChannelConfig {
678 persistent: true,
679 retention_count: 100,
680 ..Default::default()
681 },
682 ..Default::default()
683 };
684
685 let engine = StreamingEngine::with_config(config);
686 engine.create_channel("history").unwrap();
687
688 let channel_id = ChannelId::new("history");
689
690 for i in 0..5 {
691 let event = Event::new(EventType::Created, "test", EventData::Int(i));
692 engine.publish(&channel_id, event).unwrap();
693 }
694
695 let history = engine.get_history(&channel_id, 10).unwrap();
696 assert_eq!(history.len(), 5);
697 }
698
699 #[test]
700 fn test_consumer_group_create_delete() {
701 let engine = StreamingEngine::new();
702
703 engine.create_consumer_group("group1").unwrap();
704 assert_eq!(engine.list_consumer_groups().len(), 1);
705
706 let result = engine.create_consumer_group("group1");
708 assert!(matches!(result, Err(EngineError::ConsumerGroupExists(_))));
709
710 engine.delete_consumer_group("group1").unwrap();
711 assert!(engine.list_consumer_groups().is_empty());
712
713 let result = engine.delete_consumer_group("group1");
715 assert!(matches!(result, Err(EngineError::ConsumerGroupNotFound(_))));
716 }
717
718 #[test]
719 fn test_consumer_group_join_leave() {
720 let engine = StreamingEngine::new();
721 engine.create_consumer_group("group1").unwrap();
722
723 let sub1 = SubscriberId::new("sub1");
724 let sub2 = SubscriberId::new("sub2");
725
726 let mut channels1 = std::collections::HashSet::new();
727 channels1.insert("events".to_string());
728
729 let mut channels2 = std::collections::HashSet::new();
730 channels2.insert("logs".to_string());
731
732 engine
733 .join_consumer_group("group1", sub1.clone(), channels1)
734 .unwrap();
735 engine
736 .join_consumer_group("group1", sub2.clone(), channels2)
737 .unwrap();
738
739 let group = engine.get_consumer_group("group1").unwrap();
740 assert_eq!(group.member_count(), 2);
741 assert!(group.is_member(&sub1));
742
743 let removed_channels = engine.leave_consumer_group("group1", &sub1).unwrap();
745 assert!(removed_channels.contains("events"));
746
747 let group = engine.get_consumer_group("group1").unwrap();
748 assert_eq!(group.member_count(), 1);
749 assert!(!group.is_member(&sub1));
750
751 let result = engine.leave_consumer_group("group1", &sub1);
753 assert!(matches!(
754 result,
755 Err(EngineError::SubscriberNotInGroup(_, _))
756 ));
757 }
758
759 #[test]
760 fn test_consumer_group_join_nonexistent() {
761 let engine = StreamingEngine::new();
762
763 let result = engine.join_consumer_group(
764 "nonexistent",
765 SubscriberId::new("sub1"),
766 std::collections::HashSet::new(),
767 );
768 assert!(matches!(result, Err(EngineError::ConsumerGroupNotFound(_))));
769 }
770
771 #[test]
772 fn test_consumer_group_offset_tracking() {
773 let engine = StreamingEngine::new();
774 engine.create_consumer_group("group1").unwrap();
775
776 let offset = engine.get_committed_offset("group1", "events").unwrap();
778 assert_eq!(offset, None);
779
780 engine.commit_offset("group1", "events", 42).unwrap();
782 let offset = engine.get_committed_offset("group1", "events").unwrap();
783 assert_eq!(offset, Some(42));
784
785 engine.commit_offset("group1", "events", 100).unwrap();
787 let offset = engine.get_committed_offset("group1", "events").unwrap();
788 assert_eq!(offset, Some(100));
789
790 engine.commit_offset("group1", "logs", 5).unwrap();
792 let offset = engine.get_committed_offset("group1", "logs").unwrap();
793 assert_eq!(offset, Some(5));
794 let offset = engine.get_committed_offset("group1", "events").unwrap();
796 assert_eq!(offset, Some(100));
797 }
798
799 #[test]
800 fn test_consumer_group_offset_nonexistent_group() {
801 let engine = StreamingEngine::new();
802
803 let result = engine.commit_offset("nonexistent", "events", 10);
804 assert!(matches!(result, Err(EngineError::ConsumerGroupNotFound(_))));
805
806 let result = engine.get_committed_offset("nonexistent", "events");
807 assert!(matches!(result, Err(EngineError::ConsumerGroupNotFound(_))));
808 }
809
810 #[test]
811 fn test_get_consumer_group() {
812 let engine = StreamingEngine::new();
813
814 assert!(engine.get_consumer_group("nonexistent").is_none());
815
816 engine.create_consumer_group("group1").unwrap();
817 let group = engine.get_consumer_group("group1");
818 assert!(group.is_some());
819 assert_eq!(group.unwrap().group_id, "group1");
820 }
821}