Skip to main content

aegis_streaming/
channel.rs

//! Aegis Streaming Channels
//!
//! Pub/sub channels for event distribution.
//!
//! @version 0.1.0
//! @author AutomataNexus Development Team
7
8use crate::event::{Event, EventFilter};
9use crate::subscriber::{AckMode, SubscriberId};
10use serde::{Deserialize, Serialize};
11use std::collections::{HashMap, HashSet, VecDeque};
12use std::sync::{Arc, RwLock};
13use tokio::sync::broadcast;
14
// =============================================================================
// Channel ID
// =============================================================================
18
19/// Unique identifier for a channel.
20#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
21pub struct ChannelId(pub String);
22
23impl ChannelId {
24    pub fn new(id: impl Into<String>) -> Self {
25        Self(id.into())
26    }
27
28    pub fn as_str(&self) -> &str {
29        &self.0
30    }
31}
32
33impl std::fmt::Display for ChannelId {
34    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
35        write!(f, "{}", self.0)
36    }
37}
38
39impl From<String> for ChannelId {
40    fn from(s: String) -> Self {
41        Self(s)
42    }
43}
44
45impl From<&str> for ChannelId {
46    fn from(s: &str) -> Self {
47        Self(s.to_string())
48    }
49}
50
// =============================================================================
// Channel Configuration
// =============================================================================
54
/// Tunable settings applied when a channel is created.
#[derive(Clone, Debug)]
pub struct ChannelConfig {
    /// Capacity requested for the underlying broadcast channel.
    pub buffer_size: usize,
    /// Upper bound on the number of registered subscribers.
    pub max_subscribers: usize,
    /// Whether published events are also kept in the history buffer.
    pub persistent: bool,
    /// Maximum number of events retained in history when `persistent` is set.
    pub retention_count: usize,
}

