rf_distributed_impl/
network.rs1use async_trait::async_trait;
2use bytes::Bytes;
3use rf_distributed::network::{asynchronous::Network, NetworkResult, NetworkUpdate};
4use rumqttc::{AsyncClient, MqttOptions, QoS};
5use tokio::sync::mpsc::Receiver;
6
7pub struct AsyncMQTTNetwork {
10 client: AsyncClient,
11 receiver: Receiver<NetworkUpdate>,
12}
13
14impl AsyncMQTTNetwork {
15 pub async fn new(options: MqttOptions, topics: Vec<i32>) -> Self {
16 let (client, mut eventloop) = AsyncClient::new(options, 10);
17 AsyncMQTTNetwork::subscribe_to_topics(client.clone(), topics)
18 .await
19 .unwrap();
20 let (sender, receiver) = tokio::sync::mpsc::channel::<NetworkUpdate>(100);
21 tokio::spawn(async move {
22 loop {
23 if let Ok(notification) = eventloop.poll().await {
24 if let rumqttc::Event::Incoming(rumqttc::Packet::Publish(msg)) = notification {
25 let msg_string = String::from_utf8(msg.payload.to_vec()).unwrap();
26 sender
27 .send(NetworkUpdate::Update { msg: msg_string })
28 .await
29 .unwrap();
30 }
31 } else {
32 sender.send(NetworkUpdate::None).await.unwrap();
33 }
34 }
35 });
36 Self { client, receiver }
37 }
38
39 async fn subscribe_to_topics(client: AsyncClient, topics: Vec<i32>) -> NetworkResult<()> {
40 for nbr in topics.clone() {
41 client
42 .subscribe(format!("hello-rufi/{nbr}/subscriptions"), QoS::AtMostOnce)
43 .await?;
44 }
45 Ok(())
46 }
47}
48
49#[async_trait]
50impl Network for AsyncMQTTNetwork {
51 async fn send(&mut self, source: i32, msg: String) -> NetworkResult<()> {
52 self.client
53 .publish(
54 format!("hello-rufi/{source}/subscriptions"),
55 QoS::AtMostOnce,
56 false,
57 Bytes::from(msg),
58 )
59 .await
60 .map_err(|e| e.into())
61 }
62
63 async fn receive(&mut self) -> NetworkResult<NetworkUpdate> {
64 self.receiver
65 .recv()
66 .await
67 .ok_or("No message received".into())
68 }
69}