libits_client/
pipeline.rs

1// Software Name: its-client
2// SPDX-FileCopyrightText: Copyright (c) 2016-2022 Orange
3// SPDX-License-Identifier: MIT License
4//
5// This software is distributed under the MIT license, see LICENSE.txt file for more details.
6//
7// Author: Frédéric GARDES <frederic.gardes@orange.com> et al.
8// Software description: This Intelligent Transportation Systems (ITS) [MQTT](https://mqtt.org/) client based on the [JSon](https://www.json.org) [ETSI](https://www.etsi.org/committee/its) specification transcription provides a ready to connect project for the mobility (connected and autonomous vehicles, road side units, vulnerable road users,...).
9use std::any::Any;
10use std::collections::HashMap;
11use std::sync::mpsc::{channel, Receiver};
12use std::sync::Arc;
13use std::thread;
14use std::thread::JoinHandle;
15use std::time::Duration;
16
17use log::{debug, error, info, trace, warn};
18use rumqttc::{Event, EventLoop, Publish};
19use serde::de::DeserializeOwned;
20
21use crate::analyse::analyser::Analyser;
22use crate::analyse::cause::Cause;
23use crate::analyse::configuration::Configuration;
24use crate::analyse::item::Item;
25use crate::monitor;
26use crate::mqtt::mqtt_client::{listen, Client};
27use crate::mqtt::{mqtt_client, mqtt_router};
28use crate::reception::exchange::collective_perception_message::CollectivePerceptionMessage;
29use crate::reception::exchange::cooperative_awareness_message::CooperativeAwarenessMessage;
30use crate::reception::exchange::decentralized_environmental_notification_message::DecentralizedEnvironmentalNotificationMessage;
31use crate::reception::exchange::Exchange;
32use crate::reception::information::Information;
33use crate::reception::typed::Typed;
34use crate::reception::Reception;
35
36pub async fn run<T: Analyser>(
37    mqtt_host: &str,
38    mqtt_port: u16,
39    mqtt_client_id: &str,
40    mqtt_username: Option<&str>,
41    mqtt_password: Option<&str>,
42    mqtt_root_topic: &str,
43    region_of_responsibility: bool,
44    custom_settings: HashMap<String, String>,
45) {
46    loop {
47        // build the shared topic list
48        let topic_list = vec![
49            format!("{}/v2x/cam", mqtt_root_topic),
50            format!("{}/v2x/cpm", mqtt_root_topic),
51            format!("{}/v2x/denm", mqtt_root_topic),
52            format!("{}/info", mqtt_root_topic),
53        ];
54
55        //initialize the client
56        let (mut client, event_loop) = mqtt_client::Client::new(
57            mqtt_host,
58            mqtt_port,
59            mqtt_client_id,
60            mqtt_username,
61            mqtt_password,
62        );
63
64        let configuration = Arc::new(Configuration::new(
65            mqtt_client_id.to_string(),
66            region_of_responsibility,
67            custom_settings.clone(),
68        ));
69
70        // subscribe
71        mqtt_client_subscribe(&topic_list, &mut client).await;
72        // receive
73        let (event_receiver, mqtt_client_listen_handle) = mqtt_client_listen_thread(event_loop);
74        // dispatch
75        let (item_receiver, monitoring_receiver, information_receiver, mqtt_router_dispatch_handle) =
76            mqtt_router_dispatch_thread(topic_list, event_receiver);
77
78        // in parallel, monitor exchanges reception
79        let monitor_reception_handle = monitor_thread(
80            "received_on".to_string(),
81            configuration.clone(),
82            monitoring_receiver,
83        );
84        // in parallel, analyse exchanges
85        let (analyser_item_receiver, analyser_generate_handle) =
86            analyser_generate_thread::<T>(configuration.clone(), item_receiver);
87
88        // read information
89        let reader_configure_handle =
90            reader_configure_thread(configuration.clone(), information_receiver);
91
92        // filter exchanges on region of responsibility
93        let (publish_item_receiver, publish_monitoring_receiver, filter_handle) =
94            filter_thread::<T>(configuration.clone(), analyser_item_receiver);
95
96        // in parallel, monitor exchanges publish
97        let monitor_publish_handle = monitor_thread(
98            "sent_on".to_string(),
99            configuration,
100            publish_monitoring_receiver,
101        );
102        // in parallel send
103        mqtt_client_publish(publish_item_receiver, &mut client).await;
104
105        debug!("mqtt_client_listen_handler joining...");
106        mqtt_client_listen_handle.await.unwrap();
107        debug!("mqtt_router_dispatch_handler joining...");
108        mqtt_router_dispatch_handle.join().unwrap();
109        debug!("monitor_reception_handle joining...");
110        monitor_reception_handle.join().unwrap();
111        debug!("reader_configure_handler joining...");
112        reader_configure_handle.join().unwrap();
113        debug!("analyser_generate_handler joining...");
114        analyser_generate_handle.join().unwrap();
115        debug!("filter_handle joining...");
116        filter_handle.join().unwrap();
117        debug!("monitor_publish_handle joining...");
118        monitor_publish_handle.join().unwrap();
119
120        warn!("loop done");
121        tokio::time::sleep(Duration::from_secs(5)).await;
122    }
123}
124
125fn mqtt_client_listen_thread(
126    event_loop: EventLoop,
127) -> (Receiver<Event>, tokio::task::JoinHandle<()>) {
128    info!("starting mqtt client listening...");
129    let (event_sender, event_receiver) = channel();
130    let handle = tokio::task::spawn(async move {
131        trace!("mqtt client listening closure entering...");
132        listen(event_loop, event_sender).await;
133        trace!("mqtt client listening closure finished");
134    });
135    info!("mqtt client listening started");
136    (event_receiver, handle)
137}
138
139fn mqtt_router_dispatch_thread(
140    topic_list: Vec<String>,
141    event_receiver: Receiver<Event>,
142    // FIXME manage a Box into the Exchange to use a unique object Trait instead
143) -> (
144    Receiver<Item<Exchange>>,
145    Receiver<(Item<Exchange>, Option<Cause>)>,
146    Receiver<Item<Information>>,
147    JoinHandle<()>,
148) {
149    info!("starting mqtt router dispatching...");
150    let (exchange_sender, exchange_receiver) = channel();
151    let (monitoring_sender, monitoring_receiver) = channel();
152    let (information_sender, information_receiver) = channel();
153
154    let handle = thread::Builder::new()
155        .name("mqtt-router-dispatcher".into())
156        .spawn(move || {
157            trace!("mqtt router dispatching closure entering...");
158            //initialize the router
159            let router = &mut mqtt_router::Router::new();
160
161            if let Some(cam_topic) = topic_list
162                .iter()
163                .find(|&r| r.contains(CooperativeAwarenessMessage::get_type().as_str()))
164            {
165                router.add_route(cam_topic, deserialize::<Exchange>);
166            }
167            if let Some(denm_topic) = topic_list.iter().find(|&r| {
168                r.contains(DecentralizedEnvironmentalNotificationMessage::get_type().as_str())
169            }) {
170                router.add_route(denm_topic, deserialize::<Exchange>);
171            }
172            if let Some(cpm_topic) = topic_list
173                .iter()
174                .find(|&r| r.contains(CollectivePerceptionMessage::get_type().as_str()))
175            {
176                router.add_route(cpm_topic, deserialize::<Exchange>);
177            }
178            if let Some(info_topic) = topic_list
179                .iter()
180                .find(|&r| r.contains(Information::get_type().as_str()))
181            {
182                router.add_route(info_topic, deserialize::<Information>);
183            }
184
185            for event in event_receiver {
186                match router.handle_event(event) {
187                    Some((topic, reception)) => {
188                        // TODO use the From Trait
189                        if reception.is::<Exchange>() {
190                            if let Ok(exchange) = reception.downcast::<Exchange>() {
191                                let item = Item {
192                                    topic,
193                                    reception: unbox(exchange),
194                                };
195                                //assumed clone, we send to 2 channels
196                                match monitoring_sender.send((item.clone(), None)) {
197                                    Ok(()) => trace!("mqtt monitoring sent"),
198                                    Err(error) => {
199                                        error!("stopped to send mqtt monitoring: {}", error);
200                                        break;
201                                    }
202                                }
203                                match exchange_sender.send(item) {
204                                    Ok(()) => trace!("mqtt exchange sent"),
205                                    Err(error) => {
206                                        error!("stopped to send mqtt exchange: {}", error);
207                                        break;
208                                    }
209                                }
210                            }
211                        } else if let Ok(information) = reception.downcast::<Information>() {
212                            match information_sender.send(Item {
213                                topic,
214                                reception: unbox(information),
215                            }) {
216                                Ok(()) => trace!("mqtt information sent"),
217                                Err(error) => {
218                                    error!("stopped to send mqtt information: {}", error);
219                                    break;
220                                }
221                            }
222                        }
223                    }
224                    None => trace!("no mqtt response to send"),
225                }
226            }
227            trace!("mqtt router dispatching closure finished");
228        })
229        .unwrap();
230    info!("mqtt router dispatching started");
231    (
232        exchange_receiver,
233        monitoring_receiver,
234        information_receiver,
235        handle,
236    )
237}
238
239fn monitor_thread(
240    direction: String,
241    configuration: Arc<Configuration>,
242    exchange_receiver: Receiver<(Item<Exchange>, Option<Cause>)>,
243) -> JoinHandle<()> {
244    info!("starting monitor reception thread...");
245    let handle = thread::Builder::new()
246        .name("monitor-reception".into())
247        .spawn(move || {
248            trace!("monitor reception entering...");
249            for tuple in exchange_receiver {
250                let publish_item = tuple.0;
251                let cause = tuple.1;
252                // monitor
253                monitor::monitor(
254                    &publish_item.reception,
255                    cause,
256                    direction.as_str(),
257                    // assumed clone, we preserve it for the topics
258                    configuration.component_name(None),
259                    format!(
260                        "{}/{}/{}",
261                        configuration.gateway_component_name(),
262                        publish_item.topic.project_base(),
263                        publish_item.reception.source_uuid
264                    ),
265                );
266            }
267        })
268        .unwrap();
269    info!("monitor reception thread started");
270    handle
271}
272
/// Moves the value out of its [`Box`] and returns it by value.
///
/// The box is consumed by the call.
pub fn unbox<T>(value: Box<T>) -> T {
    // dereferencing an owned box moves its content out
    *value
}
276
277fn analyser_generate_thread<T: Analyser>(
278    configuration: Arc<Configuration>,
279    exchange_receiver: Receiver<Item<Exchange>>,
280) -> (Receiver<(Item<Exchange>, Option<Cause>)>, JoinHandle<()>) {
281    info!("starting analyser generation...");
282    let (analyser_sender, analyser_receiver) = channel();
283    let handle = thread::Builder::new()
284        .name("analyser-generator".into())
285        .spawn(move || {
286            trace!("analyser generation closure entering...");
287            //initialize the analyser
288            let mut analyser = T::new(configuration);
289            for item in exchange_receiver {
290                for publish_item in analyser.analyze(item.clone()) {
291                    let cause = Cause::from_exchange(&(item.reception));
292                    match analyser_sender.send((publish_item, cause)) {
293                        Ok(()) => trace!("analyser sent"),
294                        Err(error) => {
295                            error!("stopped to send analyser: {}", error);
296                            break;
297                        }
298                    }
299                }
300                trace!("analyser generation closure finished");
301            }
302        })
303        .unwrap();
304    info!("analyser generation started");
305    (analyser_receiver, handle)
306}
307
308fn filter_thread<T: Analyser>(
309    configuration: Arc<Configuration>,
310    exchange_receiver: Receiver<(Item<Exchange>, Option<Cause>)>,
311) -> (
312    Receiver<Item<Exchange>>,
313    Receiver<(Item<Exchange>, Option<Cause>)>,
314    JoinHandle<()>,
315) {
316    info!("starting filtering...");
317    let (publish_sender, publish_receiver) = channel();
318    let (monitoring_sender, monitoring_receiver) = channel();
319    let handle = thread::Builder::new()
320        .name("filter".into())
321        .spawn(move || {
322            trace!("filter closure entering...");
323            for tuple in exchange_receiver {
324                let item = tuple.0;
325                let cause = tuple.1;
326
327                //assumed clone, we just send the GeoExtension
328                if configuration.is_in_region_of_responsibility(item.topic.geo_extension.clone()) {
329                    //assumed clone, we send to 2 channels
330                    match publish_sender.send(item.clone()) {
331                        Ok(()) => trace!("publish sent"),
332                        Err(error) => {
333                            error!("stopped to send publish: {}", error);
334                            break;
335                        }
336                    }
337                    match monitoring_sender.send((item, cause)) {
338                        Ok(()) => trace!("monitoring sent"),
339                        Err(error) => {
340                            error!("stopped to send monitoring: {}", error);
341                            break;
342                        }
343                    }
344                }
345                trace!("filter closure finished");
346            }
347        })
348        .unwrap();
349    info!("filter started");
350    (publish_receiver, monitoring_receiver, handle)
351}
352
353fn reader_configure_thread(
354    configuration: Arc<Configuration>,
355    information_receiver: Receiver<Item<Information>>,
356) -> JoinHandle<()> {
357    info!("starting reader configuration...");
358    let handle = thread::Builder::new()
359        .name("reader-configurator".into())
360        .spawn(move || {
361            trace!("reader configuration closure entering...");
362            for item in information_receiver {
363                info!(
364                    "we received an information on the topic {}: {:?}",
365                    item.topic, item.reception
366                );
367                configuration.update(item.reception);
368            }
369            trace!("reader configuration closure finished");
370        })
371        .unwrap();
372    info!("reader configuration started");
373    handle
374}
375
376fn deserialize<T>(publish: Publish) -> Option<Box<dyn Any + 'static + Send>>
377where
378    T: DeserializeOwned + Reception + 'static + Send,
379{
380    // Incoming publish from the broker
381    match String::from_utf8(publish.payload.to_vec()) {
382        Ok(message) => {
383            let message_str = message.as_str();
384            match serde_json::from_str::<T>(message_str) {
385                Ok(message) => {
386                    trace!("message parsed");
387                    return Some(Box::new(message));
388                }
389                Err(e) => warn!("parse error({}) on: {}", e, message_str),
390            }
391        }
392        Err(e) => warn!("format error: {}", e),
393    }
394    Option::None
395}
396
397async fn mqtt_client_subscribe(topic_list: &Vec<String>, client: &mut Client) {
398    info!("mqtt client subscribing starting...");
399    // build the topic subscription list
400    let mut topic_subscription_list = Vec::new();
401    if let Some(cam_topic) = topic_list
402        .iter()
403        .find(|&r| r.contains(CooperativeAwarenessMessage::get_type().as_str()))
404    {
405        topic_subscription_list.push(format!("{}/+/#", cam_topic));
406    }
407    if let Some(denm_topic) = topic_list
408        .iter()
409        .find(|&r| r.contains(DecentralizedEnvironmentalNotificationMessage::get_type().as_str()))
410    {
411        topic_subscription_list.push(format!("{}/+/#", denm_topic));
412    }
413    if let Some(cpm_topic) = topic_list
414        .iter()
415        .find(|&r| r.contains(CollectivePerceptionMessage::get_type().as_str()))
416    {
417        topic_subscription_list.push(format!("{}/+/#", cpm_topic));
418    }
419    if let Some(info_topic) = topic_list
420        .iter()
421        .find(|&r| r.contains(Information::get_type().as_str()))
422    {
423        // The topic of the broker we are currently connected to
424        // is always: "5GCroCo/backOutQueue/info/broker"
425        topic_subscription_list.push(format!("{}/broker", info_topic));
426    }
427
428    // NOTE: we share the topic list with the dispatcher
429    client.subscribe(topic_subscription_list).await;
430    info!("mqtt client subscribing finished");
431}
432
433async fn mqtt_client_publish(publish_item_receiver: Receiver<Item<Exchange>>, client: &mut Client) {
434    info!("mqtt client publishing starting...");
435    for item in publish_item_receiver {
436        debug!("we received a publish");
437        client.publish(item).await;
438        debug!("we forwarded the publish");
439    }
440    info!("mqtt client publishing finished");
441}