1use std::{
2 any::Any,
3 future::Future,
4 pin::Pin,
5 task::{self, Poll},
6};
7
8use pin_project::pin_project;
9use sealed::sealed;
10use tokio::time::{Duration, Instant, Sleep};
11
12use crate::{
13 addr::Addr,
14 envelope::{Envelope, MessageKind},
15 message::Message,
16 scope,
17 source::{SourceArc, SourceStream, UnattachedSource},
18 tracing::TraceId,
19};
20
21pub struct Delay<M> {
55 source: SourceArc<DelaySource<M>>,
56}
57
58#[sealed]
59impl<M: Message> crate::source::SourceHandle for Delay<M> {
60 fn is_terminated(&self) -> bool {
61 self.source.is_terminated()
62 }
63
64 fn terminate_by_ref(&self) -> bool {
65 self.source.terminate_by_ref()
66 }
67}
68
69#[pin_project]
70struct DelaySource<M> {
71 message: Option<M>,
72 trace_id: Option<TraceId>,
73 #[pin]
74 sleep: Sleep,
75}
76
77impl<M: Message> Delay<M> {
78 pub fn new(delay: Duration, message: M) -> UnattachedSource<Self> {
82 Self::until(Instant::now() + delay, message)
83 }
84
85 #[instability::unstable]
94 pub fn until(when: Instant, message: M) -> UnattachedSource<Self> {
95 let source = DelaySource {
96 message: Some(message),
97 trace_id: Some(scope::trace_id()),
98 sleep: tokio::time::sleep_until(when),
99 };
100
101 let source = SourceArc::new(source, true);
102 UnattachedSource::new(source, |source| Self { source })
103 }
104}
105
106impl<M: Message> SourceStream for DelaySource<M> {
107 fn as_any_mut(self: Pin<&mut Self>) -> Pin<&mut dyn Any> {
108 unsafe { self.map_unchecked_mut(|s| s) }
110 }
111
112 fn poll_recv(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Option<Envelope>> {
113 let mut this = self.project();
114
115 if this.message.is_none() {
117 return Poll::Ready(None);
118 }
119
120 if !this.sleep.as_mut().poll(cx).is_ready() {
122 return Poll::Pending;
123 }
124
125 let message = this.message.take().unwrap();
127 let kind = MessageKind::regular(Addr::NULL);
128 let trace_id = this.trace_id.take().unwrap_or_else(TraceId::generate);
129 let envelope = Envelope::with_trace_id(message, kind, trace_id);
130
131 Poll::Ready(Some(envelope))
132 }
133}