1use std::{
2 collections::{HashMap, VecDeque},
3 hint,
4 marker::PhantomData,
5 sync::{
6 Arc, Condvar, Mutex, MutexGuard,
7 atomic::{AtomicBool, AtomicU8, AtomicU64, AtomicUsize, Ordering, fence},
8 mpsc,
9 },
10 thread,
11 time::Duration,
12};
13
14use arc_swap::{ArcSwap, ArcSwapOption};
15use ractor::{Actor, ActorProcessingErr, ActorRef};
16use tokio::sync::Notify;
17
18use crate::{
19 StreamError, StreamResult,
20 actor::block_on_ractor_runtime,
21 stream::{BoxStream, NotUsed, Source, current_stream_cancelled},
22};
23
/// Upper bound for a single condition-variable wait. It is the `wait_timeout`
/// duration both when writers wait for ring space and when a blocking reader
/// parks on its slot, so neither waits longer than this without re-checking.
const SLOT_WAIT_BACKSTOP: Duration = Duration::from_millis(10);
/// Maximum number of ring values a stream pulls in one `drain_available` pass.
const SUBSCRIPTION_DRAIN_BATCH: usize = 256;
/// Lifecycle value: writers may publish.
const STATE_OPEN: u8 = 0;
/// Lifecycle value: a close has started; `begin_write` rejects new writers.
const STATE_CLOSING: u8 = 1;
/// Lifecycle value: the closing value has been published.
const STATE_CLOSED: u8 = 2;
/// Cursor sentinel for a slot that has not been seeded yet.
const UNSEEDED_CURSOR: u64 = u64::MAX;
/// `drop_from` sentinel meaning that no dropped range is recorded.
const NO_DROP_FROM: u64 = u64::MAX;
/// `terminal_from` sentinel meaning that no terminal sequence is recorded.
const NO_TERMINAL_FROM: u64 = u64::MAX;

