1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
//! Tools to make a given actor able to receive delayed and recurring messages.
//!
//! To apply this to a given (receiving) actor:
//! * Use [`TimedContext<Self::Message>`] as [`Actor::Context`] associated type.
//!   * Such actors cannot be spawned unless wrapped, making it impossible to forget wrapping it.
//! * Wrap the actor in [`Timed`] before spawning.
//!
//! The wrapped actor will accept [`TimedMessage<M>`] with convenience conversion from `M`.
//! [`RecipientExt`] becomes available for [`Recipient<TimedMessage<M>>`]s which provides methods like
//! `send_delayed()`, `send_recurring()`.
//!
//! Once accepted by the actor, delayed and recurring messages do not occupy space in the actor's
//! channel inbox; they are placed in an internal queue instead. Due to this design, delayed and
//! recurring messages always have lower priority than instant messages when the actor is
//! saturated.
//!
//! See `delay_actor.rs` example for usage.

use crate::{Actor, Context, Priority, Recipient, SendError, SystemHandle};
use std::{
    cmp::Ordering,
    collections::BinaryHeap,
    ops::Deref,
    time::{Duration, Instant},
};

/// A message that is delivered right away, at a given instant, or repeatedly.
///
/// Plain messages of type `M` convert into the [`TimedMessage::Instant`] variant through `From`.
pub enum TimedMessage<M> {
    /// Hand `message` to the wrapped actor as soon as it is received.
    Instant {
        message: M,
    },
    /// Hold `message` back until `fire_at`, then hand it to the wrapped actor.
    Delayed {
        message: M,
        fire_at: Instant,
    },
    /// Call `factory` to produce a fresh message, first at `fire_at` and then again every
    /// `interval` after that.
    Recurring {
        factory: Box<dyn FnMut() -> M + Send>,
        fire_at: Instant,
        interval: Duration,
    },
}

/// This implementation allows sending direct unwrapped messages to wrapped actors.
impl<M> From<M> for TimedMessage<M> {
    fn from(message: M) -> Self {
        Self::Instant { message }
    }
}

/// Extension methods for [`Recipient`]s whose message type is [`TimedMessage`].
pub trait RecipientExt<M> {
    /// Send `message` for immediate delivery, i.e. as a [`TimedMessage::Instant`].
    fn send_now(&self, message: M) -> Result<(), SendError>;

    /// Send `message` so that it is delivered once the instant `fire_at` is reached.
    fn send_timed(&self, message: M, fire_at: Instant) -> Result<(), SendError>;

    /// Send `message` so that it is delivered once `delay` has elapsed, counted from this call.
    ///
    /// Provided method: computes the target instant and forwards to
    /// [`send_timed`](Self::send_timed).
    ///
    /// # Panics
    ///
    /// May panic if `Instant::now() + delay` cannot be represented by [`Instant`].
    fn send_delayed(&self, message: M, delay: Duration) -> Result<(), SendError> {
        let fire_at = Instant::now() + delay;
        self.send_timed(message, fire_at)
    }

    /// Schedule messages produced by `factory`: the first one at `fire_at`, then one more every
    /// `interval` from that point on.
    fn send_recurring(
        &self,
        factory: impl FnMut() -> M + Send + 'static,
        fire_at: Instant,
        interval: Duration,
    ) -> Result<(), SendError>;
}

/// Implements the convenience methods by wrapping the arguments into the matching
/// [`TimedMessage`] variant and passing the result to `send()`.
impl<M> RecipientExt<M> for Recipient<TimedMessage<M>> {
    fn send_now(&self, message: M) -> Result<(), SendError> {
        let wrapped = TimedMessage::Instant { message };
        self.send(wrapped)
    }

    fn send_timed(&self, message: M, fire_at: Instant) -> Result<(), SendError> {
        let wrapped = TimedMessage::Delayed { message, fire_at };
        self.send(wrapped)
    }

    fn send_recurring(
        &self,
        factory: impl FnMut() -> M + Send + 'static,
        fire_at: Instant,
        interval: Duration,
    ) -> Result<(), SendError> {
        // The enum stores the factory type-erased, so box it before building the variant.
        let factory: Box<dyn FnMut() -> M + Send> = Box::new(factory);
        let wrapped = TimedMessage::Recurring { factory, fire_at, interval };
        self.send(wrapped)
    }
}

