1use std::any::Any;
10use std::collections::HashMap;
11use std::sync::mpsc::{channel, Receiver};
12use std::sync::Arc;
13use std::thread;
14use std::thread::JoinHandle;
15use std::time::Duration;
16
17use log::{debug, error, info, trace, warn};
18use rumqttc::{Event, EventLoop, Publish};
19use serde::de::DeserializeOwned;
20
21use crate::analyse::analyser::Analyser;
22use crate::analyse::cause::Cause;
23use crate::analyse::configuration::Configuration;
24use crate::analyse::item::Item;
25use crate::monitor;
26use crate::mqtt::mqtt_client::{listen, Client};
27use crate::mqtt::{mqtt_client, mqtt_router};
28use crate::reception::exchange::collective_perception_message::CollectivePerceptionMessage;
29use crate::reception::exchange::cooperative_awareness_message::CooperativeAwarenessMessage;
30use crate::reception::exchange::decentralized_environmental_notification_message::DecentralizedEnvironmentalNotificationMessage;
31use crate::reception::exchange::Exchange;
32use crate::reception::information::Information;
33use crate::reception::typed::Typed;
34use crate::reception::Reception;
35
36pub async fn run<T: Analyser>(
37 mqtt_host: &str,
38 mqtt_port: u16,
39 mqtt_client_id: &str,
40 mqtt_username: Option<&str>,
41 mqtt_password: Option<&str>,
42 mqtt_root_topic: &str,
43 region_of_responsibility: bool,
44 custom_settings: HashMap<String, String>,
45) {
46 loop {
47 let topic_list = vec![
49 format!("{}/v2x/cam", mqtt_root_topic),
50 format!("{}/v2x/cpm", mqtt_root_topic),
51 format!("{}/v2x/denm", mqtt_root_topic),
52 format!("{}/info", mqtt_root_topic),
53 ];
54
55 let (mut client, event_loop) = mqtt_client::Client::new(
57 mqtt_host,
58 mqtt_port,
59 mqtt_client_id,
60 mqtt_username,
61 mqtt_password,
62 );
63
64 let configuration = Arc::new(Configuration::new(
65 mqtt_client_id.to_string(),
66 region_of_responsibility,
67 custom_settings.clone(),
68 ));
69
70 mqtt_client_subscribe(&topic_list, &mut client).await;
72 let (event_receiver, mqtt_client_listen_handle) = mqtt_client_listen_thread(event_loop);
74 let (item_receiver, monitoring_receiver, information_receiver, mqtt_router_dispatch_handle) =
76 mqtt_router_dispatch_thread(topic_list, event_receiver);
77
78 let monitor_reception_handle = monitor_thread(
80 "received_on".to_string(),
81 configuration.clone(),
82 monitoring_receiver,
83 );
84 let (analyser_item_receiver, analyser_generate_handle) =
86 analyser_generate_thread::<T>(configuration.clone(), item_receiver);
87
88 let reader_configure_handle =
90 reader_configure_thread(configuration.clone(), information_receiver);
91
92 let (publish_item_receiver, publish_monitoring_receiver, filter_handle) =
94 filter_thread::<T>(configuration.clone(), analyser_item_receiver);
95
96 let monitor_publish_handle = monitor_thread(
98 "sent_on".to_string(),
99 configuration,
100 publish_monitoring_receiver,
101 );
102 mqtt_client_publish(publish_item_receiver, &mut client).await;
104
105 debug!("mqtt_client_listen_handler joining...");
106 mqtt_client_listen_handle.await.unwrap();
107 debug!("mqtt_router_dispatch_handler joining...");
108 mqtt_router_dispatch_handle.join().unwrap();
109 debug!("monitor_reception_handle joining...");
110 monitor_reception_handle.join().unwrap();
111 debug!("reader_configure_handler joining...");
112 reader_configure_handle.join().unwrap();
113 debug!("analyser_generate_handler joining...");
114 analyser_generate_handle.join().unwrap();
115 debug!("filter_handle joining...");
116 filter_handle.join().unwrap();
117 debug!("monitor_publish_handle joining...");
118 monitor_publish_handle.join().unwrap();
119
120 warn!("loop done");
121 tokio::time::sleep(Duration::from_secs(5)).await;
122 }
123}
124
125fn mqtt_client_listen_thread(
126 event_loop: EventLoop,
127) -> (Receiver<Event>, tokio::task::JoinHandle<()>) {
128 info!("starting mqtt client listening...");
129 let (event_sender, event_receiver) = channel();
130 let handle = tokio::task::spawn(async move {
131 trace!("mqtt client listening closure entering...");
132 listen(event_loop, event_sender).await;
133 trace!("mqtt client listening closure finished");
134 });
135 info!("mqtt client listening started");
136 (event_receiver, handle)
137}
138
139fn mqtt_router_dispatch_thread(
140 topic_list: Vec<String>,
141 event_receiver: Receiver<Event>,
142 ) -> (
144 Receiver<Item<Exchange>>,
145 Receiver<(Item<Exchange>, Option<Cause>)>,
146 Receiver<Item<Information>>,
147 JoinHandle<()>,
148) {
149 info!("starting mqtt router dispatching...");
150 let (exchange_sender, exchange_receiver) = channel();
151 let (monitoring_sender, monitoring_receiver) = channel();
152 let (information_sender, information_receiver) = channel();
153
154 let handle = thread::Builder::new()
155 .name("mqtt-router-dispatcher".into())
156 .spawn(move || {
157 trace!("mqtt router dispatching closure entering...");
158 let router = &mut mqtt_router::Router::new();
160
161 if let Some(cam_topic) = topic_list
162 .iter()
163 .find(|&r| r.contains(CooperativeAwarenessMessage::get_type().as_str()))
164 {
165 router.add_route(cam_topic, deserialize::<Exchange>);
166 }
167 if let Some(denm_topic) = topic_list.iter().find(|&r| {
168 r.contains(DecentralizedEnvironmentalNotificationMessage::get_type().as_str())
169 }) {
170 router.add_route(denm_topic, deserialize::<Exchange>);
171 }
172 if let Some(cpm_topic) = topic_list
173 .iter()
174 .find(|&r| r.contains(CollectivePerceptionMessage::get_type().as_str()))
175 {
176 router.add_route(cpm_topic, deserialize::<Exchange>);
177 }
178 if let Some(info_topic) = topic_list
179 .iter()
180 .find(|&r| r.contains(Information::get_type().as_str()))
181 {
182 router.add_route(info_topic, deserialize::<Information>);
183 }
184
185 for event in event_receiver {
186 match router.handle_event(event) {
187 Some((topic, reception)) => {
188 if reception.is::<Exchange>() {
190 if let Ok(exchange) = reception.downcast::<Exchange>() {
191 let item = Item {
192 topic,
193 reception: unbox(exchange),
194 };
195 match monitoring_sender.send((item.clone(), None)) {
197 Ok(()) => trace!("mqtt monitoring sent"),
198 Err(error) => {
199 error!("stopped to send mqtt monitoring: {}", error);
200 break;
201 }
202 }
203 match exchange_sender.send(item) {
204 Ok(()) => trace!("mqtt exchange sent"),
205 Err(error) => {
206 error!("stopped to send mqtt exchange: {}", error);
207 break;
208 }
209 }
210 }
211 } else if let Ok(information) = reception.downcast::<Information>() {
212 match information_sender.send(Item {
213 topic,
214 reception: unbox(information),
215 }) {
216 Ok(()) => trace!("mqtt information sent"),
217 Err(error) => {
218 error!("stopped to send mqtt information: {}", error);
219 break;
220 }
221 }
222 }
223 }
224 None => trace!("no mqtt response to send"),
225 }
226 }
227 trace!("mqtt router dispatching closure finished");
228 })
229 .unwrap();
230 info!("mqtt router dispatching started");
231 (
232 exchange_receiver,
233 monitoring_receiver,
234 information_receiver,
235 handle,
236 )
237}
238
239fn monitor_thread(
240 direction: String,
241 configuration: Arc<Configuration>,
242 exchange_receiver: Receiver<(Item<Exchange>, Option<Cause>)>,
243) -> JoinHandle<()> {
244 info!("starting monitor reception thread...");
245 let handle = thread::Builder::new()
246 .name("monitor-reception".into())
247 .spawn(move || {
248 trace!("monitor reception entering...");
249 for tuple in exchange_receiver {
250 let publish_item = tuple.0;
251 let cause = tuple.1;
252 monitor::monitor(
254 &publish_item.reception,
255 cause,
256 direction.as_str(),
257 configuration.component_name(None),
259 format!(
260 "{}/{}/{}",
261 configuration.gateway_component_name(),
262 publish_item.topic.project_base(),
263 publish_item.reception.source_uuid
264 ),
265 );
266 }
267 })
268 .unwrap();
269 info!("monitor reception thread started");
270 handle
271}
272
273pub fn unbox<T>(value: Box<T>) -> T {
274 *value
275}
276
277fn analyser_generate_thread<T: Analyser>(
278 configuration: Arc<Configuration>,
279 exchange_receiver: Receiver<Item<Exchange>>,
280) -> (Receiver<(Item<Exchange>, Option<Cause>)>, JoinHandle<()>) {
281 info!("starting analyser generation...");
282 let (analyser_sender, analyser_receiver) = channel();
283 let handle = thread::Builder::new()
284 .name("analyser-generator".into())
285 .spawn(move || {
286 trace!("analyser generation closure entering...");
287 let mut analyser = T::new(configuration);
289 for item in exchange_receiver {
290 for publish_item in analyser.analyze(item.clone()) {
291 let cause = Cause::from_exchange(&(item.reception));
292 match analyser_sender.send((publish_item, cause)) {
293 Ok(()) => trace!("analyser sent"),
294 Err(error) => {
295 error!("stopped to send analyser: {}", error);
296 break;
297 }
298 }
299 }
300 trace!("analyser generation closure finished");
301 }
302 })
303 .unwrap();
304 info!("analyser generation started");
305 (analyser_receiver, handle)
306}
307
308fn filter_thread<T: Analyser>(
309 configuration: Arc<Configuration>,
310 exchange_receiver: Receiver<(Item<Exchange>, Option<Cause>)>,
311) -> (
312 Receiver<Item<Exchange>>,
313 Receiver<(Item<Exchange>, Option<Cause>)>,
314 JoinHandle<()>,
315) {
316 info!("starting filtering...");
317 let (publish_sender, publish_receiver) = channel();
318 let (monitoring_sender, monitoring_receiver) = channel();
319 let handle = thread::Builder::new()
320 .name("filter".into())
321 .spawn(move || {
322 trace!("filter closure entering...");
323 for tuple in exchange_receiver {
324 let item = tuple.0;
325 let cause = tuple.1;
326
327 if configuration.is_in_region_of_responsibility(item.topic.geo_extension.clone()) {
329 match publish_sender.send(item.clone()) {
331 Ok(()) => trace!("publish sent"),
332 Err(error) => {
333 error!("stopped to send publish: {}", error);
334 break;
335 }
336 }
337 match monitoring_sender.send((item, cause)) {
338 Ok(()) => trace!("monitoring sent"),
339 Err(error) => {
340 error!("stopped to send monitoring: {}", error);
341 break;
342 }
343 }
344 }
345 trace!("filter closure finished");
346 }
347 })
348 .unwrap();
349 info!("filter started");
350 (publish_receiver, monitoring_receiver, handle)
351}
352
353fn reader_configure_thread(
354 configuration: Arc<Configuration>,
355 information_receiver: Receiver<Item<Information>>,
356) -> JoinHandle<()> {
357 info!("starting reader configuration...");
358 let handle = thread::Builder::new()
359 .name("reader-configurator".into())
360 .spawn(move || {
361 trace!("reader configuration closure entering...");
362 for item in information_receiver {
363 info!(
364 "we received an information on the topic {}: {:?}",
365 item.topic, item.reception
366 );
367 configuration.update(item.reception);
368 }
369 trace!("reader configuration closure finished");
370 })
371 .unwrap();
372 info!("reader configuration started");
373 handle
374}
375
376fn deserialize<T>(publish: Publish) -> Option<Box<dyn Any + 'static + Send>>
377where
378 T: DeserializeOwned + Reception + 'static + Send,
379{
380 match String::from_utf8(publish.payload.to_vec()) {
382 Ok(message) => {
383 let message_str = message.as_str();
384 match serde_json::from_str::<T>(message_str) {
385 Ok(message) => {
386 trace!("message parsed");
387 return Some(Box::new(message));
388 }
389 Err(e) => warn!("parse error({}) on: {}", e, message_str),
390 }
391 }
392 Err(e) => warn!("format error: {}", e),
393 }
394 Option::None
395}
396
397async fn mqtt_client_subscribe(topic_list: &Vec<String>, client: &mut Client) {
398 info!("mqtt client subscribing starting...");
399 let mut topic_subscription_list = Vec::new();
401 if let Some(cam_topic) = topic_list
402 .iter()
403 .find(|&r| r.contains(CooperativeAwarenessMessage::get_type().as_str()))
404 {
405 topic_subscription_list.push(format!("{}/+/#", cam_topic));
406 }
407 if let Some(denm_topic) = topic_list
408 .iter()
409 .find(|&r| r.contains(DecentralizedEnvironmentalNotificationMessage::get_type().as_str()))
410 {
411 topic_subscription_list.push(format!("{}/+/#", denm_topic));
412 }
413 if let Some(cpm_topic) = topic_list
414 .iter()
415 .find(|&r| r.contains(CollectivePerceptionMessage::get_type().as_str()))
416 {
417 topic_subscription_list.push(format!("{}/+/#", cpm_topic));
418 }
419 if let Some(info_topic) = topic_list
420 .iter()
421 .find(|&r| r.contains(Information::get_type().as_str()))
422 {
423 topic_subscription_list.push(format!("{}/broker", info_topic));
426 }
427
428 client.subscribe(topic_subscription_list).await;
430 info!("mqtt client subscribing finished");
431}
432
433async fn mqtt_client_publish(publish_item_receiver: Receiver<Item<Exchange>>, client: &mut Client) {
434 info!("mqtt client publishing starting...");
435 for item in publish_item_receiver {
436 debug!("we received a publish");
437 client.publish(item).await;
438 debug!("we forwarded the publish");
439 }
440 info!("mqtt client publishing finished");
441}