1
/// Sending side of a probe: lets instrumented code report events and read
/// the data that was attached to the probe.
pub trait Probe {
    /// Type of the events reported through [`event`](#tymethod.event).
    type Msg: Send;
    /// Type of the data exposed by [`payload`](#tymethod.payload).
    type Pay: Clone + Send;

    /// Borrows the payload attached to this probe.
    fn payload(&self) -> &Self::Pay;

    /// Reports the event `evt` through this probe.
    fn event(&self, evt: Self::Msg);
}
9
/// Receiving side of a probe: hands out reported events and exposes a
/// resettable timer.
pub trait ProbeReceive {
    /// Type of the events handed out by [`recv`](#tymethod.recv).
    type Msg: Send;

    /// Returns the next event.
    fn recv(&self) -> Self::Msg;

    /// Restarts the timer read by the two `last_event_*` methods.
    fn reset_timer(&mut self);

    /// Timer reading in whole seconds; the reference point is defined by the
    /// implementor.
    fn last_event_seconds(&self) -> u64;

    /// Timer reading in milliseconds; the reference point is defined by the
    /// implementor.
    fn last_event_milliseconds(&self) -> u64;
}
18
19pub mod channel {
23 use super::{Probe, ProbeReceive};
24
25 use chrono::prelude::*;
26 use std::sync::mpsc::{channel, Sender, Receiver};
27
28 pub fn probe<T: Send>() -> (ChannelProbe<(), T>, ChannelProbeReceive<T>) {
29 probe_with_payload(())
30 }
31
32 pub fn probe_with_payload<P: Clone + Send, T: Send>(payload: P) -> (ChannelProbe<P, T>, ChannelProbeReceive<T>) {
33 let (tx, rx) = channel::<T>();
34
35 let probe = ChannelProbe {
36 payload: Some(payload),
37 tx: tx.clone()
38 };
39
40 let receiver = ChannelProbeReceive {
41 rx: rx,
42 tx: tx,
43 timer_start: Utc::now()
44 };
45
46 (probe, receiver)
47 }
48
/// Probe that reports events by pushing them into a `std::sync::mpsc`
/// channel.
#[derive(Debug)]
pub struct ChannelProbe<P, T> {
    /// Data handed out by `Probe::payload`.
    payload: Option<P>,
    /// Channel endpoint the events are sent into.
    tx: Sender<T>,
}

// `Clone` is written by hand instead of derived: the derive would demand
// `T: Clone`, although cloning a probe only duplicates the payload and the
// `Sender<T>` handle (which is `Clone` for every `T`). This way a probe can be
// cloned even when its event type cannot.
impl<P: Clone, T> Clone for ChannelProbe<P, T> {
    fn clone(&self) -> Self {
        ChannelProbe {
            payload: self.payload.clone(),
            tx: self.tx.clone(),
        }
    }
}
54
55 impl<P, T> Probe for ChannelProbe<P, T>
56 where P: Clone + Send, T: Send {
57 type Msg = T;
58 type Pay = P;
59
60 fn event(&self, evt: T) {
61 drop(self.tx.send(evt));
62 }
63
64 fn payload(&self) -> &P {
65 &self.payload.as_ref().unwrap()
66 }
67 }
68
69 impl<P, T> Probe for Option<ChannelProbe<P, T>>
70 where P: Clone + Send, T: Send {
71 type Msg = T;
72 type Pay = P;
73
74 fn event(&self, evt: T) {
75 drop(self.as_ref().unwrap().tx.send(evt));
76 }
77
78 fn payload(&self) -> &P {
79 &self.as_ref().unwrap().payload.as_ref().unwrap()
80 }
81 }
82
83 #[allow(dead_code)]
84 pub struct ChannelProbeReceive<T> {
85 rx: Receiver<T>,
86 tx: Sender<T>,
87 timer_start: DateTime<Utc>,
88 }
89
90 impl<T: Send> ProbeReceive for ChannelProbeReceive<T> {
91 type Msg = T;
92
93 fn recv(&self) -> T {
94 self.rx.recv().unwrap()
95 }
96
97 fn reset_timer(&mut self) {
98 self.timer_start = Utc::now();
99 }
100
101 fn last_event_milliseconds(&self) -> u64 {
102 let now = Utc::now();
103 now.time().signed_duration_since(self.timer_start.time()).num_milliseconds() as u64
104 }
105
106 fn last_event_seconds(&self) -> u64 {
107 let now = Utc::now();
108 now.time().signed_duration_since(self.timer_start.time()).num_seconds() as u64
109 }
110 }
111}
112
#[cfg(test)]
mod tests {
    use super::channel::{probe, probe_with_payload};
    use super::{Probe, ProbeReceive};
    use std::thread;

