crb_routine/
routine.rs

1use crate::runtime::RoutineContext;
2use anyhow::Error;
3use async_trait::async_trait;
4use crb_core::time::{sleep, timeout, Duration, Elapsed};
5use crb_runtime::kit::{ManagedContext, RegistrationTaken};
6use futures::stream::Aborted;
7use thiserror::Error;
8
9#[derive(Debug, Error)]
10pub enum TaskError {
11    #[error("task was aborted")]
12    Aborted(#[from] Aborted),
13    #[error("task was interrupted")]
14    Interrupted,
15    #[error("time for task execution elapsed")]
16    Timeout(#[from] Elapsed),
17    #[error("can't register a task: {0}")]
18    Registration(#[from] RegistrationTaken),
19    #[error("task failed: {0}")]
20    Failed(#[from] Error),
21}
22
23#[async_trait]
24pub trait Routine: Sized + Send + 'static {
25    type Context: RoutineContext<Self>;
26    type Output: Send;
27
28    async fn routine(&mut self, ctx: &mut Self::Context) -> Result<(), Error> {
29        // let reg = ctx.session().controller().take_registration()?;
30        // TODO: Get time limit from the context (and make it ajustable in real-time)
31        let time_limit = self.time_limit().await;
32        let fut = timeout(time_limit, self.basic_routine(ctx));
33        fut.await??;
34        Ok(())
35    }
36
37    async fn basic_routine(&mut self, ctx: &mut Self::Context) -> Result<(), Error> {
38        let output = self.interruptable_routine(ctx).await;
39        self.finalize(output, ctx).await?;
40        Ok(())
41    }
42
43    async fn interruptable_routine(
44        &mut self,
45        ctx: &mut Self::Context,
46    ) -> Result<Self::Output, TaskError> {
47        while ctx.session().controller().is_active() {
48            let routine_result = self.repeatable_routine().await;
49            match routine_result {
50                Ok(Some(output)) => {
51                    return Ok(output);
52                }
53                Ok(None) => {
54                    self.routine_wait(true, ctx).await;
55                }
56                Err(err) => {
57                    log::error!("Routine's iteration is failed: {err}");
58                    self.routine_wait(false, ctx).await;
59                }
60            }
61        }
62        Err(TaskError::Interrupted)
63    }
64
65    async fn repeatable_routine(&mut self) -> Result<Option<Self::Output>, Error> {
66        Ok(None)
67    }
68
69    async fn finalize(
70        &mut self,
71        output: Result<Self::Output, TaskError>,
72        ctx: &mut Self::Context,
73    ) -> Result<(), Error> {
74        if let Some(mut finalizer) = ctx.session().take_finalizer() {
75            finalizer.finalize(output).await?;
76        };
77        Ok(())
78    }
79
80    // TODO: Use context instead
81    async fn time_limit(&mut self) -> Option<Duration> {
82        None
83    }
84
85    async fn routine_wait(&mut self, _succeed: bool, ctx: &mut Self::Context) {
86        let duration = ctx.session().interval();
87        sleep(duration).await
88    }
89}