1use std::cell::UnsafeCell;
40use std::sync::atomic::{AtomicBool, AtomicU64, AtomicUsize, Ordering};
41use std::sync::RwLock;
42use std::time::Duration;
43
44use crate::tpc::CachePadded;
45
46pub const DEFAULT_BROADCAST_CAPACITY: usize = 1024;
48
49pub const DEFAULT_MAX_SUBSCRIBERS: usize = 64;
51
52pub const DEFAULT_SLOW_SUBSCRIBER_TIMEOUT: Duration = Duration::from_millis(100);
54
55pub const DEFAULT_LAG_WARNING_THRESHOLD: u64 = 1000;
57
58#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
63pub enum SlowSubscriberPolicy {
64 #[default]
69 Block,
70
71 DropSlow,
76
77 SkipForSlow,
82}
83
84#[derive(Debug, Clone)]
86pub struct BroadcastConfig {
87 pub capacity: usize,
89
90 pub max_subscribers: usize,
92
93 pub slow_subscriber_policy: SlowSubscriberPolicy,
95
96 pub slow_subscriber_timeout: Duration,
98
99 pub lag_warning_threshold: u64,
101}
102
103impl Default for BroadcastConfig {
104 fn default() -> Self {
105 Self {
106 capacity: DEFAULT_BROADCAST_CAPACITY,
107 max_subscribers: DEFAULT_MAX_SUBSCRIBERS,
108 slow_subscriber_policy: SlowSubscriberPolicy::Block,
109 slow_subscriber_timeout: DEFAULT_SLOW_SUBSCRIBER_TIMEOUT,
110 lag_warning_threshold: DEFAULT_LAG_WARNING_THRESHOLD,
111 }
112 }
113}
114
115impl BroadcastConfig {
116 #[must_use]
118 pub fn with_capacity(capacity: usize) -> Self {
119 Self {
120 capacity,
121 ..Default::default()
122 }
123 }
124
125 #[must_use]
127 pub fn builder() -> BroadcastConfigBuilder {
128 BroadcastConfigBuilder::default()
129 }
130
131 #[must_use]
133 pub fn effective_capacity(&self) -> usize {
134 self.capacity.max(4).next_power_of_two()
135 }
136}
137
138#[derive(Debug, Default)]
140pub struct BroadcastConfigBuilder {
141 capacity: Option<usize>,
142 max_subscribers: Option<usize>,
143 slow_subscriber_policy: Option<SlowSubscriberPolicy>,
144 slow_subscriber_timeout: Option<Duration>,
145 lag_warning_threshold: Option<u64>,
146}
147
148impl BroadcastConfigBuilder {
149 #[must_use]
151 pub fn capacity(mut self, capacity: usize) -> Self {
152 self.capacity = Some(capacity);
153 self
154 }
155
156 #[must_use]
158 pub fn max_subscribers(mut self, max: usize) -> Self {
159 self.max_subscribers = Some(max);
160 self
161 }
162
163 #[must_use]
165 pub fn slow_subscriber_policy(mut self, policy: SlowSubscriberPolicy) -> Self {
166 self.slow_subscriber_policy = Some(policy);
167 self
168 }
169
170 #[must_use]
172 pub fn slow_subscriber_timeout(mut self, timeout: Duration) -> Self {
173 self.slow_subscriber_timeout = Some(timeout);
174 self
175 }
176
177 #[must_use]
179 pub fn lag_warning_threshold(mut self, threshold: u64) -> Self {
180 self.lag_warning_threshold = Some(threshold);
181 self
182 }
183
184 #[must_use]
186 pub fn build(self) -> BroadcastConfig {
187 BroadcastConfig {
188 capacity: self.capacity.unwrap_or(DEFAULT_BROADCAST_CAPACITY),
189 max_subscribers: self.max_subscribers.unwrap_or(DEFAULT_MAX_SUBSCRIBERS),
190 slow_subscriber_policy: self.slow_subscriber_policy.unwrap_or_default(),
191 slow_subscriber_timeout: self
192 .slow_subscriber_timeout
193 .unwrap_or(DEFAULT_SLOW_SUBSCRIBER_TIMEOUT),
194 lag_warning_threshold: self
195 .lag_warning_threshold
196 .unwrap_or(DEFAULT_LAG_WARNING_THRESHOLD),
197 }
198 }
199}
200
201#[derive(Debug, thiserror::Error)]
203pub enum BroadcastError {
204 #[error("maximum subscribers ({0}) reached")]
206 MaxSubscribersReached(usize),
207
208 #[error("slow subscriber timeout after {0:?}")]
210 SlowSubscriberTimeout(Duration),
211
212 #[error("no active subscribers")]
214 NoSubscribers,
215
216 #[error("subscriber {0} not found")]
218 SubscriberNotFound(usize),
219
220 #[error("buffer full")]
222 BufferFull,
223
224 #[error("channel closed")]
226 Closed,
227}
228
229#[repr(C, align(64))]
246struct CursorSlot {
247 active: AtomicBool,
249 read_seq: AtomicU64,
251 _pad: [u8; 48],
253}
254
255impl CursorSlot {
256 const fn empty() -> Self {
258 Self {
259 active: AtomicBool::new(false),
260 read_seq: AtomicU64::new(0),
261 _pad: [0; 48],
262 }
263 }
264
265 #[inline]
269 fn try_claim(&self, start_seq: u64) -> bool {
270 if self
272 .active
273 .compare_exchange(false, true, Ordering::AcqRel, Ordering::Relaxed)
274 .is_ok()
275 {
276 self.read_seq.store(start_seq, Ordering::Release);
277 true
278 } else {
279 false
280 }
281 }
282
283 #[inline]
284 fn is_active(&self) -> bool {
285 self.active.load(Ordering::Acquire)
286 }
287
288 #[inline]
289 fn deactivate(&self) {
290 self.active.store(false, Ordering::Release);
291 }
292
293 #[inline]
294 fn read_position(&self) -> u64 {
295 self.read_seq.load(Ordering::Acquire)
296 }
297}
298
299impl Default for CursorSlot {
300 fn default() -> Self {
301 Self::empty()
302 }
303}
304
305pub struct BroadcastChannel<T> {
349 buffer: Box<[UnsafeCell<Option<T>>]>,
351
352 write_seq: CachePadded<AtomicU64>,
355
356 cursor_slots: Box<[CursorSlot]>,
360
361 cursor_names: RwLock<Vec<String>>,
365
366 active_count: AtomicUsize,
369
370 next_slot_hint: AtomicUsize,
373
374 config: BroadcastConfig,
376
377 capacity: usize,
379
380 mask: usize,
382
383 closed: AtomicBool,
385}
386
387unsafe impl<T: Send> Send for BroadcastChannel<T> {}
393unsafe impl<T: Send> Sync for BroadcastChannel<T> {}
396
397impl<T> BroadcastChannel<T> {
398 #[must_use]
412 pub fn new(config: BroadcastConfig) -> Self {
413 let capacity = config.effective_capacity();
414 let mask = capacity - 1;
415 let max_subscribers = config.max_subscribers;
416
417 let buffer: Vec<UnsafeCell<Option<T>>> =
418 (0..capacity).map(|_| UnsafeCell::new(None)).collect();
419
420 let cursor_slots: Vec<CursorSlot> =
422 (0..max_subscribers).map(|_| CursorSlot::empty()).collect();
423
424 let cursor_names: Vec<String> = (0..max_subscribers).map(|_| String::new()).collect();
426
427 Self {
428 buffer: buffer.into_boxed_slice(),
429 write_seq: CachePadded::new(AtomicU64::new(0)),
430 cursor_slots: cursor_slots.into_boxed_slice(),
431 cursor_names: RwLock::new(cursor_names),
432 active_count: AtomicUsize::new(0),
433 next_slot_hint: AtomicUsize::new(0),
434 config,
435 capacity,
436 mask,
437 closed: AtomicBool::new(false),
438 }
439 }
440
441 #[must_use]
447 pub fn slowest_cursor(&self) -> u64 {
448 let mut min_pos = u64::MAX;
449 for slot in &*self.cursor_slots {
450 if slot.is_active() {
451 let pos = slot.read_position();
452 if pos < min_pos {
453 min_pos = pos;
454 }
455 }
456 }
457 min_pos
458 }
459
460 #[must_use]
466 pub fn subscriber_lag(&self, subscriber_id: usize) -> u64 {
467 if subscriber_id >= self.cursor_slots.len() {
468 return 0;
469 }
470 let slot = &self.cursor_slots[subscriber_id];
471 if !slot.is_active() {
472 return 0;
473 }
474 let write_pos = self.write_seq.load(Ordering::Acquire);
475 let read_pos = slot.read_position();
476 write_pos.saturating_sub(read_pos)
477 }
478
479 #[must_use]
483 pub fn subscriber_count(&self) -> usize {
484 self.active_count.load(Ordering::Acquire)
485 }
486
487 #[must_use]
491 pub fn is_lagging(&self, subscriber_id: usize) -> bool {
492 self.subscriber_lag(subscriber_id) >= self.config.lag_warning_threshold
493 }
494
495 #[must_use]
497 pub fn write_position(&self) -> u64 {
498 self.write_seq.load(Ordering::Relaxed)
499 }
500
501 #[must_use]
503 pub fn capacity(&self) -> usize {
504 self.capacity
505 }
506
507 #[must_use]
509 pub fn config(&self) -> &BroadcastConfig {
510 &self.config
511 }
512
513 #[must_use]
515 pub fn is_closed(&self) -> bool {
516 self.closed.load(Ordering::Acquire)
517 }
518
519 pub fn close(&self) {
524 self.closed.store(true, Ordering::Release);
525 }
526
527 #[must_use]
537 pub fn subscriber_info(&self, subscriber_id: usize) -> Option<SubscriberInfo> {
538 if subscriber_id >= self.cursor_slots.len() {
539 return None;
540 }
541 let slot = &self.cursor_slots[subscriber_id];
542 let names = self.cursor_names.read().unwrap();
543 let write_pos = self.write_seq.load(Ordering::Acquire);
544 let read_pos = slot.read_position();
545 let active = slot.is_active();
546
547 Some(SubscriberInfo {
549 id: subscriber_id,
550 name: names[subscriber_id].clone(),
551 active,
552 read_position: read_pos,
553 lag: write_pos.saturating_sub(read_pos),
554 })
555 }
556
557 #[must_use]
567 pub fn list_subscribers(&self) -> Vec<SubscriberInfo> {
568 let names = self.cursor_names.read().unwrap();
569 let write_pos = self.write_seq.load(Ordering::Acquire);
570
571 self.cursor_slots
572 .iter()
573 .enumerate()
574 .filter(|(_, slot)| slot.is_active())
575 .map(|(id, slot)| {
576 let read_pos = slot.read_position();
577 SubscriberInfo {
578 id,
579 name: names[id].clone(),
580 active: true,
581 read_position: read_pos,
582 lag: write_pos.saturating_sub(read_pos),
583 }
584 })
585 .collect()
586 }
587
588 #[inline]
590 fn slot_index(&self, seq: u64) -> usize {
591 #[allow(clippy::cast_possible_truncation)]
593 let idx = (seq as usize) & self.mask;
594 idx
595 }
596
597 pub fn unsubscribe(&self, subscriber_id: usize) {
605 if subscriber_id < self.cursor_slots.len() {
606 let slot = &self.cursor_slots[subscriber_id];
607 if slot.is_active() {
608 slot.deactivate();
609 self.active_count.fetch_sub(1, Ordering::Release);
610 }
611 }
612 }
613}
614
615impl<T: Clone> BroadcastChannel<T> {
616 pub fn broadcast(&self, value: T) -> Result<(), BroadcastError> {
634 if self.closed.load(Ordering::Acquire) {
635 return Err(BroadcastError::Closed);
636 }
637
638 let write_pos = self.write_seq.load(Ordering::Relaxed);
639 let slot_idx = self.slot_index(write_pos);
640
641 let min_read = self.slowest_cursor();
643 if min_read == u64::MAX {
644 return Err(BroadcastError::NoSubscribers);
645 }
646
647 if write_pos >= min_read + self.capacity as u64 {
649 self.handle_slow_subscriber(write_pos)?;
650 }
651
652 unsafe { *self.buffer[slot_idx].get() = Some(value) };
656
657 self.write_seq.store(write_pos + 1, Ordering::Release);
659
660 Ok(())
661 }
662
663 pub fn subscribe(&self, name: impl Into<String>) -> Result<usize, BroadcastError> {
684 let start_seq = self.write_seq.load(Ordering::Acquire);
685 let max_slots = self.cursor_slots.len();
686
687 let hint = self.next_slot_hint.load(Ordering::Relaxed) % max_slots;
689
690 for offset in 0..max_slots {
692 let slot_id = (hint + offset) % max_slots;
693 let slot = &self.cursor_slots[slot_id];
694
695 if slot.try_claim(start_seq) {
696 {
698 let mut names = self.cursor_names.write().unwrap();
699 names[slot_id] = name.into();
700 }
701
702 self.next_slot_hint
704 .store((slot_id + 1) % max_slots, Ordering::Relaxed);
705
706 self.active_count.fetch_add(1, Ordering::Release);
708
709 return Ok(slot_id);
710 }
711 }
712
713 Err(BroadcastError::MaxSubscribersReached(max_slots))
715 }
716
717 #[inline]
728 pub fn read(&self, subscriber_id: usize) -> Option<T> {
729 if subscriber_id >= self.cursor_slots.len() {
731 return None;
732 }
733
734 let slot = &self.cursor_slots[subscriber_id];
735
736 if !slot.is_active() {
737 return None;
738 }
739
740 let read_pos = slot.read_seq.load(Ordering::Relaxed);
741 let write_pos = self.write_seq.load(Ordering::Acquire);
742
743 if read_pos >= write_pos {
744 return None; }
746
747 let buffer_idx = self.slot_index(read_pos);
748
749 let value = unsafe { (*self.buffer[buffer_idx].get()).as_ref()?.clone() };
753
754 slot.read_seq.store(read_pos + 1, Ordering::Release);
756
757 Some(value)
758 }
759
760 #[inline]
772 pub fn try_read(&self, subscriber_id: usize) -> Result<Option<T>, BroadcastError> {
773 if subscriber_id >= self.cursor_slots.len() {
775 return Err(BroadcastError::SubscriberNotFound(subscriber_id));
776 }
777
778 let slot = &self.cursor_slots[subscriber_id];
779
780 if !slot.is_active() {
781 return Err(BroadcastError::SubscriberNotFound(subscriber_id));
782 }
783
784 let read_pos = slot.read_seq.load(Ordering::Relaxed);
785 let write_pos = self.write_seq.load(Ordering::Acquire);
786
787 if read_pos >= write_pos {
788 return Ok(None); }
790
791 let buffer_idx = self.slot_index(read_pos);
792
793 let value = unsafe { (*self.buffer[buffer_idx].get()).clone() };
795
796 if value.is_some() {
797 slot.read_seq.store(read_pos + 1, Ordering::Release);
799 }
800
801 Ok(value)
802 }
803
804 fn handle_slow_subscriber(&self, target_write: u64) -> Result<(), BroadcastError> {
806 match self.config.slow_subscriber_policy {
807 SlowSubscriberPolicy::Block => self.wait_for_slowest(target_write),
808 SlowSubscriberPolicy::DropSlow => {
809 self.drop_slowest_subscriber();
810 Ok(())
811 }
812 SlowSubscriberPolicy::SkipForSlow => {
813 Ok(())
815 }
816 }
817 }
818
819 fn wait_for_slowest(&self, target_write: u64) -> Result<(), BroadcastError> {
821 let start = std::time::Instant::now();
822 let timeout = self.config.slow_subscriber_timeout;
823
824 loop {
825 let min_read = self.slowest_cursor();
826 if min_read == u64::MAX {
827 return Err(BroadcastError::NoSubscribers);
828 }
829
830 if target_write < min_read + self.capacity as u64 {
832 return Ok(());
833 }
834
835 if start.elapsed() >= timeout {
837 return Err(BroadcastError::SlowSubscriberTimeout(timeout));
838 }
839
840 std::hint::spin_loop();
842 }
843 }
844
845 fn drop_slowest_subscriber(&self) {
849 let mut slowest_id: Option<usize> = None;
851 let mut slowest_pos = u64::MAX;
852
853 for (id, slot) in self.cursor_slots.iter().enumerate() {
854 if slot.is_active() {
855 let pos = slot.read_position();
856 if pos < slowest_pos {
857 slowest_pos = pos;
858 slowest_id = Some(id);
859 }
860 }
861 }
862
863 if let Some(id) = slowest_id {
864 self.cursor_slots[id].deactivate();
865 self.active_count.fetch_sub(1, Ordering::Release);
866 }
867 }
868}
869
870impl<T> std::fmt::Debug for BroadcastChannel<T> {
871 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
872 f.debug_struct("BroadcastChannel")
873 .field("capacity", &self.capacity)
874 .field("write_position", &self.write_position())
875 .field("subscriber_count", &self.subscriber_count())
876 .field("slowest_cursor", &self.slowest_cursor())
877 .field("closed", &self.is_closed())
878 .finish_non_exhaustive()
879 }
880}
881
882#[derive(Debug, Clone)]
884pub struct SubscriberInfo {
885 pub id: usize,
887 pub name: String,
889 pub active: bool,
891 pub read_position: u64,
893 pub lag: u64,
895}
896
897#[cfg(test)]
902#[allow(clippy::default_trait_access)]
903#[allow(clippy::unnecessary_map_or)]
904#[allow(clippy::cast_possible_truncation)]
905#[allow(clippy::cast_possible_wrap)]
906mod tests {
907 use super::*;
908 use std::sync::Arc;
909 use std::thread;
910
911 #[test]
914 fn test_default_config() {
915 let config = BroadcastConfig::default();
916 assert_eq!(config.capacity, DEFAULT_BROADCAST_CAPACITY);
917 assert_eq!(config.max_subscribers, DEFAULT_MAX_SUBSCRIBERS);
918 assert_eq!(config.slow_subscriber_policy, SlowSubscriberPolicy::Block);
919 assert_eq!(
920 config.slow_subscriber_timeout,
921 DEFAULT_SLOW_SUBSCRIBER_TIMEOUT
922 );
923 assert_eq!(config.lag_warning_threshold, DEFAULT_LAG_WARNING_THRESHOLD);
924 }
925
926 #[test]
927 fn test_config_with_capacity() {
928 let config = BroadcastConfig::with_capacity(256);
929 assert_eq!(config.capacity, 256);
930 assert_eq!(config.effective_capacity(), 256);
931 }
932
933 #[test]
934 fn test_config_effective_capacity_rounds_up() {
935 let config = BroadcastConfig::with_capacity(100);
936 assert_eq!(config.effective_capacity(), 128); let config = BroadcastConfig::with_capacity(1);
939 assert_eq!(config.effective_capacity(), 4); }
941
942 #[test]
943 fn test_config_builder() {
944 let config = BroadcastConfig::builder()
945 .capacity(512)
946 .max_subscribers(8)
947 .slow_subscriber_policy(SlowSubscriberPolicy::DropSlow)
948 .slow_subscriber_timeout(Duration::from_secs(1))
949 .lag_warning_threshold(500)
950 .build();
951
952 assert_eq!(config.capacity, 512);
953 assert_eq!(config.max_subscribers, 8);
954 assert_eq!(
955 config.slow_subscriber_policy,
956 SlowSubscriberPolicy::DropSlow
957 );
958 assert_eq!(config.slow_subscriber_timeout, Duration::from_secs(1));
959 assert_eq!(config.lag_warning_threshold, 500);
960 }
961
962 #[test]
963 fn test_slow_subscriber_policy_default() {
964 let policy: SlowSubscriberPolicy = Default::default();
965 assert_eq!(policy, SlowSubscriberPolicy::Block);
966 }
967
968 #[test]
969 fn test_slow_subscriber_policy_variants() {
970 assert_eq!(SlowSubscriberPolicy::Block, SlowSubscriberPolicy::Block);
971 assert_ne!(SlowSubscriberPolicy::Block, SlowSubscriberPolicy::DropSlow);
972 assert_ne!(
973 SlowSubscriberPolicy::Block,
974 SlowSubscriberPolicy::SkipForSlow
975 );
976 }
977
978 #[test]
981 fn test_channel_creation() {
982 let channel = BroadcastChannel::<i32>::new(BroadcastConfig::default());
983 assert_eq!(channel.capacity(), DEFAULT_BROADCAST_CAPACITY);
984 assert_eq!(channel.subscriber_count(), 0);
985 assert_eq!(channel.write_position(), 0);
986 assert!(!channel.is_closed());
987 }
988
989 #[test]
990 fn test_channel_custom_capacity() {
991 let config = BroadcastConfig::with_capacity(64);
992 let channel = BroadcastChannel::<i32>::new(config);
993 assert_eq!(channel.capacity(), 64);
994 }
995
996 #[test]
999 fn test_subscribe() {
1000 let channel = BroadcastChannel::<i32>::new(BroadcastConfig::default());
1001
1002 let id1 = channel.subscribe("sub1").unwrap();
1003 let id2 = channel.subscribe("sub2").unwrap();
1004
1005 assert_eq!(id1, 0);
1006 assert_eq!(id2, 1);
1007 assert_eq!(channel.subscriber_count(), 2);
1008 }
1009
1010 #[test]
1011 fn test_subscribe_max_limit() {
1012 let config = BroadcastConfig::builder().max_subscribers(2).build();
1013 let channel = BroadcastChannel::<i32>::new(config);
1014
1015 channel.subscribe("sub1").unwrap();
1016 channel.subscribe("sub2").unwrap();
1017
1018 let result = channel.subscribe("sub3");
1019 assert!(matches!(
1020 result,
1021 Err(BroadcastError::MaxSubscribersReached(2))
1022 ));
1023 }
1024
1025 #[test]
1026 fn test_unsubscribe() {
1027 let channel = BroadcastChannel::<i32>::new(BroadcastConfig::default());
1028
1029 let id = channel.subscribe("sub1").unwrap();
1030 assert_eq!(channel.subscriber_count(), 1);
1031
1032 channel.unsubscribe(id);
1033 assert_eq!(channel.subscriber_count(), 0);
1034 }
1035
1036 #[test]
1037 fn test_unsubscribe_nonexistent() {
1038 let channel = BroadcastChannel::<i32>::new(BroadcastConfig::default());
1039 channel.unsubscribe(999); }
1041
1042 #[test]
1045 fn test_broadcast_no_subscribers() {
1046 let channel = BroadcastChannel::<i32>::new(BroadcastConfig::default());
1047
1048 let result = channel.broadcast(42);
1049 assert!(matches!(result, Err(BroadcastError::NoSubscribers)));
1050 }
1051
1052 #[test]
1053 fn test_broadcast_and_read_single_subscriber() {
1054 let channel = BroadcastChannel::<i32>::new(BroadcastConfig::default());
1055
1056 let id = channel.subscribe("sub1").unwrap();
1057
1058 channel.broadcast(1).unwrap();
1059 channel.broadcast(2).unwrap();
1060 channel.broadcast(3).unwrap();
1061
1062 assert_eq!(channel.read(id), Some(1));
1063 assert_eq!(channel.read(id), Some(2));
1064 assert_eq!(channel.read(id), Some(3));
1065 assert_eq!(channel.read(id), None); }
1067
1068 #[test]
1069 fn test_broadcast_and_read_multiple_subscribers() {
1070 let channel = BroadcastChannel::<i32>::new(BroadcastConfig::default());
1071
1072 let id1 = channel.subscribe("sub1").unwrap();
1073 let id2 = channel.subscribe("sub2").unwrap();
1074
1075 channel.broadcast(42).unwrap();
1076
1077 assert_eq!(channel.read(id1), Some(42));
1079 assert_eq!(channel.read(id2), Some(42));
1080 }
1081
1082 #[test]
1083 fn test_read_unsubscribed() {
1084 let channel = BroadcastChannel::<i32>::new(BroadcastConfig::default());
1085
1086 let id = channel.subscribe("sub1").unwrap();
1087 channel.broadcast(42).unwrap();
1088
1089 channel.unsubscribe(id);
1090
1091 assert_eq!(channel.read(id), None);
1092 }
1093
1094 #[test]
1095 fn test_read_nonexistent_subscriber() {
1096 let channel = BroadcastChannel::<i32>::new(BroadcastConfig::default());
1097 assert_eq!(channel.read(999), None);
1098 }
1099
1100 #[test]
1101 fn test_try_read() {
1102 let channel = BroadcastChannel::<i32>::new(BroadcastConfig::default());
1103
1104 let id = channel.subscribe("sub1").unwrap();
1105 channel.broadcast(42).unwrap();
1106
1107 assert_eq!(channel.try_read(id).unwrap(), Some(42));
1108 assert_eq!(channel.try_read(id).unwrap(), None); }
1110
1111 #[test]
1112 fn test_try_read_nonexistent() {
1113 let channel = BroadcastChannel::<i32>::new(BroadcastConfig::default());
1114 let result = channel.try_read(999);
1115 assert!(matches!(
1116 result,
1117 Err(BroadcastError::SubscriberNotFound(999))
1118 ));
1119 }
1120
1121 #[test]
1124 fn test_subscriber_lag() {
1125 let channel = BroadcastChannel::<i32>::new(BroadcastConfig::default());
1126
1127 let id = channel.subscribe("sub1").unwrap();
1128
1129 assert_eq!(channel.subscriber_lag(id), 0);
1130
1131 channel.broadcast(1).unwrap();
1132 channel.broadcast(2).unwrap();
1133 channel.broadcast(3).unwrap();
1134
1135 assert_eq!(channel.subscriber_lag(id), 3);
1136
1137 channel.read(id);
1138 assert_eq!(channel.subscriber_lag(id), 2);
1139 }
1140
1141 #[test]
1142 fn test_slowest_cursor() {
1143 let channel = BroadcastChannel::<i32>::new(BroadcastConfig::default());
1144
1145 assert_eq!(channel.slowest_cursor(), u64::MAX);
1147
1148 let id1 = channel.subscribe("sub1").unwrap();
1149 let _id2 = channel.subscribe("sub2").unwrap();
1150
1151 channel.broadcast(1).unwrap();
1152 channel.broadcast(2).unwrap();
1153
1154 assert_eq!(channel.slowest_cursor(), 0);
1156
1157 channel.read(id1);
1159 assert_eq!(channel.slowest_cursor(), 0); }
1161
1162 #[test]
1163 fn test_is_lagging() {
1164 let config = BroadcastConfig::builder()
1165 .capacity(256)
1166 .lag_warning_threshold(3)
1167 .build();
1168 let channel = BroadcastChannel::<i32>::new(config);
1169
1170 let id = channel.subscribe("sub1").unwrap();
1171
1172 assert!(!channel.is_lagging(id));
1173
1174 channel.broadcast(1).unwrap();
1175 channel.broadcast(2).unwrap();
1176 assert!(!channel.is_lagging(id));
1177
1178 channel.broadcast(3).unwrap();
1179 assert!(channel.is_lagging(id)); }
1181
1182 #[test]
1185 fn test_skip_for_slow_policy() {
1186 let config = BroadcastConfig::builder()
1187 .capacity(4)
1188 .slow_subscriber_policy(SlowSubscriberPolicy::SkipForSlow)
1189 .build();
1190 let channel = BroadcastChannel::<i32>::new(config);
1191
1192 let _id = channel.subscribe("slow").unwrap();
1193
1194 for i in 0..10 {
1196 channel.broadcast(i).unwrap();
1197 }
1198
1199 assert_eq!(channel.write_position(), 10);
1201 }
1202
1203 #[test]
1204 fn test_drop_slow_policy() {
1205 let config = BroadcastConfig::builder()
1206 .capacity(4)
1207 .slow_subscriber_policy(SlowSubscriberPolicy::DropSlow)
1208 .build();
1209 let channel = BroadcastChannel::<i32>::new(config);
1210
1211 let id1 = channel.subscribe("slow").unwrap();
1212 let id2 = channel.subscribe("fast").unwrap();
1213
1214 for i in 0..10 {
1216 channel.read(id2);
1218 channel.broadcast(i).unwrap();
1219 }
1220
1221 assert!(channel.subscriber_info(id1).map_or(true, |i| !i.active));
1223 }
1224
1225 #[test]
1228 fn test_channel_close() {
1229 let channel = BroadcastChannel::<i32>::new(BroadcastConfig::default());
1230 let id = channel.subscribe("sub1").unwrap();
1231
1232 channel.broadcast(42).unwrap();
1233 channel.close();
1234
1235 assert!(channel.is_closed());
1236
1237 assert_eq!(channel.read(id), Some(42));
1239
1240 let result = channel.broadcast(43);
1242 assert!(matches!(result, Err(BroadcastError::Closed)));
1243 }
1244
1245 #[test]
1248 fn test_subscriber_info() {
1249 let channel = BroadcastChannel::<i32>::new(BroadcastConfig::default());
1250
1251 let id = channel.subscribe("test_sub").unwrap();
1252 channel.broadcast(1).unwrap();
1253 channel.broadcast(2).unwrap();
1254
1255 let info = channel.subscriber_info(id).unwrap();
1256 assert_eq!(info.id, id);
1257 assert_eq!(info.name, "test_sub");
1258 assert!(info.active);
1259 assert_eq!(info.read_position, 0);
1260 assert_eq!(info.lag, 2);
1261 }
1262
1263 #[test]
1264 fn test_list_subscribers() {
1265 let channel = BroadcastChannel::<i32>::new(BroadcastConfig::default());
1266
1267 channel.subscribe("sub1").unwrap();
1268 channel.subscribe("sub2").unwrap();
1269 let id3 = channel.subscribe("sub3").unwrap();
1270
1271 channel.unsubscribe(id3);
1272
1273 let subscribers = channel.list_subscribers();
1274 assert_eq!(subscribers.len(), 2);
1275 assert!(subscribers.iter().any(|s| s.name == "sub1"));
1276 assert!(subscribers.iter().any(|s| s.name == "sub2"));
1277 }
1278
1279 #[test]
1282 fn test_debug_format() {
1283 let channel = BroadcastChannel::<i32>::new(BroadcastConfig::with_capacity(16));
1284 channel.subscribe("sub1").unwrap();
1285
1286 let debug = format!("{channel:?}");
1287 assert!(debug.contains("BroadcastChannel"));
1288 assert!(debug.contains("capacity"));
1289 assert!(debug.contains("subscriber_count"));
1290 }
1291
1292 #[test]
1295 fn test_error_display() {
1296 let e1 = BroadcastError::MaxSubscribersReached(10);
1297 assert!(e1.to_string().contains("maximum subscribers (10)"));
1298
1299 let e2 = BroadcastError::SlowSubscriberTimeout(Duration::from_secs(5));
1300 assert!(e2.to_string().contains("slow subscriber timeout"));
1301
1302 let e3 = BroadcastError::NoSubscribers;
1303 assert!(e3.to_string().contains("no active subscribers"));
1304
1305 let e4 = BroadcastError::SubscriberNotFound(42);
1306 assert!(e4.to_string().contains("subscriber 42 not found"));
1307
1308 let e5 = BroadcastError::BufferFull;
1309 assert!(e5.to_string().contains("buffer full"));
1310
1311 let e6 = BroadcastError::Closed;
1312 assert!(e6.to_string().contains("channel closed"));
1313 }
1314
1315 #[test]
1318 fn test_concurrent_subscribe_read() {
1319 let channel = Arc::new(BroadcastChannel::<i32>::new(BroadcastConfig::default()));
1320 let channel_clone = Arc::clone(&channel);
1321
1322 let id = channel.subscribe("main").unwrap();
1324
1325 let producer = thread::spawn(move || {
1327 for i in 0..100 {
1328 channel_clone.broadcast(i).unwrap();
1329 }
1330 });
1331
1332 let mut received = Vec::new();
1334 loop {
1335 if let Some(val) = channel.read(id) {
1336 received.push(val);
1337 if received.len() == 100 {
1338 break;
1339 }
1340 }
1341 thread::yield_now();
1342 }
1343
1344 producer.join().unwrap();
1345
1346 assert_eq!(received.len(), 100);
1347 for (i, val) in received.iter().enumerate() {
1348 assert_eq!(*val, i as i32);
1349 }
1350 }
1351
1352 #[test]
1353 fn test_multiple_concurrent_readers() {
1354 let channel = Arc::new(BroadcastChannel::<i32>::new(BroadcastConfig::default()));
1355
1356 let id1 = channel.subscribe("reader1").unwrap();
1357 let id2 = channel.subscribe("reader2").unwrap();
1358
1359 let channel1 = Arc::clone(&channel);
1360 let channel2 = Arc::clone(&channel);
1361 let channel_prod = Arc::clone(&channel);
1362
1363 let producer = thread::spawn(move || {
1365 for i in 0..50 {
1366 channel_prod.broadcast(i).unwrap();
1367 }
1368 });
1369
1370 let reader1 = thread::spawn(move || {
1372 let mut received = Vec::new();
1373 loop {
1374 if let Some(val) = channel1.read(id1) {
1375 received.push(val);
1376 if received.len() == 50 {
1377 break;
1378 }
1379 }
1380 thread::yield_now();
1381 }
1382 received
1383 });
1384
1385 let reader2 = thread::spawn(move || {
1387 let mut received = Vec::new();
1388 loop {
1389 if let Some(val) = channel2.read(id2) {
1390 received.push(val);
1391 if received.len() == 50 {
1392 break;
1393 }
1394 }
1395 thread::yield_now();
1396 }
1397 received
1398 });
1399
1400 producer.join().unwrap();
1401 let r1 = reader1.join().unwrap();
1402 let r2 = reader2.join().unwrap();
1403
1404 assert_eq!(r1.len(), 50);
1406 assert_eq!(r2.len(), 50);
1407 assert_eq!(r1, r2);
1408 }
1409}