agent_tk/execution/
default.rs1use async_broadcast::Sender;
4use ccutils::sync::ArcMutex;
5use futures::join;
6
7use crate::definitions::task as definitions_task;
8use crate::delegation::Task;
9use crate::knowledge_base::KnowledgeBaseInterface;
10use crate::simulation::tst::SimulationResult;
11use crate::{agent, consts, decision, delegation, execution, module, utils, uuid, Error, Result};
12
13mod task_executor;
14
15pub trait TaskExecutor: Sync + Send + 'static
23{
24 fn execute_task(
27 &self,
28 task: definitions_task::Task,
29 ) -> futures::future::BoxFuture<'static, Result<()>>;
30
31 fn can_execute(&self, task: &definitions_task::Task) -> bool;
34}
35
36pub struct Options
47{
48 task_executor: Box<dyn TaskExecutor>,
49}
50
51impl Options
52{
53 pub fn new<TTaskExecutor>(task_executor: TTaskExecutor) -> Self
55 where
56 TTaskExecutor: TaskExecutor,
57 {
58 Self {
59 task_executor: Box::new(task_executor),
60 }
61 }
62}
63
64module::create_module_private_interface!(
71 ModulePrivateInterface,
72 execution::InputMessage,
73 execution::OutputMessage
74);
75
76struct EndOfQueue
77{
78 total_cost: f32,
79 final_states: crate::states::States,
80 queue_length: usize,
81}
82
83struct ModuleData
84{
85 cancelled_tasks: ccutils::sync::ArcMutex<Vec<uuid::Uuid>>,
86 end_of_queue: ccutils::sync::ArcMutex<EndOfQueue>,
87 task_sender: Sender<definitions_task::Task>,
88}
89
90pub struct Module {}
92
93impl Module
94{
95 async fn handle_input_message(
96 msg: execution::InputMessage,
97 agent_data: &agent::AgentData,
98 module_data: &ModuleData,
99 output_sender: &Sender<execution::OutputMessage>,
100 ) -> Result<()>
101 {
102 match msg
103 {
104 execution::InputMessage::QueueExecution { uuid } =>
105 {
106 let sr: SimulationResult = agent_data
107 .knowledge_base
108 .retrieve("task_simulation_result", uuid.to_hex())?;
109 let task = agent_data
110 .knowledge_base
111 .retrieve::<definitions_task::Task>("tasks", uuid.to_hex())?;
112 {
113 let (current_cost, final_states) = {
114 let mut end_of_queue = module_data.end_of_queue.lock()?;
115 end_of_queue.total_cost += sr.get_estimated_cost();
116 end_of_queue.final_states = sr.get_final_states().to_owned();
117 (
118 end_of_queue.total_cost,
119 end_of_queue.final_states.to_owned(),
120 )
121 };
122 output_sender
123 .broadcast(execution::OutputMessage::CurrentEstimatedIdlingState {
124 current_cost,
125 final_states,
126 queue_length: module_data.task_sender.len(),
127 })
128 .await;
129 }
130 module_data.task_sender.broadcast(task).await;
131 }
132 execution::InputMessage::CancelExecution { uuid } =>
133 {
134 module_data.cancelled_tasks.lock()?.push(uuid);
135 }
136 }
137 Ok(())
138 }
139 fn handle_task_execution(
140 task: definitions_task::Task,
141 cancelled_tasks: &ccutils::sync::ArcMutex<Vec<uuid::Uuid>>,
142 task_executor: &dyn TaskExecutor,
143 ) -> Result<Option<futures::future::BoxFuture<'static, Result<()>>>>
144 {
145 if cancelled_tasks.lock()?.contains(&task.task_id())
146 {
147 Ok(None)
148 }
149 else if (task_executor.can_execute(&task))
150 {
151 Ok(Some(task_executor.execute_task(task)))
152 }
153 else
154 {
155 Err(Error::NoExecutor())
156 }
157 }
158}
159
160impl execution::Module for Module
161{
162 type Options = Options;
163 async fn start(
164 agent_data: agent::AgentData,
165 module_interfaces: (
166 crate::module::ModuleInterface<execution::InputMessage, execution::OutputMessage>,
167 ModulePrivateInterface,
168 ),
169 options: Options,
170 )
171 {
172 let (module_interface, module_private_interface) = module_interfaces;
173 let mut input_receiver = module_private_interface.input_receiver.activate();
174 let output_sender = module_private_interface.output_sender.clone();
175 let (task_sender, mut task_receiver) =
176 async_broadcast::broadcast::<definitions_task::Task>(consts::TASK_QUEUE_LENGTH);
177 let cancelled_tasks: ccutils::sync::ArcMutex<Vec<uuid::Uuid>> = Default::default();
178 let end_of_queue: ccutils::sync::ArcMutex<_> = EndOfQueue {
179 total_cost: 0.0,
180 final_states: agent_data.states.to_owned_states().unwrap(),
181 queue_length: 0,
182 }
183 .into();
184 let msg_fut = async {
185 let module_data = ModuleData {
186 cancelled_tasks: cancelled_tasks.to_owned(),
187 end_of_queue: end_of_queue.to_owned(),
188 task_sender,
189 };
190 loop
191 {
192 let msg = input_receiver.recv().await;
193
194 if let Ok(msg) = msg
195 {
196 if let Err(e) =
197 Self::handle_input_message(msg, &agent_data, &module_data, &output_sender).await
198 {
199 log::error!(
200 "An error occured when handling input execution message: {} for agent {}",
201 e,
202 agent_data.agent_uri
203 );
204 }
205 }
206 else
207 {
208 return;
209 }
210 }
211 };
212 let output_sender = module_private_interface.output_sender.clone();
213 let agent_data = agent_data.clone();
214 let cancelled_tasks = cancelled_tasks.clone();
215 let end_of_queue = end_of_queue.clone();
216 let exec_fut = async move {
217 loop
218 {
219 let msg = task_receiver.recv().await;
220 if let Ok(msg) = msg
221 {
222 let sr = agent_data
223 .knowledge_base
224 .retrieve::<SimulationResult>("task_simulation_result", msg.task_id().to_hex());
225 let cost = match sr
226 {
227 Ok(sr) => sr.get_estimated_cost(),
228 Err(_) => 0.0,
229 };
230 match Self::handle_task_execution(msg, &cancelled_tasks, &*options.task_executor)
231 {
232 Ok(Some(fut)) =>
233 {
234 if let Err(e) = fut.await
235 {
236 log::error!(
237 "An error occured when executing a task: {} for agent {}",
238 e,
239 agent_data.agent_uri
240 );
241 }
242 }
243 Ok(None) =>
244 {}
245 Err(e) =>
246 {
247 log::error!(
248 "An error occured when handling task execution message: {} for agent {}",
249 e,
250 agent_data.agent_uri
251 );
252 }
253 }
254 if task_receiver.is_empty()
255 {
256 {
257 let mut end_of_queue = end_of_queue.lock().unwrap();
258 end_of_queue.total_cost = 0.0;
259 end_of_queue.final_states = agent_data.states.to_owned_states().unwrap();
260 end_of_queue.queue_length = 0;
261 }
262 output_sender
263 .broadcast(execution::OutputMessage::CurrentEstimatedIdlingState {
264 current_cost: 0.0,
265 final_states: agent_data.states.to_owned_states().unwrap(),
266 queue_length: 0,
267 })
268 .await;
269 }
270 else
271 {
272 let (current_cost, final_states, queue_length) = {
273 let mut end_of_queue = end_of_queue.lock().unwrap();
274 end_of_queue.total_cost -= cost;
275 end_of_queue.queue_length = task_receiver.len();
276 (
277 end_of_queue.total_cost,
278 end_of_queue.final_states.to_owned(),
279 end_of_queue.queue_length,
280 )
281 };
282 output_sender
283 .broadcast(execution::OutputMessage::CurrentEstimatedIdlingState {
284 current_cost,
285 final_states,
286 queue_length,
287 })
288 .await;
289 }
290 }
291 else
292 {
293 return;
294 }
295 }
296 };
297
298 join!(msg_fut, exec_fut);
299 }
300}
301
302impl module::Module for Module
303{
304 type InputMessage = execution::InputMessage;
305 type OutputMessage = execution::OutputMessage;
306 type ModulePrivateInterface = ModulePrivateInterface;
307}