impl Default for ChannelConfig {
    /// Non-persistent configuration: a 1024-slot buffer, at most 1000
    /// subscribers and a 1000-event retention window.
    fn default() -> Self {
        let (buffer_size, max_subscribers, retention_count) = (1024, 1000, 1000);
        ChannelConfig {
            buffer_size,
            max_subscribers,
            persistent: false,
            retention_count,
        }
    }
}
74
// =============================================================================
// Channel
// =============================================================================
78
79/// A pub/sub channel for event distribution.
80pub struct Channel {
81    id: ChannelId,
82    config: ChannelConfig,
83    sender: broadcast::Sender<Event>,
84    subscribers: RwLock<HashMap<SubscriberId, SubscriberInfo>>,
85    history: RwLock<VecDeque<Event>>,
86    stats: RwLock<ChannelStats>,
87}
88
89impl Channel {
90    /// Create a new channel.
91    pub fn new(id: impl Into<ChannelId>) -> Self {
92        Self::with_config(id, ChannelConfig::default())
93    }
94
95    /// Create a channel with custom configuration.
96    pub fn with_config(id: impl Into<ChannelId>, config: ChannelConfig) -> Self {
97        let (sender, _) = broadcast::channel(config.buffer_size);
98
99        Self {
100            id: id.into(),
101            config,
102            sender,
103            subscribers: RwLock::new(HashMap::new()),
104            history: RwLock::new(VecDeque::new()),
105            stats: RwLock::new(ChannelStats::default()),
106        }
107    }
108
109    /// Get the channel ID.
110    pub fn id(&self) -> &ChannelId {
111        &self.id
112    }
113
114    /// Publish an event to the channel.
115    pub fn publish(&self, event: Event) -> Result<usize, ChannelError> {
116        if self.config.persistent {
117            let mut history = self
118                .history
119                .write()
120                .expect("history RwLock poisoned in publish");
121            history.push_back(event.clone());
122
123            while history.len() > self.config.retention_count {
124                history.pop_front();
125            }
126        }
127
128        let receivers = self.sender.send(event).unwrap_or(0);
129
130        {
131            let mut stats = self
132                .stats
133                .write()
134                .expect("stats RwLock poisoned in publish");
135            stats.events_published += 1;
136            stats.last_event_time = Some(
137                std::time::SystemTime::now()
138                    .duration_since(std::time::UNIX_EPOCH)
139                    .map(|d| d.as_millis() as u64)
140                    .unwrap_or(0),
141            );
142        }
143
144        Ok(receivers)
145    }
146
147    /// Subscribe to the channel.
148    pub fn subscribe(&self, subscriber_id: SubscriberId) -> Result<ChannelReceiver, ChannelError> {
149        self.subscribe_with_ack_mode(subscriber_id, None, AckMode::Auto)
150    }
151
152    /// Subscribe with a filter.
153    pub fn subscribe_with_filter(
154        &self,
155        subscriber_id: SubscriberId,
156        filter: EventFilter,
157    ) -> Result<ChannelReceiver, ChannelError> {
158        self.subscribe_with_ack_mode(subscriber_id, Some(filter), AckMode::Auto)
159    }
160
161    /// Subscribe with a specific ack mode.
162    pub fn subscribe_with_ack_mode(
163        &self,
164        subscriber_id: SubscriberId,
165        filter: Option<EventFilter>,
166        ack_mode: AckMode,
167    ) -> Result<ChannelReceiver, ChannelError> {
168        let subscribers = self
169            .subscribers
170            .read()
171            .expect("subscribers RwLock poisoned in subscribe_with_ack_mode (read)");
172        if subscribers.len() >= self.config.max_subscribers {
173            return Err(ChannelError::TooManySubscribers);
174        }
175        drop(subscribers);
176
177        let receiver = self.sender.subscribe();
178
179        {
180            let mut subscribers = self
181                .subscribers
182                .write()
183                .expect("subscribers RwLock poisoned in subscribe_with_ack_mode (write)");
184            subscribers.insert(
185                subscriber_id.clone(),
186                SubscriberInfo {
187                    filter: filter.clone(),
188                    subscribed_at: current_timestamp(),
189                },
190            );
191        }
192
193        {
194            let mut stats = self
195                .stats
196                .write()
197                .expect("stats RwLock poisoned in subscribe_with_ack_mode");
198            stats.subscriber_count += 1;
199        }
200
201        Ok(ChannelReceiver {
202            receiver,
203            filter,
204            ack_mode,
205            next_offset: 0,
206            ack_state: Arc::new(RwLock::new(AckState {
207                unacked: HashMap::new(),
208                acked: HashSet::new(),
209            })),
210        })
211    }
212
213    /// Unsubscribe from the channel.
214    pub fn unsubscribe(&self, subscriber_id: &SubscriberId) {
215        let mut subscribers = self
216            .subscribers
217            .write()
218            .expect("subscribers RwLock poisoned in unsubscribe");
219        if subscribers.remove(subscriber_id).is_some() {
220            let mut stats = self
221                .stats
222                .write()
223                .expect("stats RwLock poisoned in unsubscribe");
224            stats.subscriber_count = stats.subscriber_count.saturating_sub(1);
225        }
226    }
227
228    /// Get the number of subscribers.
229    pub fn subscriber_count(&self) -> usize {
230        let subscribers = self
231            .subscribers
232            .read()
233            .expect("subscribers RwLock poisoned in subscriber_count");
234        subscribers.len()
235    }
236
237    /// Get recent events from history.
238    pub fn get_history(&self, count: usize) -> Vec<Event> {
239        let history = self
240            .history
241            .read()
242            .expect("history RwLock poisoned in get_history");
243        history.iter().rev().take(count).cloned().collect()
244    }
245
246    /// Get events from history after a timestamp.
247    pub fn get_history_after(&self, timestamp: u64) -> Vec<Event> {
248        let history = self
249            .history
250            .read()
251            .expect("history RwLock poisoned in get_history_after");
252        history
253            .iter()
254            .filter(|e| e.timestamp > timestamp)
255            .cloned()
256            .collect()
257    }
258
259    /// Get channel statistics.
260    pub fn stats(&self) -> ChannelStats {
261        let stats = self.stats.read().expect("stats RwLock poisoned in stats");
262        stats.clone()
263    }
264
265    /// Clear history.
266    pub fn clear_history(&self) {
267        let mut history = self
268            .history
269            .write()
270            .expect("history RwLock poisoned in clear_history");
271        history.clear();
272    }
273}
274
// =============================================================================
// Channel Receiver
// =============================================================================
278
279/// Shared state for tracking unacknowledged messages across clones.
280#[derive(Debug)]
281struct AckState {
282    /// Messages that have been delivered but not yet acknowledged, keyed by offset.
283    unacked: HashMap<u64, Event>,
284    /// The set of offsets that have been acknowledged.
285    acked: HashSet<u64>,
286}
287
288/// Receiver for channel events.
289pub struct ChannelReceiver {
290    receiver: broadcast::Receiver<Event>,
291    filter: Option<EventFilter>,
292    ack_mode: AckMode,
293    /// Monotonically increasing offset assigned to each received message.
294    next_offset: u64,
295    /// Shared ack tracking state (used when ack_mode is Manual).
296    ack_state: Arc<RwLock<AckState>>,
297}
298
299impl ChannelReceiver {
300    /// Get the current ack mode for this receiver.
301    pub fn ack_mode(&self) -> AckMode {
302        self.ack_mode
303    }
304
305    /// Get the next offset that will be assigned (i.e., total messages received so far).
306    pub fn current_offset(&self) -> u64 {
307        self.next_offset
308    }
309
310    /// Receive the next event.
311    ///
312    /// When `AckMode::Auto`, messages are automatically acknowledged on receive.
313    /// When `AckMode::Manual`, messages are tracked as unacknowledged and must be
314    /// explicitly acknowledged via [`ack`]. When `AckMode::None`, no tracking is performed.
315    pub async fn recv(&mut self) -> Result<Event, ChannelError> {
316        loop {
317            match self.receiver.recv().await {
318                Ok(event) => {
319                    if let Some(ref filter) = self.filter {
320                        if !event.matches(filter) {
321                            continue;
322                        }
323                    }
324                    let offset = self.next_offset;
325                    self.next_offset += 1;
326
327                    match self.ack_mode {
328                        AckMode::Auto => {
329                            // Auto-ack: record as already acknowledged
330                            let mut state = self
331                                .ack_state
332                                .write()
333                                .expect("ack_state RwLock poisoned in recv");
334                            state.acked.insert(offset);
335                        }
336                        AckMode::Manual => {
337                            // Track as unacked for manual acknowledgment
338                            let mut state = self
339                                .ack_state
340                                .write()
341                                .expect("ack_state RwLock poisoned in recv");
342                            state.unacked.insert(offset, event.clone());
343                        }
344                        AckMode::None => {
345                            // No tracking
346                        }
347                    }
348
349                    return Ok(event);
350                }
351                Err(broadcast::error::RecvError::Closed) => {
352                    return Err(ChannelError::Closed);
353                }
354                Err(broadcast::error::RecvError::Lagged(n)) => {
355                    return Err(ChannelError::Lagged(n));
356                }
357            }
358        }
359    }
360
361    /// Try to receive an event without blocking.
362    pub fn try_recv(&mut self) -> Result<Option<Event>, ChannelError> {
363        loop {
364            match self.receiver.try_recv() {
365                Ok(event) => {
366                    if let Some(ref filter) = self.filter {
367                        if !event.matches(filter) {
368                            continue;
369                        }
370                    }
371                    let offset = self.next_offset;
372                    self.next_offset += 1;
373
374                    match self.ack_mode {
375                        AckMode::Auto => {
376                            let mut state = self
377                                .ack_state
378                                .write()
379                                .expect("ack_state RwLock poisoned in try_recv");
380                            state.acked.insert(offset);
381                        }
382                        AckMode::Manual => {
383                            let mut state = self
384                                .ack_state
385                                .write()
386                                .expect("ack_state RwLock poisoned in try_recv");
387                            state.unacked.insert(offset, event.clone());
388                        }
389                        AckMode::None => {}
390                    }
391
392                    return Ok(Some(event));
393                }
394                Err(broadcast::error::TryRecvError::Empty) => {
395                    return Ok(None);
396                }
397                Err(broadcast::error::TryRecvError::Closed) => {
398                    return Err(ChannelError::Closed);
399                }
400                Err(broadcast::error::TryRecvError::Lagged(n)) => {
401                    return Err(ChannelError::Lagged(n));
402                }
403            }
404        }
405    }
406
407    /// Acknowledge a message by its offset.
408    ///
409    /// Only meaningful when `AckMode::Manual`. Removes the message from the
410    /// unacknowledged set. Returns an error if the offset is not found.
411    pub fn ack(&self, offset: u64) -> Result<(), ChannelError> {
412        let mut state = self
413            .ack_state
414            .write()
415            .expect("ack_state RwLock poisoned in ack");
416        if state.unacked.remove(&offset).is_some() {
417            state.acked.insert(offset);
418            Ok(())
419        } else if state.acked.contains(&offset) {
420            // Already acked, idempotent
421            Ok(())
422        } else {
423            Err(ChannelError::NotFound(offset))
424        }
425    }
426
427    /// Get the number of unacknowledged messages.
428    pub fn unacked_count(&self) -> usize {
429        let state = self
430            .ack_state
431            .read()
432            .expect("ack_state RwLock poisoned in unacked_count");
433        state.unacked.len()
434    }
435
436    /// Get the number of acknowledged messages.
437    pub fn acked_count(&self) -> usize {
438        let state = self
439            .ack_state
440            .read()
441            .expect("ack_state RwLock poisoned in acked_count");
442        state.acked.len()
443    }
444
445    /// Get all unacknowledged messages for re-delivery.
446    ///
447    /// Returns a vector of (offset, event) pairs for messages that have been
448    /// received but not yet acknowledged.
449    pub fn get_unacked_messages(&self) -> Vec<(u64, Event)> {
450        let state = self
451            .ack_state
452            .read()
453            .expect("ack_state RwLock poisoned in get_unacked_messages");
454        let mut messages: Vec<(u64, Event)> = state
455            .unacked
456            .iter()
457            .map(|(offset, event)| (*offset, event.clone()))
458            .collect();
459        messages.sort_by_key(|(offset, _)| *offset);
460        messages
461    }
462}
463
// =============================================================================
// Subscriber Info
// =============================================================================
467
468#[derive(Debug, Clone)]
469#[allow(dead_code)]
470struct SubscriberInfo {
471    filter: Option<EventFilter>,
472    subscribed_at: u64,
473}
474
// =============================================================================
// Channel Statistics
// =============================================================================
478
/// Counters describing a channel's activity.
#[derive(Clone, Debug, Default)]
pub struct ChannelStats {
    /// Total number of events passed to `publish`.
    pub events_published: u64,
    /// Number of subscribers currently registered.
    pub subscriber_count: usize,
    /// Milliseconds since the Unix epoch of the latest publish, or `None`
    /// when nothing has been published yet.
    pub last_event_time: Option<u64>,
}
486
// =============================================================================
// Channel Error
// =============================================================================
490
/// Failure modes reported by channels and their receivers.
#[derive(Clone, Debug)]
pub enum ChannelError {
    /// The channel already holds the configured maximum of subscribers.
    TooManySubscribers,
    /// The underlying broadcast channel has been closed.
    Closed,
    /// The receiver fell behind; the payload is the number of skipped messages.
    Lagged(u64),
    /// An event could not be sent.
    SendFailed,
    /// The message at the given offset was not found in unacked messages.
    NotFound(u64),
}

