// aegis_streaming/channel.rs

//! Aegis Streaming Channels
//!
//! Pub/sub channels for event distribution.
//!
//! @version 0.1.0
//! @author AutomataNexus Development Team
7
8use crate::event::{Event, EventFilter};
9use crate::subscriber::SubscriberId;
10use serde::{Deserialize, Serialize};
11use std::collections::{HashMap, VecDeque};
12use std::sync::RwLock;
13use tokio::sync::broadcast;
14
15// =============================================================================
16// Channel ID
17// =============================================================================
18
19/// Unique identifier for a channel.
20#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
21pub struct ChannelId(pub String);
22
23impl ChannelId {
24    pub fn new(id: impl Into<String>) -> Self {
25        Self(id.into())
26    }
27
28    pub fn as_str(&self) -> &str {
29        &self.0
30    }
31}
32
33impl std::fmt::Display for ChannelId {
34    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
35        write!(f, "{}", self.0)
36    }
37}
38
39impl From<String> for ChannelId {
40    fn from(s: String) -> Self {
41        Self(s)
42    }
43}
44
45impl From<&str> for ChannelId {
46    fn from(s: &str) -> Self {
47        Self(s.to_string())
48    }
49}
50
51// =============================================================================
52// Channel Configuration
53// =============================================================================
54
/// Tunable settings for a single channel.
#[derive(Debug, Clone)]
pub struct ChannelConfig {
    /// Capacity handed to the underlying broadcast channel when the channel is
    /// created.
    pub buffer_size: usize,
    /// Number of registered subscribers at which further subscribe calls are
    /// rejected with `ChannelError::TooManySubscribers`.
    pub max_subscribers: usize,
    /// When `true`, every published event is also recorded in the channel's
    /// history.
    pub persistent: bool,
    /// Upper bound on the number of events kept in history; the oldest entries
    /// are dropped first.
    pub retention_count: usize,
}
63
64impl Default for ChannelConfig {
65    fn default() -> Self {
66        Self {
67            buffer_size: 1024,
68            max_subscribers: 1000,
69            persistent: false,
70            retention_count: 1000,
71        }
72    }
73}
74
75// =============================================================================
76// Channel
77// =============================================================================
78
79/// A pub/sub channel for event distribution.
80pub struct Channel {
81    id: ChannelId,
82    config: ChannelConfig,
83    sender: broadcast::Sender<Event>,
84    subscribers: RwLock<HashMap<SubscriberId, SubscriberInfo>>,
85    history: RwLock<VecDeque<Event>>,
86    stats: RwLock<ChannelStats>,
87}
88
89impl Channel {
90    /// Create a new channel.
91    pub fn new(id: impl Into<ChannelId>) -> Self {
92        Self::with_config(id, ChannelConfig::default())
93    }
94
95    /// Create a channel with custom configuration.
96    pub fn with_config(id: impl Into<ChannelId>, config: ChannelConfig) -> Self {
97        let (sender, _) = broadcast::channel(config.buffer_size);
98
99        Self {
100            id: id.into(),
101            config,
102            sender,
103            subscribers: RwLock::new(HashMap::new()),
104            history: RwLock::new(VecDeque::new()),
105            stats: RwLock::new(ChannelStats::default()),
106        }
107    }
108
109    /// Get the channel ID.
110    pub fn id(&self) -> &ChannelId {
111        &self.id
112    }
113
114    /// Publish an event to the channel.
115    pub fn publish(&self, event: Event) -> Result<usize, ChannelError> {
116        if self.config.persistent {
117            let mut history = self.history.write().unwrap();
118            history.push_back(event.clone());
119
120            while history.len() > self.config.retention_count {
121                history.pop_front();
122            }
123        }
124
125        let receivers = self.sender.send(event).unwrap_or(0);
126
127        {
128            let mut stats = self.stats.write().unwrap();
129            stats.events_published += 1;
130            stats.last_event_time = Some(
131                std::time::SystemTime::now()
132                    .duration_since(std::time::UNIX_EPOCH)
133                    .map(|d| d.as_millis() as u64)
134                    .unwrap_or(0),
135            );
136        }
137
138        Ok(receivers)
139    }
140
141    /// Subscribe to the channel.
142    pub fn subscribe(&self, subscriber_id: SubscriberId) -> Result<ChannelReceiver, ChannelError> {
143        let subscribers = self.subscribers.read().unwrap();
144        if subscribers.len() >= self.config.max_subscribers {
145            return Err(ChannelError::TooManySubscribers);
146        }
147        drop(subscribers);
148
149        let receiver = self.sender.subscribe();
150
151        {
152            let mut subscribers = self.subscribers.write().unwrap();
153            subscribers.insert(
154                subscriber_id.clone(),
155                SubscriberInfo {
156                    filter: None,
157                    subscribed_at: current_timestamp(),
158                },
159            );
160        }
161
162        {
163            let mut stats = self.stats.write().unwrap();
164            stats.subscriber_count += 1;
165        }
166
167        Ok(ChannelReceiver {
168            receiver,
169            filter: None,
170        })
171    }
172
173    /// Subscribe with a filter.
174    pub fn subscribe_with_filter(
175        &self,
176        subscriber_id: SubscriberId,
177        filter: EventFilter,
178    ) -> Result<ChannelReceiver, ChannelError> {
179        let subscribers = self.subscribers.read().unwrap();
180        if subscribers.len() >= self.config.max_subscribers {
181            return Err(ChannelError::TooManySubscribers);
182        }
183        drop(subscribers);
184
185        let receiver = self.sender.subscribe();
186
187        {
188            let mut subscribers = self.subscribers.write().unwrap();
189            subscribers.insert(
190                subscriber_id.clone(),
191                SubscriberInfo {
192                    filter: Some(filter.clone()),
193                    subscribed_at: current_timestamp(),
194                },
195            );
196        }
197
198        {
199            let mut stats = self.stats.write().unwrap();
200            stats.subscriber_count += 1;
201        }
202
203        Ok(ChannelReceiver {
204            receiver,
205            filter: Some(filter),
206        })
207    }
208
209    /// Unsubscribe from the channel.
210    pub fn unsubscribe(&self, subscriber_id: &SubscriberId) {
211        let mut subscribers = self.subscribers.write().unwrap();
212        if subscribers.remove(subscriber_id).is_some() {
213            let mut stats = self.stats.write().unwrap();
214            stats.subscriber_count = stats.subscriber_count.saturating_sub(1);
215        }
216    }
217
218    /// Get the number of subscribers.
219    pub fn subscriber_count(&self) -> usize {
220        let subscribers = self.subscribers.read().unwrap();
221        subscribers.len()
222    }
223
224    /// Get recent events from history.
225    pub fn get_history(&self, count: usize) -> Vec<Event> {
226        let history = self.history.read().unwrap();
227        history.iter().rev().take(count).cloned().collect()
228    }
229
230    /// Get events from history after a timestamp.
231    pub fn get_history_after(&self, timestamp: u64) -> Vec<Event> {
232        let history = self.history.read().unwrap();
233        history
234            .iter()
235            .filter(|e| e.timestamp > timestamp)
236            .cloned()
237            .collect()
238    }
239
240    /// Get channel statistics.
241    pub fn stats(&self) -> ChannelStats {
242        let stats = self.stats.read().unwrap();
243        stats.clone()
244    }
245
246    /// Clear history.
247    pub fn clear_history(&self) {
248        let mut history = self.history.write().unwrap();
249        history.clear();
250    }
251}
252
253// =============================================================================
254// Channel Receiver
255// =============================================================================
256
257/// Receiver for channel events.
258pub struct ChannelReceiver {
259    receiver: broadcast::Receiver<Event>,
260    filter: Option<EventFilter>,
261}
262
263impl ChannelReceiver {
264    /// Receive the next event.
265    pub async fn recv(&mut self) -> Result<Event, ChannelError> {
266        loop {
267            match self.receiver.recv().await {
268                Ok(event) => {
269                    if let Some(ref filter) = self.filter {
270                        if !event.matches(filter) {
271                            continue;
272                        }
273                    }
274                    return Ok(event);
275                }
276                Err(broadcast::error::RecvError::Closed) => {
277                    return Err(ChannelError::Closed);
278                }
279                Err(broadcast::error::RecvError::Lagged(n)) => {
280                    return Err(ChannelError::Lagged(n));
281                }
282            }
283        }
284    }
285
286    /// Try to receive an event without blocking.
287    pub fn try_recv(&mut self) -> Result<Option<Event>, ChannelError> {
288        loop {
289            match self.receiver.try_recv() {
290                Ok(event) => {
291                    if let Some(ref filter) = self.filter {
292                        if !event.matches(filter) {
293                            continue;
294                        }
295                    }
296                    return Ok(Some(event));
297                }
298                Err(broadcast::error::TryRecvError::Empty) => {
299                    return Ok(None);
300                }
301                Err(broadcast::error::TryRecvError::Closed) => {
302                    return Err(ChannelError::Closed);
303                }
304                Err(broadcast::error::TryRecvError::Lagged(n)) => {
305                    return Err(ChannelError::Lagged(n));
306                }
307            }
308        }
309    }
310}
311
312// =============================================================================
313// Subscriber Info
314// =============================================================================
315
316#[derive(Debug, Clone)]
317#[allow(dead_code)]
318struct SubscriberInfo {
319    filter: Option<EventFilter>,
320    subscribed_at: u64,
321}
322
323// =============================================================================
324// Channel Statistics
325// =============================================================================
326
/// Counters describing a channel's activity, as returned by `Channel::stats`.
#[derive(Debug, Clone, Default)]
pub struct ChannelStats {
    /// Number of `publish` calls made on the channel.
    pub events_published: u64,
    /// Subscriber count as tracked by subscribe/unsubscribe.
    pub subscriber_count: usize,
    /// Time of the most recent publish in milliseconds since the Unix epoch;
    /// `None` until the first event is published.
    pub last_event_time: Option<u64>,
}
334
335// =============================================================================
336// Channel Error
337// =============================================================================
338
/// Errors that can occur with channels.
///
/// Implements `PartialEq`/`Eq` so callers and tests can compare errors
/// directly (e.g. with `assert_eq!`).
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ChannelError {
    /// The channel already holds `max_subscribers` subscribers.
    TooManySubscribers,
    /// The underlying broadcast channel reported that it is closed.
    Closed,
    /// The receiver fell behind; carries the count reported by the broadcast
    /// channel (the number of messages that were skipped).
    Lagged(u64),
    /// Sending an event failed. Not produced by any code visible in this file.
    SendFailed,
}
347
348impl std::fmt::Display for ChannelError {
349    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
350        match self {
351            Self::TooManySubscribers => write!(f, "Maximum subscribers reached"),
352            Self::Closed => write!(f, "Channel is closed"),
353            Self::Lagged(n) => write!(f, "Receiver lagged by {} messages", n),
354            Self::SendFailed => write!(f, "Failed to send event"),
355        }
356    }
357}
358
359impl std::error::Error for ChannelError {}
360
/// Current wall-clock time in milliseconds since the Unix epoch.
///
/// Falls back to `0` if the system clock reports a time before the epoch.
fn current_timestamp() -> u64 {
    use std::time::{SystemTime, UNIX_EPOCH};

    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(elapsed) => elapsed.as_millis() as u64,
        Err(_) => 0,
    }
}
367
368// =============================================================================
369// Tests
370// =============================================================================
371
#[cfg(test)]
mod tests {
    use super::*;
    use crate::event::{EventData, EventType};

