use std::sync::mpsc::{sync_channel, Receiver, SyncSender};
use std::sync::Arc;
use std::thread;
use arc_swap::ArcSwapOption;
pub struct LatestSlot<T: Send + Sync + 'static> {
slot: ArcSwapOption<T>,
wake: SyncSender<()>,
}
impl<T: Send + Sync + 'static> LatestSlot<T> {
pub fn new() -> (Arc<Self>, Receiver<()>) {
let (tx, rx) = sync_channel::<()>(1);
let slot = Arc::new(Self {
slot: ArcSwapOption::const_empty(),
wake: tx,
});
(slot, rx)
}
pub fn publish(&self, value: T) {
self.slot.store(Some(Arc::new(value)));
let _ = self.wake.try_send(());
}
fn take(&self) -> Option<Arc<T>> {
self.slot.swap(None)
}
}
pub fn spawn_drain<T, F>(
name: &str,
slot: Arc<LatestSlot<T>>,
wake: Receiver<()>,
mut sink: F,
) -> thread::JoinHandle<()>
where
T: Send + Sync + 'static,
F: FnMut(Arc<T>) + Send + 'static,
{
let name = name.to_string();
thread::Builder::new()
.name(name)
.spawn(move || {
while wake.recv().is_ok() {
while wake.try_recv().is_ok() {}
if let Some(v) = slot.take() {
sink(v);
}
}
})
.expect("spawn rerun drain thread")
}
#[cfg(test)]
mod tests {
use super::*;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::time::Duration;
#[test]
fn drain_observes_published_values() {
let (slot, wake) = LatestSlot::<i32>::new();
let count = Arc::new(AtomicUsize::new(0));
let count_clone = Arc::clone(&count);
let _h = spawn_drain("test-drain", Arc::clone(&slot), wake, move |_v| {
count_clone.fetch_add(1, Ordering::SeqCst);
});
slot.publish(1);
slot.publish(2);
slot.publish(3);
for _ in 0..50 {
if count.load(Ordering::SeqCst) > 0 {
break;
}
thread::sleep(Duration::from_millis(10));
}
assert!(count.load(Ordering::SeqCst) >= 1);
}
#[test]
fn drain_drops_intermediate_values_under_slow_sink() {
let (slot, wake) = LatestSlot::<i32>::new();
let observed = Arc::new(std::sync::Mutex::new(Vec::<i32>::new()));
let observed_clone = Arc::clone(&observed);
let _h = spawn_drain("test-drain", Arc::clone(&slot), wake, move |v| {
thread::sleep(Duration::from_millis(50));
observed_clone.lock().unwrap().push(*v);
});
for i in 1..=10_i32 {
slot.publish(i);
thread::sleep(Duration::from_millis(5));
}
thread::sleep(Duration::from_millis(200));
let seen = observed.lock().unwrap().clone();
assert!(!seen.is_empty(), "drain saw nothing");
assert!(seen.len() < 10, "drain saw all values; coalescing failed");
assert_eq!(
*seen.last().unwrap(),
10,
"last observed must be the most recent"
);
}
}