libits_copycat/
lib.rs

1// Software Name: its-client
2// SPDX-FileCopyrightText: Copyright (c) 2016-2022 Orange
3// SPDX-License-Identifier: MIT License
4//
5// This software is distributed under the MIT license, see LICENSE.txt file for more details.
6//
7// Author: Frédéric GARDES <frederic.gardes@orange.com> et al.
8// Software description: This Intelligent Transportation Systems (ITS) [MQTT](https://mqtt.org/) client based on the [JSon](https://www.json.org) [ETSI](https://www.etsi.org/committee/its) specification transcription provides a ready to connect project for the mobility (connected and autonomous vehicles, road side units, vulnerable road users,...).
9use std::sync::mpsc::{channel, Receiver, TryRecvError};
10use std::sync::Arc;
11
12use log::{debug, info, trace, warn};
13use timer::MessageTimer;
14
15use libits_client::analyse::analyser::Analyser;
16use libits_client::analyse::configuration::Configuration;
17use libits_client::analyse::item::Item;
18use libits_client::reception::exchange::mobile::Mobile;
19use libits_client::reception::exchange::Exchange;
20use libits_client::reception::mortal::now;
21
22pub struct CopyCat {
23    configuration: Arc<Configuration>,
24    item_receiver: Receiver<Item<Exchange>>,
25    timer: MessageTimer<Item<Exchange>>,
26}
27
28impl Analyser for CopyCat {
29    fn new(configuration: Arc<Configuration>) -> Self
30    where
31        Self: Sized,
32    {
33        let (tx, item_receiver) = channel();
34        let timer = timer::MessageTimer::new(tx);
35        Self {
36            configuration,
37            item_receiver,
38            timer,
39        }
40    }
41
42    fn analyze(&mut self, item: Item<Exchange>) -> Vec<Item<Exchange>> {
43        let mut item_to_publish = Vec::new();
44        let component_name = self.configuration.component_name(None);
45
46        trace!("item received: {:?}", item);
47
48        // 1- delay the storage of the new item
49        if item.reception.source_uuid == component_name || item.reception.message.stopped() {
50            info!(
51                "we received an item as itself {} or stopped: we don't copy cat",
52                item.reception.source_uuid
53            );
54        } else {
55            info!(
56                "we start to schedule {} from {}",
57                item.reception.message.mobile_id(),
58                item.reception.source_uuid
59            );
60            let guard = self
61                .timer
62                .schedule_with_delay(chrono::Duration::seconds(3), item);
63            guard.ignore();
64            debug!("scheduling done");
65        }
66        // 2- create the copy cat items for each removed delayed item
67        let mut data_found = 0;
68        while data_found >= 0 {
69            match self.item_receiver.try_recv() {
70                Ok(item) => {
71                    data_found += 1;
72                    //assumed clone, we create a new item
73                    let mut own_exchange = item.reception.clone();
74                    info!(
75                        "we treat the scheduled item {} {} from {}",
76                        data_found,
77                        item.reception.message.mobile_id(),
78                        item.reception.source_uuid
79                    );
80                    let timestamp = now();
81                    own_exchange.appropriate(&self.configuration, timestamp);
82                    let mut own_topic = item.topic.clone();
83                    own_topic.appropriate(&self.configuration);
84                    item_to_publish.push(Item::new(own_topic, own_exchange));
85                    debug!("item scheduled published");
86                }
87                Err(e) => match e {
88                    TryRecvError::Empty => {
89                        debug!("delayed channel empty, we stop");
90                        data_found = -1;
91                    }
92                    TryRecvError::Disconnected => {
93                        warn!("delayed channel disconnected, we stop");
94                        data_found = -1;
95                    }
96                },
97            }
98        }
99
100        // 3- send the copy cat items
101        item_to_publish
102    }
103}
104
#[cfg(test)]
mod tests {
    use std::sync::mpsc::channel;

    /// Checks that a message scheduled on a `MessageTimer` is delivered,
    /// unchanged, on the channel once its delay has elapsed.
    #[test]
    fn test_timer_schedule_with_delay() {
        let payload = 3;
        let (tx, rx) = channel();
        let timer = timer::MessageTimer::new(tx);
        // The guard is kept in scope until the message has been received.
        let _guard = timer.schedule_with_delay(chrono::Duration::seconds(3), payload);

        // Bounded wait so the test fails instead of hanging if the timer
        // never fires.
        let received = rx
            .recv_timeout(std::time::Duration::from_secs(10))
            .expect("the scheduled message should be delivered after the delay");
        assert_eq!(received, payload);
        println!("This code has been executed after 3 seconds");
    }
}