/// Reply channel on which the actor acknowledges a request.
type Ack = mpsc::Sender<StreamResult<()>>;
34
/// Policy applied when a publication would exceed a subscriber's logical
/// buffer capacity.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SubscriptionOverflow {
    /// Writers wait for ring capacity before publishing.
    Backpressure,
    /// The publication proceeds; each full subscriber has the overflowing
    /// sequence range recorded as dropped and later skips it.
    DropNew,
    /// Each full subscriber is failed with an overflow error, and the
    /// publishing call returns that error as well.
    Fail,
}
49
/// Handle to a shared, observable value.
///
/// Cloning the handle clones an `Arc`, so all clones refer to the same
/// underlying state.
pub struct Subscription<T: Send + Sync + 'static> {
    // Shared state; the backing actor is stopped when the last handle drops it.
    inner: Arc<SubscriptionInner<T>>,
}
74
/// State shared by every clone of a [`Subscription`] handle.
struct SubscriptionInner<T: Send + Sync + 'static> {
    // Actor handling close, subscribe and unsubscribe requests.
    actor: ActorRef<SubscriptionMessage<T>>,
    // Lock-free state shared between writers, readers and the actor.
    shared: Arc<SubscriptionShared<T>>,
    // Source of unique subscriber ids; starts at 1.
    next_subscriber_id: Arc<AtomicU64>,
}
80
/// State accessed concurrently by writers, subscriber streams and the actor.
struct SubscriptionShared<T: Send + Sync + 'static> {
    // Latest value, as returned by `Subscription::get`.
    mirror: Arc<ArcSwap<T>>,
    // Latest published value together with its sequence; used to seed new subscribers.
    published: Arc<ArcSwap<PublishedValue<T>>>,
    // Snapshot of the registered subscriber slots, replaced wholesale by the actor.
    subscribers: Arc<ArcSwap<SubscriptionSlotTable<T>>>,
    // Buffer of published values, addressed by sequence number.
    ring: SubscriptionRing<T>,
    // Policy applied when a subscriber falls behind by the logical capacity.
    overflow: SubscriptionOverflow,
    // One of STATE_OPEN / STATE_CLOSING / STATE_CLOSED.
    lifecycle: AtomicU8,
    // Number of writers currently holding a `WritePermit`; close waits for it to reach zero.
    active_writers: AtomicUsize,
    // Last sequence number handed out by `claim_sequence`.
    next_sequence: AtomicU64,
    // Last sequence number whose publication has finished.
    published_sequence: AtomicU64,
    // Number of slots currently parked; writers skip the wake scan when it is zero.
    parked_slots: Arc<AtomicUsize>,
}
93
/// A value paired with the sequence number under which it was published.
struct PublishedValue<T: Send + Sync + 'static> {
    // Sequence number of the publication (0 for the initial value).
    sequence: u64,
    // The published value.
    value: Arc<T>,
}
98
/// Immutable snapshot of the subscriber slots that writers iterate over.
struct SubscriptionSlotTable<T: Send + Sync + 'static> {
    // Registered slots at the time the snapshot was built.
    slots: Vec<Arc<SubscriptionSlot<T>>>,
}
102
/// Fixed-size buffer of published values addressed by sequence number.
struct SubscriptionRing<T: Send + Sync + 'static> {
    // Capacity requested by the caller; used by the overflow checks.
    logical_capacity: u64,
    // Number of slots actually allocated; a power of two so indices can be masked.
    physical_capacity: usize,
    // Storage, indexed by `sequence & (physical_capacity - 1)`.
    slots: Vec<SubscriptionRingSlot<T>>,
    // Mutex paired with `space_available`; it guards no data of its own.
    space_lock: Mutex<()>,
    // Signalled when readers advance, so waiting writers re-check capacity.
    space_available: Condvar,
    // Number of writers currently waiting; lets `notify_space` skip the lock when zero.
    space_waiters: AtomicUsize,
}
111
/// One entry of the ring: a value tagged with the sequence it belongs to.
struct SubscriptionRingSlot<T: Send + Sync + 'static> {
    // Sequence number of the stored value; readers compare it before loading.
    sequence: AtomicU64,
    // The stored value, or `None` while the slot has never been written.
    value: ArcSwapOption<T>,
}
116
117impl<T: Send + Sync + 'static> Clone for Subscription<T> {
118 fn clone(&self) -> Self {
119 Self {
120 inner: Arc::clone(&self.inner),
121 }
122 }
123}
124
125impl<T: Send + Sync + 'static> Subscription<T> {
126 pub fn new(initial: T, capacity: usize, overflow: SubscriptionOverflow) -> StreamResult<Self> {
130 assert!(
131 capacity > 0,
132 "subscription capacity must be greater than zero"
133 );
134 let value = Arc::new(initial);
135 let shared = Arc::new(SubscriptionShared {
136 mirror: Arc::new(ArcSwap::from(Arc::clone(&value))),
137 published: Arc::new(ArcSwap::from_pointee(PublishedValue {
138 sequence: 0,
139 value: Arc::clone(&value),
140 })),
141 subscribers: Arc::new(ArcSwap::from_pointee(SubscriptionSlotTable {
142 slots: Vec::new(),
143 })),
144 ring: SubscriptionRing::new(capacity),
145 overflow,
146 lifecycle: AtomicU8::new(STATE_OPEN),
147 active_writers: AtomicUsize::new(0),
148 next_sequence: AtomicU64::new(0),
149 published_sequence: AtomicU64::new(0),
150 parked_slots: Arc::new(AtomicUsize::new(0)),
151 });
152 let state = SubscriptionActorState {
153 shared: Arc::clone(&shared),
154 subscribers: HashMap::new(),
155 closed: false,
156 };
157 let (actor, _handle) =
158 block_on_ractor_runtime(Actor::spawn(None, SubscriptionActor::<T>::default(), state))?
159 .map_err(|error| {
160 StreamError::Failed(format!("subscription actor failed to spawn: {error}"))
161 })?;
162 Ok(Self {
163 inner: Arc::new(SubscriptionInner {
164 actor,
165 shared,
166 next_subscriber_id: Arc::new(AtomicU64::new(1)),
167 }),
168 })
169 }
170
171 #[must_use]
173 pub fn get(&self) -> Arc<T> {
174 self.inner.shared.mirror.load_full()
175 }
176
177 #[must_use]
183 pub fn get_cloned(&self) -> T
184 where
185 T: Clone,
186 {
187 self.inner.shared.mirror.load().as_ref().clone()
188 }
189
190 pub fn set(&self, value: T) -> StreamResult<()> {
192 self.publish_set(Arc::new(value))
193 }
194
195 pub fn set_eventually(&self, value: T) -> StreamResult<()> {
200 self.publish_set(Arc::new(value))
201 }
202
203 pub fn update<F>(&self, update: F) -> StreamResult<()>
207 where
208 F: FnMut(&T) -> T + Send + 'static,
209 {
210 self.publish_update(update)
211 }
212
213 pub fn update_eventually<F>(&self, update: F) -> StreamResult<()>
217 where
218 F: FnMut(&T) -> T + Send + 'static,
219 {
220 self.publish_update(update)
221 }
222
223 pub fn close(&self) -> StreamResult<()> {
225 self.send_close(None)
226 }
227
228 pub fn close_with(&self, final_value: T) -> StreamResult<()> {
230 self.send_close(Some(final_value))
231 }
232
233 fn publish_set(&self, value: Arc<T>) -> StreamResult<()> {
234 let _permit = self.inner.shared.begin_write()?;
235 let sequence = self.inner.shared.claim_sequence();
236 self.inner.shared.wait_publish_turn(sequence);
237 self.inner.shared.wait_for_ring_capacity(sequence);
238 let overflow = self.inner.shared.apply_overflow_policy(sequence);
239 self.inner.shared.finish_publish(sequence, value);
240 overflow
241 }
242
243 fn publish_update<F>(&self, mut update: F) -> StreamResult<()>
244 where
245 F: FnMut(&T) -> T + Send + 'static,
246 {
247 let _permit = self.inner.shared.begin_write()?;
248 let sequence = self.inner.shared.claim_sequence();
249 self.inner.shared.wait_publish_turn(sequence);
250 self.inner.shared.wait_for_ring_capacity(sequence);
251 let value = loop {
252 let current = self.inner.shared.mirror.load();
253 let next = Arc::new(update(current.as_ref()));
254 let previous = self
255 .inner
256 .shared
257 .mirror
258 .compare_and_swap(&*current, Arc::clone(&next));
259 if std::ptr::eq(current.as_ref(), previous.as_ref()) {
260 break next;
261 }
262 };
263 let overflow = self.inner.shared.apply_overflow_policy(sequence);
264 self.inner.shared.ring.store(sequence, Arc::clone(&value));
265 self.inner
266 .shared
267 .finish_publish_after_mirror(sequence, value);
268 overflow
269 }
270
271 fn send_close(&self, final_value: Option<T>) -> StreamResult<()> {
272 let (reply, receiver) = mpsc::channel();
273 self.inner
274 .actor
275 .send_message(SubscriptionMessage::Close { final_value, reply })
276 .map_err(|error| StreamError::ActorAskSendFailed {
277 reason: error.to_string(),
278 })?;
279 receiver.recv().unwrap_or(Err(StreamError::ActorTerminated))
280 }
281
282 fn register_slot(&self, slot: Arc<SubscriptionSlot<T>>, id: u64) -> StreamResult<()> {
283 let (reply, receiver) = mpsc::channel();
284 self.inner
285 .actor
286 .send_message(SubscriptionMessage::Subscribe { id, slot, reply })
287 .map_err(|error| StreamError::ActorAskSendFailed {
288 reason: error.to_string(),
289 })?;
290 receiver.recv().unwrap_or(Err(StreamError::ActorTerminated))
291 }
292}
293
294impl<T: Clone + Send + Sync + 'static> Subscription<T> {
295 #[must_use]
301 pub fn changes(&self) -> Source<T> {
302 let actor = self.inner.actor.clone();
303 let subscription = self.clone();
304 let shared = Arc::clone(&self.inner.shared);
305 let next_subscriber_id = Arc::clone(&self.inner.next_subscriber_id);
306 Source::from_materialized_factory(move |_materializer| {
307 let id = next_subscriber_id.fetch_add(1, Ordering::Relaxed);
308 let slot = SubscriptionSlot::new(id, actor.clone(), Arc::clone(&shared.parked_slots));
309 subscription.register_slot(Arc::clone(&slot), id)?;
310 let stream: BoxStream<T> = Box::new(SubscriptionChangesStream {
311 shared: Arc::clone(&shared),
312 slot,
313 pending: VecDeque::new(),
314 terminated: false,
315 });
316 Ok((stream, NotUsed))
317 })
318 }
319
320 #[doc(hidden)]
321 pub fn __benchmark_changes(&self) -> StreamResult<SubscriptionBenchmarkStream<T>> {
322 let id = self
323 .inner
324 .next_subscriber_id
325 .fetch_add(1, Ordering::Relaxed);
326 let slot = SubscriptionSlot::new(
327 id,
328 self.inner.actor.clone(),
329 Arc::clone(&self.inner.shared.parked_slots),
330 );
331 self.register_slot(Arc::clone(&slot), id)?;
332 Ok(SubscriptionBenchmarkStream {
333 shared: Arc::clone(&self.inner.shared),
334 slot,
335 pending: VecDeque::new(),
336 terminated: false,
337 })
338 }
339}
340
341impl<T: Send + Sync + 'static> SubscriptionShared<T> {
    /// Registers the caller as an active writer.
    ///
    /// The lifecycle is checked before and after `active_writers` is
    /// incremented. The second check undoes the registration if the
    /// subscription left `STATE_OPEN` in between.
    ///
    /// # Errors
    ///
    /// Returns the "subscription is closed" error when the lifecycle is not
    /// `STATE_OPEN`.
    fn begin_write(&self) -> StreamResult<WritePermit<'_>> {
        // Cheap rejection that leaves the writer count untouched.
        if self.lifecycle.load(Ordering::Acquire) != STATE_OPEN {
            return Err(closed_error());
        }
        self.active_writers.fetch_add(1, Ordering::AcqRel);
        // Re-check now that the registration is visible to the closing side.
        if self.lifecycle.load(Ordering::Acquire) == STATE_OPEN {
            Ok(WritePermit {
                active_writers: &self.active_writers,
            })
        } else {
            self.active_writers.fetch_sub(1, Ordering::AcqRel);
            Err(closed_error())
        }
    }
356
357 fn claim_sequence(&self) -> u64 {
358 self.next_sequence.fetch_add(1, Ordering::AcqRel) + 1
359 }
360
361 fn wait_publish_turn(&self, sequence: u64) {
362 let mut spins = 0_u32;
363 while self.published_sequence.load(Ordering::Acquire) + 1 != sequence {
364 spins = spins.wrapping_add(1);
365 if spins < 64 {
366 hint::spin_loop();
367 } else {
368 thread::yield_now();
369 }
370 }
371 }
372
    /// Under the `Backpressure` policy, blocks until publishing `sequence`
    /// would no longer overflow the slowest active subscriber. Returns
    /// immediately under every other policy.
    fn wait_for_ring_capacity(&self, sequence: u64) {
        if self.overflow != SubscriptionOverflow::Backpressure {
            return;
        }
        let mut guard = self
            .ring
            .space_lock
            .lock()
            .unwrap_or_else(|poison| poison.into_inner());
        while self.sequence_would_overflow(sequence) {
            // Announce the waiter first, then re-check, so that a reader which
            // advanced in between is not missed by `notify_space`.
            self.ring.space_waiters.fetch_add(1, Ordering::AcqRel);
            if !self.sequence_would_overflow(sequence) {
                self.ring.space_waiters.fetch_sub(1, Ordering::AcqRel);
                break;
            }
            // The timeout bounds the wait even if no notification arrives.
            guard = self
                .ring
                .space_available
                .wait_timeout(guard, SLOT_WAIT_BACKSTOP)
                .unwrap_or_else(|poison| poison.into_inner())
                .0;
            self.ring.space_waiters.fetch_sub(1, Ordering::AcqRel);
        }
    }
