1use std::{
2 collections::HashMap,
3 hint,
4 marker::PhantomData,
5 sync::{
6 Arc, Condvar, Mutex, MutexGuard,
7 atomic::{AtomicBool, AtomicU8, AtomicU64, AtomicUsize, Ordering},
8 mpsc,
9 },
10 thread,
11 time::Duration,
12};
13
14use arc_swap::ArcSwap;
15use ractor::{Actor, ActorProcessingErr, ActorRef};
16use tokio::sync::Notify;
17
18use crate::{
19 StreamError, StreamResult,
20 actor::block_on_ractor_runtime,
21 stream::{BoxStream, NotUsed, Source, current_stream_cancelled},
22};
23
24const SLOT_WAIT_BACKSTOP: Duration = Duration::from_millis(10);
25const ASYNC_SLOT_WAIT_BACKSTOP: Duration = Duration::from_micros(50);
26const STATE_OPEN: u8 = 0;
27const STATE_CLOSING: u8 = 1;
28const STATE_CLOSED: u8 = 2;
29const NO_SEQUENCE: u64 = u64::MAX;
30const SLOT_OPEN: u8 = 0;
31const SLOT_COMPLETE: u8 = 1;
32const SLOT_ERROR: u8 = 2;
33
34type Ack = mpsc::Sender<StreamResult<()>>;
35
36pub struct Signal<T: Send + Sync + 'static> {
56 inner: Arc<SignalInner<T>>,
57}
58
59struct SignalInner<T: Send + Sync + 'static> {
60 actor: ActorRef<SignalMessage<T>>,
61 shared: Arc<SignalShared<T>>,
62 next_subscriber_id: Arc<AtomicU64>,
63}
64
65struct SignalShared<T: Send + Sync + 'static> {
66 mirror: Arc<ArcSwap<T>>,
67 subscribers: Arc<ArcSwap<SignalSlotTable<T>>>,
68 parked_slots: Arc<AtomicUsize>,
69 lifecycle: AtomicU8,
70 active_writers: AtomicUsize,
71 next_sequence: AtomicU64,
72 published_sequence: Arc<AtomicU64>,
73 delivered_sequence: AtomicU64,
74}
75
76struct SignalSlotTable<T: Send + Sync + 'static> {
77 slots: Vec<Arc<SignalSlot<T>>>,
78}
79
80impl<T: Send + Sync + 'static> Clone for Signal<T> {
81 fn clone(&self) -> Self {
82 Self {
83 inner: Arc::clone(&self.inner),
84 }
85 }
86}
87
88impl<T: Send + Sync + 'static> Signal<T> {
89 pub fn new(initial: T) -> StreamResult<Self> {
91 let value = Arc::new(initial);
92 let shared = Arc::new(SignalShared {
93 mirror: Arc::new(ArcSwap::from(Arc::clone(&value))),
94 subscribers: Arc::new(ArcSwap::from_pointee(SignalSlotTable { slots: Vec::new() })),
95 parked_slots: Arc::new(AtomicUsize::new(0)),
96 lifecycle: AtomicU8::new(STATE_OPEN),
97 active_writers: AtomicUsize::new(0),
98 next_sequence: AtomicU64::new(0),
99 published_sequence: Arc::new(AtomicU64::new(0)),
100 delivered_sequence: AtomicU64::new(0),
101 });
102 let state = SignalActorState {
103 shared: Arc::clone(&shared),
104 subscribers: HashMap::new(),
105 closed: false,
106 };
107 let (actor, _handle) =
108 block_on_ractor_runtime(Actor::spawn(None, SignalActor::<T>::default(), state))?
109 .map_err(|error| {
110 StreamError::Failed(format!("signal actor failed to spawn: {error}"))
111 })?;
112 Ok(Self {
113 inner: Arc::new(SignalInner {
114 actor,
115 shared,
116 next_subscriber_id: Arc::new(AtomicU64::new(1)),
117 }),
118 })
119 }
120
121 #[must_use]
123 pub fn get(&self) -> Arc<T> {
124 self.inner.shared.mirror.load_full()
125 }
126
127 #[must_use]
133 pub fn get_cloned(&self) -> T
134 where
135 T: Clone,
136 {
137 self.inner.shared.mirror.load().as_ref().clone()
138 }
139
140 pub fn set(&self, value: T) -> StreamResult<()> {
142 self.publish_set(Arc::new(value))
143 }
144
145 pub fn set_eventually(&self, value: T) -> StreamResult<()> {
150 self.publish_set(Arc::new(value))
151 }
152
153 pub fn update<F>(&self, update: F) -> StreamResult<()>
157 where
158 F: FnMut(&T) -> T + Send + 'static,
159 {
160 self.publish_update(update)
161 }
162
163 pub fn update_eventually<F>(&self, update: F) -> StreamResult<()>
167 where
168 F: FnMut(&T) -> T + Send + 'static,
169 {
170 self.publish_update(update)
171 }
172
173 pub fn close(&self) -> StreamResult<()> {
176 self.send_close(None)
177 }
178
179 pub fn close_with(&self, final_value: T) -> StreamResult<()> {
181 self.send_close(Some(final_value))
182 }
183
184 fn publish_set(&self, value: Arc<T>) -> StreamResult<()> {
185 let _permit = self.inner.shared.begin_write("signal")?;
186 let sequence = self.inner.shared.claim_sequence();
187 self.inner.shared.wait_publish_turn(sequence);
188 self.inner.shared.mirror.store(Arc::clone(&value));
189 self.inner.shared.finish_publish(sequence);
190 Ok(())
191 }
192
193 fn publish_update<F>(&self, mut update: F) -> StreamResult<()>
194 where
195 F: FnMut(&T) -> T + Send + 'static,
196 {
197 let _permit = self.inner.shared.begin_write("signal")?;
198 let sequence = self.inner.shared.claim_sequence();
199 self.inner.shared.wait_publish_turn(sequence);
200 loop {
201 let current = self.inner.shared.mirror.load();
202 let next = Arc::new(update(current.as_ref()));
203 let previous = self
204 .inner
205 .shared
206 .mirror
207 .compare_and_swap(&*current, Arc::clone(&next));
208 if std::ptr::eq(current.as_ref(), previous.as_ref()) {
209 break;
210 }
211 }
212 self.inner.shared.finish_publish(sequence);
213 Ok(())
214 }
215
216 fn send_close(&self, final_value: Option<T>) -> StreamResult<()> {
217 let (reply, receiver) = mpsc::channel();
218 self.inner
219 .actor
220 .send_message(SignalMessage::Close { final_value, reply })
221 .map_err(|error| StreamError::ActorAskSendFailed {
222 reason: error.to_string(),
223 })?;
224 receiver.recv().unwrap_or(Err(StreamError::ActorTerminated))
225 }
226
227 fn register_slot(&self, slot: Arc<SignalSlot<T>>, id: u64) -> StreamResult<()> {
228 let (reply, receiver) = mpsc::channel();
229 self.inner
230 .actor
231 .send_message(SignalMessage::Subscribe { id, slot, reply })
232 .map_err(|error| StreamError::ActorAskSendFailed {
233 reason: error.to_string(),
234 })?;
235 receiver.recv().unwrap_or(Err(StreamError::ActorTerminated))
236 }
237}
238
239impl<T: Send + Sync + 'static> SignalShared<T> {
240 fn begin_write(&self, kind: &'static str) -> StreamResult<WritePermit<'_>> {
241 if self.lifecycle.load(Ordering::Acquire) != STATE_OPEN {
242 return Err(closed_error(kind));
243 }
244 self.active_writers.fetch_add(1, Ordering::AcqRel);
245 if self.lifecycle.load(Ordering::Acquire) == STATE_OPEN {
246 Ok(WritePermit {
247 active_writers: &self.active_writers,
248 })
249 } else {
250 self.active_writers.fetch_sub(1, Ordering::AcqRel);
251 Err(closed_error(kind))
252 }
253 }
254
255 fn claim_sequence(&self) -> u64 {
256 self.next_sequence.fetch_add(1, Ordering::AcqRel) + 1
257 }
258
259 fn wait_publish_turn(&self, sequence: u64) {
260 let mut spins = 0_u32;
261 while self.delivered_sequence.load(Ordering::Acquire) + 1 != sequence {
262 spins = spins.wrapping_add(1);
263 if spins < 64 {
264 hint::spin_loop();
265 } else {
266 thread::yield_now();
267 }
268 }
269 }
270
271 fn finish_publish(&self, sequence: u64) {
272 self.published_sequence.store(sequence, Ordering::Release);
273 if self.parked_slots.load(Ordering::Acquire) != 0 {
274 let table = self.subscribers.load();
275 for slot in &table.slots {
276 slot.publish(sequence);
277 }
278 }
279 self.delivered_sequence.store(sequence, Ordering::Release);
280 }
281
282 fn wait_for_writers_to_drain(&self) {
283 while self.active_writers.load(Ordering::Acquire) != 0 {
284 thread::yield_now();
285 }
286 }
287}
288
289struct WritePermit<'a> {
290 active_writers: &'a AtomicUsize,
291}
292
293impl Drop for WritePermit<'_> {
294 fn drop(&mut self) {
295 self.active_writers.fetch_sub(1, Ordering::AcqRel);
296 }
297}
298
299impl<T: Clone + Send + Sync + 'static> Signal<T> {
300 #[must_use]
306 pub fn changes(&self) -> Source<T> {
307 let actor = self.inner.actor.clone();
308 let signal = self.clone();
309 let next_subscriber_id = Arc::clone(&self.inner.next_subscriber_id);
310 Source::from_materialized_factory(move |_materializer| {
311 let id = next_subscriber_id.fetch_add(1, Ordering::Relaxed);
312 let slot = SignalSlot::new(
313 id,
314 actor.clone(),
315 Arc::clone(&signal.inner.shared.mirror),
316 Arc::clone(&signal.inner.shared.published_sequence),
317 Arc::clone(&signal.inner.shared.parked_slots),
318 );
319 signal.register_slot(Arc::clone(&slot), id)?;
320 let stream: BoxStream<T> = Box::new(SignalChangesStream {
321 slot,
322 terminated: false,
323 });
324 Ok((stream, NotUsed))
325 })
326 }
327
328 #[doc(hidden)]
329 pub fn __benchmark_changes(&self) -> StreamResult<SignalBenchmarkStream<T>> {
330 let id = self
331 .inner
332 .next_subscriber_id
333 .fetch_add(1, Ordering::Relaxed);
334 let slot = SignalSlot::new(
335 id,
336 self.inner.actor.clone(),
337 Arc::clone(&self.inner.shared.mirror),
338 Arc::clone(&self.inner.shared.published_sequence),
339 Arc::clone(&self.inner.shared.parked_slots),
340 );
341 Ok(SignalBenchmarkStream {
342 slot,
343 terminated: false,
344 })
345 }
346}
347
348impl<T: Send + Sync + 'static> Drop for SignalInner<T> {
349 fn drop(&mut self) {
350 self.actor.stop(None);
351 }
352}
353
354enum SignalMessage<T: Send + Sync + 'static> {
355 Close {
356 final_value: Option<T>,
357 reply: Ack,
358 },
359 Subscribe {
360 id: u64,
361 slot: Arc<SignalSlot<T>>,
362 reply: Ack,
363 },
364 Unsubscribe {
365 id: u64,
366 },
367}
368
369#[cfg(feature = "cluster")]
370impl<T: Send + Sync + 'static> ractor::Message for SignalMessage<T> {}
371
372struct SignalActor<T> {
373 _marker: PhantomData<fn() -> T>,
374}
375
376impl<T> Default for SignalActor<T> {
377 fn default() -> Self {
378 Self {
379 _marker: PhantomData,
380 }
381 }
382}
383
384struct SignalActorState<T: Send + Sync + 'static> {
385 shared: Arc<SignalShared<T>>,
386 subscribers: HashMap<u64, Arc<SignalSlot<T>>>,
387 closed: bool,
388}
389
390impl<T: Send + Sync + 'static> Actor for SignalActor<T> {
391 type Msg = SignalMessage<T>;
392 type State = SignalActorState<T>;
393 type Arguments = SignalActorState<T>;
394
395 async fn pre_start(
396 &self,
397 _myself: ActorRef<Self::Msg>,
398 args: Self::Arguments,
399 ) -> Result<Self::State, ActorProcessingErr> {
400 Ok(args)
401 }
402
403 async fn handle(
404 &self,
405 _myself: ActorRef<Self::Msg>,
406 message: Self::Msg,
407 state: &mut Self::State,
408 ) -> Result<(), ActorProcessingErr> {
409 match message {
410 SignalMessage::Close { final_value, reply } => {
411 close_signal(state, final_value);
412 let _ = reply.send(Ok(()));
413 }
414 SignalMessage::Subscribe { id, slot, reply } => {
415 if state.closed || state.shared.lifecycle.load(Ordering::Acquire) == STATE_CLOSED {
416 slot.complete_with_final();
417 } else {
418 state.subscribers.insert(id, Arc::clone(&slot));
419 publish_signal_slot_table(state);
420 }
421 let _ = reply.send(Ok(()));
422 }
423 SignalMessage::Unsubscribe { id } => {
424 state.subscribers.remove(&id);
425 publish_signal_slot_table(state);
426 }
427 }
428 Ok(())
429 }
430
431 async fn post_stop(
432 &self,
433 _myself: ActorRef<Self::Msg>,
434 state: &mut Self::State,
435 ) -> Result<(), ActorProcessingErr> {
436 if !state.closed {
437 for slot in state.subscribers.values() {
438 slot.fail(StreamError::ActorTerminated);
439 }
440 state.subscribers.clear();
441 publish_signal_slot_table(state);
442 }
443 Ok(())
444 }
445}
446
447fn close_signal<T: Send + Sync + 'static>(state: &mut SignalActorState<T>, final_value: Option<T>) {
448 if state.closed {
449 return;
450 }
451 match state.shared.lifecycle.compare_exchange(
452 STATE_OPEN,
453 STATE_CLOSING,
454 Ordering::AcqRel,
455 Ordering::Acquire,
456 ) {
457 Ok(_) => {}
458 Err(STATE_CLOSED) => {
459 state.closed = true;
460 return;
461 }
462 Err(_) => {}
463 }
464 state.shared.wait_for_writers_to_drain();
465
466 if let Some(final_value) = final_value {
467 let value = Arc::new(final_value);
468 let sequence = state.shared.claim_sequence();
469 state.shared.wait_publish_turn(sequence);
470 state.shared.mirror.store(Arc::clone(&value));
471 state
472 .shared
473 .published_sequence
474 .store(sequence, Ordering::Release);
475 state
476 .shared
477 .delivered_sequence
478 .store(sequence, Ordering::Release);
479 }
480 state
481 .shared
482 .lifecycle
483 .store(STATE_CLOSED, Ordering::Release);
484
485 for slot in state.subscribers.values() {
486 slot.complete_with_final();
487 }
488 state.subscribers.clear();
489 publish_signal_slot_table(state);
490 state.closed = true;
491}
492
493fn publish_signal_slot_table<T: Send + Sync + 'static>(state: &SignalActorState<T>) {
494 let slots = state.subscribers.values().cloned().collect::<Vec<_>>();
495 state
496 .shared
497 .subscribers
498 .store(Arc::new(SignalSlotTable { slots }));
499}
500
501fn closed_error(kind: &str) -> StreamError {
502 StreamError::Failed(format!("{kind} is closed"))
503}
504
505struct SignalSlot<T: Send + Sync + 'static> {
506 id: u64,
507 actor: ActorRef<SignalMessage<T>>,
508 mirror: Arc<ArcSwap<T>>,
509 published_sequence: Arc<AtomicU64>,
510 parked_slots: Arc<AtomicUsize>,
511 parked: AtomicBool,
512 consumed_sequence: AtomicU64,
513 terminal_state: AtomicU8,
514 terminal: Mutex<Option<SignalSlotTerminal>>,
515 available: Condvar,
516 async_available: Notify,
517}
518
519#[derive(Clone)]
520enum SignalSlotTerminal {
521 Complete,
522 Error(StreamError),
523}
524
525impl<T: Send + Sync + 'static> SignalSlot<T> {
526 fn new(
527 id: u64,
528 actor: ActorRef<SignalMessage<T>>,
529 mirror: Arc<ArcSwap<T>>,
530 published_sequence: Arc<AtomicU64>,
531 parked_slots: Arc<AtomicUsize>,
532 ) -> Arc<Self> {
533 Arc::new(Self {
534 id,
535 actor,
536 mirror,
537 published_sequence,
538 parked_slots,
539 parked: AtomicBool::new(false),
540 consumed_sequence: AtomicU64::new(NO_SEQUENCE),
541 terminal_state: AtomicU8::new(SLOT_OPEN),
542 terminal: Mutex::new(None),
543 available: Condvar::new(),
544 async_available: Notify::new(),
545 })
546 }
547
548 fn terminal_lock(&self) -> MutexGuard<'_, Option<SignalSlotTerminal>> {
549 self.terminal
550 .lock()
551 .unwrap_or_else(|poison| poison.into_inner())
552 }
553
554 fn publish(&self, sequence: u64) {
555 if self.terminal_state.load(Ordering::Acquire) != SLOT_OPEN {
556 return;
557 }
558
559 let was_consumed = !has_unconsumed(
560 sequence.saturating_sub(1),
561 self.consumed_sequence.load(Ordering::Acquire),
562 );
563 if was_consumed {
564 self.wake();
565 }
566 }
567
568 fn complete_with_final(&self) {
569 if self
570 .terminal_state
571 .compare_exchange(
572 SLOT_OPEN,
573 SLOT_COMPLETE,
574 Ordering::AcqRel,
575 Ordering::Acquire,
576 )
577 .is_err()
578 {
579 return;
580 }
581 *self.terminal_lock() = Some(SignalSlotTerminal::Complete);
582 self.consumed_sequence.store(NO_SEQUENCE, Ordering::Release);
583 self.wake();
584 }
585
586 fn fail(&self, error: StreamError) {
587 if self
588 .terminal_state
589 .compare_exchange(SLOT_OPEN, SLOT_ERROR, Ordering::AcqRel, Ordering::Acquire)
590 .is_err()
591 {
592 return;
593 }
594 *self.terminal_lock() = Some(SignalSlotTerminal::Error(error));
595 self.wake();
596 }
597
598 fn take_value(&self) -> Option<Arc<T>> {
599 loop {
600 let available = self.published_sequence.load(Ordering::Acquire);
601 let consumed = self.consumed_sequence.load(Ordering::Acquire);
602 if !has_unconsumed(available, consumed) {
603 return None;
604 }
605 if self
606 .consumed_sequence
607 .compare_exchange(consumed, available, Ordering::AcqRel, Ordering::Acquire)
608 .is_err()
609 {
610 continue;
611 }
612
613 return Some(self.mirror.load_full());
614 }
615 }
616
617 fn terminal(&self) -> Option<SignalSlotTerminal> {
618 if self.terminal_state.load(Ordering::Acquire) == SLOT_OPEN {
619 return None;
620 }
621 self.terminal_lock().clone()
622 }
623
624 fn park(&self) {
625 if !self.parked.swap(true, Ordering::AcqRel) {
626 self.parked_slots.fetch_add(1, Ordering::AcqRel);
627 }
628 }
629
630 fn unpark(&self) {
631 if self.parked.swap(false, Ordering::AcqRel) {
632 self.parked_slots.fetch_sub(1, Ordering::AcqRel);
633 }
634 }
635
636 fn wake(&self) {
637 self.unpark();
638 self.available.notify_all();
639 self.async_available.notify_waiters();
640 }
641
642 fn unsubscribe(&self) {
643 let _ = self
644 .actor
645 .send_message(SignalMessage::Unsubscribe { id: self.id });
646 }
647}
648
649fn has_unconsumed(available: u64, consumed: u64) -> bool {
650 available != NO_SEQUENCE && (consumed == NO_SEQUENCE || available > consumed)
651}
652
653struct SignalChangesStream<T: Clone + Send + Sync + 'static> {
654 slot: Arc<SignalSlot<T>>,
655 terminated: bool,
656}
657
658#[doc(hidden)]
659pub struct SignalBenchmarkStream<T: Clone + Send + Sync + 'static> {
660 slot: Arc<SignalSlot<T>>,
661 terminated: bool,
662}
663
664impl<T: Clone + Send + Sync + 'static> Iterator for SignalChangesStream<T> {
665 type Item = StreamResult<T>;
666
667 fn next(&mut self) -> Option<Self::Item> {
668 if self.terminated {
669 return None;
670 }
671
672 loop {
673 if let Some(value) = self.slot.take_value() {
674 return Some(Ok(value.as_ref().clone()));
675 }
676 if let Some(terminal) = self.slot.terminal() {
677 self.terminated = true;
678 return match terminal {
679 SignalSlotTerminal::Complete => None,
680 SignalSlotTerminal::Error(error) => Some(Err(error)),
681 };
682 }
683
684 if current_stream_cancelled()
685 .as_ref()
686 .is_some_and(|cancelled| cancelled.load(Ordering::SeqCst))
687 {
688 self.terminated = true;
689 return Some(Err(StreamError::Cancelled));
690 }
691 self.slot.park();
692 if let Some(value) = self.slot.take_value() {
693 self.slot.unpark();
694 return Some(Ok(value.as_ref().clone()));
695 }
696 if let Some(terminal) = self.slot.terminal() {
697 self.slot.unpark();
698 self.terminated = true;
699 return match terminal {
700 SignalSlotTerminal::Complete => None,
701 SignalSlotTerminal::Error(error) => Some(Err(error)),
702 };
703 }
704 let guard = self.slot.terminal_lock();
705 let _guard = self
706 .slot
707 .available
708 .wait_timeout(guard, SLOT_WAIT_BACKSTOP)
709 .unwrap_or_else(|poison| poison.into_inner())
710 .0;
711 self.slot.unpark();
712 }
713 }
714}
715
716impl<T: Clone + Send + Sync + 'static> Drop for SignalChangesStream<T> {
717 fn drop(&mut self) {
718 self.slot.unsubscribe();
719 }
720}
721
722impl<T: Clone + Send + Sync + 'static> SignalBenchmarkStream<T> {
723 #[doc(hidden)]
724 pub async fn next(&mut self) -> Option<StreamResult<T>> {
725 if self.terminated {
726 return None;
727 }
728
729 loop {
730 let notified = self.slot.async_available.notified();
731 tokio::pin!(notified);
732 notified.as_mut().enable();
733
734 if let Some(value) = self.slot.take_value() {
735 self.slot.unpark();
736 return Some(Ok(value.as_ref().clone()));
737 }
738 if let Some(terminal) = self.slot.terminal() {
739 self.slot.unpark();
740 self.terminated = true;
741 return match terminal {
742 SignalSlotTerminal::Complete => None,
743 SignalSlotTerminal::Error(error) => Some(Err(error)),
744 };
745 }
746
747 {
748 self.slot.park();
749 if let Some(value) = self.slot.take_value() {
750 self.slot.unpark();
751 return Some(Ok(value.as_ref().clone()));
752 }
753 if let Some(terminal) = self.slot.terminal() {
754 self.slot.unpark();
755 self.terminated = true;
756 return match terminal {
757 SignalSlotTerminal::Complete => None,
758 SignalSlotTerminal::Error(error) => Some(Err(error)),
759 };
760 }
761 }
762
763 let _ = tokio::time::timeout(ASYNC_SLOT_WAIT_BACKSTOP, notified.as_mut()).await;
764 self.slot.unpark();
765 }
766 }
767}
768
769impl<T: Clone + Send + Sync + 'static> Drop for SignalBenchmarkStream<T> {
770 fn drop(&mut self) {
771 self.slot.unpark();
772 }
773}
774
775#[cfg(test)]
776mod tests {
777 use super::*;
778 use crate::{Sink, stream::Materializer};
779 use std::{
780 sync::{
781 Barrier,
782 atomic::{AtomicBool, AtomicUsize},
783 },
784 thread,
785 time::{Duration, Instant},
786 };
787
788 fn wait<T>(completion: crate::StreamCompletion<T>) -> T {
789 completion.wait().unwrap()
790 }
791
792 fn wait_until<F>(timeout: Duration, mut condition: F) -> bool
793 where
794 F: FnMut() -> bool,
795 {
796 let deadline = Instant::now() + timeout;
797 while Instant::now() < deadline {
798 if condition() {
799 return true;
800 }
801 thread::yield_now();
802 }
803 condition()
804 }
805
806 #[test]
807 fn get_snapshot_and_acked_set_read_your_writes() {
808 let signal = Signal::new(1_u64).unwrap();
809 assert_eq!(*signal.get(), 1);
810 assert_eq!(signal.get_cloned(), 1);
811 signal.set(2).unwrap();
812 assert_eq!(*signal.get(), 2);
813 assert_eq!(signal.get_cloned(), 2);
814 signal.update(|value| *value + 1).unwrap();
815 assert_eq!(*signal.get(), 3);
816 assert_eq!(signal.get_cloned(), 3);
817 }
818
819 #[test]
820 fn subscribe_sees_current_then_changes() {
821 let signal = Signal::new(10_u64).unwrap();
822 let queue = signal.changes().run_with(Sink::queue()).unwrap();
823 assert_eq!(queue.pull().unwrap(), Some(10));
824 signal.set(11).unwrap();
825 assert_eq!(queue.pull().unwrap(), Some(11));
826 signal.set(12).unwrap();
827 assert_eq!(queue.pull().unwrap(), Some(12));
828 }
829
830 #[test]
831 fn subscribe_has_no_get_then_subscribe_gap_under_concurrent_sets() {
832 const RUNS: usize = 128;
833 for run in 0..RUNS {
834 let signal = Signal::new(0_u64).unwrap();
835 let barrier = Arc::new(Barrier::new(2));
836 let writer_signal = signal.clone();
837 let writer_barrier = Arc::clone(&barrier);
838 let writer = thread::spawn(move || {
839 writer_barrier.wait();
840 writer_signal.set((run + 1) as u64).unwrap();
841 });
842
843 barrier.wait();
844 let observed = signal.changes().take(2).run_with(Sink::collect()).unwrap();
845 writer.join().unwrap();
846 signal.close().unwrap();
847 let values = wait(observed);
848 assert!(!values.is_empty());
849 let expected = (run + 1) as u64;
850 assert_eq!(*signal.get(), expected);
851 assert!(
852 values.contains(&expected),
853 "subscription missed concurrent set: {values:?}"
854 );
855 }
856 }
857
858 #[test]
859 fn signal_coalesces_slow_subscriber_but_observes_final() {
860 const WRITES: u64 = 512;
861 let signal = Signal::new(0_u64).unwrap();
862 let seen = Arc::new(Mutex::new(Vec::new()));
863 let sink_seen = Arc::clone(&seen);
864 let gate = Arc::new(AtomicBool::new(false));
865 let sink_gate = Arc::clone(&gate);
866
867 let completion = signal
868 .changes()
869 .run_with(Sink::foreach(move |item| {
870 sink_seen.lock().unwrap().push(item);
871 while !sink_gate.load(Ordering::SeqCst) {
872 thread::yield_now();
873 }
874 }))
875 .unwrap();
876
877 assert!(wait_until(Duration::from_secs(1), || {
878 !seen.lock().unwrap().is_empty()
879 }));
880
881 for value in 1..=WRITES {
882 signal.set(value).unwrap();
883 }
884 signal.close().unwrap();
885 gate.store(true, Ordering::SeqCst);
886 wait(completion);
887
888 let values = seen.lock().unwrap().clone();
889 assert!(values.len() < WRITES as usize);
890 assert_eq!(values.last().copied(), Some(WRITES));
891 }
892
893 #[test]
894 fn post_close_subscribe_yields_final_then_complete() {
895 let signal = Signal::new(1_u64).unwrap();
896 signal.close_with(9).unwrap();
897 let values = signal.changes().run_collect().unwrap();
898 assert_eq!(values, vec![9]);
899 }
900
901 #[test]
902 fn benchmark_stream_sees_seed() {
903 let signal = Signal::new(7_u64).unwrap();
904 let runtime = tokio::runtime::Runtime::new().unwrap();
905 let mut stream = signal.__benchmark_changes().unwrap();
906 let seed = runtime
907 .block_on(stream.next())
908 .expect("benchmark stream ended before seed")
909 .expect("benchmark stream failed before seed");
910 assert_eq!(seed, 7);
911 }
912
913 #[test]
914 fn benchmark_stream_sees_final_after_writes() {
915 let signal = Signal::new(0_u64).unwrap();
916 let runtime = tokio::runtime::Runtime::new().unwrap();
917 let mut stream = signal.__benchmark_changes().unwrap();
918 let seed = runtime
919 .block_on(stream.next())
920 .expect("benchmark stream ended before seed")
921 .expect("benchmark stream failed before seed");
922 assert_eq!(seed, 0);
923 for value in 1..=16 {
924 signal.set_eventually(value).unwrap();
925 }
926 let final_value = runtime.block_on(async {
927 loop {
928 let value = stream
929 .next()
930 .await
931 .expect("benchmark stream ended before final")
932 .expect("benchmark stream failed before final");
933 if value >= 16 {
934 break value;
935 }
936 }
937 });
938 assert_eq!(final_value, 16);
939 }
940
941 #[test]
942 fn benchmark_spawned_stream_sees_final_after_ready() {
943 let signal = Signal::new(0_u64).unwrap();
944 let runtime = tokio::runtime::Runtime::new().unwrap();
945 let mut stream = signal.__benchmark_changes().unwrap();
946 let ready = Arc::new(AtomicBool::new(false));
947 let task_ready = Arc::clone(&ready);
948 let handle = runtime.spawn(async move {
949 let seed = stream
950 .next()
951 .await
952 .expect("benchmark stream ended before seed")
953 .expect("benchmark stream failed before seed");
954 assert_eq!(seed, 0);
955 task_ready.store(true, Ordering::Release);
956 loop {
957 let value = stream
958 .next()
959 .await
960 .expect("benchmark stream ended before final")
961 .expect("benchmark stream failed before final");
962 if value >= 1024 {
963 return value;
964 }
965 }
966 });
967 runtime.block_on(async {
968 while !ready.load(Ordering::Acquire) {
969 tokio::task::yield_now().await;
970 }
971 });
972 for value in 1..=1024 {
973 signal.set_eventually(value).unwrap();
974 }
975 let final_value = runtime
976 .block_on(async { tokio::time::timeout(Duration::from_secs(1), handle).await })
977 .expect("spawned signal subscriber timed out")
978 .expect("spawned signal subscriber panicked");
979 assert_eq!(final_value, 1024);
980 }
981
982 #[test]
983 fn dropping_feed_source_cancels_cleanly() {
984 let signal = Signal::new(0_u64).unwrap();
985 let pulled = Arc::new(AtomicUsize::new(0));
986 let sink_pulled = Arc::clone(&pulled);
987 let completion = signal
988 .changes()
989 .run_with(Sink::foreach(move |_| {
990 sink_pulled.fetch_add(1, Ordering::SeqCst);
991 }))
992 .unwrap();
993 assert!(wait_until(Duration::from_secs(1), || {
994 pulled.load(Ordering::SeqCst) == 1
995 }));
996 drop(completion);
997 assert!(wait_until(Duration::from_secs(1), || signal.set(1).is_ok()));
998 }
999
1000 #[test]
1001 fn actor_death_fails_feed() {
1002 let signal = Signal::new(0_u64).unwrap();
1003 let materializer = Materializer::new();
1004 let completion = signal
1005 .changes()
1006 .drop(1)
1007 .run_with_materializer(Sink::head(), &materializer)
1008 .unwrap();
1009 drop(signal);
1010 match completion.wait() {
1011 Err(StreamError::ActorTerminated) => {}
1012 other => panic!("expected actor termination, got {other:?}"),
1013 }
1014 }
1015}