Skip to main content

aegis_streaming/
engine.rs

//! Aegis Streaming Engine
//!
//! Core engine for real-time event streaming.
//!
//! @version 0.1.0
//! @author AutomataNexus Development Team
7
8use crate::cdc::{CdcConfig, ChangeEvent};
9use crate::channel::{Channel, ChannelConfig, ChannelError, ChannelId, ChannelReceiver};
10use crate::event::{Event, EventFilter};
11use crate::subscriber::{ConsumerGroup, Subscriber, SubscriberId, Subscription};
12use std::collections::{HashMap, HashSet};
13use std::sync::{Arc, RwLock};
14
15// =============================================================================
16// Engine Configuration
17// =============================================================================
18
19/// Configuration for the streaming engine.
20#[derive(Debug, Clone)]
21pub struct EngineConfig {
22    pub max_channels: usize,
23    pub max_subscribers: usize,
24    pub default_channel_config: ChannelConfig,
25    pub cdc_config: CdcConfig,
26}
27
28impl Default for EngineConfig {
29    fn default() -> Self {
30        Self {
31            max_channels: 1000,
32            max_subscribers: 10000,
33            default_channel_config: ChannelConfig::default(),
34            cdc_config: CdcConfig::default(),
35        }
36    }
37}
38
39// =============================================================================
40// Streaming Engine
41// =============================================================================
42
43/// The main streaming engine for pub/sub and CDC.
44pub struct StreamingEngine {
45    config: EngineConfig,
46    channels: RwLock<HashMap<ChannelId, Channel>>,
47    subscribers: RwLock<HashMap<SubscriberId, Subscriber>>,
48    consumer_groups: Arc<RwLock<HashMap<String, ConsumerGroup>>>,
49    stats: RwLock<EngineStats>,
50}
51
52impl StreamingEngine {
53    /// Create a new streaming engine.
54    pub fn new() -> Self {
55        Self::with_config(EngineConfig::default())
56    }
57
58    /// Create an engine with custom configuration.
59    pub fn with_config(config: EngineConfig) -> Self {
60        Self {
61            config,
62            channels: RwLock::new(HashMap::new()),
63            subscribers: RwLock::new(HashMap::new()),
64            consumer_groups: Arc::new(RwLock::new(HashMap::new())),
65            stats: RwLock::new(EngineStats::default()),
66        }
67    }
68
69    // -------------------------------------------------------------------------
70    // Channel Management
71    // -------------------------------------------------------------------------
72
73    /// Create a new channel.
74    pub fn create_channel(&self, id: impl Into<ChannelId>) -> Result<(), EngineError> {
75        let id = id.into();
76        let mut channels = self
77            .channels
78            .write()
79            .expect("channels RwLock poisoned in create_channel");
80
81        if channels.len() >= self.config.max_channels {
82            return Err(EngineError::TooManyChannels);
83        }
84
85        if channels.contains_key(&id) {
86            return Err(EngineError::ChannelExists(id));
87        }
88
89        let channel = Channel::with_config(id.clone(), self.config.default_channel_config.clone());
90        channels.insert(id, channel);
91
92        Ok(())
93    }
94
95    /// Create a channel with custom configuration.
96    pub fn create_channel_with_config(
97        &self,
98        id: impl Into<ChannelId>,
99        config: ChannelConfig,
100    ) -> Result<(), EngineError> {
101        let id = id.into();
102        let mut channels = self
103            .channels
104            .write()
105            .expect("channels RwLock poisoned in create_channel_with_config");
106
107        if channels.len() >= self.config.max_channels {
108            return Err(EngineError::TooManyChannels);
109        }
110
111        if channels.contains_key(&id) {
112            return Err(EngineError::ChannelExists(id));
113        }
114
115        let channel = Channel::with_config(id.clone(), config);
116        channels.insert(id, channel);
117
118        Ok(())
119    }
120
121    /// Delete a channel.
122    pub fn delete_channel(&self, id: &ChannelId) -> Result<(), EngineError> {
123        let mut channels = self
124            .channels
125            .write()
126            .expect("channels RwLock poisoned in delete_channel");
127
128        if channels.remove(id).is_none() {
129            return Err(EngineError::ChannelNotFound(id.clone()));
130        }
131
132        Ok(())
133    }
134
135    /// List all channels.
136    pub fn list_channels(&self) -> Vec<ChannelId> {
137        let channels = self
138            .channels
139            .read()
140            .expect("channels RwLock poisoned in list_channels");
141        channels.keys().cloned().collect()
142    }
143
144    /// Check if a channel exists.
145    pub fn channel_exists(&self, id: &ChannelId) -> bool {
146        let channels = self
147            .channels
148            .read()
149            .expect("channels RwLock poisoned in channel_exists");
150        channels.contains_key(id)
151    }
152
153    // -------------------------------------------------------------------------
154    // Publishing
155    // -------------------------------------------------------------------------
156
157    /// Publish an event to a channel.
158    pub fn publish(&self, channel_id: &ChannelId, event: Event) -> Result<usize, EngineError> {
159        let channels = self
160            .channels
161            .read()
162            .expect("channels RwLock poisoned in publish");
163        let channel = channels
164            .get(channel_id)
165            .ok_or_else(|| EngineError::ChannelNotFound(channel_id.clone()))?;
166
167        let receivers = channel.publish(event).map_err(EngineError::Channel)?;
168
169        drop(channels);
170
171        {
172            let mut stats = self
173                .stats
174                .write()
175                .expect("stats RwLock poisoned in publish");
176            stats.events_published += 1;
177        }
178
179        Ok(receivers)
180    }
181
182    /// Publish a CDC change event.
183    pub fn publish_change(
184        &self,
185        channel_id: &ChannelId,
186        change: ChangeEvent,
187    ) -> Result<usize, EngineError> {
188        let event = change.to_event();
189        self.publish(channel_id, event)
190    }
191
192    /// Publish to multiple channels.
193    pub fn publish_to_many(
194        &self,
195        channel_ids: &[ChannelId],
196        event: Event,
197    ) -> HashMap<ChannelId, Result<usize, EngineError>> {
198        let mut results = HashMap::new();
199
200        for id in channel_ids {
201            results.insert(id.clone(), self.publish(id, event.clone()));
202        }
203
204        results
205    }
206
207    // -------------------------------------------------------------------------
208    // Subscribing
209    // -------------------------------------------------------------------------
210
211    /// Subscribe to a channel.
212    pub fn subscribe(
213        &self,
214        channel_id: &ChannelId,
215        subscriber_id: impl Into<SubscriberId>,
216    ) -> Result<ChannelReceiver, EngineError> {
217        let subscriber_id = subscriber_id.into();
218        let channels = self
219            .channels
220            .read()
221            .expect("channels RwLock poisoned in subscribe");
222        let channel = channels
223            .get(channel_id)
224            .ok_or_else(|| EngineError::ChannelNotFound(channel_id.clone()))?;
225
226        let receiver = channel
227            .subscribe(subscriber_id.clone())
228            .map_err(EngineError::Channel)?;
229
230        drop(channels);
231
232        self.ensure_subscriber(&subscriber_id, channel_id);
233
234        {
235            let mut stats = self
236                .stats
237                .write()
238                .expect("stats RwLock poisoned in subscribe");
239            stats.active_subscriptions += 1;
240        }
241
242        Ok(receiver)
243    }
244
245    /// Subscribe with a filter.
246    pub fn subscribe_with_filter(
247        &self,
248        channel_id: &ChannelId,
249        subscriber_id: impl Into<SubscriberId>,
250        filter: EventFilter,
251    ) -> Result<ChannelReceiver, EngineError> {
252        let subscriber_id = subscriber_id.into();
253        let channels = self
254            .channels
255            .read()
256            .expect("channels RwLock poisoned in subscribe_with_filter");
257        let channel = channels
258            .get(channel_id)
259            .ok_or_else(|| EngineError::ChannelNotFound(channel_id.clone()))?;
260
261        let receiver = channel
262            .subscribe_with_filter(subscriber_id.clone(), filter)
263            .map_err(EngineError::Channel)?;
264
265        drop(channels);
266
267        self.ensure_subscriber(&subscriber_id, channel_id);
268
269        {
270            let mut stats = self
271                .stats
272                .write()
273                .expect("stats RwLock poisoned in subscribe_with_filter");
274            stats.active_subscriptions += 1;
275        }
276
277        Ok(receiver)
278    }
279
280    /// Unsubscribe from a channel.
281    pub fn unsubscribe(&self, channel_id: &ChannelId, subscriber_id: &SubscriberId) {
282        let channels = self
283            .channels
284            .read()
285            .expect("channels RwLock poisoned in unsubscribe");
286        if let Some(channel) = channels.get(channel_id) {
287            channel.unsubscribe(subscriber_id);
288        }
289
290        let mut stats = self
291            .stats
292            .write()
293            .expect("stats RwLock poisoned in unsubscribe");
294        stats.active_subscriptions = stats.active_subscriptions.saturating_sub(1);
295    }
296
297    fn ensure_subscriber(&self, subscriber_id: &SubscriberId, channel_id: &ChannelId) {
298        let mut subscribers = self
299            .subscribers
300            .write()
301            .expect("subscribers RwLock poisoned in ensure_subscriber");
302
303        let subscriber = subscribers
304            .entry(subscriber_id.clone())
305            .or_insert_with(|| Subscriber::new(subscriber_id.clone()));
306
307        let mut subscription = Subscription::new(subscriber_id.clone());
308        subscription.add_channel(channel_id.clone());
309        subscriber.add_subscription(subscription);
310    }
311
312    // -------------------------------------------------------------------------
313    // Subscriber Management
314    // -------------------------------------------------------------------------
315
316    /// Get a subscriber.
317    pub fn get_subscriber(&self, id: &SubscriberId) -> Option<Subscriber> {
318        let subscribers = self
319            .subscribers
320            .read()
321            .expect("subscribers RwLock poisoned in get_subscriber");
322        subscribers.get(id).cloned()
323    }
324
325    /// List all subscribers.
326    pub fn list_subscribers(&self) -> Vec<SubscriberId> {
327        let subscribers = self
328            .subscribers
329            .read()
330            .expect("subscribers RwLock poisoned in list_subscribers");
331        subscribers.keys().cloned().collect()
332    }
333
334    /// Remove a subscriber.
335    pub fn remove_subscriber(&self, id: &SubscriberId) {
336        let mut subscribers = self
337            .subscribers
338            .write()
339            .expect("subscribers RwLock poisoned in remove_subscriber");
340        subscribers.remove(id);
341    }
342
343    // -------------------------------------------------------------------------
344    // Consumer Group Management
345    // -------------------------------------------------------------------------
346
347    /// Create a new consumer group with the given ID.
348    pub fn create_consumer_group(&self, group_id: impl Into<String>) -> Result<(), EngineError> {
349        let group_id = group_id.into();
350        let mut groups = self
351            .consumer_groups
352            .write()
353            .expect("consumer_groups RwLock poisoned in create_consumer_group");
354
355        if groups.contains_key(&group_id) {
356            return Err(EngineError::ConsumerGroupExists(group_id));
357        }
358
359        groups.insert(group_id.clone(), ConsumerGroup::new(group_id));
360        Ok(())
361    }
362
363    /// Delete a consumer group.
364    pub fn delete_consumer_group(&self, group_id: &str) -> Result<(), EngineError> {
365        let mut groups = self
366            .consumer_groups
367            .write()
368            .expect("consumer_groups RwLock poisoned in delete_consumer_group");
369
370        if groups.remove(group_id).is_none() {
371            return Err(EngineError::ConsumerGroupNotFound(group_id.to_string()));
372        }
373
374        Ok(())
375    }
376
377    /// Join a consumer group. The subscriber is added as a member with the given
378    /// set of assigned channels.
379    pub fn join_consumer_group(
380        &self,
381        group_id: &str,
382        subscriber_id: SubscriberId,
383        channels: HashSet<String>,
384    ) -> Result<(), EngineError> {
385        let mut groups = self
386            .consumer_groups
387            .write()
388            .expect("consumer_groups RwLock poisoned in join_consumer_group");
389
390        let group = groups
391            .get_mut(group_id)
392            .ok_or_else(|| EngineError::ConsumerGroupNotFound(group_id.to_string()))?;
393
394        group.add_member(subscriber_id, channels);
395        Ok(())
396    }
397
398    /// Leave a consumer group. Returns the channels that were assigned to the
399    /// subscriber, or an error if the group or member was not found.
400    pub fn leave_consumer_group(
401        &self,
402        group_id: &str,
403        subscriber_id: &SubscriberId,
404    ) -> Result<HashSet<String>, EngineError> {
405        let mut groups = self
406            .consumer_groups
407            .write()
408            .expect("consumer_groups RwLock poisoned in leave_consumer_group");
409
410        let group = groups
411            .get_mut(group_id)
412            .ok_or_else(|| EngineError::ConsumerGroupNotFound(group_id.to_string()))?;
413
414        group.remove_member(subscriber_id).ok_or_else(|| {
415            EngineError::SubscriberNotInGroup(subscriber_id.clone(), group_id.to_string())
416        })
417    }
418
419    /// Commit an offset for a channel within a consumer group.
420    pub fn commit_offset(
421        &self,
422        group_id: &str,
423        channel_name: impl Into<String>,
424        offset: u64,
425    ) -> Result<(), EngineError> {
426        let mut groups = self
427            .consumer_groups
428            .write()
429            .expect("consumer_groups RwLock poisoned in commit_offset");
430
431        let group = groups
432            .get_mut(group_id)
433            .ok_or_else(|| EngineError::ConsumerGroupNotFound(group_id.to_string()))?;
434
435        group.commit_offset(channel_name, offset);
436        Ok(())
437    }
438
439    /// Get the committed offset for a channel within a consumer group.
440    pub fn get_committed_offset(
441        &self,
442        group_id: &str,
443        channel_name: &str,
444    ) -> Result<Option<u64>, EngineError> {
445        let groups = self
446            .consumer_groups
447            .read()
448            .expect("consumer_groups RwLock poisoned in get_committed_offset");
449
450        let group = groups
451            .get(group_id)
452            .ok_or_else(|| EngineError::ConsumerGroupNotFound(group_id.to_string()))?;
453
454        Ok(group.get_offset(channel_name))
455    }
456
457    /// List all consumer groups.
458    pub fn list_consumer_groups(&self) -> Vec<String> {
459        let groups = self
460            .consumer_groups
461            .read()
462            .expect("consumer_groups RwLock poisoned in list_consumer_groups");
463        groups.keys().cloned().collect()
464    }
465
466    /// Get a snapshot of a consumer group.
467    pub fn get_consumer_group(&self, group_id: &str) -> Option<ConsumerGroup> {
468        let groups = self
469            .consumer_groups
470            .read()
471            .expect("consumer_groups RwLock poisoned in get_consumer_group");
472        groups.get(group_id).cloned()
473    }
474
475    // -------------------------------------------------------------------------
476    // History and Replay
477    // -------------------------------------------------------------------------
478
479    /// Get recent events from a channel.
480    pub fn get_history(
481        &self,
482        channel_id: &ChannelId,
483        count: usize,
484    ) -> Result<Vec<Event>, EngineError> {
485        let channels = self
486            .channels
487            .read()
488            .expect("channels RwLock poisoned in get_history");
489        let channel = channels
490            .get(channel_id)
491            .ok_or_else(|| EngineError::ChannelNotFound(channel_id.clone()))?;
492
493        Ok(channel.get_history(count))
494    }
495
496    /// Get events after a timestamp.
497    pub fn get_history_after(
498        &self,
499        channel_id: &ChannelId,
500        timestamp: u64,
501    ) -> Result<Vec<Event>, EngineError> {
502        let channels = self
503            .channels
504            .read()
505            .expect("channels RwLock poisoned in get_history_after");
506        let channel = channels
507            .get(channel_id)
508            .ok_or_else(|| EngineError::ChannelNotFound(channel_id.clone()))?;
509
510        Ok(channel.get_history_after(timestamp))
511    }
512
513    // -------------------------------------------------------------------------
514    // Statistics
515    // -------------------------------------------------------------------------
516
517    /// Get engine statistics.
518    pub fn stats(&self) -> EngineStats {
519        let stats = self.stats.read().expect("stats RwLock poisoned in stats");
520        stats.clone()
521    }
522
523    /// Reset statistics.
524    pub fn reset_stats(&self) {
525        let mut stats = self
526            .stats
527            .write()
528            .expect("stats RwLock poisoned in reset_stats");
529        *stats = EngineStats::default();
530    }
531
532    /// Get channel statistics.
533    pub fn channel_stats(&self, id: &ChannelId) -> Option<crate::channel::ChannelStats> {
534        let channels = self
535            .channels
536            .read()
537            .expect("channels RwLock poisoned in channel_stats");
538        channels.get(id).map(|c| c.stats())
539    }
540}
541
542impl Default for StreamingEngine {
543    fn default() -> Self {
544        Self::new()
545    }
546}
547
548// =============================================================================
549// Engine Statistics
550// =============================================================================
551
/// Engine-level counters; all fields start at zero via `Default`.
#[derive(Clone, Debug, Default)]
pub struct EngineStats {
    /// Number of events successfully published through the engine.
    pub events_published: u64,
    /// Subscriptions opened through the engine minus those closed through it
    /// (never drops below zero).
    pub active_subscriptions: usize,
    /// Intended count of channels created through the engine.
    pub channels_created: usize,
}
559
560// =============================================================================
561// Engine Error
562// =============================================================================
563
564/// Errors that can occur in the streaming engine.
565#[derive(Debug, Clone)]
566pub enum EngineError {
567    ChannelExists(ChannelId),
568    ChannelNotFound(ChannelId),
569    TooManyChannels,
570    TooManySubscribers,
571    Channel(ChannelError),
572    ConsumerGroupExists(String),
573    ConsumerGroupNotFound(String),
574    SubscriberNotInGroup(SubscriberId, String),
575}
576
577impl std::fmt::Display for EngineError {
578    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
579        match self {
580            Self::ChannelExists(id) => write!(f, "Channel already exists: {}", id),
581            Self::ChannelNotFound(id) => write!(f, "Channel not found: {}", id),
582            Self::TooManyChannels => write!(f, "Maximum channels reached"),
583            Self::TooManySubscribers => write!(f, "Maximum subscribers reached"),
584            Self::Channel(err) => write!(f, "Channel error: {}", err),
585            Self::ConsumerGroupExists(id) => write!(f, "Consumer group already exists: {}", id),
586            Self::ConsumerGroupNotFound(id) => write!(f, "Consumer group not found: {}", id),
587            Self::SubscriberNotInGroup(sub_id, group_id) => {
588                write!(
589                    f,
590                    "Subscriber {} is not in consumer group {}",
591                    sub_id, group_id
592                )
593            }
594        }
595    }
596}
597
598impl std::error::Error for EngineError {}
599
600// =============================================================================
601// Tests
602// =============================================================================
603
#[cfg(test)]
mod tests {
    use super::*;
    use crate::event::{EventData, EventType};

