crb_agent/performers/
async_performer.rs

1use crate::agent::Agent;
2use crate::context::{AgentContext, Context};
3use crate::global::CRB;
4use crate::performers::{AgentState, Next, StatePerformer, Transition, TransitionCommand};
5use anyhow::{Error, Result};
6use async_trait::async_trait;
7use crb_core::time::Instant;
8use std::marker::PhantomData;
9
10impl<T> Next<T>
11where
12    T: Agent,
13{
14    pub fn do_async<S>(state: S) -> Self
15    where
16        T: DoAsync<S>,
17        S: AgentState,
18    {
19        let performer = AsyncPerformer {
20            _task: PhantomData,
21            state: Some(state),
22        };
23        Self::new(performer)
24    }
25}
26
27#[async_trait]
28pub trait DoAsync<S: Send + 'static = ()>: Agent {
29    async fn handle(&mut self, mut state: S, ctx: &mut Context<Self>) -> Result<Next<Self>> {
30        let stopper = ctx.session().controller.stopper.clone();
31        while stopper.is_active() {
32            let iteration = Instant::now();
33
34            let result = self.repeat(&mut state).await;
35            match result {
36                Ok(Some(state)) => {
37                    return Ok(state);
38                }
39                Ok(None) => {}
40                Err(err) => {
41                    self.repair(err).await?;
42                }
43            }
44
45            if iteration.elapsed().as_millis() as usize >= CRB.get_long_threshold() {
46                use std::any::type_name;
47                log::warn!(
48                    "DoAsync<{}> for {} is too long!",
49                    type_name::<S>(),
50                    type_name::<Self>()
51                );
52            }
53        }
54        Ok(Next::interrupt())
55    }
56
57    async fn repeat(&mut self, state: &mut S) -> Result<Option<Next<Self>>> {
58        self.once(state).await.map(Some)
59    }
60
61    async fn once(&mut self, _state: &mut S) -> Result<Next<Self>> {
62        Ok(Next::done())
63    }
64
65    async fn repair(&mut self, err: Error) -> Result<(), Error> {
66        Err(err)
67    }
68
69    async fn fallback_with_context(&mut self, err: Error, _ctx: &mut Context<Self>) -> Next<Self> {
70        self.fallback(err).await
71    }
72
73    async fn fallback(&mut self, err: Error) -> Next<Self> {
74        Next::fail(err)
75    }
76}
77
78struct AsyncPerformer<T, S> {
79    _task: PhantomData<T>,
80    state: Option<S>,
81}
82
83#[async_trait]
84impl<T, S> StatePerformer<T> for AsyncPerformer<T, S>
85where
86    T: DoAsync<S>,
87    S: AgentState,
88{
89    async fn perform(&mut self, mut agent: T, ctx: &mut Context<T>) -> Transition<T> {
90        let state = self.state.take().unwrap();
91        let next_state = match agent.handle(state, ctx).await {
92            Ok(next) => next,
93            Err(err) => agent.fallback_with_context(err, ctx).await,
94        };
95        let command = TransitionCommand::Next(next_state);
96        Transition::Continue { agent, command }
97    }
98}