Skip to main content

datum/concurrent/
topic.rs

1//! Stream-native many-publisher, many-subscriber broadcast topic.
2//!
3//! [`Topic`] mirrors FS2 `Topic` / ZIO `Hub` broadcast semantics for Datum:
4//! publishers broadcast to the subscriber set that is current at the publish
5//! linearization point, and each subscriber sees only elements published after
6//! its subscription is registered. Publishing with no subscribers succeeds and
7//! drops the element, matching FS2 rather than ZIO's buffering-for-future-
8//! subscribers behavior.
9//!
10//! The implementation follows M9's two-plane rule. A Ractor actor owns only the
11//! control plane: subscribe, unsubscribe, close, terminal state, and publishing
12//! `ArcSwap` subscriber-table snapshots. The element path is direct: publishers
13//! claim one global sequence turn, load the current table snapshot, enqueue an
14//! `Arc<T>` into each subscriber's lock-free slot queue, and wake a subscriber
15//! only on an empty-to-non-empty transition or if it is parked at the queue
16//! head. There is no actor message and no mutex on the accepted publish path;
17//! backpressured publishers park outside the actor.
18
19use std::{
20    collections::{HashMap, VecDeque},
21    fmt, hint,
22    marker::PhantomData,
23    sync::{
24        Arc, Condvar, Mutex, MutexGuard,
25        atomic::{AtomicBool, AtomicU8, AtomicU64, AtomicUsize, Ordering, fence},
26        mpsc,
27    },
28    thread,
29    time::Duration,
30};
31
32use arc_swap::ArcSwap;
33use crossbeam_queue::ArrayQueue;
34use ractor::{Actor, ActorProcessingErr, ActorRef};
35use tokio::sync::Notify;
36
37use crate::{
38    StreamError, StreamResult,
39    actor::block_on_ractor_runtime,
40    stream::{BoxStream, NotUsed, Source, current_stream_cancelled},
41};
42
/// Topic lifecycle: publishers are admitted.
const TOPIC_OPEN: u8 = 0;
/// Topic lifecycle: close has started; new publishers are rejected while
/// in-flight ones are waited for.
const TOPIC_CLOSING: u8 = 1;
/// Topic lifecycle: the topic is closed (terminal state).
const TOPIC_CLOSED: u8 = 2;
/// Slot terminal state: no terminal signal has been recorded.
const SLOT_OPEN: u8 = 0;
/// Slot terminal state: the slot was completed.
const SLOT_COMPLETE: u8 = 1;
/// Slot terminal state: the slot was failed with an error.
const SLOT_ERROR: u8 = 2;
/// Upper bound for a single blocking condvar wait of a subscriber; after it
/// elapses the subscriber loop re-checks its queue and terminal state.
const SLOT_WAIT_BACKSTOP: Duration = Duration::from_millis(10);
/// Maximum number of elements a subscriber pops from its slot in one drain.
const TOPIC_DRAIN_BATCH: usize = 256;

