use futures::{FutureExt, StreamExt, join};
use crate::prelude::*;
use delegation::{transport, transport_messages};
/// Names of the MQTT topics used by the delegation transport.
///
/// Apart from [`CFP`], every topic is scoped to one agent and has the shape
/// `delegation/<agent_name>/<leaf>`.
mod topics
{
  /// Topic on which CFP messages are published and received.
  pub const CFP: &str = "delegation/cfp";

  /// Assembles the agent-scoped topic `delegation/<agent_name>/<leaf>`.
  fn agent_topic(agent_name: &impl std::fmt::Display, leaf: &str) -> String
  {
    format!("delegation/{agent_name}/{leaf}")
  }

  /// Returns `delegation/<agent_name>/offer`.
  pub(super) fn offer(agent_name: &impl std::fmt::Display) -> String
  {
    agent_topic(agent_name, "offer")
  }

  /// Returns `delegation/<agent_name>/execution_acceptance`.
  pub(super) fn execution_acceptance(agent_name: &impl std::fmt::Display) -> String
  {
    agent_topic(agent_name, "execution_acceptance")
  }

  /// Returns `delegation/<agent_name>/execution_status`.
  // Kept although nothing visible in this file calls it, hence the lint exemption.
  #[allow(dead_code)]
  pub(super) fn execution_status(agent_name: &impl std::fmt::Display) -> String
  {
    agent_topic(agent_name, "execution_status")
  }

  /// Returns `delegation/<agent_name>/proposal_acceptance`.
  pub(super) fn proposal_acceptance(agent_name: &impl std::fmt::Display) -> String
  {
    agent_topic(agent_name, "proposal_acceptance")
  }

  /// Returns `delegation/<agent_name>/cancel_acceptance`.
  pub(super) fn cancel_acceptance(agent_name: &impl std::fmt::Display) -> String
  {
    agent_topic(agent_name, "cancel_acceptance")
  }
}
/// Start-up options of the MQTT delegation transport module.
pub struct Options
{
/// MQTT client used both to publish outgoing messages and to create the
/// subscriptions for incoming ones.
client: mqtt_channel::Client,
}
impl Options
{
pub fn new(client: mqtt_channel::Client) -> Self
{
Self { client }
}
}
/// Delegation transport that exchanges messages over MQTT.
///
/// The type carries no state; the MQTT client is supplied through [`Options`].
pub struct Module {}
impl Module
{
  /// Serialises `tm` as JSON and publishes it on `topic_name`.
  ///
  /// The publication uses `QoS::ExactlyOnce` and is tagged with the
  /// `application/json` content type.
  /// NOTE(review): the literal `true` is presumably the MQTT retain flag —
  /// confirm against `mqtt_channel::Client::publish_raw`.
  ///
  /// # Errors
  /// Fails if `tm` cannot be serialised or if the publication itself fails.
  async fn send_message<TM: serde::Serialize>(
    client: mqtt_channel::Client,
    topic_name: &str,
    tm: TM,
  ) -> crate::Result<()>
  {
    // Serialise first so that a serialisation failure returns before anything is published.
    let payload = serde_json::to_string(&tm)?;
    let properties = mqtt_channel::mqttbytes::PublishProperties {
      content_type: Some(String::from("application/json")),
      ..Default::default()
    };
    Runtime::tokio_compat(client.publish_raw(
      topic_name,
      mqtt_channel::QoS::ExactlyOnce,
      true,
      payload,
      Some(properties),
    ))
    .await?;
    Ok(())
  }