397
398 fn sequence_would_overflow(&self, sequence: u64) -> bool {
399 let Some(cursor) = self.min_active_cursor() else {
400 return false;
401 };
402 sequence >= cursor.saturating_add(self.ring.logical_capacity)
403 }
404
405 fn min_active_cursor(&self) -> Option<u64> {
406 let table = self.subscribers.load();
407 table
408 .slots
409 .iter()
410 .filter_map(|slot| slot.backpressure_cursor())
411 .min()
412 }
413
414 fn apply_overflow_policy(&self, sequence: u64) -> StreamResult<()> {
415 match self.overflow {
416 SubscriptionOverflow::Backpressure => Ok(()),
417 SubscriptionOverflow::DropNew => {
418 let table = self.subscribers.load();
419 for slot in &table.slots {
420 if slot.is_full_for(sequence, self.ring.logical_capacity) {
421 slot.drop_new(sequence, self.ring.logical_capacity);
422 }
423 }
424 Ok(())
425 }
426 SubscriptionOverflow::Fail => {
427 let table = self.subscribers.load();
428 let mut overflowed = false;
429 let error = overflow_error(self.ring.logical_capacity);
430 for slot in &table.slots {
431 if slot.is_full_for(sequence, self.ring.logical_capacity) {
432 overflowed = true;
433 slot.fail_after(sequence, error.clone());
434 }
435 }
436 if overflowed { Err(error) } else { Ok(()) }
437 }
438 }
439 }
440
    /// Stores `value` in the ring and in the mirror, then announces the
    /// publication of `sequence`.
    fn finish_publish(&self, sequence: u64, value: Arc<T>) {
        // Ring first, then mirror, then the published marker.
        self.ring.store(sequence, Arc::clone(&value));
        self.mirror.store(Arc::clone(&value));
        self.finish_publish_after_mirror(sequence, value);
    }
446
447 fn finish_publish_after_mirror(&self, sequence: u64, value: Arc<T>) {
448 self.published.store(Arc::new(PublishedValue {
449 sequence,
450 value: Arc::clone(&value),
451 }));
452 self.published_sequence.store(sequence, Ordering::Release);
453 if self.parked_slots.load(Ordering::Acquire) != 0 {
454 let table = self.subscribers.load();
455 for slot in &table.slots {
456 slot.wake_for_sequence(sequence);
457 }
458 }
459 }
460
461 fn wait_for_writers_to_drain(&self) {
462 while self.active_writers.load(Ordering::Acquire) != 0 {
463 thread::yield_now();
464 }
465 }
466}
467
468impl<T: Send + Sync + 'static> SubscriptionRing<T> {
469 fn new(logical_capacity: usize) -> Self {
470 let physical_capacity = logical_capacity.max(1_024).next_power_of_two();
471 let mut slots = Vec::with_capacity(physical_capacity);
472 for _ in 0..physical_capacity {
473 slots.push(SubscriptionRingSlot {
474 sequence: AtomicU64::new(0),
475 value: ArcSwapOption::empty(),
476 });
477 }
478 Self {
479 logical_capacity: logical_capacity as u64,
480 physical_capacity,
481 slots,
482 space_lock: Mutex::new(()),
483 space_available: Condvar::new(),
484 space_waiters: AtomicUsize::new(0),
485 }
486 }
487
    /// Writes `value` into the slot that `sequence` maps to and tags the slot
    /// with that sequence.
    // NOTE(review): the value is replaced before the tag. A reader that matched
    // the previous tag of a reused slot could presumably load the newer value.
    // Confirm that callers never let a writer lap a reader mid-load.
    fn store(&self, sequence: u64, value: Arc<T>) {
        let slot = &self.slots[self.index(sequence)];
        slot.value.store(Some(value));
        // The Release store publishes the value written above to readers that
        // acquire-load the tag.
        slot.sequence.store(sequence, Ordering::Release);
    }
493
494 fn load(&self, sequence: u64) -> Option<Arc<T>> {
495 let slot = &self.slots[self.index(sequence)];
496 if slot.sequence.load(Ordering::Acquire) == sequence {
497 slot.value.load_full()
498 } else {
499 None
500 }
501 }
502
503 fn has(&self, sequence: u64) -> bool {
504 let slot = &self.slots[self.index(sequence)];
505 slot.sequence.load(Ordering::Acquire) == sequence
506 }
507
508 fn oldest_available(&self, published_sequence: u64) -> u64 {
509 published_sequence
510 .saturating_sub(self.physical_capacity as u64)
511 .saturating_add(1)
512 .max(1)
513 }
514
515 fn notify_space(&self) {
516 if self.space_waiters.load(Ordering::Acquire) == 0 {
517 return;
518 }
519 let _guard = self
520 .space_lock
521 .lock()
522 .unwrap_or_else(|poison| poison.into_inner());
523 self.space_available.notify_all();
524 }
525
526 fn index(&self, sequence: u64) -> usize {
527 sequence as usize & (self.physical_capacity - 1)
528 }
529}
530
/// Marks one in-flight writer; dropping it decrements the writer count again.
struct WritePermit<'a> {
    // Counter that was incremented when the permit was issued.
    active_writers: &'a AtomicUsize,
}

impl Drop for WritePermit<'_> {
    /// Releases the writer registration.
    fn drop(&mut self) {
        let _previous = self.active_writers.fetch_sub(1, Ordering::AcqRel);
    }
}
540
541impl<T: Send + Sync + 'static> Drop for SubscriptionInner<T> {
542 fn drop(&mut self) {
543 self.actor.stop(None);
544 }
545}
546
/// Requests handled by the subscription actor.
enum SubscriptionMessage<T: Send + Sync + 'static> {
    /// Close the subscription, optionally publishing a final value.
    Close {
        // Value to publish as the last one; `None` reuses the current value.
        final_value: Option<T>,
        // Acknowledged once the close has been processed.
        reply: Ack,
    },
    /// Register a subscriber slot.
    Subscribe {
        // Subscriber id under which the slot is stored.
        id: u64,
        // The slot to register and seed.
        slot: Arc<SubscriptionSlot<T>>,
        // Acknowledged once the slot has been seeded or completed.
        reply: Ack,
    },
    /// Remove the subscriber slot registered under `id`. No reply is sent.
    Unsubscribe {
        id: u64,
    },
}
561
// Explicit `ractor::Message` impl, compiled only with the `cluster` feature.
// NOTE(review): presumably ractor provides no blanket `Message` impl when its
// cluster support is enabled; confirm against the ractor documentation.
#[cfg(feature = "cluster")]
impl<T: Send + Sync + 'static> ractor::Message for SubscriptionMessage<T> {}
564
/// Stateless actor type; all mutable data lives in `SubscriptionActorState`.
struct SubscriptionActor<T> {
    // Ties the actor to `T` without storing or owning a `T`.
    _marker: PhantomData<fn() -> T>,
}