/// Reply channel on which the control actor acknowledges a request.
type Ack = mpsc::Sender<StreamResult<()>>;
53
/// Overflow policy for each [`Topic`] subscriber buffer.
///
/// The policy is fixed when the topic is created and applies to every
/// subscriber buffer of that topic.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum TopicOverflow {
    /// Preserve every element for every active subscriber.
    ///
    /// `publish(value).await` waits until every subscriber in the publish
    /// snapshot has capacity, then enqueues the element to all of them in the
    /// topic's global publish order. `try_publish(value)` returns
    /// [`TopicTryPublishError::Full`] instead of waiting if any subscriber in
    /// the snapshot is full.
    Backpressure,
    /// Keep the newest bounded window per subscriber.
    ///
    /// When a subscriber buffer is full, publishing drops that subscriber's
    /// oldest queued element and enqueues the new element. Other subscribers are
    /// unaffected. Publishers never wait for subscriber capacity under this
    /// policy.
    Sliding,
    /// Drop new elements for slow subscribers.
    ///
    /// When a subscriber buffer is full, the new element is skipped for that
    /// subscriber only. Other subscribers still receive it. Publishers never
    /// wait for subscriber capacity under this policy.
    Dropping,
}
79
/// Error returned by [`Topic::publish`] when the topic closes before the value
/// can be accepted.
///
/// The rejected value is handed back so the caller keeps ownership of it.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum TopicPublishError<T> {
    /// The topic is closed; the unpublished value is returned.
    Closed(T),
}
87
/// Error returned by [`Topic::try_publish`].
///
/// Every variant hands the rejected value back to the caller.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum TopicTryPublishError<T> {
    /// The topic is closed; the unpublished value is returned.
    Closed(T),
    /// The call would need to wait for subscriber capacity under
    /// [`TopicOverflow::Backpressure`]; the unpublished value is returned.
    Full(T),
    /// Another publisher currently owns an earlier global publish turn; the
    /// unpublished value is returned rather than blocking behind it.
    Busy(T),
}
100
/// A cloneable stream-native broadcast topic.
///
/// Clone the handle for publishers. Each call to [`Topic::subscribe`] returns a
/// source blueprint; each materialization registers a fresh subscriber slot.
/// Subscribers observe every element published after their registration, in the
/// same global order as every other subscriber, subject to this topic's
/// overflow policy.
///
/// Capacity is per subscriber and must be greater than zero.
pub struct Topic<T: Send + Sync + 'static> {
    // Shared by every clone of this handle.
    inner: Arc<TopicInner<T>>,
}
113
/// State shared by all clones of one [`Topic`] handle.
///
/// Dropping the last owner stops the control actor (see the `Drop` impl).
struct TopicInner<T: Send + Sync + 'static> {
    /// Control-plane actor handling subscribe, unsubscribe and close requests.
    actor: ActorRef<TopicMessage<T>>,
    /// Data-plane state used directly by publishers and subscribers.
    shared: Arc<TopicShared<T>>,
    /// Source of subscriber ids; the first id handed out is 1.
    next_subscriber_id: Arc<AtomicU64>,
}
119
/// Data-plane state shared between publishers, subscribers and the control
/// actor.
struct TopicShared<T: Send + Sync + 'static> {
    /// Current subscriber-table snapshot; replaced wholesale by the actor and
    /// loaded by publishers.
    subscribers: Arc<ArcSwap<TopicSlotTable<T>>>,
    /// Buffer capacity given to every subscriber slot.
    capacity: usize,
    /// Overflow policy applied when a subscriber buffer is full.
    overflow: TopicOverflow,
    /// One of `TOPIC_OPEN`, `TOPIC_CLOSING`, `TOPIC_CLOSED`.
    lifecycle: AtomicU8,
    /// Number of outstanding `PublishPermit`s; close waits for it to reach zero.
    active_publishers: AtomicUsize,
    /// Last publish sequence number that has been claimed.
    next_sequence: AtomicU64,
    /// Last publish sequence number that has finished; a publisher owns the
    /// turn when this value plus one equals its claimed sequence.
    delivered_sequence: AtomicU64,
    /// Number of publishers currently registered as waiting for capacity.
    space_waiters: AtomicUsize,
    /// Signalled by `notify_space` when subscriber capacity may have changed.
    space_available: Notify,
    /// Signalled when the topic becomes closed.
    closed_notified: Notify,
}
132
/// Immutable snapshot of the subscriber slots that a publish fans out to.
struct TopicSlotTable<T: Send + Sync + 'static> {
    /// Slots registered with the control actor when the snapshot was built.
    slots: Vec<Arc<TopicSlot<T>>>,
}
136
/// Per-subscriber mailbox shared between publishers, the subscriber stream and
/// the control actor.
struct TopicSlot<T: Send + Sync + 'static> {
    /// Subscriber id, used when asking the actor to unsubscribe.
    id: u64,
    /// Control actor that receives the unsubscribe request.
    actor: ActorRef<TopicMessage<T>>,
    /// Bounded queue of elements waiting to be consumed by the subscriber.
    buffer: ArrayQueue<Arc<T>>,
    /// Cleared on complete, fail or deactivate; publishers skip inactive slots.
    active: AtomicBool,
    /// Set by the subscriber right before it waits; cleared by `wake`/`unpark`.
    parked: AtomicBool,
    /// One of `SLOT_OPEN`, `SLOT_COMPLETE`, `SLOT_ERROR`.
    terminal_state: AtomicU8,
    /// Terminal value; written after `terminal_state` leaves `SLOT_OPEN`.
    terminal: Mutex<Option<TopicSlotTerminal>>,
    /// Mutex paired with `available` for the blocking subscriber wait.
    available_lock: Mutex<()>,
    /// Condvar signalled by `wake` for the blocking subscriber.
    available: Condvar,
    /// Async counterpart of `available`, used by the benchmark stream.
    async_available: Notify,
}
149
/// Terminal signal recorded on a slot once it is completed or failed.
#[derive(Clone)]
enum TopicSlotTerminal {
    /// The subscriber stream ends normally.
    Complete,
    /// The subscriber stream ends by yielding this error.
    Error(StreamError),
}
155
156impl<T: Send + Sync + 'static> Clone for Topic<T> {
157    fn clone(&self) -> Self {
158        Self {
159            inner: Arc::clone(&self.inner),
160        }
161    }
162}
163
164impl<T: Send + Sync + 'static> Topic<T> {
165    /// Create a new topic with per-subscriber `capacity` and `overflow` policy.
166    ///
167    /// Panics if `capacity == 0`.
168    pub fn new(capacity: usize, overflow: TopicOverflow) -> StreamResult<Self> {
169        assert!(capacity > 0, "topic capacity must be greater than zero");
170        let shared = Arc::new(TopicShared {
171            subscribers: Arc::new(ArcSwap::from_pointee(TopicSlotTable { slots: Vec::new() })),
172            capacity,
173            overflow,
174            lifecycle: AtomicU8::new(TOPIC_OPEN),
175            active_publishers: AtomicUsize::new(0),
176            next_sequence: AtomicU64::new(0),
177            delivered_sequence: AtomicU64::new(0),
178            space_waiters: AtomicUsize::new(0),
179            space_available: Notify::new(),
180            closed_notified: Notify::new(),
181        });
182        let state = TopicActorState {
183            shared: Arc::clone(&shared),
184            subscribers: HashMap::new(),
185            closed: false,
186        };
187        let (actor, _handle) =
188            block_on_ractor_runtime(Actor::spawn(None, TopicActor::<T>::default(), state))?
189                .map_err(|error| {
190                    StreamError::Failed(format!("topic actor failed to spawn: {error}"))
191                })?;
192        Ok(Self {
193            inner: Arc::new(TopicInner {
194                actor,
195                shared,
196                next_subscriber_id: Arc::new(AtomicU64::new(1)),
197            }),
198        })
199    }
200
201    /// Publish one value, waiting only under [`TopicOverflow::Backpressure`].
202    ///
203    /// The publish linearizes when this call owns its global sequence turn and
204    /// loads the subscriber-table snapshot. Subscribers in that snapshot receive
205    /// the value according to the configured overflow policy; subscribers
206    /// registered later do not. If the topic has no subscribers, the value is
207    /// accepted and dropped.
208    pub async fn publish(&self, value: T) -> Result<(), TopicPublishError<T>> {
209        let Ok(_permit) = self.inner.shared.begin_publish() else {
210            return Err(TopicPublishError::Closed(value));
211        };
212        let sequence = self.inner.shared.claim_sequence();
213        self.inner.shared.wait_publish_turn(sequence);
214
215        let table = self.inner.shared.subscribers.load_full();
216        if self.inner.shared.overflow == TopicOverflow::Backpressure {
217            self.inner.shared.wait_for_capacity(&table).await;
218        }
219
220        self.inner
221            .shared
222            .publish_to_snapshot(&table, Arc::new(value));
223        self.inner.shared.finish_publish(sequence);
224        Ok(())
225    }
226
227    /// Try to publish one value without awaiting subscriber capacity or an
228    /// earlier in-flight publisher.
229    ///
230    /// Under [`TopicOverflow::Backpressure`], this returns
231    /// [`TopicTryPublishError::Full`] if any active subscriber in the publish
232    /// snapshot is full. Under all policies, it returns
233    /// [`TopicTryPublishError::Busy`] if another publisher currently owns an
234    /// earlier global publish turn.
235    pub fn try_publish(&self, value: T) -> Result<(), TopicTryPublishError<T>> {
236        let Ok(_permit) = self.inner.shared.begin_publish() else {
237            return Err(TopicTryPublishError::Closed(value));
238        };
239        let Some(sequence) = self.inner.shared.try_claim_sequence() else {
240            return Err(TopicTryPublishError::Busy(value));
241        };
242
243        let table = self.inner.shared.subscribers.load_full();
244        if self.inner.shared.overflow == TopicOverflow::Backpressure
245            && !self.inner.shared.snapshot_has_capacity(&table)
246        {
247            self.inner.shared.finish_publish(sequence);
248            return Err(TopicTryPublishError::Full(value));
249        }
250
251        self.inner
252            .shared
253            .publish_to_snapshot(&table, Arc::new(value));
254        self.inner.shared.finish_publish(sequence);
255        Ok(())
256    }
257
258    /// Return a source blueprint that registers a fresh subscriber when
259    /// materialized.
260    ///
261    /// The source emits only elements published after its registration. If the
262    /// topic is already closed, the materialized source completes immediately.
263    #[must_use]
264    pub fn subscribe(&self) -> Source<T>
265    where
266        T: Clone,
267    {
268        let topic = self.clone();
269        let actor = self.inner.actor.clone();
270        let next_subscriber_id = Arc::clone(&self.inner.next_subscriber_id);
271        Source::from_materialized_factory(move |_materializer| {
272            let id = next_subscriber_id.fetch_add(1, Ordering::Relaxed);
273            let capacity = topic.registered_capacity();
274            let slot = TopicSlot::new(id, actor.clone(), capacity);
275            topic.register_slot(Arc::clone(&slot), id)?;
276            let stream: BoxStream<T> = Box::new(TopicStream {
277                shared: Arc::clone(&topic.inner.shared),
278                slot,
279                pending: VecDeque::new(),
280                terminated: false,
281            });
282            Ok((stream, NotUsed))
283        })
284    }
285
286    /// Return the number of currently active subscribers.
287    #[must_use]
288    pub fn subscriber_count(&self) -> usize {
289        self.inner.shared.subscriber_count()
290    }
291
292    /// Gracefully close the topic.
293    ///
294    /// Current subscribers drain their queued elements before completing.
295    /// Publishers that begin after close fail with [`TopicPublishError::Closed`]
296    /// or [`TopicTryPublishError::Closed`].
297    pub fn close(&self) -> StreamResult<()> {
298        let (reply, receiver) = mpsc::channel();
299        self.inner
300            .actor
301            .send_message(TopicMessage::Close { reply })
302            .map_err(|error| StreamError::ActorAskSendFailed {
303                reason: error.to_string(),
304            })?;
305        receiver.recv().unwrap_or(Err(StreamError::ActorTerminated))
306    }
307
308    /// Return true after the topic has closed.
309    #[must_use]
310    pub fn is_closed(&self) -> bool {
311        self.inner.shared.is_closed()
312    }
313
314    /// Wait until the topic is closed.
315    pub async fn closed(&self) {
316        loop {
317            if self.is_closed() {
318                return;
319            }
320            let notified = self.inner.shared.closed_notified.notified();
321            let mut notified = std::pin::pin!(notified);
322            notified.as_mut().enable();
323            if self.is_closed() {
324                return;
325            }
326            notified.as_mut().await;
327        }
328    }
329
330    fn registered_capacity(&self) -> usize {
331        self.inner.shared.capacity
332    }
333
334    fn register_slot(&self, slot: Arc<TopicSlot<T>>, id: u64) -> StreamResult<()> {
335        let (reply, receiver) = mpsc::channel();
336        self.inner
337            .actor
338            .send_message(TopicMessage::Subscribe { id, slot, reply })
339            .map_err(|error| StreamError::ActorAskSendFailed {
340                reason: error.to_string(),
341            })?;
342        receiver.recv().unwrap_or(Err(StreamError::ActorTerminated))
343    }
344}
345
346impl<T: Clone + Send + Sync + 'static> Topic<T> {
347    #[doc(hidden)]
348    pub fn __benchmark_subscribe(&self) -> StreamResult<TopicBenchmarkStream<T>> {
349        let id = self
350            .inner
351            .next_subscriber_id
352            .fetch_add(1, Ordering::Relaxed);
353        let capacity = self.registered_capacity();
354        let slot = TopicSlot::new(id, self.inner.actor.clone(), capacity);
355        self.register_slot(Arc::clone(&slot), id)?;
356        Ok(TopicBenchmarkStream {
357            shared: Arc::clone(&self.inner.shared),
358            slot,
359            pending: VecDeque::new(),
360            terminated: false,
361        })
362    }
363}
364
365impl<T: Send + Sync + 'static> TopicShared<T> {
366    fn begin_publish(&self) -> StreamResult<PublishPermit<'_>> {
367        if self.lifecycle.load(Ordering::Acquire) != TOPIC_OPEN {
368            return Err(closed_error());
369        }
370        self.active_publishers.fetch_add(1, Ordering::AcqRel);
371        if self.lifecycle.load(Ordering::Acquire) == TOPIC_OPEN {
372            Ok(PublishPermit {
373                active_publishers: &self.active_publishers,
374            })
375        } else {
376            self.active_publishers.fetch_sub(1, Ordering::AcqRel);
377            Err(closed_error())
378        }
379    }
380
381    fn claim_sequence(&self) -> u64 {
382        self.next_sequence.fetch_add(1, Ordering::AcqRel) + 1
383    }
384
385    fn try_claim_sequence(&self) -> Option<u64> {
386        let delivered = self.delivered_sequence.load(Ordering::Acquire);
387        self.next_sequence
388            .compare_exchange(
389                delivered,
390                delivered + 1,
391                Ordering::AcqRel,
392                Ordering::Acquire,
393            )
394            .ok()
395            .map(|_| delivered + 1)
396    }
397
398    fn wait_publish_turn(&self, sequence: u64) {
399        let mut spins = 0_u32;
400        while self.delivered_sequence.load(Ordering::Acquire) + 1 != sequence {
401            spins = spins.wrapping_add(1);
402            if spins < 64 {
403                hint::spin_loop();
404            } else {
405                thread::yield_now();
406            }
407        }
408    }
409
    /// Mark `sequence` as delivered, which hands the publish turn to the
    /// publisher holding `sequence + 1`.
    fn finish_publish(&self, sequence: u64) {
        self.delivered_sequence.store(sequence, Ordering::Release);
    }
