1use std::{
20 collections::{HashMap, VecDeque},
21 fmt, hint,
22 marker::PhantomData,
23 sync::{
24 Arc, Condvar, Mutex, MutexGuard,
25 atomic::{AtomicBool, AtomicU8, AtomicU64, AtomicUsize, Ordering, fence},
26 mpsc,
27 },
28 thread,
29 time::Duration,
30};
31
32use arc_swap::ArcSwap;
33use crossbeam_queue::ArrayQueue;
34use ractor::{Actor, ActorProcessingErr, ActorRef};
35use tokio::sync::Notify;
36
37use crate::{
38 StreamError, StreamResult,
39 actor::block_on_ractor_runtime,
40 stream::{BoxStream, NotUsed, Source, current_stream_cancelled},
41};
42
// Topic lifecycle states stored in `TopicShared::lifecycle`.

/// The topic accepts publishes.
const TOPIC_OPEN: u8 = 0;
/// A close was requested; in-flight publishers are being waited for.
const TOPIC_CLOSING: u8 = 1;
/// The topic is closed (explicit close or actor termination).
const TOPIC_CLOSED: u8 = 2;

// Per-subscriber terminal states stored in `TopicSlot::terminal_state`.

/// The slot has not terminated.
const SLOT_OPEN: u8 = 0;
/// The slot terminated normally.
const SLOT_COMPLETE: u8 = 1;
/// The slot terminated with an error.
const SLOT_ERROR: u8 = 2;

