1use crate::event::{Event, EventFilter};
9use crate::subscriber::{AckMode, SubscriberId};
10use serde::{Deserialize, Serialize};
11use std::collections::{HashMap, HashSet, VecDeque};
12use std::sync::{Arc, RwLock};
13use tokio::sync::broadcast;
14
15#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
21pub struct ChannelId(pub String);
22
23impl ChannelId {
24 pub fn new(id: impl Into<String>) -> Self {
25 Self(id.into())
26 }
27
28 pub fn as_str(&self) -> &str {
29 &self.0
30 }
31}
32
33impl std::fmt::Display for ChannelId {
34 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
35 write!(f, "{}", self.0)
36 }
37}
38
39impl From<String> for ChannelId {
40 fn from(s: String) -> Self {
41 Self(s)
42 }
43}
44
45impl From<&str> for ChannelId {
46 fn from(s: &str) -> Self {
47 Self(s.to_string())
48 }
49}
50
51#[derive(Debug, Clone)]
57pub struct ChannelConfig {
58 pub buffer_size: usize,
59 pub max_subscribers: usize,
60 pub persistent: bool,
61 pub retention_count: usize,
62}
63
64impl Default for ChannelConfig {
65 fn default() -> Self {
66 Self {
67 buffer_size: 1024,
68 max_subscribers: 1000,
69 persistent: false,
70 retention_count: 1000,
71 }
72 }
73}
74
75pub struct Channel {
81 id: ChannelId,
82 config: ChannelConfig,
83 sender: broadcast::Sender<Event>,
84 subscribers: RwLock<HashMap<SubscriberId, SubscriberInfo>>,
85 history: RwLock<VecDeque<Event>>,
86 stats: RwLock<ChannelStats>,
87}
88
89impl Channel {
90 pub fn new(id: impl Into<ChannelId>) -> Self {
92 Self::with_config(id, ChannelConfig::default())
93 }
94
95 pub fn with_config(id: impl Into<ChannelId>, config: ChannelConfig) -> Self {
97 let (sender, _) = broadcast::channel(config.buffer_size);
98
99 Self {
100 id: id.into(),
101 config,
102 sender,
103 subscribers: RwLock::new(HashMap::new()),
104 history: RwLock::new(VecDeque::new()),
105 stats: RwLock::new(ChannelStats::default()),
106 }
107 }
108
109 pub fn id(&self) -> &ChannelId {
111 &self.id
112 }
113
114 pub fn publish(&self, event: Event) -> Result<usize, ChannelError> {
116 if self.config.persistent {
117 let mut history = self
118 .history
119 .write()
120 .expect("history RwLock poisoned in publish");
121 history.push_back(event.clone());
122
123 while history.len() > self.config.retention_count {
124 history.pop_front();
125 }
126 }
127
128 let receivers = self.sender.send(event).unwrap_or(0);
129
130 {
131 let mut stats = self
132 .stats
133 .write()
134 .expect("stats RwLock poisoned in publish");
135 stats.events_published += 1;
136 stats.last_event_time = Some(
137 std::time::SystemTime::now()
138 .duration_since(std::time::UNIX_EPOCH)
139 .map(|d| d.as_millis() as u64)
140 .unwrap_or(0),
141 );
142 }
143
144 Ok(receivers)
145 }
146
147 pub fn subscribe(&self, subscriber_id: SubscriberId) -> Result<ChannelReceiver, ChannelError> {
149 self.subscribe_with_ack_mode(subscriber_id, None, AckMode::Auto)
150 }
151
152 pub fn subscribe_with_filter(
154 &self,
155 subscriber_id: SubscriberId,
156 filter: EventFilter,
157 ) -> Result<ChannelReceiver, ChannelError> {
158 self.subscribe_with_ack_mode(subscriber_id, Some(filter), AckMode::Auto)
159 }
160
161 pub fn subscribe_with_ack_mode(
163 &self,
164 subscriber_id: SubscriberId,
165 filter: Option<EventFilter>,
166 ack_mode: AckMode,
167 ) -> Result<ChannelReceiver, ChannelError> {
168 let subscribers = self
169 .subscribers
170 .read()
171 .expect("subscribers RwLock poisoned in subscribe_with_ack_mode (read)");
172 if subscribers.len() >= self.config.max_subscribers {
173 return Err(ChannelError::TooManySubscribers);
174 }
175 drop(subscribers);
176
177 let receiver = self.sender.subscribe();
178
179 {
180 let mut subscribers = self
181 .subscribers
182 .write()
183 .expect("subscribers RwLock poisoned in subscribe_with_ack_mode (write)");
184 subscribers.insert(
185 subscriber_id.clone(),
186 SubscriberInfo {
187 filter: filter.clone(),
188 subscribed_at: current_timestamp(),
189 },
190 );
191 }
192
193 {
194 let mut stats = self
195 .stats
196 .write()
197 .expect("stats RwLock poisoned in subscribe_with_ack_mode");
198 stats.subscriber_count += 1;
199 }
200
201 Ok(ChannelReceiver {
202 receiver,
203 filter,
204 ack_mode,
205 next_offset: 0,
206 ack_state: Arc::new(RwLock::new(AckState {
207 unacked: HashMap::new(),
208 acked: HashSet::new(),
209 })),
210 })
211 }
212
213 pub fn unsubscribe(&self, subscriber_id: &SubscriberId) {
215 let mut subscribers = self
216 .subscribers
217 .write()
218 .expect("subscribers RwLock poisoned in unsubscribe");
219 if subscribers.remove(subscriber_id).is_some() {
220 let mut stats = self
221 .stats
222 .write()
223 .expect("stats RwLock poisoned in unsubscribe");
224 stats.subscriber_count = stats.subscriber_count.saturating_sub(1);
225 }
226 }
227
228 pub fn subscriber_count(&self) -> usize {
230 let subscribers = self
231 .subscribers
232 .read()
233 .expect("subscribers RwLock poisoned in subscriber_count");
234 subscribers.len()
235 }
236
237 pub fn get_history(&self, count: usize) -> Vec<Event> {
239 let history = self
240 .history
241 .read()
242 .expect("history RwLock poisoned in get_history");
243 history.iter().rev().take(count).cloned().collect()
244 }
245
246 pub fn get_history_after(&self, timestamp: u64) -> Vec<Event> {
248 let history = self
249 .history
250 .read()
251 .expect("history RwLock poisoned in get_history_after");
252 history
253 .iter()
254 .filter(|e| e.timestamp > timestamp)
255 .cloned()
256 .collect()
257 }
258
259 pub fn stats(&self) -> ChannelStats {
261 let stats = self.stats.read().expect("stats RwLock poisoned in stats");
262 stats.clone()
263 }
264
265 pub fn clear_history(&self) {
267 let mut history = self
268 .history
269 .write()
270 .expect("history RwLock poisoned in clear_history");
271 history.clear();
272 }
273}
274
275#[derive(Debug)]
281struct AckState {
282 unacked: HashMap<u64, Event>,
284 acked: HashSet<u64>,
286}
287
288pub struct ChannelReceiver {
290 receiver: broadcast::Receiver<Event>,
291 filter: Option<EventFilter>,
292 ack_mode: AckMode,
293 next_offset: u64,
295 ack_state: Arc<RwLock<AckState>>,
297}
298
299impl ChannelReceiver {
300 pub fn ack_mode(&self) -> AckMode {
302 self.ack_mode
303 }
304
305 pub fn current_offset(&self) -> u64 {
307 self.next_offset
308 }
309
310 pub async fn recv(&mut self) -> Result<Event, ChannelError> {
316 loop {
317 match self.receiver.recv().await {
318 Ok(event) => {
319 if let Some(ref filter) = self.filter {
320 if !event.matches(filter) {
321 continue;
322 }
323 }
324 let offset = self.next_offset;
325 self.next_offset += 1;
326
327 match self.ack_mode {
328 AckMode::Auto => {
329 let mut state = self
331 .ack_state
332 .write()
333 .expect("ack_state RwLock poisoned in recv");
334 state.acked.insert(offset);
335 }
336 AckMode::Manual => {
337 let mut state = self
339 .ack_state
340 .write()
341 .expect("ack_state RwLock poisoned in recv");
342 state.unacked.insert(offset, event.clone());
343 }
344 AckMode::None => {
345 }
347 }
348
349 return Ok(event);
350 }
351 Err(broadcast::error::RecvError::Closed) => {
352 return Err(ChannelError::Closed);
353 }
354 Err(broadcast::error::RecvError::Lagged(n)) => {
355 return Err(ChannelError::Lagged(n));
356 }
357 }
358 }
359 }
360
361 pub fn try_recv(&mut self) -> Result<Option<Event>, ChannelError> {
363 loop {
364 match self.receiver.try_recv() {
365 Ok(event) => {
366 if let Some(ref filter) = self.filter {
367 if !event.matches(filter) {
368 continue;
369 }
370 }
371 let offset = self.next_offset;
372 self.next_offset += 1;
373
374 match self.ack_mode {
375 AckMode::Auto => {
376 let mut state = self
377 .ack_state
378 .write()
379 .expect("ack_state RwLock poisoned in try_recv");
380 state.acked.insert(offset);
381 }
382 AckMode::Manual => {
383 let mut state = self
384 .ack_state
385 .write()
386 .expect("ack_state RwLock poisoned in try_recv");
387 state.unacked.insert(offset, event.clone());
388 }
389 AckMode::None => {}
390 }
391
392 return Ok(Some(event));
393 }
394 Err(broadcast::error::TryRecvError::Empty) => {
395 return Ok(None);
396 }
397 Err(broadcast::error::TryRecvError::Closed) => {
398 return Err(ChannelError::Closed);
399 }
400 Err(broadcast::error::TryRecvError::Lagged(n)) => {
401 return Err(ChannelError::Lagged(n));
402 }
403 }
404 }
405 }
406
407 pub fn ack(&self, offset: u64) -> Result<(), ChannelError> {
412 let mut state = self
413 .ack_state
414 .write()
415 .expect("ack_state RwLock poisoned in ack");
416 if state.unacked.remove(&offset).is_some() {
417 state.acked.insert(offset);
418 Ok(())
419 } else if state.acked.contains(&offset) {
420 Ok(())
422 } else {
423 Err(ChannelError::NotFound(offset))
424 }
425 }
426
427 pub fn unacked_count(&self) -> usize {
429 let state = self
430 .ack_state
431 .read()
432 .expect("ack_state RwLock poisoned in unacked_count");
433 state.unacked.len()
434 }
435
436 pub fn acked_count(&self) -> usize {
438 let state = self
439 .ack_state
440 .read()
441 .expect("ack_state RwLock poisoned in acked_count");
442 state.acked.len()
443 }
444
445 pub fn get_unacked_messages(&self) -> Vec<(u64, Event)> {
450 let state = self
451 .ack_state
452 .read()
453 .expect("ack_state RwLock poisoned in get_unacked_messages");
454 let mut messages: Vec<(u64, Event)> = state
455 .unacked
456 .iter()
457 .map(|(offset, event)| (*offset, event.clone()))
458 .collect();
459 messages.sort_by_key(|(offset, _)| *offset);
460 messages
461 }
462}
463
464#[derive(Debug, Clone)]
469#[allow(dead_code)]
470struct SubscriberInfo {
471 filter: Option<EventFilter>,
472 subscribed_at: u64,
473}
474
475#[derive(Debug, Clone, Default)]
481pub struct ChannelStats {
482 pub events_published: u64,
483 pub subscriber_count: usize,
484 pub last_event_time: Option<u64>,
485}
486
487#[derive(Debug, Clone)]
493pub enum ChannelError {
494 TooManySubscribers,
495 Closed,
496 Lagged(u64),
497 SendFailed,
498 NotFound(u64),
500}
501
502impl std::fmt::Display for ChannelError {
503 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
504 match self {
505 Self::TooManySubscribers => write!(f, "Maximum subscribers reached"),
506 Self::Closed => write!(f, "Channel is closed"),
507 Self::Lagged(n) => write!(f, "Receiver lagged by {} messages", n),
508 Self::SendFailed => write!(f, "Failed to send event"),
509 Self::NotFound(offset) => {
510 write!(f, "Message at offset {} not found in unacked set", offset)
511 }
512 }
513 }
514}
515
516impl std::error::Error for ChannelError {}
517
518fn current_timestamp() -> u64 {
519 std::time::SystemTime::now()
520 .duration_since(std::time::UNIX_EPOCH)
521 .map(|d| d.as_millis() as u64)
522 .unwrap_or(0)
523}
524
525#[cfg(test)]
530mod tests {
531 use super::*;
532 use crate::event::EventData;
533
534 #[test]
535 fn test_channel_creation() {
536 let channel = Channel::new("test");
537 assert_eq!(channel.id().as_str(), "test");
538 assert_eq!(channel.subscriber_count(), 0);
539 }
540
541 #[tokio::test]
542 async fn test_publish_subscribe() {
543 let channel = Channel::new("events");
544 let sub_id = SubscriberId::new("sub1");
545
546 let mut receiver = channel.subscribe(sub_id).unwrap();
547
548 let event = Event::new(
549 crate::event::EventType::Created,
550 "test",
551 EventData::String("hello".to_string()),
552 );
553
554 channel.publish(event.clone()).unwrap();
555
556 let received = receiver.recv().await.unwrap();
557 assert_eq!(received.source, "test");
558 }
559
560 #[test]
561 fn test_channel_history() {
562 let config = ChannelConfig {
563 persistent: true,
564 retention_count: 10,
565 ..Default::default()
566 };
567 let channel = Channel::with_config("history_test", config);
568
569 for i in 0..5 {
570 let event = Event::new(crate::event::EventType::Created, "test", EventData::Int(i));
571 channel.publish(event).unwrap();
572 }
573
574 let history = channel.get_history(10);
575 assert_eq!(history.len(), 5);
576 }
577
578 #[test]
579 fn test_subscriber_limit() {
580 let config = ChannelConfig {
581 max_subscribers: 2,
582 ..Default::default()
583 };
584 let channel = Channel::with_config("limited", config);
585
586 channel.subscribe(SubscriberId::new("sub1")).unwrap();
587 channel.subscribe(SubscriberId::new("sub2")).unwrap();
588
589 let result = channel.subscribe(SubscriberId::new("sub3"));
590 assert!(matches!(result, Err(ChannelError::TooManySubscribers)));
591 }
592
593 #[tokio::test]
594 async fn test_auto_ack_mode() {
595 use crate::subscriber::AckMode;
596
597 let channel = Channel::new("auto_ack_test");
598 let sub_id = SubscriberId::new("sub1");
599
600 let mut receiver = channel
601 .subscribe_with_ack_mode(sub_id, None, AckMode::Auto)
602 .unwrap();
603
604 assert_eq!(receiver.ack_mode(), AckMode::Auto);
605
606 let event = Event::new(
607 crate::event::EventType::Created,
608 "test",
609 EventData::String("auto".to_string()),
610 );
611 channel.publish(event).unwrap();
612
613 let _received = receiver.recv().await.unwrap();
614
615 assert_eq!(receiver.unacked_count(), 0);
617 assert_eq!(receiver.acked_count(), 1);
618 }
619
620 #[tokio::test]
621 async fn test_manual_ack_mode() {
622 use crate::subscriber::AckMode;
623
624 let channel = Channel::new("manual_ack_test");
625 let sub_id = SubscriberId::new("sub1");
626
627 let mut receiver = channel
628 .subscribe_with_ack_mode(sub_id, None, AckMode::Manual)
629 .unwrap();
630
631 assert_eq!(receiver.ack_mode(), AckMode::Manual);
632
633 for i in 0..2 {
635 let event = Event::new(crate::event::EventType::Created, "test", EventData::Int(i));
636 channel.publish(event).unwrap();
637 }
638
639 let _ev0 = receiver.recv().await.unwrap();
641 let _ev1 = receiver.recv().await.unwrap();
642
643 assert_eq!(receiver.unacked_count(), 2);
645 assert_eq!(receiver.acked_count(), 0);
646
647 receiver.ack(0).unwrap();
649 assert_eq!(receiver.unacked_count(), 1);
650 assert_eq!(receiver.acked_count(), 1);
651
652 receiver.ack(1).unwrap();
654 assert_eq!(receiver.unacked_count(), 0);
655 assert_eq!(receiver.acked_count(), 2);
656
657 receiver.ack(0).unwrap();
659 assert_eq!(receiver.acked_count(), 2);
660 }
661
662 #[tokio::test]
663 async fn test_manual_ack_redelivery() {
664 use crate::subscriber::AckMode;
665
666 let channel = Channel::new("redeliver_test");
667 let sub_id = SubscriberId::new("sub1");
668
669 let mut receiver = channel
670 .subscribe_with_ack_mode(sub_id, None, AckMode::Manual)
671 .unwrap();
672
673 for i in 0..3 {
675 let event = Event::new(crate::event::EventType::Created, "test", EventData::Int(i));
676 channel.publish(event).unwrap();
677 }
678
679 let _ev0 = receiver.recv().await.unwrap();
681 let _ev1 = receiver.recv().await.unwrap();
682 let _ev2 = receiver.recv().await.unwrap();
683
684 receiver.ack(1).unwrap();
686
687 let unacked = receiver.get_unacked_messages();
689 assert_eq!(unacked.len(), 2);
690 assert_eq!(unacked[0].0, 0);
692 assert_eq!(unacked[1].0, 2);
693 }
694
695 #[tokio::test]
696 async fn test_ack_not_found() {
697 use crate::subscriber::AckMode;
698
699 let channel = Channel::new("ack_notfound_test");
700 let sub_id = SubscriberId::new("sub1");
701
702 let receiver = channel
703 .subscribe_with_ack_mode(sub_id, None, AckMode::Manual)
704 .unwrap();
705
706 let result = receiver.ack(999);
708 assert!(matches!(result, Err(ChannelError::NotFound(999))));
709 }
710
711 #[tokio::test]
712 async fn test_none_ack_mode() {
713 use crate::subscriber::AckMode;
714
715 let channel = Channel::new("none_ack_test");
716 let sub_id = SubscriberId::new("sub1");
717
718 let mut receiver = channel
719 .subscribe_with_ack_mode(sub_id, None, AckMode::None)
720 .unwrap();
721
722 let event = Event::new(
723 crate::event::EventType::Created,
724 "test",
725 EventData::String("none".to_string()),
726 );
727 channel.publish(event).unwrap();
728
729 let _received = receiver.recv().await.unwrap();
730
731 assert_eq!(receiver.unacked_count(), 0);
733 assert_eq!(receiver.acked_count(), 0);
734 }
735}