1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
use std::{
    any::Any,
    future::Future,
    pin::Pin,
    task::{self, Poll},
};

use pin_project::pin_project;
use sealed::sealed;
use tokio::time::{Duration, Instant, Sleep};

use crate::{
    address_book::Addr,
    envelope::{Envelope, MessageKind},
    message::Message,
    scope,
    source::{SourceArc, SourceStream, UnattachedSource},
    tracing::TraceId,
};

/// A source that emits a message after some specified time.
///
/// The message is emitted at most once: after it has been delivered,
/// the underlying stream yields no more messages.
///
/// # Tracing
///
/// The emitted message continues the trace that was current at the moment
/// the `Delay` was created.
///
/// # Example
///
/// ```
/// # use std::time::Duration;
/// # use elfo_core as elfo;
/// # struct Config { delay: Duration }
/// # async fn exec(mut ctx: elfo::Context<Config>) {
/// # use elfo::{message, msg};
/// # #[message]
/// # struct SomeEvent;
/// use elfo::time::Delay;
///
/// #[message]
/// struct MyTick;
///
/// while let Some(envelope) = ctx.recv().await {
///     msg!(match envelope {
///         SomeEvent => {
///             ctx.attach(Delay::new(ctx.config().delay, MyTick));
///         },
///         MyTick => {
///             tracing::info!("tick!");
///         },
///     });
/// }
/// # }
/// ```
pub struct Delay<M> {
    // Handle to the underlying `DelaySource`; used to check whether the
    // source is terminated and to terminate it.
    source: SourceArc<DelaySource<M>>,
}

#[sealed]
impl<M: Message> crate::source::SourceHandle for Delay<M> {
    /// Reports whether the source is terminated, i.e. whether locking the
    /// underlying source yields nothing.
    fn is_terminated(&self) -> bool {
        let guard = self.source.lock();
        guard.is_none()
    }

    /// Terminates the underlying source, consuming the handle.
    fn terminate(self) {
        let Self { source } = self;
        // NOTE(review): `ward!` presumably returns early when `lock()` yields
        // nothing (the source is already gone) — confirm against the macro.
        ward!(source.lock()).terminate();
    }
}

/// The stream behind [`Delay`]: a one-shot timer paired with the message
/// to emit once the timer fires.
#[pin_project]
struct DelaySource<M> {
    /// The message to emit. Becomes `None` once the message has been taken
    /// for emission; `poll_recv` then reports the end of the stream.
    message: Option<M>,
    /// The trace captured when the source was created; it is attached to
    /// the emitted envelope.
    trace_id: Option<TraceId>,
    /// The timer that is polled to find out when to emit the message.
    /// Marked `#[pin]` because it is polled through a pinned projection.
    #[pin]
    sleep: Sleep,
}

impl<M: Message> Delay<M> {
    /// Schedules the timer to emit the provided message after `delay`.
    ///
    /// Creates an unattached instance of [`Delay`].
    pub fn new(delay: Duration, message: M) -> UnattachedSource<Self> {
        Self::until(Instant::now() + delay, message)
    }

    /// Schedules the timer to emit the provided message at `when`.
    ///
    /// Creates an unattached instance of [`Delay`].
    ///
    /// # Stability
    ///
    /// This method is unstable, because it accepts [`tokio::time::Instant`],
    /// which will be replaced in the future to support other runtimes.
    #[stability::unstable]
    pub fn until(when: Instant, message: M) -> UnattachedSource<Self> {
        let source = DelaySource {
            message: Some(message),
            trace_id: Some(scope::trace_id()),
            sleep: tokio::time::sleep_until(when),
        };

        let source = SourceArc::new(source, true);
        UnattachedSource::new(source, |source| Self { source })
    }
}

impl<M: Message> SourceStream for DelaySource<M> {
    /// Exposes the pinned source as a pinned `dyn Any`.
    fn as_any_mut(self: Pin<&mut Self>) -> Pin<&mut dyn Any> {
        // SAFETY: the reference is only unsized to `dyn Any` and immediately
        // pinned again; the pointee is never moved out of or replaced.
        unsafe {
            let erased: &mut dyn Any = self.get_unchecked_mut();
            Pin::new_unchecked(erased)
        }
    }

    /// Polls the timer and emits the stored message once it fires.
    ///
    /// Returns `Poll::Ready(None)` if the message has already been emitted,
    /// `Poll::Pending` while the timer hasn't fired yet, and
    /// `Poll::Ready(Some(envelope))` exactly once, when the timer fires.
    fn poll_recv(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Option<Envelope>> {
        let mut this = self.project();

        // The only message has already been emitted: the stream is over.
        // This check must come before polling the timer again.
        if this.message.is_none() {
            return Poll::Ready(None);
        }

        // Not the time yet: the timer has registered the waker.
        match this.sleep.as_mut().poll(cx) {
            Poll::Ready(()) => {}
            Poll::Pending => return Poll::Pending,
        }

        // The timer has fired: wrap the message into an envelope. The message
        // is known to be present here, so this always yields `Some`.
        let envelope = this.message.take().map(|message| {
            let trace_id = this.trace_id.take().unwrap_or_else(TraceId::generate);
            let kind = MessageKind::Regular { sender: Addr::NULL };
            Envelope::with_trace_id(message, kind, trace_id).upcast()
        });

        Poll::Ready(envelope)
    }
}