/// Upper bound on a single blocking condvar wait of a parked subscriber.
const SLOT_WAIT_BACKSTOP: Duration = Duration::from_millis(10);
/// Maximum number of elements a subscriber moves out of its slot per drain.
const TOPIC_DRAIN_BATCH: usize = 256;
51
52type Ack = mpsc::Sender<StreamResult<()>>;
53
/// What a publish does when a subscriber's buffer is already full.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum TopicOverflow {
    /// `publish` waits until every active subscriber has room;
    /// `try_publish` reports `Full` instead of waiting.
    Backpressure,
    /// Buffered elements of the full subscriber are popped to make room
    /// for the new one.
    Sliding,
    /// The new element is skipped for the full subscriber.
    Dropping,
}
79
/// Failure of `Topic::publish`; the rejected value is handed back.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum TopicPublishError<T> {
    /// The topic no longer accepts publishes.
    Closed(T),
}
87
/// Failure of `Topic::try_publish`; the rejected value is handed back.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum TopicTryPublishError<T> {
    /// The topic no longer accepts publishes.
    Closed(T),
    /// Under backpressure, at least one active subscriber buffer is full.
    Full(T),
    /// Another publish currently holds the publish turn.
    Busy(T),
}
100
101pub struct Topic<T: Send + Sync + 'static> {
111 inner: Arc<TopicInner<T>>,
112}
113
114struct TopicInner<T: Send + Sync + 'static> {
115 actor: ActorRef<TopicMessage<T>>,
116 shared: Arc<TopicShared<T>>,
117 next_subscriber_id: Arc<AtomicU64>,
118}
119
120struct TopicShared<T: Send + Sync + 'static> {
121 subscribers: Arc<ArcSwap<TopicSlotTable<T>>>,
122 capacity: usize,
123 overflow: TopicOverflow,
124 lifecycle: AtomicU8,
125 active_publishers: AtomicUsize,
126 next_sequence: AtomicU64,
127 delivered_sequence: AtomicU64,
128 space_waiters: AtomicUsize,
129 space_available: Notify,
130 closed_notified: Notify,
131}
132
133struct TopicSlotTable<T: Send + Sync + 'static> {
134 slots: Vec<Arc<TopicSlot<T>>>,
135}
136
137struct TopicSlot<T: Send + Sync + 'static> {
138 id: u64,
139 actor: ActorRef<TopicMessage<T>>,
140 buffer: ArrayQueue<Arc<T>>,
141 active: AtomicBool,
142 parked: AtomicBool,
143 terminal_state: AtomicU8,
144 terminal: Mutex<Option<TopicSlotTerminal>>,
145 available_lock: Mutex<()>,
146 available: Condvar,
147 async_available: Notify,
148}
149
150#[derive(Clone)]
151enum TopicSlotTerminal {
152 Complete,
153 Error(StreamError),
154}
155
156impl<T: Send + Sync + 'static> Clone for Topic<T> {
157 fn clone(&self) -> Self {
158 Self {
159 inner: Arc::clone(&self.inner),
160 }
161 }
162}
163
164impl<T: Send + Sync + 'static> Topic<T> {
165 pub fn new(capacity: usize, overflow: TopicOverflow) -> StreamResult<Self> {
169 assert!(capacity > 0, "topic capacity must be greater than zero");
170 let shared = Arc::new(TopicShared {
171 subscribers: Arc::new(ArcSwap::from_pointee(TopicSlotTable { slots: Vec::new() })),
172 capacity,
173 overflow,
174 lifecycle: AtomicU8::new(TOPIC_OPEN),
175 active_publishers: AtomicUsize::new(0),
176 next_sequence: AtomicU64::new(0),
177 delivered_sequence: AtomicU64::new(0),
178 space_waiters: AtomicUsize::new(0),
179 space_available: Notify::new(),
180 closed_notified: Notify::new(),
181 });
182 let state = TopicActorState {
183 shared: Arc::clone(&shared),
184 subscribers: HashMap::new(),
185 closed: false,
186 };
187 let (actor, _handle) =
188 block_on_ractor_runtime(Actor::spawn(None, TopicActor::<T>::default(), state))?
189 .map_err(|error| {
190 StreamError::Failed(format!("topic actor failed to spawn: {error}"))
191 })?;
192 Ok(Self {
193 inner: Arc::new(TopicInner {
194 actor,
195 shared,
196 next_subscriber_id: Arc::new(AtomicU64::new(1)),
197 }),
198 })
199 }
200
201 pub async fn publish(&self, value: T) -> Result<(), TopicPublishError<T>> {
209 let Ok(_permit) = self.inner.shared.begin_publish() else {
210 return Err(TopicPublishError::Closed(value));
211 };
212 let sequence = self.inner.shared.claim_sequence();
213 self.inner.shared.wait_publish_turn(sequence);
214
215 let table = self.inner.shared.subscribers.load_full();
216 if self.inner.shared.overflow == TopicOverflow::Backpressure {
217 self.inner.shared.wait_for_capacity(&table).await;
218 }
219
220 self.inner
221 .shared
222 .publish_to_snapshot(&table, Arc::new(value));
223 self.inner.shared.finish_publish(sequence);
224 Ok(())
225 }
226
227 pub fn try_publish(&self, value: T) -> Result<(), TopicTryPublishError<T>> {
236 let Ok(_permit) = self.inner.shared.begin_publish() else {
237 return Err(TopicTryPublishError::Closed(value));
238 };
239 let Some(sequence) = self.inner.shared.try_claim_sequence() else {
240 return Err(TopicTryPublishError::Busy(value));
241 };
242
243 let table = self.inner.shared.subscribers.load_full();
244 if self.inner.shared.overflow == TopicOverflow::Backpressure
245 && !self.inner.shared.snapshot_has_capacity(&table)
246 {
247 self.inner.shared.finish_publish(sequence);
248 return Err(TopicTryPublishError::Full(value));
249 }
250
251 self.inner
252 .shared
253 .publish_to_snapshot(&table, Arc::new(value));
254 self.inner.shared.finish_publish(sequence);
255 Ok(())
256 }
257
258 #[must_use]
264 pub fn subscribe(&self) -> Source<T>
265 where
266 T: Clone,
267 {
268 let topic = self.clone();
269 let actor = self.inner.actor.clone();
270 let next_subscriber_id = Arc::clone(&self.inner.next_subscriber_id);
271 Source::from_materialized_factory(move |_materializer| {
272 let id = next_subscriber_id.fetch_add(1, Ordering::Relaxed);
273 let capacity = topic.registered_capacity();
274 let slot = TopicSlot::new(id, actor.clone(), capacity);
275 topic.register_slot(Arc::clone(&slot), id)?;
276 let stream: BoxStream<T> = Box::new(TopicStream {
277 shared: Arc::clone(&topic.inner.shared),
278 slot,
279 pending: VecDeque::new(),
280 terminated: false,
281 });
282 Ok((stream, NotUsed))
283 })
284 }
285
286 #[must_use]
288 pub fn subscriber_count(&self) -> usize {
289 self.inner.shared.subscriber_count()
290 }
291
292 pub fn close(&self) -> StreamResult<()> {
298 let (reply, receiver) = mpsc::channel();
299 self.inner
300 .actor
301 .send_message(TopicMessage::Close { reply })
302 .map_err(|error| StreamError::ActorAskSendFailed {
303 reason: error.to_string(),
304 })?;
305 receiver.recv().unwrap_or(Err(StreamError::ActorTerminated))
306 }
307
308 #[must_use]
310 pub fn is_closed(&self) -> bool {
311 self.inner.shared.is_closed()
312 }
313
314 pub async fn closed(&self) {
316 loop {
317 if self.is_closed() {
318 return;
319 }
320 let notified = self.inner.shared.closed_notified.notified();
321 let mut notified = std::pin::pin!(notified);
322 notified.as_mut().enable();
323 if self.is_closed() {
324 return;
325 }
326 notified.as_mut().await;
327 }
328 }
329
330 fn registered_capacity(&self) -> usize {
331 self.inner.shared.capacity
332 }
333
334 fn register_slot(&self, slot: Arc<TopicSlot<T>>, id: u64) -> StreamResult<()> {
335 let (reply, receiver) = mpsc::channel();
336 self.inner
337 .actor
338 .send_message(TopicMessage::Subscribe { id, slot, reply })
339 .map_err(|error| StreamError::ActorAskSendFailed {
340 reason: error.to_string(),
341 })?;
342 receiver.recv().unwrap_or(Err(StreamError::ActorTerminated))
343 }
344}
345
346impl<T: Clone + Send + Sync + 'static> Topic<T> {
347 #[doc(hidden)]
348 pub fn __benchmark_subscribe(&self) -> StreamResult<TopicBenchmarkStream<T>> {
349 let id = self
350 .inner
351 .next_subscriber_id
352 .fetch_add(1, Ordering::Relaxed);
353 let capacity = self.registered_capacity();
354 let slot = TopicSlot::new(id, self.inner.actor.clone(), capacity);
355 self.register_slot(Arc::clone(&slot), id)?;
356 Ok(TopicBenchmarkStream {
357 shared: Arc::clone(&self.inner.shared),
358 slot,
359 pending: VecDeque::new(),
360 terminated: false,
361 })
362 }
363}
364
365impl<T: Send + Sync + 'static> TopicShared<T> {
366 fn begin_publish(&self) -> StreamResult<PublishPermit<'_>> {
367 if self.lifecycle.load(Ordering::Acquire) != TOPIC_OPEN {
368 return Err(closed_error());
369 }
370 self.active_publishers.fetch_add(1, Ordering::AcqRel);
371 if self.lifecycle.load(Ordering::Acquire) == TOPIC_OPEN {
372 Ok(PublishPermit {
373 active_publishers: &self.active_publishers,
374 })
375 } else {
376 self.active_publishers.fetch_sub(1, Ordering::AcqRel);
377 Err(closed_error())
378 }
379 }
380
381 fn claim_sequence(&self) -> u64 {
382 self.next_sequence.fetch_add(1, Ordering::AcqRel) + 1
383 }
384
385 fn try_claim_sequence(&self) -> Option<u64> {
386 let delivered = self.delivered_sequence.load(Ordering::Acquire);
387 self.next_sequence
388 .compare_exchange(
389 delivered,
390 delivered + 1,
391 Ordering::AcqRel,
392 Ordering::Acquire,
393 )
394 .ok()
395 .map(|_| delivered + 1)
396 }
397
398 fn wait_publish_turn(&self, sequence: u64) {
399 let mut spins = 0_u32;
400 while self.delivered_sequence.load(Ordering::Acquire) + 1 != sequence {
401 spins = spins.wrapping_add(1);
402 if spins < 64 {
403 hint::spin_loop();
404 } else {
405 thread::yield_now();
406 }
407 }
408 }
409
410 fn finish_publish(&self, sequence: u64) {
411 self.delivered_sequence.store(sequence, Ordering::Release);
412 }
413
414 fn publish_to_snapshot(&self, table: &TopicSlotTable<T>, value: Arc<T>) {
415 match self.overflow {
416 TopicOverflow::Backpressure => {
417 for slot in &table.slots {
418 slot.enqueue_backpressured(Arc::clone(&value));
419 }
420 }
421 TopicOverflow::Sliding => {
422 for slot in &table.slots {
423 slot.enqueue_sliding(Arc::clone(&value));
424 }
425 }
426 TopicOverflow::Dropping => {
427 for slot in &table.slots {
428 slot.enqueue_dropping(Arc::clone(&value));
429 }
430 }
431 }
432 }
433
434 async fn wait_for_capacity(&self, table: &TopicSlotTable<T>) {
435 loop {
436 if self.snapshot_has_capacity(table) {
437 return;
438 }
439
440 let notified = self.space_available.notified();
441 let mut notified = std::pin::pin!(notified);
442 notified.as_mut().enable();
443 self.space_waiters.fetch_add(1, Ordering::AcqRel);
444 if self.snapshot_has_capacity(table) {
445 self.space_waiters.fetch_sub(1, Ordering::AcqRel);
446 return;
447 }
448 notified.as_mut().await;
449 self.space_waiters.fetch_sub(1, Ordering::AcqRel);
450 }
451 }
452
453 fn snapshot_has_capacity(&self, table: &TopicSlotTable<T>) -> bool {
454 table.slots.iter().all(|slot| slot.has_capacity())
455 }
456
457 fn notify_space(&self) {
458 if self.space_waiters.load(Ordering::Acquire) != 0 {
459 self.space_available.notify_waiters();
460 }
461 }
462
463 fn subscriber_count(&self) -> usize {
464 let table = self.subscribers.load();
465 table.slots.iter().filter(|slot| slot.is_active()).count()
466 }
467
468 fn is_closed(&self) -> bool {
469 self.lifecycle.load(Ordering::Acquire) == TOPIC_CLOSED
470 }
471
472 fn wait_for_publishers_to_drain(&self) {
473 while self.active_publishers.load(Ordering::Acquire) != 0 {
474 thread::yield_now();
475 }
476 }
477
478 fn mark_actor_terminated(&self) {
479 self.lifecycle.store(TOPIC_CLOSED, Ordering::Release);
480 self.closed_notified.notify_waiters();
481 self.notify_space();
482 }
483}
484
/// Proof that the holder is counted in `active_publishers`; the count is
/// decremented again when the permit is dropped.
struct PublishPermit<'a> {
    /// Counter this permit was registered on.
    active_publishers: &'a AtomicUsize,
}

