use std::sync::{
Arc,
atomic::{AtomicUsize, Ordering},
};
use super::{CellValue, Watchable};
use crate::{
cell::{Cell, CellImmutable, CellMutable},
signal::Signal,
};
pub trait TakeExt<T>: Watchable<T> {
#[track_caller]
fn take(&self, count: usize) -> Cell<T, CellImmutable>
where
T: CellValue,
Self: Clone + Send + Sync + 'static,
{
let cell = Cell::<T, CellMutable>::new(self.get());
let cell = if let Some(name) = self.name() {
cell.with_name(format!("{}::take", name))
} else {
cell
};
let remaining = Arc::new(AtomicUsize::new(count));
let weak = cell.downgrade();
let guard = self.subscribe(move |signal| {
if let Some(c) = weak.upgrade() {
match signal {
Signal::Value(_) => {
let prev =
remaining.fetch_update(Ordering::SeqCst, Ordering::SeqCst, |n| {
if n > 0 { Some(n - 1) } else { None }
});
match prev {
Ok(1) => {
c.notify(signal.clone());
c.notify(Signal::Complete);
}
Ok(_) => {
c.notify(signal.clone());
}
Err(_) => {
}
}
}
Signal::Complete => c.notify(Signal::Complete),
Signal::Error(e) => c.notify(Signal::Error(e.clone())),
}
}
});
cell.own(guard);
cell.lock()
}
}
/// Blanket implementation: every type implementing [`Watchable`] gets
/// [`TakeExt::take`] through the trait's default method.
impl<T, W> TakeExt<T> for W where W: Watchable<T> {}
#[cfg(test)]
mod tests {
    use std::sync::{
        Arc,
        atomic::{AtomicBool, AtomicU64, Ordering as AtomicOrdering},
    };

    use super::*;
    use crate::Mutable;

    /// A subscriber of `take(3)` observes exactly three values, even though
    /// the source is updated four times after the subscription.
    #[test]
    fn test_take_limits_emissions() {
        let source = Cell::new(0u64);
        let taken = source.take(3);

        let values_seen = Arc::new(AtomicU64::new(0));
        let counter = values_seen.clone();
        let _guard = taken.subscribe(move |signal| {
            if matches!(signal, Signal::Value(_)) {
                counter.fetch_add(1, AtomicOrdering::SeqCst);
            }
        });

        // Subscribing alone already yields one value.
        assert_eq!(values_seen.load(AtomicOrdering::SeqCst), 1);

        // Four updates to the source; the observed total stops at three.
        for next in 1..=4u64 {
            source.set(next);
        }
        assert_eq!(values_seen.load(AtomicOrdering::SeqCst), 3);
    }

    /// `take(2)` is complete after a single update of the source, and the
    /// subscriber is told so via `Signal::Complete`.
    #[test]
    fn test_take_completes() {
        let source = Cell::new(0u64);
        let taken = source.take(2);

        let saw_complete = Arc::new(AtomicBool::new(false));
        let flag = saw_complete.clone();
        let _guard = taken.subscribe(move |signal| {
            if matches!(signal, Signal::Complete) {
                flag.store(true, AtomicOrdering::SeqCst);
            }
        });

        // Not complete before the source changes.
        assert!(!taken.is_complete());
        assert!(!saw_complete.load(AtomicOrdering::SeqCst));

        source.set(1);

        // One update is enough to finish a `take(2)`.
        assert!(taken.is_complete());
        assert!(saw_complete.load(AtomicOrdering::SeqCst));
    }
}