use async_executors::Timer;
use async_nursery::{Nurse, NurseExt};
use std::{
sync::{
atomic::{AtomicBool, AtomicUsize, Ordering as AtomicOrdering},
Arc,
},
time::Duration,
};
use crate::{Message, Source};
pub fn interval(
period: Duration,
nursery: impl Nurse<()> + Timer + Send + Sync + Clone + 'static,
) -> Source<usize> {
(move |message| {
if let Message::Handshake(sink) = message {
let i = AtomicUsize::new(0);
let interval_cleared = Arc::new(AtomicBool::new(false));
if let Err(err) = nursery.nurse({
let nursery = nursery.clone();
let sink = Arc::clone(&sink);
let interval_cleared = Arc::clone(&interval_cleared);
async move {
loop {
nursery.sleep(period).await;
if interval_cleared.load(AtomicOrdering::Acquire) {
break;
}
let i = i.fetch_add(1, AtomicOrdering::AcqRel);
sink(Message::Data(i));
}
}
}) {
sink(Message::Error(Arc::new(err)));
return;
}
sink(Message::Handshake(Arc::new(
(move |message| {
if let Message::Error(_) | Message::Terminate = message {
interval_cleared.store(true, AtomicOrdering::Release);
}
})
.into(),
)));
}
})
.into()
}