use std::{thread, time::Duration};
use super::{CellValue, Watchable};
use crate::cell::{Cell, CellImmutable, CellMutable};
/// Extension trait that adds a time-shifted view to a [`Watchable`].
pub trait DelayExt<T>: Watchable<T> {
    /// Returns a cell that replays this watchable's signals after `duration`.
    ///
    /// The returned cell is seeded with the current value (`self.get()`). When
    /// the source reports a name, the new cell is named `"<name>::delay"`;
    /// otherwise it is left unnamed.
    ///
    /// Every signal received through the subscription is cloned and handed to
    /// a freshly spawned thread, which sleeps for `duration` and then passes
    /// the signal to the cell via `notify`. The spawned threads capture only a
    /// downgraded handle, so a signal whose handle can no longer be upgraded
    /// when the thread wakes is silently discarded.
    ///
    /// NOTE(review): one thread is spawned per signal and nothing here orders
    /// them, so the delivery order of signals emitted close together
    /// presumably depends on thread scheduling — confirm whether callers rely
    /// on ordering.
    #[track_caller]
    fn delay(&self, duration: Duration) -> Cell<T, CellImmutable>
    where
        T: CellValue,
        Self: Clone + Send + Sync + 'static,
    {
        // Seed the output with the source's current value, then optionally name it.
        let seeded = Cell::<T, CellMutable>::new(self.get());
        let target = match self.name() {
            Some(name) => seeded.with_name(format!("{}::delay", name)),
            None => seeded,
        };

        // The forwarding closure holds only a downgraded handle to the output.
        let weak_target = target.downgrade();
        let subscription = self.subscribe(move |incoming| {
            let pending = incoming.clone();
            let handle = weak_target.clone();
            // The join handle is intentionally dropped: the thread runs detached.
            thread::spawn(move || {
                thread::sleep(duration);
                if let Some(live) = handle.upgrade() {
                    live.notify(pending);
                }
            });
        });

        // Presumably ties the subscription's lifetime to the output cell —
        // TODO confirm against `Cell::own`.
        target.own(subscription);
        // Hand back the `CellImmutable` flavour required by the return type.
        target.lock()
    }
}
/// Blanket implementation: every [`Watchable`] gets [`DelayExt::delay`].
impl<T, W> DelayExt<T> for W where W: Watchable<T> {}
#[cfg(test)]
mod tests {
    use std::sync::{
        Arc,
        atomic::{AtomicU64, Ordering},
    };

    use super::*;
    use crate::{Mutable, Signal};

    /// A value set on the source must reach subscribers of the delayed cell
    /// only after the configured delay (50 ms here) has passed.
    #[test]
    fn test_delay_delays_emission() {
        let source = Cell::new(0u64);
        let delayed = source.delay(Duration::from_millis(50));

        // Records the most recent value seen by the subscriber.
        let last_seen = Arc::new(AtomicU64::new(0));
        let sink = Arc::clone(&last_seen);
        let _guard = delayed.subscribe(move |signal| match signal {
            Signal::Value(v) => sink.store(**v, Ordering::SeqCst),
            _ => {}
        });
        let observed = || last_seen.load(Ordering::SeqCst);

        // Nothing has been set yet, so the recorded value is still 0.
        thread::sleep(Duration::from_millis(100));
        assert_eq!(observed(), 0);

        source.set(42);

        // 20 ms is shorter than the 50 ms delay: the update must not be visible yet.
        thread::sleep(Duration::from_millis(20));
        assert_eq!(observed(), 0);

        // A further 100 ms is well past the delay: the update must have arrived.
        thread::sleep(Duration::from_millis(100));
        assert_eq!(observed(), 42);
    }
}