1use crate::runtime::RoutineContext;
2use anyhow::Error;
3use async_trait::async_trait;
4use crb_core::time::{sleep, timeout, Duration, Elapsed};
5use crb_runtime::kit::{ManagedContext, RegistrationTaken};
6use futures::stream::Aborted;
7use thiserror::Error;
8
9#[derive(Debug, Error)]
10pub enum TaskError {
11 #[error("task was aborted")]
12 Aborted(#[from] Aborted),
13 #[error("task was interrupted")]
14 Interrupted,
15 #[error("time for task execution elapsed")]
16 Timeout(#[from] Elapsed),
17 #[error("can't register a task: {0}")]
18 Registration(#[from] RegistrationTaken),
19 #[error("task failed: {0}")]
20 Failed(#[from] Error),
21}
22
23#[async_trait]
24pub trait Routine: Sized + Send + 'static {
25 type Context: RoutineContext<Self>;
26 type Output: Send;
27
28 async fn routine(&mut self, ctx: &mut Self::Context) -> Result<(), Error> {
29 let time_limit = self.time_limit().await;
32 let fut = timeout(time_limit, self.basic_routine(ctx));
33 fut.await??;
34 Ok(())
35 }
36
37 async fn basic_routine(&mut self, ctx: &mut Self::Context) -> Result<(), Error> {
38 let output = self.interruptable_routine(ctx).await;
39 self.finalize(output, ctx).await?;
40 Ok(())
41 }
42
43 async fn interruptable_routine(
44 &mut self,
45 ctx: &mut Self::Context,
46 ) -> Result<Self::Output, TaskError> {
47 while ctx.session().controller().is_active() {
48 let routine_result = self.repeatable_routine().await;
49 match routine_result {
50 Ok(Some(output)) => {
51 return Ok(output);
52 }
53 Ok(None) => {
54 self.routine_wait(true, ctx).await;
55 }
56 Err(err) => {
57 log::error!("Routine's iteration is failed: {err}");
58 self.routine_wait(false, ctx).await;
59 }
60 }
61 }
62 Err(TaskError::Interrupted)
63 }
64
65 async fn repeatable_routine(&mut self) -> Result<Option<Self::Output>, Error> {
66 Ok(None)
67 }
68
69 async fn finalize(
70 &mut self,
71 output: Result<Self::Output, TaskError>,
72 ctx: &mut Self::Context,
73 ) -> Result<(), Error> {
74 if let Some(mut finalizer) = ctx.session().take_finalizer() {
75 finalizer.finalize(output).await?;
76 };
77 Ok(())
78 }
79
80 async fn time_limit(&mut self) -> Option<Duration> {
82 None
83 }
84
85 async fn routine_wait(&mut self, _succeed: bool, ctx: &mut Self::Context) {
86 let duration = ctx.session().interval();
87 sleep(duration).await
88 }
89}