413
414    fn publish_to_snapshot(&self, table: &TopicSlotTable<T>, value: Arc<T>) {
415        match self.overflow {
416            TopicOverflow::Backpressure => {
417                for slot in &table.slots {
418                    slot.enqueue_backpressured(Arc::clone(&value));
419                }
420            }
421            TopicOverflow::Sliding => {
422                for slot in &table.slots {
423                    slot.enqueue_sliding(Arc::clone(&value));
424                }
425            }
426            TopicOverflow::Dropping => {
427                for slot in &table.slots {
428                    slot.enqueue_dropping(Arc::clone(&value));
429                }
430            }
431        }
432    }
433
434    async fn wait_for_capacity(&self, table: &TopicSlotTable<T>) {
435        loop {
436            if self.snapshot_has_capacity(table) {
437                return;
438            }
439
440            let notified = self.space_available.notified();
441            let mut notified = std::pin::pin!(notified);
442            notified.as_mut().enable();
443            self.space_waiters.fetch_add(1, Ordering::AcqRel);
444            if self.snapshot_has_capacity(table) {
445                self.space_waiters.fetch_sub(1, Ordering::AcqRel);
446                return;
447            }
448            notified.as_mut().await;
449            self.space_waiters.fetch_sub(1, Ordering::AcqRel);
450        }
451    }
452
453    fn snapshot_has_capacity(&self, table: &TopicSlotTable<T>) -> bool {
454        table.slots.iter().all(|slot| slot.has_capacity())
455    }
456
457    fn notify_space(&self) {
458        if self.space_waiters.load(Ordering::Acquire) != 0 {
459            self.space_available.notify_waiters();
460        }
461    }
462
463    fn subscriber_count(&self) -> usize {
464        let table = self.subscribers.load();
465        table.slots.iter().filter(|slot| slot.is_active()).count()
466    }
467
468    fn is_closed(&self) -> bool {
469        self.lifecycle.load(Ordering::Acquire) == TOPIC_CLOSED
470    }
471
472    fn wait_for_publishers_to_drain(&self) {
473        while self.active_publishers.load(Ordering::Acquire) != 0 {
474            thread::yield_now();
475        }
476    }
477
    /// Force the lifecycle to closed after the control actor stopped without a
    /// graceful close, then wake `closed()` waiters and any publishers waiting
    /// for capacity.
    fn mark_actor_terminated(&self) {
        // Publish the closed state before waking anyone so woken tasks observe it.
        self.lifecycle.store(TOPIC_CLOSED, Ordering::Release);
        self.closed_notified.notify_waiters();
        self.notify_space();
    }