  /// Publishes one outgoing transport message on the topic matching its kind.
  ///
  /// * `SendCFP` goes to the shared CFP topic.
  /// * `SendProposal` and `SendExecutionAcceptance` go to a topic derived
  ///   from the message's `requester_uri`.
  /// * `SendProposalAcceptance` and `SendCancelAcceptance` go to a topic
  ///   derived from the message's `agent_uri`.
  ///
  /// # Errors
  /// Propagates any error from [`Module::send_message`].
  async fn handle_input_message(
    msg: transport::InputMessage,
    client: mqtt_channel::Client,
  ) -> crate::Result<()>
  {
    match msg
    {
      transport::InputMessage::SendCFP { cfp } =>
      {
        Self::send_message(client, topics::CFP, cfp).await
      }
      transport::InputMessage::SendProposal { proposal } =>
      {
        // The topic is computed before the payload is moved into the call.
        let topic = topics::offer(&proposal.requester_uri);
        Self::send_message(client, &topic, proposal).await
      }
      transport::InputMessage::SendProposalAcceptance { acceptance } =>
      {
        let topic = topics::proposal_acceptance(&acceptance.agent_uri);
        Self::send_message(client, &topic, acceptance).await
      }
      transport::InputMessage::SendExecutionAcceptance { acceptance } =>
      {
        let topic = topics::execution_acceptance(&acceptance.requester_uri);
        Self::send_message(client, &topic, acceptance).await
      }
      transport::InputMessage::SendCancelAcceptance { cancel_acceptance } =>
      {
        let topic = topics::cancel_acceptance(&cancel_acceptance.agent_uri);
        Self::send_message(client, &topic, cancel_acceptance).await
      }
    }
  }
}
// Presumably declares the `ModulePrivateInterface` type for this module (the
// macro is defined elsewhere — confirm). `start` reads its `input_receiver`
// and `output_sender` fields, which carry `transport::InputMessage` and
// `transport::OutputMessage` respectively.
module::create_module_private_interface!(
ModulePrivateInterface,
transport::InputMessage,
transport::OutputMessage
);
/// Binds the MQTT transport to the generic module trait by naming its message
/// and private-interface types.
impl module::Module for Module
{
/// Messages this module receives from the agent and publishes over MQTT.
type InputMessage = transport::InputMessage;
/// Messages this module broadcasts after receiving them from MQTT.
type OutputMessage = transport::OutputMessage;
/// Interface holding the `input_receiver` and `output_sender` used by `start`.
type ModulePrivateInterface = ModulePrivateInterface;
}
/// Expands to an `async` block that subscribes to an MQTT topic and forwards
/// every received payload to the module output channel.
///
/// Arguments, in order:
/// * `$client` — identifier of the MQTT client; it is cloned for the subscription.
/// * `$topic_name` — expression evaluating to the topic to subscribe to.
/// * `$output_sender` — identifier of the sender on which messages are broadcast.
/// * `$transport_msg` — type the JSON payload is deserialised into.
/// * `$output_message` — path of the struct-like output variant to build.
/// * `$field` — name of the field of `$output_message` receiving the payload.
/// * `$agent_uri`, `$counter` — accepted for call-site compatibility but not
///   used by the expansion.
///
/// The subscription uses `QoS::ExactlyOnce`.
/// NOTE(review): the literal `100` is presumably the subscription buffer
/// capacity — confirm against `get_or_create_json_subscription`.
///
/// A failure to create the subscription is logged and ends this forwarder only;
/// it no longer panics, which would abort every future joined with it.
/// Failures while broadcasting a message are logged and the stream continues.
macro_rules! create_subscriber_forwarder {
  ($client: ident, $topic_name: expr, $output_sender: ident,
  $transport_msg:path, $output_message:path, $field: ident, $agent_uri: expr, $counter: expr) => {
    async {
      // The subscription is awaited in the `match` scrutinee so that its
      // temporaries (the cloned client in particular) stay alive while the
      // stream is consumed, as they did in the original single statement.
      match $client
        .clone()
        .get_or_create_json_subscription::<$transport_msg>(
          $topic_name,
          mqtt_channel::QoS::ExactlyOnce,
          100,
        )
        .await
      {
        Ok(stream) =>
        {
          let _ = Runtime::tokio_compat(stream.for_each_concurrent(None, |m| async {
            ccutils::log::log_error!(
              $output_sender
                .broadcast({ { $output_message { $field: m.payload } } })
                .await,
              "while forwarding message"
            );
          }))
          .await;
        }
        Err(err) =>
        {
          // `stringify!` names the topic expression without evaluating it a second time.
          log::error!(
            "Failed to create MQTT subscription for {}: {:?}",
            stringify!($topic_name),
            err
          );
        }
      }
    }
  };
}
impl super::transport::Module for Module
{
  type Options = Options;

