1use anyhow::{Error, Result};
2use async_trait::async_trait;
3use crb_runtime::kit::{Controller, Failures, Interruptor, Runtime, Task};
4use futures::stream::Abortable;
5
6pub trait HybrydState: Send + 'static {}
7
8impl<T> HybrydState for T where T: Send + 'static {}
9
10pub struct NextState<T: ?Sized> {
11 transition: Box<dyn StatePerformer<T>>,
12}
13
14impl<T> NextState<T>
15where
16 T: HybrydTask,
17{
18 pub(crate) fn new(performer: impl StatePerformer<T>) -> Self {
19 Self {
20 transition: Box::new(performer),
21 }
22 }
23}
24
25pub enum Transition<T> {
26 Next(T, Result<NextState<T>>),
27 Crashed(Error),
28 Interrupted,
29}
30
31#[async_trait]
32pub trait StatePerformer<T>: Send + 'static {
33 async fn perform(&mut self, task: T, session: &mut HybrydSession) -> Transition<T>;
34 async fn fallback(&mut self, task: T, err: Error) -> (T, NextState<T>);
35}
36
37pub trait HybrydTask: Sized + Send + 'static {
38 fn initial_state(&mut self) -> NextState<Self>;
39}
40
41pub struct HybrydSession {
42 pub controller: Controller,
43}
44
45pub struct DoHybrid<T> {
46 pub task: Option<T>,
47 pub session: HybrydSession,
48 pub failures: Failures,
49}
50
51impl<T: HybrydTask> DoHybrid<T> {
52 pub fn new(task: T) -> Self {
53 let session = HybrydSession {
54 controller: Controller::default(),
55 };
56 Self {
57 task: Some(task),
58 session,
59 failures: Failures::default(),
60 }
61 }
62}
63
64impl<T: HybrydTask> Task<T> for DoHybrid<T> {}
65
66impl<T: HybrydTask> DoHybrid<T> {
67 async fn perform_routine(&mut self) -> Result<(), Error> {
68 let reg = self.session.controller.take_registration()?;
69 let fut = self.perform_task();
70 Abortable::new(fut, reg).await??;
71 Ok(())
72 }
73
74 async fn perform_task(&mut self) -> Result<(), Error> {
75 if let Some(mut task) = self.task.take() {
76 let session = &mut self.session;
77 let initial_state = task.initial_state();
78 let mut pair = (task, initial_state);
79 loop {
80 let (task, mut next_state) = pair;
81 let res = next_state.transition.perform(task, session).await;
82 match res {
83 Transition::Next(task, Ok(next_state)) => {
84 pair = (task, next_state);
85 }
86 Transition::Next(task, Err(err)) => {
87 let (task, next_state) = next_state.transition.fallback(task, err).await;
88 pair = (task, next_state);
89 }
90 Transition::Crashed(err) => {
91 return Err(err);
92 }
93 Transition::Interrupted => {
94 break;
95 }
96 }
97 }
98 }
99 Ok(())
100 }
101}
102
103#[async_trait]
104impl<T> Runtime for DoHybrid<T>
105where
106 T: HybrydTask,
107{
108 fn get_interruptor(&mut self) -> Interruptor {
109 self.session.controller.interruptor.clone()
110 }
111
112 async fn routine(&mut self) {
113 let result = self.perform_routine().await;
114 self.failures.put(result);
115 }
116}