    /// An event sent from another thread reaches the listener.
    #[test]
    fn chan_probe() {
        let (sender, listener) = probe();

        thread::spawn(move || sender.event("some event"));

        assert_eq!(listener.recv(), "some event");
    }

    /// The payload passed at construction is readable from the probe's
    /// thread.
    #[test]
    fn chan_probe_with_payload() {
        let (sender, listener) = probe_with_payload("test data".to_string());

        thread::spawn(move || {
            // Report success only when the payload arrived intact.
            let reply = if sender.payload() == "test data" {
                "data received"
            } else {
                ""
            };
            sender.event(reply);
        });

        assert_eq!(listener.recv(), "data received");
    }
}
148
149
150pub mod macros {
/// Receives one event from `$receiver` (via its `recv()` method) and asserts
/// with `assert_eq!` that it equals `$wanted`.
#[macro_export]
macro_rules! p_assert_eq {
    ($receiver:expr, $wanted:expr) => {
        assert_eq!($receiver.recv(), $wanted);
    };
}
160
/// Asserts that `$listen` delivers every event listed in `$expected`, in any
/// order.
///
/// `$expected` is cloned and must support `is_empty`, `iter` and
/// `remove(index)` (for example a `Vec`). One event is received per expected
/// entry; each received event removes one matching entry. If `$expected` is
/// empty, nothing is received and the macro returns immediately.
///
/// # Panics
///
/// Panics as soon as a received event matches none of the entries that are
/// still outstanding.
#[macro_export]
macro_rules! p_assert_events {
    ($listen:expr, $expected:expr) => {{
        // Evaluate the listener expression once and work on a copy of the
        // expected events so the caller's collection stays untouched.
        let listener = &$listen;
        let mut outstanding = $expected.clone();

        while !outstanding.is_empty() {
            // Receive exactly ONE event per iteration. Calling `recv()`
            // inside the search closure would consume a new event for every
            // comparison and lose events that arrive out of order.
            let received = listener.recv();
            match outstanding.iter().position(|candidate| *candidate == received) {
                Some(index) => {
                    outstanding.remove(index);
                }
                None => panic!("p_assert_events: received an event that is not in the expected set"),
            }
        }
    }};
}
186
/// Expands to `$receiver.last_event_milliseconds()`, i.e. the receiver's
/// timer reading in milliseconds.
#[macro_export]
macro_rules! p_timer {
    ($receiver:expr) => {
        $receiver.last_event_milliseconds()
    };
}
193
#[cfg(test)]
mod tests {
    use probe::channel::probe;
    use probe::{Probe, ProbeReceive};

    /// A single event is matched by `p_assert_eq!`.
    #[test]
    fn p_assert_eq() {
        let (sender, listener) = probe();
        let message = "test".to_string();

        sender.event(message.clone());

        p_assert_eq!(listener, message);
    }

    /// Three queued events are all matched by `p_assert_events!`.
    #[test]
    fn p_assert_events() {
        let (sender, listener) = probe();
        let expected = vec!["event_1", "event_2", "event_3"];

        for name in &expected {
            sender.event(*name);
        }

        p_assert_events!(listener, expected);
    }

    /// `p_timer!` yields a printable millisecond reading.
    #[test]
    fn p_timer() {
        let (sender, listener) = probe();
        sender.event("event_3");

        println!("Milliseconds: {}", p_timer!(listener));
    }
}
229}