    /// Builds a default engine that already has `name` registered as a channel.
    fn engine_with_channel(name: &'static str) -> (StreamingEngine, ChannelId) {
        let engine = StreamingEngine::new();
        engine.create_channel(name).unwrap();
        (engine, ChannelId::new(name))
    }

    /// Builds a default engine that already contains the consumer group `group1`.
    fn engine_with_group1() -> StreamingEngine {
        let engine = StreamingEngine::new();
        engine.create_consumer_group("group1").unwrap();
        engine
    }

    /// Builds a one-element channel-name set.
    fn single_channel_set(name: &str) -> std::collections::HashSet<String> {
        let mut set = std::collections::HashSet::new();
        set.insert(name.to_string());
        set
    }

    #[test]
    fn test_engine_creation() {
        let engine = StreamingEngine::new();
        assert!(engine.list_channels().is_empty());
    }

    #[test]
    fn test_channel_management() {
        let (engine, _) = engine_with_channel("events");
        assert!(engine.channel_exists(&ChannelId::new("events")));

        let listed = engine.list_channels();
        assert_eq!(listed.len(), 1);

        engine.delete_channel(&ChannelId::new("events")).unwrap();
        assert!(!engine.channel_exists(&ChannelId::new("events")));
    }

    #[tokio::test]
    async fn test_publish_subscribe() {
        let (engine, channel_id) = engine_with_channel("test");
        let mut receiver = engine.subscribe(&channel_id, "sub1").unwrap();

        let payload = EventData::String("hello".to_string());
        let event = Event::new(EventType::Created, "source", payload);
        engine.publish(&channel_id, event).unwrap();

        let received = receiver.recv().await.unwrap();
        assert_eq!(received.source, "source");
    }

