use std::pin::Pin;
use std::time::Duration;
use tokio_stream::Stream;
use tokio_util::sync::CancellationToken;
use super::Subscription;
pub struct IntervalImmediateSubscription<M, F>
where
F: Fn() -> M + Send + 'static,
{
pub(crate) interval: Duration,
message_fn: F,
}
impl<M, F> IntervalImmediateSubscription<M, F>
where
F: Fn() -> M + Send + 'static,
{
pub fn new(interval: Duration, message_fn: F) -> Self {
Self {
interval,
message_fn,
}
}
}
impl<M: Send + 'static, F: Fn() -> M + Send + 'static> Subscription<M>
for IntervalImmediateSubscription<M, F>
{
fn into_stream(
self: Box<Self>,
cancel: CancellationToken,
) -> Pin<Box<dyn Stream<Item = M> + Send>> {
let interval_duration = self.interval;
let message_fn = self.message_fn;
Box::pin(async_stream::stream! {
yield (message_fn)();
let mut interval = tokio::time::interval(interval_duration);
interval.tick().await;
loop {
tokio::select! {
_ = interval.tick() => {
yield (message_fn)();
}
_ = cancel.cancelled() => {
break;
}
}
}
})
}
}
pub struct IntervalImmediateBuilder {
interval: Duration,
}
impl IntervalImmediateBuilder {
pub fn every(interval: Duration) -> Self {
Self { interval }
}
pub fn with_message<M, F>(self, message_fn: F) -> IntervalImmediateSubscription<M, F>
where
F: Fn() -> M + Send + 'static,
{
IntervalImmediateSubscription::new(self.interval, message_fn)
}
}
pub fn interval_immediate(interval: Duration) -> IntervalImmediateBuilder {
IntervalImmediateBuilder::every(interval)
}