1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
use hopper;
use metric;
use time;
use util;
mod programmable_filter;
pub mod delay_filter;
mod flush_boundary_filter;
pub mod json_encode_filter;
pub use self::delay_filter::{DelayFilter, DelayFilterConfig};
pub use self::flush_boundary_filter::{FlushBoundaryFilter, FlushBoundaryFilterConfig};
pub use self::json_encode_filter::{JSONEncodeFilter, JSONEncodeFilterConfig};
pub use self::programmable_filter::{ProgrammableFilter, ProgrammableFilterConfig};
#[derive(Debug)]
pub enum FilterError {
NoSuchFunction(&'static str, metric::Event),
LuaError(String, metric::Event),
}
fn msg_in_fe(fe: &FilterError) -> &str {
match fe {
&FilterError::NoSuchFunction(n, _) => n,
&FilterError::LuaError(ref n, _) => n,
}
}
fn event_in_fe(fe: FilterError) -> metric::Event {
match fe {
FilterError::NoSuchFunction(_, m) | FilterError::LuaError(_, m) => m,
}
}
pub trait Filter {
fn process(
&mut self,
event: metric::Event,
res: &mut Vec<metric::Event>,
) -> Result<(), FilterError>;
fn run(
&mut self,
recv: hopper::Receiver<metric::Event>,
sources: Vec<String>,
mut chans: util::Channel,
) {
let mut attempts = 0;
let mut events = Vec::with_capacity(64);
let mut recv = recv.into_iter();
let mut total_shutdowns = 0;
loop {
time::delay(attempts);
match recv.next() {
None => attempts += 1,
Some(metric::Event::Shutdown) => {
util::send(&mut chans, metric::Event::Shutdown);
total_shutdowns += 1;
if total_shutdowns >= sources.len() {
trace!(
"Received shutdown from every configured source: {:?}",
sources
);
return;
}
}
Some(event) => {
attempts = 0;
match self.process(event, &mut events) {
Ok(()) => for ev in events.drain(..) {
util::send(&mut chans, ev)
},
Err(fe) => {
error!(
"Failed to run filter with error: {:?}",
msg_in_fe(&fe)
);
let event = event_in_fe(fe);
util::send(&mut chans, event);
}
}
}
}
}
}
}