libits_copycat/
lib.rs

// Software Name: its-client
// SPDX-FileCopyrightText: Copyright (c) 2016-2022 Orange
// SPDX-License-Identifier: MIT License
//
// This software is distributed under the MIT license, see LICENSE.txt file for more details.
//
// Author: Frédéric GARDES <frederic.gardes@orange.com> et al.
// Software description: This Intelligent Transportation Systems (ITS) [MQTT](https://mqtt.org/) client based on the [JSon](https://www.json.org) [ETSI](https://www.etsi.org/committee/its) specification transcription provides a ready to connect project for the mobility (connected and autonomous vehicles, road side units, vulnerable road users,...).
9use std::sync::mpsc::{channel, Receiver, TryRecvError};
10use std::sync::Arc;
11
12use log::{debug, info, trace, warn};
13use timer::MessageTimer;
14
15use libits_client::analyse::analyser::Analyser;
16use libits_client::analyse::configuration::Configuration;
17use libits_client::analyse::item::Item;
18use libits_client::reception::exchange::Exchange;
19use libits_client::reception::mortal::now;
20
21pub struct CopyCat {
22    configuration: Arc<Configuration>,
23    item_receiver: Receiver<Item<Exchange>>,
24    timer: MessageTimer<Item<Exchange>>,
25}
26
27impl Analyser for CopyCat {
28    fn new(configuration: Arc<Configuration>) -> Self
29    where
30        Self: Sized,
31    {
32        let (tx, item_receiver) = channel();
33        let timer = timer::MessageTimer::new(tx);
34        Self {
35            configuration,
36            item_receiver,
37            timer,
38        }
39    }
40
41    fn analyze(&mut self, item: Item<Exchange>) -> Vec<Item<Exchange>> {
42        let mut item_to_publish = Vec::new();
43        let component_name = self.configuration.component_name(None);
44
45        trace!("item received: {:?}", item);
46
47        // 1- delay the storage of the new item
48        match item.reception.message.as_mobile() {
49            Ok(mobile_message) => {
50                if item.reception.source_uuid == component_name || mobile_message.stopped() {
51                    info!(
52                        "we received an item as itself {} or stopped: we don't copy cat",
53                        item.reception.source_uuid
54                    );
55                } else {
56                    info!(
57                        "we start to schedule {} from {}",
58                        &mobile_message.mobile_id(),
59                        item.reception.source_uuid
60                    );
61                    // FIXME adding `as_mobile()` method forced me to clone the item here
62                    //       as a quick solve; to be investigated...
63                    let guard = self
64                        .timer
65                        .schedule_with_delay(chrono::Duration::seconds(3), item.clone());
66                    guard.ignore();
67                    debug!("scheduling done");
68                }
69                // 2- create the copy cat items for each removed delayed item
70                let mut data_found = 0;
71                while data_found >= 0 {
72                    match self.item_receiver.try_recv() {
73                        Ok(item) => {
74                            data_found += 1;
75                            //assumed clone, we create a new item
76                            let mut own_exchange = item.reception.clone();
77                            info!(
78                                "we treat the scheduled item {} {} from {}",
79                                data_found,
80                                &mobile_message.mobile_id(),
81                                item.reception.source_uuid
82                            );
83                            let timestamp = now();
84                            own_exchange.appropriate(&self.configuration, timestamp);
85                            let mut own_topic = item.topic.clone();
86                            own_topic.appropriate(&self.configuration);
87                            item_to_publish.push(Item::new(own_topic, own_exchange));
88                            debug!("item scheduled published");
89                        }
90                        Err(e) => match e {
91                            TryRecvError::Empty => {
92                                debug!("delayed channel empty, we stop");
93                                data_found = -1;
94                            }
95                            TryRecvError::Disconnected => {
96                                warn!("delayed channel disconnected, we stop");
97                                data_found = -1;
98                            }
99                        },
100                    }
101                }
102            }
103            Err(e) => warn!("{}", e),
104        }
105        // 3- send the copy cat items
106        item_to_publish
107    }
108}
109
#[cfg(test)]
mod tests {
    use std::sync::mpsc::channel;
    use std::time::Duration;

    /// Checks that a message timer delivers the scheduled payload through its
    /// channel once the delay has elapsed.
    #[test]
    fn test_timer_schedule_with_delay() {
        let (tx, rx) = channel();
        let timer = timer::MessageTimer::new(tx);
        // Keep the guard alive until the message is received: dropping it
        // would cancel the schedule. A short delay keeps the test fast.
        let _guard = timer.schedule_with_delay(chrono::Duration::milliseconds(100), 3);

        // Bounded wait so that a timer failure fails the test instead of hanging it.
        let received = rx
            .recv_timeout(Duration::from_secs(5))
            .expect("the timer should deliver the message well before the timeout");
        assert_eq!(received, 3);
    }
}