/// A [`Context`] variant available to actors wrapped by the [`Timed`] actor wrapper.
///
/// It carries the same handles as the wrapper's own context, except that `myself` accepts
/// [`TimedMessage<M>`] instead of a bare `M`.
pub struct TimedContext<M> {
    /// Handle to the actor system, cloned from the wrapper's context.
    pub system_handle: SystemHandle,
    /// Recipient cloned from the wrapper's context; it takes [`TimedMessage<M>`], so the
    /// [`RecipientExt`] methods can be called on it.
    pub myself: Recipient<TimedMessage<M>>,
}

impl<M> TimedContext<M> {
    fn from_context(context: &Context<TimedMessage<M>>) -> Self {
        TimedContext {
            system_handle: context.system_handle.clone(),
            myself: context.myself.clone(),
        }
    }
}

/// Wraps an actor so that it can receive delayed and recurring messages.
/// See [module documentation](self) for a complete recipe.
pub struct Timed<A>
where
    A: Actor,
{
    /// The wrapped actor that ultimately handles every message.
    inner: A,
    /// Delayed and recurring messages that have not fired yet, earliest `fire_at` on top.
    queue: BinaryHeap<QueueItem<A::Message>>,
}

impl<M: Send + 'static, A: Actor<Context = TimedContext<M>, Message = M>> Timed<A> {
    pub fn new(inner: A) -> Self {
        Self { inner, queue: Default::default() }
    }

    fn schedule_timeout(&self, context: &mut <Self as Actor>::Context) {
        // Schedule next timeout if the queue is not empty.
        context.set_deadline(self.queue.peek().map(|earliest| earliest.fire_at));
    }
}

/// [`Actor`] implementation of the wrapper: instant messages go straight to the inner actor,
/// delayed and recurring ones are parked in the internal queue and delivered from
/// `deadline_passed()`.
impl<M: Send + 'static, A: Actor<Context = TimedContext<M>, Message = M>> Actor for Timed<A> {
    type Context = Context<Self::Message>;
    type Error = A::Error;
    type Message = TimedMessage<M>;

    /// Dispatches one incoming [`TimedMessage`].
    ///
    /// * `Instant` messages are handled by the inner actor right away and its result is returned.
    /// * `Delayed` and `Recurring` messages are pushed to the internal queue, the deadline is
    ///   re-armed to the earliest queued `fire_at`, and `Ok(())` is returned.
    fn handle(
        &mut self,
        context: &mut Self::Context,
        timed_message: Self::Message,
    ) -> Result<(), Self::Error> {
        let item = match timed_message {
            TimedMessage::Instant { message } => {
                return self.inner.handle(&mut TimedContext::from_context(context), message);
            },
            TimedMessage::Delayed { message, fire_at } => {
                QueueItem { fire_at, payload: Payload::Delayed { message } }
            },
            TimedMessage::Recurring { factory, fire_at, interval } => {
                QueueItem { fire_at, payload: Payload::Recurring { factory, interval } }
            },
        };

        self.queue.push(item);
        self.schedule_timeout(context);
        Ok(())
    }

    /// Reports the inner actor's name.
    fn name() -> &'static str {
        A::name()
    }

    /// Priority of an incoming message, derived from the inner actor where possible.
    fn priority(message: &Self::Message) -> Priority {
        match message {
            // Use underlying message priority if we can reference it.
            TimedMessage::Instant { message } | TimedMessage::Delayed { message, .. } => {
                A::priority(message)
            },
            // Recurring message is only received once, the recurring instances go through the
            // internal queue (and not actor's channel). Assign high priority to the request to
            // set-up the recurrent sending.
            TimedMessage::Recurring { .. } => Priority::High,
        }
    }

    /// Forwards the `started` notification to the inner actor.
    fn started(&mut self, context: &mut Self::Context) {
        self.inner.started(&mut TimedContext::from_context(context))
    }

    /// Forwards the `stopped` notification to the inner actor.
    fn stopped(&mut self, context: &mut Self::Context) {
        self.inner.stopped(&mut TimedContext::from_context(context))
    }

    /// Delivers every queued message whose `fire_at` is not later than the current time, then
    /// re-arms the deadline for the earliest message still queued.
    ///
    /// Recurring messages are re-queued for `fire_at + interval` before their current instance
    /// is handled, with two exceptions:
    ///
    /// * If `fire_at + interval` cannot be represented by [`Instant`], the recurrence is dropped
    ///   instead of panicking; its next occurrence could never be reached anyway.
    /// * A recurrence with a zero `interval` fires at most once per call. Its next occurrence is
    ///   re-queued only after the loop; putting it back immediately would keep it permanently
    ///   due and this method would never return.
    ///
    /// # Errors
    ///
    /// Returns the first error produced by the inner actor's `handle()`. Messages that are still
    /// queued stay queued, and the deadline is not re-armed in that case.
    fn deadline_passed(
        &mut self,
        context: &mut Self::Context,
        _deadline: Instant,
    ) -> Result<(), Self::Error> {
        // Handle all messages that should have been handled by now. `now` is sampled once so
        // that the amount of work done in one call is bounded.
        let now = Instant::now();
        // Zero-interval recurrences wait here until the loop is done, see the method docs.
        let mut parked = Vec::new();
        let mut outcome: Result<(), Self::Error> = Ok(());

        while self.queue.peek().map(|m| m.fire_at <= now).unwrap_or(false) {
            let item = self.queue.pop().expect("heap is non-empty, we have just peeked");

            let message = match item.payload {
                Payload::Delayed { message } => message,
                Payload::Recurring { mut factory, interval } => {
                    let message = factory();
                    // `checked_add` instead of `+`: an absurdly large interval must not panic.
                    if let Some(fire_at) = item.fire_at.checked_add(interval) {
                        let next =
                            QueueItem { fire_at, payload: Payload::Recurring { factory, interval } };
                        if interval.is_zero() {
                            parked.push(next);
                        } else {
                            self.queue.push(next);
                        }
                    }
                    message
                },
            };

            // Let inner actor do its job.
            let handled = self.inner.handle(&mut TimedContext::from_context(context), message);
            if let Err(error) = handled {
                outcome = Err(error);
                break;
            }
        }

        // Put parked recurrences back even on the error path, so that nothing is lost.
        self.queue.extend(parked);

        if outcome.is_ok() {
            self.schedule_timeout(context);
        }
        outcome
    }
}

