crb_task/
hybryd_task.rs

1use anyhow::{Error, Result};
2use async_trait::async_trait;
3use crb_runtime::kit::{Controller, Failures, Interruptor, Runtime, Task};
4use futures::stream::Abortable;
5
6pub trait HybrydState: Send + 'static {}
7
8impl<T> HybrydState for T where T: Send + 'static {}
9
10pub struct NextState<T: ?Sized> {
11    transition: Box<dyn StatePerformer<T>>,
12}
13
14impl<T> NextState<T>
15where
16    T: HybrydTask,
17{
18    pub(crate) fn new(performer: impl StatePerformer<T>) -> Self {
19        Self {
20            transition: Box::new(performer),
21        }
22    }
23}
24
25pub enum Transition<T> {
26    Next(T, Result<NextState<T>>),
27    Crashed(Error),
28    Interrupted,
29}
30
31#[async_trait]
32pub trait StatePerformer<T>: Send + 'static {
33    async fn perform(&mut self, task: T, session: &mut HybrydSession) -> Transition<T>;
34    async fn fallback(&mut self, task: T, err: Error) -> (T, NextState<T>);
35}
36
37pub trait HybrydTask: Sized + Send + 'static {
38    fn initial_state(&mut self) -> NextState<Self>;
39}
40
41pub struct HybrydSession {
42    pub controller: Controller,
43}
44
45pub struct DoHybrid<T> {
46    pub task: Option<T>,
47    pub session: HybrydSession,
48    pub failures: Failures,
49}
50
51impl<T: HybrydTask> DoHybrid<T> {
52    pub fn new(task: T) -> Self {
53        let session = HybrydSession {
54            controller: Controller::default(),
55        };
56        Self {
57            task: Some(task),
58            session,
59            failures: Failures::default(),
60        }
61    }
62}
63
64impl<T: HybrydTask> Task<T> for DoHybrid<T> {}
65
66impl<T: HybrydTask> DoHybrid<T> {
67    async fn perform_routine(&mut self) -> Result<(), Error> {
68        let reg = self.session.controller.take_registration()?;
69        let fut = self.perform_task();
70        Abortable::new(fut, reg).await??;
71        Ok(())
72    }
73
74    async fn perform_task(&mut self) -> Result<(), Error> {
75        if let Some(mut task) = self.task.take() {
76            let session = &mut self.session;
77            let initial_state = task.initial_state();
78            let mut pair = (task, initial_state);
79            loop {
80                let (task, mut next_state) = pair;
81                let res = next_state.transition.perform(task, session).await;
82                match res {
83                    Transition::Next(task, Ok(next_state)) => {
84                        pair = (task, next_state);
85                    }
86                    Transition::Next(task, Err(err)) => {
87                        let (task, next_state) = next_state.transition.fallback(task, err).await;
88                        pair = (task, next_state);
89                    }
90                    Transition::Crashed(err) => {
91                        return Err(err);
92                    }
93                    Transition::Interrupted => {
94                        break;
95                    }
96                }
97            }
98        }
99        Ok(())
100    }
101}
102
103#[async_trait]
104impl<T> Runtime for DoHybrid<T>
105where
106    T: HybrydTask,
107{
108    fn get_interruptor(&mut self) -> Interruptor {
109        self.session.controller.interruptor.clone()
110    }
111
112    async fn routine(&mut self) {
113        let result = self.perform_routine().await;
114        self.failures.put(result);
115    }
116}