agent_tk/decision/
default.rs

1//! This module defines a default decision module.
2
3use async_broadcast::Sender;
4use futures::join;
5
6use crate::prelude::*;
7use definitions::task as definitions_task;
8use delegation::Task as DelTask;
9
10//   ____        _   _
11//  / __ \      | | (_)
12//  | |  | |_ __ | |_ _  ___  _ __  ___
13//  | |  | | '_ \| __| |/ _ \| '_ \/ __|
14//  | |__| | |_) | |_| | (_) | | | \__ \
15//   \____/| .__/ \__|_|\___/|_| |_|___/
16//         | |
17//         |_|
18
/// Options for the default decision module.
///
/// The default decision module has nothing to configure, so this structure has no
/// fields. It exists so that the module can be started through the same interface as
/// decision modules that do take options.
#[derive(Debug, Clone, Default)]
pub struct Options {}

impl Options
{
  /// Create options for the default decision module.
  ///
  /// This is equivalent to [`Options::default`].
  pub fn new() -> Self
  {
    Self {}
  }
}
38
39//  __  __           _       _
40// |  \/  | ___   __| |_   _| | ___
41// | |\/| |/ _ \ / _` | | | | |/ _ \
42// | |  | | (_) | (_| | |_| | |  __/
43// |_|  |_|\___/ \__,_|\__,_|_|\___|
44
45module::create_module_private_interface!(
46  ModulePrivateInterface,
47  decision::InputMessage,
48  decision::OutputMessage
49);
50
51struct EndOfQueue
52{
53  total_cost: f32,
54  final_states: crate::states::States,
55  queue_length: usize,
56}
57
/// The default decision module.
///
/// The structure carries no data: all of its behaviour lives in associated functions
/// and in its implementation of the decision module trait.
#[derive(Debug)]
pub struct Module {}
60
61impl Module
62{
63  async fn execute_tst_task(
64    uuid: &uuid::Uuid,
65    tst: &definitions::tst::Node,
66    requester_uri: &String,
67    agent_data: &agent::AgentData,
68    end_of_queue: &ccutils::sync::ArcMutex<EndOfQueue>,
69    output_sender: &Sender<decision::OutputMessage>,
70    _execution_input_sender: &Sender<execution::InputMessage>,
71  ) -> Result<()>
72  {
73    agent_data.knowledge_base.insert(
74      "tasks",
75      uuid.to_hex(),
76      &definitions_task::Task::from_tst(tst.clone()),
77    )?;
78    let queue_length = end_of_queue.lock()?.queue_length;
79    if 10 * queue_length > 8 * consts::TASK_QUEUE_LENGTH
80    {
81      return Ok(());
82    }
83    let states = end_of_queue.lock()?.final_states.clone();
84    let sr = crate::simulation::tst::simulate_execution(
85      states,
86      agent_data.capabilities.clone(),
87      tst.clone(),
88    )
89    .await;
90    let total_cost = end_of_queue.lock()?.total_cost;
91    match sr
92    {
93      Ok(sr) =>
94      {
95        agent_data
96          .knowledge_base
97          .insert("task_simulation_result", uuid.to_hex(), &sr)?;
98        ccutils::log::log_error!(
99          output_sender
100            .broadcast(decision::OutputMessage::CFPProposal {
101              proposal: delegation::Proposal {
102                agent_uri: agent_data.agent_uri.to_owned(),
103                requester_uri: requester_uri.to_owned(),
104                cost: total_cost + sr.get_estimated_cost(),
105                signature: Default::default(),
106                task_uuid: uuid.to_owned(),
107              },
108            })
109            .await,
110          "While sending CFP Proposal"
111        );
112        Ok(())
113      }
114      Err(e) => match e
115      {
116        Error::ExecutionFailed(ef) => match *ef
117        {
118          Error::UnknownCapability(_) => Ok(()),
119          e => Err(e),
120        },
121        e => Err(e),
122      },
123    }
124  }
125  async fn handle_input_message(
126    msg: decision::InputMessage,
127    agent_data: &agent::AgentData,
128    end_of_queue: &ccutils::sync::ArcMutex<EndOfQueue>,
129    output_sender: &Sender<decision::OutputMessage>,
130    execution_input_sender: &Sender<execution::InputMessage>,
131  ) -> Result<()>
132  {
133    match msg
134    {
135      decision::InputMessage::DecideCFPAcceptance { cfp } =>
136      {
137        let task = definitions_task::Task::from_description(&cfp.task_type, &cfp.task_description)?;
138
139        match task.get_container()
140        {
141          definitions_task::TaskContainer::Tst(tst) =>
142          {
143            Self::execute_tst_task(
144              &task.task_id(),
145              tst,
146              &cfp.requester_uri,
147              agent_data,
148              end_of_queue,
149              output_sender,
150              execution_input_sender,
151            )
152            .await?;
153          }
154          definitions_task::TaskContainer::Goal(goal) =>
155          {
156            let tst = conversion::goal::ToTst::convert(goal)?;
157            Self::execute_tst_task(
158              &task.task_id(),
159              &tst,
160              &cfp.requester_uri,
161              agent_data,
162              end_of_queue,
163              output_sender,
164              execution_input_sender,
165            )
166            .await?;
167          }
168        }
169      }
170      decision::InputMessage::QueueExecution {
171        uuid,
172        requester_uri,
173      } =>
174      {
175        let accept = 10 * end_of_queue.lock()?.queue_length < 8 * consts::TASK_QUEUE_LENGTH;
176        if accept
177        {
178          ccutils::log::log_error!(
179            execution_input_sender
180              .broadcast(execution::InputMessage::QueueExecution { uuid })
181              .await,
182            "Sending queue execution"
183          );
184        }
185        ccutils::log::log_error!(
186          output_sender
187            .broadcast(decision::OutputMessage::QueueExecutionResult {
188              uuid,
189              accepted: accept,
190              requester_uri,
191            })
192            .await,
193          "sneding queued execution result"
194        );
195      }
196      decision::InputMessage::CancelExecution { uuid } =>
197      {
198        ccutils::log::log_error!(
199          execution_input_sender
200            .broadcast(execution::InputMessage::CancelExecution {
201              uuid: uuid.to_owned(),
202            })
203            .await,
204          "sending execution cancelled"
205        );
206      }
207    }
208
209    Ok(())
210  }
211}
212
213impl decision::Module for Module
214{
215  type Options = Options;
216  async fn start(
217    agent_data: agent::AgentData,
218    module_interfaces: (decision::ModuleInterface, ModulePrivateInterface),
219    execution_interface: execution::ModuleInterface,
220    _: Options,
221  )
222  {
223    let (_, module_private_interface) = module_interfaces;
224
225    let mut input_receiver = module_private_interface.input_receiver.activate();
226    let output_sender = module_private_interface.output_sender;
227
228    let end_of_queue: ccutils::sync::ArcMutex<_> = EndOfQueue {
229      total_cost: 0.0,
230      final_states: agent_data.states.to_owned_states().unwrap(),
231      queue_length: 0,
232    }
233    .into();
234
235    let execution_input_sender = execution_interface.input_sender();
236
237    let fut_ir = {
238      let end_of_queue = end_of_queue.clone();
239      async move {
240        loop
241        {
242          let msg = input_receiver.recv().await;
243          if let Ok(msg) = msg
244          {
245            if let Err(e) = Self::handle_input_message(
246              msg,
247              &agent_data,
248              &end_of_queue,
249              &output_sender,
250              &execution_input_sender,
251            )
252            .await
253            {
254              log::error!(
255                "An error occured for agent {} when handling decision message: {}",
256                agent_data.agent_uri,
257                e
258              );
259            }
260          }
261          else
262          {
263            log::info!("Decision loop for {:?} is closing", agent_data.agent_uri);
264            return;
265          }
266        }
267      }
268    };
269    let mut execution_output_receiver = execution_interface.output_receiver();
270    let fut_exec_or = async move {
271      loop
272      {
273        let msg = execution_output_receiver.recv().await;
274        if let Ok(msg) = msg
275        {
276          match msg
277          {
278            execution::OutputMessage::CurrentEstimatedIdlingState {
279              current_cost,
280              final_states,
281              queue_length,
282            } =>
283            {
284              let mut end_of_queue = end_of_queue.lock().unwrap();
285              end_of_queue.total_cost = current_cost;
286              end_of_queue.final_states = final_states;
287              end_of_queue.queue_length = queue_length;
288            }
289          }
290        }
291        else
292        {
293          return;
294        }
295      }
296    };
297    join!(fut_ir, fut_exec_or);
298  }
299}
300
301impl module::Module for Module
302{
303  type InputMessage = decision::InputMessage;
304  type OutputMessage = decision::OutputMessage;
305  type ModulePrivateInterface = ModulePrivateInterface;
306}