crb_task/performers/
async_performer.rs1use crate::hybryd_task::{
2 DoHybrid, HybrydSession, HybrydState, HybrydTask, NextState, StatePerformer, Transition,
3};
4use anyhow::{Error, Result};
5use async_trait::async_trait;
6use crb_runtime::kit::Interruptor;
7use futures::Future;
8use std::marker::PhantomData;
9
10impl<T> NextState<T>
11where
12 T: HybrydTask,
13{
14 pub fn do_async<S>(state: S) -> Self
15 where
16 T: AsyncActivity<S>,
17 S: HybrydState,
18 {
19 let performer = AsyncPerformer {
20 _task: PhantomData,
21 state: Some(state),
22 };
23 Self::new(performer)
24 }
25}
26
27#[async_trait]
28pub trait AsyncActivity<S: Send + 'static>: HybrydTask {
29 async fn perform(&mut self, mut state: S, interruptor: Interruptor) -> Result<NextState<Self>> {
30 while interruptor.is_active() {
31 let result = self.many(&mut state).await;
32 match result {
33 Ok(Some(state)) => {
34 return Ok(state);
35 }
36 Ok(None) => {}
37 Err(_) => {}
38 }
39 }
40 Ok(NextState::interrupt(None))
41 }
42
43 async fn many(&mut self, state: &mut S) -> Result<Option<NextState<Self>>> {
44 self.once(state).await.map(Some)
45 }
46
47 async fn once(&mut self, _state: &mut S) -> Result<NextState<Self>> {
48 Ok(NextState::done())
49 }
50
51 async fn repair(&mut self, err: Error) -> Result<(), Error> {
52 Err(err)
53 }
54
55 async fn fallback(&mut self, err: Error) -> NextState<Self> {
56 NextState::fail(err)
57 }
58}
59
60struct AsyncPerformer<T, S> {
61 _task: PhantomData<T>,
62 state: Option<S>,
63}
64
65#[async_trait]
66impl<T, S> StatePerformer<T> for AsyncPerformer<T, S>
67where
68 T: AsyncActivity<S>,
69 S: HybrydState,
70{
71 async fn perform(&mut self, mut task: T, session: &mut HybrydSession) -> Transition<T> {
72 let interruptor = session.controller.interruptor.clone();
73 let state = self.state.take().unwrap();
74 let next_state = task.perform(state, interruptor).await;
75 Transition::Next(task, next_state)
76 }
77
78 async fn fallback(&mut self, mut task: T, err: Error) -> (T, NextState<T>) {
79 let next_state = task.fallback(err).await;
80 (task, next_state)
81 }
82}
83
84impl DoHybrid<AsyncFn> {
85 pub fn new_async<F: AnyAsyncFn>(fut: F) -> Self {
86 let task = AsyncFn {
87 fut: Some(Box::new(fut)),
88 };
89 Self::new(task)
90 }
91}
92
93pub trait AnyAsyncFn: Future<Output = Result<()>> + Send + 'static {}
94
95impl<F> AnyAsyncFn for F where F: Future<Output = Result<()>> + Send + 'static {}
96
97struct AsyncFn {
98 fut: Option<Box<dyn AnyAsyncFn>>,
99}
100
101impl HybrydTask for AsyncFn {
102 fn initial_state(&mut self) -> NextState<Self> {
103 NextState::do_async(CallFn)
104 }
105}
106
107struct CallFn;
108
109#[async_trait]
110impl AsyncActivity<CallFn> for AsyncFn {
111 async fn once(&mut self, _state: &mut CallFn) -> Result<NextState<Self>> {
112 let fut = self.fut.take().unwrap();
113 let pinned_fut = Box::into_pin(fut);
114 pinned_fut.await?;
115 Ok(NextState::done())
116 }
117}