use std::{
sync::{
Arc,
atomic::{AtomicBool, Ordering},
},
thread,
time::Duration,
};
use crossbeam::queue::SegQueue;
use super::{CellValue, Watchable};
use crate::{
cell::{Cell, CellImmutable, CellMutable},
signal::Signal,
};
pub trait BufferTimeExt<T>: Watchable<T> {
#[track_caller]
fn buffer_time(&self, duration: Duration) -> Cell<Vec<T>, CellImmutable>
where
T: CellValue,
Self: Clone + Send + Sync + 'static,
{
let derived = Cell::<Vec<T>, CellMutable>::new(Vec::new());
let derived = if let Some(name) = self.name() {
derived.with_name(format!("{}::buffer_time", name))
} else {
derived
};
let weak = derived.downgrade();
let buffer: Arc<SegQueue<T>> = Arc::new(SegQueue::new());
let first = Arc::new(AtomicBool::new(true));
let completed = Arc::new(AtomicBool::new(false));
let buffer2 = buffer.clone();
let weak2 = derived.downgrade();
let comp = completed.clone();
thread::spawn(move || {
while !comp.load(Ordering::SeqCst) {
thread::sleep(duration);
if comp.load(Ordering::SeqCst) {
break;
}
if let Some(d) = weak2.upgrade() {
let mut chunk = Vec::new();
while let Some(v) = buffer2.pop() {
chunk.push(v);
}
d.notify(Signal::value(chunk));
} else {
break;
}
}
});
let guard = self.subscribe(move |signal| {
if let Some(d) = weak.upgrade() {
match signal {
Signal::Value(value) => {
if first.swap(false, Ordering::SeqCst) {
return;
}
buffer.push((**value).clone());
}
Signal::Complete => {
completed.store(true, Ordering::SeqCst);
let mut remainder = Vec::new();
while let Some(v) = buffer.pop() {
remainder.push(v);
}
if !remainder.is_empty() {
d.notify(Signal::value(remainder));
}
d.notify(Signal::Complete);
}
Signal::Error(e) => {
completed.store(true, Ordering::SeqCst);
d.notify(Signal::Error(e.clone()));
}
}
}
});
derived.own(guard);
derived.lock()
}
}
impl<T, W: Watchable<T>> BufferTimeExt<T> for W {}
#[cfg(test)]
mod tests {
use std::time::Instant;
use super::*;
use crate::Mutable;
#[test]
fn test_buffer_time() {
let source = Cell::new(0);
let buffered = source.buffer_time(Duration::from_millis(50));
let (tx, rx) = std::sync::mpsc::channel::<Vec<i32>>();
let _guard = buffered.subscribe(move |signal| {
if let Signal::Value(v) = signal {
let _ = tx.send((**v).clone());
}
});
let initial = rx
.recv_timeout(Duration::from_millis(200))
.expect("expected initial emission");
assert!(initial.is_empty());
source.set(1);
source.set(2);
source.set(3);
let deadline = Instant::now() + Duration::from_millis(1000);
loop {
{
if let Ok(emitted) = rx.recv_timeout(Duration::from_millis(20))
&& emitted == vec![1, 2, 3]
{
break;
};
}
if Instant::now() >= deadline {
panic!("Timed out waiting for buffered emission [1, 2, 3]");
}
}
}
#[test]
fn test_buffer_time_emits_remainder_on_complete() {
let source = Cell::new(0);
let buffered = source.buffer_time(Duration::from_millis(100));
let (tx, rx) = std::sync::mpsc::channel::<Vec<i32>>();
let _guard = buffered.subscribe(move |signal| {
if let Signal::Value(v) = signal {
let _ = tx.send((**v).clone());
}
});
let initial = rx
.recv_timeout(Duration::from_millis(200))
.expect("expected initial emission");
assert!(initial.is_empty());
source.set(1);
source.set(2);
source.complete();
let deadline = Instant::now() + Duration::from_millis(1000);
loop {
{
if let Ok(emitted) = rx.recv_timeout(Duration::from_millis(20))
&& emitted == vec![1, 2]
{
break;
};
}
if Instant::now() >= deadline {
panic!("Timed out waiting for completion remainder [1, 2]");
}
}
}
}