use futures::FutureExt;
/// Shorthand for `std::result::Result` with this file's [`Error`] as the error type.
type Result<T> = std::result::Result<T, Error>;
#[derive(thiserror::Error, Debug)]
pub enum Error {
#[error("The remote service failed.")]
ServiceFailed(String),
#[error("Received an invalid message.")]
InvalidMessage(String),
#[error("Failed to parse a message.")]
MessageParseError(String),
#[error("An error occured when parsing bytes as Utf-8.")]
Utf8Error(String),
#[error("Future was cancelled.")]
CancelledFuture(String),
#[error("An error occured in the communication middleware.")]
CommunicationError(String),
#[error("Execution of the query failed with given error.")]
QueryExecutionFailed(String),
}
/// Mutable state of one MQTT connection, kept behind a mutex by [`Connection`].
struct ConnectionData {
    /// Client handle used to subscribe and publish.
    client: rumqttc::v5::Client,
    /// Pending service calls, keyed by the correlation data sent with the
    /// request. The sender delivers the response text, or an error, to the
    /// future handed out for that call.
    service_results: std::collections::HashMap<
        bytes::Bytes,
        futures::channel::oneshot::Sender<Result<String>>,
    >,
    /// Identifier of this node; also used as the last segment of response topics.
    node_id: String,
}
/// Handle to an MQTT connection used to call remote services.
pub struct Connection {
    /// Connection state, shared with the thread that processes incoming events.
    connection_data: std::sync::Arc<std::sync::Mutex<ConnectionData>>,
}
impl From<rumqttc::v5::ClientError> for crate::Error
{
fn from(value: rumqttc::v5::ClientError) -> Self
{
crate::Error::CommunicationError(value.to_string())
}
}
impl ConnectionData
{
fn handle_publish(&mut self, msg: &rumqttc::v5::mqttbytes::v5::Publish)
{
let payload = &msg.payload;
if let Some(properties) = &msg.properties
{
if let Some(correlation_data) = &properties.correlation_data
{
if let Some(sender) = self.service_results.remove(correlation_data)
{
let payload_utf8 = String::from_utf8(payload.to_vec());
let r = match payload_utf8 {
Ok(v) => sender.send(Ok(v)),
Err(e) => sender.send(Err(Error::Utf8Error(e.to_string()))),
};
if let Err(e) = r
{
println!("Sending the result failed with error {e:?}.");
}
}
}
}
}
}
impl Connection
{
pub fn new(node_id: impl Into<String>, hostname: impl Into<String>, port: u16) -> Connection
{
let node_id: String = node_id.into();
let mut mqttoptions = rumqttc::v5::MqttOptions::new(node_id.clone(), hostname, port);
mqttoptions.set_keep_alive(std::time::Duration::from_secs(5));
let (client, mut connection) = rumqttc::v5::Client::new(mqttoptions, 10);
let connection_data = ConnectionData {
client: client,
service_results: Default::default(),
node_id: node_id,
};
let connection_data = std::sync::Arc::new(std::sync::Mutex::new(connection_data));
{
let connection_data = connection_data.clone();
std::thread::spawn(move || {
loop
{
let recv = connection.recv();
match &recv {
Ok(v) => {
match &v {
Ok(event) => {
match &event {
rumqttc::v5::Event::Incoming(packet) => {
match &packet {
rumqttc::v5::Incoming::Publish(pub_msg) => {
let mut connection_data = connection_data.as_ref().lock().unwrap();
connection_data.handle_publish(pub_msg)
},
rumqttc::v5::Incoming::ConnAck(_) => {},
rumqttc::v5::Incoming::SubAck(_) => {},
rumqttc::v5::Incoming::PubAck(_) => {},
rumqttc::v5::Incoming::PingResp(_) => {},
_ => { println!("Incoming unhandled packet {packet:?}"); }
}
},
rumqttc::v5::Event::Outgoing(_) => { },
}
},
Err(e) => {
println!("A MQTT connection error occured: {e:?}");
}
}
}
Err(e) => {
println!("A MQTT reception error occured: {e:?}");
}
}
}
});
}
return Connection {
connection_data: connection_data
};
}
}
impl Connection
{
pub fn call_service(&self, topic: impl Into<String>, message: impl Into<bytes::Bytes>)
-> Result<impl std::future::Future<Output = Result<String>>>
{
let mut connection_data = self.connection_data.lock().unwrap();
let qos = rumqttc::v5::mqttbytes::QoS::AtLeastOnce;
let topic_string = topic.into();
let correlation_data = uuid::Uuid::new_v4();
let response_topic = format!("{}/response/{}", topic_string, connection_data.node_id);
connection_data.client.subscribe(response_topic.clone(), qos)?;
connection_data.client.try_publish_with_properties(
topic_string, qos, true, message,
rumqttc::v5::mqttbytes::v5::PublishProperties
{
content_type: Some("application/json").map(str::to_string),
response_topic: Some(response_topic),
correlation_data: Some(bytes::Bytes::copy_from_slice(correlation_data.as_bytes())),
..Default::default()
}
)?;
let(sender, receiver) = futures::channel::oneshot::channel::<Result<String>>();
connection_data.service_results.insert(bytes::Bytes::copy_from_slice(correlation_data.as_bytes()), sender);
Ok(receiver.map(|v| -> Result<String> {
match v {
Ok(v) => v,
Err(e) => Err(Error::CancelledFuture(e.to_string()))
}
}))
}
}