1use crate::agent::Agent;
2use crate::context::{AgentContext, Context};
3use crate::performers::{ConsumptionReason, StopReason, Transition, TransitionCommand};
4use anyhow::{Result, anyhow};
5use async_trait::async_trait;
6use crb_runtime::{
7 InteractiveRuntime, InteractiveTask, InterruptionLevel, Interruptor, ManagedContext,
8 ReachableContext, Runtime, Task,
9};
10use futures::{FutureExt, stream::Abortable};
11use std::future::{Future, IntoFuture};
12use std::pin::Pin;
13
14pub struct RunAgent<A: Agent> {
15 pub agent: Option<A>,
16 pub context: Context<A>,
17 pub level: InterruptionLevel,
18}
19
20impl<A: Agent> RunAgent<A> {
21 pub fn new(agent: A) -> Self
22 where
23 A::Context: Default,
24 {
25 Self {
26 agent: Some(agent),
27 context: Context::wrap(A::Context::default()),
28 level: InterruptionLevel::FLAG,
29 }
30 }
31
32 pub async fn operate(mut self) {
33 self.perform_and_report().await;
34 }
35
36 pub fn report(&mut self, interrupted: bool) {
37 self.context.session().joint.report(interrupted).ok();
38 }
39
40 pub async fn perform_and_report(&mut self) {
41 self.perform().await;
42 let interrupted = self.agent.is_none();
43 self.report(interrupted);
44 }
45
46 pub async fn perform(&mut self) {
47 let name = std::any::type_name::<A>();
48 log::info!("Agent {name} started.");
49 let result = self.perform_abortable_task().await;
50 if let Err(err) = result {
51 A::rollback(self.agent.as_mut(), err, &mut self.context).await;
52 }
53 log::info!("Agent {name} finished.");
54 }
55
56 pub async fn perform_abortable_task(&mut self) -> Result<()> {
57 let reg = self.context.session().controller.take_registration()?;
58 let fut = self.perform_task();
59 Abortable::new(fut, reg).await??;
60 Ok(())
61 }
62
63 async fn perform_task(&mut self) -> Result<()> {
64 if let Some(mut agent) = self.agent.take() {
65 let initial_state = agent.initialize(&mut self.context);
69 let mut pair = (agent, Some(initial_state));
70
71 while self.context.is_alive() {
73 let (mut agent, next_state) = pair;
74 if let Some(mut next_state) = next_state {
75 let res = next_state
76 .transition
77 .perform(agent, &mut self.context)
78 .await;
79 match res {
80 Transition::Continue { mut agent, command } => match command {
81 TransitionCommand::Next(next_state) => {
82 pair = (agent, Some(next_state));
83 }
84 TransitionCommand::ProcessEvents => {
85 pair = (agent, None);
86 }
87 TransitionCommand::Stop(reason) => {
88 match reason {
89 StopReason::Failed(err) => {
90 agent.failed(err, &mut self.context);
91 }
92 StopReason::Stopped => {}
93 }
94 pair = (agent, None);
95 break;
96 }
97 TransitionCommand::InContext(envelope) => {
98 envelope
99 .handle(&mut agent, &mut self.context)
100 .await
101 .expect("Agent's loopback should never fail");
102 let next_state = self.context.session().next_state.take();
103 pair = (agent, next_state);
104 }
105 },
106 Transition::Consume { reason } => match reason {
107 ConsumptionReason::Transformed => {
108 return Ok(());
109 }
110 ConsumptionReason::Crashed(err) => {
111 return Err(err);
112 }
113 },
114 }
115 } else {
116 let result = agent.event(&mut self.context).await;
117 if let Err(err) = result {
118 agent.failed(err, &mut self.context);
119 }
120 let next_state = self.context.session().next_state.take();
121 pair = (agent, next_state);
122 }
123 }
124
125 let mut agent = pair.0;
127 agent.finalize(&mut self.context);
128 self.agent = Some(agent);
129 Ok(())
130 } else {
131 Err(anyhow!("Agent's agent has consumed already."))
132 }
133 }
134}
135
136impl<A: Agent> Task<A> for RunAgent<A> {}
137impl<A: Agent> InteractiveTask<A> for RunAgent<A> {}
138
139#[async_trait]
140impl<A> Runtime for RunAgent<A>
141where
142 A: Agent,
143{
144 fn get_interruptor(&mut self) -> Box<dyn Interruptor> {
145 let session = self.context.session();
146 let address = session.address().clone();
147 Box::new(address)
148 }
149
150 fn interruption_level(&self) -> InterruptionLevel {
151 self.level
152 }
153
154 async fn routine(&mut self) {
155 self.perform_and_report().await;
156 }
157}
158
159#[async_trait]
160impl<A: Agent> InteractiveRuntime for RunAgent<A> {
161 type Context = A::Context;
162
163 fn address(&self) -> <Self::Context as ReachableContext>::Address {
164 self.context.address().clone()
165 }
166}
167
168impl<A: Agent> IntoFuture for RunAgent<A> {
169 type Output = ();
170 type IntoFuture = Pin<Box<dyn Future<Output = ()> + Send>>;
171
172 fn into_future(self) -> Self::IntoFuture {
173 self.operate().boxed()
174 }
175}