483}
484
/// Guard representing one admitted publisher; dropping it decrements the
/// publisher count it was created for.
struct PublishPermit<'a> {
    /// Counter that was incremented when the permit was issued.
    active_publishers: &'a AtomicUsize,
}

impl Drop for PublishPermit<'_> {
    fn drop(&mut self) {
        let Self { active_publishers } = self;
        active_publishers.fetch_sub(1, Ordering::AcqRel);
    }
}
494
495impl<T: Send + Sync + 'static> TopicSlot<T> {
496    fn new(id: u64, actor: ActorRef<TopicMessage<T>>, capacity: usize) -> Arc<Self> {
497        Arc::new(Self {
498            id,
499            actor,
500            buffer: ArrayQueue::new(capacity),
501            active: AtomicBool::new(true),
502            parked: AtomicBool::new(false),
503            terminal_state: AtomicU8::new(SLOT_OPEN),
504            terminal: Mutex::new(None),
505            available_lock: Mutex::new(()),
506            available: Condvar::new(),
507            async_available: Notify::new(),
508        })
509    }
510
511    fn terminal_lock(&self) -> MutexGuard<'_, Option<TopicSlotTerminal>> {
512        self.terminal
513            .lock()
514            .unwrap_or_else(|poison| poison.into_inner())
515    }
516
    /// True until the slot is completed, failed or deactivated.
    fn is_active(&self) -> bool {
        self.active.load(Ordering::Acquire)
    }
520
521    fn has_capacity(&self) -> bool {
522        !self.is_active() || !self.buffer.is_full()
523    }
524
525    fn enqueue_backpressured(&self, value: Arc<T>) {
526        if !self.is_active() {
527            return;
528        }
529        let was_empty = self.buffer.is_empty();
530        if self.buffer.push(value).is_ok() && was_empty {
531            self.wake();
532        }
533    }
534
535    fn enqueue_sliding(&self, value: Arc<T>) {
536        if !self.is_active() {
537            return;
538        }
539        while self.buffer.is_full() {
540            if self.buffer.pop().is_none() {
541                break;
542            }
543        }
544        let was_empty = self.buffer.is_empty();
545        if self.buffer.push(value).is_ok() && was_empty {
546            self.wake();
547        }
548    }
549
550    fn enqueue_dropping(&self, value: Arc<T>) {
551        if !self.is_active() || self.buffer.is_full() {
552            return;
553        }
554        let was_empty = self.buffer.is_empty();
555        if self.buffer.push(value).is_ok() && was_empty {
556            self.wake();
557        }
558    }
559
    /// Take the oldest queued element, if any.
    fn pop(&self) -> Option<Arc<T>> {
        self.buffer.pop()
    }
563
    /// Announce that the subscriber is about to wait, so `wake` will notify it.
    fn park(&self) {
        self.parked.store(true, Ordering::Release);
    }
567
    /// Clear the parked flag once the subscriber stops waiting.
    fn unpark(&self) {
        self.parked.store(false, Ordering::Release);
    }