  /// Builds the future that runs the MQTT transport.
  ///
  /// The returned future concurrently drives:
  /// * an outgoing loop that receives `transport::InputMessage`s from the
  ///   module's private interface and publishes them through
  ///   [`Module::handle_input_message`], logging (not propagating) failures;
  /// * five subscriber forwarders — cancel acceptance, CFP, execution
  ///   acceptance, offer and proposal acceptance — which broadcast what they
  ///   receive as `transport::OutputMessage`s. All but the CFP topic are
  ///   scoped to this agent's URI.
  ///
  /// The outgoing loop stops once the input channel is closed, so it can no
  /// longer spin on an error that will never go away; the forwarders keep running.
  ///
  /// `_delegation_module_interface` is unused. This function itself always
  /// returns `Ok`.
  fn start<'a>(
    agent_data: agent::AgentData,
    module_interfaces: (
      module::ModuleInterface<transport::InputMessage, transport::OutputMessage>,
      Self::ModulePrivateInterface,
    ),
    _delegation_module_interface: crate::module::ModuleInterface<
      super::InputMessage,
      super::OutputMessage,
    >,
    options: Self::Options,
  ) -> Result<futures::future::BoxFuture<'a, ()>>
  {
    let (_, module_private_interface) = module_interfaces;
    let mut input_receiver = module_private_interface.input_receiver.activate();
    let output_sender = module_private_interface.output_sender;
    let client = options.client;
    let agent_uri = agent_data.agent_uri.to_owned();
    let fut = async move {
      let input_future = async {
        join!(
          async {
            loop
            {
              let received = input_receiver.recv().await;
              match received
              {
                Ok(msg) =>
                {
                  if let Err(e) = Module::handle_input_message(msg, client.clone()).await
                  {
                    log::error!(
                      "An error occured when handling MQTT Transport message: {}",
                      e
                    );
                  }
                }
                Err(e) =>
                {
                  // NOTE(review): assumes `input_receiver` is an
                  // `async_broadcast::Receiver` (suggested by `activate()` and
                  // `recv()`) — confirm. On a closed channel `recv()` fails
                  // immediately every time, so looping again would never yield
                  // and would starve the forwarders joined below.
                  if input_receiver.is_closed()
                  {
                    log::info!("MQTT transport input channel closed, stopping the outgoing loop");
                    break;
                  }
                  // Any other receive error is reported and the loop keeps going.
                  log::warn!("Error while receiving MQTT transport input message: {:?}", e);
                }
              }
            }
          },
          create_subscriber_forwarder!(
            client,
            topics::cancel_acceptance(&agent_uri),
            output_sender,
            transport_messages::CancelAcceptance,
            transport::OutputMessage::ReceivedCancelAcceptance,
            cancel_acceptance,
            agent_uri.clone(),
            "mca"
          ),
          create_subscriber_forwarder!(
            client,
            topics::CFP,
            output_sender,
            delegation::CFP,
            transport::OutputMessage::ReceivedCFP,
            cfp,
            agent_uri.clone(),
            "mcfp"
          ),
          create_subscriber_forwarder!(
            client,
            topics::execution_acceptance(&agent_uri),
            output_sender,
            transport_messages::Acceptance,
            transport::OutputMessage::ReceivedExecutionAccepted,
            acceptance,
            agent_uri.clone(),
            "mrea"
          ),
          create_subscriber_forwarder!(
            client,
            topics::offer(&agent_uri),
            output_sender,
            delegation::Proposal,
            transport::OutputMessage::ReceivedProposal,
            proposal,
            agent_uri.clone(),
            "mrp"
          ),
          create_subscriber_forwarder!(
            client,
            topics::proposal_acceptance(&agent_uri),
            output_sender,
            transport_messages::Acceptance,
            transport::OutputMessage::ReceivedProposalAccepted,
            acceptance,
            agent_uri.clone(),
            "mrpa"
          ),
        );
        Ok::<(), Error>(())
      };
      ccutils::log::log_error!(input_future.await, "waiting for input");
    };
    Ok(fut.boxed())
  }
}