use crate::{Actor, BareContext, Context, Event, Priority, Recipient, SendError};
use std::{
cmp::Ordering,
collections::BinaryHeap,
ops::Deref,
time::{Duration, Instant},
};
pub enum TimedMessage<M> {
Instant { message: M },
Delayed { message: M, fire_at: Instant },
Recurring { factory: Box<dyn FnMut() -> M + Send>, fire_at: Instant, interval: Duration },
}
impl<M> From<M> for TimedMessage<M> {
fn from(message: M) -> Self {
Self::Instant { message }
}
}
pub trait RecipientExt<M> {
fn send_now(&self, message: M) -> Result<(), SendError>;
fn send_timed(&self, message: M, fire_at: Instant) -> Result<(), SendError>;
fn send_delayed(&self, message: M, delay: Duration) -> Result<(), SendError> {
self.send_timed(message, Instant::now() + delay)
}
fn send_recurring(
&self,
factory: impl FnMut() -> M + Send + 'static,
fire_at: Instant,
interval: Duration,
) -> Result<(), SendError>;
}
impl<M> RecipientExt<M> for Recipient<TimedMessage<M>> {
fn send_now(&self, message: M) -> Result<(), SendError> {
self.send(TimedMessage::Instant { message })
}
fn send_timed(&self, message: M, fire_at: Instant) -> Result<(), SendError> {
self.send(TimedMessage::Delayed { message, fire_at })
}
fn send_recurring(
&self,
factory: impl FnMut() -> M + Send + 'static,
fire_at: Instant,
interval: Duration,
) -> Result<(), SendError> {
self.send(TimedMessage::Recurring { factory: Box::new(factory), fire_at, interval })
}
}
pub struct TimedContext<M>(BareContext<TimedMessage<M>>);
impl<M> TimedContext<M> {
fn from_context(context: &Context<TimedMessage<M>>) -> Self {
Self(context.deref().clone())
}
}
impl<M: 'static> TimedContext<M> {
pub fn subscribe<E: Event + Into<M>>(&self) {
self.system_handle.subscribe_recipient::<M, E>(self.myself.recipient());
}
pub fn subscribe_and_receive_latest<E: Event + Into<M>>(&self) -> Result<(), SendError> {
self.system_handle.subscribe_and_receive_latest::<M, E>(self.myself.recipient())
}
}
impl<M> Deref for TimedContext<M> {
type Target = BareContext<TimedMessage<M>>;
fn deref(&self) -> &BareContext<TimedMessage<M>> {
&self.0
}
}
pub struct Timed<A: Actor> {
inner: A,
queue: BinaryHeap<QueueItem<A::Message>>,
}
impl<M: Send + 'static, A: Actor<Context = TimedContext<M>, Message = M>> Timed<A> {
pub fn new(inner: A) -> Self {
Self { inner, queue: Default::default() }
}
fn process_queue(&mut self, context: &mut <Self as Actor>::Context) -> Result<(), A::Error> {
let now = Instant::now();
while self.queue.peek().map(|m| m.fire_at <= now).unwrap_or(false) {
let item = self.queue.pop().expect("heap is non-empty, we have just peeked");
let message = match item.payload {
Payload::Delayed { message } => message,
Payload::Recurring { mut factory, interval } => {
let message = factory();
self.queue.push(QueueItem {
fire_at: item.fire_at + interval,
payload: Payload::Recurring { factory, interval },
});
message
},
};
self.inner.handle(&mut TimedContext::from_context(context), message)?;
}
Ok(())
}
fn schedule_timeout(&self, context: &mut <Self as Actor>::Context) {
context.set_deadline(self.queue.peek().map(|earliest| earliest.fire_at));
}
}
impl<M: Send + 'static, A: Actor<Context = TimedContext<M>, Message = M>> Actor for Timed<A> {
type Context = Context<Self::Message>;
type Error = A::Error;
type Message = TimedMessage<M>;
const DEFAULT_CAPACITY_HIGH: usize = A::DEFAULT_CAPACITY_HIGH;
const DEFAULT_CAPACITY_NORMAL: usize = A::DEFAULT_CAPACITY_NORMAL;
fn handle(
&mut self,
context: &mut Self::Context,
timed_message: Self::Message,
) -> Result<(), Self::Error> {
match timed_message {
TimedMessage::Instant { message } => {
self.inner.handle(&mut TimedContext::from_context(context), message)?;
},
TimedMessage::Delayed { message, fire_at } => {
self.queue.push(QueueItem { fire_at, payload: Payload::Delayed { message } });
},
TimedMessage::Recurring { factory, fire_at, interval } => {
self.queue
.push(QueueItem { fire_at, payload: Payload::Recurring { factory, interval } });
},
};
self.process_queue(context)?;
self.schedule_timeout(context);
Ok(())
}
fn name() -> &'static str {
A::name()
}
fn priority(message: &Self::Message) -> Priority {
match message {
TimedMessage::Instant { message } | TimedMessage::Delayed { message, .. } => {
A::priority(message)
},
TimedMessage::Recurring { .. } => Priority::High,
}
}
fn started(&mut self, context: &mut Self::Context) -> Result<(), Self::Error> {
self.inner.started(&mut TimedContext::from_context(context))
}
fn stopped(&mut self, context: &mut Self::Context) -> Result<(), Self::Error> {
self.inner.stopped(&mut TimedContext::from_context(context))
}
fn deadline_passed(
&mut self,
context: &mut Self::Context,
_deadline: Instant,
) -> Result<(), Self::Error> {
self.process_queue(context)?;
self.schedule_timeout(context);
Ok(())
}
}
impl<A: Actor> Deref for Timed<A> {
type Target = A;
fn deref(&self) -> &Self::Target {
&self.inner
}
}
struct QueueItem<M> {
fire_at: Instant,
payload: Payload<M>,
}
impl<M> PartialEq for QueueItem<M> {
fn eq(&self, other: &Self) -> bool {
self.fire_at == other.fire_at
}
}
impl<M> Eq for QueueItem<M> {}
impl<M> PartialOrd for QueueItem<M> {
fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
Some(self.cmp(other))
}
}
impl<M> Ord for QueueItem<M> {
fn cmp(&self, other: &Self) -> Ordering {
self.fire_at.cmp(&other.fire_at).reverse()
}
}
enum Payload<M> {
Delayed { message: M },
Recurring { factory: Box<dyn FnMut() -> M + Send>, interval: Duration },
}
#[cfg(test)]
mod tests {
use super::*;
use crate::System;
use std::{
sync::{Arc, Mutex},
thread,
};
struct TimedTestActor {
received: Arc<Mutex<Vec<usize>>>,
}
impl Actor for TimedTestActor {
type Context = TimedContext<Self::Message>;
type Error = String;
type Message = usize;
fn handle(&mut self, context: &mut Self::Context, message: usize) -> Result<(), String> {
{
let mut guard = self.received.lock().unwrap();
guard.push(message);
}
if message == 1 || message == 3 {
thread::sleep(Duration::from_millis(100));
context.myself.send_now(3).unwrap();
}
Ok(())
}
fn started(&mut self, context: &mut Self::Context) -> Result<(), String> {
context
.myself
.send_recurring(
|| 2,
Instant::now() + Duration::from_millis(50),
Duration::from_millis(100),
)
.map_err(|e| e.to_string())
}
}
#[test]
fn recurring_messages_for_busy_actors() {
let received = Arc::new(Mutex::new(Vec::new()));
let mut system = System::new("timed test");
let address =
system.spawn(Timed::new(TimedTestActor { received: Arc::clone(&received) })).unwrap();
address.send_now(1).unwrap();
thread::sleep(Duration::from_millis(225));
assert_eq!(*received.lock().unwrap(), vec![1, 2, 3, 2, 3]);
system.shutdown().unwrap();
}
}