// crb_task/performers/sync_performer.rs
use crate::hybryd_task::{
2 DoHybrid, HybrydSession, HybrydState, HybrydTask, NextState, StatePerformer, Transition,
3};
4use anyhow::{Error, Result};
5use async_trait::async_trait;
6use crb_runtime::kit::Interruptor;
7use std::marker::PhantomData;
8use tokio::task::spawn_blocking;
9
10impl<T> NextState<T>
11where
12 T: HybrydTask,
13{
14 pub fn do_sync<S>(state: S) -> Self
15 where
16 T: SyncActivity<S>,
17 S: HybrydState,
18 {
19 let performer = SyncPerformer {
20 _task: PhantomData,
21 state: Some(state),
22 };
23 Self::new(performer)
24 }
25}
26
27pub trait SyncActivity<S>: HybrydTask {
28 fn perform(&mut self, mut state: S, interruptor: Interruptor) -> Result<NextState<Self>> {
29 while interruptor.is_active() {
30 let result = self.many(&mut state);
31 match result {
32 Ok(Some(state)) => {
33 return Ok(state);
34 }
35 Ok(None) => {}
36 Err(err) => {
37 self.repair(err)?;
38 }
39 }
40 }
41 Ok(NextState::interrupt(None))
42 }
43
44 fn many(&mut self, state: &mut S) -> Result<Option<NextState<Self>>> {
45 self.once(state).map(Some)
46 }
47
48 fn once(&mut self, _state: &mut S) -> Result<NextState<Self>> {
49 Ok(NextState::done())
50 }
51
52 fn repair(&mut self, err: Error) -> Result<(), Error> {
53 Err(err)
54 }
55
56 fn fallback(&mut self, err: Error) -> NextState<Self> {
57 NextState::fail(err)
58 }
59}
60
61struct SyncPerformer<T, S> {
62 _task: PhantomData<T>,
63 state: Option<S>,
64}
65
66#[async_trait]
67impl<T, S> StatePerformer<T> for SyncPerformer<T, S>
68where
69 T: SyncActivity<S>,
70 S: HybrydState,
71{
72 async fn perform(&mut self, mut task: T, session: &mut HybrydSession) -> Transition<T> {
73 let interruptor = session.controller.interruptor.clone();
74 let state = self.state.take().unwrap();
75 let handle = spawn_blocking(move || {
76 let next_state = task.perform(state, interruptor);
77 Transition::Next(task, next_state)
78 });
79 match handle.await {
80 Ok(transition) => transition,
81 Err(err) => Transition::Crashed(err.into()),
82 }
83 }
84
85 async fn fallback(&mut self, mut task: T, err: Error) -> (T, NextState<T>) {
86 let next_state = task.fallback(err);
87 (task, next_state)
88 }
89}
90
91impl DoHybrid<SyncFn> {
92 pub fn new_sync<F: AnySyncFn>(func: F) -> Self {
93 let task = SyncFn {
94 func: Some(Box::new(func)),
95 };
96 Self::new(task)
97 }
98}
99
100pub trait AnySyncFn: FnOnce() -> Result<()> + Send + 'static {}
101
102impl<F> AnySyncFn for F where F: FnOnce() -> Result<()> + Send + 'static {}
103
104struct SyncFn {
105 func: Option<Box<dyn AnySyncFn>>,
106}
107
108impl HybrydTask for SyncFn {
109 fn initial_state(&mut self) -> NextState<Self> {
110 NextState::do_sync(CallFn)
111 }
112}
113
114struct CallFn;
115
116impl SyncActivity<CallFn> for SyncFn {
117 fn once(&mut self, _state: &mut CallFn) -> Result<NextState<Self>> {
118 let func = self.func.take().unwrap();
119 func()?;
120 Ok(NextState::done())
121 }
122}