tosca_controller/
events.rs1use std::time::Duration;
2
3use tosca::events::{BrokerData, Events as ToscaEvents, EventsDescription};
4
5use rumqttc::v5::{
6 AsyncClient, ConnectionError, Event, EventLoop, MqttOptions, mqttbytes::QoS,
7 mqttbytes::v5::Packet,
8};
9
10use tokio::sync::{broadcast, mpsc};
11use tokio::task::JoinHandle;
12
13use tokio_util::sync::CancellationToken;
14
15use tracing::{error, warn};
16
17use crate::error::Result;
18
/// Capacity handed to `AsyncClient::new` when the `MQTT` client is created.
const ASYNC_CHANNEL_CAPACITY: usize = 10;

/// Keep-alive interval configured on the `MQTT` connection options.
const KEEP_ALIVE_TIME: Duration = Duration::from_secs(5);
24
25#[derive(Debug)]
29pub struct EventPayload {
30 pub device_id: usize,
32 pub events: ToscaEvents,
34}
35
36impl std::fmt::Display for EventPayload {
37 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::result::Result<(), std::fmt::Error> {
38 writeln!(f)?;
39 writeln!(f, "Events for `Device {}`", self.device_id)?;
40 writeln!(f)?;
41 write!(f, "{}", self.events)
42 }
43}
44
45impl EventPayload {
46 pub(crate) const fn new(device_id: usize, events: ToscaEvents) -> Self {
47 Self { device_id, events }
48 }
49}
50
51#[derive(Debug)]
52pub(crate) struct Events {
53 pub(crate) description: EventsDescription,
55 pub(crate) cancellation_token: CancellationToken,
57}
58
59impl Events {
60 pub(crate) fn new(description: EventsDescription) -> Self {
61 Self {
62 description,
63 cancellation_token: CancellationToken::new(),
64 }
65 }
66}
67
68#[inline]
69fn parse_event(event: &std::result::Result<Event, ConnectionError>) -> Option<ToscaEvents> {
70 let event = match event {
71 Ok(event) => event,
72 Err(e) => {
73 error!("Error in receiving the event, discard it: {e}");
74 return None;
75 }
76 };
77
78 let packet = match event {
79 Event::Incoming(packet) => packet,
80 Event::Outgoing(outgoing) => {
81 warn!("Outgoing packet, discard it: {:?}", outgoing);
82 return None;
83 }
84 };
85
86 let Packet::Publish(packet) = packet else {
87 warn!("Packet ignored: {:?}", packet);
88 return None;
89 };
90
91 match serde_json::from_slice(&packet.payload) {
92 Ok(tosca_events) => tosca_events,
93 Err(e) => {
94 error!("Error converting packet bytes into events: {e}");
95 None
96 }
97 }
98}
99
100async fn run_global_event_subscriber(
101 client: AsyncClient,
102 mut eventloop: EventLoop,
103 id: usize,
104 cancellation_token: CancellationToken,
105 sender: mpsc::Sender<EventPayload>,
106) {
107 loop {
108 tokio::select! {
109 () = cancellation_token.cancelled() => { break; }
111 event = eventloop.poll() => {
113 let Some(tosca_events) = parse_event(&event) else {
114 continue;
115 };
116
117 if let Err(e) = sender.send(EventPayload::new(id, tosca_events)).await {
118 error!(
119 "Stop sending events to the global receiver: {e}"
120 );
121 break;
122 }
123 }
124 }
125 }
126 drop(sender);
127 drop(eventloop);
128 drop(client);
129}
130
131async fn run_event_subscriber(
132 client: AsyncClient,
133 mut eventloop: EventLoop,
134 id: usize,
135 cancellation_token: CancellationToken,
136 sender: broadcast::Sender<ToscaEvents>,
137) {
138 loop {
139 tokio::select! {
140 () = cancellation_token.cancelled() => { break; }
142 event = eventloop.poll() => {
144 let Some(tosca_events) = parse_event(&event) else {
145 continue;
146 };
147
148 if let Err(e) = sender.send(tosca_events) {
149 error!(
150 "Stop sending events to the device receiver with id `{id}`: {e}"
151 );
152 break;
153 }
154 }
155 }
156 tokio::time::sleep(Duration::from_millis(100)).await;
157 }
158 drop(sender);
159 drop(eventloop);
160 drop(client);
161}
162
163pub(crate) struct EventsRunner;
164
165impl EventsRunner {
166 pub(crate) async fn run_global_subscriber(
167 events: &Events,
168 id: usize,
169 sender: mpsc::Sender<EventPayload>,
170 ) -> Result<JoinHandle<()>> {
171 let (client, eventloop) = Self::init(id, events).await?;
172
173 Ok(tokio::spawn(run_global_event_subscriber(
174 client,
175 eventloop,
176 id,
177 events.cancellation_token.clone(),
178 sender,
179 )))
180 }
181
182 pub(crate) async fn run_device_subscriber(
183 events: &Events,
184 id: usize,
185 sender: broadcast::Sender<ToscaEvents>,
186 ) -> Result<JoinHandle<()>> {
187 let (client, eventloop) = Self::init(id, events).await?;
188
189 Ok(tokio::spawn(run_event_subscriber(
190 client,
191 eventloop,
192 id,
193 events.cancellation_token.clone(),
194 sender,
195 )))
196 }
197
198 #[inline]
199 async fn init(id: usize, events: &Events) -> Result<(AsyncClient, EventLoop)> {
200 let BrokerData { address, port } = events.description.broker_data;
201 let topic = events.description.topic.as_str();
202
203 let mut mqttoptions = MqttOptions::new(id.to_string(), address.to_string(), port);
204 mqttoptions.set_keep_alive(KEEP_ALIVE_TIME);
205
206 let (client, eventloop) = AsyncClient::new(mqttoptions, ASYNC_CHANNEL_CAPACITY);
207 client
208 .subscribe(topic, QoS::AtMostOnce)
209 .await
210 .map_err(|e| {
211 error!("Impossible to subscribe to topic {topic} for device {id}: {e}");
212 e
213 })?;
214
215 Ok((client, eventloop))
216 }
217}