1use std::sync::mpsc::{channel, Receiver, TryRecvError};
10use std::sync::Arc;
11
12use log::{debug, info, trace, warn};
13use timer::MessageTimer;
14
15use libits_client::analyse::analyser::Analyser;
16use libits_client::analyse::configuration::Configuration;
17use libits_client::analyse::item::Item;
18use libits_client::reception::exchange::Exchange;
19use libits_client::reception::mortal::now;
20
21pub struct CopyCat {
22 configuration: Arc<Configuration>,
23 item_receiver: Receiver<Item<Exchange>>,
24 timer: MessageTimer<Item<Exchange>>,
25}
26
27impl Analyser for CopyCat {
28 fn new(configuration: Arc<Configuration>) -> Self
29 where
30 Self: Sized,
31 {
32 let (tx, item_receiver) = channel();
33 let timer = timer::MessageTimer::new(tx);
34 Self {
35 configuration,
36 item_receiver,
37 timer,
38 }
39 }
40
41 fn analyze(&mut self, item: Item<Exchange>) -> Vec<Item<Exchange>> {
42 let mut item_to_publish = Vec::new();
43 let component_name = self.configuration.component_name(None);
44
45 trace!("item received: {:?}", item);
46
47 match item.reception.message.as_mobile() {
49 Ok(mobile_message) => {
50 if item.reception.source_uuid == component_name || mobile_message.stopped() {
51 info!(
52 "we received an item as itself {} or stopped: we don't copy cat",
53 item.reception.source_uuid
54 );
55 } else {
56 info!(
57 "we start to schedule {} from {}",
58 &mobile_message.mobile_id(),
59 item.reception.source_uuid
60 );
61 let guard = self
64 .timer
65 .schedule_with_delay(chrono::Duration::seconds(3), item.clone());
66 guard.ignore();
67 debug!("scheduling done");
68 }
69 let mut data_found = 0;
71 while data_found >= 0 {
72 match self.item_receiver.try_recv() {
73 Ok(item) => {
74 data_found += 1;
75 let mut own_exchange = item.reception.clone();
77 info!(
78 "we treat the scheduled item {} {} from {}",
79 data_found,
80 &mobile_message.mobile_id(),
81 item.reception.source_uuid
82 );
83 let timestamp = now();
84 own_exchange.appropriate(&self.configuration, timestamp);
85 let mut own_topic = item.topic.clone();
86 own_topic.appropriate(&self.configuration);
87 item_to_publish.push(Item::new(own_topic, own_exchange));
88 debug!("item scheduled published");
89 }
90 Err(e) => match e {
91 TryRecvError::Empty => {
92 debug!("delayed channel empty, we stop");
93 data_found = -1;
94 }
95 TryRecvError::Disconnected => {
96 warn!("delayed channel disconnected, we stop");
97 data_found = -1;
98 }
99 },
100 }
101 }
102 }
103 Err(e) => warn!("{}", e),
104 }
105 item_to_publish
107 }
108}
109
#[cfg(test)]
mod tests {
    use std::sync::mpsc::channel;
    use std::time::{Duration, Instant};

    /// A message scheduled with a 3-second delay is delivered unchanged, and not immediately.
    #[test]
    fn test_timer_schedule_with_delay() {
        let (tx, rx) = channel();
        let timer = timer::MessageTimer::new(tx);
        let started = Instant::now();
        // The guard must stay alive until the message is received, otherwise the schedule is
        // cancelled.
        let _guard = timer.schedule_with_delay(chrono::Duration::seconds(3), 3);

        let delivered = rx
            .recv()
            .expect("the timer should deliver the scheduled message");

        assert_eq!(delivered, 3);
        // One second of tolerance: the timer and `Instant` may not share the same clock source.
        assert!(
            started.elapsed() >= Duration::from_secs(2),
            "message was delivered before the delay elapsed"
        );
    }
}