1use crate::event::{Event, EventFilter};
9use crate::subscriber::SubscriberId;
10use serde::{Deserialize, Serialize};
11use std::collections::{HashMap, VecDeque};
12use std::sync::RwLock;
13use tokio::sync::broadcast;
14
15#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
21pub struct ChannelId(pub String);
22
23impl ChannelId {
24 pub fn new(id: impl Into<String>) -> Self {
25 Self(id.into())
26 }
27
28 pub fn as_str(&self) -> &str {
29 &self.0
30 }
31}
32
33impl std::fmt::Display for ChannelId {
34 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
35 write!(f, "{}", self.0)
36 }
37}
38
39impl From<String> for ChannelId {
40 fn from(s: String) -> Self {
41 Self(s)
42 }
43}
44
45impl From<&str> for ChannelId {
46 fn from(s: &str) -> Self {
47 Self(s.to_string())
48 }
49}
50
51#[derive(Debug, Clone)]
57pub struct ChannelConfig {
58 pub buffer_size: usize,
59 pub max_subscribers: usize,
60 pub persistent: bool,
61 pub retention_count: usize,
62}
63
64impl Default for ChannelConfig {
65 fn default() -> Self {
66 Self {
67 buffer_size: 1024,
68 max_subscribers: 1000,
69 persistent: false,
70 retention_count: 1000,
71 }
72 }
73}
74
75pub struct Channel {
81 id: ChannelId,
82 config: ChannelConfig,
83 sender: broadcast::Sender<Event>,
84 subscribers: RwLock<HashMap<SubscriberId, SubscriberInfo>>,
85 history: RwLock<VecDeque<Event>>,
86 stats: RwLock<ChannelStats>,
87}
88
89impl Channel {
90 pub fn new(id: impl Into<ChannelId>) -> Self {
92 Self::with_config(id, ChannelConfig::default())
93 }
94
95 pub fn with_config(id: impl Into<ChannelId>, config: ChannelConfig) -> Self {
97 let (sender, _) = broadcast::channel(config.buffer_size);
98
99 Self {
100 id: id.into(),
101 config,
102 sender,
103 subscribers: RwLock::new(HashMap::new()),
104 history: RwLock::new(VecDeque::new()),
105 stats: RwLock::new(ChannelStats::default()),
106 }
107 }
108
109 pub fn id(&self) -> &ChannelId {
111 &self.id
112 }
113
114 pub fn publish(&self, event: Event) -> Result<usize, ChannelError> {
116 if self.config.persistent {
117 let mut history = self.history.write().unwrap();
118 history.push_back(event.clone());
119
120 while history.len() > self.config.retention_count {
121 history.pop_front();
122 }
123 }
124
125 let receivers = self.sender.send(event).unwrap_or(0);
126
127 {
128 let mut stats = self.stats.write().unwrap();
129 stats.events_published += 1;
130 stats.last_event_time = Some(
131 std::time::SystemTime::now()
132 .duration_since(std::time::UNIX_EPOCH)
133 .map(|d| d.as_millis() as u64)
134 .unwrap_or(0),
135 );
136 }
137
138 Ok(receivers)
139 }
140
141 pub fn subscribe(&self, subscriber_id: SubscriberId) -> Result<ChannelReceiver, ChannelError> {
143 let subscribers = self.subscribers.read().unwrap();
144 if subscribers.len() >= self.config.max_subscribers {
145 return Err(ChannelError::TooManySubscribers);
146 }
147 drop(subscribers);
148
149 let receiver = self.sender.subscribe();
150
151 {
152 let mut subscribers = self.subscribers.write().unwrap();
153 subscribers.insert(
154 subscriber_id.clone(),
155 SubscriberInfo {
156 filter: None,
157 subscribed_at: current_timestamp(),
158 },
159 );
160 }
161
162 {
163 let mut stats = self.stats.write().unwrap();
164 stats.subscriber_count += 1;
165 }
166
167 Ok(ChannelReceiver {
168 receiver,
169 filter: None,
170 })
171 }
172
173 pub fn subscribe_with_filter(
175 &self,
176 subscriber_id: SubscriberId,
177 filter: EventFilter,
178 ) -> Result<ChannelReceiver, ChannelError> {
179 let subscribers = self.subscribers.read().unwrap();
180 if subscribers.len() >= self.config.max_subscribers {
181 return Err(ChannelError::TooManySubscribers);
182 }
183 drop(subscribers);
184
185 let receiver = self.sender.subscribe();
186
187 {
188 let mut subscribers = self.subscribers.write().unwrap();
189 subscribers.insert(
190 subscriber_id.clone(),
191 SubscriberInfo {
192 filter: Some(filter.clone()),
193 subscribed_at: current_timestamp(),
194 },
195 );
196 }
197
198 {
199 let mut stats = self.stats.write().unwrap();
200 stats.subscriber_count += 1;
201 }
202
203 Ok(ChannelReceiver {
204 receiver,
205 filter: Some(filter),
206 })
207 }
208
209 pub fn unsubscribe(&self, subscriber_id: &SubscriberId) {
211 let mut subscribers = self.subscribers.write().unwrap();
212 if subscribers.remove(subscriber_id).is_some() {
213 let mut stats = self.stats.write().unwrap();
214 stats.subscriber_count = stats.subscriber_count.saturating_sub(1);
215 }
216 }
217
218 pub fn subscriber_count(&self) -> usize {
220 let subscribers = self.subscribers.read().unwrap();
221 subscribers.len()
222 }
223
224 pub fn get_history(&self, count: usize) -> Vec<Event> {
226 let history = self.history.read().unwrap();
227 history.iter().rev().take(count).cloned().collect()
228 }
229
230 pub fn get_history_after(&self, timestamp: u64) -> Vec<Event> {
232 let history = self.history.read().unwrap();
233 history
234 .iter()
235 .filter(|e| e.timestamp > timestamp)
236 .cloned()
237 .collect()
238 }
239
240 pub fn stats(&self) -> ChannelStats {
242 let stats = self.stats.read().unwrap();
243 stats.clone()
244 }
245
246 pub fn clear_history(&self) {
248 let mut history = self.history.write().unwrap();
249 history.clear();
250 }
251}
252
253pub struct ChannelReceiver {
259 receiver: broadcast::Receiver<Event>,
260 filter: Option<EventFilter>,
261}
262
263impl ChannelReceiver {
264 pub async fn recv(&mut self) -> Result<Event, ChannelError> {
266 loop {
267 match self.receiver.recv().await {
268 Ok(event) => {
269 if let Some(ref filter) = self.filter {
270 if !event.matches(filter) {
271 continue;
272 }
273 }
274 return Ok(event);
275 }
276 Err(broadcast::error::RecvError::Closed) => {
277 return Err(ChannelError::Closed);
278 }
279 Err(broadcast::error::RecvError::Lagged(n)) => {
280 return Err(ChannelError::Lagged(n));
281 }
282 }
283 }
284 }
285
286 pub fn try_recv(&mut self) -> Result<Option<Event>, ChannelError> {
288 loop {
289 match self.receiver.try_recv() {
290 Ok(event) => {
291 if let Some(ref filter) = self.filter {
292 if !event.matches(filter) {
293 continue;
294 }
295 }
296 return Ok(Some(event));
297 }
298 Err(broadcast::error::TryRecvError::Empty) => {
299 return Ok(None);
300 }
301 Err(broadcast::error::TryRecvError::Closed) => {
302 return Err(ChannelError::Closed);
303 }
304 Err(broadcast::error::TryRecvError::Lagged(n)) => {
305 return Err(ChannelError::Lagged(n));
306 }
307 }
308 }
309 }
310}
311
312#[derive(Debug, Clone)]
317#[allow(dead_code)]
318struct SubscriberInfo {
319 filter: Option<EventFilter>,
320 subscribed_at: u64,
321}
322
323#[derive(Debug, Clone, Default)]
329pub struct ChannelStats {
330 pub events_published: u64,
331 pub subscriber_count: usize,
332 pub last_event_time: Option<u64>,
333}
334
335#[derive(Debug, Clone)]
341pub enum ChannelError {
342 TooManySubscribers,
343 Closed,
344 Lagged(u64),
345 SendFailed,
346}
347
348impl std::fmt::Display for ChannelError {
349 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
350 match self {
351 Self::TooManySubscribers => write!(f, "Maximum subscribers reached"),
352 Self::Closed => write!(f, "Channel is closed"),
353 Self::Lagged(n) => write!(f, "Receiver lagged by {} messages", n),
354 Self::SendFailed => write!(f, "Failed to send event"),
355 }
356 }
357}
358
359impl std::error::Error for ChannelError {}
360
361fn current_timestamp() -> u64 {
362 std::time::SystemTime::now()
363 .duration_since(std::time::UNIX_EPOCH)
364 .map(|d| d.as_millis() as u64)
365 .unwrap_or(0)
366}
367
368#[cfg(test)]
373mod tests {
374 use super::*;
375 use crate::event::EventData;
376
377 #[test]
378 fn test_channel_creation() {
379 let channel = Channel::new("test");
380 assert_eq!(channel.id().as_str(), "test");
381 assert_eq!(channel.subscriber_count(), 0);
382 }
383
384 #[tokio::test]
385 async fn test_publish_subscribe() {
386 let channel = Channel::new("events");
387 let sub_id = SubscriberId::new("sub1");
388
389 let mut receiver = channel.subscribe(sub_id).unwrap();
390
391 let event = Event::new(
392 crate::event::EventType::Created,
393 "test",
394 EventData::String("hello".to_string()),
395 );
396
397 channel.publish(event.clone()).unwrap();
398
399 let received = receiver.recv().await.unwrap();
400 assert_eq!(received.source, "test");
401 }
402
403 #[test]
404 fn test_channel_history() {
405 let config = ChannelConfig {
406 persistent: true,
407 retention_count: 10,
408 ..Default::default()
409 };
410 let channel = Channel::with_config("history_test", config);
411
412 for i in 0..5 {
413 let event = Event::new(
414 crate::event::EventType::Created,
415 "test",
416 EventData::Int(i),
417 );
418 channel.publish(event).unwrap();
419 }
420
421 let history = channel.get_history(10);
422 assert_eq!(history.len(), 5);
423 }
424
425 #[test]
426 fn test_subscriber_limit() {
427 let config = ChannelConfig {
428 max_subscribers: 2,
429 ..Default::default()
430 };
431 let channel = Channel::with_config("limited", config);
432
433 channel.subscribe(SubscriberId::new("sub1")).unwrap();
434 channel.subscribe(SubscriberId::new("sub2")).unwrap();
435
436 let result = channel.subscribe(SubscriberId::new("sub3"));
437 assert!(matches!(result, Err(ChannelError::TooManySubscribers)));
438 }
439}