meio/
lite_runtime.rs

1use crate::actor_runtime::{Actor, Status};
2use crate::compat::watch;
3use crate::handlers::{Operation, TaskEliminated};
4use crate::ids::{Id, IdOf};
5use crate::lifecycle::{LifecycleNotifier, TaskDone};
6use crate::linkage::Address;
7use anyhow::Error;
8use async_trait::async_trait;
9use futures::{
10    future::{select, Either, FusedFuture},
11    Future, FutureExt,
12};
13use std::hash::{Hash, Hasher};
14use std::pin::Pin;
15use std::sync::Arc;
16use std::time::{Duration, Instant};
17use thiserror::Error;
18
19/// Custom tag for `LiteTask`.
20/// Attached to a runtime.
21pub trait Tag: Send + 'static {}
22
23impl Tag for () {}
24
25/// Minimalistic actor that hasn't `Address`.
26///
27/// **Recommended** to implement sequences or intensive loops (routines).
28#[async_trait]
29pub trait LiteTask: Sized + Send + 'static {
30    /// The result of a finished task.
31    type Output: Send;
32
33    /// The log target for the `LiteTask`.
34    fn log_target(&self) -> &str;
35
36    /// Routine of the task that can contain loops.
37    /// It can taks into accout provided receiver to implement graceful interruption.
38    ///
39    /// By default uses the following calling chain that you can override at any step:
40    /// `routine` -> `interruptable_routine` -> `repeatable_routine` -> `retry_at` -> `retry_delay`
41    async fn routine(self, mut stop: StopReceiver) -> Result<Self::Output, Error> {
42        stop.or(self.interruptable_routine())
43            .await
44            .map_err(Error::from)
45            // TODO: use `flatten` instead
46            .and_then(std::convert::identity)
47    }
48
49    /// Routine that can be unconditionally interrupted.
50    async fn interruptable_routine(mut self) -> Result<Self::Output, Error> {
51        self.pre_repeatable_routine().await?;
52        loop {
53            let last_attempt = Instant::now();
54            let routine_result = self.repeatable_routine().await;
55            match routine_result {
56                Ok(Some(output)) => {
57                    break Ok(output);
58                }
59                Ok(None) => {
60                    // Continue
61                    self.routine_wait(last_attempt, true).await;
62                }
63                Err(err) => {
64                    log::error!(target: self.log_target(), "Routine failed: {}", err);
65                    self.routine_wait(last_attempt, false).await;
66                }
67            }
68        }
69    }
70
71    /// Called before `repeatable_routine` for initialization.
72    /// The routine will be interrupted if this method failed.
73    async fn pre_repeatable_routine(&mut self) -> Result<(), Error> {
74        Ok(())
75    }
76
77    /// Routine that will be repeated till fail or success.
78    ///
79    /// To stop it you should return `Some(value)`.
80    async fn repeatable_routine(&mut self) -> Result<Option<Self::Output>, Error> {
81        Ok(None)
82    }
83
84    /// Check of the every intaration of a routine.
85    async fn routine_wait(&mut self, _last_attempt: Instant, _succeed: bool) {
86        let duration = Duration::from_secs(5);
87        crate::compat::delay(duration).await
88    }
89}
90
91pub(crate) fn spawn<T, S, M>(task: T, tag: M, supervisor: Option<Address<S>>) -> TaskAddress<T>
92where
93    T: LiteTask,
94    S: Actor + TaskEliminated<T, M>,
95    M: Tag,
96{
97    let id = Id::unique();
98    let (stop_sender, stop_receiver) = make_stop_channel(id.clone());
99    let id_of = IdOf::<T>::new(id.clone());
100    let done_notifier = {
101        match supervisor {
102            None => <dyn LifecycleNotifier<_>>::ignore(),
103            Some(super_addr) => {
104                //let event = TaskDone::new(id_of.clone());
105                let op = Operation::Done { id };
106                <dyn LifecycleNotifier<_>>::once(super_addr, op)
107            }
108        }
109    };
110    let runtime = LiteRuntime {
111        id: id_of,
112        task,
113        done_notifier,
114        stop_receiver,
115        tag,
116    };
117    crate::compat::spawn_async(runtime.entrypoint());
118    stop_sender
119}
120
121/// Just receives a stop signal.
122pub trait StopSignal: Future<Output = ()> + FusedFuture + Send {}
123
124impl<T> StopSignal for T where T: Future<Output = ()> + FusedFuture + Send {}
125
126#[derive(Debug, Error)]
127#[error("task interrupted by a signal")]
128pub struct TaskStopped;
129
130fn make_stop_channel<T>(id: Id) -> (TaskAddress<T>, StopReceiver) {
131    let (tx, rx) = watch::channel(Status::Alive);
132    let stop_sender = StopSender { tx: Arc::new(tx) };
133    let address = TaskAddress {
134        id: IdOf::new(id),
135        stop_sender,
136    };
137    let receiver = StopReceiver { status: rx };
138    (address, receiver)
139}
140
141/// Contains a sender to update a status of a task.
142#[derive(Debug, Clone)]
143pub struct StopSender {
144    tx: Arc<watch::Sender<Status>>,
145}
146
147impl StopSender {
148    /// Send a stop signal to the task.
149    pub fn stop(&self) -> Result<(), Error> {
150        self.tx.send(Status::Stop).map_err(Error::from)
151    }
152}
153
154impl<T> From<TaskAddress<T>> for StopSender {
155    fn from(addr: TaskAddress<T>) -> Self {
156        addr.stop_sender
157    }
158}
159
160/// Address of a spawned task.
161///
162/// It can be used to interrupt the task.
163#[derive(Debug)]
164pub struct TaskAddress<T> {
165    id: IdOf<T>,
166    stop_sender: StopSender,
167}
168
169impl<T> Clone for TaskAddress<T> {
170    fn clone(&self) -> Self {
171        Self {
172            id: self.id(),
173            stop_sender: self.stop_sender.clone(),
174        }
175    }
176}
177
178impl<T> TaskAddress<T> {
179    /// Id of the task.
180    pub fn id(&self) -> IdOf<T> {
181        self.id.clone()
182    }
183
184    /// Send a stop signal to the task.
185    pub fn stop(&self) -> Result<(), Error> {
186        self.stop_sender.stop()
187    }
188}
189
190impl<T: LiteTask> PartialEq for TaskAddress<T> {
191    fn eq(&self, other: &Self) -> bool {
192        self.id.eq(&other.id)
193    }
194}
195
196impl<T: LiteTask> Eq for TaskAddress<T> {}
197
198impl<T: LiteTask> Hash for TaskAddress<T> {
199    fn hash<H: Hasher>(&self, state: &mut H) {
200        self.id.hash(state);
201    }
202}
203
204/// Contains a receiver with a status of a task.
205#[derive(Debug, Clone)]
206pub struct StopReceiver {
207    status: watch::Receiver<Status>,
208}
209
210impl StopReceiver {
211    /// Returns `true` is the task can be alive.
212    pub fn is_alive(&self) -> bool {
213        *self.status.borrow() == Status::Alive
214    }
215
216    /// Returns a `Future` that completed when `Done` signal received.
217    pub fn into_future(self) -> Pin<Box<dyn StopSignal>> {
218        Box::pin(just_done(self.status).fuse())
219    }
220
221    /// Tries to execute provided `Future` to completion if the `ShutdownReceived`
222    /// won't interrupted during that time.
223    pub async fn or<Fut>(&mut self, fut: Fut) -> Result<Fut::Output, TaskStopped>
224    where
225        Fut: Future,
226    {
227        let fut = Box::pin(fut);
228        let either = select(self.clone().into_future(), fut).await;
229        match either {
230            Either::Left((_done, _rem_fut)) => Err(TaskStopped),
231            Either::Right((output, _rem_fut)) => Ok(output),
232        }
233    }
234}
235
236async fn just_done(mut status: watch::Receiver<Status>) {
237    while status.changed().await.is_ok() {
238        if status.borrow().is_done() {
239            break;
240        }
241    }
242}
243
244/// An error that can happen in a task.
245#[derive(Debug, Error)]
246pub enum TaskError {
247    /// The task was interrupted.
248    #[error("task was interrupted")]
249    Interrupted,
250    /// Task had any other error.
251    #[error("task failed: {0}")]
252    Other(Error),
253}
254
255impl TaskError {
256    /// If the task was interrupted it returs `true`.
257    pub fn is_interrupted(&self) -> bool {
258        matches!(self, Self::Interrupted)
259    }
260
261    /// Converts the error into other error value if
262    /// the task wasn't interrupted.
263    pub fn into_other(self) -> Option<Error> {
264        match self {
265            Self::Interrupted => None,
266            Self::Other(err) => Some(err),
267        }
268    }
269
270    #[deprecated(since = "0.86.2", note = "Use ordinary `match` for checking.")]
271    /// Moves interrupted flag from `err` to the optional result.
272    pub fn swap<T>(result: Result<T, Self>) -> Result<Option<T>, Error> {
273        match result {
274            Ok(value) => Ok(Some(value)),
275            Err(Self::Interrupted) => Ok(None),
276            Err(Self::Other(err)) => Err(err),
277        }
278    }
279}
280
281impl From<Error> for TaskError {
282    fn from(error: Error) -> Self {
283        match error.downcast::<TaskStopped>() {
284            Ok(_stopped) => Self::Interrupted,
285            Err(other) => Self::Other(other),
286        }
287    }
288}
289
290struct LiteRuntime<T: LiteTask, M: Tag> {
291    id: IdOf<T>,
292    task: T,
293    // TODO: Add T::Output to TaskDone
294    done_notifier: Box<dyn LifecycleNotifier<TaskDone<T, M>>>,
295    stop_receiver: StopReceiver,
296    tag: M,
297}
298
299impl<T: LiteTask, M: Tag> LiteRuntime<T, M> {
300    async fn entrypoint(mut self) {
301        let log_target = self.task.log_target().to_owned();
302        log::info!(target: &log_target, "Task started: {}", self.id);
303        let res = self
304            .task
305            .routine(self.stop_receiver)
306            .await
307            .map_err(TaskError::from);
308        if let Err(err) = res.as_ref() {
309            if !err.is_interrupted() {
310                // Can't downcast. It was a real error.
311                log::error!(target: &log_target, "Task failed: {}: {}", self.id, err);
312            }
313        }
314        log::info!(target: &log_target, "Task finished: {}", self.id);
315        // TODO: Add result to it
316        let task_done = TaskDone::new(self.id.clone(), self.tag, res);
317        if let Err(err) = self.done_notifier.notify(task_done) {
318            log::error!(
319                target: &log_target,
320                "Can't send done notification from the task {}: {}",
321                self.id,
322                err
323            );
324        }
325    }
326}