impl Drop for PublishPermit<'_> {
    fn drop(&mut self) {
        let _previous = self.active_publishers.fetch_sub(1, Ordering::AcqRel);
    }
}
494
495impl<T: Send + Sync + 'static> TopicSlot<T> {
496 fn new(id: u64, actor: ActorRef<TopicMessage<T>>, capacity: usize) -> Arc<Self> {
497 Arc::new(Self {
498 id,
499 actor,
500 buffer: ArrayQueue::new(capacity),
501 active: AtomicBool::new(true),
502 parked: AtomicBool::new(false),
503 terminal_state: AtomicU8::new(SLOT_OPEN),
504 terminal: Mutex::new(None),
505 available_lock: Mutex::new(()),
506 available: Condvar::new(),
507 async_available: Notify::new(),
508 })
509 }
510
511 fn terminal_lock(&self) -> MutexGuard<'_, Option<TopicSlotTerminal>> {
512 self.terminal
513 .lock()
514 .unwrap_or_else(|poison| poison.into_inner())
515 }
516
517 fn is_active(&self) -> bool {
518 self.active.load(Ordering::Acquire)
519 }
520
521 fn has_capacity(&self) -> bool {
522 !self.is_active() || !self.buffer.is_full()
523 }
524
525 fn enqueue_backpressured(&self, value: Arc<T>) {
526 if !self.is_active() {
527 return;
528 }
529 let was_empty = self.buffer.is_empty();
530 if self.buffer.push(value).is_ok() && was_empty {
531 self.wake();
532 }
533 }
534
535 fn enqueue_sliding(&self, value: Arc<T>) {
536 if !self.is_active() {
537 return;
538 }
539 while self.buffer.is_full() {
540 if self.buffer.pop().is_none() {
541 break;
542 }
543 }
544 let was_empty = self.buffer.is_empty();
545 if self.buffer.push(value).is_ok() && was_empty {
546 self.wake();
547 }
548 }
549
550 fn enqueue_dropping(&self, value: Arc<T>) {
551 if !self.is_active() || self.buffer.is_full() {
552 return;
553 }
554 let was_empty = self.buffer.is_empty();
555 if self.buffer.push(value).is_ok() && was_empty {
556 self.wake();
557 }
558 }
559
560 fn pop(&self) -> Option<Arc<T>> {
561 self.buffer.pop()
562 }
563
564 fn park(&self) {
565 self.parked.store(true, Ordering::Release);
566 }
567
568 fn unpark(&self) {
569 self.parked.store(false, Ordering::Release);
570 }
571
572 fn wake(&self) {
573 if self.parked.swap(false, Ordering::AcqRel) {
574 let _guard = self
575 .available_lock
576 .lock()
577 .unwrap_or_else(|poison| poison.into_inner());
578 self.available.notify_one();
579 self.async_available.notify_waiters();
580 }
581 }
582
583 fn complete(&self) {
584 if self
585 .terminal_state
586 .compare_exchange(
587 SLOT_OPEN,
588 SLOT_COMPLETE,
589 Ordering::AcqRel,
590 Ordering::Acquire,
591 )
592 .is_err()
593 {
594 return;
595 }
596 self.active.store(false, Ordering::Release);
597 *self.terminal_lock() = Some(TopicSlotTerminal::Complete);
598 self.wake();
599 }
600
601 fn fail(&self, error: StreamError) {
602 if self
603 .terminal_state
604 .compare_exchange(SLOT_OPEN, SLOT_ERROR, Ordering::AcqRel, Ordering::Acquire)
605 .is_err()
606 {
607 return;
608 }
609 self.active.store(false, Ordering::Release);
610 *self.terminal_lock() = Some(TopicSlotTerminal::Error(error));
611 self.wake();
612 }
613
614 fn terminal(&self) -> Option<TopicSlotTerminal> {
615 if self.terminal_state.load(Ordering::Acquire) == SLOT_OPEN {
616 return None;
617 }
618 self.terminal_lock().clone()
619 }
620
621 fn deactivate(&self) {
622 self.active.store(false, Ordering::Release);
623 while self.buffer.pop().is_some() {}
624 self.wake();
625 }
626
627 fn unsubscribe(&self) {
628 self.deactivate();
629 let _ = self
630 .actor
631 .send_message(TopicMessage::Unsubscribe { id: self.id });
632 }
633}
634
635impl<T: Send + Sync + 'static> Drop for TopicInner<T> {
636 fn drop(&mut self) {
637 self.actor.stop(None);
638 }
639}
640
641enum TopicMessage<T: Send + Sync + 'static> {
642 Close {
643 reply: Ack,
644 },
645 Subscribe {
646 id: u64,
647 slot: Arc<TopicSlot<T>>,
648 reply: Ack,
649 },
650 Unsubscribe {
651 id: u64,
652 },
653}
654
// With the `cluster` feature the message trait is implemented explicitly,
// using only its default items.
#[cfg(feature = "cluster")]
impl<T: Send + Sync + 'static> ractor::Message for TopicMessage<T> {}
657
/// Stateless actor type for a topic of `T`; all data lives in
/// `TopicActorState`.
struct TopicActor<T> {
    /// Ties the actor to `T` without storing a value of it.
    _marker: PhantomData<fn() -> T>,
}

