crb_task/performers/
sync_performer.rs

1use crate::hybryd_task::{
2    DoHybrid, HybrydSession, HybrydState, HybrydTask, NextState, StatePerformer, Transition,
3};
4use anyhow::{Error, Result};
5use async_trait::async_trait;
6use crb_runtime::kit::Interruptor;
7use std::marker::PhantomData;
8use tokio::task::spawn_blocking;
9
10impl<T> NextState<T>
11where
12    T: HybrydTask,
13{
14    pub fn do_sync<S>(state: S) -> Self
15    where
16        T: SyncActivity<S>,
17        S: HybrydState,
18    {
19        let performer = SyncPerformer {
20            _task: PhantomData,
21            state: Some(state),
22        };
23        Self::new(performer)
24    }
25}
26
27pub trait SyncActivity<S>: HybrydTask {
28    fn perform(&mut self, mut state: S, interruptor: Interruptor) -> Result<NextState<Self>> {
29        while interruptor.is_active() {
30            let result = self.many(&mut state);
31            match result {
32                Ok(Some(state)) => {
33                    return Ok(state);
34                }
35                Ok(None) => {}
36                Err(err) => {
37                    self.repair(err)?;
38                }
39            }
40        }
41        Ok(NextState::interrupt(None))
42    }
43
44    fn many(&mut self, state: &mut S) -> Result<Option<NextState<Self>>> {
45        self.once(state).map(Some)
46    }
47
48    fn once(&mut self, _state: &mut S) -> Result<NextState<Self>> {
49        Ok(NextState::done())
50    }
51
52    fn repair(&mut self, err: Error) -> Result<(), Error> {
53        Err(err)
54    }
55
56    fn fallback(&mut self, err: Error) -> NextState<Self> {
57        NextState::fail(err)
58    }
59}
60
61struct SyncPerformer<T, S> {
62    _task: PhantomData<T>,
63    state: Option<S>,
64}
65
66#[async_trait]
67impl<T, S> StatePerformer<T> for SyncPerformer<T, S>
68where
69    T: SyncActivity<S>,
70    S: HybrydState,
71{
72    async fn perform(&mut self, mut task: T, session: &mut HybrydSession) -> Transition<T> {
73        let interruptor = session.controller.interruptor.clone();
74        let state = self.state.take().unwrap();
75        let handle = spawn_blocking(move || {
76            let next_state = task.perform(state, interruptor);
77            Transition::Next(task, next_state)
78        });
79        match handle.await {
80            Ok(transition) => transition,
81            Err(err) => Transition::Crashed(err.into()),
82        }
83    }
84
85    async fn fallback(&mut self, mut task: T, err: Error) -> (T, NextState<T>) {
86        let next_state = task.fallback(err);
87        (task, next_state)
88    }
89}
90
91impl DoHybrid<SyncFn> {
92    pub fn new_sync<F: AnySyncFn>(func: F) -> Self {
93        let task = SyncFn {
94            func: Some(Box::new(func)),
95        };
96        Self::new(task)
97    }
98}
99
100pub trait AnySyncFn: FnOnce() -> Result<()> + Send + 'static {}
101
102impl<F> AnySyncFn for F where F: FnOnce() -> Result<()> + Send + 'static {}
103
104struct SyncFn {
105    func: Option<Box<dyn AnySyncFn>>,
106}
107
108impl HybrydTask for SyncFn {
109    fn initial_state(&mut self) -> NextState<Self> {
110        NextState::do_sync(CallFn)
111    }
112}
113
114struct CallFn;
115
116impl SyncActivity<CallFn> for SyncFn {
117    fn once(&mut self, _state: &mut CallFn) -> Result<NextState<Self>> {
118        let func = self.func.take().unwrap();
119        func()?;
120        Ok(NextState::done())
121    }
122}