1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
// Copyright (c) Sean Lawlor
//
// This source code is licensed under both the MIT license found in the
// LICENSE-MIT file in the root directory of this source tree.

//! Timers for sending messages to actors periodically
//!
//! The methodology of timers in `ractor` are based on [Erlang's `timer` module](https://www.erlang.org/doc/man/timer.html).
//! We aren't supporting all timer functions, as many of them don't make sense but we
//! support the relevant ones for `ractor`. In short
//!
//! 1. Send on a period
//! 2. Send after a delay
//! 3. Stop after a delay
//! 4. Kill after a delay
//!
//! ## Examples
//!
//! ```rust
//! use ractor::concurrency::Duration;
//! use ractor::{Actor, ActorProcessingErr, ActorRef};
//!
//! struct ExampleActor;
//!
//! enum ExampleMessage {
//!     AfterDelay,
//!     OnPeriod,
//! }
//!
//! #[cfg(feature = "cluster")]
//! impl ractor::Message for ExampleMessage {}
//!
//! #[cfg_attr(feature = "async-trait", ractor::async_trait)]
//! impl Actor for ExampleActor {
//!     type Msg = ExampleMessage;
//!     type State = ();
//!     type Arguments = ();
//!
//!     async fn pre_start(
//!         &self,
//!         _myself: ActorRef<Self::Msg>,
//!         _args: Self::Arguments,
//!     ) -> Result<Self::State, ActorProcessingErr> {
//!         println!("Starting");
//!         Ok(())
//!     }
//!
//!     async fn handle(
//!         &self,
//!         _myself: ActorRef<Self::Msg>,
//!         message: Self::Msg,
//!         _state: &mut Self::State,
//!     ) -> Result<(), ActorProcessingErr> {
//!         match message {
//!             ExampleMessage::AfterDelay => println!("After delay"),
//!             ExampleMessage::OnPeriod => println!("On period"),
//!         }
//!         Ok(())
//!     }
//! }
//!
//! #[tokio::main]
//! async fn main() {
//!     let (actor, handle) = Actor::spawn(None, ExampleActor, ())
//!         .await
//!         .expect("Failed to startup dummy actor");
//!
//!     // send the message after a 100ms delay
//!     actor.send_after(Duration::from_millis(100), || ExampleMessage::AfterDelay);
//!
//!     // send this message every 10ms
//!     actor.send_interval(Duration::from_millis(10), || ExampleMessage::OnPeriod);
//!
//!     // Exit the actor after 200ms (equivalent of calling `stop(maybe_reason)`)
//!     actor.exit_after(Duration::from_millis(200));
//!
//!     // Kill the actor after 300ms (won't execute since we did stop before, but here
//!     // as an example)
//!     actor.kill_after(Duration::from_millis(300));
//!
//!     // wait for actor exit
//!     handle.await.unwrap();
//! }
//! ```

use crate::concurrency::{Duration, JoinHandle};

use crate::{ActorCell, Message, MessagingErr, ACTIVE_STATES};

#[cfg(test)]
mod tests;

/// Sends a message to a given actor repeatedly after a specified time
/// using the provided message generation function. The task will exit
/// once the channel is closed (meaning the underlying [crate::Actor]
/// has terminated)
///
/// * `period` - The [Duration] representing the period for the send interval
/// * `actor` - The [ActorCell] representing the [crate::Actor] to communicate with
/// * `msg` - The [Fn] message builder which is called to generate a message for each send
/// operation.
///
/// Returns: The [JoinHandle] which represents the backgrounded work (can be ignored to
/// "fire and forget")
pub fn send_interval<TMessage, F>(period: Duration, actor: ActorCell, msg: F) -> JoinHandle<()>
where
    TMessage: Message,
    F: Fn() -> TMessage + Send + 'static,
{
    // As per #57, the traditional sleep operation is subject to drift over long periods.
    // Tokio and our internal version for `async_std` provide an interval timer which
    // accounts for execution time to send a message and changes in polling to wake
    // the task to assure that the period doesn't drift over long runtimes.
    crate::concurrency::spawn(async move {
        let mut timer = crate::concurrency::interval(period);
        // timer tick's immediately the first time
        timer.tick().await;
        while ACTIVE_STATES.contains(&actor.get_status()) {
            timer.tick().await;
            // if we receive an error trying to send, the channel is closed and we should stop trying
            // actor died
            if actor.send_message::<TMessage>(msg()).is_err() {
                break;
            }
        }
    })
}

