crb_agent/performers/
sync_performer.rs1use crate::agent::Agent;
2use crate::context::{AgentContext, Context};
3use crate::global::CRB;
4use crate::performers::{
5 AgentState, ConsumptionReason, Next, StatePerformer, Transition, TransitionCommand,
6};
7use anyhow::{Error, Result};
8use async_trait::async_trait;
9use crb_core::spawn_blocking;
10use crb_core::time::Instant;
11use crb_runtime::Stopper;
12use std::marker::PhantomData;
13
14impl<T> Next<T>
15where
16 T: Agent,
17{
18 pub fn do_sync<S>(state: S) -> Self
19 where
20 T: DoSync<S>,
21 S: AgentState,
22 {
23 let performer = SyncPerformer {
24 _task: PhantomData,
25 state: Some(state),
26 };
27 Self::new(performer)
28 }
29}
30
31pub trait DoSync<S = ()>: Agent {
32 fn perform(&mut self, mut state: S, stopper: Stopper) -> Result<Next<Self>> {
33 while stopper.is_active() {
34 let iteration = Instant::now();
35
36 let result = self.repeat(&mut state);
37 match result {
38 Ok(Some(state)) => {
39 return Ok(state);
40 }
41 Ok(None) => {}
42 Err(err) => {
43 self.repair(err)?;
44 }
45 }
46
47 if iteration.elapsed().as_millis() as usize >= CRB.get_long_threshold() {
48 use std::any::type_name;
49 log::warn!(
50 "DoAsync<{}> for {} is too long!",
51 type_name::<S>(),
52 type_name::<Self>()
53 );
54 }
55 }
56 Ok(Next::interrupt())
57 }
58
59 fn repeat(&mut self, state: &mut S) -> Result<Option<Next<Self>>> {
60 self.once(state).map(Some)
61 }
62
63 fn once(&mut self, _state: &mut S) -> Result<Next<Self>> {
64 Ok(Next::done())
65 }
66
67 fn repair(&mut self, err: Error) -> Result<(), Error> {
68 Err(err)
69 }
70
71 fn fallback(&mut self, err: Error) -> Next<Self> {
72 Next::fail(err)
73 }
74}
75
76struct SyncPerformer<T, S> {
77 _task: PhantomData<T>,
78 state: Option<S>,
79}
80
81#[async_trait]
82impl<T, S> StatePerformer<T> for SyncPerformer<T, S>
83where
84 T: DoSync<S>,
85 S: AgentState,
86{
87 async fn perform(&mut self, mut agent: T, ctx: &mut Context<T>) -> Transition<T> {
88 let stopper = ctx.session().controller.stopper.clone();
89 let state = self.state.take().unwrap();
90 let handle = spawn_blocking(move || {
91 let next_state = match agent.perform(state, stopper) {
92 Ok(next) => next,
93 Err(err) => agent.fallback(err),
94 };
95 let command = TransitionCommand::Next(next_state);
96 Transition::Continue { agent, command }
97 });
98 match handle.await {
99 Ok(transition) => transition,
100 Err(err) => {
101 let err = err.into();
102 let reason = ConsumptionReason::Crashed(err);
103 Transition::Consume { reason }
104 }
105 }
106 }
107}