agent_tk/execution/
default.rs

1//! This module defines a default execution module.
2
3use async_broadcast::Sender;
4use ccutils::sync::ArcMutex;
5use futures::join;
6
7use crate::definitions::task as definitions_task;
8use crate::delegation::Task;
9use crate::knowledge_base::KnowledgeBaseInterface;
10use crate::simulation::tst::SimulationResult;
11use crate::{agent, consts, decision, delegation, execution, module, utils, uuid, Error, Result};
12
13mod task_executor;
14
15//  _____         _    _____                     _
16// |_   _|_ _ ___| | _| ____|_  _____  ___ _   _| |_ ___  _ __
17//   | |/ _` / __| |/ /  _| \ \/ / _ \/ __| | | | __/ _ \| '__|
18//   | | (_| \__ \   <| |___ >  |  __/ (__| |_| | || (_) | |
19//   |_|\__,_|___/_|\_\_____/_/\_\___|\___|\__,_|\__\___/|_|
20
/// Interface to a task executor.
///
/// The default execution module stores one executor in its [`Options`] and hands it every task
/// taken from the execution queue.
pub trait TaskExecutor: Sync + Send + 'static
{
  /// Start the execution of the given task.
  ///
  /// The returned future is expected to resolve once the execution is completed, with an error if
  /// the execution failed. The default execution module awaits that future before it takes the
  /// next task from its queue.
  fn execute_task(
    &self,
    task: definitions_task::Task,
  ) -> futures::future::BoxFuture<'static, Result<()>>;

