tosca_controller/
events.rs

1use std::time::Duration;
2
3use tosca::events::{BrokerData, Events as ToscaEvents, EventsDescription};
4
5use rumqttc::v5::{
6    AsyncClient, ConnectionError, Event, EventLoop, MqttOptions, mqttbytes::QoS,
7    mqttbytes::v5::Packet,
8};
9
10use tokio::sync::{broadcast, mpsc};
11use tokio::task::JoinHandle;
12
13use tokio_util::sync::CancellationToken;
14
15use tracing::{error, warn};
16
17use crate::error::Result;
18
// Keep-alive interval: once the connection has been idle for this long,
// a `pingreq` is sent to the broker.
const KEEP_ALIVE_TIME: Duration = Duration::from_secs(5);

// Capacity of the bounded asynchronous channel, handed to
// `AsyncClient::new` when a client is created.
const ASYNC_CHANNEL_CAPACITY: usize = 10;
24
25/// Event payload transmitted by the global asynchronous receiver task.
26///
27/// The payload consists of a device identifier and its associated event data.
28#[derive(Debug)]
29pub struct EventPayload {
30    /// Device identifier.
31    pub device_id: usize,
32    /// Device events.
33    pub events: ToscaEvents,
34}
35
36impl std::fmt::Display for EventPayload {
37    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::result::Result<(), std::fmt::Error> {
38        writeln!(f)?;
39        writeln!(f, "Events for `Device {}`", self.device_id)?;
40        writeln!(f)?;
41        write!(f, "{}", self.events)
42    }
43}
44
45impl EventPayload {
46    pub(crate) const fn new(device_id: usize, events: ToscaEvents) -> Self {
47        Self { device_id, events }
48    }
49}
50
51#[derive(Debug)]
52pub(crate) struct Events {
53    // Events description.
54    pub(crate) description: EventsDescription,
55    // The token used to cancel the event task.
56    pub(crate) cancellation_token: CancellationToken,
57}
58
59impl Events {
60    pub(crate) fn new(description: EventsDescription) -> Self {
61        Self {
62            description,
63            cancellation_token: CancellationToken::new(),
64        }
65    }
66}
67
68#[inline]
69fn parse_event(event: &std::result::Result<Event, ConnectionError>) -> Option<ToscaEvents> {
70    let event = match event {
71        Ok(event) => event,
72        Err(e) => {
73            error!("Error in receiving the event, discard it: {e}");
74            return None;
75        }
76    };
77
78    let packet = match event {
79        Event::Incoming(packet) => packet,
80        Event::Outgoing(outgoing) => {
81            warn!("Outgoing packet, discard it: {:?}", outgoing);
82            return None;
83        }
84    };
85
86    let Packet::Publish(packet) = packet else {
87        warn!("Packet ignored: {:?}", packet);
88        return None;
89    };
90
91    match serde_json::from_slice(&packet.payload) {
92        Ok(tosca_events) => tosca_events,
93        Err(e) => {
94            error!("Error converting packet bytes into events: {e}");
95            None
96        }
97    }
98}
99
100async fn run_global_event_subscriber(
101    client: AsyncClient,
102    mut eventloop: EventLoop,
103    id: usize,
104    cancellation_token: CancellationToken,
105    sender: mpsc::Sender<EventPayload>,
106) {
107    loop {
108        tokio::select! {
109            // Use the cancellation token to stop the loop
110            () = cancellation_token.cancelled() => { break; }
111            // Poll the `MQTT` event coming from the network
112            event = eventloop.poll() => {
113                let Some(tosca_events) = parse_event(&event) else {
114                    continue;
115                };
116
117                if let Err(e) = sender.send(EventPayload::new(id, tosca_events)).await {
118                    error!(
119                        "Stop sending events to the global receiver: {e}"
120                    );
121                    break;
122                }
123            }
124        }
125    }
126    drop(sender);
127    drop(eventloop);
128    drop(client);
129}
130
131async fn run_event_subscriber(
132    client: AsyncClient,
133    mut eventloop: EventLoop,
134    id: usize,
135    cancellation_token: CancellationToken,
136    sender: broadcast::Sender<ToscaEvents>,
137) {
138    loop {
139        tokio::select! {
140            // Use the cancellation token to stop the loop
141            () = cancellation_token.cancelled() => { break; }
142            // Poll the `MQTT` event coming from the network
143            event = eventloop.poll() => {
144                let Some(tosca_events) = parse_event(&event) else {
145                    continue;
146                };
147
148                if let Err(e) = sender.send(tosca_events) {
149                    error!(
150                        "Stop sending events to the device receiver with id `{id}`: {e}"
151                    );
152                    break;
153                }
154            }
155        }
156        tokio::time::sleep(Duration::from_millis(100)).await;
157    }
158    drop(sender);
159    drop(eventloop);
160    drop(client);
161}
162
163pub(crate) struct EventsRunner;
164
165impl EventsRunner {
166    pub(crate) async fn run_global_subscriber(
167        events: &Events,
168        id: usize,
169        sender: mpsc::Sender<EventPayload>,
170    ) -> Result<JoinHandle<()>> {
171        let (client, eventloop) = Self::init(id, events).await?;
172
173        Ok(tokio::spawn(run_global_event_subscriber(
174            client,
175            eventloop,
176            id,
177            events.cancellation_token.clone(),
178            sender,
179        )))
180    }
181
182    pub(crate) async fn run_device_subscriber(
183        events: &Events,
184        id: usize,
185        sender: broadcast::Sender<ToscaEvents>,
186    ) -> Result<JoinHandle<()>> {
187        let (client, eventloop) = Self::init(id, events).await?;
188
189        Ok(tokio::spawn(run_event_subscriber(
190            client,
191            eventloop,
192            id,
193            events.cancellation_token.clone(),
194            sender,
195        )))
196    }
197
198    #[inline]
199    async fn init(id: usize, events: &Events) -> Result<(AsyncClient, EventLoop)> {
200        let BrokerData { address, port } = events.description.broker_data;
201        let topic = events.description.topic.as_str();
202
203        let mut mqttoptions = MqttOptions::new(id.to_string(), address.to_string(), port);
204        mqttoptions.set_keep_alive(KEEP_ALIVE_TIME);
205
206        let (client, eventloop) = AsyncClient::new(mqttoptions, ASYNC_CHANNEL_CAPACITY);
207        client
208            .subscribe(topic, QoS::AtMostOnce)
209            .await
210            .map_err(|e| {
211                error!("Impossible to subscribe to topic {topic} for device {id}: {e}");
212                e
213            })?;
214
215        Ok((client, eventloop))
216    }
217}