agent_tk/execution/
default.rs1use async_broadcast::Sender;
4use futures::join;
5
6use crate::definitions::task as definitions_task;
7use crate::delegation::Task;
8use crate::knowledge_base::KnowledgeBaseInterface;
9use crate::simulation::tst::SimulationResult;
10use crate::{agent, consts, execution, module, uuid, Error, Result};
11
12mod task_executor;
13
14pub trait TaskExecutor: Sync + Send + 'static
22{
23 fn execute_task<'a>(
26 &'a self,
27 task: definitions_task::Task,
28 ) -> futures::future::BoxFuture<'a, Result<()>>;
29
30 fn can_execute(&self, task: &definitions_task::Task) -> bool;
33}
34
35pub struct Options
46{
47 task_executor: Box<dyn TaskExecutor>,
48}
49
50impl Options
51{
52 pub fn new<TTaskExecutor>(task_executor: TTaskExecutor) -> Self
54 where
55 TTaskExecutor: TaskExecutor,
56 {
57 Self {
58 task_executor: Box::new(task_executor),
59 }
60 }
61}
62
63module::create_module_private_interface!(
70 ModulePrivateInterface,
71 execution::InputMessage,
72 execution::OutputMessage
73);
74
75struct EndOfQueue
76{
77 total_cost: f32,
78 final_states: crate::states::States,
79 queue_length: usize,
80}
81
82struct ModuleData
83{
84 cancelled_tasks: ccutils::sync::ArcMutex<Vec<uuid::Uuid>>,
85 end_of_queue: ccutils::sync::ArcMutex<EndOfQueue>,
86 task_sender: Sender<definitions_task::Task>,
87}
88
89pub struct Module {}
91
92impl Module
93{
94 async fn handle_input_message(
95 msg: execution::InputMessage,
96 agent_data: &agent::AgentData,
97 module_data: &ModuleData,
98 output_sender: &Sender<execution::OutputMessage>,
99 ) -> Result<()>
100 {
101 match msg
102 {
103 execution::InputMessage::QueueExecution { uuid } =>
104 {
105 let sr: SimulationResult = agent_data
106 .knowledge_base
107 .retrieve("task_simulation_result", uuid.to_hex())?;
108 let task = agent_data
109 .knowledge_base
110 .retrieve::<definitions_task::Task>("tasks", uuid.to_hex())?;
111 {
112 let (current_cost, final_states) = {
113 let mut end_of_queue = module_data.end_of_queue.lock()?;
114 end_of_queue.total_cost += sr.get_estimated_cost();
115 end_of_queue.final_states = sr.get_final_states().to_owned();
116 (
117 end_of_queue.total_cost,
118 end_of_queue.final_states.to_owned(),
119 )
120 };
121 ccutils::log::log_error!(
122 output_sender
123 .broadcast(execution::OutputMessage::CurrentEstimatedIdlingState {
124 current_cost,
125 final_states,
126 queue_length: module_data.task_sender.len(),
127 })
128 .await,
129 "sending current cost"
130 );
131 }
132 ccutils::log::log_error!(
133 module_data.task_sender.broadcast(task).await,
134 "sending task"
135 );
136 }
137 execution::InputMessage::CancelExecution { uuid } =>
138 {
139 module_data.cancelled_tasks.lock()?.push(uuid);
140 }
141 }
142 Ok(())
143 }
144 fn handle_task_execution<'a>(
145 task: definitions_task::Task,
146 cancelled_tasks: &ccutils::sync::ArcMutex<Vec<uuid::Uuid>>,
147 task_executor: &'a dyn TaskExecutor,
148 ) -> Result<Option<futures::future::BoxFuture<'a, Result<()>>>>
149 {
150 if cancelled_tasks.lock()?.contains(&task.task_id())
151 {
152 Ok(None)
153 }
154 else if task_executor.can_execute(&task)
155 {
156 Ok(Some(task_executor.execute_task(task)))
157 }
158 else
159 {
160 Err(Error::NoExecutor())
161 }
162 }
163}
164
165impl execution::Module for Module
166{
167 type Options = Options;
168 async fn start(
169 agent_data: agent::AgentData,
170 module_interfaces: (
171 crate::module::ModuleInterface<execution::InputMessage, execution::OutputMessage>,
172 ModulePrivateInterface,
173 ),
174 options: Options,
175 )
176 {
177 let (_, module_private_interface) = module_interfaces;
178 let mut input_receiver = module_private_interface.input_receiver.activate();
179 let output_sender = module_private_interface.output_sender.clone();
180 let (task_sender, mut task_receiver) =
181 async_broadcast::broadcast::<definitions_task::Task>(consts::TASK_QUEUE_LENGTH);
182 let cancelled_tasks: ccutils::sync::ArcMutex<Vec<uuid::Uuid>> = Default::default();
183 let end_of_queue: ccutils::sync::ArcMutex<_> = EndOfQueue {
184 total_cost: 0.0,
185 final_states: agent_data.states.to_owned_states().unwrap(),
186 queue_length: 0,
187 }
188 .into();
189 let msg_fut = async {
190 let module_data = ModuleData {
191 cancelled_tasks: cancelled_tasks.to_owned(),
192 end_of_queue: end_of_queue.to_owned(),
193 task_sender,
194 };
195 loop
196 {
197 let msg = input_receiver.recv().await;
198
199 if let Ok(msg) = msg
200 {
201 if let Err(e) =
202 Self::handle_input_message(msg, &agent_data, &module_data, &output_sender).await
203 {
204 log::error!(
205 "An error occured when handling input execution message: {} for agent {}",
206 e,
207 agent_data.agent_uri
208 );
209 }
210 }
211 else
212 {
213 return;
214 }
215 }
216 };
217 let output_sender = module_private_interface.output_sender.clone();
218 let agent_data = agent_data.clone();
219 let cancelled_tasks = cancelled_tasks.clone();
220 let end_of_queue = end_of_queue.clone();
221 let exec_fut = async move {
222 loop
223 {
224 let msg = task_receiver.recv().await;
225 if let Ok(msg) = msg
226 {
227 let sr = agent_data
228 .knowledge_base
229 .retrieve::<SimulationResult>("task_simulation_result", msg.task_id().to_hex());
230 let cost = match sr
231 {
232 Ok(sr) => sr.get_estimated_cost(),
233 Err(_) => 0.0,
234 };
235 match Self::handle_task_execution(msg, &cancelled_tasks, &*options.task_executor)
236 {
237 Ok(Some(fut)) =>
238 {
239 if let Err(e) = fut.await
240 {
241 log::error!(
242 "An error occured when executing a task: {} for agent {}",
243 e,
244 agent_data.agent_uri
245 );
246 }
247 }
248 Ok(None) =>
249 {}
250 Err(e) =>
251 {
252 log::error!(
253 "An error occured when handling task execution message: {} for agent {}",
254 e,
255 agent_data.agent_uri
256 );
257 }
258 }
259 if task_receiver.is_empty()
260 {
261 {
262 let mut end_of_queue = end_of_queue.lock().unwrap();
263 end_of_queue.total_cost = 0.0;
264 end_of_queue.final_states = agent_data.states.to_owned_states().unwrap();
265 end_of_queue.queue_length = 0;
266 }
267 ccutils::log::log_error!(
268 output_sender
269 .broadcast(execution::OutputMessage::CurrentEstimatedIdlingState {
270 current_cost: 0.0,
271 final_states: agent_data.states.to_owned_states().unwrap(),
272 queue_length: 0,
273 })
274 .await,
275 "sending output message"
276 );
277 }
278 else
279 {
280 let (current_cost, final_states, queue_length) = {
281 let mut end_of_queue = end_of_queue.lock().unwrap();
282 end_of_queue.total_cost -= cost;
283 end_of_queue.queue_length = task_receiver.len();
284 (
285 end_of_queue.total_cost,
286 end_of_queue.final_states.to_owned(),
287 end_of_queue.queue_length,
288 )
289 };
290 ccutils::log::log_error!(
291 output_sender
292 .broadcast(execution::OutputMessage::CurrentEstimatedIdlingState {
293 current_cost,
294 final_states,
295 queue_length,
296 })
297 .await,
298 "sending output message"
299 );
300 }
301 }
302 else
303 {
304 return;
305 }
306 }
307 };
308
309 join!(msg_fut, exec_fut);
310 }
311}
312
313impl module::Module for Module
314{
315 type InputMessage = execution::InputMessage;
316 type OutputMessage = execution::OutputMessage;
317 type ModulePrivateInterface = ModulePrivateInterface;
318}