1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
use std::sync::mpsc::{channel, Receiver, TryRecvError};
use std::sync::Arc;

use log::{debug, info, trace, warn};
use timer::MessageTimer;

use libits_client::analyse::analyser::Analyser;
use libits_client::analyse::configuration::Configuration;
use libits_client::analyse::item::Item;
use libits_client::reception::exchange::mobile::Mobile;
use libits_client::reception::exchange::Exchange;
use libits_client::reception::mortal::now;

pub struct CopyCat {
    configuration: Arc<Configuration>,
    item_receiver: Receiver<Item<Exchange>>,
    timer: MessageTimer<Item<Exchange>>,
}

impl Analyser for CopyCat {
    fn new(configuration: Arc<Configuration>) -> Self
    where
        Self: Sized,
    {
        let (tx, item_receiver) = channel();
        let timer = timer::MessageTimer::new(tx);
        Self {
            configuration,
            item_receiver,
            timer,
        }
    }

    fn analyze(&mut self, item: Item<Exchange>) -> Vec<Item<Exchange>> {
        let mut item_to_publish = Vec::new();
        let component_name = self.configuration.component_name(None);

        trace!("item received: {:?}", item);

        // 1- delay the storage of the new item
        if item.reception.source_uuid == component_name || item.reception.message.stopped() {
            info!(
                "we received an item as itself {} or stopped: we don't copy cat",
                item.reception.source_uuid
            );
        } else {
            info!(
                "we start to schedule {} from {}",
                item.reception.message.mobile_id(),
                item.reception.source_uuid
            );
            let guard = self
                .timer
                .schedule_with_delay(chrono::Duration::seconds(3), item);
            guard.ignore();
            debug!("scheduling done");
        }
        // 2- create the copy cat items for each removed delayed item
        let mut data_found = 0;
        while data_found >= 0 {
            match self.item_receiver.try_recv() {
                Ok(item) => {
                    data_found += 1;
                    //assumed clone, we create a new item
                    let mut own_exchange = item.reception.clone();
                    info!(
                        "we treat the scheduled item {} {} from {}",
                        data_found,
                        item.reception.message.mobile_id(),
                        item.reception.source_uuid
                    );
                    let timestamp = now();
                    own_exchange.appropriate(&self.configuration, timestamp);
                    let mut own_topic = item.topic.clone();
                    own_topic.appropriate(&self.configuration);
                    item_to_publish.push(Item::new(own_topic, own_exchange));
                    debug!("item scheduled published");
                }
                Err(e) => match e {
                    TryRecvError::Empty => {
                        debug!("delayed channel empty, we stop");
                        data_found = -1;
                    }
                    TryRecvError::Disconnected => {
                        warn!("delayed channel disconnected, we stop");
                        data_found = -1;
                    }
                },
            }
        }

        // 3- send the copy cat items
        item_to_publish
    }
}

#[cfg(test)]
mod tests {
    use std::sync::mpsc::channel;

    #[test]
    fn test_timer_schedule_with_delay() {
        let (tx, rx) = channel();
        let timer = timer::MessageTimer::new(tx);
        let _guard = timer.schedule_with_delay(chrono::Duration::seconds(3), 3);

        rx.recv().unwrap();
        println!("This code has been executed after 3 seconds");
    }
}