impl<T> Default for TopicActor<T> {
    /// Builds the actor marker; places no bound on `T`.
    fn default() -> Self {
        TopicActor {
            _marker: PhantomData,
        }
    }
}
669
670struct TopicActorState<T: Send + Sync + 'static> {
671 shared: Arc<TopicShared<T>>,
672 subscribers: HashMap<u64, Arc<TopicSlot<T>>>,
673 closed: bool,
674}
675
676impl<T: Send + Sync + 'static> Actor for TopicActor<T> {
677 type Msg = TopicMessage<T>;
678 type State = TopicActorState<T>;
679 type Arguments = TopicActorState<T>;
680
681 async fn pre_start(
682 &self,
683 _myself: ActorRef<Self::Msg>,
684 args: Self::Arguments,
685 ) -> Result<Self::State, ActorProcessingErr> {
686 Ok(args)
687 }
688
689 async fn handle(
690 &self,
691 _myself: ActorRef<Self::Msg>,
692 message: Self::Msg,
693 state: &mut Self::State,
694 ) -> Result<(), ActorProcessingErr> {
695 match message {
696 TopicMessage::Close { reply } => {
697 close_topic(state);
698 let _ = reply.send(Ok(()));
699 }
700 TopicMessage::Subscribe { id, slot, reply } => {
701 if state.closed || state.shared.lifecycle.load(Ordering::Acquire) == TOPIC_CLOSED {
702 slot.complete();
703 } else {
704 state.subscribers.insert(id, Arc::clone(&slot));
705 publish_topic_slot_table(state);
706 }
707 let _ = reply.send(Ok(()));
708 }
709 TopicMessage::Unsubscribe { id } => {
710 state.subscribers.remove(&id);
711 publish_topic_slot_table(state);
712 state.shared.notify_space();
713 }
714 }
715 Ok(())
716 }
717
718 async fn post_stop(
719 &self,
720 _myself: ActorRef<Self::Msg>,
721 state: &mut Self::State,
722 ) -> Result<(), ActorProcessingErr> {
723 if !state.closed {
724 for slot in state.subscribers.values() {
725 slot.fail(StreamError::ActorTerminated);
726 }
727 state.subscribers.clear();
728 publish_topic_slot_table(state);
729 state.shared.mark_actor_terminated();
730 }
731 Ok(())
732 }
733}
734
735fn close_topic<T: Send + Sync + 'static>(state: &mut TopicActorState<T>) {
736 if state.closed {
737 return;
738 }
739 match state.shared.lifecycle.compare_exchange(
740 TOPIC_OPEN,
741 TOPIC_CLOSING,
742 Ordering::AcqRel,
743 Ordering::Acquire,
744 ) {
745 Ok(_) => {}
746 Err(TOPIC_CLOSED) => {
747 state.closed = true;
748 return;
749 }
750 Err(_) => {}
751 }
752 state.shared.wait_for_publishers_to_drain();
753 state
754 .shared
755 .lifecycle
756 .store(TOPIC_CLOSED, Ordering::Release);
757
758 for slot in state.subscribers.values() {
759 slot.complete();
760 }
761 state.subscribers.clear();
762 publish_topic_slot_table(state);
763 state.shared.notify_space();
764 state.shared.closed_notified.notify_waiters();
765 state.closed = true;
766}
767
768fn publish_topic_slot_table<T: Send + Sync + 'static>(state: &TopicActorState<T>) {
769 let slots = state.subscribers.values().cloned().collect::<Vec<_>>();
770 state
771 .shared
772 .subscribers
773 .store(Arc::new(TopicSlotTable { slots }));
774}
775
776fn closed_error() -> StreamError {
777 StreamError::Failed("topic is closed".into())
778}
779
780struct TopicStream<T: Clone + Send + Sync + 'static> {
781 shared: Arc<TopicShared<T>>,
782 slot: Arc<TopicSlot<T>>,
783 pending: VecDeque<Arc<T>>,
784 terminated: bool,
785}
786
787#[doc(hidden)]
788pub struct TopicBenchmarkStream<T: Clone + Send + Sync + 'static> {
789 shared: Arc<TopicShared<T>>,
790 slot: Arc<TopicSlot<T>>,
791 pending: VecDeque<Arc<T>>,
792 terminated: bool,
793}
794
795impl<T: Clone + Send + Sync + 'static> Iterator for TopicStream<T> {
796 type Item = StreamResult<T>;
797
798 fn next(&mut self) -> Option<Self::Item> {
799 if self.terminated {
800 return None;
801 }
802
803 loop {
804 if let Some(value) = self.pending.pop_front() {
805 return Some(Ok(value.as_ref().clone()));
806 }
807
808 if let Some(value) = self.drain_batch() {
809 return Some(Ok(value.as_ref().clone()));
810 }
811
812 if let Some(terminal) = self.slot.terminal() {
813 self.terminated = true;
814 return match terminal {
815 TopicSlotTerminal::Complete => None,
816 TopicSlotTerminal::Error(error) => Some(Err(error)),
817 };
818 }
819
820 if current_stream_cancelled()
821 .as_ref()
822 .is_some_and(|cancelled| cancelled.load(Ordering::SeqCst))
823 {
824 self.terminated = true;
825 return Some(Err(StreamError::Cancelled));
826 }
827
828 self.wait_for_wake();
829 }
830 }
831}
832
833impl<T: Clone + Send + Sync + 'static> TopicStream<T> {
834 fn drain_batch(&mut self) -> Option<Arc<T>> {
835 let first = self.slot.pop()?;
836 let mut drained = 1_usize;
837 while drained < TOPIC_DRAIN_BATCH {
838 let Some(value) = self.slot.pop() else {
839 break;
840 };
841 self.pending.push_back(value);
842 drained += 1;
843 }
844 self.shared.notify_space();
845 Some(first)
846 }
847
848 fn wait_for_wake(&self) {
849 let guard = self
850 .slot
851 .available_lock
852 .lock()
853 .unwrap_or_else(|poison| poison.into_inner());
854 self.slot.park();
855 fence(Ordering::SeqCst);
856 if !self.slot.buffer.is_empty() || self.slot.terminal().is_some() {
857 self.slot.unpark();
858 return;
859 }
860 let _guard = self
861 .slot
862 .available
863 .wait_timeout(guard, SLOT_WAIT_BACKSTOP)
864 .unwrap_or_else(|poison| poison.into_inner())
865 .0;
866 self.slot.unpark();
867 }
868}
869
870impl<T: Clone + Send + Sync + 'static> Drop for TopicStream<T> {
871 fn drop(&mut self) {
872 self.slot.unsubscribe();
873 self.shared.notify_space();
874 }
875}
876
877impl<T: Clone + Send + Sync + 'static> TopicBenchmarkStream<T> {
878 #[doc(hidden)]
879 pub async fn next(&mut self) -> Option<StreamResult<T>> {
880 if self.terminated {
881 return None;
882 }
883
884 loop {
885 if let Some(value) = self.pending.pop_front() {
886 return Some(Ok(value.as_ref().clone()));
887 }
888
889 if let Some(value) = self.drain_batch() {
890 return Some(Ok(value.as_ref().clone()));
891 }
892
893 if let Some(terminal) = self.slot.terminal() {
894 self.terminated = true;
895 return match terminal {
896 TopicSlotTerminal::Complete => None,
897 TopicSlotTerminal::Error(error) => Some(Err(error)),
898 };
899 }
900
901 self.wait_for_wake().await;
902 }
903 }
904
905 #[doc(hidden)]
906 pub async fn count_items(&mut self, target: u64) -> StreamResult<u64> {
907 let mut count = 0_u64;
908 while count < target {
909 if self.terminated {
910 return Err(StreamError::Failed(
911 "topic stream ended before requested count".into(),
912 ));
913 }
914
915 if !self.pending.is_empty() {
916 let drained = self.pending.len().min((target - count) as usize);
917 self.pending.drain(..drained);
918 count += drained as u64;
919 continue;
920 }
921
922 if let Some(drained) = self.drain_available_count((target - count) as usize) {
923 count += drained as u64;
924 continue;
925 }
926
927 if let Some(terminal) = self.slot.terminal() {
928 self.terminated = true;
929 return match terminal {
930 TopicSlotTerminal::Complete => Err(StreamError::Failed(
931 "topic stream completed before requested count".into(),
932 )),
933 TopicSlotTerminal::Error(error) => Err(error),
934 };
935 }
936
937 self.wait_for_wake().await;
938 }
939 Ok(count)
940 }
941
942 fn drain_batch(&mut self) -> Option<Arc<T>> {
943 let first = self.slot.pop()?;
944 let mut drained = 1_usize;
945 while drained < TOPIC_DRAIN_BATCH {
946 let Some(value) = self.slot.pop() else {
947 break;
948 };
949 self.pending.push_back(value);
950 drained += 1;
951 }
952 self.shared.notify_space();
953 Some(first)
954 }
955
956 fn drain_available_count(&mut self, limit: usize) -> Option<usize> {
957 let mut drained = 0_usize;
958 let limit = limit.min(TOPIC_DRAIN_BATCH);
959 while drained < limit {
960 let Some(_value) = self.slot.pop() else {
961 break;
962 };
963 drained += 1;
964 }
965 if drained == 0 {
966 return None;
967 }
968 self.shared.notify_space();
969 Some(drained)
970 }
971
972 async fn wait_for_wake(&self) {
973 let notified = self.slot.async_available.notified();
974 tokio::pin!(notified);
975 notified.as_mut().enable();
976
977 self.slot.park();
978 fence(Ordering::SeqCst);
979 if !self.slot.buffer.is_empty() || self.slot.terminal().is_some() {
980 self.slot.unpark();
981 return;
982 }
983
984 notified.as_mut().await;
985 self.slot.unpark();
986 }
987}
988
989impl<T: Clone + Send + Sync + 'static> Drop for TopicBenchmarkStream<T> {
990 fn drop(&mut self) {
991 self.slot.unsubscribe();
992 self.shared.notify_space();
993 }
994}
995
996impl<T: Send + Sync + 'static> fmt::Debug for Topic<T> {
997 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
998 f.debug_struct("Topic")
999 .field("closed", &self.is_closed())
1000 .field("subscribers", &self.subscriber_count())
1001 .finish_non_exhaustive()
1002 }
1003}
1004
1005impl<T> fmt::Display for TopicPublishError<T> {
1006 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1007 match self {
1008 TopicPublishError::Closed(_) => f.write_str("topic is closed"),
1009 }
1010 }
1011}
1012
1013impl<T> fmt::Display for TopicTryPublishError<T> {
1014 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1015 match self {
1016 TopicTryPublishError::Closed(_) => f.write_str("topic is closed"),
1017 TopicTryPublishError::Full(_) => f.write_str("topic subscriber buffer is full"),
1018 TopicTryPublishError::Busy(_) => f.write_str("topic publish turn is busy"),
1019 }
1020 }
1021}
1022
#[cfg(test)]
mod tests {
    use super::*;
    use crate::{Sink, stream::Materializer};
    use futures::executor::block_on;
    use std::{
        collections::HashSet,
        sync::{
            Arc,
            atomic::{AtomicBool, AtomicUsize},
        },
        thread,
        time::{Duration, Instant},
    };