impl<T> Default for SubscriptionActor<T> {
    /// Creates the (zero-sized) actor value.
    fn default() -> Self {
        let _marker = PhantomData;
        Self { _marker }
    }
}
576
/// Mutable state owned by the subscription actor.
struct SubscriptionActorState<T: Send + Sync + 'static> {
    // State shared with writers and subscriber streams.
    shared: Arc<SubscriptionShared<T>>,
    // Registered slots by subscriber id; the published slot table is rebuilt from this map.
    subscribers: HashMap<u64, Arc<SubscriptionSlot<T>>>,
    // Set once a close has been processed by this actor.
    closed: bool,
}
582
583impl<T: Send + Sync + 'static> Actor for SubscriptionActor<T> {
584 type Msg = SubscriptionMessage<T>;
585 type State = SubscriptionActorState<T>;
586 type Arguments = SubscriptionActorState<T>;
587
588 async fn pre_start(
589 &self,
590 _myself: ActorRef<Self::Msg>,
591 args: Self::Arguments,
592 ) -> Result<Self::State, ActorProcessingErr> {
593 Ok(args)
594 }
595
596 async fn handle(
597 &self,
598 _myself: ActorRef<Self::Msg>,
599 message: Self::Msg,
600 state: &mut Self::State,
601 ) -> Result<(), ActorProcessingErr> {
602 match message {
603 SubscriptionMessage::Close { final_value, reply } => {
604 close_subscription(state, final_value);
605 let _ = reply.send(Ok(()));
606 }
607 SubscriptionMessage::Subscribe { id, slot, reply } => {
608 if state.closed || state.shared.lifecycle.load(Ordering::Acquire) == STATE_CLOSED {
609 let published = state.shared.published.load_full();
610 slot.complete_post_close(Arc::clone(&published.value));
611 } else {
612 state.subscribers.insert(id, Arc::clone(&slot));
613 publish_subscription_slot_table(state);
614 let published = state.shared.published.load_full();
615 slot.seed(
616 published.sequence.saturating_add(1),
617 Arc::clone(&published.value),
618 );
619 }
620 let _ = reply.send(Ok(()));
621 }
622 SubscriptionMessage::Unsubscribe { id } => {
623 state.subscribers.remove(&id);
624 publish_subscription_slot_table(state);
625 state.shared.ring.notify_space();
626 }
627 }
628 Ok(())
629 }
630
631 async fn post_stop(
632 &self,
633 _myself: ActorRef<Self::Msg>,
634 state: &mut Self::State,
635 ) -> Result<(), ActorProcessingErr> {
636 if !state.closed {
637 for slot in state.subscribers.values() {
638 slot.fail_now(StreamError::ActorTerminated);
639 }
640 state.subscribers.clear();
641 publish_subscription_slot_table(state);
642 state.shared.ring.notify_space();
643 }
644 Ok(())
645 }
646}
647
/// Closes the subscription: stops new writers, waits for in-flight ones,
/// publishes the final value under a new sequence, and completes every
/// registered subscriber.
///
/// `final_value` becomes the last value. When it is `None`, the current
/// mirror value is published again as the final one. Calling this on an
/// already closed subscription does nothing.
fn close_subscription<T: Send + Sync + 'static>(
    state: &mut SubscriptionActorState<T>,
    final_value: Option<T>,
) {
    if state.closed {
        return;
    }
    // Leaving STATE_OPEN makes `begin_write` reject new writers.
    match state.shared.lifecycle.compare_exchange(
        STATE_OPEN,
        STATE_CLOSING,
        Ordering::AcqRel,
        Ordering::Acquire,
    ) {
        Ok(_) => {}
        Err(STATE_CLOSED) => {
            state.closed = true;
            return;
        }
        Err(_) => {}
    }
    // Writers that registered before the transition finish their publication first.
    state.shared.wait_for_writers_to_drain();

    let sequence = state.shared.claim_sequence();
    state.shared.wait_publish_turn(sequence);
    let value = final_value
        .map(Arc::new)
        .unwrap_or_else(|| state.shared.mirror.load_full());
    // The final value is recorded in the mirror and as the latest publication,
    // but not stored in the ring; subscribers receive it through their slot.
    state.shared.mirror.store(Arc::clone(&value));
    state.shared.published.store(Arc::new(PublishedValue {
        sequence,
        value: Arc::clone(&value),
    }));
    state
        .shared
        .published_sequence
        .store(sequence, Ordering::Release);
    state
        .shared
        .lifecycle
        .store(STATE_CLOSED, Ordering::Release);

    for slot in state.subscribers.values() {
        slot.complete_with_final(sequence, Arc::clone(&value));
    }
    state.subscribers.clear();
    publish_subscription_slot_table(state);
    state.shared.ring.notify_space();
    state.closed = true;
}
697
698fn publish_subscription_slot_table<T: Send + Sync + 'static>(state: &SubscriptionActorState<T>) {
699 let slots = state.subscribers.values().cloned().collect::<Vec<_>>();
700 state
701 .shared
702 .subscribers
703 .store(Arc::new(SubscriptionSlotTable { slots }));
704}
705
706fn closed_error() -> StreamError {
707 StreamError::Failed("subscription is closed".into())
708}
709
710fn overflow_error(capacity: u64) -> StreamError {
711 StreamError::Failed(format!(
712 "subscription buffer overflow (max capacity was: {capacity})"
713 ))
714}
715
/// Lowers `target` to `value` if `value` is smaller than what it currently
/// holds. No write is attempted otherwise.
fn atomic_fetch_min(target: &AtomicU64, value: u64) {
    // The closure returns `None` once the stored value is already small
    // enough, which ends the update without writing.
    let _ = target.fetch_update(Ordering::AcqRel, Ordering::Acquire, |current| {
        (value < current).then_some(value)
    });
}
725
/// Raises `target` to `value` if `value` is larger than what it currently
/// holds. No write is attempted otherwise.
fn atomic_fetch_max(target: &AtomicU64, value: u64) {
    // The closure returns `None` once the stored value is already large
    // enough, which ends the update without writing.
    let _ = target.fetch_update(Ordering::AcqRel, Ordering::Acquire, |current| {
        (value > current).then_some(value)
    });
}
735
/// Per-subscriber state shared between one stream, the writers and the actor.
struct SubscriptionSlot<T: Send + Sync + 'static> {
    // Subscriber id, sent back to the actor on unsubscribe.
    id: u64,
    // Actor that receives the unsubscribe request.
    actor: ActorRef<SubscriptionMessage<T>>,
    // Shared count of parked slots, kept in step with `parked`.
    parked_count: Arc<AtomicUsize>,
    // Next sequence the subscriber will read; UNSEEDED_CURSOR until seeded.
    cursor: AtomicU64,
    // False once the subscriber has failed, completed post-close or unsubscribed.
    active: AtomicBool,
    // True while the subscriber is parked waiting for a wake-up.
    parked: AtomicBool,
    // First sequence of the recorded dropped range; NO_DROP_FROM when none.
    drop_from: AtomicU64,
    // Last sequence of the recorded dropped range.
    drop_through: AtomicU64,
    // Sequence from which reading is cut off by a terminal event; NO_TERMINAL_FROM when none.
    terminal_from: AtomicU64,
    // Seed value and terminal event, guarded by a mutex.
    state: Mutex<SubscriptionSlotState<T>>,
    // Wakes a blocking reader.
    available: Condvar,
    // Wakes an async reader.
    async_available: Notify,
}
750
/// Mutex-guarded part of a subscriber slot.
struct SubscriptionSlotState<T: Send + Sync + 'static> {
    // Value handed to the subscriber once, before any ring values are read.
    seed: Option<Arc<T>>,
    // How the stream ends, once that has been decided.
    terminal: Option<SubscriptionSlotTerminal<T>>,
}
755
/// The way a subscriber stream ends.
#[derive(Clone)]
enum SubscriptionSlotTerminal<T: Send + Sync + 'static> {
    /// Normal completion once the cursor reaches `final_sequence`.
    Complete {
        // Sequence at which the stream completes.
        final_sequence: u64,
        // Value emitted once before completion, if any.
        final_value: Option<Arc<T>>,
    },
    /// Failure delivered once the cursor reaches `after_sequence`.
    Error {
        // Sequence at which the error is delivered.
        after_sequence: u64,
        // Error handed to the subscriber.
        error: StreamError,
    },
}
767
768impl<T: Send + Sync + 'static> SubscriptionSlot<T> {
769 fn new(
770 id: u64,
771 actor: ActorRef<SubscriptionMessage<T>>,
772 parked_count: Arc<AtomicUsize>,
773 ) -> Arc<Self> {
774 Arc::new(Self {
775 id,
776 actor,
777 parked_count,
778 cursor: AtomicU64::new(UNSEEDED_CURSOR),
779 active: AtomicBool::new(true),
780 parked: AtomicBool::new(false),
781 drop_from: AtomicU64::new(NO_DROP_FROM),
782 drop_through: AtomicU64::new(0),
783 terminal_from: AtomicU64::new(NO_TERMINAL_FROM),
784 state: Mutex::new(SubscriptionSlotState {
785 seed: None,
786 terminal: None,
787 }),
788 available: Condvar::new(),
789 async_available: Notify::new(),
790 })
791 }
792
793 fn lock(&self) -> MutexGuard<'_, SubscriptionSlotState<T>> {
794 self.state
795 .lock()
796 .unwrap_or_else(|poison| poison.into_inner())
797 }
798
799 fn seed(&self, next_sequence: u64, value: Arc<T>) {
800 self.cursor.store(next_sequence, Ordering::Release);
801 let mut state = self.lock();
802 state.seed = Some(value);
803 drop(state);
804 self.wake();
805 }
806
807 fn complete_post_close(&self, value: Arc<T>) {
808 self.cursor.store(0, Ordering::Release);
809 let mut state = self.lock();
810 state.seed = Some(value);
811 state.terminal = Some(SubscriptionSlotTerminal::Complete {
812 final_sequence: 0,
813 final_value: None,
814 });
815 drop(state);
816 self.terminal_from.store(0, Ordering::Release);
817 self.active.store(false, Ordering::Release);
818 self.wake();
819 }
820
821 fn complete_with_final(&self, final_sequence: u64, value: Arc<T>) {
822 let mut state = self.lock();
823 if state.terminal.is_none() {
824 state.terminal = Some(SubscriptionSlotTerminal::Complete {
825 final_sequence,
826 final_value: Some(value),
827 });
828 }
829 drop(state);
830 self.terminal_from
831 .fetch_min(final_sequence, Ordering::AcqRel);
832 self.wake();
833 }
834
835 fn fail_after(&self, after_sequence: u64, error: StreamError) {
836 let mut state = self.lock();
837 if state.terminal.is_none() {
838 state.terminal = Some(SubscriptionSlotTerminal::Error {
839 after_sequence,
840 error,
841 });
842 }
843 drop(state);
844 self.terminal_from
845 .fetch_min(after_sequence, Ordering::AcqRel);
846 self.active.store(false, Ordering::Release);
847 self.wake();
848 }
849
850 fn fail_now(&self, error: StreamError) {
851 let cursor = self.cursor.load(Ordering::Acquire);
852 let after_sequence = if cursor == UNSEEDED_CURSOR { 0 } else { cursor };
853 self.fail_after(after_sequence, error);
854 }
855
856 fn is_full_for(&self, sequence: u64, capacity: u64) -> bool {
857 if !self.active.load(Ordering::Acquire) {
858 return false;
859 }
860 let cursor = self.cursor.load(Ordering::Acquire);
861 cursor != UNSEEDED_CURSOR && sequence >= cursor.saturating_add(capacity)
862 }
863
864 fn backpressure_cursor(&self) -> Option<u64> {
865 if !self.active.load(Ordering::Acquire) {
866 return None;
867 }
868 let cursor = self.cursor.load(Ordering::Acquire);
869 (cursor != UNSEEDED_CURSOR).then_some(cursor)
870 }
871
872 fn drop_new(&self, sequence: u64, capacity: u64) {
873 let cursor = self.cursor.load(Ordering::Acquire);
874 if cursor == UNSEEDED_CURSOR {
875 return;
876 }
877 let from = cursor.saturating_add(capacity);
878 if sequence >= from {
879 atomic_fetch_min(&self.drop_from, from);
880 atomic_fetch_max(&self.drop_through, sequence);
881 self.wake();
882 }
883 }
884
885 fn skip_dropped(&self, cursor: u64) -> Option<u64> {
886 let from = self.drop_from.load(Ordering::Acquire);
887 let through = self.drop_through.load(Ordering::Acquire);
888 if from != NO_DROP_FROM && cursor >= from && cursor <= through {
889 self.drop_from.store(NO_DROP_FROM, Ordering::Release);
890 self.drop_through.store(0, Ordering::Release);
891 Some(through.saturating_add(1))
892 } else {
893 None
894 }
895 }
896
897 fn has_dropped(&self, cursor: u64) -> bool {
898 let from = self.drop_from.load(Ordering::Acquire);
899 let through = self.drop_through.load(Ordering::Acquire);
900 from != NO_DROP_FROM && cursor >= from && cursor <= through
901 }
902
903 fn terminal_blocks(&self, cursor: u64) -> bool {
904 cursor >= self.terminal_from.load(Ordering::Acquire)
905 }
906
907 fn wake(&self) {
908 if self.parked.swap(false, Ordering::AcqRel) {
909 self.parked_count.fetch_sub(1, Ordering::AcqRel);
910 self.available.notify_all();
911 self.async_available.notify_waiters();
912 }
913 }
914
915 fn park(&self) {
916 if !self.parked.swap(true, Ordering::AcqRel) {
917 self.parked_count.fetch_add(1, Ordering::AcqRel);
918 }
919 }
920
921 fn unpark(&self) {
922 if self.parked.swap(false, Ordering::AcqRel) {
923 self.parked_count.fetch_sub(1, Ordering::AcqRel);
924 }
925 }
926
927 fn wake_for_sequence(&self, sequence: u64) {
928 if self.cursor.load(Ordering::Acquire) == sequence {
929 self.wake();
930 }
931 }
932
933 fn unsubscribe(&self) {
934 self.active.store(false, Ordering::Release);
935 let _ = self
936 .actor
937 .send_message(SubscriptionMessage::Unsubscribe { id: self.id });
938 }
939}
940
/// Blocking iterator over the values delivered to one subscriber slot.
struct SubscriptionChangesStream<T: Clone + Send + Sync + 'static> {
    // Shared subscription state (ring, published sequence, overflow policy).
    shared: Arc<SubscriptionShared<T>>,
    // This subscriber's slot.
    slot: Arc<SubscriptionSlot<T>>,
    // Values already pulled from the ring but not yet returned.
    pending: VecDeque<Arc<T>>,
    // Set once the stream has ended; `next` then returns `None`.
    terminated: bool,
}
947
/// Async reader over the values delivered to one subscriber slot, returned by
/// `Subscription::__benchmark_changes`.
#[doc(hidden)]
pub struct SubscriptionBenchmarkStream<T: Clone + Send + Sync + 'static> {
    // Shared subscription state (ring, published sequence, overflow policy).
    shared: Arc<SubscriptionShared<T>>,
    // This subscriber's slot.
    slot: Arc<SubscriptionSlot<T>>,
    // Values already pulled from the ring but not yet returned.
    pending: VecDeque<Arc<T>>,
    // Set once the stream has ended; `next` then returns `None`.
    terminated: bool,
}
955
impl<T: Clone + Send + Sync + 'static> Iterator for SubscriptionChangesStream<T> {
    type Item = StreamResult<T>;

    /// Returns the next value, blocking until one is available, the stream
    /// terminates, or the current stream is cancelled.
    fn next(&mut self) -> Option<Self::Item> {
        if self.terminated {
            return None;
        }

        loop {
            // Values buffered by an earlier drain are served first.
            if let Some(value) = self.pending.pop_front() {
                return Some(Ok(value.as_ref().clone()));
            }

            // Then the seed value or a terminal event that is due.
            if let Some(item) = self.poll_seed_or_terminal() {
                return item;
            }

            let cursor = self.slot.cursor.load(Ordering::Acquire);
            if cursor == UNSEEDED_CURSOR {
                // Not seeded by the actor yet.
                self.wait_for_wake();
                continue;
            }

            // Jump over a range that a writer marked as dropped.
            if let Some(next_cursor) = self.slot.skip_dropped(cursor) {
                self.slot.cursor.store(next_cursor, Ordering::Release);
                self.shared.ring.notify_space();
                continue;
            }

            if let Some(value) = self.drain_available(cursor) {
                return Some(Ok(value.as_ref().clone()));
            }

            // Nothing at `cursor`: check whether that sequence is already too
            // old to still be in the ring.
            let published = self.shared.published_sequence.load(Ordering::Acquire);
            if cursor <= published {
                let oldest = self.shared.ring.oldest_available(published);
                if cursor < oldest {
                    match self.shared.overflow {
                        SubscriptionOverflow::DropNew => {
                            self.slot.cursor.store(oldest, Ordering::Release);
                            self.shared.ring.notify_space();
                            continue;
                        }
                        SubscriptionOverflow::Backpressure | SubscriptionOverflow::Fail => {
                            self.terminated = true;
                            return Some(Err(overflow_error(self.shared.ring.logical_capacity)));
                        }
                    }
                }
            }

            // Cancellation is checked only when about to block.
            if current_stream_cancelled()
                .as_ref()
                .is_some_and(|cancelled| cancelled.load(Ordering::SeqCst))
            {
                self.terminated = true;
                return Some(Err(StreamError::Cancelled));
            }

            self.wait_for_wake();
        }
    }
}
1019
1020impl<T: Clone + Send + Sync + 'static> SubscriptionChangesStream<T> {
1021 fn drain_available(&mut self, start_cursor: u64) -> Option<Arc<T>> {
1022 let mut cursor = start_cursor;
1023 let first = self.shared.ring.load(cursor)?;
1024 cursor = cursor.saturating_add(1);
1025 let mut drained = 1_usize;
1026
1027 while drained < SUBSCRIPTION_DRAIN_BATCH {
1028 if self.slot.has_dropped(cursor) || self.slot.terminal_blocks(cursor) {
1029 break;
1030 }
1031 let Some(value) = self.shared.ring.load(cursor) else {
1032 break;
1033 };
1034 self.pending.push_back(value);
1035 cursor = cursor.saturating_add(1);
1036 drained += 1;
1037 }
1038
1039 self.slot.cursor.store(cursor, Ordering::Release);
1040 self.shared.ring.notify_space();
1041 Some(first)
1042 }
1043
1044 fn poll_seed_or_terminal(&mut self) -> Option<Option<StreamResult<T>>> {
1045 let mut state = self.slot.lock();
1046 if let Some(seed) = state.seed.take() {
1047 return Some(Some(Ok(seed.as_ref().clone())));
1048 }
1049
1050 let cursor = self.slot.cursor.load(Ordering::Acquire);
1051 if let Some(terminal) = &mut state.terminal {
1052 match terminal {
1053 SubscriptionSlotTerminal::Complete {
1054 final_sequence,
1055 final_value,
1056 } => {
1057 if cursor >= *final_sequence {
1058 if let Some(value) = final_value.take() {
1059 return Some(Some(Ok(value.as_ref().clone())));
1060 }
1061 self.terminated = true;
1062 return Some(None);
1063 }
1064 }
1065 SubscriptionSlotTerminal::Error {
1066 after_sequence,
1067 error,
1068 } => {
1069 if cursor >= *after_sequence {
1070 self.terminated = true;
1071 return Some(Some(Err(error.clone())));
1072 }
1073 }
1074 }
1075 }
1076 None
1077 }
1078
    /// Parks the subscriber until it is woken or `SLOT_WAIT_BACKSTOP` elapses.
    /// Returns immediately if there is already something to process.
    fn wait_for_wake(&self) {
        let state = self.slot.lock();
        // Mark the slot parked before re-checking, so that a writer publishing
        // from here on sees the parked flag and wakes it.
        self.slot.park();
        fence(Ordering::SeqCst);
        let cursor = self.slot.cursor.load(Ordering::Acquire);
        if state.seed.is_some()
            || state.terminal.is_some()
            || self.slot.has_dropped(cursor)
            || (cursor != UNSEEDED_CURSOR && self.shared.ring.has(cursor))
        {
            self.slot.unpark();
            return;
        }
        // The timeout bounds the wait in case a notification is missed.
        let _guard = self
            .slot
            .available
            .wait_timeout(state, SLOT_WAIT_BACKSTOP)
            .unwrap_or_else(|poison| poison.into_inner())
            .0;
        self.slot.unpark();
    }