    #[test]
    fn test_duplicate_channel() {
        let (engine, _) = engine_with_channel("test");

        let second_attempt = engine.create_channel("test");

        assert!(matches!(
            second_attempt,
            Err(EngineError::ChannelExists(_))
        ));
    }

    #[test]
    fn test_stats() {
        let (engine, channel_id) = engine_with_channel("test");

        // The receiver is intentionally dropped right away.
        engine.subscribe(&channel_id, "sub1").unwrap();

        let event = Event::new(EventType::Created, "source", EventData::Null);
        engine.publish(&channel_id, event).unwrap();

        let snapshot = engine.stats();
        assert_eq!(snapshot.events_published, 1);
        assert_eq!(snapshot.active_subscriptions, 1);
    }

    #[test]
    fn test_history() {
        let channel_defaults = ChannelConfig {
            persistent: true,
            retention_count: 100,
            ..Default::default()
        };
        let config = EngineConfig {
            default_channel_config: channel_defaults,
            ..Default::default()
        };

        let engine = StreamingEngine::with_config(config);
        engine.create_channel("history").unwrap();
        let channel_id = ChannelId::new("history");

        for i in 0..5 {
            let event = Event::new(EventType::Created, "test", EventData::Int(i));
            engine.publish(&channel_id, event).unwrap();
        }

        let history = engine.get_history(&channel_id, 10).unwrap();
        assert_eq!(history.len(), 5);
    }