571
572    fn wake(&self) {
573        if self.parked.swap(false, Ordering::AcqRel) {
574            let _guard = self
575                .available_lock
576                .lock()
577                .unwrap_or_else(|poison| poison.into_inner());
578            self.available.notify_one();
579            self.async_available.notify_waiters();
580        }
581    }
582
583    fn complete(&self) {
584        if self
585            .terminal_state
586            .compare_exchange(
587                SLOT_OPEN,
588                SLOT_COMPLETE,
589                Ordering::AcqRel,
590                Ordering::Acquire,
591            )
592            .is_err()
593        {
594            return;
595        }
596        self.active.store(false, Ordering::Release);
597        *self.terminal_lock() = Some(TopicSlotTerminal::Complete);
598        self.wake();
599    }
600
601    fn fail(&self, error: StreamError) {
602        if self
603            .terminal_state
604            .compare_exchange(SLOT_OPEN, SLOT_ERROR, Ordering::AcqRel, Ordering::Acquire)
605            .is_err()
606        {
607            return;
608        }
609        self.active.store(false, Ordering::Release);
610        *self.terminal_lock() = Some(TopicSlotTerminal::Error(error));
611        self.wake();
612    }
613
614    fn terminal(&self) -> Option<TopicSlotTerminal> {
615        if self.terminal_state.load(Ordering::Acquire) == SLOT_OPEN {
616            return None;
617        }
618        self.terminal_lock().clone()
619    }
620
621    fn deactivate(&self) {
622        self.active.store(false, Ordering::Release);
623        while self.buffer.pop().is_some() {}
624        self.wake();
625    }
626
627    fn unsubscribe(&self) {
628        self.deactivate();
629        let _ = self
630            .actor
631            .send_message(TopicMessage::Unsubscribe { id: self.id });
632    }
633}
634
/// Stops the control actor when the last owner of the shared topic state is
/// dropped.
impl<T: Send + Sync + 'static> Drop for TopicInner<T> {
    fn drop(&mut self) {
        // No stop reason is supplied.
        self.actor.stop(None);
    }
}
640
/// Control-plane requests handled by the topic actor.
enum TopicMessage<T: Send + Sync + 'static> {
    /// Close the topic gracefully; acknowledged on `reply`.
    Close {
        reply: Ack,
    },
    /// Register `slot` under `id`; acknowledged on `reply`.
    Subscribe {
        id: u64,
        slot: Arc<TopicSlot<T>>,
        reply: Ack,
    },
    /// Remove the slot registered under `id`; not acknowledged.
    Unsubscribe {
        id: u64,
    },
}
654
// Explicit marker impl compiled only with the `cluster` feature.
// NOTE(review): presumably ractor has no blanket `Message` impl under that
// feature — confirm against the ractor documentation.
#[cfg(feature = "cluster")]
impl<T: Send + Sync + 'static> ractor::Message for TopicMessage<T> {}
657
/// Stateless actor type for a topic carrying elements of type `T`; all
/// mutable state lives in the actor's `State`.
struct TopicActor<T> {
    /// Ties the actor to `T` without storing or owning a `T`.
    _marker: PhantomData<fn() -> T>,
}

