1use std::sync::mpsc::{channel, Receiver, TryRecvError};
10use std::sync::Arc;
11
12use log::{debug, info, trace, warn};
13use timer::MessageTimer;
14
15use libits_client::analyse::analyser::Analyser;
16use libits_client::analyse::configuration::Configuration;
17use libits_client::analyse::item::Item;
18use libits_client::reception::exchange::mobile::Mobile;
19use libits_client::reception::exchange::Exchange;
20use libits_client::reception::mortal::now;
21
22pub struct CopyCat {
23 configuration: Arc<Configuration>,
24 item_receiver: Receiver<Item<Exchange>>,
25 timer: MessageTimer<Item<Exchange>>,
26}
27
28impl Analyser for CopyCat {
29 fn new(configuration: Arc<Configuration>) -> Self
30 where
31 Self: Sized,
32 {
33 let (tx, item_receiver) = channel();
34 let timer = timer::MessageTimer::new(tx);
35 Self {
36 configuration,
37 item_receiver,
38 timer,
39 }
40 }
41
42 fn analyze(&mut self, item: Item<Exchange>) -> Vec<Item<Exchange>> {
43 let mut item_to_publish = Vec::new();
44 let component_name = self.configuration.component_name(None);
45
46 trace!("item received: {:?}", item);
47
48 if item.reception.source_uuid == component_name || item.reception.message.stopped() {
50 info!(
51 "we received an item as itself {} or stopped: we don't copy cat",
52 item.reception.source_uuid
53 );
54 } else {
55 info!(
56 "we start to schedule {} from {}",
57 item.reception.message.mobile_id(),
58 item.reception.source_uuid
59 );
60 let guard = self
61 .timer
62 .schedule_with_delay(chrono::Duration::seconds(3), item);
63 guard.ignore();
64 debug!("scheduling done");
65 }
66 let mut data_found = 0;
68 while data_found >= 0 {
69 match self.item_receiver.try_recv() {
70 Ok(item) => {
71 data_found += 1;
72 let mut own_exchange = item.reception.clone();
74 info!(
75 "we treat the scheduled item {} {} from {}",
76 data_found,
77 item.reception.message.mobile_id(),
78 item.reception.source_uuid
79 );
80 let timestamp = now();
81 own_exchange.appropriate(&self.configuration, timestamp);
82 let mut own_topic = item.topic.clone();
83 own_topic.appropriate(&self.configuration);
84 item_to_publish.push(Item::new(own_topic, own_exchange));
85 debug!("item scheduled published");
86 }
87 Err(e) => match e {
88 TryRecvError::Empty => {
89 debug!("delayed channel empty, we stop");
90 data_found = -1;
91 }
92 TryRecvError::Disconnected => {
93 warn!("delayed channel disconnected, we stop");
94 data_found = -1;
95 }
96 },
97 }
98 }
99
100 item_to_publish
102 }
103}
104
#[cfg(test)]
mod tests {
    use std::sync::mpsc::{channel, TryRecvError};
    use std::time::Duration;

    /// Checks that a message scheduled on a `MessageTimer` is not delivered immediately,
    /// and that it does arrive, with the scheduled payload, once the delay has elapsed.
    #[test]
    fn test_timer_schedule_with_delay() {
        let (tx, rx) = channel();
        let timer = timer::MessageTimer::new(tx);
        // The guard must stay alive until the message is received: dropping it would
        // cancel the schedule.
        let _guard = timer.schedule_with_delay(chrono::Duration::milliseconds(200), 3);

        // Nothing may be delivered before the delay has elapsed.
        assert_eq!(rx.try_recv(), Err(TryRecvError::Empty));

        // Bounded wait, so a timer that never fires fails the test instead of hanging it.
        let received = rx
            .recv_timeout(Duration::from_secs(5))
            .expect("the timer should deliver the scheduled message");
        assert_eq!(received, 3);
    }
}