1use crate::event::{Event, EventFilter};
9use crate::subscriber::SubscriberId;
10use serde::{Deserialize, Serialize};
11use std::collections::{HashMap, VecDeque};
12use std::sync::RwLock;
13use tokio::sync::broadcast;
14
15#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
21pub struct ChannelId(pub String);
22
23impl ChannelId {
24 pub fn new(id: impl Into<String>) -> Self {
25 Self(id.into())
26 }
27
28 pub fn as_str(&self) -> &str {
29 &self.0
30 }
31}
32
33impl std::fmt::Display for ChannelId {
34 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
35 write!(f, "{}", self.0)
36 }
37}
38
39impl From<String> for ChannelId {
40 fn from(s: String) -> Self {
41 Self(s)
42 }
43}
44
45impl From<&str> for ChannelId {
46 fn from(s: &str) -> Self {
47 Self(s.to_string())
48 }
49}
50
51#[derive(Debug, Clone)]
57pub struct ChannelConfig {
58 pub buffer_size: usize,
59 pub max_subscribers: usize,
60 pub persistent: bool,
61 pub retention_count: usize,
62}
63
64impl Default for ChannelConfig {
65 fn default() -> Self {
66 Self {
67 buffer_size: 1024,
68 max_subscribers: 1000,
69 persistent: false,
70 retention_count: 1000,
71 }
72 }
73}
74
75pub struct Channel {
81 id: ChannelId,
82 config: ChannelConfig,
83 sender: broadcast::Sender<Event>,
84 subscribers: RwLock<HashMap<SubscriberId, SubscriberInfo>>,
85 history: RwLock<VecDeque<Event>>,
86 stats: RwLock<ChannelStats>,
87}
88
89impl Channel {
90 pub fn new(id: impl Into<ChannelId>) -> Self {
92 Self::with_config(id, ChannelConfig::default())
93 }
94
95 pub fn with_config(id: impl Into<ChannelId>, config: ChannelConfig) -> Self {
97 let (sender, _) = broadcast::channel(config.buffer_size);
98
99 Self {
100 id: id.into(),
101 config,
102 sender,
103 subscribers: RwLock::new(HashMap::new()),
104 history: RwLock::new(VecDeque::new()),
105 stats: RwLock::new(ChannelStats::default()),
106 }
107 }
108
109 pub fn id(&self) -> &ChannelId {
111 &self.id
112 }
113
114 pub fn publish(&self, event: Event) -> Result<usize, ChannelError> {
116 if self.config.persistent {
117 let mut history = self
118 .history
119 .write()
120 .expect("history RwLock poisoned in publish");
121 history.push_back(event.clone());
122
123 while history.len() > self.config.retention_count {
124 history.pop_front();
125 }
126 }
127
128 let receivers = self.sender.send(event).unwrap_or(0);
129
130 {
131 let mut stats = self
132 .stats
133 .write()
134 .expect("stats RwLock poisoned in publish");
135 stats.events_published += 1;
136 stats.last_event_time = Some(
137 std::time::SystemTime::now()
138 .duration_since(std::time::UNIX_EPOCH)
139 .map(|d| d.as_millis() as u64)
140 .unwrap_or(0),
141 );
142 }
143
144 Ok(receivers)
145 }
146
147 pub fn subscribe(&self, subscriber_id: SubscriberId) -> Result<ChannelReceiver, ChannelError> {
149 let subscribers = self
150 .subscribers
151 .read()
152 .expect("subscribers RwLock poisoned in subscribe (read)");
153 if subscribers.len() >= self.config.max_subscribers {
154 return Err(ChannelError::TooManySubscribers);
155 }
156 drop(subscribers);
157
158 let receiver = self.sender.subscribe();
159
160 {
161 let mut subscribers = self
162 .subscribers
163 .write()
164 .expect("subscribers RwLock poisoned in subscribe (write)");
165 subscribers.insert(
166 subscriber_id.clone(),
167 SubscriberInfo {
168 filter: None,
169 subscribed_at: current_timestamp(),
170 },
171 );
172 }
173
174 {
175 let mut stats = self
176 .stats
177 .write()
178 .expect("stats RwLock poisoned in subscribe");
179 stats.subscriber_count += 1;
180 }
181
182 Ok(ChannelReceiver {
183 receiver,
184 filter: None,
185 })
186 }
187
188 pub fn subscribe_with_filter(
190 &self,
191 subscriber_id: SubscriberId,
192 filter: EventFilter,
193 ) -> Result<ChannelReceiver, ChannelError> {
194 let subscribers = self
195 .subscribers
196 .read()
197 .expect("subscribers RwLock poisoned in subscribe_with_filter (read)");
198 if subscribers.len() >= self.config.max_subscribers {
199 return Err(ChannelError::TooManySubscribers);
200 }
201 drop(subscribers);
202
203 let receiver = self.sender.subscribe();
204
205 {
206 let mut subscribers = self
207 .subscribers
208 .write()
209 .expect("subscribers RwLock poisoned in subscribe_with_filter (write)");
210 subscribers.insert(
211 subscriber_id.clone(),
212 SubscriberInfo {
213 filter: Some(filter.clone()),
214 subscribed_at: current_timestamp(),
215 },
216 );
217 }
218
219 {
220 let mut stats = self
221 .stats
222 .write()
223 .expect("stats RwLock poisoned in subscribe_with_filter");
224 stats.subscriber_count += 1;
225 }
226
227 Ok(ChannelReceiver {
228 receiver,
229 filter: Some(filter),
230 })
231 }
232
233 pub fn unsubscribe(&self, subscriber_id: &SubscriberId) {
235 let mut subscribers = self
236 .subscribers
237 .write()
238 .expect("subscribers RwLock poisoned in unsubscribe");
239 if subscribers.remove(subscriber_id).is_some() {
240 let mut stats = self
241 .stats
242 .write()
243 .expect("stats RwLock poisoned in unsubscribe");
244 stats.subscriber_count = stats.subscriber_count.saturating_sub(1);
245 }
246 }
247
248 pub fn subscriber_count(&self) -> usize {
250 let subscribers = self
251 .subscribers
252 .read()
253 .expect("subscribers RwLock poisoned in subscriber_count");
254 subscribers.len()
255 }
256
257 pub fn get_history(&self, count: usize) -> Vec<Event> {
259 let history = self
260 .history
261 .read()
262 .expect("history RwLock poisoned in get_history");
263 history.iter().rev().take(count).cloned().collect()
264 }
265
266 pub fn get_history_after(&self, timestamp: u64) -> Vec<Event> {
268 let history = self
269 .history
270 .read()
271 .expect("history RwLock poisoned in get_history_after");
272 history
273 .iter()
274 .filter(|e| e.timestamp > timestamp)
275 .cloned()
276 .collect()
277 }
278
279 pub fn stats(&self) -> ChannelStats {
281 let stats = self
282 .stats
283 .read()
284 .expect("stats RwLock poisoned in stats");
285 stats.clone()
286 }
287
288 pub fn clear_history(&self) {
290 let mut history = self
291 .history
292 .write()
293 .expect("history RwLock poisoned in clear_history");
294 history.clear();
295 }
296}
297
298pub struct ChannelReceiver {
304 receiver: broadcast::Receiver<Event>,
305 filter: Option<EventFilter>,
306}
307
308impl ChannelReceiver {
309 pub async fn recv(&mut self) -> Result<Event, ChannelError> {
311 loop {
312 match self.receiver.recv().await {
313 Ok(event) => {
314 if let Some(ref filter) = self.filter {
315 if !event.matches(filter) {
316 continue;
317 }
318 }
319 return Ok(event);
320 }
321 Err(broadcast::error::RecvError::Closed) => {
322 return Err(ChannelError::Closed);
323 }
324 Err(broadcast::error::RecvError::Lagged(n)) => {
325 return Err(ChannelError::Lagged(n));
326 }
327 }
328 }
329 }
330
331 pub fn try_recv(&mut self) -> Result<Option<Event>, ChannelError> {
333 loop {
334 match self.receiver.try_recv() {
335 Ok(event) => {
336 if let Some(ref filter) = self.filter {
337 if !event.matches(filter) {
338 continue;
339 }
340 }
341 return Ok(Some(event));
342 }
343 Err(broadcast::error::TryRecvError::Empty) => {
344 return Ok(None);
345 }
346 Err(broadcast::error::TryRecvError::Closed) => {
347 return Err(ChannelError::Closed);
348 }
349 Err(broadcast::error::TryRecvError::Lagged(n)) => {
350 return Err(ChannelError::Lagged(n));
351 }
352 }
353 }
354 }
355}
356
357#[derive(Debug, Clone)]
362#[allow(dead_code)]
363struct SubscriberInfo {
364 filter: Option<EventFilter>,
365 subscribed_at: u64,
366}
367
368#[derive(Debug, Clone, Default)]
374pub struct ChannelStats {
375 pub events_published: u64,
376 pub subscriber_count: usize,
377 pub last_event_time: Option<u64>,
378}
379
380#[derive(Debug, Clone)]
386pub enum ChannelError {
387 TooManySubscribers,
388 Closed,
389 Lagged(u64),
390 SendFailed,
391}
392
393impl std::fmt::Display for ChannelError {
394 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
395 match self {
396 Self::TooManySubscribers => write!(f, "Maximum subscribers reached"),
397 Self::Closed => write!(f, "Channel is closed"),
398 Self::Lagged(n) => write!(f, "Receiver lagged by {} messages", n),
399 Self::SendFailed => write!(f, "Failed to send event"),
400 }
401 }
402}
403
404impl std::error::Error for ChannelError {}
405
406fn current_timestamp() -> u64 {
407 std::time::SystemTime::now()
408 .duration_since(std::time::UNIX_EPOCH)
409 .map(|d| d.as_millis() as u64)
410 .unwrap_or(0)
411}
412
413#[cfg(test)]
418mod tests {
419 use super::*;
420 use crate::event::EventData;
421
422 #[test]
423 fn test_channel_creation() {
424 let channel = Channel::new("test");
425 assert_eq!(channel.id().as_str(), "test");
426 assert_eq!(channel.subscriber_count(), 0);
427 }
428
429 #[tokio::test]
430 async fn test_publish_subscribe() {
431 let channel = Channel::new("events");
432 let sub_id = SubscriberId::new("sub1");
433
434 let mut receiver = channel.subscribe(sub_id).unwrap();
435
436 let event = Event::new(
437 crate::event::EventType::Created,
438 "test",
439 EventData::String("hello".to_string()),
440 );
441
442 channel.publish(event.clone()).unwrap();
443
444 let received = receiver.recv().await.unwrap();
445 assert_eq!(received.source, "test");
446 }
447
448 #[test]
449 fn test_channel_history() {
450 let config = ChannelConfig {
451 persistent: true,
452 retention_count: 10,
453 ..Default::default()
454 };
455 let channel = Channel::with_config("history_test", config);
456
457 for i in 0..5 {
458 let event = Event::new(
459 crate::event::EventType::Created,
460 "test",
461 EventData::Int(i),
462 );
463 channel.publish(event).unwrap();
464 }
465
466 let history = channel.get_history(10);
467 assert_eq!(history.len(), 5);
468 }
469
470 #[test]
471 fn test_subscriber_limit() {
472 let config = ChannelConfig {
473 max_subscribers: 2,
474 ..Default::default()
475 };
476 let channel = Channel::with_config("limited", config);
477
478 channel.subscribe(SubscriberId::new("sub1")).unwrap();
479 channel.subscribe(SubscriberId::new("sub2")).unwrap();
480
481 let result = channel.subscribe(SubscriberId::new("sub3"));
482 assert!(matches!(result, Err(ChannelError::TooManySubscribers)));
483 }
484}