    /// Builds a `Created` event from source "test" carrying `data`.
    fn created(data: EventData) -> Event {
        Event::new(EventType::Created, "test", data)
    }

    /// A fresh channel reports its ID and has no subscribers.
    #[test]
    fn test_channel_creation() {
        let channel = Channel::new("test");

        assert_eq!(channel.id().as_str(), "test");
        assert_eq!(channel.subscriber_count(), 0);
    }

    /// An event published after subscribing reaches the receiver.
    #[tokio::test]
    async fn test_publish_subscribe() {
        let channel = Channel::new("events");
        let mut receiver = channel.subscribe(SubscriberId::new("sub1")).unwrap();

        let event = created(EventData::String("hello".to_string()));
        channel.publish(event.clone()).unwrap();

        let received = receiver.recv().await.unwrap();
        assert_eq!(received.source, "test");
    }

    /// A persistent channel retains the events it published.
    #[test]
    fn test_channel_history() {
        let channel = Channel::with_config(
            "history_test",
            ChannelConfig {
                persistent: true,
                retention_count: 10,
                ..Default::default()
            },
        );

        for i in 0..5 {
            channel.publish(created(EventData::Int(i))).unwrap();
        }

        let history = channel.get_history(10);
        assert_eq!(history.len(), 5);
    }

    /// Subscribing beyond `max_subscribers` is rejected.
    #[test]
    fn test_subscriber_limit() {
        let channel = Channel::with_config(
            "limited",
            ChannelConfig {
                max_subscribers: 2,
                ..Default::default()
            },
        );

        for name in ["sub1", "sub2"] {
            channel.subscribe(SubscriberId::new(name)).unwrap();
        }

        let result = channel.subscribe(SubscriberId::new("sub3"));
        assert!(matches!(result, Err(ChannelError::TooManySubscribers)));
    }
}
439}