Skip to main content

elfo_core/time/
delay.rs

1use std::{
2    any::Any,
3    future::Future,
4    pin::Pin,
5    task::{self, Poll},
6};
7
8use pin_project::pin_project;
9use sealed::sealed;
10use tokio::time::{Duration, Instant, Sleep};
11
12use crate::{
13    addr::Addr,
14    envelope::{Envelope, MessageKind},
15    message::Message,
16    scope,
17    source::{SourceArc, SourceStream, UnattachedSource},
18    tracing::TraceId,
19};
20
21/// A source that emits a message after some specified time.
22///
23/// # Tracing
24///
25/// The emitted message continues the current trace.
26///
27/// # Example
28///
29/// ```
30/// # use std::time::Duration;
31/// # use elfo_core as elfo;
32/// # struct Config { delay: Duration }
33/// # async fn exec(mut ctx: elfo::Context<Config>) {
34/// # use elfo::{message, msg};
35/// # #[message]
36/// # struct SomeEvent;
37/// use elfo::time::Delay;
38///
39/// #[message]
40/// struct MyTick;
41///
42/// while let Some(envelope) = ctx.recv().await {
43///     msg!(match envelope {
44///         SomeEvent => {
45///             ctx.attach(Delay::new(ctx.config().delay, MyTick));
46///         },
47///         MyTick => {
48///             tracing::info!("tick!");
49///         },
50///     });
51/// }
52/// # }
53/// ```
54pub struct Delay<M> {
55    source: SourceArc<DelaySource<M>>,
56}
57
58#[sealed]
59impl<M: Message> crate::source::SourceHandle for Delay<M> {
60    fn is_terminated(&self) -> bool {
61        self.source.is_terminated()
62    }
63
64    fn terminate_by_ref(&self) -> bool {
65        self.source.terminate_by_ref()
66    }
67}
68
69#[pin_project]
70struct DelaySource<M> {
71    message: Option<M>,
72    trace_id: Option<TraceId>,
73    #[pin]
74    sleep: Sleep,
75}
76
77impl<M: Message> Delay<M> {
78    /// Schedules the timer to emit the provided message after `delay`.
79    ///
80    /// Creates an unattached instance of [`Delay`].
81    pub fn new(delay: Duration, message: M) -> UnattachedSource<Self> {
82        Self::until(Instant::now() + delay, message)
83    }
84
85    /// Schedules the timer to emit the provided message at `when`.
86    ///
87    /// Creates an unattached instance of [`Delay`].
88    ///
89    /// # Stability
90    ///
91    /// This method is unstable, because it accepts [`tokio::time::Instant`],
92    /// which will be replaced in the future to support other runtimes.
93    #[instability::unstable]
94    pub fn until(when: Instant, message: M) -> UnattachedSource<Self> {
95        let source = DelaySource {
96            message: Some(message),
97            trace_id: Some(scope::trace_id()),
98            sleep: tokio::time::sleep_until(when),
99        };
100
101        let source = SourceArc::new(source, true);
102        UnattachedSource::new(source, |source| Self { source })
103    }
104}
105
106impl<M: Message> SourceStream for DelaySource<M> {
107    fn as_any_mut(self: Pin<&mut Self>) -> Pin<&mut dyn Any> {
108        // SAFETY: we only cast here, it cannot move data.
109        unsafe { self.map_unchecked_mut(|s| s) }
110    }
111
112    fn poll_recv(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Option<Envelope>> {
113        let mut this = self.project();
114
115        // Terminate if the message is already emitted.
116        if this.message.is_none() {
117            return Poll::Ready(None);
118        }
119
120        // Wait for a tick from implementation.
121        if !this.sleep.as_mut().poll(cx).is_ready() {
122            return Poll::Pending;
123        }
124
125        // Emit the message.
126        let message = this.message.take().unwrap();
127        let kind = MessageKind::regular(Addr::NULL);
128        let trace_id = this.trace_id.take().unwrap_or_else(TraceId::generate);
129        let envelope = Envelope::with_trace_id(message, kind, trace_id);
130
131        Poll::Ready(Some(envelope))
132    }
133}