use std::sync::mpsc::{sync_channel, Receiver, SyncSender};
use std::sync::Arc;
use std::thread;
use arc_swap::ArcSwapOption;
/// Single-value "latest wins" mailbox paired with a wake-up channel.
///
/// `publish` overwrites whatever value is currently stored and then makes a
/// non-blocking attempt to queue a unit wake token; `take` swaps the stored
/// value out and leaves the slot empty. Values are handed around as `Arc<T>`.
pub struct LatestSlot<T: Send + Sync + 'static> {
/// Most recently published value, or `None` when nothing is pending.
slot: ArcSwapOption<T>,
/// Sending half of the capacity-1 wake channel created in `new`; the
/// matching `Receiver` is returned to the caller of `new`.
wake: SyncSender<()>,
}
impl<T: Send + Sync + 'static> LatestSlot<T> {
    /// Builds an empty slot together with the receiving end of its wake channel.
    ///
    /// The wake channel holds at most one pending token, so several
    /// `publish` calls made before the receiver is serviced leave a single
    /// token queued.
    pub fn new() -> (Arc<Self>, Receiver<()>) {
        let (wake_tx, wake_rx) = sync_channel::<()>(1);
        let shared = Self {
            slot: ArcSwapOption::const_empty(),
            wake: wake_tx,
        };
        (Arc::new(shared), wake_rx)
    }

    /// Replaces the stored value with `value`, then tries to queue a wake token.
    ///
    /// The value is stored *before* the token is sent. The send is
    /// non-blocking and its outcome is intentionally ignored.
    pub fn publish(&self, value: T) {
        let fresh = Some(Arc::new(value));
        self.slot.store(fresh);

        match self.wake.try_send(()) {
            // Token queued.
            Ok(()) => {}
            // A wake token is already pending; nothing more to do.
            Err(std::sync::mpsc::TrySendError::Full(())) => {}
            // The receiving half is gone; there is nobody to wake.
            Err(std::sync::mpsc::TrySendError::Disconnected(())) => {}
        }
    }

    /// Removes and returns the stored value, leaving the slot empty.
    ///
    /// Returns `None` when nothing has been published since the last `take`.
    fn take(&self) -> Option<Arc<T>> {
        let emptied: Option<Arc<T>> = None;
        self.slot.swap(emptied)
    }
}
/// Spawns a named background thread that hands the latest value of `slot` to `sink`.
///
/// The thread blocks on `wake`. Each time a token arrives it discards any
/// further tokens that are already queued, then takes whatever value is
/// currently stored in the slot and, if there is one, passes it to `sink`.
///
/// Only a weak handle to `slot` is kept by the thread; the strong handle
/// passed in is released before the thread starts. The loop ends as soon as
/// `wake.recv()` reports an error, i.e. once the sending half of the wake
/// channel has been dropped and no token is left. For a slot/receiver pair
/// from `LatestSlot::new`, the sender lives inside the slot itself.
///
/// # Panics
///
/// Panics if the operating system fails to spawn the thread.
pub fn spawn_drain<T, F>(
    name: &str,
    slot: Arc<LatestSlot<T>>,
    wake: Receiver<()>,
    mut sink: F,
) -> thread::JoinHandle<()>
where
    T: Send + Sync + 'static,
    F: FnMut(Arc<T>) + Send + 'static,
{
    let thread_name = name.to_string();

    // Hold the slot weakly so this thread never keeps it alive on its own.
    let observer = Arc::downgrade(&slot);
    drop(slot);

    let drain_loop = move || {
        loop {
            // Block until woken; an error means the wake sender is gone.
            if wake.recv().is_err() {
                break;
            }
            // Swallow any extra tokens that piled up behind the first one.
            while let Ok(()) = wake.try_recv() {}

            let strong = match observer.upgrade() {
                Some(strong) => strong,
                // Slot already released; go back to `recv`.
                None => continue,
            };
            if let Some(latest) = strong.take() {
                sink(latest);
            }
        }
    };

    thread::Builder::new()
        .name(thread_name)
        .spawn(drain_loop)
        .expect("spawn dora drain thread")
}
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::atomic::{AtomicUsize, Ordering};
    use std::time::{Duration, Instant};

    /// Publishing to the slot eventually causes the sink to run at least once.
    #[test]
    fn drain_observes_published_values() {
        let (slot, wake) = LatestSlot::<i32>::new();
        let seen = Arc::new(AtomicUsize::new(0));
        let seen_in_sink = Arc::clone(&seen);
        let _drain = spawn_drain("test-drain", Arc::clone(&slot), wake, move |_v| {
            seen_in_sink.fetch_add(1, Ordering::SeqCst);
        });

        slot.publish(1);
        slot.publish(2);

        // Poll up to 50 times, 10 ms apart, for the first delivery.
        let mut polls_left = 50;
        while polls_left > 0 && seen.load(Ordering::SeqCst) == 0 {
            thread::sleep(Duration::from_millis(10));
            polls_left -= 1;
        }

        assert!(seen.load(Ordering::SeqCst) >= 1);
    }

    /// Dropping the last external handle to the slot lets the drain thread finish.
    #[test]
    fn drain_thread_exits_when_slot_is_dropped() {
        let (slot, wake) = LatestSlot::<i32>::new();
        let handle = spawn_drain("test-shutdown", Arc::clone(&slot), wake, move |_| {});

        slot.publish(1);
        thread::sleep(Duration::from_millis(20));
        drop(slot);

        let deadline = Instant::now() + Duration::from_millis(500);
        while !handle.is_finished() {
            assert!(
                Instant::now() < deadline,
                "drain thread did not exit within 500 ms after slot drop"
            );
            thread::sleep(Duration::from_millis(10));
        }

        handle.join().expect("drain exits cleanly");
    }
}