1100}
1101
impl<T: Clone + Send + Sync + 'static> Drop for SubscriptionChangesStream<T> {
    /// Unsubscribes the slot and wakes writers that may have been waiting on
    /// this subscriber for ring capacity.
    fn drop(&mut self) {
        self.slot.unsubscribe();
        self.shared.ring.notify_space();
    }
}
1108
1109impl<T: Clone + Send + Sync + 'static> SubscriptionBenchmarkStream<T> {
    /// Returns the next value, awaiting until one is available or the stream
    /// terminates. Unlike the blocking iterator, this loop contains no
    /// cancellation check.
    #[doc(hidden)]
    pub async fn next(&mut self) -> Option<StreamResult<T>> {
        if self.terminated {
            return None;
        }

        loop {
            // Values buffered by an earlier drain are served first.
            if let Some(value) = self.pending.pop_front() {
                return Some(Ok(value.as_ref().clone()));
            }

            // Then the seed value or a terminal event that is due.
            if let Some(item) = self.poll_seed_or_terminal() {
                return item;
            }

            let cursor = self.slot.cursor.load(Ordering::Acquire);
            if cursor == UNSEEDED_CURSOR {
                // Not seeded by the actor yet.
                self.wait_for_wake().await;
                continue;
            }

            // Jump over a range that a writer marked as dropped.
            if let Some(next_cursor) = self.slot.skip_dropped(cursor) {
                self.slot.cursor.store(next_cursor, Ordering::Release);
                self.shared.ring.notify_space();
                continue;
            }

            if let Some(value) = self.drain_available(cursor) {
                return Some(Ok(value.as_ref().clone()));
            }

            // Nothing at `cursor`: check whether that sequence is already too
            // old to still be in the ring.
            let published = self.shared.published_sequence.load(Ordering::Acquire);
            if cursor <= published {
                let oldest = self.shared.ring.oldest_available(published);
                if cursor < oldest {
                    match self.shared.overflow {
                        SubscriptionOverflow::DropNew => {
                            self.slot.cursor.store(oldest, Ordering::Release);
                            self.shared.ring.notify_space();
                            continue;
                        }
                        SubscriptionOverflow::Backpressure | SubscriptionOverflow::Fail => {
                            self.terminated = true;
                            return Some(Err(overflow_error(self.shared.ring.logical_capacity)));
                        }
                    }
                }
            }

            self.wait_for_wake().await;
        }
    }