impl std::fmt::Display for ChannelError {
    /// Renders a short human-readable description of the error.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            // Fixed messages need no formatting machinery.
            ChannelError::TooManySubscribers => f.write_str("Maximum subscribers reached"),
            ChannelError::Closed => f.write_str("Channel is closed"),
            ChannelError::SendFailed => f.write_str("Failed to send event"),
            ChannelError::Lagged(skipped) => {
                write!(f, "Receiver lagged by {} messages", skipped)
            }
            ChannelError::NotFound(missing) => {
                write!(f, "Message at offset {} not found in unacked set", missing)
            }
        }
    }
}

impl std::error::Error for ChannelError {}
517
/// Milliseconds elapsed since the Unix epoch according to the system clock.
///
/// Falls back to 0 when the clock reports a time before the epoch.
fn current_timestamp() -> u64 {
    use std::time::{SystemTime, UNIX_EPOCH};

    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(elapsed) => elapsed.as_millis() as u64,
        Err(_) => 0,
    }
}
524
// =============================================================================
// Tests
// =============================================================================
528
#[cfg(test)]
mod tests {
    use super::*;
    use crate::event::{EventData, EventType};

    /// Builds a `Created` event from source "test" carrying `data`.
    fn created(data: EventData) -> Event {
        Event::new(EventType::Created, "test", data)
    }