impl<T> Default for TopicActor<T> {
    fn default() -> Self {
        let _marker = PhantomData;
        Self { _marker }
    }
}
669
/// Mutable state owned by the control actor.
struct TopicActorState<T: Send + Sync + 'static> {
    /// Data-plane state the actor publishes table snapshots and lifecycle
    /// changes into.
    shared: Arc<TopicShared<T>>,
    /// Authoritative registry of subscriber slots, keyed by subscriber id.
    subscribers: HashMap<u64, Arc<TopicSlot<T>>>,
    /// Set once a close has been processed by this actor.
    closed: bool,
}
675
676impl<T: Send + Sync + 'static> Actor for TopicActor<T> {
677    type Msg = TopicMessage<T>;
678    type State = TopicActorState<T>;
679    type Arguments = TopicActorState<T>;
680
681    async fn pre_start(
682        &self,
683        _myself: ActorRef<Self::Msg>,
684        args: Self::Arguments,
685    ) -> Result<Self::State, ActorProcessingErr> {
686        Ok(args)
687    }
688
689    async fn handle(
690        &self,
691        _myself: ActorRef<Self::Msg>,
692        message: Self::Msg,
693        state: &mut Self::State,
694    ) -> Result<(), ActorProcessingErr> {
695        match message {
696            TopicMessage::Close { reply } => {
697                close_topic(state);
698                let _ = reply.send(Ok(()));
699            }
700            TopicMessage::Subscribe { id, slot, reply } => {
701                if state.closed || state.shared.lifecycle.load(Ordering::Acquire) == TOPIC_CLOSED {
702                    slot.complete();
703                } else {
704                    state.subscribers.insert(id, Arc::clone(&slot));
705                    publish_topic_slot_table(state);
706                }
707                let _ = reply.send(Ok(()));
708            }
709            TopicMessage::Unsubscribe { id } => {
710                state.subscribers.remove(&id);
711                publish_topic_slot_table(state);
712                state.shared.notify_space();
713            }
714        }
715        Ok(())
716    }
717
718    async fn post_stop(
719        &self,
720        _myself: ActorRef<Self::Msg>,
721        state: &mut Self::State,
722    ) -> Result<(), ActorProcessingErr> {
723        if !state.closed {
724            for slot in state.subscribers.values() {
725                slot.fail(StreamError::ActorTerminated);
726            }
727            state.subscribers.clear();
728            publish_topic_slot_table(state);
729            state.shared.mark_actor_terminated();
730        }
731        Ok(())
732    }
733}
734
735fn close_topic<T: Send + Sync + 'static>(state: &mut TopicActorState<T>) {
736    if state.closed {
737        return;
738    }
739    match state.shared.lifecycle.compare_exchange(
740        TOPIC_OPEN,
741        TOPIC_CLOSING,
742        Ordering::AcqRel,
743        Ordering::Acquire,
744    ) {
745        Ok(_) => {}
746        Err(TOPIC_CLOSED) => {
747            state.closed = true;
748            return;
749        }
750        Err(_) => {}
751    }
752    state.shared.wait_for_publishers_to_drain();
753    state
754        .shared
755        .lifecycle
756        .store(TOPIC_CLOSED, Ordering::Release);
757
758    for slot in state.subscribers.values() {
759        slot.complete();
760    }
761    state.subscribers.clear();
762    publish_topic_slot_table(state);
763    state.shared.notify_space();
764    state.shared.closed_notified.notify_waiters();
765    state.closed = true;
766}
767
768fn publish_topic_slot_table<T: Send + Sync + 'static>(state: &TopicActorState<T>) {
769    let slots = state.subscribers.values().cloned().collect::<Vec<_>>();
770    state
771        .shared
772        .subscribers
773        .store(Arc::new(TopicSlotTable { slots }));
774}
775
776fn closed_error() -> StreamError {
777    StreamError::Failed("topic is closed".into())
778}
779
/// Blocking subscriber stream produced by materializing [`Topic::subscribe`].
struct TopicStream<T: Clone + Send + Sync + 'static> {
    /// Topic data-plane state, used to signal freed capacity to publishers.
    shared: Arc<TopicShared<T>>,
    /// This subscriber's slot.
    slot: Arc<TopicSlot<T>>,
    /// Elements already popped from the slot in a batch but not yet yielded.
    pending: VecDeque<Arc<T>>,
    /// Set once a terminal item (end, error or cancellation) was produced.
    terminated: bool,
}
786
/// Async subscriber stream used by benchmarks; hidden from public docs.
#[doc(hidden)]
pub struct TopicBenchmarkStream<T: Clone + Send + Sync + 'static> {
    /// Topic data-plane state, used to signal freed capacity to publishers.
    shared: Arc<TopicShared<T>>,
    /// This subscriber's slot.
    slot: Arc<TopicSlot<T>>,
    /// Elements already popped from the slot in a batch but not yet yielded.
    pending: VecDeque<Arc<T>>,
    /// Set once a terminal signal was observed.
    terminated: bool,
}
794
795impl<T: Clone + Send + Sync + 'static> Iterator for TopicStream<T> {
796    type Item = StreamResult<T>;
797
798    fn next(&mut self) -> Option<Self::Item> {
799        if self.terminated {
800            return None;
801        }
802
803        loop {
804            if let Some(value) = self.pending.pop_front() {
805                return Some(Ok(value.as_ref().clone()));
806            }
807
808            if let Some(value) = self.drain_batch() {
809                return Some(Ok(value.as_ref().clone()));
810            }
811
812            if let Some(terminal) = self.slot.terminal() {
813                self.terminated = true;
814                return match terminal {
815                    TopicSlotTerminal::Complete => None,
816                    TopicSlotTerminal::Error(error) => Some(Err(error)),
817                };
818            }
819
820            if current_stream_cancelled()
821                .as_ref()
822                .is_some_and(|cancelled| cancelled.load(Ordering::SeqCst))
823            {
824                self.terminated = true;
825                return Some(Err(StreamError::Cancelled));
826            }
827
828            self.wait_for_wake();
829        }
830    }
831}
832
833impl<T: Clone + Send + Sync + 'static> TopicStream<T> {
834    fn drain_batch(&mut self) -> Option<Arc<T>> {
835        let first = self.slot.pop()?;
836        let mut drained = 1_usize;
837        while drained < TOPIC_DRAIN_BATCH {
838            let Some(value) = self.slot.pop() else {
839                break;
840            };
841            self.pending.push_back(value);
842            drained += 1;
843        }
844        self.shared.notify_space();
845        Some(first)
846    }
847
848    fn wait_for_wake(&self) {
849        let guard = self
850            .slot
851            .available_lock
852            .lock()
853            .unwrap_or_else(|poison| poison.into_inner());
854        self.slot.park();
855        fence(Ordering::SeqCst);
856        if !self.slot.buffer.is_empty() || self.slot.terminal().is_some() {
857            self.slot.unpark();
858            return;
859        }
860        let _guard = self
861            .slot
862            .available
863            .wait_timeout(guard, SLOT_WAIT_BACKSTOP)
864            .unwrap_or_else(|poison| poison.into_inner())
865            .0;
866        self.slot.unpark();
867    }
868}
869
/// Dropping the stream unsubscribes it from the topic.
impl<T: Clone + Send + Sync + 'static> Drop for TopicStream<T> {
    fn drop(&mut self) {
        self.slot.unsubscribe();
        // The deactivated slot no longer limits publishers; let waiting ones re-check.
        self.shared.notify_space();
    }
}
876
877impl<T: Clone + Send + Sync + 'static> TopicBenchmarkStream<T> {
878    #[doc(hidden)]
879    pub async fn next(&mut self) -> Option<StreamResult<T>> {
880        if self.terminated {
881            return None;
882        }
883
884        loop {
885            if let Some(value) = self.pending.pop_front() {
886                return Some(Ok(value.as_ref().clone()));
887            }
888
889            if let Some(value) = self.drain_batch() {
890                return Some(Ok(value.as_ref().clone()));
891            }
892
893            if let Some(terminal) = self.slot.terminal() {
894                self.terminated = true;
895                return match terminal {
896                    TopicSlotTerminal::Complete => None,
897                    TopicSlotTerminal::Error(error) => Some(Err(error)),
898                };
899            }
900
901            self.wait_for_wake().await;
902        }
903    }
904
905    #[doc(hidden)]
906    pub async fn count_items(&mut self, target: u64) -> StreamResult<u64> {
907        let mut count = 0_u64;
908        while count < target {
909            if self.terminated {
910                return Err(StreamError::Failed(
911                    "topic stream ended before requested count".into(),
912                ));
913            }
914
915            if !self.pending.is_empty() {
916                let drained = self.pending.len().min((target - count) as usize);
917                self.pending.drain(..drained);
918                count += drained as u64;
919                continue;
920            }
921
922            if let Some(drained) = self.drain_available_count((target - count) as usize) {
923                count += drained as u64;
924                continue;
925            }
926
927            if let Some(terminal) = self.slot.terminal() {
928                self.terminated = true;
929                return match terminal {
930                    TopicSlotTerminal::Complete => Err(StreamError::Failed(
931                        "topic stream completed before requested count".into(),
932                    )),
933                    TopicSlotTerminal::Error(error) => Err(error),
934                };
935            }
936
937            self.wait_for_wake().await;
938        }
939        Ok(count)
940    }
941
942    fn drain_batch(&mut self) -> Option<Arc<T>> {
943        let first = self.slot.pop()?;
944        let mut drained = 1_usize;
945        while drained < TOPIC_DRAIN_BATCH {
946            let Some(value) = self.slot.pop() else {
947                break;
948            };
949            self.pending.push_back(value);
950            drained += 1;
951        }
952        self.shared.notify_space();
953        Some(first)
954    }
955
956    fn drain_available_count(&mut self, limit: usize) -> Option<usize> {
957        let mut drained = 0_usize;
958        let limit = limit.min(TOPIC_DRAIN_BATCH);
959        while drained < limit {
960            let Some(_value) = self.slot.pop() else {
961                break;
962            };
963            drained += 1;
964        }
965        if drained == 0 {
966            return None;
967        }
968        self.shared.notify_space();
969        Some(drained)
970    }
971
972    async fn wait_for_wake(&self) {
973        let notified = self.slot.async_available.notified();
974        tokio::pin!(notified);
975        notified.as_mut().enable();
976
977        self.slot.park();
978        fence(Ordering::SeqCst);
979        if !self.slot.buffer.is_empty() || self.slot.terminal().is_some() {
980            self.slot.unpark();
981            return;
982        }
983
984        notified.as_mut().await;
985        self.slot.unpark();
986    }
987}
988
989impl<T: Clone + Send + Sync + 'static> Drop for TopicBenchmarkStream<T> {
990    fn drop(&mut self) {
991        self.slot.unsubscribe();
992        self.shared.notify_space();
993    }
994}
995
996impl<T: Send + Sync + 'static> fmt::Debug for Topic<T> {
997    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
998        f.debug_struct("Topic")
999            .field("closed", &self.is_closed())
1000            .field("subscribers", &self.subscriber_count())
1001            .finish_non_exhaustive()
1002    }
1003}
1004
1005impl<T> fmt::Display for TopicPublishError<T> {
1006    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1007        match self {
1008            TopicPublishError::Closed(_) => f.write_str("topic is closed"),
1009        }
1010    }
1011}
1012
1013impl<T> fmt::Display for TopicTryPublishError<T> {
1014    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1015        match self {
1016            TopicTryPublishError::Closed(_) => f.write_str("topic is closed"),
1017            TopicTryPublishError::Full(_) => f.write_str("topic subscriber buffer is full"),
1018            TopicTryPublishError::Busy(_) => f.write_str("topic publish turn is busy"),
1019        }
1020    }
1021}
1022
#[cfg(test)]
mod tests {
    use super::*;
    use crate::{Sink, stream::Materializer};
    use futures::executor::block_on;
    use std::{
        collections::HashSet,
        sync::{
            Arc,
            atomic::{AtomicBool, AtomicUsize},
        },
        thread,
        time::{Duration, Instant},
    };

