agent_tk/execution/
default.rs

1//! This module defines a default execution module.
2
3use async_broadcast::Sender;
4use futures::join;
5
6use crate::definitions::task as definitions_task;
7use crate::delegation::Task;
8use crate::knowledge_base::KnowledgeBaseInterface;
9use crate::simulation::tst::SimulationResult;
10use crate::{agent, consts, execution, module, uuid, Error, Result};
11
12mod task_executor;
13
14//  _____         _    _____                     _
15// |_   _|_ _ ___| | _| ____|_  _____  ___ _   _| |_ ___  _ __
16//   | |/ _` / __| |/ /  _| \ \/ / _ \/ __| | | | __/ _ \| '__|
17//   | | (_| \__ \   <| |___ >  |  __/ (__| |_| | || (_) | |
18//   |_|\__,_|___/_|\_\_____/_/\_\___|\___|\__,_|\__\___/|_|
19
20/// Interface to a task executor
21pub trait TaskExecutor: Sync + Send + 'static
22{
23  /// Execute the given task. The function is expected to block until the execution is completed.
24  /// It will be called from an async context by the default executor.
25  fn execute_task<'a>(
26    &'a self,
27    task: definitions_task::Task,
28  ) -> futures::future::BoxFuture<'a, Result<()>>;
29
30  //  impl std::future::Future<Output = Result<()>> + std::marker::Send + 'static;
31  /// Check if the given executor can execute the task
32  fn can_execute(&self, task: &definitions_task::Task) -> bool;
33}
34
35//   ____        _   _
36//  / __ \      | | (_)
37//  | |  | |_ __ | |_ _  ___  _ __  ___
38//  | |  | | '_ \| __| |/ _ \| '_ \/ __|
39//  | |__| | |_) | |_| | (_) | | | \__ \
40//   \____/| .__/ \__|_|\___/|_| |_|___/
41//         | |
42//         |_|
43
44/// Options for the default execution module
45pub struct Options
46{
47  task_executor: Box<dyn TaskExecutor>,
48}
49
50impl Options
51{
52  /// Create options for the default execution module
53  pub fn new<TTaskExecutor>(task_executor: TTaskExecutor) -> Self
54  where
55    TTaskExecutor: TaskExecutor,
56  {
57    Self {
58      task_executor: Box::new(task_executor),
59    }
60  }
61}
62
63//  __  __           _       _
64// |  \/  | ___   __| |_   _| | ___
65// | |\/| |/ _ \ / _` | | | | |/ _ \
66// | |  | | (_) | (_| | |_| | |  __/
67// |_|  |_|\___/ \__,_|\__,_|_|\___|
68
69module::create_module_private_interface!(
70  ModulePrivateInterface,
71  execution::InputMessage,
72  execution::OutputMessage
73);
74
75struct EndOfQueue
76{
77  total_cost: f32,
78  final_states: crate::states::States,
79  queue_length: usize,
80}
81
82struct ModuleData
83{
84  cancelled_tasks: ccutils::sync::ArcMutex<Vec<uuid::Uuid>>,
85  end_of_queue: ccutils::sync::ArcMutex<EndOfQueue>,
86  task_sender: Sender<definitions_task::Task>,
87}
88
/// Default implementation of an execution module: tasks are queued and handed, one at a
/// time, to the [`TaskExecutor`] given in the [`Options`].
pub struct Module {}
91
92impl Module
93{
94  async fn handle_input_message(
95    msg: execution::InputMessage,
96    agent_data: &agent::AgentData,
97    module_data: &ModuleData,
98    output_sender: &Sender<execution::OutputMessage>,
99  ) -> Result<()>
100  {
101    match msg
102    {
103      execution::InputMessage::QueueExecution { uuid } =>
104      {
105        let sr: SimulationResult = agent_data
106          .knowledge_base
107          .retrieve("task_simulation_result", uuid.to_hex())?;
108        let task = agent_data
109          .knowledge_base
110          .retrieve::<definitions_task::Task>("tasks", uuid.to_hex())?;
111        {
112          let (current_cost, final_states) = {
113            let mut end_of_queue = module_data.end_of_queue.lock()?;
114            end_of_queue.total_cost += sr.get_estimated_cost();
115            end_of_queue.final_states = sr.get_final_states().to_owned();
116            (
117              end_of_queue.total_cost,
118              end_of_queue.final_states.to_owned(),
119            )
120          };
121          ccutils::log::log_error!(
122            output_sender
123              .broadcast(execution::OutputMessage::CurrentEstimatedIdlingState {
124                current_cost,
125                final_states,
126                queue_length: module_data.task_sender.len(),
127              })
128              .await,
129            "sending current cost"
130          );
131        }
132        ccutils::log::log_error!(
133          module_data.task_sender.broadcast(task).await,
134          "sending task"
135        );
136      }
137      execution::InputMessage::CancelExecution { uuid } =>
138      {
139        module_data.cancelled_tasks.lock()?.push(uuid);
140      }
141    }
142    Ok(())
143  }
144  fn handle_task_execution<'a>(
145    task: definitions_task::Task,
146    cancelled_tasks: &ccutils::sync::ArcMutex<Vec<uuid::Uuid>>,
147    task_executor: &'a dyn TaskExecutor,
148  ) -> Result<Option<futures::future::BoxFuture<'a, Result<()>>>>
149  {
150    if cancelled_tasks.lock()?.contains(&task.task_id())
151    {
152      Ok(None)
153    }
154    else if task_executor.can_execute(&task)
155    {
156      Ok(Some(task_executor.execute_task(task)))
157    }
158    else
159    {
160      Err(Error::NoExecutor())
161    }
162  }
163}
164
165impl execution::Module for Module
166{
167  type Options = Options;
168  async fn start(
169    agent_data: agent::AgentData,
170    module_interfaces: (
171      crate::module::ModuleInterface<execution::InputMessage, execution::OutputMessage>,
172      ModulePrivateInterface,
173    ),
174    options: Options,
175  )
176  {
177    let (_, module_private_interface) = module_interfaces;
178    let mut input_receiver = module_private_interface.input_receiver.activate();
179    let output_sender = module_private_interface.output_sender.clone();
180    let (task_sender, mut task_receiver) =
181      async_broadcast::broadcast::<definitions_task::Task>(consts::TASK_QUEUE_LENGTH);
182    let cancelled_tasks: ccutils::sync::ArcMutex<Vec<uuid::Uuid>> = Default::default();
183    let end_of_queue: ccutils::sync::ArcMutex<_> = EndOfQueue {
184      total_cost: 0.0,
185      final_states: agent_data.states.to_owned_states().unwrap(),
186      queue_length: 0,
187    }
188    .into();
189    let msg_fut = async {
190      let module_data = ModuleData {
191        cancelled_tasks: cancelled_tasks.to_owned(),
192        end_of_queue: end_of_queue.to_owned(),
193        task_sender,
194      };
195      loop
196      {
197        let msg = input_receiver.recv().await;
198
199        if let Ok(msg) = msg
200        {
201          if let Err(e) =
202            Self::handle_input_message(msg, &agent_data, &module_data, &output_sender).await
203          {
204            log::error!(
205              "An error occured when handling input execution message: {} for agent {}",
206              e,
207              agent_data.agent_uri
208            );
209          }
210        }
211        else
212        {
213          return;
214        }
215      }
216    };
217    let output_sender = module_private_interface.output_sender.clone();
218    let agent_data = agent_data.clone();
219    let cancelled_tasks = cancelled_tasks.clone();
220    let end_of_queue = end_of_queue.clone();
221    let exec_fut = async move {
222      loop
223      {
224        let msg = task_receiver.recv().await;
225        if let Ok(msg) = msg
226        {
227          let sr = agent_data
228            .knowledge_base
229            .retrieve::<SimulationResult>("task_simulation_result", msg.task_id().to_hex());
230          let cost = match sr
231          {
232            Ok(sr) => sr.get_estimated_cost(),
233            Err(_) => 0.0,
234          };
235          match Self::handle_task_execution(msg, &cancelled_tasks, &*options.task_executor)
236          {
237            Ok(Some(fut)) =>
238            {
239              if let Err(e) = fut.await
240              {
241                log::error!(
242                  "An error occured when executing a task: {} for agent {}",
243                  e,
244                  agent_data.agent_uri
245                );
246              }
247            }
248            Ok(None) =>
249            {}
250            Err(e) =>
251            {
252              log::error!(
253                "An error occured when handling task execution message: {} for agent {}",
254                e,
255                agent_data.agent_uri
256              );
257            }
258          }
259          if task_receiver.is_empty()
260          {
261            {
262              let mut end_of_queue = end_of_queue.lock().unwrap();
263              end_of_queue.total_cost = 0.0;
264              end_of_queue.final_states = agent_data.states.to_owned_states().unwrap();
265              end_of_queue.queue_length = 0;
266            }
267            ccutils::log::log_error!(
268              output_sender
269                .broadcast(execution::OutputMessage::CurrentEstimatedIdlingState {
270                  current_cost: 0.0,
271                  final_states: agent_data.states.to_owned_states().unwrap(),
272                  queue_length: 0,
273                })
274                .await,
275              "sending output message"
276            );
277          }
278          else
279          {
280            let (current_cost, final_states, queue_length) = {
281              let mut end_of_queue = end_of_queue.lock().unwrap();
282              end_of_queue.total_cost -= cost;
283              end_of_queue.queue_length = task_receiver.len();
284              (
285                end_of_queue.total_cost,
286                end_of_queue.final_states.to_owned(),
287                end_of_queue.queue_length,
288              )
289            };
290            ccutils::log::log_error!(
291              output_sender
292                .broadcast(execution::OutputMessage::CurrentEstimatedIdlingState {
293                  current_cost,
294                  final_states,
295                  queue_length,
296                })
297                .await,
298              "sending output message"
299            );
300          }
301        }
302        else
303        {
304          return;
305        }
306      }
307    };
308
309    join!(msg_fut, exec_fut);
310  }
311}
312
313impl module::Module for Module
314{
315  type InputMessage = execution::InputMessage;
316  type OutputMessage = execution::OutputMessage;
317  type ModulePrivateInterface = ModulePrivateInterface;
318}