crb_agent/
runtime.rs

1use crate::agent::Agent;
2use crate::context::{AgentContext, Context};
3use crate::performers::{ConsumptionReason, StopReason, Transition, TransitionCommand};
4use anyhow::{Result, anyhow};
5use async_trait::async_trait;
6use crb_runtime::{
7    InteractiveRuntime, InteractiveTask, InterruptionLevel, Interruptor, ManagedContext,
8    ReachableContext, Runtime, Task,
9};
10use futures::{FutureExt, stream::Abortable};
11use std::future::{Future, IntoFuture};
12use std::pin::Pin;
13
14pub struct RunAgent<A: Agent> {
15    pub agent: Option<A>,
16    pub context: Context<A>,
17    pub level: InterruptionLevel,
18}
19
20impl<A: Agent> RunAgent<A> {
21    pub fn new(agent: A) -> Self
22    where
23        A::Context: Default,
24    {
25        Self {
26            agent: Some(agent),
27            context: Context::wrap(A::Context::default()),
28            level: InterruptionLevel::FLAG,
29        }
30    }
31
32    pub async fn operate(mut self) {
33        self.perform_and_report().await;
34    }
35
36    pub fn report(&mut self, interrupted: bool) {
37        self.context.session().joint.report(interrupted).ok();
38    }
39
40    pub async fn perform_and_report(&mut self) {
41        self.perform().await;
42        let interrupted = self.agent.is_none();
43        self.report(interrupted);
44    }
45
46    pub async fn perform(&mut self) {
47        let name = std::any::type_name::<A>();
48        log::info!("Agent {name} started.");
49        let result = self.perform_abortable_task().await;
50        if let Err(err) = result {
51            A::rollback(self.agent.as_mut(), err, &mut self.context).await;
52        }
53        log::info!("Agent {name} finished.");
54    }
55
56    pub async fn perform_abortable_task(&mut self) -> Result<()> {
57        let reg = self.context.session().controller.take_registration()?;
58        let fut = self.perform_task();
59        Abortable::new(fut, reg).await??;
60        Ok(())
61    }
62
63    async fn perform_task(&mut self) -> Result<()> {
64        if let Some(mut agent) = self.agent.take() {
65            // let session = self.context.session();
66
67            // Initialize
68            let initial_state = agent.initialize(&mut self.context);
69            let mut pair = (agent, Some(initial_state));
70
71            // Events or States
72            while self.context.is_alive() {
73                let (mut agent, next_state) = pair;
74                if let Some(mut next_state) = next_state {
75                    let res = next_state
76                        .transition
77                        .perform(agent, &mut self.context)
78                        .await;
79                    match res {
80                        Transition::Continue { mut agent, command } => match command {
81                            TransitionCommand::Next(next_state) => {
82                                pair = (agent, Some(next_state));
83                            }
84                            TransitionCommand::ProcessEvents => {
85                                pair = (agent, None);
86                            }
87                            TransitionCommand::Stop(reason) => {
88                                match reason {
89                                    StopReason::Failed(err) => {
90                                        agent.failed(err, &mut self.context);
91                                    }
92                                    StopReason::Stopped => {}
93                                }
94                                pair = (agent, None);
95                                break;
96                            }
97                            TransitionCommand::InContext(envelope) => {
98                                envelope
99                                    .handle(&mut agent, &mut self.context)
100                                    .await
101                                    .expect("Agent's loopback should never fail");
102                                let next_state = self.context.session().next_state.take();
103                                pair = (agent, next_state);
104                            }
105                        },
106                        Transition::Consume { reason } => match reason {
107                            ConsumptionReason::Transformed => {
108                                return Ok(());
109                            }
110                            ConsumptionReason::Crashed(err) => {
111                                return Err(err);
112                            }
113                        },
114                    }
115                } else {
116                    let result = agent.event(&mut self.context).await;
117                    if let Err(err) = result {
118                        agent.failed(err, &mut self.context);
119                    }
120                    let next_state = self.context.session().next_state.take();
121                    pair = (agent, next_state);
122                }
123            }
124
125            // Finalize
126            let mut agent = pair.0;
127            agent.finalize(&mut self.context);
128            self.agent = Some(agent);
129            Ok(())
130        } else {
131            Err(anyhow!("Agent's agent has consumed already."))
132        }
133    }
134}
135
136impl<A: Agent> Task<A> for RunAgent<A> {}
137impl<A: Agent> InteractiveTask<A> for RunAgent<A> {}
138
139#[async_trait]
140impl<A> Runtime for RunAgent<A>
141where
142    A: Agent,
143{
144    fn get_interruptor(&mut self) -> Box<dyn Interruptor> {
145        let session = self.context.session();
146        let address = session.address().clone();
147        Box::new(address)
148    }
149
150    fn interruption_level(&self) -> InterruptionLevel {
151        self.level
152    }
153
154    async fn routine(&mut self) {
155        self.perform_and_report().await;
156    }
157}
158
159#[async_trait]
160impl<A: Agent> InteractiveRuntime for RunAgent<A> {
161    type Context = A::Context;
162
163    fn address(&self) -> <Self::Context as ReachableContext>::Address {
164        self.context.address().clone()
165    }
166}
167
168impl<A: Agent> IntoFuture for RunAgent<A> {
169    type Output = ();
170    type IntoFuture = Pin<Box<dyn Future<Output = ()> + Send>>;
171
172    fn into_future(self) -> Self::IntoFuture {
173        self.operate().boxed()
174    }
175}