1162
    /// Consumes values without cloning them until `target` have been counted,
    /// and returns the count.
    ///
    /// # Errors
    ///
    /// Returns an error if the stream ends, completes or fails before `target`
    /// values were counted, or if the cursor has fallen behind the oldest
    /// sequence still in the ring. The latter is an error under every overflow
    /// policy here, including `DropNew`.
    #[doc(hidden)]
    pub async fn count_changes(&mut self, target: u64) -> StreamResult<u64> {
        let mut count = 0_u64;
        while count < target {
            if self.terminated {
                return Err(StreamError::Failed(
                    "subscription stream ended before requested count".into(),
                ));
            }

            // Count buffered values first, never exceeding the remaining target.
            if !self.pending.is_empty() {
                let drained = self.pending.len().min((target - count) as usize);
                self.pending.drain(..drained);
                count += drained as u64;
                continue;
            }

            if let Some(item) = self.poll_seed_or_terminal() {
                match item {
                    Some(Ok(_)) => {
                        count += 1;
                        continue;
                    }
                    Some(Err(error)) => return Err(error),
                    None => {
                        return Err(StreamError::Failed(
                            "subscription stream completed before requested count".into(),
                        ));
                    }
                }
            }

            let cursor = self.slot.cursor.load(Ordering::Acquire);
            if cursor == UNSEEDED_CURSOR {
                // Not seeded by the actor yet.
                self.wait_for_wake().await;
                continue;
            }

            // Jump over a range that a writer marked as dropped.
            if let Some(next_cursor) = self.slot.skip_dropped(cursor) {
                self.slot.cursor.store(next_cursor, Ordering::Release);
                self.shared.ring.notify_space();
                continue;
            }

            if let Some(drained) = self.drain_available_count(cursor, (target - count) as usize) {
                count += drained as u64;
                continue;
            }

            let published = self.shared.published_sequence.load(Ordering::Acquire);
            if cursor <= published {
                let oldest = self.shared.ring.oldest_available(published);
                if cursor < oldest {
                    return Err(overflow_error(self.shared.ring.logical_capacity));
                }
            }

            self.wait_for_wake().await;
        }
        Ok(count)
    }