    /// Blocks on a stream completion and unwraps its result.
    fn wait<T>(completion: crate::StreamCompletion<T>) -> T {
        completion.wait().unwrap()
    }

    /// Polls `condition` roughly every millisecond until it holds or
    /// `timeout` elapses; the condition gets one final check at the deadline.
    fn wait_until<F>(timeout: Duration, mut condition: F) -> bool
    where
        F: FnMut() -> bool,
    {
        let deadline = Instant::now() + timeout;
        loop {
            if Instant::now() >= deadline {
                return condition();
            }
            if condition() {
                return true;
            }
            thread::yield_now();
            thread::sleep(Duration::from_millis(1));
        }
    }

    /// Materializes one subscription directly, bypassing any sink.
    fn materialize_topic<T: Clone + Send + Sync + 'static>(topic: &Topic<T>) -> BoxStream<T> {
        let materializer = Materializer::new();
        let (stream, _) = topic.subscribe().factory.create(&materializer).unwrap();
        stream
    }

    #[test]
    fn every_subscriber_sees_every_post_subscription_element() {
        const SUBSCRIBERS: usize = 4;
        const PUBLISHERS: usize = 4;
        const PER_PUBLISHER: usize = 128;
        let topic = Topic::new(1_024, TopicOverflow::Backpressure).unwrap();

        let mut completions = Vec::with_capacity(SUBSCRIBERS);
        for _ in 0..SUBSCRIBERS {
            completions.push(topic.subscribe().run_with(Sink::collect()).unwrap());
        }
        assert!(wait_until(Duration::from_secs(1), || {
            topic.subscriber_count() == SUBSCRIBERS
        }));

        let publishers: Vec<_> = (0..PUBLISHERS)
            .map(|publisher| {
                let topic = topic.clone();
                thread::spawn(move || {
                    for seq in 0..PER_PUBLISHER {
                        let value = ((publisher as u64) << 32) | seq as u64;
                        block_on(topic.publish(value)).unwrap();
                    }
                })
            })
            .collect();
        for publisher in publishers {
            publisher.join().unwrap();
        }
        topic.close().unwrap();

        let observed: Vec<_> = completions.into_iter().map(wait).collect();
        let (first, rest) = observed.split_first().unwrap();
        assert_eq!(first.len(), PUBLISHERS * PER_PUBLISHER);
        let unique: HashSet<_> = first.iter().copied().collect();
        assert_eq!(unique.len(), PUBLISHERS * PER_PUBLISHER);
        for values in rest {
            assert_eq!(values, first, "subscribers disagreed on global order");
        }
    }

    #[test]
    fn late_subscriber_sees_nothing_prior_and_zero_subscriber_publish_drops() {
        let topic = Topic::new(8, TopicOverflow::Backpressure).unwrap();
        assert_eq!(topic.subscriber_count(), 0);
        // Nobody is subscribed yet, so both of these are dropped.
        block_on(topic.publish(1_u64)).unwrap();
        topic.try_publish(2).unwrap();

        let completion = topic.subscribe().run_with(Sink::collect()).unwrap();
        assert!(wait_until(Duration::from_secs(1), || {
            topic.subscriber_count() == 1
        }));
        block_on(topic.publish(3)).unwrap();
        topic.close().unwrap();
        assert_eq!(wait(completion), vec![3]);
    }

    #[test]
    fn sliding_overflow_drops_oldest_for_slow_subscriber() {
        let topic = Topic::new(2, TopicOverflow::Sliding).unwrap();
        let mut stream = materialize_topic(&topic);
        for value in 1_u64..=4 {
            topic.try_publish(value).unwrap();
        }
        topic.close().unwrap();

        assert_eq!(stream.next(), Some(Ok(3)));
        assert_eq!(stream.next(), Some(Ok(4)));
        assert_eq!(stream.next(), None);
    }

    #[test]
    fn dropping_overflow_drops_newest_for_slow_subscriber() {
        let topic = Topic::new(2, TopicOverflow::Dropping).unwrap();
        let mut stream = materialize_topic(&topic);
        for value in 1_u64..=4 {
            topic.try_publish(value).unwrap();
        }
        topic.close().unwrap();

        assert_eq!(stream.next(), Some(Ok(1)));
        assert_eq!(stream.next(), Some(Ok(2)));
        assert_eq!(stream.next(), None);
    }

    #[test]
    fn backpressure_stalls_publisher_until_slow_subscriber_drains() {
        let topic = Topic::new(1, TopicOverflow::Backpressure).unwrap();
        let mut stream = materialize_topic(&topic);
        block_on(topic.publish(1_u64)).unwrap();

        let completed = Arc::new(AtomicBool::new(false));
        let publisher = {
            let topic = topic.clone();
            let done = Arc::clone(&completed);
            thread::spawn(move || {
                block_on(topic.publish(2)).unwrap();
                done.store(true, Ordering::SeqCst);
            })
        };
        let publisher_finished = || completed.load(Ordering::SeqCst);

        // The single-element buffer is full, so the second publish must stall.
        assert!(!wait_until(Duration::from_millis(25), publisher_finished));
        assert_eq!(stream.next(), Some(Ok(1)));
        assert!(wait_until(Duration::from_secs(1), publisher_finished));
        publisher.join().unwrap();
        topic.close().unwrap();
        assert_eq!(stream.next(), Some(Ok(2)));
        assert_eq!(stream.next(), None);
    }

    #[test]
    fn dropping_source_unsubscribes_and_frees_backpressured_slot() {
        let topic = Topic::new(1, TopicOverflow::Backpressure).unwrap();
        let stream = materialize_topic(&topic);
        block_on(topic.publish(1_u64)).unwrap();
        assert_eq!(topic.subscriber_count(), 1);

        drop(stream);
        assert!(wait_until(Duration::from_secs(1), || {
            topic.subscriber_count() == 0
        }));
        // With the full slot gone this publish must not block.
        block_on(topic.publish(2)).unwrap();
        topic.close().unwrap();
    }

    #[test]
    fn close_drains_then_completes_and_rejects_late_publishes() {
        let topic = Topic::new(4, TopicOverflow::Backpressure).unwrap();
        let mut stream = materialize_topic(&topic);
        block_on(topic.publish(1_u64)).unwrap();
        block_on(topic.publish(2)).unwrap();
        topic.close().unwrap();

        assert_eq!(stream.next(), Some(Ok(1)));
        assert_eq!(stream.next(), Some(Ok(2)));
        assert_eq!(stream.next(), None);

        let late_try = topic.try_publish(3);
        assert_eq!(late_try, Err(TopicTryPublishError::Closed(3)));
        let late_publish = block_on(topic.publish(4));
        assert_eq!(late_publish, Err(TopicPublishError::Closed(4)));
    }

    #[test]
    fn closed_future_wakes_on_close() {
        let topic = Topic::<u64>::new(1, TopicOverflow::Backpressure).unwrap();
        let waiting = Arc::new(AtomicBool::new(false));
        let waiter = {
            let topic = topic.clone();
            let started = Arc::clone(&waiting);
            thread::spawn(move || {
                started.store(true, Ordering::SeqCst);
                block_on(topic.closed());
            })
        };

        assert!(wait_until(Duration::from_secs(1), || {
            waiting.load(Ordering::SeqCst)
        }));
        topic.close().unwrap();
        waiter.join().unwrap();
        assert!(topic.is_closed());
    }

    #[test]
    fn publisher_subscriber_churn_hammer() {
        const ROUNDS: usize = 200;
        const PUBLISHERS: usize = 4;
        let topic = Topic::new(8, TopicOverflow::Sliding).unwrap();
        let published = Arc::new(AtomicUsize::new(0));

        let publisher_handles: Vec<_> = (0..PUBLISHERS)
            .map(|publisher| {
                let topic = topic.clone();
                let published = Arc::clone(&published);
                thread::spawn(move || {
                    for seq in 0..ROUNDS {
                        let value = ((publisher as u64) << 32) | seq as u64;
                        block_on(topic.publish(value)).unwrap();
                        published.fetch_add(1, Ordering::Relaxed);
                    }
                })
            })
            .collect();

        let churn = {
            let topic = topic.clone();
            thread::spawn(move || {
                for _ in 0..ROUNDS {
                    // Subscribe and immediately unsubscribe.
                    drop(materialize_topic(&topic));
                }
            })
        };

        for handle in publisher_handles {
            handle.join().unwrap();
        }
        churn.join().unwrap();
        topic.close().unwrap();
        assert_eq!(published.load(Ordering::Relaxed), PUBLISHERS * ROUNDS);
    }

    #[test]
    fn post_close_subscribe_completes_empty() {
        let topic = Topic::<u64>::new(8, TopicOverflow::Backpressure).unwrap();
        topic.close().unwrap();
        let collected = topic.subscribe().run_collect().unwrap();
        assert_eq!(collected, Vec::<u64>::new());
    }
}