Skip to main content

datum/concurrent/
subscription.rs

1use std::{
2    collections::{HashMap, VecDeque},
3    hint,
4    marker::PhantomData,
5    sync::{
6        Arc, Condvar, Mutex, MutexGuard,
7        atomic::{AtomicBool, AtomicU8, AtomicU64, AtomicUsize, Ordering, fence},
8        mpsc,
9    },
10    thread,
11    time::Duration,
12};
13
14use arc_swap::{ArcSwap, ArcSwapOption};
15use ractor::{Actor, ActorProcessingErr, ActorRef};
16use tokio::sync::Notify;
17
18use crate::{
19    StreamError, StreamResult,
20    actor::block_on_ractor_runtime,
21    stream::{BoxStream, NotUsed, Source, current_stream_cancelled},
22};
23
24const SLOT_WAIT_BACKSTOP: Duration = Duration::from_millis(10);
25const SUBSCRIPTION_DRAIN_BATCH: usize = 256;
26const STATE_OPEN: u8 = 0;
27const STATE_CLOSING: u8 = 1;
28const STATE_CLOSED: u8 = 2;
29const UNSEEDED_CURSOR: u64 = u64::MAX;
30const NO_DROP_FROM: u64 = u64::MAX;
31const NO_TERMINAL_FROM: u64 = u64::MAX;
32
33type Ack = mpsc::Sender<StreamResult<()>>;
34
35/// Overflow policy for a [`Subscription`] subscriber buffer.
36#[derive(Debug, Clone, Copy, PartialEq, Eq)]
37pub enum SubscriptionOverflow {
38    /// Preserve every change by parking the producer on the caller thread until every active
39    /// subscriber cursor has ring capacity. No actor handler waits for data-plane backpressure.
40    Backpressure,
41    /// Apply the state transition, but mark the new feed item as dropped for subscribers whose
42    /// logical cursor is full. This is not lossless for slow subscribers and is intended only when
43    /// dropping is an explicit part of the chosen contract.
44    DropNew,
45    /// Apply the state transition, fail subscribers whose logical cursor is full after they drain
46    /// already-accepted items, and return an error to the acknowledged producer.
47    Fail,
48}
49
50/// Latest-value state cell with a bounded every-change feed.
51///
52/// `Subscription` uses a two-plane implementation. The Ractor actor owns only subscribe,
53/// unsubscribe, close, terminal delivery, and the registry that is periodically published as an
54/// `ArcSwap` slot-table snapshot. State transitions run on the caller thread.
55///
56/// The lossless data plane is a sequence-claimed ring: each writer claims a global sequence,
57/// waits for its publish turn, writes the ring slot, stores the `ArcSwap` mirror, publishes an
58/// internal `(sequence, value)` snapshot, wakes current subscribers, and then returns. Under
59/// [`SubscriptionOverflow::Backpressure`], producers wait outside the actor while any active
60/// subscriber cursor would lag past the logical capacity. Subscribers consume by cursor in total
61/// sequence order, so they see no gaps or duplicates.
62///
63/// Subscribe has no actor-serialized-set gap. The control actor first publishes the new
64/// slot-table snapshot, then seeds the slot from the current published `(sequence, value)` and sets
65/// the cursor to `sequence + 1`; any writer that races after registration is either represented by
66/// the seed or consumed from the ring.
67///
68/// `update` uses `ArcSwap::compare_and_swap` under the write publication turn. The update function
69/// may be re-invoked if a concurrent writer wins the CAS race, matching the usual Ref/atomic
70/// update contract.
71pub struct Subscription<T: Send + Sync + 'static> {
72    inner: Arc<SubscriptionInner<T>>,
73}
74
75struct SubscriptionInner<T: Send + Sync + 'static> {
76    actor: ActorRef<SubscriptionMessage<T>>,
77    shared: Arc<SubscriptionShared<T>>,
78    next_subscriber_id: Arc<AtomicU64>,
79}
80
81struct SubscriptionShared<T: Send + Sync + 'static> {
82    mirror: Arc<ArcSwap<T>>,
83    published: Arc<ArcSwap<PublishedValue<T>>>,
84    subscribers: Arc<ArcSwap<SubscriptionSlotTable<T>>>,
85    ring: SubscriptionRing<T>,
86    overflow: SubscriptionOverflow,
87    lifecycle: AtomicU8,
88    active_writers: AtomicUsize,
89    next_sequence: AtomicU64,
90    published_sequence: AtomicU64,
91    parked_slots: Arc<AtomicUsize>,
92}
93
94struct PublishedValue<T: Send + Sync + 'static> {
95    sequence: u64,
96    value: Arc<T>,
97}
98
99struct SubscriptionSlotTable<T: Send + Sync + 'static> {
100    slots: Vec<Arc<SubscriptionSlot<T>>>,
101}
102
103struct SubscriptionRing<T: Send + Sync + 'static> {
104    logical_capacity: u64,
105    physical_capacity: usize,
106    slots: Vec<SubscriptionRingSlot<T>>,
107    space_lock: Mutex<()>,
108    space_available: Condvar,
109    space_waiters: AtomicUsize,
110}
111
112struct SubscriptionRingSlot<T: Send + Sync + 'static> {
113    sequence: AtomicU64,
114    value: ArcSwapOption<T>,
115}
116
117impl<T: Send + Sync + 'static> Clone for Subscription<T> {
118    fn clone(&self) -> Self {
119        Self {
120            inner: Arc::clone(&self.inner),
121        }
122    }
123}
124
125impl<T: Send + Sync + 'static> Subscription<T> {
126    /// Create a subscription initialized to `initial`.
127    ///
128    /// Panics if `capacity == 0`.
129    pub fn new(initial: T, capacity: usize, overflow: SubscriptionOverflow) -> StreamResult<Self> {
130        assert!(
131            capacity > 0,
132            "subscription capacity must be greater than zero"
133        );
134        let value = Arc::new(initial);
135        let shared = Arc::new(SubscriptionShared {
136            mirror: Arc::new(ArcSwap::from(Arc::clone(&value))),
137            published: Arc::new(ArcSwap::from_pointee(PublishedValue {
138                sequence: 0,
139                value: Arc::clone(&value),
140            })),
141            subscribers: Arc::new(ArcSwap::from_pointee(SubscriptionSlotTable {
142                slots: Vec::new(),
143            })),
144            ring: SubscriptionRing::new(capacity),
145            overflow,
146            lifecycle: AtomicU8::new(STATE_OPEN),
147            active_writers: AtomicUsize::new(0),
148            next_sequence: AtomicU64::new(0),
149            published_sequence: AtomicU64::new(0),
150            parked_slots: Arc::new(AtomicUsize::new(0)),
151        });
152        let state = SubscriptionActorState {
153            shared: Arc::clone(&shared),
154            subscribers: HashMap::new(),
155            closed: false,
156        };
157        let (actor, _handle) =
158            block_on_ractor_runtime(Actor::spawn(None, SubscriptionActor::<T>::default(), state))?
159                .map_err(|error| {
160                    StreamError::Failed(format!("subscription actor failed to spawn: {error}"))
161                })?;
162        Ok(Self {
163            inner: Arc::new(SubscriptionInner {
164                actor,
165                shared,
166                next_subscriber_id: Arc::new(AtomicU64::new(1)),
167            }),
168        })
169    }
170
171    /// Return the current immutable snapshot without sending an actor message.
172    #[must_use]
173    pub fn get(&self) -> Arc<T> {
174        self.inner.shared.mirror.load_full()
175    }
176
177    /// Return a cloned value using `ArcSwap::load()`'s guarded read path.
178    ///
179    /// This avoids cloning the `Arc` itself on the hot read path. For scalar `Copy`/cheap-`Clone`
180    /// values, this is the fair equivalent of JVM refs returning the value directly; use
181    /// [`Subscription::get`] when the caller wants an owned snapshot shared by `Arc`.
182    #[must_use]
183    pub fn get_cloned(&self) -> T
184    where
185        T: Clone,
186    {
187        self.inner.shared.mirror.load().as_ref().clone()
188    }
189
190    /// Set the state and wait for the transition to be accepted according to the overflow policy.
191    pub fn set(&self, value: T) -> StreamResult<()> {
192        self.publish_set(Arc::new(value))
193    }
194
195    /// Set the state on the caller thread.
196    ///
197    /// With [`SubscriptionOverflow::Backpressure`], this may still park the caller until subscriber
198    /// cursors make ring capacity available. It does not send an actor message.
199    pub fn set_eventually(&self, value: T) -> StreamResult<()> {
200        self.publish_set(Arc::new(value))
201    }
202
203    /// Update the state atomically and wait for the transition to be accepted.
204    ///
205    /// The update function may be called more than once if a concurrent writer wins the CAS race.
206    pub fn update<F>(&self, update: F) -> StreamResult<()>
207    where
208        F: FnMut(&T) -> T + Send + 'static,
209    {
210        self.publish_update(update)
211    }
212
213    /// Update the state atomically on the caller thread.
214    ///
215    /// The update function may be called more than once if a concurrent writer wins the CAS race.
216    pub fn update_eventually<F>(&self, update: F) -> StreamResult<()>
217    where
218        F: FnMut(&T) -> T + Send + 'static,
219    {
220        self.publish_update(update)
221    }
222
223    /// Close the subscription, re-emitting the current final snapshot to current subscribers.
224    pub fn close(&self) -> StreamResult<()> {
225        self.send_close(None)
226    }
227
228    /// Set a final value, then close the subscription in one control-plane turn.
229    pub fn close_with(&self, final_value: T) -> StreamResult<()> {
230        self.send_close(Some(final_value))
231    }
232
233    fn publish_set(&self, value: Arc<T>) -> StreamResult<()> {
234        let _permit = self.inner.shared.begin_write()?;
235        let sequence = self.inner.shared.claim_sequence();
236        self.inner.shared.wait_publish_turn(sequence);
237        self.inner.shared.wait_for_ring_capacity(sequence);
238        let overflow = self.inner.shared.apply_overflow_policy(sequence);
239        self.inner.shared.finish_publish(sequence, value);
240        overflow
241    }
242
243    fn publish_update<F>(&self, mut update: F) -> StreamResult<()>
244    where
245        F: FnMut(&T) -> T + Send + 'static,
246    {
247        let _permit = self.inner.shared.begin_write()?;
248        let sequence = self.inner.shared.claim_sequence();
249        self.inner.shared.wait_publish_turn(sequence);
250        self.inner.shared.wait_for_ring_capacity(sequence);
251        let value = loop {
252            let current = self.inner.shared.mirror.load();
253            let next = Arc::new(update(current.as_ref()));
254            let previous = self
255                .inner
256                .shared
257                .mirror
258                .compare_and_swap(&*current, Arc::clone(&next));
259            if std::ptr::eq(current.as_ref(), previous.as_ref()) {
260                break next;
261            }
262        };
263        let overflow = self.inner.shared.apply_overflow_policy(sequence);
264        self.inner.shared.ring.store(sequence, Arc::clone(&value));
265        self.inner
266            .shared
267            .finish_publish_after_mirror(sequence, value);
268        overflow
269    }
270
271    fn send_close(&self, final_value: Option<T>) -> StreamResult<()> {
272        let (reply, receiver) = mpsc::channel();
273        self.inner
274            .actor
275            .send_message(SubscriptionMessage::Close { final_value, reply })
276            .map_err(|error| StreamError::ActorAskSendFailed {
277                reason: error.to_string(),
278            })?;
279        receiver.recv().unwrap_or(Err(StreamError::ActorTerminated))
280    }
281
282    fn register_slot(&self, slot: Arc<SubscriptionSlot<T>>, id: u64) -> StreamResult<()> {
283        let (reply, receiver) = mpsc::channel();
284        self.inner
285            .actor
286            .send_message(SubscriptionMessage::Subscribe { id, slot, reply })
287            .map_err(|error| StreamError::ActorAskSendFailed {
288                reason: error.to_string(),
289            })?;
290        receiver.recv().unwrap_or(Err(StreamError::ActorTerminated))
291    }
292}
293
294impl<T: Clone + Send + Sync + 'static> Subscription<T> {
295    /// A bounded source of the current value followed by every accepted change.
296    ///
297    /// Under [`SubscriptionOverflow::Backpressure`], every subscriber observes every change. Under
298    /// `DropNew`, slow subscribers may miss changes. Under `Fail`, a full subscriber fails with
299    /// `StreamError::Failed`.
300    #[must_use]
301    pub fn changes(&self) -> Source<T> {
302        let actor = self.inner.actor.clone();
303        let subscription = self.clone();
304        let shared = Arc::clone(&self.inner.shared);
305        let next_subscriber_id = Arc::clone(&self.inner.next_subscriber_id);
306        Source::from_materialized_factory(move |_materializer| {
307            let id = next_subscriber_id.fetch_add(1, Ordering::Relaxed);
308            let slot = SubscriptionSlot::new(id, actor.clone(), Arc::clone(&shared.parked_slots));
309            subscription.register_slot(Arc::clone(&slot), id)?;
310            let stream: BoxStream<T> = Box::new(SubscriptionChangesStream {
311                shared: Arc::clone(&shared),
312                slot,
313                pending: VecDeque::new(),
314                terminated: false,
315            });
316            Ok((stream, NotUsed))
317        })
318    }
319
320    #[doc(hidden)]
321    pub fn __benchmark_changes(&self) -> StreamResult<SubscriptionBenchmarkStream<T>> {
322        let id = self
323            .inner
324            .next_subscriber_id
325            .fetch_add(1, Ordering::Relaxed);
326        let slot = SubscriptionSlot::new(
327            id,
328            self.inner.actor.clone(),
329            Arc::clone(&self.inner.shared.parked_slots),
330        );
331        self.register_slot(Arc::clone(&slot), id)?;
332        Ok(SubscriptionBenchmarkStream {
333            shared: Arc::clone(&self.inner.shared),
334            slot,
335            pending: VecDeque::new(),
336            terminated: false,
337        })
338    }
339}
340
341impl<T: Send + Sync + 'static> SubscriptionShared<T> {
342    fn begin_write(&self) -> StreamResult<WritePermit<'_>> {
343        if self.lifecycle.load(Ordering::Acquire) != STATE_OPEN {
344            return Err(closed_error());
345        }
346        self.active_writers.fetch_add(1, Ordering::AcqRel);
347        if self.lifecycle.load(Ordering::Acquire) == STATE_OPEN {
348            Ok(WritePermit {
349                active_writers: &self.active_writers,
350            })
351        } else {
352            self.active_writers.fetch_sub(1, Ordering::AcqRel);
353            Err(closed_error())
354        }
355    }
356
357    fn claim_sequence(&self) -> u64 {
358        self.next_sequence.fetch_add(1, Ordering::AcqRel) + 1
359    }
360
361    fn wait_publish_turn(&self, sequence: u64) {
362        let mut spins = 0_u32;
363        while self.published_sequence.load(Ordering::Acquire) + 1 != sequence {
364            spins = spins.wrapping_add(1);
365            if spins < 64 {
366                hint::spin_loop();
367            } else {
368                thread::yield_now();
369            }
370        }
371    }
372
373    fn wait_for_ring_capacity(&self, sequence: u64) {
374        if self.overflow != SubscriptionOverflow::Backpressure {
375            return;
376        }
377        let mut guard = self
378            .ring
379            .space_lock
380            .lock()
381            .unwrap_or_else(|poison| poison.into_inner());
382        while self.sequence_would_overflow(sequence) {
383            self.ring.space_waiters.fetch_add(1, Ordering::AcqRel);
384            if !self.sequence_would_overflow(sequence) {
385                self.ring.space_waiters.fetch_sub(1, Ordering::AcqRel);
386                break;
387            }
388            guard = self
389                .ring
390                .space_available
391                .wait_timeout(guard, SLOT_WAIT_BACKSTOP)
392                .unwrap_or_else(|poison| poison.into_inner())
393                .0;
394            self.ring.space_waiters.fetch_sub(1, Ordering::AcqRel);
395        }
396    }
397
398    fn sequence_would_overflow(&self, sequence: u64) -> bool {
399        let Some(cursor) = self.min_active_cursor() else {
400            return false;
401        };
402        sequence >= cursor.saturating_add(self.ring.logical_capacity)
403    }
404
405    fn min_active_cursor(&self) -> Option<u64> {
406        let table = self.subscribers.load();
407        table
408            .slots
409            .iter()
410            .filter_map(|slot| slot.backpressure_cursor())
411            .min()
412    }
413
414    fn apply_overflow_policy(&self, sequence: u64) -> StreamResult<()> {
415        match self.overflow {
416            SubscriptionOverflow::Backpressure => Ok(()),
417            SubscriptionOverflow::DropNew => {
418                let table = self.subscribers.load();
419                for slot in &table.slots {
420                    if slot.is_full_for(sequence, self.ring.logical_capacity) {
421                        slot.drop_new(sequence, self.ring.logical_capacity);
422                    }
423                }
424                Ok(())
425            }
426            SubscriptionOverflow::Fail => {
427                let table = self.subscribers.load();
428                let mut overflowed = false;
429                let error = overflow_error(self.ring.logical_capacity);
430                for slot in &table.slots {
431                    if slot.is_full_for(sequence, self.ring.logical_capacity) {
432                        overflowed = true;
433                        slot.fail_after(sequence, error.clone());
434                    }
435                }
436                if overflowed { Err(error) } else { Ok(()) }
437            }
438        }
439    }
440
441    fn finish_publish(&self, sequence: u64, value: Arc<T>) {
442        self.ring.store(sequence, Arc::clone(&value));
443        self.mirror.store(Arc::clone(&value));
444        self.finish_publish_after_mirror(sequence, value);
445    }
446
447    fn finish_publish_after_mirror(&self, sequence: u64, value: Arc<T>) {
448        self.published.store(Arc::new(PublishedValue {
449            sequence,
450            value: Arc::clone(&value),
451        }));
452        self.published_sequence.store(sequence, Ordering::Release);
453        if self.parked_slots.load(Ordering::Acquire) != 0 {
454            let table = self.subscribers.load();
455            for slot in &table.slots {
456                slot.wake_for_sequence(sequence);
457            }
458        }
459    }
460
461    fn wait_for_writers_to_drain(&self) {
462        while self.active_writers.load(Ordering::Acquire) != 0 {
463            thread::yield_now();
464        }
465    }
466}
467
468impl<T: Send + Sync + 'static> SubscriptionRing<T> {
469    fn new(logical_capacity: usize) -> Self {
470        let physical_capacity = logical_capacity.max(1_024).next_power_of_two();
471        let mut slots = Vec::with_capacity(physical_capacity);
472        for _ in 0..physical_capacity {
473            slots.push(SubscriptionRingSlot {
474                sequence: AtomicU64::new(0),
475                value: ArcSwapOption::empty(),
476            });
477        }
478        Self {
479            logical_capacity: logical_capacity as u64,
480            physical_capacity,
481            slots,
482            space_lock: Mutex::new(()),
483            space_available: Condvar::new(),
484            space_waiters: AtomicUsize::new(0),
485        }
486    }
487
488    fn store(&self, sequence: u64, value: Arc<T>) {
489        let slot = &self.slots[self.index(sequence)];
490        slot.value.store(Some(value));
491        slot.sequence.store(sequence, Ordering::Release);
492    }
493
494    fn load(&self, sequence: u64) -> Option<Arc<T>> {
495        let slot = &self.slots[self.index(sequence)];
496        if slot.sequence.load(Ordering::Acquire) == sequence {
497            slot.value.load_full()
498        } else {
499            None
500        }
501    }
502
503    fn has(&self, sequence: u64) -> bool {
504        let slot = &self.slots[self.index(sequence)];
505        slot.sequence.load(Ordering::Acquire) == sequence
506    }
507
508    fn oldest_available(&self, published_sequence: u64) -> u64 {
509        published_sequence
510            .saturating_sub(self.physical_capacity as u64)
511            .saturating_add(1)
512            .max(1)
513    }
514
515    fn notify_space(&self) {
516        if self.space_waiters.load(Ordering::Acquire) == 0 {
517            return;
518        }
519        let _guard = self
520            .space_lock
521            .lock()
522            .unwrap_or_else(|poison| poison.into_inner());
523        self.space_available.notify_all();
524    }
525
526    fn index(&self, sequence: u64) -> usize {
527        sequence as usize & (self.physical_capacity - 1)
528    }
529}
530
531struct WritePermit<'a> {
532    active_writers: &'a AtomicUsize,
533}
534
535impl Drop for WritePermit<'_> {
536    fn drop(&mut self) {
537        self.active_writers.fetch_sub(1, Ordering::AcqRel);
538    }
539}
540
541impl<T: Send + Sync + 'static> Drop for SubscriptionInner<T> {
542    fn drop(&mut self) {
543        self.actor.stop(None);
544    }
545}
546
547enum SubscriptionMessage<T: Send + Sync + 'static> {
548    Close {
549        final_value: Option<T>,
550        reply: Ack,
551    },
552    Subscribe {
553        id: u64,
554        slot: Arc<SubscriptionSlot<T>>,
555        reply: Ack,
556    },
557    Unsubscribe {
558        id: u64,
559    },
560}
561
562#[cfg(feature = "cluster")]
563impl<T: Send + Sync + 'static> ractor::Message for SubscriptionMessage<T> {}
564
565struct SubscriptionActor<T> {
566    _marker: PhantomData<fn() -> T>,
567}
568
569impl<T> Default for SubscriptionActor<T> {
570    fn default() -> Self {
571        Self {
572            _marker: PhantomData,
573        }
574    }
575}
576
577struct SubscriptionActorState<T: Send + Sync + 'static> {
578    shared: Arc<SubscriptionShared<T>>,
579    subscribers: HashMap<u64, Arc<SubscriptionSlot<T>>>,
580    closed: bool,
581}
582
583impl<T: Send + Sync + 'static> Actor for SubscriptionActor<T> {
584    type Msg = SubscriptionMessage<T>;
585    type State = SubscriptionActorState<T>;
586    type Arguments = SubscriptionActorState<T>;
587
588    async fn pre_start(
589        &self,
590        _myself: ActorRef<Self::Msg>,
591        args: Self::Arguments,
592    ) -> Result<Self::State, ActorProcessingErr> {
593        Ok(args)
594    }
595
596    async fn handle(
597        &self,
598        _myself: ActorRef<Self::Msg>,
599        message: Self::Msg,
600        state: &mut Self::State,
601    ) -> Result<(), ActorProcessingErr> {
602        match message {
603            SubscriptionMessage::Close { final_value, reply } => {
604                close_subscription(state, final_value);
605                let _ = reply.send(Ok(()));
606            }
607            SubscriptionMessage::Subscribe { id, slot, reply } => {
608                if state.closed || state.shared.lifecycle.load(Ordering::Acquire) == STATE_CLOSED {
609                    let published = state.shared.published.load_full();
610                    slot.complete_post_close(Arc::clone(&published.value));
611                } else {
612                    state.subscribers.insert(id, Arc::clone(&slot));
613                    publish_subscription_slot_table(state);
614                    let published = state.shared.published.load_full();
615                    slot.seed(
616                        published.sequence.saturating_add(1),
617                        Arc::clone(&published.value),
618                    );
619                }
620                let _ = reply.send(Ok(()));
621            }
622            SubscriptionMessage::Unsubscribe { id } => {
623                state.subscribers.remove(&id);
624                publish_subscription_slot_table(state);
625                state.shared.ring.notify_space();
626            }
627        }
628        Ok(())
629    }
630
631    async fn post_stop(
632        &self,
633        _myself: ActorRef<Self::Msg>,
634        state: &mut Self::State,
635    ) -> Result<(), ActorProcessingErr> {
636        if !state.closed {
637            for slot in state.subscribers.values() {
638                slot.fail_now(StreamError::ActorTerminated);
639            }
640            state.subscribers.clear();
641            publish_subscription_slot_table(state);
642            state.shared.ring.notify_space();
643        }
644        Ok(())
645    }
646}
647
648fn close_subscription<T: Send + Sync + 'static>(
649    state: &mut SubscriptionActorState<T>,
650    final_value: Option<T>,
651) {
652    if state.closed {
653        return;
654    }
655    match state.shared.lifecycle.compare_exchange(
656        STATE_OPEN,
657        STATE_CLOSING,
658        Ordering::AcqRel,
659        Ordering::Acquire,
660    ) {
661        Ok(_) => {}
662        Err(STATE_CLOSED) => {
663            state.closed = true;
664            return;
665        }
666        Err(_) => {}
667    }
668    state.shared.wait_for_writers_to_drain();
669
670    let sequence = state.shared.claim_sequence();
671    state.shared.wait_publish_turn(sequence);
672    let value = final_value
673        .map(Arc::new)
674        .unwrap_or_else(|| state.shared.mirror.load_full());
675    state.shared.mirror.store(Arc::clone(&value));
676    state.shared.published.store(Arc::new(PublishedValue {
677        sequence,
678        value: Arc::clone(&value),
679    }));
680    state
681        .shared
682        .published_sequence
683        .store(sequence, Ordering::Release);
684    state
685        .shared
686        .lifecycle
687        .store(STATE_CLOSED, Ordering::Release);
688
689    for slot in state.subscribers.values() {
690        slot.complete_with_final(sequence, Arc::clone(&value));
691    }
692    state.subscribers.clear();
693    publish_subscription_slot_table(state);
694    state.shared.ring.notify_space();
695    state.closed = true;
696}
697
698fn publish_subscription_slot_table<T: Send + Sync + 'static>(state: &SubscriptionActorState<T>) {
699    let slots = state.subscribers.values().cloned().collect::<Vec<_>>();
700    state
701        .shared
702        .subscribers
703        .store(Arc::new(SubscriptionSlotTable { slots }));
704}
705
706fn closed_error() -> StreamError {
707    StreamError::Failed("subscription is closed".into())
708}
709
710fn overflow_error(capacity: u64) -> StreamError {
711    StreamError::Failed(format!(
712        "subscription buffer overflow (max capacity was: {capacity})"
713    ))
714}
715
716fn atomic_fetch_min(target: &AtomicU64, value: u64) {
717    let mut current = target.load(Ordering::Acquire);
718    while value < current {
719        match target.compare_exchange(current, value, Ordering::AcqRel, Ordering::Acquire) {
720            Ok(_) => return,
721            Err(observed) => current = observed,
722        }
723    }
724}
725
726fn atomic_fetch_max(target: &AtomicU64, value: u64) {
727    let mut current = target.load(Ordering::Acquire);
728    while value > current {
729        match target.compare_exchange(current, value, Ordering::AcqRel, Ordering::Acquire) {
730            Ok(_) => return,
731            Err(observed) => current = observed,
732        }
733    }
734}
735
736struct SubscriptionSlot<T: Send + Sync + 'static> {
737    id: u64,
738    actor: ActorRef<SubscriptionMessage<T>>,
739    parked_count: Arc<AtomicUsize>,
740    cursor: AtomicU64,
741    active: AtomicBool,
742    parked: AtomicBool,
743    drop_from: AtomicU64,
744    drop_through: AtomicU64,
745    terminal_from: AtomicU64,
746    state: Mutex<SubscriptionSlotState<T>>,
747    available: Condvar,
748    async_available: Notify,
749}
750
751struct SubscriptionSlotState<T: Send + Sync + 'static> {
752    seed: Option<Arc<T>>,
753    terminal: Option<SubscriptionSlotTerminal<T>>,
754}
755
756#[derive(Clone)]
757enum SubscriptionSlotTerminal<T: Send + Sync + 'static> {
758    Complete {
759        final_sequence: u64,
760        final_value: Option<Arc<T>>,
761    },
762    Error {
763        after_sequence: u64,
764        error: StreamError,
765    },
766}
767
768impl<T: Send + Sync + 'static> SubscriptionSlot<T> {
769    fn new(
770        id: u64,
771        actor: ActorRef<SubscriptionMessage<T>>,
772        parked_count: Arc<AtomicUsize>,
773    ) -> Arc<Self> {
774        Arc::new(Self {
775            id,
776            actor,
777            parked_count,
778            cursor: AtomicU64::new(UNSEEDED_CURSOR),
779            active: AtomicBool::new(true),
780            parked: AtomicBool::new(false),
781            drop_from: AtomicU64::new(NO_DROP_FROM),
782            drop_through: AtomicU64::new(0),
783            terminal_from: AtomicU64::new(NO_TERMINAL_FROM),
784            state: Mutex::new(SubscriptionSlotState {
785                seed: None,
786                terminal: None,
787            }),
788            available: Condvar::new(),
789            async_available: Notify::new(),
790        })
791    }
792
793    fn lock(&self) -> MutexGuard<'_, SubscriptionSlotState<T>> {
794        self.state
795            .lock()
796            .unwrap_or_else(|poison| poison.into_inner())
797    }
798
799    fn seed(&self, next_sequence: u64, value: Arc<T>) {
800        self.cursor.store(next_sequence, Ordering::Release);
801        let mut state = self.lock();
802        state.seed = Some(value);
803        drop(state);
804        self.wake();
805    }
806
807    fn complete_post_close(&self, value: Arc<T>) {
808        self.cursor.store(0, Ordering::Release);
809        let mut state = self.lock();
810        state.seed = Some(value);
811        state.terminal = Some(SubscriptionSlotTerminal::Complete {
812            final_sequence: 0,
813            final_value: None,
814        });
815        drop(state);
816        self.terminal_from.store(0, Ordering::Release);
817        self.active.store(false, Ordering::Release);
818        self.wake();
819    }
820
821    fn complete_with_final(&self, final_sequence: u64, value: Arc<T>) {
822        let mut state = self.lock();
823        if state.terminal.is_none() {
824            state.terminal = Some(SubscriptionSlotTerminal::Complete {
825                final_sequence,
826                final_value: Some(value),
827            });
828        }
829        drop(state);
830        self.terminal_from
831            .fetch_min(final_sequence, Ordering::AcqRel);
832        self.wake();
833    }
834
835    fn fail_after(&self, after_sequence: u64, error: StreamError) {
836        let mut state = self.lock();
837        if state.terminal.is_none() {
838            state.terminal = Some(SubscriptionSlotTerminal::Error {
839                after_sequence,
840                error,
841            });
842        }
843        drop(state);
844        self.terminal_from
845            .fetch_min(after_sequence, Ordering::AcqRel);
846        self.active.store(false, Ordering::Release);
847        self.wake();
848    }
849
850    fn fail_now(&self, error: StreamError) {
851        let cursor = self.cursor.load(Ordering::Acquire);
852        let after_sequence = if cursor == UNSEEDED_CURSOR { 0 } else { cursor };
853        self.fail_after(after_sequence, error);
854    }
855
856    fn is_full_for(&self, sequence: u64, capacity: u64) -> bool {
857        if !self.active.load(Ordering::Acquire) {
858            return false;
859        }
860        let cursor = self.cursor.load(Ordering::Acquire);
861        cursor != UNSEEDED_CURSOR && sequence >= cursor.saturating_add(capacity)
862    }
863
864    fn backpressure_cursor(&self) -> Option<u64> {
865        if !self.active.load(Ordering::Acquire) {
866            return None;
867        }
868        let cursor = self.cursor.load(Ordering::Acquire);
869        (cursor != UNSEEDED_CURSOR).then_some(cursor)
870    }
871
872    fn drop_new(&self, sequence: u64, capacity: u64) {
873        let cursor = self.cursor.load(Ordering::Acquire);
874        if cursor == UNSEEDED_CURSOR {
875            return;
876        }
877        let from = cursor.saturating_add(capacity);
878        if sequence >= from {
879            atomic_fetch_min(&self.drop_from, from);
880            atomic_fetch_max(&self.drop_through, sequence);
881            self.wake();
882        }
883    }
884
885    fn skip_dropped(&self, cursor: u64) -> Option<u64> {
886        let from = self.drop_from.load(Ordering::Acquire);
887        let through = self.drop_through.load(Ordering::Acquire);
888        if from != NO_DROP_FROM && cursor >= from && cursor <= through {
889            self.drop_from.store(NO_DROP_FROM, Ordering::Release);
890            self.drop_through.store(0, Ordering::Release);
891            Some(through.saturating_add(1))
892        } else {
893            None
894        }
895    }
896
897    fn has_dropped(&self, cursor: u64) -> bool {
898        let from = self.drop_from.load(Ordering::Acquire);
899        let through = self.drop_through.load(Ordering::Acquire);
900        from != NO_DROP_FROM && cursor >= from && cursor <= through
901    }
902
903    fn terminal_blocks(&self, cursor: u64) -> bool {
904        cursor >= self.terminal_from.load(Ordering::Acquire)
905    }
906
907    fn wake(&self) {
908        if self.parked.swap(false, Ordering::AcqRel) {
909            self.parked_count.fetch_sub(1, Ordering::AcqRel);
910            self.available.notify_all();
911            self.async_available.notify_waiters();
912        }
913    }
914
915    fn park(&self) {
916        if !self.parked.swap(true, Ordering::AcqRel) {
917            self.parked_count.fetch_add(1, Ordering::AcqRel);
918        }
919    }
920
921    fn unpark(&self) {
922        if self.parked.swap(false, Ordering::AcqRel) {
923            self.parked_count.fetch_sub(1, Ordering::AcqRel);
924        }
925    }
926
927    fn wake_for_sequence(&self, sequence: u64) {
928        if self.cursor.load(Ordering::Acquire) == sequence {
929            self.wake();
930        }
931    }
932
933    fn unsubscribe(&self) {
934        self.active.store(false, Ordering::Release);
935        let _ = self
936            .actor
937            .send_message(SubscriptionMessage::Unsubscribe { id: self.id });
938    }
939}
940
941struct SubscriptionChangesStream<T: Clone + Send + Sync + 'static> {
942    shared: Arc<SubscriptionShared<T>>,
943    slot: Arc<SubscriptionSlot<T>>,
944    pending: VecDeque<Arc<T>>,
945    terminated: bool,
946}
947
948#[doc(hidden)]
949pub struct SubscriptionBenchmarkStream<T: Clone + Send + Sync + 'static> {
950    shared: Arc<SubscriptionShared<T>>,
951    slot: Arc<SubscriptionSlot<T>>,
952    pending: VecDeque<Arc<T>>,
953    terminated: bool,
954}
955
956impl<T: Clone + Send + Sync + 'static> Iterator for SubscriptionChangesStream<T> {
957    type Item = StreamResult<T>;
958
959    fn next(&mut self) -> Option<Self::Item> {
960        if self.terminated {
961            return None;
962        }
963
964        loop {
965            if let Some(value) = self.pending.pop_front() {
966                return Some(Ok(value.as_ref().clone()));
967            }
968
969            if let Some(item) = self.poll_seed_or_terminal() {
970                return item;
971            }
972
973            let cursor = self.slot.cursor.load(Ordering::Acquire);
974            if cursor == UNSEEDED_CURSOR {
975                self.wait_for_wake();
976                continue;
977            }
978
979            if let Some(next_cursor) = self.slot.skip_dropped(cursor) {
980                self.slot.cursor.store(next_cursor, Ordering::Release);
981                self.shared.ring.notify_space();
982                continue;
983            }
984
985            if let Some(value) = self.drain_available(cursor) {
986                return Some(Ok(value.as_ref().clone()));
987            }
988
989            let published = self.shared.published_sequence.load(Ordering::Acquire);
990            if cursor <= published {
991                let oldest = self.shared.ring.oldest_available(published);
992                if cursor < oldest {
993                    match self.shared.overflow {
994                        SubscriptionOverflow::DropNew => {
995                            self.slot.cursor.store(oldest, Ordering::Release);
996                            self.shared.ring.notify_space();
997                            continue;
998                        }
999                        SubscriptionOverflow::Backpressure | SubscriptionOverflow::Fail => {
1000                            self.terminated = true;
1001                            return Some(Err(overflow_error(self.shared.ring.logical_capacity)));
1002                        }
1003                    }
1004                }
1005            }
1006
1007            if current_stream_cancelled()
1008                .as_ref()
1009                .is_some_and(|cancelled| cancelled.load(Ordering::SeqCst))
1010            {
1011                self.terminated = true;
1012                return Some(Err(StreamError::Cancelled));
1013            }
1014
1015            self.wait_for_wake();
1016        }
1017    }
1018}
1019
1020impl<T: Clone + Send + Sync + 'static> SubscriptionChangesStream<T> {
1021    fn drain_available(&mut self, start_cursor: u64) -> Option<Arc<T>> {
1022        let mut cursor = start_cursor;
1023        let first = self.shared.ring.load(cursor)?;
1024        cursor = cursor.saturating_add(1);
1025        let mut drained = 1_usize;
1026
1027        while drained < SUBSCRIPTION_DRAIN_BATCH {
1028            if self.slot.has_dropped(cursor) || self.slot.terminal_blocks(cursor) {
1029                break;
1030            }
1031            let Some(value) = self.shared.ring.load(cursor) else {
1032                break;
1033            };
1034            self.pending.push_back(value);
1035            cursor = cursor.saturating_add(1);
1036            drained += 1;
1037        }
1038
1039        self.slot.cursor.store(cursor, Ordering::Release);
1040        self.shared.ring.notify_space();
1041        Some(first)
1042    }
1043
1044    fn poll_seed_or_terminal(&mut self) -> Option<Option<StreamResult<T>>> {
1045        let mut state = self.slot.lock();
1046        if let Some(seed) = state.seed.take() {
1047            return Some(Some(Ok(seed.as_ref().clone())));
1048        }
1049
1050        let cursor = self.slot.cursor.load(Ordering::Acquire);
1051        if let Some(terminal) = &mut state.terminal {
1052            match terminal {
1053                SubscriptionSlotTerminal::Complete {
1054                    final_sequence,
1055                    final_value,
1056                } => {
1057                    if cursor >= *final_sequence {
1058                        if let Some(value) = final_value.take() {
1059                            return Some(Some(Ok(value.as_ref().clone())));
1060                        }
1061                        self.terminated = true;
1062                        return Some(None);
1063                    }
1064                }
1065                SubscriptionSlotTerminal::Error {
1066                    after_sequence,
1067                    error,
1068                } => {
1069                    if cursor >= *after_sequence {
1070                        self.terminated = true;
1071                        return Some(Some(Err(error.clone())));
1072                    }
1073                }
1074            }
1075        }
1076        None
1077    }
1078
1079    fn wait_for_wake(&self) {
1080        let state = self.slot.lock();
1081        self.slot.park();
1082        fence(Ordering::SeqCst);
1083        let cursor = self.slot.cursor.load(Ordering::Acquire);
1084        if state.seed.is_some()
1085            || state.terminal.is_some()
1086            || self.slot.has_dropped(cursor)
1087            || (cursor != UNSEEDED_CURSOR && self.shared.ring.has(cursor))
1088        {
1089            self.slot.unpark();
1090            return;
1091        }
1092        let _guard = self
1093            .slot
1094            .available
1095            .wait_timeout(state, SLOT_WAIT_BACKSTOP)
1096            .unwrap_or_else(|poison| poison.into_inner())
1097            .0;
1098        self.slot.unpark();
1099    }
1100}
1101
1102impl<T: Clone + Send + Sync + 'static> Drop for SubscriptionChangesStream<T> {
1103    fn drop(&mut self) {
1104        self.slot.unsubscribe();
1105        self.shared.ring.notify_space();
1106    }
1107}
1108
1109impl<T: Clone + Send + Sync + 'static> SubscriptionBenchmarkStream<T> {
1110    #[doc(hidden)]
1111    pub async fn next(&mut self) -> Option<StreamResult<T>> {
1112        if self.terminated {
1113            return None;
1114        }
1115
1116        loop {
1117            if let Some(value) = self.pending.pop_front() {
1118                return Some(Ok(value.as_ref().clone()));
1119            }
1120
1121            if let Some(item) = self.poll_seed_or_terminal() {
1122                return item;
1123            }
1124
1125            let cursor = self.slot.cursor.load(Ordering::Acquire);
1126            if cursor == UNSEEDED_CURSOR {
1127                self.wait_for_wake().await;
1128                continue;
1129            }
1130
1131            if let Some(next_cursor) = self.slot.skip_dropped(cursor) {
1132                self.slot.cursor.store(next_cursor, Ordering::Release);
1133                self.shared.ring.notify_space();
1134                continue;
1135            }
1136
1137            if let Some(value) = self.drain_available(cursor) {
1138                return Some(Ok(value.as_ref().clone()));
1139            }
1140
1141            let published = self.shared.published_sequence.load(Ordering::Acquire);
1142            if cursor <= published {
1143                let oldest = self.shared.ring.oldest_available(published);
1144                if cursor < oldest {
1145                    match self.shared.overflow {
1146                        SubscriptionOverflow::DropNew => {
1147                            self.slot.cursor.store(oldest, Ordering::Release);
1148                            self.shared.ring.notify_space();
1149                            continue;
1150                        }
1151                        SubscriptionOverflow::Backpressure | SubscriptionOverflow::Fail => {
1152                            self.terminated = true;
1153                            return Some(Err(overflow_error(self.shared.ring.logical_capacity)));
1154                        }
1155                    }
1156                }
1157            }
1158
1159            self.wait_for_wake().await;
1160        }
1161    }
1162
1163    #[doc(hidden)]
1164    pub async fn count_changes(&mut self, target: u64) -> StreamResult<u64> {
1165        let mut count = 0_u64;
1166        while count < target {
1167            if self.terminated {
1168                return Err(StreamError::Failed(
1169                    "subscription stream ended before requested count".into(),
1170                ));
1171            }
1172
1173            if !self.pending.is_empty() {
1174                let drained = self.pending.len().min((target - count) as usize);
1175                self.pending.drain(..drained);
1176                count += drained as u64;
1177                continue;
1178            }
1179
1180            if let Some(item) = self.poll_seed_or_terminal() {
1181                match item {
1182                    Some(Ok(_)) => {
1183                        count += 1;
1184                        continue;
1185                    }
1186                    Some(Err(error)) => return Err(error),
1187                    None => {
1188                        return Err(StreamError::Failed(
1189                            "subscription stream completed before requested count".into(),
1190                        ));
1191                    }
1192                }
1193            }
1194
1195            let cursor = self.slot.cursor.load(Ordering::Acquire);
1196            if cursor == UNSEEDED_CURSOR {
1197                self.wait_for_wake().await;
1198                continue;
1199            }
1200
1201            if let Some(next_cursor) = self.slot.skip_dropped(cursor) {
1202                self.slot.cursor.store(next_cursor, Ordering::Release);
1203                self.shared.ring.notify_space();
1204                continue;
1205            }
1206
1207            if let Some(drained) = self.drain_available_count(cursor, (target - count) as usize) {
1208                count += drained as u64;
1209                continue;
1210            }
1211
1212            let published = self.shared.published_sequence.load(Ordering::Acquire);
1213            if cursor <= published {
1214                let oldest = self.shared.ring.oldest_available(published);
1215                if cursor < oldest {
1216                    return Err(overflow_error(self.shared.ring.logical_capacity));
1217                }
1218            }
1219
1220            self.wait_for_wake().await;
1221        }
1222        Ok(count)
1223    }
1224
1225    fn drain_available(&mut self, start_cursor: u64) -> Option<Arc<T>> {
1226        let mut cursor = start_cursor;
1227        let first = self.shared.ring.load(cursor)?;
1228        cursor = cursor.saturating_add(1);
1229        let mut drained = 1_usize;
1230
1231        while drained < SUBSCRIPTION_DRAIN_BATCH {
1232            if self.slot.has_dropped(cursor) || self.slot.terminal_blocks(cursor) {
1233                break;
1234            }
1235            let Some(value) = self.shared.ring.load(cursor) else {
1236                break;
1237            };
1238            self.pending.push_back(value);
1239            cursor = cursor.saturating_add(1);
1240            drained += 1;
1241        }
1242
1243        self.slot.cursor.store(cursor, Ordering::Release);
1244        self.shared.ring.notify_space();
1245        Some(first)
1246    }
1247
1248    fn drain_available_count(&mut self, start_cursor: u64, limit: usize) -> Option<usize> {
1249        if self.slot.has_dropped(start_cursor) || self.slot.terminal_blocks(start_cursor) {
1250            return None;
1251        }
1252        let published = self.shared.published_sequence.load(Ordering::Acquire);
1253        if start_cursor > published {
1254            return None;
1255        }
1256        let oldest = self.shared.ring.oldest_available(published);
1257        if start_cursor < oldest {
1258            return None;
1259        }
1260        let available = published.saturating_sub(start_cursor).saturating_add(1) as usize;
1261        let limit = limit.min(SUBSCRIPTION_DRAIN_BATCH);
1262        let drained = available.min(limit);
1263        if drained == 0 {
1264            return None;
1265        }
1266
1267        self.slot.cursor.store(
1268            start_cursor.saturating_add(drained as u64),
1269            Ordering::Release,
1270        );
1271        self.shared.ring.notify_space();
1272        Some(drained)
1273    }
1274
1275    fn poll_seed_or_terminal(&mut self) -> Option<Option<StreamResult<T>>> {
1276        let mut state = self.slot.lock();
1277        if let Some(seed) = state.seed.take() {
1278            return Some(Some(Ok(seed.as_ref().clone())));
1279        }
1280
1281        let cursor = self.slot.cursor.load(Ordering::Acquire);
1282        if let Some(terminal) = &mut state.terminal {
1283            match terminal {
1284                SubscriptionSlotTerminal::Complete {
1285                    final_sequence,
1286                    final_value,
1287                } => {
1288                    if cursor >= *final_sequence {
1289                        if let Some(value) = final_value.take() {
1290                            return Some(Some(Ok(value.as_ref().clone())));
1291                        }
1292                        self.terminated = true;
1293                        return Some(None);
1294                    }
1295                }
1296                SubscriptionSlotTerminal::Error {
1297                    after_sequence,
1298                    error,
1299                } => {
1300                    if cursor >= *after_sequence {
1301                        self.terminated = true;
1302                        return Some(Some(Err(error.clone())));
1303                    }
1304                }
1305            }
1306        }
1307        None
1308    }
1309
1310    async fn wait_for_wake(&self) {
1311        let notified = self.slot.async_available.notified();
1312        tokio::pin!(notified);
1313        notified.as_mut().enable();
1314
1315        {
1316            let state = self.slot.lock();
1317            self.slot.park();
1318            fence(Ordering::SeqCst);
1319            let cursor = self.slot.cursor.load(Ordering::Acquire);
1320            if state.seed.is_some()
1321                || state.terminal.is_some()
1322                || self.slot.has_dropped(cursor)
1323                || (cursor != UNSEEDED_CURSOR && self.shared.ring.has(cursor))
1324            {
1325                self.slot.unpark();
1326                return;
1327            }
1328        }
1329
1330        notified.await;
1331        self.slot.unpark();
1332    }
1333}
1334
1335impl<T: Clone + Send + Sync + 'static> Drop for SubscriptionBenchmarkStream<T> {
1336    fn drop(&mut self) {}
1337}
1338
1339#[cfg(test)]
1340mod tests {
1341    use super::*;
1342    use crate::{Sink, stream::Materializer};
1343    use std::{
1344        sync::{
1345            Arc,
1346            atomic::{AtomicBool, AtomicUsize},
1347        },
1348        thread,
1349        time::{Duration, Instant},
1350    };
1351
1352    fn wait<T>(completion: crate::StreamCompletion<T>) -> T {
1353        completion.wait().unwrap()
1354    }
1355
1356    fn wait_until<F>(timeout: Duration, mut condition: F) -> bool
1357    where
1358        F: FnMut() -> bool,
1359    {
1360        let deadline = Instant::now() + timeout;
1361        while Instant::now() < deadline {
1362            if condition() {
1363                return true;
1364            }
1365            thread::yield_now();
1366        }
1367        condition()
1368    }
1369
1370    #[test]
1371    fn get_snapshot_and_acked_set_read_your_writes() {
1372        let subscription = Subscription::new(1_u64, 8, SubscriptionOverflow::Backpressure).unwrap();
1373        assert_eq!(*subscription.get(), 1);
1374        assert_eq!(subscription.get_cloned(), 1);
1375        subscription.set(2).unwrap();
1376        assert_eq!(*subscription.get(), 2);
1377        assert_eq!(subscription.get_cloned(), 2);
1378        subscription.update(|value| *value + 1).unwrap();
1379        assert_eq!(*subscription.get(), 3);
1380        assert_eq!(subscription.get_cloned(), 3);
1381    }
1382
1383    #[test]
1384    fn lossless_backpressure_subscribers_see_all_changes() {
1385        const SUBSCRIBERS: usize = 4;
1386        const WRITES: u64 = 128;
1387        let subscription =
1388            Subscription::new(0_u64, 256, SubscriptionOverflow::Backpressure).unwrap();
1389        let completions = (0..SUBSCRIBERS)
1390            .map(|_| subscription.changes().run_with(Sink::collect()).unwrap())
1391            .collect::<Vec<_>>();
1392
1393        for value in 1..=WRITES {
1394            subscription.set(value).unwrap();
1395        }
1396        subscription.close_with(WRITES + 1).unwrap();
1397
1398        for completion in completions {
1399            let values = wait(completion);
1400            let expected = (0..=WRITES + 1).collect::<Vec<_>>();
1401            assert_eq!(values, expected);
1402        }
1403    }
1404
1405    #[test]
1406    fn backpressure_parks_producer_ack_until_capacity_returns() {
1407        let subscription = Subscription::new(0_u64, 1, SubscriptionOverflow::Backpressure).unwrap();
1408        let seen = Arc::new(Mutex::new(Vec::new()));
1409        let gate = Arc::new(AtomicBool::new(false));
1410        let sink_seen = Arc::clone(&seen);
1411        let sink_gate = Arc::clone(&gate);
1412        let completion = subscription
1413            .changes()
1414            .run_with(Sink::foreach(move |item| {
1415                sink_seen.lock().unwrap().push(item);
1416                while !sink_gate.load(Ordering::SeqCst) {
1417                    thread::yield_now();
1418                }
1419            }))
1420            .unwrap();
1421
1422        assert!(wait_until(Duration::from_secs(1), || {
1423            seen.lock().unwrap().as_slice() == [0]
1424        }));
1425        subscription.set(1).unwrap();
1426
1427        let producer_subscription = subscription.clone();
1428        let completed = Arc::new(AtomicBool::new(false));
1429        let producer_completed = Arc::clone(&completed);
1430        let producer = thread::spawn(move || {
1431            producer_subscription.set(2).unwrap();
1432            producer_completed.store(true, Ordering::SeqCst);
1433        });
1434
1435        assert!(!wait_until(Duration::from_millis(25), || completed
1436            .load(Ordering::SeqCst)));
1437        assert_eq!(*subscription.get(), 1);
1438        gate.store(true, Ordering::SeqCst);
1439        assert!(wait_until(Duration::from_secs(1), || completed.load(Ordering::SeqCst)));
1440        producer.join().unwrap();
1441        assert_eq!(*subscription.get(), 2);
1442        subscription.close_with(3).unwrap();
1443        wait(completion);
1444        assert_eq!(seen.lock().unwrap().as_slice(), [0, 1, 2, 3]);
1445    }
1446
1447    #[test]
1448    fn drop_new_policy_drops_only_full_subscribers() {
1449        let subscription = Subscription::new(0_u64, 1, SubscriptionOverflow::DropNew).unwrap();
1450        let seen = Arc::new(Mutex::new(Vec::new()));
1451        let gate = Arc::new(AtomicBool::new(false));
1452        let sink_seen = Arc::clone(&seen);
1453        let sink_gate = Arc::clone(&gate);
1454        let completion = subscription
1455            .changes()
1456            .run_with(Sink::foreach(move |item| {
1457                sink_seen.lock().unwrap().push(item);
1458                while !sink_gate.load(Ordering::SeqCst) {
1459                    thread::yield_now();
1460                }
1461            }))
1462            .unwrap();
1463        assert!(wait_until(Duration::from_secs(1), || {
1464            seen.lock().unwrap().as_slice() == [0]
1465        }));
1466        subscription.set(1).unwrap();
1467        subscription.set(2).unwrap();
1468        subscription.close_with(3).unwrap();
1469        gate.store(true, Ordering::SeqCst);
1470        wait(completion);
1471        assert_eq!(seen.lock().unwrap().as_slice(), [0, 1, 3]);
1472        assert_eq!(*subscription.get(), 3);
1473    }
1474
1475    #[test]
1476    fn fail_policy_fails_full_subscriber_and_reports_overflow() {
1477        let subscription = Subscription::new(0_u64, 1, SubscriptionOverflow::Fail).unwrap();
1478        let seen = Arc::new(Mutex::new(Vec::new()));
1479        let gate = Arc::new(AtomicBool::new(false));
1480        let sink_seen = Arc::clone(&seen);
1481        let sink_gate = Arc::clone(&gate);
1482        let completion = subscription
1483            .changes()
1484            .run_with(Sink::foreach(move |item| {
1485                sink_seen.lock().unwrap().push(item);
1486                while !sink_gate.load(Ordering::SeqCst) {
1487                    thread::yield_now();
1488                }
1489            }))
1490            .unwrap();
1491        assert!(wait_until(Duration::from_secs(1), || {
1492            seen.lock().unwrap().as_slice() == [0]
1493        }));
1494        subscription.set(1).unwrap();
1495        assert!(matches!(
1496            subscription.set(2),
1497            Err(StreamError::Failed(message)) if message.contains("subscription buffer overflow")
1498        ));
1499        gate.store(true, Ordering::SeqCst);
1500        assert!(matches!(
1501            completion.wait(),
1502            Err(StreamError::Failed(message)) if message.contains("subscription buffer overflow")
1503        ));
1504        assert_eq!(seen.lock().unwrap().as_slice(), [0, 1]);
1505        assert_eq!(*subscription.get(), 2);
1506    }
1507
1508    #[test]
1509    fn terminal_ordering_and_post_close_subscribe() {
1510        let subscription = Subscription::new(0_u64, 8, SubscriptionOverflow::Backpressure).unwrap();
1511        let completion = subscription.changes().run_with(Sink::collect()).unwrap();
1512        subscription.set(1).unwrap();
1513        subscription.close_with(9).unwrap();
1514        assert_eq!(wait(completion), vec![0, 1, 9]);
1515
1516        let post_close = subscription.changes().run_collect().unwrap();
1517        assert_eq!(post_close, vec![9]);
1518    }
1519
1520    #[test]
1521    fn dropping_feed_source_cancels_and_unsubscribes() {
1522        let subscription = Subscription::new(0_u64, 1, SubscriptionOverflow::Backpressure).unwrap();
1523        let pulled = Arc::new(AtomicUsize::new(0));
1524        let sink_pulled = Arc::clone(&pulled);
1525        let completion = subscription
1526            .changes()
1527            .run_with(Sink::foreach(move |_| {
1528                sink_pulled.fetch_add(1, Ordering::SeqCst);
1529            }))
1530            .unwrap();
1531        assert!(wait_until(Duration::from_secs(1), || {
1532            pulled.load(Ordering::SeqCst) == 1
1533        }));
1534        drop(completion);
1535        assert!(wait_until(Duration::from_secs(1), || subscription
1536            .set(1)
1537            .is_ok()));
1538    }
1539
1540    #[test]
1541    fn actor_death_fails_feed() {
1542        let subscription = Subscription::new(0_u64, 8, SubscriptionOverflow::Backpressure).unwrap();
1543        let materializer = Materializer::new();
1544        let completion = subscription
1545            .changes()
1546            .drop(1)
1547            .run_with_materializer(Sink::head(), &materializer)
1548            .unwrap();
1549        drop(subscription);
1550        match completion.wait() {
1551            Err(StreamError::ActorTerminated) => {}
1552            other => panic!("expected actor termination, got {other:?}"),
1553        }
1554    }
1555}