    /// Subscribes "sub1" to `channel` without a filter, using `mode`.
    fn subscribe_sub1(channel: &Channel, mode: AckMode) -> ChannelReceiver {
        channel
            .subscribe_with_ack_mode(SubscriberId::new("sub1"), None, mode)
            .unwrap()
    }

    /// A new channel reports its id and starts with no subscribers.
    #[test]
    fn test_channel_creation() {
        let channel = Channel::new("test");

        assert_eq!(channel.id().as_str(), "test");
        assert_eq!(channel.subscriber_count(), 0);
    }

    /// A published event reaches a plain subscriber.
    #[tokio::test]
    async fn test_publish_subscribe() {
        let channel = Channel::new("events");
        let mut receiver = channel.subscribe(SubscriberId::new("sub1")).unwrap();

        channel
            .publish(created(EventData::String("hello".to_string())))
            .unwrap();

        let received = receiver.recv().await.unwrap();
        assert_eq!(received.source, "test");
    }

    /// Persistent channels retain published events in history.
    #[test]
    fn test_channel_history() {
        let channel = Channel::with_config(
            "history_test",
            ChannelConfig {
                persistent: true,
                retention_count: 10,
                ..Default::default()
            },
        );

        for n in 0..5 {
            channel.publish(created(EventData::Int(n))).unwrap();
        }

        assert_eq!(channel.get_history(10).len(), 5);
    }

