crb_agent/performers/
async_performer.rs1use crate::agent::Agent;
2use crate::context::{AgentContext, Context};
3use crate::global::CRB;
4use crate::performers::{AgentState, Next, StatePerformer, Transition, TransitionCommand};
5use anyhow::{Error, Result};
6use async_trait::async_trait;
7use crb_core::time::Instant;
8use std::marker::PhantomData;
9
10impl<T> Next<T>
11where
12 T: Agent,
13{
14 pub fn do_async<S>(state: S) -> Self
15 where
16 T: DoAsync<S>,
17 S: AgentState,
18 {
19 let performer = AsyncPerformer {
20 _task: PhantomData,
21 state: Some(state),
22 };
23 Self::new(performer)
24 }
25}
26
27#[async_trait]
28pub trait DoAsync<S: Send + 'static = ()>: Agent {
29 async fn handle(&mut self, mut state: S, ctx: &mut Context<Self>) -> Result<Next<Self>> {
30 let stopper = ctx.session().controller.stopper.clone();
31 while stopper.is_active() {
32 let iteration = Instant::now();
33
34 let result = self.repeat(&mut state).await;
35 match result {
36 Ok(Some(state)) => {
37 return Ok(state);
38 }
39 Ok(None) => {}
40 Err(err) => {
41 self.repair(err).await?;
42 }
43 }
44
45 if iteration.elapsed().as_millis() as usize >= CRB.get_long_threshold() {
46 use std::any::type_name;
47 log::warn!(
48 "DoAsync<{}> for {} is too long!",
49 type_name::<S>(),
50 type_name::<Self>()
51 );
52 }
53 }
54 Ok(Next::interrupt())
55 }
56
57 async fn repeat(&mut self, state: &mut S) -> Result<Option<Next<Self>>> {
58 self.once(state).await.map(Some)
59 }
60
61 async fn once(&mut self, _state: &mut S) -> Result<Next<Self>> {
62 Ok(Next::done())
63 }
64
65 async fn repair(&mut self, err: Error) -> Result<(), Error> {
66 Err(err)
67 }
68
69 async fn fallback_with_context(&mut self, err: Error, _ctx: &mut Context<Self>) -> Next<Self> {
70 self.fallback(err).await
71 }
72
73 async fn fallback(&mut self, err: Error) -> Next<Self> {
74 Next::fail(err)
75 }
76}
77
78struct AsyncPerformer<T, S> {
79 _task: PhantomData<T>,
80 state: Option<S>,
81}
82
83#[async_trait]
84impl<T, S> StatePerformer<T> for AsyncPerformer<T, S>
85where
86 T: DoAsync<S>,
87 S: AgentState,
88{
89 async fn perform(&mut self, mut agent: T, ctx: &mut Context<T>) -> Transition<T> {
90 let state = self.state.take().unwrap();
91 let next_state = match agent.handle(state, ctx).await {
92 Ok(next) => next,
93 Err(err) => agent.fallback_with_context(err, ctx).await,
94 };
95 let command = TransitionCommand::Next(next_state);
96 Transition::Continue { agent, command }
97 }
98}