  /// Check if the given executor can execute the task.
  ///
  /// The default execution module only calls [`TaskExecutor::execute_task`] when this returns
  /// `true`; otherwise the task is reported as having no executor.
  fn can_execute(&self, task: &definitions_task::Task) -> bool;
}
35
36//   ____        _   _
37//  / __ \      | | (_)
38//  | |  | |_ __ | |_ _  ___  _ __  ___
39//  | |  | | '_ \| __| |/ _ \| '_ \/ __|
40//  | |__| | |_) | |_| | (_) | | | \__ \
41//   \____/| .__/ \__|_|\___/|_| |_|___/
42//         | |
43//         |_|
44
/// Options for the default execution module
pub struct Options
{
  /// Executor used by the module to run the tasks taken from the execution queue.
  task_executor: Box<dyn TaskExecutor>,
}
50
51impl Options
52{
53  /// Create options for the default execution module
54  pub fn new<TTaskExecutor>(task_executor: TTaskExecutor) -> Self
55  where
56    TTaskExecutor: TaskExecutor,
57  {
58    Self {
59      task_executor: Box::new(task_executor),
60    }
61  }
62}
63
64//  __  __           _       _
65// |  \/  | ___   __| |_   _| | ___
66// | |\/| |/ _ \ / _` | | | | |/ _ \
67// | |  | | (_) | (_| | |_| | |  __/
68// |_|  |_|\___/ \__,_|\__,_|_|\___|
69
// Declares `ModulePrivateInterface`, the private side of the module interface for the execution
// input/output messages. `Module::start` reads its `input_receiver` and `output_sender` fields.
module::create_module_private_interface!(
  ModulePrivateInterface,
  execution::InputMessage,
  execution::OutputMessage
);
75
/// Estimated situation of the agent once the tasks currently in the execution queue are done.
struct EndOfQueue
{
  /// Sum of the estimated costs of the queued tasks: increased when a task is queued, decreased
  /// when a task has been processed and reset to zero when the queue is empty.
  total_cost: f32,
  /// Final states of the simulation result of the most recently queued task, or a copy of the
  /// agent states when the queue is empty.
  final_states: crate::states::States,
  /// Number of tasks in the queue, as last recorded by the execution loop.
  queue_length: usize,
}
82
83struct ModuleData
84{
85  cancelled_tasks: ccutils::sync::ArcMutex<Vec<uuid::Uuid>>,
86  end_of_queue: ccutils::sync::ArcMutex<EndOfQueue>,
87  task_sender: Sender<definitions_task::Task>,
88}
89
/// This structure implements the default execution module.
pub struct Module {}
92
93impl Module
94{
95  async fn handle_input_message(
96    msg: execution::InputMessage,
97    agent_data: &agent::AgentData,
98    module_data: &ModuleData,
99    output_sender: &Sender<execution::OutputMessage>,
100  ) -> Result<()>
101  {
102    match msg
103    {
104      execution::InputMessage::QueueExecution { uuid } =>
105      {
106        let sr: SimulationResult = agent_data
107          .knowledge_base
108          .retrieve("task_simulation_result", uuid.to_hex())?;
109        let task = agent_data
110          .knowledge_base
111          .retrieve::<definitions_task::Task>("tasks", uuid.to_hex())?;
112        {
113          let (current_cost, final_states) = {
114            let mut end_of_queue = module_data.end_of_queue.lock()?;
115            end_of_queue.total_cost += sr.get_estimated_cost();
116            end_of_queue.final_states = sr.get_final_states().to_owned();
117            (
118              end_of_queue.total_cost,
119              end_of_queue.final_states.to_owned(),
120            )
121          };
122          output_sender
123            .broadcast(execution::OutputMessage::CurrentEstimatedIdlingState {
124              current_cost,
125              final_states,
126              queue_length: module_data.task_sender.len(),
127            })
128            .await;
129        }
130        module_data.task_sender.broadcast(task).await;
131      }
132      execution::InputMessage::CancelExecution { uuid } =>
133      {
134        module_data.cancelled_tasks.lock()?.push(uuid);
135      }
136    }
137    Ok(())
138  }
139  fn handle_task_execution(
140    task: definitions_task::Task,
141    cancelled_tasks: &ccutils::sync::ArcMutex<Vec<uuid::Uuid>>,
142    task_executor: &dyn TaskExecutor,
143  ) -> Result<Option<futures::future::BoxFuture<'static, Result<()>>>>
144  {
145    if cancelled_tasks.lock()?.contains(&task.task_id())
146    {
147      Ok(None)
148    }
149    else if (task_executor.can_execute(&task))
150    {
151      Ok(Some(task_executor.execute_task(task)))
152    }
153    else
154    {
155      Err(Error::NoExecutor())
156    }
157  }
158}
159
160impl execution::Module for Module
161{
162  type Options = Options;
163  async fn start(
164    agent_data: agent::AgentData,
165    module_interfaces: (
166      crate::module::ModuleInterface<execution::InputMessage, execution::OutputMessage>,
167      ModulePrivateInterface,
168    ),
169    options: Options,
170  )
171  {
172    let (module_interface, module_private_interface) = module_interfaces;
173    let mut input_receiver = module_private_interface.input_receiver.activate();
174    let output_sender = module_private_interface.output_sender.clone();
175    let (task_sender, mut task_receiver) =
176      async_broadcast::broadcast::<definitions_task::Task>(consts::TASK_QUEUE_LENGTH);
177    let cancelled_tasks: ccutils::sync::ArcMutex<Vec<uuid::Uuid>> = Default::default();
178    let end_of_queue: ccutils::sync::ArcMutex<_> = EndOfQueue {
179      total_cost: 0.0,
180      final_states: agent_data.states.to_owned_states().unwrap(),
181      queue_length: 0,
182    }
183    .into();
184    let msg_fut = async {
185      let module_data = ModuleData {
186        cancelled_tasks: cancelled_tasks.to_owned(),
187        end_of_queue: end_of_queue.to_owned(),
188        task_sender,
189      };
190      loop
191      {
192        let msg = input_receiver.recv().await;
193
194        if let Ok(msg) = msg
195        {
196          if let Err(e) =
197            Self::handle_input_message(msg, &agent_data, &module_data, &output_sender).await
198          {
199            log::error!(
200              "An error occured when handling input execution message: {} for agent {}",
201              e,
202              agent_data.agent_uri
203            );
204          }
205        }
206        else
207        {
208          return;
209        }
210      }
211    };
212    let output_sender = module_private_interface.output_sender.clone();
213    let agent_data = agent_data.clone();
214    let cancelled_tasks = cancelled_tasks.clone();
215    let end_of_queue = end_of_queue.clone();
216    let exec_fut = async move {
217      loop
218      {
219        let msg = task_receiver.recv().await;
220        if let Ok(msg) = msg
221        {
222          let sr = agent_data
223            .knowledge_base
224            .retrieve::<SimulationResult>("task_simulation_result", msg.task_id().to_hex());
225          let cost = match sr
226          {
227            Ok(sr) => sr.get_estimated_cost(),
228            Err(_) => 0.0,
229          };
230          match Self::handle_task_execution(msg, &cancelled_tasks, &*options.task_executor)
231          {
232            Ok(Some(fut)) =>
233            {
234              if let Err(e) = fut.await
235              {
236                log::error!(
237                  "An error occured when executing a task: {} for agent {}",
238                  e,
239                  agent_data.agent_uri
240                );
241              }
242            }
243            Ok(None) =>
244            {}
245            Err(e) =>
246            {
247              log::error!(
248                "An error occured when handling task execution message: {} for agent {}",
249                e,
250                agent_data.agent_uri
251              );
252            }
253          }
254          if task_receiver.is_empty()
255          {
256            {
257              let mut end_of_queue = end_of_queue.lock().unwrap();
258              end_of_queue.total_cost = 0.0;
259              end_of_queue.final_states = agent_data.states.to_owned_states().unwrap();
260              end_of_queue.queue_length = 0;
261            }
262            output_sender
263              .broadcast(execution::OutputMessage::CurrentEstimatedIdlingState {
264                current_cost: 0.0,
265                final_states: agent_data.states.to_owned_states().unwrap(),
266                queue_length: 0,
267              })
268              .await;
269          }
270          else
271          {
272            let (current_cost, final_states, queue_length) = {
273              let mut end_of_queue = end_of_queue.lock().unwrap();
274              end_of_queue.total_cost -= cost;
275              end_of_queue.queue_length = task_receiver.len();
276              (
277                end_of_queue.total_cost,
278                end_of_queue.final_states.to_owned(),
279                end_of_queue.queue_length,
280              )
281            };
282            output_sender
283              .broadcast(execution::OutputMessage::CurrentEstimatedIdlingState {
284                current_cost,
285                final_states,
286                queue_length,
287              })
288              .await;
289          }
290        }
291        else
292        {
293          return;
294        }
295      }
296    };
297
298    join!(msg_fut, exec_fut);
299  }
300}
301
/// Binds the default execution module to the message and interface types of the execution
/// modules.
impl module::Module for Module
{
  /// Messages received by the module.
  type InputMessage = execution::InputMessage;
  /// Messages emitted by the module.
  type OutputMessage = execution::OutputMessage;
  /// Private side of the module interface, declared in this file with
  /// `create_module_private_interface!`.
  type ModulePrivateInterface = ModulePrivateInterface;
}