Skip to main content

datum/concurrent/
signal.rs

1use std::{
2    collections::HashMap,
3    hint,
4    marker::PhantomData,
5    sync::{
6        Arc, Condvar, Mutex, MutexGuard,
7        atomic::{AtomicBool, AtomicU8, AtomicU64, AtomicUsize, Ordering},
8        mpsc,
9    },
10    thread,
11    time::Duration,
12};
13
14use arc_swap::ArcSwap;
15use ractor::{Actor, ActorProcessingErr, ActorRef};
16use tokio::sync::Notify;
17
18use crate::{
19    StreamError, StreamResult,
20    actor::block_on_ractor_runtime,
21    stream::{BoxStream, NotUsed, Source, current_stream_cancelled},
22};
23
24const SLOT_WAIT_BACKSTOP: Duration = Duration::from_millis(10);
25const ASYNC_SLOT_WAIT_BACKSTOP: Duration = Duration::from_micros(50);
26const STATE_OPEN: u8 = 0;
27const STATE_CLOSING: u8 = 1;
28const STATE_CLOSED: u8 = 2;
29const NO_SEQUENCE: u64 = u64::MAX;
30const SLOT_OPEN: u8 = 0;
31const SLOT_COMPLETE: u8 = 1;
32const SLOT_ERROR: u8 = 2;
33
34type Ack = mpsc::Sender<StreamResult<()>>;
35
36/// Latest-value state cell with a lock-free synchronous snapshot and a coalesced change feed.
37///
38/// `Signal` has a two-plane implementation. The data plane (`set`, `set_eventually`, `update`,
39/// and `update_eventually`) runs on the caller thread: it publishes the `ArcSwap` mirror, stores
40/// the published sequence, wakes currently parked subscribers, and then returns. The Ractor actor
41/// owns only the control plane: subscribe, unsubscribe, close, and terminal delivery.
42///
43/// `changes()` is coalesced. Each materialized subscriber keeps only a consumed sequence cursor and
44/// reads the latest global snapshot when pulled; slow subscribers skip intermediate values and
45/// writers are never backpressured. Subscribe has no get-then-subscribe gap: the actor publishes
46/// the new slot-table snapshot before the materialized stream can pull, and the first pull reads
47/// the latest global `mirror + published_sequence` snapshot.
48///
49/// `update` uses `ArcSwap::compare_and_swap` under the write publication turn. The update function
50/// may be re-invoked if a concurrent writer wins the CAS race, matching the usual Ref/atomic
51/// update contract.
52///
53/// Values are stored as immutable `Arc<T>` snapshots. Datum cannot prevent user-chosen interior
54/// mutability inside `T`; callers should treat values placed in a `Signal` as logically immutable.
55pub struct Signal<T: Send + Sync + 'static> {
56    inner: Arc<SignalInner<T>>,
57}
58
59struct SignalInner<T: Send + Sync + 'static> {
60    actor: ActorRef<SignalMessage<T>>,
61    shared: Arc<SignalShared<T>>,
62    next_subscriber_id: Arc<AtomicU64>,
63}
64
65struct SignalShared<T: Send + Sync + 'static> {
66    mirror: Arc<ArcSwap<T>>,
67    subscribers: Arc<ArcSwap<SignalSlotTable<T>>>,
68    parked_slots: Arc<AtomicUsize>,
69    lifecycle: AtomicU8,
70    active_writers: AtomicUsize,
71    next_sequence: AtomicU64,
72    published_sequence: Arc<AtomicU64>,
73    delivered_sequence: AtomicU64,
74}
75
76struct SignalSlotTable<T: Send + Sync + 'static> {
77    slots: Vec<Arc<SignalSlot<T>>>,
78}
79
80impl<T: Send + Sync + 'static> Clone for Signal<T> {
81    fn clone(&self) -> Self {
82        Self {
83            inner: Arc::clone(&self.inner),
84        }
85    }
86}
87
88impl<T: Send + Sync + 'static> Signal<T> {
89    /// Create a new signal initialized to `initial`.
90    pub fn new(initial: T) -> StreamResult<Self> {
91        let value = Arc::new(initial);
92        let shared = Arc::new(SignalShared {
93            mirror: Arc::new(ArcSwap::from(Arc::clone(&value))),
94            subscribers: Arc::new(ArcSwap::from_pointee(SignalSlotTable { slots: Vec::new() })),
95            parked_slots: Arc::new(AtomicUsize::new(0)),
96            lifecycle: AtomicU8::new(STATE_OPEN),
97            active_writers: AtomicUsize::new(0),
98            next_sequence: AtomicU64::new(0),
99            published_sequence: Arc::new(AtomicU64::new(0)),
100            delivered_sequence: AtomicU64::new(0),
101        });
102        let state = SignalActorState {
103            shared: Arc::clone(&shared),
104            subscribers: HashMap::new(),
105            closed: false,
106        };
107        let (actor, _handle) =
108            block_on_ractor_runtime(Actor::spawn(None, SignalActor::<T>::default(), state))?
109                .map_err(|error| {
110                    StreamError::Failed(format!("signal actor failed to spawn: {error}"))
111                })?;
112        Ok(Self {
113            inner: Arc::new(SignalInner {
114                actor,
115                shared,
116                next_subscriber_id: Arc::new(AtomicU64::new(1)),
117            }),
118        })
119    }
120
121    /// Return the current immutable snapshot without sending an actor message.
122    #[must_use]
123    pub fn get(&self) -> Arc<T> {
124        self.inner.shared.mirror.load_full()
125    }
126
127    /// Return a cloned value using `ArcSwap::load()`'s guarded read path.
128    ///
129    /// This avoids cloning the `Arc` itself on the hot read path. For scalar `Copy`/cheap-`Clone`
130    /// values, this is the fair equivalent of JVM refs returning the value directly; use
131    /// [`Signal::get`] when the caller wants an owned snapshot shared by `Arc`.
132    #[must_use]
133    pub fn get_cloned(&self) -> T
134    where
135        T: Clone,
136    {
137        self.inner.shared.mirror.load().as_ref().clone()
138    }
139
140    /// Set the state and return after the value is published to current subscribers.
141    pub fn set(&self, value: T) -> StreamResult<()> {
142        self.publish_set(Arc::new(value))
143    }
144
145    /// Set the state on the caller thread.
146    ///
147    /// In the two-plane implementation this publishes before returning; it remains named
148    /// `set_eventually` for API compatibility with the actor-backed first implementation.
149    pub fn set_eventually(&self, value: T) -> StreamResult<()> {
150        self.publish_set(Arc::new(value))
151    }
152
153    /// Update the state atomically and return after publication.
154    ///
155    /// The update function may be called more than once if a concurrent writer wins the CAS race.
156    pub fn update<F>(&self, update: F) -> StreamResult<()>
157    where
158        F: FnMut(&T) -> T + Send + 'static,
159    {
160        self.publish_update(update)
161    }
162
163    /// Update the state atomically on the caller thread.
164    ///
165    /// The update function may be called more than once if a concurrent writer wins the CAS race.
166    pub fn update_eventually<F>(&self, update: F) -> StreamResult<()>
167    where
168        F: FnMut(&T) -> T + Send + 'static,
169    {
170        self.publish_update(update)
171    }
172
173    /// Close the signal, re-emitting the current final snapshot to current subscribers before
174    /// completing them. Post-close subscribers receive the final snapshot and then completion.
175    pub fn close(&self) -> StreamResult<()> {
176        self.send_close(None)
177    }
178
179    /// Set a final value, then close the signal in one control-plane turn.
180    pub fn close_with(&self, final_value: T) -> StreamResult<()> {
181        self.send_close(Some(final_value))
182    }
183
184    fn publish_set(&self, value: Arc<T>) -> StreamResult<()> {
185        let _permit = self.inner.shared.begin_write("signal")?;
186        let sequence = self.inner.shared.claim_sequence();
187        self.inner.shared.wait_publish_turn(sequence);
188        self.inner.shared.mirror.store(Arc::clone(&value));
189        self.inner.shared.finish_publish(sequence);
190        Ok(())
191    }
192
193    fn publish_update<F>(&self, mut update: F) -> StreamResult<()>
194    where
195        F: FnMut(&T) -> T + Send + 'static,
196    {
197        let _permit = self.inner.shared.begin_write("signal")?;
198        let sequence = self.inner.shared.claim_sequence();
199        self.inner.shared.wait_publish_turn(sequence);
200        loop {
201            let current = self.inner.shared.mirror.load();
202            let next = Arc::new(update(current.as_ref()));
203            let previous = self
204                .inner
205                .shared
206                .mirror
207                .compare_and_swap(&*current, Arc::clone(&next));
208            if std::ptr::eq(current.as_ref(), previous.as_ref()) {
209                break;
210            }
211        }
212        self.inner.shared.finish_publish(sequence);
213        Ok(())
214    }
215
216    fn send_close(&self, final_value: Option<T>) -> StreamResult<()> {
217        let (reply, receiver) = mpsc::channel();
218        self.inner
219            .actor
220            .send_message(SignalMessage::Close { final_value, reply })
221            .map_err(|error| StreamError::ActorAskSendFailed {
222                reason: error.to_string(),
223            })?;
224        receiver.recv().unwrap_or(Err(StreamError::ActorTerminated))
225    }
226
227    fn register_slot(&self, slot: Arc<SignalSlot<T>>, id: u64) -> StreamResult<()> {
228        let (reply, receiver) = mpsc::channel();
229        self.inner
230            .actor
231            .send_message(SignalMessage::Subscribe { id, slot, reply })
232            .map_err(|error| StreamError::ActorAskSendFailed {
233                reason: error.to_string(),
234            })?;
235        receiver.recv().unwrap_or(Err(StreamError::ActorTerminated))
236    }
237}
238
239impl<T: Send + Sync + 'static> SignalShared<T> {
240    fn begin_write(&self, kind: &'static str) -> StreamResult<WritePermit<'_>> {
241        if self.lifecycle.load(Ordering::Acquire) != STATE_OPEN {
242            return Err(closed_error(kind));
243        }
244        self.active_writers.fetch_add(1, Ordering::AcqRel);
245        if self.lifecycle.load(Ordering::Acquire) == STATE_OPEN {
246            Ok(WritePermit {
247                active_writers: &self.active_writers,
248            })
249        } else {
250            self.active_writers.fetch_sub(1, Ordering::AcqRel);
251            Err(closed_error(kind))
252        }
253    }
254
255    fn claim_sequence(&self) -> u64 {
256        self.next_sequence.fetch_add(1, Ordering::AcqRel) + 1
257    }
258
259    fn wait_publish_turn(&self, sequence: u64) {
260        let mut spins = 0_u32;
261        while self.delivered_sequence.load(Ordering::Acquire) + 1 != sequence {
262            spins = spins.wrapping_add(1);
263            if spins < 64 {
264                hint::spin_loop();
265            } else {
266                thread::yield_now();
267            }
268        }
269    }
270
271    fn finish_publish(&self, sequence: u64) {
272        self.published_sequence.store(sequence, Ordering::Release);
273        if self.parked_slots.load(Ordering::Acquire) != 0 {
274            let table = self.subscribers.load();
275            for slot in &table.slots {
276                slot.publish(sequence);
277            }
278        }
279        self.delivered_sequence.store(sequence, Ordering::Release);
280    }
281
282    fn wait_for_writers_to_drain(&self) {
283        while self.active_writers.load(Ordering::Acquire) != 0 {
284            thread::yield_now();
285        }
286    }
287}
288
289struct WritePermit<'a> {
290    active_writers: &'a AtomicUsize,
291}
292
293impl Drop for WritePermit<'_> {
294    fn drop(&mut self) {
295        self.active_writers.fetch_sub(1, Ordering::AcqRel);
296    }
297}
298
299impl<T: Clone + Send + Sync + 'static> Signal<T> {
300    /// A coalesced source of the current value followed by later snapshots.
301    ///
302    /// Each materialization registers a fresh slot with the control actor. Slow subscribers may
303    /// skip intermediate values, but always observe the newest pending snapshot and the final
304    /// snapshot before completion.
305    #[must_use]
306    pub fn changes(&self) -> Source<T> {
307        let actor = self.inner.actor.clone();
308        let signal = self.clone();
309        let next_subscriber_id = Arc::clone(&self.inner.next_subscriber_id);
310        Source::from_materialized_factory(move |_materializer| {
311            let id = next_subscriber_id.fetch_add(1, Ordering::Relaxed);
312            let slot = SignalSlot::new(
313                id,
314                actor.clone(),
315                Arc::clone(&signal.inner.shared.mirror),
316                Arc::clone(&signal.inner.shared.published_sequence),
317                Arc::clone(&signal.inner.shared.parked_slots),
318            );
319            signal.register_slot(Arc::clone(&slot), id)?;
320            let stream: BoxStream<T> = Box::new(SignalChangesStream {
321                slot,
322                terminated: false,
323            });
324            Ok((stream, NotUsed))
325        })
326    }
327
328    #[doc(hidden)]
329    pub fn __benchmark_changes(&self) -> StreamResult<SignalBenchmarkStream<T>> {
330        let id = self
331            .inner
332            .next_subscriber_id
333            .fetch_add(1, Ordering::Relaxed);
334        let slot = SignalSlot::new(
335            id,
336            self.inner.actor.clone(),
337            Arc::clone(&self.inner.shared.mirror),
338            Arc::clone(&self.inner.shared.published_sequence),
339            Arc::clone(&self.inner.shared.parked_slots),
340        );
341        Ok(SignalBenchmarkStream {
342            slot,
343            terminated: false,
344        })
345    }
346}
347
348impl<T: Send + Sync + 'static> Drop for SignalInner<T> {
349    fn drop(&mut self) {
350        self.actor.stop(None);
351    }
352}
353
354enum SignalMessage<T: Send + Sync + 'static> {
355    Close {
356        final_value: Option<T>,
357        reply: Ack,
358    },
359    Subscribe {
360        id: u64,
361        slot: Arc<SignalSlot<T>>,
362        reply: Ack,
363    },
364    Unsubscribe {
365        id: u64,
366    },
367}
368
369#[cfg(feature = "cluster")]
370impl<T: Send + Sync + 'static> ractor::Message for SignalMessage<T> {}
371
372struct SignalActor<T> {
373    _marker: PhantomData<fn() -> T>,
374}
375
376impl<T> Default for SignalActor<T> {
377    fn default() -> Self {
378        Self {
379            _marker: PhantomData,
380        }
381    }
382}
383
384struct SignalActorState<T: Send + Sync + 'static> {
385    shared: Arc<SignalShared<T>>,
386    subscribers: HashMap<u64, Arc<SignalSlot<T>>>,
387    closed: bool,
388}
389
390impl<T: Send + Sync + 'static> Actor for SignalActor<T> {
391    type Msg = SignalMessage<T>;
392    type State = SignalActorState<T>;
393    type Arguments = SignalActorState<T>;
394
395    async fn pre_start(
396        &self,
397        _myself: ActorRef<Self::Msg>,
398        args: Self::Arguments,
399    ) -> Result<Self::State, ActorProcessingErr> {
400        Ok(args)
401    }
402
403    async fn handle(
404        &self,
405        _myself: ActorRef<Self::Msg>,
406        message: Self::Msg,
407        state: &mut Self::State,
408    ) -> Result<(), ActorProcessingErr> {
409        match message {
410            SignalMessage::Close { final_value, reply } => {
411                close_signal(state, final_value);
412                let _ = reply.send(Ok(()));
413            }
414            SignalMessage::Subscribe { id, slot, reply } => {
415                if state.closed || state.shared.lifecycle.load(Ordering::Acquire) == STATE_CLOSED {
416                    slot.complete_with_final();
417                } else {
418                    state.subscribers.insert(id, Arc::clone(&slot));
419                    publish_signal_slot_table(state);
420                }
421                let _ = reply.send(Ok(()));
422            }
423            SignalMessage::Unsubscribe { id } => {
424                state.subscribers.remove(&id);
425                publish_signal_slot_table(state);
426            }
427        }
428        Ok(())
429    }
430
431    async fn post_stop(
432        &self,
433        _myself: ActorRef<Self::Msg>,
434        state: &mut Self::State,
435    ) -> Result<(), ActorProcessingErr> {
436        if !state.closed {
437            for slot in state.subscribers.values() {
438                slot.fail(StreamError::ActorTerminated);
439            }
440            state.subscribers.clear();
441            publish_signal_slot_table(state);
442        }
443        Ok(())
444    }
445}
446
447fn close_signal<T: Send + Sync + 'static>(state: &mut SignalActorState<T>, final_value: Option<T>) {
448    if state.closed {
449        return;
450    }
451    match state.shared.lifecycle.compare_exchange(
452        STATE_OPEN,
453        STATE_CLOSING,
454        Ordering::AcqRel,
455        Ordering::Acquire,
456    ) {
457        Ok(_) => {}
458        Err(STATE_CLOSED) => {
459            state.closed = true;
460            return;
461        }
462        Err(_) => {}
463    }
464    state.shared.wait_for_writers_to_drain();
465
466    if let Some(final_value) = final_value {
467        let value = Arc::new(final_value);
468        let sequence = state.shared.claim_sequence();
469        state.shared.wait_publish_turn(sequence);
470        state.shared.mirror.store(Arc::clone(&value));
471        state
472            .shared
473            .published_sequence
474            .store(sequence, Ordering::Release);
475        state
476            .shared
477            .delivered_sequence
478            .store(sequence, Ordering::Release);
479    }
480    state
481        .shared
482        .lifecycle
483        .store(STATE_CLOSED, Ordering::Release);
484
485    for slot in state.subscribers.values() {
486        slot.complete_with_final();
487    }
488    state.subscribers.clear();
489    publish_signal_slot_table(state);
490    state.closed = true;
491}
492
493fn publish_signal_slot_table<T: Send + Sync + 'static>(state: &SignalActorState<T>) {
494    let slots = state.subscribers.values().cloned().collect::<Vec<_>>();
495    state
496        .shared
497        .subscribers
498        .store(Arc::new(SignalSlotTable { slots }));
499}
500
501fn closed_error(kind: &str) -> StreamError {
502    StreamError::Failed(format!("{kind} is closed"))
503}
504
505struct SignalSlot<T: Send + Sync + 'static> {
506    id: u64,
507    actor: ActorRef<SignalMessage<T>>,
508    mirror: Arc<ArcSwap<T>>,
509    published_sequence: Arc<AtomicU64>,
510    parked_slots: Arc<AtomicUsize>,
511    parked: AtomicBool,
512    consumed_sequence: AtomicU64,
513    terminal_state: AtomicU8,
514    terminal: Mutex<Option<SignalSlotTerminal>>,
515    available: Condvar,
516    async_available: Notify,
517}
518
519#[derive(Clone)]
520enum SignalSlotTerminal {
521    Complete,
522    Error(StreamError),
523}
524
525impl<T: Send + Sync + 'static> SignalSlot<T> {
526    fn new(
527        id: u64,
528        actor: ActorRef<SignalMessage<T>>,
529        mirror: Arc<ArcSwap<T>>,
530        published_sequence: Arc<AtomicU64>,
531        parked_slots: Arc<AtomicUsize>,
532    ) -> Arc<Self> {
533        Arc::new(Self {
534            id,
535            actor,
536            mirror,
537            published_sequence,
538            parked_slots,
539            parked: AtomicBool::new(false),
540            consumed_sequence: AtomicU64::new(NO_SEQUENCE),
541            terminal_state: AtomicU8::new(SLOT_OPEN),
542            terminal: Mutex::new(None),
543            available: Condvar::new(),
544            async_available: Notify::new(),
545        })
546    }
547
548    fn terminal_lock(&self) -> MutexGuard<'_, Option<SignalSlotTerminal>> {
549        self.terminal
550            .lock()
551            .unwrap_or_else(|poison| poison.into_inner())
552    }
553
554    fn publish(&self, sequence: u64) {
555        if self.terminal_state.load(Ordering::Acquire) != SLOT_OPEN {
556            return;
557        }
558
559        let was_consumed = !has_unconsumed(
560            sequence.saturating_sub(1),
561            self.consumed_sequence.load(Ordering::Acquire),
562        );
563        if was_consumed {
564            self.wake();
565        }
566    }
567
568    fn complete_with_final(&self) {
569        if self
570            .terminal_state
571            .compare_exchange(
572                SLOT_OPEN,
573                SLOT_COMPLETE,
574                Ordering::AcqRel,
575                Ordering::Acquire,
576            )
577            .is_err()
578        {
579            return;
580        }
581        *self.terminal_lock() = Some(SignalSlotTerminal::Complete);
582        self.consumed_sequence.store(NO_SEQUENCE, Ordering::Release);
583        self.wake();
584    }
585
586    fn fail(&self, error: StreamError) {
587        if self
588            .terminal_state
589            .compare_exchange(SLOT_OPEN, SLOT_ERROR, Ordering::AcqRel, Ordering::Acquire)
590            .is_err()
591        {
592            return;
593        }
594        *self.terminal_lock() = Some(SignalSlotTerminal::Error(error));
595        self.wake();
596    }
597
598    fn take_value(&self) -> Option<Arc<T>> {
599        loop {
600            let available = self.published_sequence.load(Ordering::Acquire);
601            let consumed = self.consumed_sequence.load(Ordering::Acquire);
602            if !has_unconsumed(available, consumed) {
603                return None;
604            }
605            if self
606                .consumed_sequence
607                .compare_exchange(consumed, available, Ordering::AcqRel, Ordering::Acquire)
608                .is_err()
609            {
610                continue;
611            }
612
613            return Some(self.mirror.load_full());
614        }
615    }
616
617    fn terminal(&self) -> Option<SignalSlotTerminal> {
618        if self.terminal_state.load(Ordering::Acquire) == SLOT_OPEN {
619            return None;
620        }
621        self.terminal_lock().clone()
622    }
623
624    fn park(&self) {
625        if !self.parked.swap(true, Ordering::AcqRel) {
626            self.parked_slots.fetch_add(1, Ordering::AcqRel);
627        }
628    }
629
630    fn unpark(&self) {
631        if self.parked.swap(false, Ordering::AcqRel) {
632            self.parked_slots.fetch_sub(1, Ordering::AcqRel);
633        }
634    }
635
636    fn wake(&self) {
637        self.unpark();
638        self.available.notify_all();
639        self.async_available.notify_waiters();
640    }
641
642    fn unsubscribe(&self) {
643        let _ = self
644            .actor
645            .send_message(SignalMessage::Unsubscribe { id: self.id });
646    }
647}
648
649fn has_unconsumed(available: u64, consumed: u64) -> bool {
650    available != NO_SEQUENCE && (consumed == NO_SEQUENCE || available > consumed)
651}
652
653struct SignalChangesStream<T: Clone + Send + Sync + 'static> {
654    slot: Arc<SignalSlot<T>>,
655    terminated: bool,
656}
657
658#[doc(hidden)]
659pub struct SignalBenchmarkStream<T: Clone + Send + Sync + 'static> {
660    slot: Arc<SignalSlot<T>>,
661    terminated: bool,
662}
663
664impl<T: Clone + Send + Sync + 'static> Iterator for SignalChangesStream<T> {
665    type Item = StreamResult<T>;
666
667    fn next(&mut self) -> Option<Self::Item> {
668        if self.terminated {
669            return None;
670        }
671
672        loop {
673            if let Some(value) = self.slot.take_value() {
674                return Some(Ok(value.as_ref().clone()));
675            }
676            if let Some(terminal) = self.slot.terminal() {
677                self.terminated = true;
678                return match terminal {
679                    SignalSlotTerminal::Complete => None,
680                    SignalSlotTerminal::Error(error) => Some(Err(error)),
681                };
682            }
683
684            if current_stream_cancelled()
685                .as_ref()
686                .is_some_and(|cancelled| cancelled.load(Ordering::SeqCst))
687            {
688                self.terminated = true;
689                return Some(Err(StreamError::Cancelled));
690            }
691            self.slot.park();
692            if let Some(value) = self.slot.take_value() {
693                self.slot.unpark();
694                return Some(Ok(value.as_ref().clone()));
695            }
696            if let Some(terminal) = self.slot.terminal() {
697                self.slot.unpark();
698                self.terminated = true;
699                return match terminal {
700                    SignalSlotTerminal::Complete => None,
701                    SignalSlotTerminal::Error(error) => Some(Err(error)),
702                };
703            }
704            let guard = self.slot.terminal_lock();
705            let _guard = self
706                .slot
707                .available
708                .wait_timeout(guard, SLOT_WAIT_BACKSTOP)
709                .unwrap_or_else(|poison| poison.into_inner())
710                .0;
711            self.slot.unpark();
712        }
713    }
714}
715
716impl<T: Clone + Send + Sync + 'static> Drop for SignalChangesStream<T> {
717    fn drop(&mut self) {
718        self.slot.unsubscribe();
719    }
720}
721
722impl<T: Clone + Send + Sync + 'static> SignalBenchmarkStream<T> {
723    #[doc(hidden)]
724    pub async fn next(&mut self) -> Option<StreamResult<T>> {
725        if self.terminated {
726            return None;
727        }
728
729        loop {
730            let notified = self.slot.async_available.notified();
731            tokio::pin!(notified);
732            notified.as_mut().enable();
733
734            if let Some(value) = self.slot.take_value() {
735                self.slot.unpark();
736                return Some(Ok(value.as_ref().clone()));
737            }
738            if let Some(terminal) = self.slot.terminal() {
739                self.slot.unpark();
740                self.terminated = true;
741                return match terminal {
742                    SignalSlotTerminal::Complete => None,
743                    SignalSlotTerminal::Error(error) => Some(Err(error)),
744                };
745            }
746
747            {
748                self.slot.park();
749                if let Some(value) = self.slot.take_value() {
750                    self.slot.unpark();
751                    return Some(Ok(value.as_ref().clone()));
752                }
753                if let Some(terminal) = self.slot.terminal() {
754                    self.slot.unpark();
755                    self.terminated = true;
756                    return match terminal {
757                        SignalSlotTerminal::Complete => None,
758                        SignalSlotTerminal::Error(error) => Some(Err(error)),
759                    };
760                }
761            }
762
763            let _ = tokio::time::timeout(ASYNC_SLOT_WAIT_BACKSTOP, notified.as_mut()).await;
764            self.slot.unpark();
765        }
766    }
767}
768
769impl<T: Clone + Send + Sync + 'static> Drop for SignalBenchmarkStream<T> {
770    fn drop(&mut self) {
771        self.slot.unpark();
772    }
773}
774
775#[cfg(test)]
776mod tests {
777    use super::*;
778    use crate::{Sink, stream::Materializer};
779    use std::{
780        sync::{
781            Barrier,
782            atomic::{AtomicBool, AtomicUsize},
783        },
784        thread,
785        time::{Duration, Instant},
786    };
787
788    fn wait<T>(completion: crate::StreamCompletion<T>) -> T {
789        completion.wait().unwrap()
790    }
791
792    fn wait_until<F>(timeout: Duration, mut condition: F) -> bool
793    where
794        F: FnMut() -> bool,
795    {
796        let deadline = Instant::now() + timeout;
797        while Instant::now() < deadline {
798            if condition() {
799                return true;
800            }
801            thread::yield_now();
802        }
803        condition()
804    }
805
806    #[test]
807    fn get_snapshot_and_acked_set_read_your_writes() {
808        let signal = Signal::new(1_u64).unwrap();
809        assert_eq!(*signal.get(), 1);
810        assert_eq!(signal.get_cloned(), 1);
811        signal.set(2).unwrap();
812        assert_eq!(*signal.get(), 2);
813        assert_eq!(signal.get_cloned(), 2);
814        signal.update(|value| *value + 1).unwrap();
815        assert_eq!(*signal.get(), 3);
816        assert_eq!(signal.get_cloned(), 3);
817    }
818
819    #[test]
820    fn subscribe_sees_current_then_changes() {
821        let signal = Signal::new(10_u64).unwrap();
822        let queue = signal.changes().run_with(Sink::queue()).unwrap();
823        assert_eq!(queue.pull().unwrap(), Some(10));
824        signal.set(11).unwrap();
825        assert_eq!(queue.pull().unwrap(), Some(11));
826        signal.set(12).unwrap();
827        assert_eq!(queue.pull().unwrap(), Some(12));
828    }
829
830    #[test]
831    fn subscribe_has_no_get_then_subscribe_gap_under_concurrent_sets() {
832        const RUNS: usize = 128;
833        for run in 0..RUNS {
834            let signal = Signal::new(0_u64).unwrap();
835            let barrier = Arc::new(Barrier::new(2));
836            let writer_signal = signal.clone();
837            let writer_barrier = Arc::clone(&barrier);
838            let writer = thread::spawn(move || {
839                writer_barrier.wait();
840                writer_signal.set((run + 1) as u64).unwrap();
841            });
842
843            barrier.wait();
844            let observed = signal.changes().take(2).run_with(Sink::collect()).unwrap();
845            writer.join().unwrap();
846            signal.close().unwrap();
847            let values = wait(observed);
848            assert!(!values.is_empty());
849            let expected = (run + 1) as u64;
850            assert_eq!(*signal.get(), expected);
851            assert!(
852                values.contains(&expected),
853                "subscription missed concurrent set: {values:?}"
854            );
855        }
856    }
857
858    #[test]
859    fn signal_coalesces_slow_subscriber_but_observes_final() {
860        const WRITES: u64 = 512;
861        let signal = Signal::new(0_u64).unwrap();
862        let seen = Arc::new(Mutex::new(Vec::new()));
863        let sink_seen = Arc::clone(&seen);
864        let gate = Arc::new(AtomicBool::new(false));
865        let sink_gate = Arc::clone(&gate);
866
867        let completion = signal
868            .changes()
869            .run_with(Sink::foreach(move |item| {
870                sink_seen.lock().unwrap().push(item);
871                while !sink_gate.load(Ordering::SeqCst) {
872                    thread::yield_now();
873                }
874            }))
875            .unwrap();
876
877        assert!(wait_until(Duration::from_secs(1), || {
878            !seen.lock().unwrap().is_empty()
879        }));
880
881        for value in 1..=WRITES {
882            signal.set(value).unwrap();
883        }
884        signal.close().unwrap();
885        gate.store(true, Ordering::SeqCst);
886        wait(completion);
887
888        let values = seen.lock().unwrap().clone();
889        assert!(values.len() < WRITES as usize);
890        assert_eq!(values.last().copied(), Some(WRITES));
891    }
892
893    #[test]
894    fn post_close_subscribe_yields_final_then_complete() {
895        let signal = Signal::new(1_u64).unwrap();
896        signal.close_with(9).unwrap();
897        let values = signal.changes().run_collect().unwrap();
898        assert_eq!(values, vec![9]);
899    }
900
901    #[test]
902    fn benchmark_stream_sees_seed() {
903        let signal = Signal::new(7_u64).unwrap();
904        let runtime = tokio::runtime::Runtime::new().unwrap();
905        let mut stream = signal.__benchmark_changes().unwrap();
906        let seed = runtime
907            .block_on(stream.next())
908            .expect("benchmark stream ended before seed")
909            .expect("benchmark stream failed before seed");
910        assert_eq!(seed, 7);
911    }
912
913    #[test]
914    fn benchmark_stream_sees_final_after_writes() {
915        let signal = Signal::new(0_u64).unwrap();
916        let runtime = tokio::runtime::Runtime::new().unwrap();
917        let mut stream = signal.__benchmark_changes().unwrap();
918        let seed = runtime
919            .block_on(stream.next())
920            .expect("benchmark stream ended before seed")
921            .expect("benchmark stream failed before seed");
922        assert_eq!(seed, 0);
923        for value in 1..=16 {
924            signal.set_eventually(value).unwrap();
925        }
926        let final_value = runtime.block_on(async {
927            loop {
928                let value = stream
929                    .next()
930                    .await
931                    .expect("benchmark stream ended before final")
932                    .expect("benchmark stream failed before final");
933                if value >= 16 {
934                    break value;
935                }
936            }
937        });
938        assert_eq!(final_value, 16);
939    }
940
941    #[test]
942    fn benchmark_spawned_stream_sees_final_after_ready() {
943        let signal = Signal::new(0_u64).unwrap();
944        let runtime = tokio::runtime::Runtime::new().unwrap();
945        let mut stream = signal.__benchmark_changes().unwrap();
946        let ready = Arc::new(AtomicBool::new(false));
947        let task_ready = Arc::clone(&ready);
948        let handle = runtime.spawn(async move {
949            let seed = stream
950                .next()
951                .await
952                .expect("benchmark stream ended before seed")
953                .expect("benchmark stream failed before seed");
954            assert_eq!(seed, 0);
955            task_ready.store(true, Ordering::Release);
956            loop {
957                let value = stream
958                    .next()
959                    .await
960                    .expect("benchmark stream ended before final")
961                    .expect("benchmark stream failed before final");
962                if value >= 1024 {
963                    return value;
964                }
965            }
966        });
967        runtime.block_on(async {
968            while !ready.load(Ordering::Acquire) {
969                tokio::task::yield_now().await;
970            }
971        });
972        for value in 1..=1024 {
973            signal.set_eventually(value).unwrap();
974        }
975        let final_value = runtime
976            .block_on(async { tokio::time::timeout(Duration::from_secs(1), handle).await })
977            .expect("spawned signal subscriber timed out")
978            .expect("spawned signal subscriber panicked");
979        assert_eq!(final_value, 1024);
980    }
981
982    #[test]
983    fn dropping_feed_source_cancels_cleanly() {
984        let signal = Signal::new(0_u64).unwrap();
985        let pulled = Arc::new(AtomicUsize::new(0));
986        let sink_pulled = Arc::clone(&pulled);
987        let completion = signal
988            .changes()
989            .run_with(Sink::foreach(move |_| {
990                sink_pulled.fetch_add(1, Ordering::SeqCst);
991            }))
992            .unwrap();
993        assert!(wait_until(Duration::from_secs(1), || {
994            pulled.load(Ordering::SeqCst) == 1
995        }));
996        drop(completion);
997        assert!(wait_until(Duration::from_secs(1), || signal.set(1).is_ok()));
998    }
999
1000    #[test]
1001    fn actor_death_fails_feed() {
1002        let signal = Signal::new(0_u64).unwrap();
1003        let materializer = Materializer::new();
1004        let completion = signal
1005            .changes()
1006            .drop(1)
1007            .run_with_materializer(Sink::head(), &materializer)
1008            .unwrap();
1009        drop(signal);
1010        match completion.wait() {
1011            Err(StreamError::ActorTerminated) => {}
1012            other => panic!("expected actor termination, got {other:?}"),
1013        }
1014    }
1015}