agent-tk 0.4.0

`agent-tk` (`agent toolkit/tasks-knowledge`) is a crate for the development of autonomous agents using Rust, with an emphasis on tasks and knowledge based agents. This project is part of the [auKsys](http://auksys.org) project.
Documentation
//! Support for the delegation protocol via MQTT.

use futures::{FutureExt, StreamExt, join};

use crate::prelude::*;

use delegation::{transport, transport_messages};

//  _              _
// | |            (_)
// | |_ ___  _ __  _  ___ ___
// | __/ _ \| '_ \| |/ __/ __|
// | || (_) | |_) | | (__\__ \
//  \__\___/| .__/|_|\___|___/
//          | |
//          |_|

mod topics
{
  pub const CFP: &str = "delegation/cfp";

  pub(super) fn offer(agent_name: &impl std::fmt::Display) -> String
  {
    format!("delegation/{}/offer", agent_name)
  }
  pub(super) fn execution_acceptance(agent_name: &impl std::fmt::Display) -> String
  {
    format!("delegation/{}/execution_acceptance", agent_name)
  }
  #[allow(dead_code)]
  pub(super) fn execution_status(agent_name: &impl std::fmt::Display) -> String
  {
    format!("delegation/{}/execution_status", agent_name)
  }
  pub(super) fn proposal_acceptance(agent_name: &impl std::fmt::Display) -> String
  {
    format!("delegation/{}/proposal_acceptance", agent_name)
  }
  pub(super) fn cancel_acceptance(agent_name: &impl std::fmt::Display) -> String
  {
    format!("delegation/{}/cancel_acceptance", agent_name)
  }
}

//   ____        _   _
//  / __ \      | | (_)
//  | |  | |_ __ | |_ _  ___  _ __  ___
//  | |  | | '_ \| __| |/ _ \| '_ \/ __|
//  | |__| | |_) | |_| | (_) | | | \__ \
//   \____/| .__/ \__|_|\___/|_| |_|___/
//         | |
//         |_|

/// MQTT transport options
pub struct Options
{
  client: mqtt_channel::Client,
}

impl Options
{
  /// Create new options.
  pub fn new(client: mqtt_channel::Client) -> Self
  {
    Self { client }
  }
}

//  __  __               _           _
// |  \/  |   ___     __| |  _   _  | |   ___
// | |\/| |  / _ \   / _` | | | | | | |  / _ \
// | |  | | | (_) | | (_| | | |_| | | | |  __/
// |_|  |_|  \___/   \__,_|  \__,_| |_|  \___|

/// MQTT Module
pub struct Module {}

impl Module
{
  async fn send_message<TM: serde::Serialize>(
    client: mqtt_channel::Client,
    topic_name: &str,
    tm: TM,
  ) -> crate::Result<()>
  {
    Runtime::tokio_compat(client.publish_raw(
      topic_name,
      mqtt_channel::QoS::ExactlyOnce,
      true,
      serde_json::to_string(&tm)?,
      Some(mqtt_channel::mqttbytes::PublishProperties {
        content_type: Some("application/json".to_string()),
        ..Default::default()
      }),
    ))
    .await?;
    Ok(())
  }

  async fn handle_input_message(
    msg: transport::InputMessage,
    client: mqtt_channel::Client,
  ) -> crate::Result<()>
  {
    match msg
    {
      transport::InputMessage::SendCFP { cfp } =>
      {
        Module::send_message(client, topics::CFP, cfp).await
      }
      transport::InputMessage::SendProposal { proposal } =>
      {
        Module::send_message(client, &topics::offer(&proposal.requester_uri), proposal).await
      }
      transport::InputMessage::SendProposalAcceptance { acceptance } =>
      {
        Module::send_message(
          client,
          &topics::proposal_acceptance(&acceptance.agent_uri),
          acceptance,
        )
        .await
      }
      transport::InputMessage::SendExecutionAcceptance { acceptance } =>
      {
        Module::send_message(
          client,
          &topics::execution_acceptance(&acceptance.requester_uri),
          acceptance,
        )
        .await
      }
      transport::InputMessage::SendCancelAcceptance { cancel_acceptance } =>
      {
        Module::send_message(
          client,
          &topics::cancel_acceptance(&cancel_acceptance.agent_uri),
          cancel_acceptance,
        )
        .await
      }
    }
  }
}

module::create_module_private_interface!(
  ModulePrivateInterface,
  transport::InputMessage,
  transport::OutputMessage
);

impl module::Module for Module
{
  type InputMessage = transport::InputMessage;
  type OutputMessage = transport::OutputMessage;
  type ModulePrivateInterface = ModulePrivateInterface;
}

macro_rules! create_subscriber_forwarder {
  ($client: ident, $topic_name: expr, $output_sender: ident,
    $transport_msg:path, $output_message:path, $field: ident, $agent_uri: expr, $counter: expr) => {
    async {
      let _ = Runtime::tokio_compat(
        $client
          .clone()
          .get_or_create_json_subscription::<$transport_msg>(
            $topic_name,
            mqtt_channel::QoS::ExactlyOnce,
            100,
          )
          .await
          .unwrap()
          .for_each_concurrent(None, |m| async {
            ccutils::log::log_error!(
              $output_sender
                .broadcast({ { $output_message { $field: m.payload } } })
                .await,
              "while forwarding message"
            );
          }),
      )
      .await;
    }
  };
}

impl super::transport::Module for Module
{
  type Options = Options;
  fn start<'a>(
    agent_data: agent::AgentData,
    module_interfaces: (
      module::ModuleInterface<transport::InputMessage, transport::OutputMessage>,
      Self::ModulePrivateInterface,
    ),
    _delegation_module_interface: crate::module::ModuleInterface<
      super::InputMessage,
      super::OutputMessage,
    >,

    options: Self::Options,
  ) -> Result<futures::future::BoxFuture<'a, ()>>
  {
    let (_, module_private_interface) = module_interfaces;

    let mut input_receiver = module_private_interface.input_receiver.activate();
    let output_sender = module_private_interface.output_sender;

    let client = options.client;

    let agent_uri = agent_data.agent_uri.to_owned();
    let fut = async move {
      let client = client.clone();
      // MQTT Subscribptions
      let input_future = async {
        join!(
          async {
            loop
            {
              let msg = input_receiver.recv().await;
              if let Ok(msg) = msg
                && let Err(e) = Module::handle_input_message(msg, client.clone()).await
              {
                log::error!(
                  "An error occured when handling MQTT Transport message: {}",
                  e
                );
              }
            }
          },
          create_subscriber_forwarder!(
            client,
            topics::cancel_acceptance(&agent_uri.clone()),
            output_sender,
            transport_messages::CancelAcceptance,
            transport::OutputMessage::ReceivedCancelAcceptance,
            cancel_acceptance,
            agent_uri.clone(),
            "mca"
          ),
          create_subscriber_forwarder!(
            client,
            topics::CFP,
            output_sender,
            delegation::CFP,
            transport::OutputMessage::ReceivedCFP,
            cfp,
            agent_uri.clone(),
            "mcfp"
          ),
          create_subscriber_forwarder!(
            client,
            topics::execution_acceptance(&agent_uri.clone()),
            output_sender,
            transport_messages::Acceptance,
            transport::OutputMessage::ReceivedExecutionAccepted,
            acceptance,
            agent_uri.clone(),
            "mrea"
          ),
          create_subscriber_forwarder!(
            client,
            topics::offer(&agent_uri.clone()),
            output_sender,
            delegation::Proposal,
            transport::OutputMessage::ReceivedProposal,
            proposal,
            agent_uri.clone(),
            "mrp"
          ),
          create_subscriber_forwarder!(
            client,
            topics::proposal_acceptance(&agent_uri.clone()),
            output_sender,
            transport_messages::Acceptance,
            transport::OutputMessage::ReceivedProposalAccepted,
            acceptance,
            agent_uri.clone(),
            "mrpa"
          ),
        );
        Ok::<(), Error>(())
      };
      ccutils::log::log_error!(input_future.await, "waiting for input");
    };
    // Return
    Ok(fut.boxed())
  }
}