/// Access wrapped actor.
impl<A: Actor> Deref for Timed<A> {
    type Target = A;

    fn deref(&self) -> &Self::Target {
        &self.inner
    }
}

/// Implementation detail, element of message queue ordered by time to fire at.
///
/// Equality and ordering look at `fire_at` only; the payload never takes part in comparisons.
/// The ordering is reversed (an earlier `fire_at` compares as *greater*) so that a
/// [`BinaryHeap`], which is a max-heap, pops the item that is due first. Items with the same
/// `fire_at` compare equal, so their relative pop order is unspecified.
struct QueueItem<M> {
    /// Instant at which the payload becomes due.
    fire_at: Instant,
    /// What to deliver once `fire_at` is reached.
    payload: Payload<M>,
}

impl<M> PartialEq for QueueItem<M> {
    fn eq(&self, other: &Self) -> bool {
        self.fire_at == other.fire_at
    }
}

// We cannot derive because that would add too strict bounds.
impl<M> Eq for QueueItem<M> {}

impl<M> PartialOrd for QueueItem<M> {
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        // The order is total; `Ord::cmp` is the single source of truth.
        Some(self.cmp(other))
    }
}

impl<M> Ord for QueueItem<M> {
    fn cmp(&self, other: &Self) -> Ordering {
        // Operands are swapped on purpose: [BinaryHeap] is a *max* heap, but we want pop() to
        // return the lowest `fire_at`.
        other.fire_at.cmp(&self.fire_at)
    }
}

/// What a queued item delivers once it is due.
enum Payload<M> {
    /// A single, already constructed message.
    Delayed {
        message: M,
    },
    /// A message factory that is invoked for every occurrence; `interval` is the distance
    /// between two consecutive occurrences.
    Recurring {
        factory: Box<dyn FnMut() -> M + Send>,
        interval: Duration,
    },
}