1224
1225 fn drain_available(&mut self, start_cursor: u64) -> Option<Arc<T>> {
1226 let mut cursor = start_cursor;
1227 let first = self.shared.ring.load(cursor)?;
1228 cursor = cursor.saturating_add(1);
1229 let mut drained = 1_usize;
1230
1231 while drained < SUBSCRIPTION_DRAIN_BATCH {
1232 if self.slot.has_dropped(cursor) || self.slot.terminal_blocks(cursor) {
1233 break;
1234 }
1235 let Some(value) = self.shared.ring.load(cursor) else {
1236 break;
1237 };
1238 self.pending.push_back(value);
1239 cursor = cursor.saturating_add(1);
1240 drained += 1;
1241 }
1242
1243 self.slot.cursor.store(cursor, Ordering::Release);
1244 self.shared.ring.notify_space();
1245 Some(first)
1246 }
1247
1248 fn drain_available_count(&mut self, start_cursor: u64, limit: usize) -> Option<usize> {
1249 if self.slot.has_dropped(start_cursor) || self.slot.terminal_blocks(start_cursor) {
1250 return None;
1251 }
1252 let published = self.shared.published_sequence.load(Ordering::Acquire);
1253 if start_cursor > published {
1254 return None;
1255 }
1256 let oldest = self.shared.ring.oldest_available(published);
1257 if start_cursor < oldest {
1258 return None;
1259 }
1260 let available = published.saturating_sub(start_cursor).saturating_add(1) as usize;
1261 let limit = limit.min(SUBSCRIPTION_DRAIN_BATCH);
1262 let drained = available.min(limit);
1263 if drained == 0 {
1264 return None;
1265 }
1266
1267 self.slot.cursor.store(
1268 start_cursor.saturating_add(drained as u64),
1269 Ordering::Release,
1270 );
1271 self.shared.ring.notify_space();
1272 Some(drained)
1273 }
1274
1275 fn poll_seed_or_terminal(&mut self) -> Option<Option<StreamResult<T>>> {
1276 let mut state = self.slot.lock();
1277 if let Some(seed) = state.seed.take() {
1278 return Some(Some(Ok(seed.as_ref().clone())));
1279 }
1280
1281 let cursor = self.slot.cursor.load(Ordering::Acquire);
1282 if let Some(terminal) = &mut state.terminal {
1283 match terminal {
1284 SubscriptionSlotTerminal::Complete {
1285 final_sequence,
1286 final_value,
1287 } => {
1288 if cursor >= *final_sequence {
1289 if let Some(value) = final_value.take() {
1290 return Some(Some(Ok(value.as_ref().clone())));
1291 }
1292 self.terminated = true;
1293 return Some(None);
1294 }
1295 }
1296 SubscriptionSlotTerminal::Error {
1297 after_sequence,
1298 error,
1299 } => {
1300 if cursor >= *after_sequence {
1301 self.terminated = true;
1302 return Some(Some(Err(error.clone())));
1303 }
1304 }
1305 }
1306 }
1307 None
1308 }
1309
/// Waits until the slot's `async_available` notifier fires, or returns
/// immediately when the readiness re-check below already finds work.
///
/// Every path that calls `park()` on the slot calls `unpark()` before
/// returning.
///
/// NOTE(review): if this future is dropped while suspended on
/// `notified.await`, the trailing `unpark()` never runs — confirm whether
/// the slot's parked accounting tolerates that.
async fn wait_for_wake(&self) {
    // Create and enable the notification future *before* re-checking state,
    // so a notification sent between the check and the `.await` is not
    // missed.
    let notified = self.slot.async_available.notified();
    tokio::pin!(notified);
    notified.as_mut().enable();

    {
        // The guard is confined to this block so it is released before the
        // `.await` below.
        let state = self.slot.lock();
        self.slot.park();
        // NOTE(review): presumably this fence orders the park above against
        // the readiness loads below, pairing with the publisher side —
        // confirm against the producer code.
        fence(Ordering::SeqCst);
        let cursor = self.slot.cursor.load(Ordering::Acquire);
        // Something is already actionable (seed, terminal state, a dropped
        // sequence at the cursor, or a ring entry at the cursor): undo the
        // park and let the caller retry instead of sleeping.
        if state.seed.is_some()
            || state.terminal.is_some()
            || self.slot.has_dropped(cursor)
            || (cursor != UNSEEDED_CURSOR && self.shared.ring.has(cursor))
        {
            self.slot.unpark();
            return;
        }
    }

    notified.await;
    self.slot.unpark();
}
1333}
1334
// Explicit destructor for the benchmark stream; the body is intentionally
// empty, so only the automatic drops of the struct's fields run.
//
// NOTE(review): nothing here unsubscribes or unparks the slot. Confirm that
// cleanup is handled by a field's own `Drop`, or that this impl exists only
// to forbid moving fields out of the stream.
impl<T: Clone + Send + Sync + 'static> Drop for SubscriptionBenchmarkStream<T> {
    fn drop(&mut self) {}
}
1338
1339#[cfg(test)]
1340mod tests {
1341 use super::*;
1342 use crate::{Sink, stream::Materializer};
1343 use std::{
1344 sync::{
1345 Arc,
1346 atomic::{AtomicBool, AtomicUsize},
1347 },
1348 thread,
1349 time::{Duration, Instant},
1350 };
1351
1352 fn wait<T>(completion: crate::StreamCompletion<T>) -> T {
1353 completion.wait().unwrap()
1354 }
1355
/// Polls `condition` (yielding the thread between attempts) until it
/// returns `true` or `timeout` elapses.
///
/// Returns `true` as soon as the condition holds. Once the deadline has
/// passed the condition is evaluated one final time and that result is
/// returned, so a zero timeout still checks the condition once.
fn wait_until<F>(timeout: Duration, mut condition: F) -> bool
where
    F: FnMut() -> bool,
{
    let deadline = Instant::now() + timeout;
    loop {
        if Instant::now() >= deadline {
            // Last-chance check after the deadline.
            return condition();
        }
        if condition() {
            return true;
        }
        thread::yield_now();
    }
}
1369
/// `get` and `get_cloned` return the initial value and then every value
/// written by a completed `set` / `update` call.
#[test]
fn get_snapshot_and_acked_set_read_your_writes() {
    let cell = Subscription::new(1_u64, 8, SubscriptionOverflow::Backpressure).unwrap();
    // Both read paths must agree on the current value.
    let assert_current = |expected: u64| {
        assert_eq!(*cell.get(), expected);
        assert_eq!(cell.get_cloned(), expected);
    };

    assert_current(1);

    cell.set(2).unwrap();
    assert_current(2);

    cell.update(|value| *value + 1).unwrap();
    assert_current(3);
}
1382
/// With a buffer larger than the number of writes, every subscriber
/// collects the seed, each written value in order, and the closing value.
#[test]
fn lossless_backpressure_subscribers_see_all_changes() {
    const SUBSCRIBERS: usize = 4;
    const WRITES: u64 = 128;

    let subscription =
        Subscription::new(0_u64, 256, SubscriptionOverflow::Backpressure).unwrap();

    let mut completions = Vec::with_capacity(SUBSCRIBERS);
    for _ in 0..SUBSCRIBERS {
        completions.push(subscription.changes().run_with(Sink::collect()).unwrap());
    }

    (1..=WRITES).for_each(|value| {
        subscription.set(value).unwrap();
    });
    subscription.close_with(WRITES + 1).unwrap();

    // Seed (0), the writes (1..=WRITES), then the closing value.
    let expected = (0..=WRITES + 1).collect::<Vec<_>>();
    for completion in completions {
        assert_eq!(wait(completion), expected);
    }
}
1404
/// With capacity 1 and a stalled subscriber, a second `set` must not be
/// acknowledged until the subscriber is released and frees capacity.
#[test]
fn backpressure_parks_producer_ack_until_capacity_returns() {
    let subscription = Subscription::new(0_u64, 1, SubscriptionOverflow::Backpressure).unwrap();
    let observed = Arc::new(Mutex::new(Vec::new()));
    let release = Arc::new(AtomicBool::new(false));

    // The sink records each item and then spins until the test opens the gate.
    let completion = {
        let observed = Arc::clone(&observed);
        let release = Arc::clone(&release);
        subscription
            .changes()
            .run_with(Sink::foreach(move |item| {
                observed.lock().unwrap().push(item);
                while !release.load(Ordering::SeqCst) {
                    thread::yield_now();
                }
            }))
            .unwrap()
    };

    let seed_seen = wait_until(Duration::from_secs(1), || {
        observed.lock().unwrap().as_slice() == [0]
    });
    assert!(seed_seen);
    subscription.set(1).unwrap();

    // The second write runs on its own thread because it is expected to block.
    let acked = Arc::new(AtomicBool::new(false));
    let producer = {
        let subscription = subscription.clone();
        let acked = Arc::clone(&acked);
        thread::spawn(move || {
            subscription.set(2).unwrap();
            acked.store(true, Ordering::SeqCst);
        })
    };

    let acked_while_stalled =
        wait_until(Duration::from_millis(25), || acked.load(Ordering::SeqCst));
    assert!(!acked_while_stalled);
    assert_eq!(*subscription.get(), 1);

    release.store(true, Ordering::SeqCst);
    assert!(wait_until(Duration::from_secs(1), || acked.load(Ordering::SeqCst)));
    producer.join().unwrap();
    assert_eq!(*subscription.get(), 2);

    subscription.close_with(3).unwrap();
    wait(completion);
    assert_eq!(observed.lock().unwrap().as_slice(), [0, 1, 2, 3]);
}
1446
/// Under `DropNew` with capacity 1, a stalled subscriber misses the value
/// written while its buffer was full but still receives the closing value,
/// and the snapshot reflects the latest write.
#[test]
fn drop_new_policy_drops_only_full_subscribers() {
    let subscription = Subscription::new(0_u64, 1, SubscriptionOverflow::DropNew).unwrap();
    let observed = Arc::new(Mutex::new(Vec::new()));
    let release = Arc::new(AtomicBool::new(false));

    // The sink records each item and then spins until the test opens the gate.
    let completion = {
        let observed = Arc::clone(&observed);
        let release = Arc::clone(&release);
        subscription
            .changes()
            .run_with(Sink::foreach(move |item| {
                observed.lock().unwrap().push(item);
                while !release.load(Ordering::SeqCst) {
                    thread::yield_now();
                }
            }))
            .unwrap()
    };

    let seed_seen = wait_until(Duration::from_secs(1), || {
        observed.lock().unwrap().as_slice() == [0]
    });
    assert!(seed_seen);

    subscription.set(1).unwrap();
    subscription.set(2).unwrap();
    subscription.close_with(3).unwrap();

    release.store(true, Ordering::SeqCst);
    wait(completion);

    assert_eq!(observed.lock().unwrap().as_slice(), [0, 1, 3]);
    assert_eq!(*subscription.get(), 3);
}
1474
/// Under `Fail` with capacity 1, the write that overflows a stalled
/// subscriber reports an overflow error to the producer, the subscriber's
/// stream fails with the same kind of error, and the snapshot still holds
/// the latest written value.
#[test]
fn fail_policy_fails_full_subscriber_and_reports_overflow() {
    let subscription = Subscription::new(0_u64, 1, SubscriptionOverflow::Fail).unwrap();
    let observed = Arc::new(Mutex::new(Vec::new()));
    let release = Arc::new(AtomicBool::new(false));

    // The sink records each item and then spins until the test opens the gate.
    let completion = {
        let observed = Arc::clone(&observed);
        let release = Arc::clone(&release);
        subscription
            .changes()
            .run_with(Sink::foreach(move |item| {
                observed.lock().unwrap().push(item);
                while !release.load(Ordering::SeqCst) {
                    thread::yield_now();
                }
            }))
            .unwrap()
    };

    let seed_seen = wait_until(Duration::from_secs(1), || {
        observed.lock().unwrap().as_slice() == [0]
    });
    assert!(seed_seen);

    subscription.set(1).unwrap();
    assert!(matches!(
        subscription.set(2),
        Err(StreamError::Failed(message)) if message.contains("subscription buffer overflow")
    ));

    release.store(true, Ordering::SeqCst);
    assert!(matches!(
        completion.wait(),
        Err(StreamError::Failed(message)) if message.contains("subscription buffer overflow")
    ));

    assert_eq!(observed.lock().unwrap().as_slice(), [0, 1]);
    assert_eq!(*subscription.get(), 2);
}
1507
/// A live subscriber sees seed, writes, then the closing value in order;
/// a subscriber attached after close sees only the closing value.
#[test]
fn terminal_ordering_and_post_close_subscribe() {
    let subscription = Subscription::new(0_u64, 8, SubscriptionOverflow::Backpressure).unwrap();

    let live = subscription.changes().run_with(Sink::collect()).unwrap();
    subscription.set(1).unwrap();
    subscription.close_with(9).unwrap();
    let live_values = wait(live);
    assert_eq!(live_values, vec![0, 1, 9]);

    let late_values = subscription.changes().run_collect().unwrap();
    assert_eq!(late_values, vec![9]);
}
1519
/// After the subscriber's completion handle is dropped, a write to the
/// capacity-1 subscription is accepted within the timeout.
///
/// NOTE(review): only a single `set` is attempted after the drop; confirm
/// that one write is enough to prove the subscriber was actually removed.
#[test]
fn dropping_feed_source_cancels_and_unsubscribes() {
    let subscription = Subscription::new(0_u64, 1, SubscriptionOverflow::Backpressure).unwrap();
    let pulled = Arc::new(AtomicUsize::new(0));

    let completion = {
        let pulled = Arc::clone(&pulled);
        subscription
            .changes()
            .run_with(Sink::foreach(move |_| {
                pulled.fetch_add(1, Ordering::SeqCst);
            }))
            .unwrap()
    };

    // Wait for the seed to reach the sink before cancelling.
    let seed_pulled = wait_until(Duration::from_secs(1), || {
        pulled.load(Ordering::SeqCst) == 1
    });
    assert!(seed_pulled);

    drop(completion);

    let write_accepted = wait_until(Duration::from_secs(1), || subscription.set(1).is_ok());
    assert!(write_accepted);
}
1539
/// Dropping the subscription while a stream is still waiting for its next
/// change fails that stream with `StreamError::ActorTerminated`.
#[test]
fn actor_death_fails_feed() {
    let subscription = Subscription::new(0_u64, 8, SubscriptionOverflow::Backpressure).unwrap();
    let materializer = Materializer::new();

    // Skip the seed so `Sink::head` has to wait for a change that never comes.
    let feed = subscription.changes().drop(1);
    let completion = feed
        .run_with_materializer(Sink::head(), &materializer)
        .unwrap();

    drop(subscription);

    let other = completion.wait();
    if !matches!(other, Err(StreamError::ActorTerminated)) {
        panic!("expected actor termination, got {other:?}");
    }
}
1555}