crb_task/performers/
async_performer.rs

1use crate::hybryd_task::{
2    DoHybrid, HybrydSession, HybrydState, HybrydTask, NextState, StatePerformer, Transition,
3};
4use anyhow::{Error, Result};
5use async_trait::async_trait;
6use crb_runtime::kit::Interruptor;
7use futures::Future;
8use std::marker::PhantomData;
9
10impl<T> NextState<T>
11where
12    T: HybrydTask,
13{
14    pub fn do_async<S>(state: S) -> Self
15    where
16        T: AsyncActivity<S>,
17        S: HybrydState,
18    {
19        let performer = AsyncPerformer {
20            _task: PhantomData,
21            state: Some(state),
22        };
23        Self::new(performer)
24    }
25}
26
27#[async_trait]
28pub trait AsyncActivity<S: Send + 'static>: HybrydTask {
29    async fn perform(&mut self, mut state: S, interruptor: Interruptor) -> Result<NextState<Self>> {
30        while interruptor.is_active() {
31            let result = self.many(&mut state).await;
32            match result {
33                Ok(Some(state)) => {
34                    return Ok(state);
35                }
36                Ok(None) => {}
37                Err(_) => {}
38            }
39        }
40        Ok(NextState::interrupt(None))
41    }
42
43    async fn many(&mut self, state: &mut S) -> Result<Option<NextState<Self>>> {
44        self.once(state).await.map(Some)
45    }
46
47    async fn once(&mut self, _state: &mut S) -> Result<NextState<Self>> {
48        Ok(NextState::done())
49    }
50
51    async fn repair(&mut self, err: Error) -> Result<(), Error> {
52        Err(err)
53    }
54
55    async fn fallback(&mut self, err: Error) -> NextState<Self> {
56        NextState::fail(err)
57    }
58}
59
60struct AsyncPerformer<T, S> {
61    _task: PhantomData<T>,
62    state: Option<S>,
63}
64
65#[async_trait]
66impl<T, S> StatePerformer<T> for AsyncPerformer<T, S>
67where
68    T: AsyncActivity<S>,
69    S: HybrydState,
70{
71    async fn perform(&mut self, mut task: T, session: &mut HybrydSession) -> Transition<T> {
72        let interruptor = session.controller.interruptor.clone();
73        let state = self.state.take().unwrap();
74        let next_state = task.perform(state, interruptor).await;
75        Transition::Next(task, next_state)
76    }
77
78    async fn fallback(&mut self, mut task: T, err: Error) -> (T, NextState<T>) {
79        let next_state = task.fallback(err).await;
80        (task, next_state)
81    }
82}
83
84impl DoHybrid<AsyncFn> {
85    pub fn new_async<F: AnyAsyncFn>(fut: F) -> Self {
86        let task = AsyncFn {
87            fut: Some(Box::new(fut)),
88        };
89        Self::new(task)
90    }
91}
92
93pub trait AnyAsyncFn: Future<Output = Result<()>> + Send + 'static {}
94
95impl<F> AnyAsyncFn for F where F: Future<Output = Result<()>> + Send + 'static {}
96
97struct AsyncFn {
98    fut: Option<Box<dyn AnyAsyncFn>>,
99}
100
101impl HybrydTask for AsyncFn {
102    fn initial_state(&mut self) -> NextState<Self> {
103        NextState::do_async(CallFn)
104    }
105}
106
107struct CallFn;
108
109#[async_trait]
110impl AsyncActivity<CallFn> for AsyncFn {
111    async fn once(&mut self, _state: &mut CallFn) -> Result<NextState<Self>> {
112        let fut = self.fut.take().unwrap();
113        let pinned_fut = Box::into_pin(fut);
114        pinned_fut.await?;
115        Ok(NextState::done())
116    }
117}