agent_tk/
delegation.rs

1//! This module implements a delegation system for the agent.
2
3use ::std::borrow::Borrow;
4use std::time::{Duration, Instant};
5
6#[cfg(feature = "mqtt")]
7pub mod mqtt;
8
9pub mod transport;
10pub mod transport_messages;
11
12use futures::join;
13use yaaral::TaskInterface as _;
14use yaaral::time::TimeInterface as _;
15
16use crate::prelude::*;
17
18//   ____   _____   ____
19//  / ___| |  ___| |  _ \
20// | |     | |_    | |_) |
21// | |___  |  _|   |  __/
22//  \____| |_|     |_|
23
24/// Represents a call for proposal
25#[derive(Debug, Clone, serde::Deserialize, serde::Serialize)]
26pub struct CFP
27{
28  /// uuid of the CFP
29  pub uuid: uuid::Uuid,
30  /// URI for the agent which resuest the delegation of the task.
31  pub requester_uri: String,
32  /// the URI of the team
33  pub team_uri: Option<String>,
34  /// the URI for the type of the task
35  pub task_type: String,
36  /// a string description of a task
37  pub task_description: String,
38  /// a cryptographic dignature for the CFP
39  pub signature: Vec<u8>,
40}
41
42//  ____                                                 _
43// |  _ \   _ __    ___    _ __     ___    ___    __ _  | |
44// | |_) | | '__|  / _ \  | '_ \   / _ \  / __|  / _` | | |
45// |  __/  | |    | (_) | | |_) | | (_) | \__ \ | (_| | | |
46// |_|     |_|     \___/  | .__/   \___/  |___/  \__,_| |_|
47//                        |_|
48
49/// Represents a proposal to complete a CFP.
50#[derive(Clone, serde::Deserialize, serde::Serialize)]
51pub struct Proposal
52{
53  /// uri of this agent
54  pub agent_uri: String,
55  /// URI for the agent which resuest the delegation of the task.
56  pub requester_uri: String,
57  /// cost for executing the given task
58  pub cost: f32,
59  /// uuid of the CFP/Task
60  pub task_uuid: uuid::Uuid,
61  /// signature it ingludes the agent_uri, cost, uuid and description
62  pub signature: Vec<u8>,
63}
64
65//  ____    _             _
66// / ___|  | |_    __ _  | |_   _   _   ___
67// \___ \  | __|  / _` | | __| | | | | / __|
68//  ___) | | |_  | (_| | | |_  | |_| | \__ \
69// |____/   \__|  \__,_|  \__|  \__,_| |___/
70
71/// Enum representing the statuts of a delegation and its execution
72#[derive(Clone, Debug, serde::Deserialize, serde::Serialize)]
73pub enum Status
74{
75  /// The CFP was sent
76  SendCFP,
77  /// A number of proposals have been received
78  ReceivedProposals(usize),
79  /// A proposal was accepted
80  ProposalAccepted,
81  /// No proposals were received before the timeout
82  NoReceivedProposals,
83  /// No proposal was accepted
84  NoAcceptedProposal,
85  /// The execution started
86  ExecutionStarted,
87  /// The execution of a specific node started
88  NodeExecutionStarted
89  {
90    /// uuid of the executed node
91    node: uuid::Uuid,
92  },
93  /// The execution of a specific node was completed
94  NodeExecutionCompleted
95  {
96    /// uuid of the completed node
97    node: uuid::Uuid,
98  },
99}
100
101//  _____                 _
102// |_   _|   __ _   ___  | | __
103//   | |    / _` | / __| | |/ /
104//   | |   | (_| | \__ \ |   <
105//   |_|    \__,_| |___/ |_|\_\
106
107/// Base trait for representing tasks in the delegation
108pub trait Task: Sized
109{
110  /// Create a task from a text description (aka json string...)
111  fn from_description(task_type: impl AsRef<str>, description: impl AsRef<str>) -> Result<Self>;
112  /// The type of the task (tst, goalspec...)
113  fn task_type(&self) -> &str;
114  /// Convert into a string representation
115  fn to_description(&self) -> String;
116  /// An UUID for the task
117  fn task_id(&self) -> uuid::Uuid;
118}
119
120//  ___                   _   __  __
121// |_ _|_ __  _ __  _   _| |_|  \/  | ___  ___ ___  __ _  __ _  ___
122//  | || '_ \| '_ \| | | | __| |\/| |/ _ \/ __/ __|/ _` |/ _` |/ _ \
123//  | || | | | |_) | |_| | |_| |  | |  __/\__ \__ \ (_| | (_| |  __/
124// |___|_| |_| .__/ \__,_|\__|_|  |_|\___||___/___/\__,_|\__, |\___|
125//           |_|                                         |___/
126
127/// Input message from the delegation module
128#[derive(Clone)]
129pub enum InputMessage
130{
131  /// Tell the delegation module to start a delegation
132  StartDelegation
133  {
134    /// Call for proposal
135    cfp: CFP,
136  },
137}
138
139impl InputMessage
140{
141  /// Convenience function for creating a CFP and wrap it into a start delegation message
142  pub(crate) fn create_start_delegation(
143    requester_uri: String,
144    task: impl Task,
145    team_uri: Option<String>,
146  ) -> InputMessage
147  {
148    InputMessage::StartDelegation {
149      cfp: CFP {
150        uuid: task.task_id(),
151        requester_uri,
152        team_uri,
153        task_type: task.task_type().into(),
154        task_description: task.to_description(),
155        signature: Vec::<u8>::default(),
156      },
157    }
158  }
159}
160
161//   ___        _               _   __  __
162//  / _ \ _   _| |_ _ __  _   _| |_|  \/  | ___  ___ ___  __ _  __ _  ___
163// | | | | | | | __| '_ \| | | | __| |\/| |/ _ \/ __/ __|/ _` |/ _` |/ _ \
164// | |_| | |_| | |_| |_) | |_| | |_| |  | |  __/\__ \__ \ (_| | (_| |  __/
165//  \___/ \__,_|\__| .__/ \__,_|\__|_|  |_|\___||___/___/\__,_|\__, |\___|
166//                 |_|                                         |___/
167
168/// Output message from the delegation module
169#[derive(Clone)]
170pub enum OutputMessage
171{
172  /// Status of the delegation
173  Status
174  {
175    /// Uuid of the task
176    uuid: uuid::Uuid,
177    /// Status
178    status: Status,
179  },
180}
181
182//  __  __           _       _      ___       _             __
183// |  \/  | ___   __| |_   _| | ___|_ _|_ __ | |_ ___ _ __ / _| __ _  ___ ___
184// | |\/| |/ _ \ / _` | | | | |/ _ \| || '_ \| __/ _ \ '__| |_ / _` |/ __/ _ \
185// | |  | | (_) | (_| | |_| | |  __/| || | | | ||  __/ |  |  _| (_| | (_|  __/
186// |_|  |_|\___/ \__,_|\__,_|_|\___|___|_| |_|\__\___|_|  |_|  \__,_|\___\___|
187
188/// Module interface for the delegation modue
189pub type ModuleInterface = module::ModuleInterface<InputMessage, OutputMessage>;
190
191//  __  __           _       _
192// |  \/  | ___   __| |_   _| | ___
193// | |\/| |/ _ \ / _` | | | | |/ _ \
194// | |  | | (_) | (_| | |_| | |  __/
195// |_|  |_|\___/ \__,_|\__,_|_|\___|
196
197module::create_module_private_interface!(ModulePrivateInterface, InputMessage, OutputMessage);
198
199/// This structure implements a delegation module
200pub(crate) struct Module {}
201
202impl Module
203{
204  /// Create a new delegation module
205  pub(crate) async fn start(
206    agent_data: agent::AgentData,
207    module_interfaces: (
208      crate::module::ModuleInterface<InputMessage, OutputMessage>,
209      ModulePrivateInterface,
210    ),
211    transport_interface: transport::ModuleInterface,
212    decision_interface: decision::ModuleInterface,
213  ) -> ()
214  {
215    let (_, module_private_interface) = module_interfaces;
216
217    let mut input_receiver = module_private_interface.input_receiver.activate();
218    let output_sender = module_private_interface.output_sender;
219    let transport_interface = transport_interface;
220
221    let ir_fut = {
222      let agent_data = agent_data.clone();
223      let transport_interface = transport_interface.clone();
224      async move {
225        loop
226        {
227          let msg = input_receiver.recv().await;
228          if let Ok(msg) = msg
229          {
230            match msg
231            {
232              InputMessage::StartDelegation { cfp } =>
233              {
234                let output_sender = output_sender.clone();
235                let transport_input_sender = transport_interface.input_sender();
236                let mut transport_output_receiver = transport_interface.output_receiver();
237                let agent_data_agent_uri = agent_data.agent_uri.to_owned();
238                let delegation = async move {
239                  let cfp_uuid = cfp.uuid.to_owned();
240
241                  // Send CFP to transport
242                  match transport_input_sender
243                    .broadcast_direct(transport::InputMessage::SendCFP { cfp })
244                    .await
245                  {
246                    Ok(_) =>
247                    {}
248                    Err(e) => log::error!("While sending CFP: {:?}", e.to_string()),
249                  }
250
251                  // Accumulate proposals
252                  let mut proposals = Vec::<delegation::Proposal>::new();
253                  let start = Instant::now();
254                  while start.elapsed().as_secs() < 10
255                  {
256                    let msg = crate::Runtime::timeout(
257                      Duration::from_millis(10 * 1000 - start.elapsed().as_millis() as u64),
258                      transport_output_receiver.recv(),
259                    )
260                    .await;
261                    if let Ok(Ok(transport::OutputMessage::ReceivedProposal { proposal })) = msg
262                      && proposal.task_uuid == cfp_uuid
263                    {
264                      proposals.push(proposal);
265                    }
266                  }
267                  log::info!(
268                    "Received {} proposals for delegation {:?}",
269                    proposals.len(),
270                    cfp_uuid
271                  );
272
273                  // Sort proposals
274                  proposals.sort_by(|a, b| a.cost.partial_cmp(b.cost.borrow()).unwrap());
275                  if proposals.is_empty()
276                  {
277                    log::error!("No proposals received.");
278                    ccutils::log_error!(
279                      output_sender
280                        .broadcast_direct(OutputMessage::Status {
281                          uuid: cfp_uuid,
282                          status: Status::NoReceivedProposals,
283                        })
284                        .await,
285                      "sending no received proposals"
286                    );
287                    return;
288                  }
289                  else
290                  {
291                    ccutils::log::log_error!(
292                      output_sender
293                        .broadcast(OutputMessage::Status {
294                          uuid: cfp_uuid.to_owned(),
295                          status: Status::ReceivedProposals(proposals.len()),
296                        })
297                        .await,
298                      "sending status"
299                    );
300                  }
301
302                  // Select proposals
303                  for p in proposals
304                  {
305                    ccutils::log::log_error!(
306                      transport_input_sender
307                        .broadcast(transport::InputMessage::SendProposalAcceptance {
308                          acceptance: delegation::transport_messages::Acceptance {
309                            agent_uri: p.agent_uri.to_owned(),
310                            requester_uri: agent_data_agent_uri.to_owned(),
311                            acceptance: true,
312                            uuid: p.task_uuid,
313                            signature: Default::default(),
314                          },
315                        })
316                        .await,
317                      "sending proposal acceptance"
318                    );
319                    let start = Instant::now();
320                    while start.elapsed().as_secs() < 10
321                    {
322                      let msg = crate::Runtime::timeout(
323                        Duration::from_millis(10 * 1000 - start.elapsed().as_millis() as u64),
324                        transport_output_receiver.recv(),
325                      )
326                      .await;
327                      if let Ok(Ok(transport::OutputMessage::ReceivedExecutionAccepted {
328                        acceptance,
329                      })) = msg
330                        && acceptance.uuid == cfp_uuid
331                      {
332                        if acceptance.acceptance
333                        {
334                          ccutils::log::log_error!(
335                            output_sender
336                              .broadcast(OutputMessage::Status {
337                                uuid: cfp_uuid,
338                                status: Status::ProposalAccepted,
339                              })
340                              .await,
341                            "sending proposal accepted"
342                          );
343                          return;
344                        }
345                        else
346                        {
347                          break;
348                        }
349                      }
350                    }
351                    // Cancel the acceptance
352                    ccutils::log::log_error!(
353                      transport_input_sender
354                        .broadcast(transport::InputMessage::SendCancelAcceptance {
355                          cancel_acceptance: transport_messages::CancelAcceptance {
356                            agent_uri: p.agent_uri,
357                            uuid: p.task_uuid,
358                            signature: Default::default(),
359                          },
360                        })
361                        .await,
362                      "sending acceptance cancellation"
363                    );
364                  }
365                  // No proposal was accepted, delegation failed
366                  ccutils::log::log_error!(
367                    output_sender
368                      .broadcast(OutputMessage::Status {
369                        uuid: cfp_uuid.to_owned(),
370                        status: Status::NoAcceptedProposal,
371                      })
372                      .await,
373                    "sending no accepted proposal"
374                  );
375                };
376                ccutils::log::log_error!(
377                  agent_data.async_runtime.spawn_task(delegation).detach(),
378                  "spawning delergation task"
379                );
380              }
381            }
382          }
383          else
384          {
385            return;
386          }
387        }
388      }
389    };
390    let tr_fut = {
391      let agent_data = agent_data.clone();
392      let decision_input_sender = decision_interface.input_sender();
393      let mut transport_output_receiver = transport_interface.output_receiver();
394      async move {
395        loop
396        {
397          match transport_output_receiver.recv().await
398          {
399            Ok(msg) => match msg
400            {
401              transport::OutputMessage::ReceivedCFP { cfp } =>
402              {
403                ccutils::log::log_error!(
404                  decision_input_sender
405                    .broadcast(decision::InputMessage::DecideCFPAcceptance { cfp })
406                    .await,
407                  "sending cfp acceptance"
408                );
409              }
410              transport::OutputMessage::ReceivedProposalAccepted { acceptance } =>
411              {
412                if acceptance.agent_uri == agent_data.agent_uri
413                {
414                  ccutils::log::log_error!(
415                    decision_input_sender
416                      .broadcast(decision::InputMessage::QueueExecution {
417                        uuid: acceptance.uuid,
418                        requester_uri: acceptance.requester_uri,
419                      })
420                      .await,
421                    "sending queue execution"
422                  );
423                }
424              }
425              transport::OutputMessage::ReceivedCancelAcceptance { cancel_acceptance } =>
426              {
427                if cancel_acceptance.agent_uri == agent_data.agent_uri
428                {
429                  ccutils::log::log_error!(
430                    decision_input_sender
431                      .broadcast(decision::InputMessage::CancelExecution {
432                        uuid: cancel_acceptance.uuid,
433                      })
434                      .await,
435                    "sending cancel execution"
436                  );
437                }
438              }
439              _ =>
440              {}
441            },
442            Err(_) =>
443            {
444              return;
445            }
446          }
447        }
448      }
449    };
450    let dr_fut = {
451      let mut decision_output_receiver = decision_interface.output_receiver();
452      let transport_input_sender = transport_interface.input_sender();
453      async move {
454        loop
455        {
456          match decision_output_receiver.recv().await
457          {
458            Ok(msg) => match msg
459            {
460              decision::OutputMessage::CFPProposal { proposal } =>
461              {
462                ccutils::log_error!(
463                  transport_input_sender
464                    .broadcast(transport::InputMessage::SendProposal { proposal })
465                    .await,
466                  "sending proposal"
467                );
468              }
469              decision::OutputMessage::QueueExecutionResult {
470                uuid,
471                requester_uri,
472                accepted,
473              } =>
474              {
475                ccutils::log::log_error!(
476                  transport_input_sender
477                    .broadcast(transport::InputMessage::SendExecutionAcceptance {
478                      acceptance: transport_messages::Acceptance {
479                        agent_uri: agent_data.agent_uri.to_owned(),
480                        requester_uri,
481                        acceptance: accepted,
482                        uuid,
483                        signature: Default::default(),
484                      },
485                    })
486                    .await,
487                  "sending execution acceptance"
488                );
489              }
490            },
491            Err(_) =>
492            {
493              return;
494            }
495          }
496        }
497      }
498    };
499    join!(ir_fut, tr_fut, dr_fut);
500  }
501}
502
503impl crate::module::Module for Module
504{
505  type InputMessage = InputMessage;
506  type OutputMessage = OutputMessage;
507  type ModulePrivateInterface = ModulePrivateInterface;
508}