/// Sends a message after a given period to the specified actor. The task terminates
/// once the send has completed
///
/// * `period` - The [Duration] representing the time to delay before sending
/// * `actor` - The [ActorCell] representing the [crate::Actor] to communicate with
/// * `msg` - The [FnOnce] message builder which is called to generate a message for the send
/// operation
///
/// Returns: The [JoinHandle<Result<(), MessagingErr>>] which represents the backgrounded work.
/// Awaiting the handle will yield the result of the send operation. Can be safely ignored to
/// "fire and forget"
pub fn send_after<TMessage, F>(
    period: Duration,
    actor: ActorCell,
    msg: F,
) -> JoinHandle<Result<(), MessagingErr<TMessage>>>
where
    TMessage: Message,
    F: FnOnce() -> TMessage + Send + 'static,
{
    crate::concurrency::spawn(async move {
        crate::concurrency::sleep(period).await;
        actor.send_message::<TMessage>(msg())
    })
}

/// Sends the stop signal to the actor after a specified duration, attaching a reason
/// of "Exit after {}ms" by default
///
/// * `period` - The [Duration] representing the time to delay before sending
/// * `actor` - The [ActorCell] representing the [crate::Actor] to exit after the duration
///
/// Returns: The [JoinHandle] which denotes the backgrounded operation. To cancel the
/// exit operation, you can abort the handle
pub fn exit_after(period: Duration, actor: ActorCell) -> JoinHandle<()> {
    crate::concurrency::spawn(async move {
        crate::concurrency::sleep(period).await;
        actor.stop(Some(format!("Exit after {}ms", period.as_millis())))
    })
}

/// Sends the KILL signal to the actor after a specified duration
///
/// * `period` - The [Duration] representing the time to delay before sending
/// * `actor` - The [ActorCell] representing the [crate::Actor] to kill after the duration
///
/// Returns: The [JoinHandle] which denotes the backgrounded operation. To cancel the
/// kill operation, you can abort the handle
pub fn kill_after(period: Duration, actor: ActorCell) -> JoinHandle<()> {
    crate::concurrency::spawn(async move {
        crate::concurrency::sleep(period).await;
        actor.kill()
    })
}

/// Add the timing functionality on top of the [crate::ActorRef]
impl<TMessage> crate::ActorRef<TMessage>
where
    TMessage: crate::Message,
{
    /// Alias of [send_interval]
    pub fn send_interval<F>(&self, period: Duration, msg: F) -> JoinHandle<()>
    where
        F: Fn() -> TMessage + Send + 'static,
    {
        send_interval::<TMessage, F>(period, self.get_cell(), msg)
    }

    /// Alias of [send_after]
    pub fn send_after<F>(
        &self,
        period: Duration,
        msg: F,
    ) -> JoinHandle<Result<(), MessagingErr<TMessage>>>
    where
        F: FnOnce() -> TMessage + Send + 'static,
    {
        send_after::<TMessage, F>(period, self.get_cell(), msg)
    }

    /// Alias of [exit_after]
    pub fn exit_after(&self, period: Duration) -> JoinHandle<()> {
        exit_after(period, self.get_cell())
    }

    /// Alias of [kill_after]
    pub fn kill_after(&self, period: Duration) -> JoinHandle<()> {
        kill_after(period, self.get_cell())
    }
}