    #[test]
    fn test_consumer_group_create_delete() {
        let engine = engine_with_group1();
        assert_eq!(engine.list_consumer_groups().len(), 1);

        // A second group with the same id is rejected.
        let duplicate = engine.create_consumer_group("group1");
        assert!(matches!(
            duplicate,
            Err(EngineError::ConsumerGroupExists(_))
        ));

        engine.delete_consumer_group("group1").unwrap();
        assert!(engine.list_consumer_groups().is_empty());

        // The group is gone, so deleting it again is an error.
        let missing = engine.delete_consumer_group("group1");
        assert!(matches!(
            missing,
            Err(EngineError::ConsumerGroupNotFound(_))
        ));
    }

    #[test]
    fn test_consumer_group_join_leave() {
        let engine = engine_with_group1();

        let sub1 = SubscriberId::new("sub1");
        let sub2 = SubscriberId::new("sub2");

        engine
            .join_consumer_group("group1", sub1.clone(), single_channel_set("events"))
            .unwrap();
        engine
            .join_consumer_group("group1", sub2.clone(), single_channel_set("logs"))
            .unwrap();

        let group = engine.get_consumer_group("group1").unwrap();
        assert_eq!(group.member_count(), 2);
        assert!(group.is_member(&sub1));

        // First member leaves and gets its channel assignment back.
        let removed_channels = engine.leave_consumer_group("group1", &sub1).unwrap();
        assert!(removed_channels.contains("events"));

        let group = engine.get_consumer_group("group1").unwrap();
        assert_eq!(group.member_count(), 1);
        assert!(!group.is_member(&sub1));

        // Leaving twice is an error.
        let second_leave = engine.leave_consumer_group("group1", &sub1);
        assert!(matches!(
            second_leave,
            Err(EngineError::SubscriberNotInGroup(_, _))
        ));
    }

