crb_routine/
lib.rs

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
mod runtime;

use anyhow::Error;
use async_trait::async_trait;
use crb_core::time::{sleep, timeout, Duration, Elapsed};
use crb_runtime::context::Context;
use crb_runtime::interruptor::{Controller, RegistrationTaken};
use futures::stream::{Abortable, Aborted};
use std::ops::DerefMut;
use thiserror::Error;

/// Reasons a task run through [`Routine`] can end without producing an output.
#[derive(Debug, Error)]
pub enum TaskError {
    /// Converted from `Aborted`, the error yielded by the `Abortable` wrapper
    /// that `Routine::routine` places around the task future.
    #[error("task was aborted")]
    Aborted(#[from] Aborted),
    /// Returned by `Routine::interruptable_routine` once the controller
    /// reports that it is no longer active.
    #[error("task was interrupted")]
    Interrupted,
    /// Converted from `Elapsed`, the error yielded by the `timeout` wrapper
    /// applied in `Routine::routine`.
    #[error("time for task execution elapsed")]
    Timeout(#[from] Elapsed),
    /// Converted from `RegistrationTaken`; presumably produced when the
    /// controller's registration was already taken (confirm in `crb_runtime`).
    #[error("can't register a task: {0}")]
    Registration(#[from] RegistrationTaken),
    /// Wraps any other failure expressed as an `anyhow::Error`.
    #[error("task failed: {0}")]
    Failed(#[from] Error),
}

/// A task that keeps calling [`Routine::repeatable_routine`] until it yields
/// an output, the controller stops being active, the task is aborted, or the
/// time limit elapses.
#[async_trait]
pub trait Routine: Sized + Send + 'static {
    // TODO: Use TaskSession (as a trait) and add TaskSession
    /// Context passed to the routine; it must dereference to a [`TaskSession`].
    type Context: Context + DerefMut<Target = TaskSession>;
    /// Value produced when the routine completes.
    type Output: Send;

    /// Runs the routine under an abort registration and a time limit.
    ///
    /// # Errors
    ///
    /// Returns a [`TaskError`] when the registration cannot be taken, the
    /// future is aborted, the time limit elapses, or the inner routine fails.
    async fn routine(&mut self, ctx: &mut Self::Context) -> Result<Self::Output, TaskError> {
        let registration = ctx.controller.take_registration()?;
        // TODO: Get time limit from the context (and make it adjustable in real-time)
        let limit = self.time_limit().await;
        let limited = timeout(limit, self.interruptable_routine(ctx));
        // Unwrap the layers from the outside in: abort, then timeout; the
        // innermost result is already the return type.
        let not_aborted = Abortable::new(limited, registration).await?;
        let in_time = not_aborted?;
        in_time
    }

    /// Calls [`Routine::repeatable_routine`] in a loop while the controller is
    /// active, waiting via [`Routine::routine_wait`] between attempts.
    ///
    /// # Errors
    ///
    /// Returns [`TaskError::Interrupted`] once the controller is no longer
    /// active.
    async fn interruptable_routine(
        &mut self,
        ctx: &mut Self::Context,
    ) -> Result<Self::Output, TaskError> {
        loop {
            if !ctx.controller.is_active() {
                return Err(TaskError::Interrupted);
            }
            let succeed = match self.repeatable_routine().await {
                Ok(Some(output)) => return Ok(output),
                Ok(None) => true,
                // TODO: Report about the error
                Err(_err) => false,
            };
            self.routine_wait(succeed, ctx).await;
        }
    }

    /// One attempt of the task. `Ok(Some(_))` finishes the routine, while
    /// `Ok(None)` and `Err(_)` both lead to another attempt after a wait.
    ///
    /// The default implementation never produces an output.
    async fn repeatable_routine(&mut self) -> Result<Option<Self::Output>, Error> {
        Ok(None)
    }

    // TODO: Use context instead
    /// Time limit handed to `timeout` in [`Routine::routine`]; `None` by
    /// default (presumably meaning "no limit" — confirm in `crb_core::time`).
    async fn time_limit(&mut self) -> Option<Duration> {
        None
    }

    /// Pause between attempts; by default sleeps for the session's interval
    /// regardless of whether the previous attempt succeeded.
    async fn routine_wait(&mut self, _succeed: bool, ctx: &mut Self::Context) {
        sleep(ctx.interval).await
    }

    /// Consumes the final result of the routine; by default it discards a
    /// successful output and forwards a [`TaskError`] as an `anyhow::Error`.
    async fn finalize(&mut self, result: Result<Self::Output, TaskError>) -> Result<(), Error> {
        match result {
            Ok(_) => Ok(()),
            Err(task_err) => Err(task_err.into()),
        }
    }
}

/// Per-task state that a [`Routine`] reaches through its context.
pub struct TaskSession {
    /// Supplies the abort registration and the "is active" flag that
    /// the routine checks between attempts.
    controller: Controller,
    /// Interval between repeatable routine calls
    interval: Duration,
}

impl TaskSession {
    /// Set repeat interval.
    ///
    /// The new value is read by the default `Routine::routine_wait` the next
    /// time it is called.
    pub fn set_interval(&mut self, interval: Duration) {
        self.interval = interval;
    }
}

impl Context for TaskSession {
    // TODO: TaskAddress that uses a controller internally
    /// Placeholder address type: the session does not expose a real address yet.
    type Address = ();

    /// Returns a reference to the unit placeholder address.
    fn address(&self) -> &Self::Address {
        const PLACEHOLDER: &() = &();
        PLACEHOLDER
    }
}

/// A [`Context`] that can hand out mutable access to its [`TaskSession`].
pub trait TaskContext: Context {
    /// Returns the task session held by this context.
    fn session(&mut self) -> &mut TaskSession;
}

impl TaskContext for TaskSession {
    /// A session is its own task context, so this returns `self`.
    fn session(&mut self) -> &mut TaskSession {
        self
    }
}