crb_agent/performers/
sync_performer.rs

1use crate::agent::Agent;
2use crate::context::{AgentContext, Context};
3use crate::global::CRB;
4use crate::performers::{
5    AgentState, ConsumptionReason, Next, StatePerformer, Transition, TransitionCommand,
6};
7use anyhow::{Error, Result};
8use async_trait::async_trait;
9use crb_core::spawn_blocking;
10use crb_core::time::Instant;
11use crb_runtime::Stopper;
12use std::marker::PhantomData;
13
14impl<T> Next<T>
15where
16    T: Agent,
17{
18    pub fn do_sync<S>(state: S) -> Self
19    where
20        T: DoSync<S>,
21        S: AgentState,
22    {
23        let performer = SyncPerformer {
24            _task: PhantomData,
25            state: Some(state),
26        };
27        Self::new(performer)
28    }
29}
30
31pub trait DoSync<S = ()>: Agent {
32    fn perform(&mut self, mut state: S, stopper: Stopper) -> Result<Next<Self>> {
33        while stopper.is_active() {
34            let iteration = Instant::now();
35
36            let result = self.repeat(&mut state);
37            match result {
38                Ok(Some(state)) => {
39                    return Ok(state);
40                }
41                Ok(None) => {}
42                Err(err) => {
43                    self.repair(err)?;
44                }
45            }
46
47            if iteration.elapsed().as_millis() as usize >= CRB.get_long_threshold() {
48                use std::any::type_name;
49                log::warn!(
50                    "DoAsync<{}> for {} is too long!",
51                    type_name::<S>(),
52                    type_name::<Self>()
53                );
54            }
55        }
56        Ok(Next::interrupt())
57    }
58
59    fn repeat(&mut self, state: &mut S) -> Result<Option<Next<Self>>> {
60        self.once(state).map(Some)
61    }
62
63    fn once(&mut self, _state: &mut S) -> Result<Next<Self>> {
64        Ok(Next::done())
65    }
66
67    fn repair(&mut self, err: Error) -> Result<(), Error> {
68        Err(err)
69    }
70
71    fn fallback(&mut self, err: Error) -> Next<Self> {
72        Next::fail(err)
73    }
74}
75
76struct SyncPerformer<T, S> {
77    _task: PhantomData<T>,
78    state: Option<S>,
79}
80
81#[async_trait]
82impl<T, S> StatePerformer<T> for SyncPerformer<T, S>
83where
84    T: DoSync<S>,
85    S: AgentState,
86{
87    async fn perform(&mut self, mut agent: T, ctx: &mut Context<T>) -> Transition<T> {
88        let stopper = ctx.session().controller.stopper.clone();
89        let state = self.state.take().unwrap();
90        let handle = spawn_blocking(move || {
91            let next_state = match agent.perform(state, stopper) {
92                Ok(next) => next,
93                Err(err) => agent.fallback(err),
94            };
95            let command = TransitionCommand::Next(next_state);
96            Transition::Continue { agent, command }
97        });
98        match handle.await {
99            Ok(transition) => transition,
100            Err(err) => {
101                let err = err.into();
102                let reason = ConsumptionReason::Crashed(err);
103                Transition::Consume { reason }
104            }
105        }
106    }
107}