    #[test]
    fn test_consumer_group_join_nonexistent() {
        let engine = StreamingEngine::new();

        let no_channels = std::collections::HashSet::new();
        let result =
            engine.join_consumer_group("nonexistent", SubscriberId::new("sub1"), no_channels);

        assert!(matches!(result, Err(EngineError::ConsumerGroupNotFound(_))));
    }

    #[test]
    fn test_consumer_group_offset_tracking() {
        let engine = engine_with_group1();
        let committed = |channel: &str| engine.get_committed_offset("group1", channel).unwrap();

        // Nothing committed yet.
        assert_eq!(committed("events"), None);

        // First commit.
        engine.commit_offset("group1", "events", 42).unwrap();
        assert_eq!(committed("events"), Some(42));

        // Overwrite with a later offset.
        engine.commit_offset("group1", "events", 100).unwrap();
        assert_eq!(committed("events"), Some(100));

        // A second channel is tracked independently.
        engine.commit_offset("group1", "logs", 5).unwrap();
        assert_eq!(committed("logs"), Some(5));
        // The first channel keeps its offset.
        assert_eq!(committed("events"), Some(100));
    }

    #[test]
    fn test_consumer_group_offset_nonexistent_group() {
        let engine = StreamingEngine::new();

        let commit = engine.commit_offset("nonexistent", "events", 10);
        assert!(matches!(commit, Err(EngineError::ConsumerGroupNotFound(_))));

        let lookup = engine.get_committed_offset("nonexistent", "events");
        assert!(matches!(lookup, Err(EngineError::ConsumerGroupNotFound(_))));
    }

    #[test]
    fn test_get_consumer_group() {
        let engine = StreamingEngine::new();

        assert!(engine.get_consumer_group("nonexistent").is_none());

        engine.create_consumer_group("group1").unwrap();
        let group = engine.get_consumer_group("group1");
        assert!(group.is_some());
        assert_eq!(group.unwrap().group_id, "group1");
    }
}