    /// Subscribing beyond `max_subscribers` is rejected.
    #[test]
    fn test_subscriber_limit() {
        let channel = Channel::with_config(
            "limited",
            ChannelConfig {
                max_subscribers: 2,
                ..Default::default()
            },
        );

        for name in ["sub1", "sub2"] {
            channel.subscribe(SubscriberId::new(name)).unwrap();
        }

        let third = channel.subscribe(SubscriberId::new("sub3"));
        assert!(matches!(third, Err(ChannelError::TooManySubscribers)));
    }

    /// Auto mode acknowledges each message as it is received.
    #[tokio::test]
    async fn test_auto_ack_mode() {
        let channel = Channel::new("auto_ack_test");
        let mut receiver = subscribe_sub1(&channel, AckMode::Auto);
        assert_eq!(receiver.ack_mode(), AckMode::Auto);

        channel
            .publish(created(EventData::String("auto".to_string())))
            .unwrap();
        let _received = receiver.recv().await.unwrap();

        // With Auto ack, the message should be immediately acked
        assert_eq!(receiver.unacked_count(), 0);
        assert_eq!(receiver.acked_count(), 1);
    }

    /// Manual mode tracks messages until they are acknowledged one by one.
    #[tokio::test]
    async fn test_manual_ack_mode() {
        let channel = Channel::new("manual_ack_test");
        let mut receiver = subscribe_sub1(&channel, AckMode::Manual);
        assert_eq!(receiver.ack_mode(), AckMode::Manual);

        // Publish two events
        for n in 0..2 {
            channel.publish(created(EventData::Int(n))).unwrap();
        }

        // Receive both events
        let _ev0 = receiver.recv().await.unwrap();
        let _ev1 = receiver.recv().await.unwrap();

        // Both should be unacked
        assert_eq!(receiver.unacked_count(), 2);
        assert_eq!(receiver.acked_count(), 0);

        // Ack the first message (offset 0)
        receiver.ack(0).unwrap();
        assert_eq!(receiver.unacked_count(), 1);
        assert_eq!(receiver.acked_count(), 1);

        // Ack the second message (offset 1)
        receiver.ack(1).unwrap();
        assert_eq!(receiver.unacked_count(), 0);
        assert_eq!(receiver.acked_count(), 2);

        // Acking again is idempotent
        receiver.ack(0).unwrap();
        assert_eq!(receiver.acked_count(), 2);
    }

    /// Unacknowledged messages are reported in offset order.
    #[tokio::test]
    async fn test_manual_ack_redelivery() {
        let channel = Channel::new("redeliver_test");
        let mut receiver = subscribe_sub1(&channel, AckMode::Manual);

        // Publish events
        for n in 0..3 {
            channel.publish(created(EventData::Int(n))).unwrap();
        }

        // Receive all three
        let _ev0 = receiver.recv().await.unwrap();
        let _ev1 = receiver.recv().await.unwrap();
        let _ev2 = receiver.recv().await.unwrap();

        // Ack only offset 1
        receiver.ack(1).unwrap();

        // Get unacked messages for re-delivery
        let pending = receiver.get_unacked_messages();
        assert_eq!(pending.len(), 2);
        // Should be sorted by offset
        assert_eq!(pending[0].0, 0);
        assert_eq!(pending[1].0, 2);
    }

    /// Acknowledging an offset that was never delivered is an error.
    #[tokio::test]
    async fn test_ack_not_found() {
        let channel = Channel::new("ack_notfound_test");
        let receiver = subscribe_sub1(&channel, AckMode::Manual);

        // Acking an offset that was never received
        let outcome = receiver.ack(999);
        assert!(matches!(outcome, Err(ChannelError::NotFound(999))));
    }

    /// None mode performs no ack tracking at all.
    #[tokio::test]
    async fn test_none_ack_mode() {
        let channel = Channel::new("none_ack_test");
        let mut receiver = subscribe_sub1(&channel, AckMode::None);

        channel
            .publish(created(EventData::String("none".to_string())))
            .unwrap();
        let _received = receiver.recv().await.unwrap();

        // With None ack mode, no tracking at all
        assert_eq!(receiver.unacked_count(), 0);
        assert_eq!(receiver.acked_count(), 0);
    }
}
735}