use std::{
collections::HashMap,
hint,
marker::PhantomData,
sync::{
Arc, Condvar, Mutex, MutexGuard,
atomic::{AtomicBool, AtomicU8, AtomicU64, AtomicUsize, Ordering},
mpsc,
},
thread,
time::Duration,
};
use arc_swap::ArcSwap;
use ractor::{Actor, ActorProcessingErr, ActorRef};
use tokio::sync::Notify;
use crate::{
StreamError, StreamResult,
actor::block_on_ractor_runtime,
stream::{BoxStream, NotUsed, Source, current_stream_cancelled},
};
const SLOT_WAIT_BACKSTOP: Duration = Duration::from_millis(10);
const ASYNC_SLOT_WAIT_BACKSTOP: Duration = Duration::from_micros(50);
const STATE_OPEN: u8 = 0;
const STATE_CLOSING: u8 = 1;
const STATE_CLOSED: u8 = 2;
const NO_SEQUENCE: u64 = u64::MAX;
const SLOT_OPEN: u8 = 0;
const SLOT_COMPLETE: u8 = 1;
const SLOT_ERROR: u8 = 2;
type Ack = mpsc::Sender<StreamResult<()>>;
pub struct Signal<T: Send + Sync + 'static> {
inner: Arc<SignalInner<T>>,
}
struct SignalInner<T: Send + Sync + 'static> {
actor: ActorRef<SignalMessage<T>>,
shared: Arc<SignalShared<T>>,
next_subscriber_id: Arc<AtomicU64>,
}
struct SignalShared<T: Send + Sync + 'static> {
mirror: Arc<ArcSwap<T>>,
subscribers: Arc<ArcSwap<SignalSlotTable<T>>>,
parked_slots: Arc<AtomicUsize>,
lifecycle: AtomicU8,
active_writers: AtomicUsize,
next_sequence: AtomicU64,
published_sequence: Arc<AtomicU64>,
delivered_sequence: AtomicU64,
}
struct SignalSlotTable<T: Send + Sync + 'static> {
slots: Vec<Arc<SignalSlot<T>>>,
}
impl<T: Send + Sync + 'static> Clone for Signal<T> {
fn clone(&self) -> Self {
Self {
inner: Arc::clone(&self.inner),
}
}
}
impl<T: Send + Sync + 'static> Signal<T> {
pub fn new(initial: T) -> StreamResult<Self> {
let value = Arc::new(initial);
let shared = Arc::new(SignalShared {
mirror: Arc::new(ArcSwap::from(Arc::clone(&value))),
subscribers: Arc::new(ArcSwap::from_pointee(SignalSlotTable { slots: Vec::new() })),
parked_slots: Arc::new(AtomicUsize::new(0)),
lifecycle: AtomicU8::new(STATE_OPEN),
active_writers: AtomicUsize::new(0),
next_sequence: AtomicU64::new(0),
published_sequence: Arc::new(AtomicU64::new(0)),
delivered_sequence: AtomicU64::new(0),
});
let state = SignalActorState {
shared: Arc::clone(&shared),
subscribers: HashMap::new(),
closed: false,
};
let (actor, _handle) =
block_on_ractor_runtime(Actor::spawn(None, SignalActor::<T>::default(), state))?
.map_err(|error| {
StreamError::Failed(format!("signal actor failed to spawn: {error}"))
})?;
Ok(Self {
inner: Arc::new(SignalInner {
actor,
shared,
next_subscriber_id: Arc::new(AtomicU64::new(1)),
}),
})
}
#[must_use]
pub fn get(&self) -> Arc<T> {
self.inner.shared.mirror.load_full()
}
#[must_use]
pub fn get_cloned(&self) -> T
where
T: Clone,
{
self.inner.shared.mirror.load().as_ref().clone()
}
pub fn set(&self, value: T) -> StreamResult<()> {
self.publish_set(Arc::new(value))
}
pub fn set_eventually(&self, value: T) -> StreamResult<()> {
self.publish_set(Arc::new(value))
}
pub fn update<F>(&self, update: F) -> StreamResult<()>
where
F: FnMut(&T) -> T + Send + 'static,
{
self.publish_update(update)
}
pub fn update_eventually<F>(&self, update: F) -> StreamResult<()>
where
F: FnMut(&T) -> T + Send + 'static,
{
self.publish_update(update)
}
pub fn close(&self) -> StreamResult<()> {
self.send_close(None)
}
pub fn close_with(&self, final_value: T) -> StreamResult<()> {
self.send_close(Some(final_value))
}
fn publish_set(&self, value: Arc<T>) -> StreamResult<()> {
let _permit = self.inner.shared.begin_write("signal")?;
let sequence = self.inner.shared.claim_sequence();
self.inner.shared.wait_publish_turn(sequence);
self.inner.shared.mirror.store(Arc::clone(&value));
self.inner.shared.finish_publish(sequence);
Ok(())
}
fn publish_update<F>(&self, mut update: F) -> StreamResult<()>
where
F: FnMut(&T) -> T + Send + 'static,
{
let _permit = self.inner.shared.begin_write("signal")?;
let sequence = self.inner.shared.claim_sequence();
self.inner.shared.wait_publish_turn(sequence);
loop {
let current = self.inner.shared.mirror.load();
let next = Arc::new(update(current.as_ref()));
let previous = self
.inner
.shared
.mirror
.compare_and_swap(&*current, Arc::clone(&next));
if std::ptr::eq(current.as_ref(), previous.as_ref()) {
break;
}
}
self.inner.shared.finish_publish(sequence);
Ok(())
}
fn send_close(&self, final_value: Option<T>) -> StreamResult<()> {
let (reply, receiver) = mpsc::channel();
self.inner
.actor
.send_message(SignalMessage::Close { final_value, reply })
.map_err(|error| StreamError::ActorAskSendFailed {
reason: error.to_string(),
})?;
receiver.recv().unwrap_or(Err(StreamError::ActorTerminated))
}
fn register_slot(&self, slot: Arc<SignalSlot<T>>, id: u64) -> StreamResult<()> {
let (reply, receiver) = mpsc::channel();
self.inner
.actor
.send_message(SignalMessage::Subscribe { id, slot, reply })
.map_err(|error| StreamError::ActorAskSendFailed {
reason: error.to_string(),
})?;
receiver.recv().unwrap_or(Err(StreamError::ActorTerminated))
}
}
impl<T: Send + Sync + 'static> SignalShared<T> {
fn begin_write(&self, kind: &'static str) -> StreamResult<WritePermit<'_>> {
if self.lifecycle.load(Ordering::Acquire) != STATE_OPEN {
return Err(closed_error(kind));
}
self.active_writers.fetch_add(1, Ordering::AcqRel);
if self.lifecycle.load(Ordering::Acquire) == STATE_OPEN {
Ok(WritePermit {
active_writers: &self.active_writers,
})
} else {
self.active_writers.fetch_sub(1, Ordering::AcqRel);
Err(closed_error(kind))
}
}
fn claim_sequence(&self) -> u64 {
self.next_sequence.fetch_add(1, Ordering::AcqRel) + 1
}
fn wait_publish_turn(&self, sequence: u64) {
let mut spins = 0_u32;
while self.delivered_sequence.load(Ordering::Acquire) + 1 != sequence {
spins = spins.wrapping_add(1);
if spins < 64 {
hint::spin_loop();
} else {
thread::yield_now();
}
}
}
fn finish_publish(&self, sequence: u64) {
self.published_sequence.store(sequence, Ordering::Release);
if self.parked_slots.load(Ordering::Acquire) != 0 {
let table = self.subscribers.load();
for slot in &table.slots {
slot.publish(sequence);
}
}
self.delivered_sequence.store(sequence, Ordering::Release);
}
fn wait_for_writers_to_drain(&self) {
while self.active_writers.load(Ordering::Acquire) != 0 {
thread::yield_now();
}
}
}
struct WritePermit<'a> {
active_writers: &'a AtomicUsize,
}
impl Drop for WritePermit<'_> {
fn drop(&mut self) {
self.active_writers.fetch_sub(1, Ordering::AcqRel);
}
}
impl<T: Clone + Send + Sync + 'static> Signal<T> {
#[must_use]
pub fn changes(&self) -> Source<T> {
let actor = self.inner.actor.clone();
let signal = self.clone();
let next_subscriber_id = Arc::clone(&self.inner.next_subscriber_id);
Source::from_materialized_factory(move |_materializer| {
let id = next_subscriber_id.fetch_add(1, Ordering::Relaxed);
let slot = SignalSlot::new(
id,
actor.clone(),
Arc::clone(&signal.inner.shared.mirror),
Arc::clone(&signal.inner.shared.published_sequence),
Arc::clone(&signal.inner.shared.parked_slots),
);
signal.register_slot(Arc::clone(&slot), id)?;
let stream: BoxStream<T> = Box::new(SignalChangesStream {
slot,
terminated: false,
});
Ok((stream, NotUsed))
})
}
#[doc(hidden)]
pub fn __benchmark_changes(&self) -> StreamResult<SignalBenchmarkStream<T>> {
let id = self
.inner
.next_subscriber_id
.fetch_add(1, Ordering::Relaxed);
let slot = SignalSlot::new(
id,
self.inner.actor.clone(),
Arc::clone(&self.inner.shared.mirror),
Arc::clone(&self.inner.shared.published_sequence),
Arc::clone(&self.inner.shared.parked_slots),
);
Ok(SignalBenchmarkStream {
slot,
terminated: false,
})
}
}
impl<T: Send + Sync + 'static> Drop for SignalInner<T> {
fn drop(&mut self) {
self.actor.stop(None);
}
}
enum SignalMessage<T: Send + Sync + 'static> {
Close {
final_value: Option<T>,
reply: Ack,
},
Subscribe {
id: u64,
slot: Arc<SignalSlot<T>>,
reply: Ack,
},
Unsubscribe {
id: u64,
},
}
#[cfg(feature = "cluster")]
impl<T: Send + Sync + 'static> ractor::Message for SignalMessage<T> {}
struct SignalActor<T> {
_marker: PhantomData<fn() -> T>,
}
impl<T> Default for SignalActor<T> {
fn default() -> Self {
Self {
_marker: PhantomData,
}
}
}
struct SignalActorState<T: Send + Sync + 'static> {
shared: Arc<SignalShared<T>>,
subscribers: HashMap<u64, Arc<SignalSlot<T>>>,
closed: bool,
}
impl<T: Send + Sync + 'static> Actor for SignalActor<T> {
type Msg = SignalMessage<T>;
type State = SignalActorState<T>;
type Arguments = SignalActorState<T>;
async fn pre_start(
&self,
_myself: ActorRef<Self::Msg>,
args: Self::Arguments,
) -> Result<Self::State, ActorProcessingErr> {
Ok(args)
}
async fn handle(
&self,
_myself: ActorRef<Self::Msg>,
message: Self::Msg,
state: &mut Self::State,
) -> Result<(), ActorProcessingErr> {
match message {
SignalMessage::Close { final_value, reply } => {
close_signal(state, final_value);
let _ = reply.send(Ok(()));
}
SignalMessage::Subscribe { id, slot, reply } => {
if state.closed || state.shared.lifecycle.load(Ordering::Acquire) == STATE_CLOSED {
slot.complete_with_final();
} else {
state.subscribers.insert(id, Arc::clone(&slot));
publish_signal_slot_table(state);
}
let _ = reply.send(Ok(()));
}
SignalMessage::Unsubscribe { id } => {
state.subscribers.remove(&id);
publish_signal_slot_table(state);
}
}
Ok(())
}
async fn post_stop(
&self,
_myself: ActorRef<Self::Msg>,
state: &mut Self::State,
) -> Result<(), ActorProcessingErr> {
if !state.closed {
for slot in state.subscribers.values() {
slot.fail(StreamError::ActorTerminated);
}
state.subscribers.clear();
publish_signal_slot_table(state);
}
Ok(())
}
}
fn close_signal<T: Send + Sync + 'static>(state: &mut SignalActorState<T>, final_value: Option<T>) {
if state.closed {
return;
}
match state.shared.lifecycle.compare_exchange(
STATE_OPEN,
STATE_CLOSING,
Ordering::AcqRel,
Ordering::Acquire,
) {
Ok(_) => {}
Err(STATE_CLOSED) => {
state.closed = true;
return;
}
Err(_) => {}
}
state.shared.wait_for_writers_to_drain();
if let Some(final_value) = final_value {
let value = Arc::new(final_value);
let sequence = state.shared.claim_sequence();
state.shared.wait_publish_turn(sequence);
state.shared.mirror.store(Arc::clone(&value));
state
.shared
.published_sequence
.store(sequence, Ordering::Release);
state
.shared
.delivered_sequence
.store(sequence, Ordering::Release);
}
state
.shared
.lifecycle
.store(STATE_CLOSED, Ordering::Release);
for slot in state.subscribers.values() {
slot.complete_with_final();
}
state.subscribers.clear();
publish_signal_slot_table(state);
state.closed = true;
}
fn publish_signal_slot_table<T: Send + Sync + 'static>(state: &SignalActorState<T>) {
let slots = state.subscribers.values().cloned().collect::<Vec<_>>();
state
.shared
.subscribers
.store(Arc::new(SignalSlotTable { slots }));
}
fn closed_error(kind: &str) -> StreamError {
StreamError::Failed(format!("{kind} is closed"))
}
struct SignalSlot<T: Send + Sync + 'static> {
id: u64,
actor: ActorRef<SignalMessage<T>>,
mirror: Arc<ArcSwap<T>>,
published_sequence: Arc<AtomicU64>,
parked_slots: Arc<AtomicUsize>,
parked: AtomicBool,
consumed_sequence: AtomicU64,
terminal_state: AtomicU8,
terminal: Mutex<Option<SignalSlotTerminal>>,
available: Condvar,
async_available: Notify,
}
#[derive(Clone)]
enum SignalSlotTerminal {
Complete,
Error(StreamError),
}
impl<T: Send + Sync + 'static> SignalSlot<T> {
fn new(
id: u64,
actor: ActorRef<SignalMessage<T>>,
mirror: Arc<ArcSwap<T>>,
published_sequence: Arc<AtomicU64>,
parked_slots: Arc<AtomicUsize>,
) -> Arc<Self> {
Arc::new(Self {
id,
actor,
mirror,
published_sequence,
parked_slots,
parked: AtomicBool::new(false),
consumed_sequence: AtomicU64::new(NO_SEQUENCE),
terminal_state: AtomicU8::new(SLOT_OPEN),
terminal: Mutex::new(None),
available: Condvar::new(),
async_available: Notify::new(),
})
}
fn terminal_lock(&self) -> MutexGuard<'_, Option<SignalSlotTerminal>> {
self.terminal
.lock()
.unwrap_or_else(|poison| poison.into_inner())
}
fn publish(&self, sequence: u64) {
if self.terminal_state.load(Ordering::Acquire) != SLOT_OPEN {
return;
}
let was_consumed = !has_unconsumed(
sequence.saturating_sub(1),
self.consumed_sequence.load(Ordering::Acquire),
);
if was_consumed {
self.wake();
}
}
fn complete_with_final(&self) {
if self
.terminal_state
.compare_exchange(
SLOT_OPEN,
SLOT_COMPLETE,
Ordering::AcqRel,
Ordering::Acquire,
)
.is_err()
{
return;
}
*self.terminal_lock() = Some(SignalSlotTerminal::Complete);
self.consumed_sequence.store(NO_SEQUENCE, Ordering::Release);
self.wake();
}
fn fail(&self, error: StreamError) {
if self
.terminal_state
.compare_exchange(SLOT_OPEN, SLOT_ERROR, Ordering::AcqRel, Ordering::Acquire)
.is_err()
{
return;
}
*self.terminal_lock() = Some(SignalSlotTerminal::Error(error));
self.wake();
}
fn take_value(&self) -> Option<Arc<T>> {
loop {
let available = self.published_sequence.load(Ordering::Acquire);
let consumed = self.consumed_sequence.load(Ordering::Acquire);
if !has_unconsumed(available, consumed) {
return None;
}
if self
.consumed_sequence
.compare_exchange(consumed, available, Ordering::AcqRel, Ordering::Acquire)
.is_err()
{
continue;
}
return Some(self.mirror.load_full());
}
}
fn terminal(&self) -> Option<SignalSlotTerminal> {
if self.terminal_state.load(Ordering::Acquire) == SLOT_OPEN {
return None;
}
self.terminal_lock().clone()
}
fn park(&self) {
if !self.parked.swap(true, Ordering::AcqRel) {
self.parked_slots.fetch_add(1, Ordering::AcqRel);
}
}
fn unpark(&self) {
if self.parked.swap(false, Ordering::AcqRel) {
self.parked_slots.fetch_sub(1, Ordering::AcqRel);
}
}
fn wake(&self) {
self.unpark();
self.available.notify_all();
self.async_available.notify_waiters();
}
fn unsubscribe(&self) {
let _ = self
.actor
.send_message(SignalMessage::Unsubscribe { id: self.id });
}
}
fn has_unconsumed(available: u64, consumed: u64) -> bool {
available != NO_SEQUENCE && (consumed == NO_SEQUENCE || available > consumed)
}
struct SignalChangesStream<T: Clone + Send + Sync + 'static> {
slot: Arc<SignalSlot<T>>,
terminated: bool,
}
#[doc(hidden)]
pub struct SignalBenchmarkStream<T: Clone + Send + Sync + 'static> {
slot: Arc<SignalSlot<T>>,
terminated: bool,
}
impl<T: Clone + Send + Sync + 'static> Iterator for SignalChangesStream<T> {
type Item = StreamResult<T>;
fn next(&mut self) -> Option<Self::Item> {
if self.terminated {
return None;
}
loop {
if let Some(value) = self.slot.take_value() {
return Some(Ok(value.as_ref().clone()));
}
if let Some(terminal) = self.slot.terminal() {
self.terminated = true;
return match terminal {
SignalSlotTerminal::Complete => None,
SignalSlotTerminal::Error(error) => Some(Err(error)),
};
}
if current_stream_cancelled()
.as_ref()
.is_some_and(|cancelled| cancelled.load(Ordering::SeqCst))
{
self.terminated = true;
return Some(Err(StreamError::Cancelled));
}
self.slot.park();
if let Some(value) = self.slot.take_value() {
self.slot.unpark();
return Some(Ok(value.as_ref().clone()));
}
if let Some(terminal) = self.slot.terminal() {
self.slot.unpark();
self.terminated = true;
return match terminal {
SignalSlotTerminal::Complete => None,
SignalSlotTerminal::Error(error) => Some(Err(error)),
};
}
let guard = self.slot.terminal_lock();
let _guard = self
.slot
.available
.wait_timeout(guard, SLOT_WAIT_BACKSTOP)
.unwrap_or_else(|poison| poison.into_inner())
.0;
self.slot.unpark();
}
}
}
impl<T: Clone + Send + Sync + 'static> Drop for SignalChangesStream<T> {
fn drop(&mut self) {
self.slot.unsubscribe();
}
}
impl<T: Clone + Send + Sync + 'static> SignalBenchmarkStream<T> {
#[doc(hidden)]
pub async fn next(&mut self) -> Option<StreamResult<T>> {
if self.terminated {
return None;
}
loop {
let notified = self.slot.async_available.notified();
tokio::pin!(notified);
notified.as_mut().enable();
if let Some(value) = self.slot.take_value() {
self.slot.unpark();
return Some(Ok(value.as_ref().clone()));
}
if let Some(terminal) = self.slot.terminal() {
self.slot.unpark();
self.terminated = true;
return match terminal {
SignalSlotTerminal::Complete => None,
SignalSlotTerminal::Error(error) => Some(Err(error)),
};
}
{
self.slot.park();
if let Some(value) = self.slot.take_value() {
self.slot.unpark();
return Some(Ok(value.as_ref().clone()));
}
if let Some(terminal) = self.slot.terminal() {
self.slot.unpark();
self.terminated = true;
return match terminal {
SignalSlotTerminal::Complete => None,
SignalSlotTerminal::Error(error) => Some(Err(error)),
};
}
}
let _ = tokio::time::timeout(ASYNC_SLOT_WAIT_BACKSTOP, notified.as_mut()).await;
self.slot.unpark();
}
}
}
impl<T: Clone + Send + Sync + 'static> Drop for SignalBenchmarkStream<T> {
fn drop(&mut self) {
self.slot.unpark();
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::{Sink, stream::Materializer};
use std::{
sync::{
Barrier,
atomic::{AtomicBool, AtomicUsize},
},
thread,
time::{Duration, Instant},
};
fn wait<T>(completion: crate::StreamCompletion<T>) -> T {
completion.wait().unwrap()
}
fn wait_until<F>(timeout: Duration, mut condition: F) -> bool
where
F: FnMut() -> bool,
{
let deadline = Instant::now() + timeout;
while Instant::now() < deadline {
if condition() {
return true;
}
thread::yield_now();
}
condition()
}
#[test]
fn get_snapshot_and_acked_set_read_your_writes() {
let signal = Signal::new(1_u64).unwrap();
assert_eq!(*signal.get(), 1);
assert_eq!(signal.get_cloned(), 1);
signal.set(2).unwrap();
assert_eq!(*signal.get(), 2);
assert_eq!(signal.get_cloned(), 2);
signal.update(|value| *value + 1).unwrap();
assert_eq!(*signal.get(), 3);
assert_eq!(signal.get_cloned(), 3);
}
#[test]
fn subscribe_sees_current_then_changes() {
let signal = Signal::new(10_u64).unwrap();
let queue = signal.changes().run_with(Sink::queue()).unwrap();
assert_eq!(queue.pull().unwrap(), Some(10));
signal.set(11).unwrap();
assert_eq!(queue.pull().unwrap(), Some(11));
signal.set(12).unwrap();
assert_eq!(queue.pull().unwrap(), Some(12));
}
#[test]
fn subscribe_has_no_get_then_subscribe_gap_under_concurrent_sets() {
const RUNS: usize = 128;
for run in 0..RUNS {
let signal = Signal::new(0_u64).unwrap();
let barrier = Arc::new(Barrier::new(2));
let writer_signal = signal.clone();
let writer_barrier = Arc::clone(&barrier);
let writer = thread::spawn(move || {
writer_barrier.wait();
writer_signal.set((run + 1) as u64).unwrap();
});
barrier.wait();
let observed = signal.changes().take(2).run_with(Sink::collect()).unwrap();
writer.join().unwrap();
signal.close().unwrap();
let values = wait(observed);
assert!(!values.is_empty());
let expected = (run + 1) as u64;
assert_eq!(*signal.get(), expected);
assert!(
values.contains(&expected),
"subscription missed concurrent set: {values:?}"
);
}
}
#[test]
fn signal_coalesces_slow_subscriber_but_observes_final() {
const WRITES: u64 = 512;
let signal = Signal::new(0_u64).unwrap();
let seen = Arc::new(Mutex::new(Vec::new()));
let sink_seen = Arc::clone(&seen);
let gate = Arc::new(AtomicBool::new(false));
let sink_gate = Arc::clone(&gate);
let completion = signal
.changes()
.run_with(Sink::foreach(move |item| {
sink_seen.lock().unwrap().push(item);
while !sink_gate.load(Ordering::SeqCst) {
thread::yield_now();
}
}))
.unwrap();
assert!(wait_until(Duration::from_secs(1), || {
!seen.lock().unwrap().is_empty()
}));
for value in 1..=WRITES {
signal.set(value).unwrap();
}
signal.close().unwrap();
gate.store(true, Ordering::SeqCst);
wait(completion);
let values = seen.lock().unwrap().clone();
assert!(values.len() < WRITES as usize);
assert_eq!(values.last().copied(), Some(WRITES));
}
#[test]
fn post_close_subscribe_yields_final_then_complete() {
let signal = Signal::new(1_u64).unwrap();
signal.close_with(9).unwrap();
let values = signal.changes().run_collect().unwrap();
assert_eq!(values, vec![9]);
}
#[test]
fn benchmark_stream_sees_seed() {
let signal = Signal::new(7_u64).unwrap();
let runtime = tokio::runtime::Runtime::new().unwrap();
let mut stream = signal.__benchmark_changes().unwrap();
let seed = runtime
.block_on(stream.next())
.expect("benchmark stream ended before seed")
.expect("benchmark stream failed before seed");
assert_eq!(seed, 7);
}
#[test]
fn benchmark_stream_sees_final_after_writes() {
let signal = Signal::new(0_u64).unwrap();
let runtime = tokio::runtime::Runtime::new().unwrap();
let mut stream = signal.__benchmark_changes().unwrap();
let seed = runtime
.block_on(stream.next())
.expect("benchmark stream ended before seed")
.expect("benchmark stream failed before seed");
assert_eq!(seed, 0);
for value in 1..=16 {
signal.set_eventually(value).unwrap();
}
let final_value = runtime.block_on(async {
loop {
let value = stream
.next()
.await
.expect("benchmark stream ended before final")
.expect("benchmark stream failed before final");
if value >= 16 {
break value;
}
}
});
assert_eq!(final_value, 16);
}
#[test]
fn benchmark_spawned_stream_sees_final_after_ready() {
let signal = Signal::new(0_u64).unwrap();
let runtime = tokio::runtime::Runtime::new().unwrap();
let mut stream = signal.__benchmark_changes().unwrap();
let ready = Arc::new(AtomicBool::new(false));
let task_ready = Arc::clone(&ready);
let handle = runtime.spawn(async move {
let seed = stream
.next()
.await
.expect("benchmark stream ended before seed")
.expect("benchmark stream failed before seed");
assert_eq!(seed, 0);
task_ready.store(true, Ordering::Release);
loop {
let value = stream
.next()
.await
.expect("benchmark stream ended before final")
.expect("benchmark stream failed before final");
if value >= 1024 {
return value;
}
}
});
runtime.block_on(async {
while !ready.load(Ordering::Acquire) {
tokio::task::yield_now().await;
}
});
for value in 1..=1024 {
signal.set_eventually(value).unwrap();
}
let final_value = runtime
.block_on(async { tokio::time::timeout(Duration::from_secs(1), handle).await })
.expect("spawned signal subscriber timed out")
.expect("spawned signal subscriber panicked");
assert_eq!(final_value, 1024);
}
#[test]
fn dropping_feed_source_cancels_cleanly() {
let signal = Signal::new(0_u64).unwrap();
let pulled = Arc::new(AtomicUsize::new(0));
let sink_pulled = Arc::clone(&pulled);
let completion = signal
.changes()
.run_with(Sink::foreach(move |_| {
sink_pulled.fetch_add(1, Ordering::SeqCst);
}))
.unwrap();
assert!(wait_until(Duration::from_secs(1), || {
pulled.load(Ordering::SeqCst) == 1
}));
drop(completion);
assert!(wait_until(Duration::from_secs(1), || signal.set(1).is_ok()));
}
#[test]
fn actor_death_fails_feed() {
let signal = Signal::new(0_u64).unwrap();
let materializer = Materializer::new();
let completion = signal
.changes()
.drop(1)
.run_with_materializer(Sink::head(), &materializer)
.unwrap();
drop(signal);
match completion.wait() {
Err(StreamError::ActorTerminated) => {}
other => panic!("expected actor termination, got {other:?}"),
}
}
}