Skip to main content

aegis_streaming/
channel.rs

1//! Aegis Streaming Channels
2//!
3//! Pub/sub channels for event distribution.
4//!
5//! @version 0.1.0
6//! @author AutomataNexus Development Team
7
8use crate::event::{Event, EventFilter};
9use crate::subscriber::SubscriberId;
10use serde::{Deserialize, Serialize};
11use std::collections::{HashMap, VecDeque};
12use std::sync::RwLock;
13use tokio::sync::broadcast;
14
15// =============================================================================
16// Channel ID
17// =============================================================================
18
19/// Unique identifier for a channel.
20#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
21pub struct ChannelId(pub String);
22
23impl ChannelId {
24    pub fn new(id: impl Into<String>) -> Self {
25        Self(id.into())
26    }
27
28    pub fn as_str(&self) -> &str {
29        &self.0
30    }
31}
32
33impl std::fmt::Display for ChannelId {
34    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
35        write!(f, "{}", self.0)
36    }
37}
38
39impl From<String> for ChannelId {
40    fn from(s: String) -> Self {
41        Self(s)
42    }
43}
44
45impl From<&str> for ChannelId {
46    fn from(s: &str) -> Self {
47        Self(s.to_string())
48    }
49}
50
51// =============================================================================
52// Channel Configuration
53// =============================================================================
54
55/// Configuration for a channel.
56#[derive(Debug, Clone)]
57pub struct ChannelConfig {
58    pub buffer_size: usize,
59    pub max_subscribers: usize,
60    pub persistent: bool,
61    pub retention_count: usize,
62}
63
64impl Default for ChannelConfig {
65    fn default() -> Self {
66        Self {
67            buffer_size: 1024,
68            max_subscribers: 1000,
69            persistent: false,
70            retention_count: 1000,
71        }
72    }
73}
74
75// =============================================================================
76// Channel
77// =============================================================================
78
79/// A pub/sub channel for event distribution.
80pub struct Channel {
81    id: ChannelId,
82    config: ChannelConfig,
83    sender: broadcast::Sender<Event>,
84    subscribers: RwLock<HashMap<SubscriberId, SubscriberInfo>>,
85    history: RwLock<VecDeque<Event>>,
86    stats: RwLock<ChannelStats>,
87}
88
89impl Channel {
90    /// Create a new channel.
91    pub fn new(id: impl Into<ChannelId>) -> Self {
92        Self::with_config(id, ChannelConfig::default())
93    }
94
95    /// Create a channel with custom configuration.
96    pub fn with_config(id: impl Into<ChannelId>, config: ChannelConfig) -> Self {
97        let (sender, _) = broadcast::channel(config.buffer_size);
98
99        Self {
100            id: id.into(),
101            config,
102            sender,
103            subscribers: RwLock::new(HashMap::new()),
104            history: RwLock::new(VecDeque::new()),
105            stats: RwLock::new(ChannelStats::default()),
106        }
107    }
108
109    /// Get the channel ID.
110    pub fn id(&self) -> &ChannelId {
111        &self.id
112    }
113
114    /// Publish an event to the channel.
115    pub fn publish(&self, event: Event) -> Result<usize, ChannelError> {
116        if self.config.persistent {
117            let mut history = self
118                .history
119                .write()
120                .expect("history RwLock poisoned in publish");
121            history.push_back(event.clone());
122
123            while history.len() > self.config.retention_count {
124                history.pop_front();
125            }
126        }
127
128        let receivers = self.sender.send(event).unwrap_or(0);
129
130        {
131            let mut stats = self
132                .stats
133                .write()
134                .expect("stats RwLock poisoned in publish");
135            stats.events_published += 1;
136            stats.last_event_time = Some(
137                std::time::SystemTime::now()
138                    .duration_since(std::time::UNIX_EPOCH)
139                    .map(|d| d.as_millis() as u64)
140                    .unwrap_or(0),
141            );
142        }
143
144        Ok(receivers)
145    }
146
147    /// Subscribe to the channel.
148    pub fn subscribe(&self, subscriber_id: SubscriberId) -> Result<ChannelReceiver, ChannelError> {
149        let subscribers = self
150            .subscribers
151            .read()
152            .expect("subscribers RwLock poisoned in subscribe (read)");
153        if subscribers.len() >= self.config.max_subscribers {
154            return Err(ChannelError::TooManySubscribers);
155        }
156        drop(subscribers);
157
158        let receiver = self.sender.subscribe();
159
160        {
161            let mut subscribers = self
162                .subscribers
163                .write()
164                .expect("subscribers RwLock poisoned in subscribe (write)");
165            subscribers.insert(
166                subscriber_id.clone(),
167                SubscriberInfo {
168                    filter: None,
169                    subscribed_at: current_timestamp(),
170                },
171            );
172        }
173
174        {
175            let mut stats = self
176                .stats
177                .write()
178                .expect("stats RwLock poisoned in subscribe");
179            stats.subscriber_count += 1;
180        }
181
182        Ok(ChannelReceiver {
183            receiver,
184            filter: None,
185        })
186    }
187
188    /// Subscribe with a filter.
189    pub fn subscribe_with_filter(
190        &self,
191        subscriber_id: SubscriberId,
192        filter: EventFilter,
193    ) -> Result<ChannelReceiver, ChannelError> {
194        let subscribers = self
195            .subscribers
196            .read()
197            .expect("subscribers RwLock poisoned in subscribe_with_filter (read)");
198        if subscribers.len() >= self.config.max_subscribers {
199            return Err(ChannelError::TooManySubscribers);
200        }
201        drop(subscribers);
202
203        let receiver = self.sender.subscribe();
204
205        {
206            let mut subscribers = self
207                .subscribers
208                .write()
209                .expect("subscribers RwLock poisoned in subscribe_with_filter (write)");
210            subscribers.insert(
211                subscriber_id.clone(),
212                SubscriberInfo {
213                    filter: Some(filter.clone()),
214                    subscribed_at: current_timestamp(),
215                },
216            );
217        }
218
219        {
220            let mut stats = self
221                .stats
222                .write()
223                .expect("stats RwLock poisoned in subscribe_with_filter");
224            stats.subscriber_count += 1;
225        }
226
227        Ok(ChannelReceiver {
228            receiver,
229            filter: Some(filter),
230        })
231    }
232
233    /// Unsubscribe from the channel.
234    pub fn unsubscribe(&self, subscriber_id: &SubscriberId) {
235        let mut subscribers = self
236            .subscribers
237            .write()
238            .expect("subscribers RwLock poisoned in unsubscribe");
239        if subscribers.remove(subscriber_id).is_some() {
240            let mut stats = self
241                .stats
242                .write()
243                .expect("stats RwLock poisoned in unsubscribe");
244            stats.subscriber_count = stats.subscriber_count.saturating_sub(1);
245        }
246    }
247
248    /// Get the number of subscribers.
249    pub fn subscriber_count(&self) -> usize {
250        let subscribers = self
251            .subscribers
252            .read()
253            .expect("subscribers RwLock poisoned in subscriber_count");
254        subscribers.len()
255    }
256
257    /// Get recent events from history.
258    pub fn get_history(&self, count: usize) -> Vec<Event> {
259        let history = self
260            .history
261            .read()
262            .expect("history RwLock poisoned in get_history");
263        history.iter().rev().take(count).cloned().collect()
264    }
265
266    /// Get events from history after a timestamp.
267    pub fn get_history_after(&self, timestamp: u64) -> Vec<Event> {
268        let history = self
269            .history
270            .read()
271            .expect("history RwLock poisoned in get_history_after");
272        history
273            .iter()
274            .filter(|e| e.timestamp > timestamp)
275            .cloned()
276            .collect()
277    }
278
279    /// Get channel statistics.
280    pub fn stats(&self) -> ChannelStats {
281        let stats = self
282            .stats
283            .read()
284            .expect("stats RwLock poisoned in stats");
285        stats.clone()
286    }
287
288    /// Clear history.
289    pub fn clear_history(&self) {
290        let mut history = self
291            .history
292            .write()
293            .expect("history RwLock poisoned in clear_history");
294        history.clear();
295    }
296}
297
298// =============================================================================
299// Channel Receiver
300// =============================================================================
301
302/// Receiver for channel events.
303pub struct ChannelReceiver {
304    receiver: broadcast::Receiver<Event>,
305    filter: Option<EventFilter>,
306}
307
308impl ChannelReceiver {
309    /// Receive the next event.
310    pub async fn recv(&mut self) -> Result<Event, ChannelError> {
311        loop {
312            match self.receiver.recv().await {
313                Ok(event) => {
314                    if let Some(ref filter) = self.filter {
315                        if !event.matches(filter) {
316                            continue;
317                        }
318                    }
319                    return Ok(event);
320                }
321                Err(broadcast::error::RecvError::Closed) => {
322                    return Err(ChannelError::Closed);
323                }
324                Err(broadcast::error::RecvError::Lagged(n)) => {
325                    return Err(ChannelError::Lagged(n));
326                }
327            }
328        }
329    }
330
331    /// Try to receive an event without blocking.
332    pub fn try_recv(&mut self) -> Result<Option<Event>, ChannelError> {
333        loop {
334            match self.receiver.try_recv() {
335                Ok(event) => {
336                    if let Some(ref filter) = self.filter {
337                        if !event.matches(filter) {
338                            continue;
339                        }
340                    }
341                    return Ok(Some(event));
342                }
343                Err(broadcast::error::TryRecvError::Empty) => {
344                    return Ok(None);
345                }
346                Err(broadcast::error::TryRecvError::Closed) => {
347                    return Err(ChannelError::Closed);
348                }
349                Err(broadcast::error::TryRecvError::Lagged(n)) => {
350                    return Err(ChannelError::Lagged(n));
351                }
352            }
353        }
354    }
355}
356
357// =============================================================================
358// Subscriber Info
359// =============================================================================
360
361#[derive(Debug, Clone)]
362#[allow(dead_code)]
363struct SubscriberInfo {
364    filter: Option<EventFilter>,
365    subscribed_at: u64,
366}
367
368// =============================================================================
369// Channel Statistics
370// =============================================================================
371
372/// Statistics for a channel.
373#[derive(Debug, Clone, Default)]
374pub struct ChannelStats {
375    pub events_published: u64,
376    pub subscriber_count: usize,
377    pub last_event_time: Option<u64>,
378}
379
380// =============================================================================
381// Channel Error
382// =============================================================================
383
384/// Errors that can occur with channels.
385#[derive(Debug, Clone)]
386pub enum ChannelError {
387    TooManySubscribers,
388    Closed,
389    Lagged(u64),
390    SendFailed,
391}
392
393impl std::fmt::Display for ChannelError {
394    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
395        match self {
396            Self::TooManySubscribers => write!(f, "Maximum subscribers reached"),
397            Self::Closed => write!(f, "Channel is closed"),
398            Self::Lagged(n) => write!(f, "Receiver lagged by {} messages", n),
399            Self::SendFailed => write!(f, "Failed to send event"),
400        }
401    }
402}
403
404impl std::error::Error for ChannelError {}
405
406fn current_timestamp() -> u64 {
407    std::time::SystemTime::now()
408        .duration_since(std::time::UNIX_EPOCH)
409        .map(|d| d.as_millis() as u64)
410        .unwrap_or(0)
411}
412
413// =============================================================================
414// Tests
415// =============================================================================
416
417#[cfg(test)]
418mod tests {
419    use super::*;
420    use crate::event::EventData;
421
422    #[test]
423    fn test_channel_creation() {
424        let channel = Channel::new("test");
425        assert_eq!(channel.id().as_str(), "test");
426        assert_eq!(channel.subscriber_count(), 0);
427    }
428
429    #[tokio::test]
430    async fn test_publish_subscribe() {
431        let channel = Channel::new("events");
432        let sub_id = SubscriberId::new("sub1");
433
434        let mut receiver = channel.subscribe(sub_id).unwrap();
435
436        let event = Event::new(
437            crate::event::EventType::Created,
438            "test",
439            EventData::String("hello".to_string()),
440        );
441
442        channel.publish(event.clone()).unwrap();
443
444        let received = receiver.recv().await.unwrap();
445        assert_eq!(received.source, "test");
446    }
447
448    #[test]
449    fn test_channel_history() {
450        let config = ChannelConfig {
451            persistent: true,
452            retention_count: 10,
453            ..Default::default()
454        };
455        let channel = Channel::with_config("history_test", config);
456
457        for i in 0..5 {
458            let event = Event::new(
459                crate::event::EventType::Created,
460                "test",
461                EventData::Int(i),
462            );
463            channel.publish(event).unwrap();
464        }
465
466        let history = channel.get_history(10);
467        assert_eq!(history.len(), 5);
468    }
469
470    #[test]
471    fn test_subscriber_limit() {
472        let config = ChannelConfig {
473            max_subscribers: 2,
474            ..Default::default()
475        };
476        let channel = Channel::with_config("limited", config);
477
478        channel.subscribe(SubscriberId::new("sub1")).unwrap();
479        channel.subscribe(SubscriberId::new("sub2")).unwrap();
480
481        let result = channel.subscribe(SubscriberId::new("sub3"));
482        assert!(matches!(result, Err(ChannelError::TooManySubscribers)));
483    }
484}