rf_distributed_impl/
network.rs

1use async_trait::async_trait;
2use bytes::Bytes;
3use rf_distributed::network::{asynchronous::Network, NetworkResult, NetworkUpdate};
4use rumqttc::{AsyncClient, MqttOptions, QoS};
5use tokio::sync::mpsc::Receiver;
6
7/// This struct represent the network that will be used to send and receive messages
8/// using the MQTT protocol.
9pub struct AsyncMQTTNetwork {
10    client: AsyncClient,
11    receiver: Receiver<NetworkUpdate>,
12}
13
14impl AsyncMQTTNetwork {
15    pub async fn new(options: MqttOptions, topics: Vec<i32>) -> Self {
16        let (client, mut eventloop) = AsyncClient::new(options, 10);
17        AsyncMQTTNetwork::subscribe_to_topics(client.clone(), topics)
18            .await
19            .unwrap();
20        let (sender, receiver) = tokio::sync::mpsc::channel::<NetworkUpdate>(100);
21        tokio::spawn(async move {
22            loop {
23                if let Ok(notification) = eventloop.poll().await {
24                    if let rumqttc::Event::Incoming(rumqttc::Packet::Publish(msg)) = notification {
25                        let msg_string = String::from_utf8(msg.payload.to_vec()).unwrap();
26                        sender
27                            .send(NetworkUpdate::Update { msg: msg_string })
28                            .await
29                            .unwrap();
30                    }
31                } else {
32                    sender.send(NetworkUpdate::None).await.unwrap();
33                }
34            }
35        });
36        Self { client, receiver }
37    }
38
39    async fn subscribe_to_topics(client: AsyncClient, topics: Vec<i32>) -> NetworkResult<()> {
40        for nbr in topics.clone() {
41            client
42                .subscribe(format!("hello-rufi/{nbr}/subscriptions"), QoS::AtMostOnce)
43                .await?;
44        }
45        Ok(())
46    }
47}
48
49#[async_trait]
50impl Network for AsyncMQTTNetwork {
51    async fn send(&mut self, source: i32, msg: String) -> NetworkResult<()> {
52        self.client
53            .publish(
54                format!("hello-rufi/{source}/subscriptions"),
55                QoS::AtMostOnce,
56                false,
57                Bytes::from(msg),
58            )
59            .await
60            .map_err(|e| e.into())
61    }
62
63    async fn receive(&mut self) -> NetworkResult<NetworkUpdate> {
64        self.receiver
65            .recv()
66            .await
67            .ok_or("No message received".into())
68    }
69}