use std::{
any::Any,
future::Future,
pin::Pin,
task::{self, Poll},
};
use pin_project::pin_project;
use sealed::sealed;
use tokio::time::{Duration, Instant, Sleep};
use crate::{
addr::Addr,
envelope::{Envelope, MessageKind},
message::Message,
scope,
source::{SourceArc, SourceStream, UnattachedSource},
tracing::TraceId,
};
pub struct Delay<M> {
source: SourceArc<DelaySource<M>>,
}
#[sealed]
impl<M: Message> crate::source::SourceHandle for Delay<M> {
fn is_terminated(&self) -> bool {
self.source.is_terminated()
}
fn terminate_by_ref(&self) -> bool {
self.source.terminate_by_ref()
}
}
#[pin_project]
struct DelaySource<M> {
message: Option<M>,
trace_id: Option<TraceId>,
#[pin]
sleep: Sleep,
}
impl<M: Message> Delay<M> {
pub fn new(delay: Duration, message: M) -> UnattachedSource<Self> {
Self::until(Instant::now() + delay, message)
}
#[instability::unstable]
pub fn until(when: Instant, message: M) -> UnattachedSource<Self> {
let source = DelaySource {
message: Some(message),
trace_id: Some(scope::trace_id()),
sleep: tokio::time::sleep_until(when),
};
let source = SourceArc::new(source, true);
UnattachedSource::new(source, |source| Self { source })
}
}
impl<M: Message> SourceStream for DelaySource<M> {
fn as_any_mut(self: Pin<&mut Self>) -> Pin<&mut dyn Any> {
unsafe { self.map_unchecked_mut(|s| s) }
}
fn poll_recv(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Option<Envelope>> {
let mut this = self.project();
if this.message.is_none() {
return Poll::Ready(None);
}
if !this.sleep.as_mut().poll(cx).is_ready() {
return Poll::Pending;
}
let message = this.message.take().unwrap();
let kind = MessageKind::regular(Addr::NULL);
let trace_id = this.trace_id.take().unwrap_or_else(TraceId::generate);
let envelope = Envelope::with_trace_id(message, kind, trace_id);
Poll::Ready(Some(envelope))
}
}