use std::{
    any::Any,
    future::Future,
    pin::Pin,
    task::{self, Poll},
};

use pin_project::pin_project;
use sealed::sealed;
use tokio::time::{Duration, Instant, Sleep};

use crate::{
    address_book::Addr,
    envelope::{Envelope, MessageKind},
    message::Message,
    scope,
    source::{SourceArc, SourceStream, UnattachedSource},
    tracing::TraceId,
};

/// A source that emits a single message after a specified delay.
///
/// # Tracing
///
/// The emitted message continues the trace that was current when the
/// [`Delay`] was created.
///
/// # Example
///
/// ```
/// # use std::time::Duration;
/// # use elfo_core as elfo;
/// # struct Config { delay: Duration }
/// # async fn exec(mut ctx: elfo::Context<Config>) {
/// # use elfo::{message, msg};
/// # #[message]
/// # struct SomeEvent;
/// use elfo::time::Delay;
///
/// #[message]
/// struct MyTick;
///
/// while let Some(envelope) = ctx.recv().await {
///     msg!(match envelope {
///         SomeEvent => {
///             ctx.attach(Delay::new(ctx.config().delay, MyTick));
///         },
///         MyTick => {
///             tracing::info!("tick!");
///         },
///     });
/// }
/// # }
/// ```
pub struct Delay<M> {
    source: SourceArc<DelaySource<M>>,
}

#[sealed]
impl<M: Message> crate::source::SourceHandle for Delay<M> {
    fn is_terminated(&self) -> bool {
        self.source.lock().is_none()
    }

    fn terminate(self) {
        ward!(self.source.lock()).terminate();
    }
}

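#[pin_project]
struct DelaySource<M> {
    // The message to emit; `None` once it has been emitted.
    message: Option<M>,
    // The trace captured when the source was created.
    trace_id: Option<TraceId>,
    #[pin]
    sleep: Sleep,
}
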
impl<M: Message> Delay<M> {
    /// Schedules the timer to emit the provided message after `delay`.
    ///
    /// The deadline is computed as `Instant::now() + delay` when this
    /// function is called, not when the source is attached.
    ///
    /// Creates an unattached instance of [`Delay`].
    pub fn new(delay: Duration, message: M) -> UnattachedSource<Self> {
        Self::until(Instant::now() + delay, message)
    }

    /// Schedules the timer to emit the provided message at `when`.
    ///
    /// Creates an unattached instance of [`Delay`].
    ///
    /// # Stability
    ///
    /// This method is unstable because it accepts [`tokio::time::Instant`],
    /// which will be replaced in the future to support other runtimes.
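    ///
    /// # Example
    ///
    /// A minimal sketch, assuming unstable APIs are enabled and that `ctx` is
    /// an `elfo::Context` as in the [`Delay`] example above. `Deadline` is a
    /// hypothetical message introduced only for illustration.
    ///
    /// ```ignore
    /// use elfo::time::Delay;
    /// use tokio::time::{Duration, Instant};
    ///
    /// #[message]
    /// struct Deadline;
    ///
    /// // Emit `Deadline` at a fixed point in time, five seconds from now.
    /// let when = Instant::now() + Duration::from_secs(5);
    /// ctx.attach(Delay::until(when, Deadline));
    /// ```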
    #[stability::unstable]
    pub fn until(when: Instant, message: M) -> UnattachedSource<Self> {
        let source = DelaySource {
            message: Some(message),
            trace_id: Some(scope::trace_id()),
            sleep: tokio::time::sleep_until(when),
        };

        let source = SourceArc::new(source, true);
        UnattachedSource::new(source, |source| Self { source })
    }
}
impl<M: Message> SourceStream for DelaySource<M> {
    fn as_any_mut(self: Pin<&mut Self>) -> Pin<&mut dyn Any> {
        // SAFETY: the closure returns the same reference, only coerced to
        // `dyn Any`, so the pinned data is never moved.
        unsafe { self.map_unchecked_mut(|s| s) }
    }

    fn poll_recv(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Option<Envelope>> {
        let mut this = self.project();

        // Terminate if the message has already been emitted.
        if this.message.is_none() {
            return Poll::Ready(None);
        }

        // Wait until the underlying `Sleep` completes.
        if !this.sleep.as_mut().poll(cx).is_ready() {
            return Poll::Pending;
        }

        // Emit the message. `message` is `Some` here, as checked above.
        let message = this.message.take().unwrap();
        let kind = MessageKind::Regular { sender: Addr::NULL };
        // Continue the trace captured at creation, or start a new one.
        let trace_id = this.trace_id.take().unwrap_or_else(TraceId::generate);
        let envelope = Envelope::with_trace_id(message, kind, trace_id).upcast();

        Poll::Ready(Some(envelope))
    }
}