    /// Blocks on a stream completion and unwraps its result.
    fn wait<T>(completion: crate::StreamCompletion<T>) -> T {
        let outcome = completion.wait();
        outcome.unwrap()
    }

    /// Polls `condition` roughly once per millisecond until it holds or
    /// `timeout` elapses; the condition gets one final check at the deadline.
    fn wait_until<F>(timeout: Duration, mut condition: F) -> bool
    where
        F: FnMut() -> bool,
    {
        let deadline = Instant::now() + timeout;
        loop {
            if Instant::now() >= deadline {
                break condition();
            }
            if condition() {
                break true;
            }
            thread::yield_now();
            thread::sleep(Duration::from_millis(1));
        }
    }

    /// Subscribes to `topic` and materializes the subscription directly into
    /// a pull-based stream, discarding the materialized value.
    fn materialize_topic<T: Clone + Send + Sync + 'static>(topic: &Topic<T>) -> BoxStream<T> {
        let materializer = Materializer::new();
        topic
            .subscribe()
            .factory
            .create(&materializer)
            .unwrap()
            .0
    }

    #[test]
    fn every_subscriber_sees_every_post_subscription_element() {
        const SUBSCRIBERS: usize = 4;
        const PUBLISHERS: usize = 4;
        const PER_PUBLISHER: usize = 128;

        let topic = Topic::new(1_024, TopicOverflow::Backpressure).unwrap();

        // Register every subscriber before anything is published.
        let mut completions = Vec::with_capacity(SUBSCRIBERS);
        for _ in 0..SUBSCRIBERS {
            completions.push(topic.subscribe().run_with(Sink::collect()).unwrap());
        }
        assert!(wait_until(Duration::from_secs(1), || topic
            .subscriber_count()
            == SUBSCRIBERS));

        // Each publisher tags its values with its index in the high 32 bits.
        let publishers = (0..PUBLISHERS)
            .map(|publisher| {
                let topic = topic.clone();
                thread::spawn(move || {
                    let tag = (publisher as u64) << 32;
                    for seq in 0..PER_PUBLISHER {
                        block_on(topic.publish(tag | seq as u64)).unwrap();
                    }
                })
            })
            .collect::<Vec<_>>();
        for publisher in publishers {
            publisher.join().unwrap();
        }
        topic.close().unwrap();

        let observed: Vec<_> = completions.into_iter().map(wait).collect();
        let (first, rest) = observed.split_first().unwrap();
        assert_eq!(first.len(), PUBLISHERS * PER_PUBLISHER);
        let unique = first.iter().copied().collect::<HashSet<_>>();
        assert_eq!(unique.len(), PUBLISHERS * PER_PUBLISHER);
        for values in rest {
            assert_eq!(values, first, "subscribers disagreed on global order");
        }
    }

    #[test]
    fn late_subscriber_sees_nothing_prior_and_zero_subscriber_publish_drops() {
        let topic = Topic::new(8, TopicOverflow::Backpressure).unwrap();

        // With no subscribers both publish flavours succeed and drop.
        assert_eq!(topic.subscriber_count(), 0);
        block_on(topic.publish(1_u64)).unwrap();
        topic.try_publish(2).unwrap();

        let collected = topic.subscribe().run_with(Sink::collect()).unwrap();
        assert!(wait_until(Duration::from_secs(1), || topic
            .subscriber_count()
            == 1));
        block_on(topic.publish(3)).unwrap();
        topic.close().unwrap();

        assert_eq!(wait(collected), vec![3]);
    }

    #[test]
    fn sliding_overflow_drops_oldest_for_slow_subscriber() {
        let topic = Topic::new(2, TopicOverflow::Sliding).unwrap();
        let mut stream = materialize_topic(&topic);
        for value in 1..=4_u64 {
            topic.try_publish(value).unwrap();
        }
        topic.close().unwrap();

        // Capacity two: only the two newest values survive.
        for expected in [3, 4] {
            assert_eq!(stream.next(), Some(Ok(expected)));
        }
        assert_eq!(stream.next(), None);
    }

    #[test]
    fn dropping_overflow_drops_newest_for_slow_subscriber() {
        let topic = Topic::new(2, TopicOverflow::Dropping).unwrap();
        let mut stream = materialize_topic(&topic);
        for value in 1..=4_u64 {
            topic.try_publish(value).unwrap();
        }
        topic.close().unwrap();

        // Capacity two: only the two oldest values survive.
        for expected in [1, 2] {
            assert_eq!(stream.next(), Some(Ok(expected)));
        }
        assert_eq!(stream.next(), None);
    }

    #[test]
    fn backpressure_stalls_publisher_until_slow_subscriber_drains() {
        let topic = Topic::new(1, TopicOverflow::Backpressure).unwrap();
        let mut stream = materialize_topic(&topic);
        block_on(topic.publish(1_u64)).unwrap();

        // The single-slot buffer is full, so this publisher must stall.
        let completed = Arc::new(AtomicBool::new(false));
        let publisher = {
            let topic = topic.clone();
            let done = Arc::clone(&completed);
            thread::spawn(move || {
                block_on(topic.publish(2)).unwrap();
                done.store(true, Ordering::SeqCst);
            })
        };

        assert!(!wait_until(Duration::from_millis(25), || completed
            .load(Ordering::SeqCst)));
        assert_eq!(stream.next(), Some(Ok(1)));
        assert!(wait_until(Duration::from_secs(1), || completed.load(Ordering::SeqCst)));
        publisher.join().unwrap();
        topic.close().unwrap();

        assert_eq!(stream.next(), Some(Ok(2)));
        assert_eq!(stream.next(), None);
    }

    #[test]
    fn dropping_source_unsubscribes_and_frees_backpressured_slot() {
        let topic = Topic::new(1, TopicOverflow::Backpressure).unwrap();
        let stream = materialize_topic(&topic);
        block_on(topic.publish(1_u64)).unwrap();
        assert_eq!(topic.subscriber_count(), 1);

        drop(stream);
        assert!(wait_until(Duration::from_secs(1), || topic
            .subscriber_count()
            == 0));

        // With the full slot gone this publish must not block.
        block_on(topic.publish(2)).unwrap();
        topic.close().unwrap();
    }

    #[test]
    fn close_drains_then_completes_and_rejects_late_publishes() {
        let topic = Topic::new(4, TopicOverflow::Backpressure).unwrap();
        let mut stream = materialize_topic(&topic);
        for value in [1_u64, 2] {
            block_on(topic.publish(value)).unwrap();
        }
        topic.close().unwrap();

        // Elements accepted before close are still delivered.
        for expected in [1, 2] {
            assert_eq!(stream.next(), Some(Ok(expected)));
        }
        assert_eq!(stream.next(), None);

        assert_eq!(topic.try_publish(3), Err(TopicTryPublishError::Closed(3)));
        assert_eq!(
            block_on(topic.publish(4)),
            Err(TopicPublishError::Closed(4))
        );
    }

    #[test]
    fn closed_future_wakes_on_close() {
        let topic = Topic::<u64>::new(1, TopicOverflow::Backpressure).unwrap();
        let waiting = Arc::new(AtomicBool::new(false));
        let waiter = {
            let topic = topic.clone();
            let started = Arc::clone(&waiting);
            thread::spawn(move || {
                started.store(true, Ordering::SeqCst);
                block_on(topic.closed());
            })
        };

        assert!(wait_until(Duration::from_secs(1), || waiting.load(Ordering::SeqCst)));
        topic.close().unwrap();
        waiter.join().unwrap();
        assert!(topic.is_closed());
    }

    #[test]
    fn publisher_subscriber_churn_hammer() {
        const ROUNDS: usize = 200;
        const PUBLISHERS: usize = 4;

        let topic = Topic::new(8, TopicOverflow::Sliding).unwrap();
        let published = Arc::new(AtomicUsize::new(0));

        let publishers = (0..PUBLISHERS)
            .map(|publisher| {
                let topic = topic.clone();
                let published = Arc::clone(&published);
                thread::spawn(move || {
                    let tag = (publisher as u64) << 32;
                    for seq in 0..ROUNDS {
                        block_on(topic.publish(tag | seq as u64)).unwrap();
                        published.fetch_add(1, Ordering::Relaxed);
                    }
                })
            })
            .collect::<Vec<_>>();

        // Subscribe and immediately drop, concurrently with the publishers.
        let churn = {
            let topic = topic.clone();
            thread::spawn(move || {
                for _ in 0..ROUNDS {
                    drop(materialize_topic(&topic));
                }
            })
        };

        for publisher in publishers {
            publisher.join().unwrap();
        }
        churn.join().unwrap();
        topic.close().unwrap();

        assert_eq!(published.load(Ordering::Relaxed), PUBLISHERS * ROUNDS);
    }

    #[test]
    fn post_close_subscribe_completes_empty() {
        let topic = Topic::<u64>::new(8, TopicOverflow::Backpressure).unwrap();
        topic.close().unwrap();

        let collected = topic.subscribe().run_collect().unwrap();
        assert_eq!(collected, Vec::<u64>::new());
    }
}