1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
//! This module contains `LiteTask` trait and the runtime to execute it.

use crate::{channel, Controller, Id, Operator, Signal};
use anyhow::Error;
use async_trait::async_trait;
use derive_more::{From, Into};
use futures::{select_biased, FutureExt, StreamExt};
use tokio::sync::watch;
use uuid::Uuid;

/// Contains a receiver with a status of a task.
#[derive(Debug, From, Into)]
pub struct ShutdownReceiver {
    /// Use `Into` to extract that `Receiver`.
    status: watch::Receiver<LiteStatus>,
}

impl ShutdownReceiver {
    /// Converts the `Receiver` into a `Future` that just returns `()` when task finished.
    pub async fn just_done(mut self) {
        // TODO: tokio 0.3
        // while self.status.changed().await.is_ok() {
        while self.status.recv().await.is_some() {
            if self.status.borrow().is_done() {
                break;
            }
        }
    }
}

/// Spawns `Lite` task and return `Controller` to manage that.
fn task<L: LiteTask>(lite_task: L, supervisor: Option<impl Into<Controller>>) -> Controller {
    let id = Id::of_task(&lite_task);
    let supervisor = supervisor.map(Into::into);
    let (controller, operator) = channel::pair(id, supervisor);
    let id = controller.id();
    let runtime = LiteRuntime {
        id,
        lite_task: Some(lite_task),
        operator,
    };
    tokio::spawn(runtime.entrypoint());
    controller
}

/// Status of the task.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub enum LiteStatus {
    /// Task is alive and working.
    Alive,
    /// Task had finished.
    Stop,
}

impl LiteStatus {
    /// Is task finished yet?
    pub fn is_done(&self) -> bool {
        *self == LiteStatus::Stop
    }
}

/// Minimalistic actor that hasn't `Address`.
#[async_trait]
pub trait LiteTask: Sized + Send + 'static {
    /// Returns unique name of the `LiteTask`.
    /// Uses `Uuid` by default.
    fn name(&self) -> String {
        let uuid = Uuid::new_v4();
        format!("Task:{}({})", std::any::type_name::<Self>(), uuid)
    }

    /// Starts a lite task.
    fn start<T>(self, supervisor: Option<T>) -> Controller
    where
        T: Into<Controller> + Send,
    {
        task(self, supervisor)
    }

    /// Routine of the task that can contain loops.
    /// It can taks into accout provided receiver to implement graceful interruption.
    async fn routine(self, signal: ShutdownReceiver) -> Result<(), Error>;
}

struct LiteRuntime<L: LiteTask> {
    id: Id,
    lite_task: Option<L>,
    operator: Operator, // aka Context here
}

impl<L: LiteTask> LiteRuntime<L> {
    async fn entrypoint(mut self) {
        self.operator.initialize();
        log::info!("Starting task: {:?}", self.id);
        self.routine().await;
        log::info!("Task finished: {:?}", self.id);
        self.operator.finalize();
    }

    async fn routine(&mut self) {
        let (shutdown_tx, shutdown_rx) = watch::channel(LiteStatus::Alive);
        let lite_task = self.lite_task.take().unwrap();
        let mut routine = lite_task.routine(shutdown_rx.into()).fuse();
        let mut shutdown = Some(shutdown_tx);
        loop {
            select_biased! {
                event = self.operator.next() => {
                    log::trace!("Stop signal received: {:?} for task {:?}", event, self.id);
                    let signal = event.expect("task controller couldn't be closed");
                    match signal {
                        Signal::Shutdown => {
                            // Ok
                        }
                        Signal::Finished { .. } => {
                            // Because tasks don't terminate them:
                            panic!("Tasks don't support supervised childs.");
                        }
                    }
                    // TODO: Check that origin is none!
                    if let Some(tx) = shutdown.take() {
                        // TODO: tokio 0.3
                        // if tx.send(LiteStatus::Stop).is_err() {
                        if tx.broadcast(LiteStatus::Stop).is_err() {
                            log::error!("Can't send shutdown signal to: {:?}", self.id);
                        }
                        // Wait for `done` instead of `stop_signal` used for actors
                    }
                }
                done = routine => {
                    if let Err(err) = done {
                        log::error!("LiteTask {:?} failed: {}", self.id, err);
                    }
                    break;
                }
            }
        }
    }
}