use bytes::Buf;
use futures::{join, select, FutureExt};
use std::thread::{self, Thread};
use super::{transport, transport_messages, InputMessage};
use crate::{delegation, module, Error, Result};
mod topics
{
pub const CANCEL_ACCEPTANCE: &str = "delegation/cancel_acceptance";
pub const SEND_CFP: &str = "delegation/send_cfp";
pub const SEND_EXECUTION_ACCEPTANCE: &str = "delegation/send_execution_acceptance";
pub const SEND_EXECUTION_STATUS: &str = "delegation/send_execution_status";
pub const SEND_OFFER: &str = "delegation/send_offer";
pub const SEND_PROPOSAL_ACCEPTANCE: &str = "delegation/send_proposal_acceptance";
}
pub struct Options
{
node_id: String,
hostname: String,
port: u16,
}
impl Options
{
pub fn new(node_id: String, hostname: String, port: u16) -> Self
{
Self {
node_id,
hostname,
port,
}
}
}
impl From<rumqttc::v5::ClientError> for Error
{
fn from(value: rumqttc::v5::ClientError) -> Self
{
Error::TransportError(value.to_string())
}
}
pub struct Module {}
impl Module
{
fn send_message<TM: serde::Serialize>(
client: &rumqttc::v5::AsyncClient,
topic_name: &str,
tm: TM,
) -> crate::Result<()>
{
client.try_publish_with_properties(
topic_name,
rumqttc::v5::mqttbytes::QoS::AtLeastOnce,
true,
serde_json::to_string(&tm)?,
rumqttc::v5::mqttbytes::v5::PublishProperties {
content_type: Some("application/json").map(str::to_string),
..Default::default()
},
)?;
Ok(())
}
fn handle_input_message(
msg: transport::InputMessage,
client: &rumqttc::v5::AsyncClient,
) -> crate::Result<()>
{
match msg
{
transport::InputMessage::SendCFP { cfp } =>
{
Module::send_message(&client, topics::SEND_CFP, cfp)
}
transport::InputMessage::SendProposal { proposal } =>
{
Module::send_message(&client, topics::SEND_OFFER, proposal)
}
transport::InputMessage::SendProposalAcceptance { acceptance } =>
{
Module::send_message(&client, topics::SEND_PROPOSAL_ACCEPTANCE, acceptance)
}
transport::InputMessage::SendExecutionAcceptance { acceptance } =>
{
Module::send_message(&client, topics::SEND_EXECUTION_ACCEPTANCE, acceptance)
}
transport::InputMessage::SendCancelAcceptance { cancel_acceptance } =>
{
Module::send_message(&client, topics::CANCEL_ACCEPTANCE, cancel_acceptance)
}
}
}
async fn handle_mqtt_events(
event: std::result::Result<rumqttc::v5::Event, rumqttc::v5::ConnectionError>,
output_sender: &async_broadcast::Sender<transport::OutputMessage>,
) -> Result<()>
{
match event?
{
rumqttc::v5::Event::Incoming(packet) => match &packet
{
rumqttc::v5::Incoming::Publish(pub_msg) =>
{
match std::str::from_utf8(&pub_msg.topic.chunk())?
{
topics::CANCEL_ACCEPTANCE =>
{
let cancel_acceptance = serde_json::from_slice::<transport_messages::CancelAcceptance>(
pub_msg.payload.chunk(),
)?;
output_sender
.broadcast(transport::OutputMessage::ReceivedCancelAcceptance { cancel_acceptance })
.await;
}
topics::SEND_CFP =>
{
let cfp = serde_json::from_slice::<delegation::CFP>(pub_msg.payload.chunk())?;
output_sender
.broadcast(transport::OutputMessage::ReceivedCFP { cfp })
.await;
}
topics::SEND_EXECUTION_ACCEPTANCE =>
{
let acceptance =
serde_json::from_slice::<transport_messages::Acceptance>(pub_msg.payload.chunk())?;
output_sender
.broadcast(transport::OutputMessage::ReceivedExecutionAccepted { acceptance })
.await;
}
topics::SEND_EXECUTION_STATUS =>
{
let status =
serde_json::from_slice::<transport_messages::Status>(pub_msg.payload.chunk())?;
output_sender
.broadcast(transport::OutputMessage::ReceivedStatus { status })
.await;
}
topics::SEND_OFFER =>
{
let proposal =
serde_json::from_slice::<delegation::Proposal>(pub_msg.payload.chunk())?;
output_sender
.broadcast(transport::OutputMessage::ReceivedProposal { proposal })
.await;
}
topics::SEND_PROPOSAL_ACCEPTANCE =>
{
let acceptance =
serde_json::from_slice::<transport_messages::Acceptance>(pub_msg.payload.chunk())?;
output_sender
.broadcast(transport::OutputMessage::ReceivedProposalAccepted { acceptance })
.await;
}
unhandled_topic =>
{
println!("Subscribed to {} but not handled.", unhandled_topic);
}
}
}
rumqttc::v5::Incoming::ConnAck(_) =>
{}
rumqttc::v5::Incoming::SubAck(_) =>
{}
rumqttc::v5::Incoming::PubAck(_) =>
{}
rumqttc::v5::Incoming::PingResp(_) =>
{}
_ =>
{
println!("Incoming unhandled packet {packet:?}");
}
},
rumqttc::v5::Event::Outgoing(_) =>
{ }
}
Ok(())
}
}
module::create_module_private_interface!(
ModulePrivateInterface,
transport::InputMessage,
transport::OutputMessage
);
impl module::Module for Module
{
type InputMessage = transport::InputMessage;
type OutputMessage = transport::OutputMessage;
type ModulePrivateInterface = ModulePrivateInterface;
}
impl super::transport::Module for Module
{
type Options = Options;
fn start<'a>(
module_interfaces: (
module::ModuleInterface<transport::InputMessage, transport::OutputMessage>,
Self::ModulePrivateInterface,
),
delegation_module_interface: crate::module::ModuleInterface<
super::InputMessage,
super::OutputMessage,
>,
options: Self::Options,
) -> Result<futures::future::BoxFuture<'a, ()>>
{
let (module_interface, module_private_interface) = module_interfaces;
let mut input_receiver = module_private_interface.input_receiver.activate();
let output_sender = module_private_interface.output_sender;
let mut mqttoptions =
rumqttc::v5::MqttOptions::new(options.node_id, options.hostname, options.port);
mqttoptions.set_keep_alive(std::time::Duration::from_secs(5));
let (client, mut connection) = rumqttc::v5::AsyncClient::new(mqttoptions, 1000);
let fut = async move {
let mqtt_reception_future = async move {
let mut connection = connection;
let output_sender = output_sender.clone();
loop
{
let recv = connection.poll().await;
if let Err(e) = Self::handle_mqtt_events(recv, &output_sender).await
{
log::error!("An error occured when handling MQTT event: {}", e);
}
}
};
let input_future = async {
let qos = rumqttc::v5::mqttbytes::QoS::AtLeastOnce;
client.subscribe(topics::CANCEL_ACCEPTANCE, qos).await;
client.subscribe(topics::SEND_CFP, qos).await;
client
.subscribe(topics::SEND_EXECUTION_ACCEPTANCE, qos)
.await;
client.subscribe(topics::SEND_EXECUTION_STATUS, qos).await;
client.subscribe(topics::SEND_OFFER, qos).await;
client
.subscribe(topics::SEND_PROPOSAL_ACCEPTANCE, qos)
.await;
loop
{
let msg = input_receiver.recv().await;
if let Ok(msg) = msg
{
if let Err(e) = Module::handle_input_message(msg, &client)
{
log::error!(
"An error occured when handling MQTT Transport message: {}",
e
);
}
}
else
{
return;
}
}
};
join!(input_future, mqtt_reception_future);
};
Ok(fut.boxed())
}
}