1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
use glob::glob;
use metric;
use mio;
use source;
use source::file::file_watcher::FileWatcher;
use source::internal::report_full_telemetry;
use std::mem;
use std::path::PathBuf;
use std::str;
use std::time;
use util;
use util::send;
pub struct FileServer {
pattern: PathBuf,
max_read_bytes: usize,
}
#[derive(Clone, Debug, Deserialize)]
pub struct FileServerConfig {
pub path: Option<PathBuf>,
pub max_read_bytes: usize,
pub forwards: Vec<String>,
pub config_path: Option<String>,
}
impl Default for FileServerConfig {
fn default() -> Self {
FileServerConfig {
path: None,
max_read_bytes: 2048,
forwards: Vec::default(),
config_path: None,
}
}
}
impl source::Source<FileServerConfig> for FileServer {
fn init(config: FileServerConfig) -> Self {
let pattern = config.path.expect("must specify a 'path' for FileServer");
FileServer {
pattern: pattern,
max_read_bytes: config.max_read_bytes,
}
}
fn run(self, mut chans: util::Channel, poller: mio::Poll) -> () {
let mut buffer = String::new();
let mut fp_map: util::HashMap<PathBuf, FileWatcher> = Default::default();
let mut fp_map_alt: util::HashMap<PathBuf, FileWatcher> = Default::default();
let mut backoff_cap: usize = 1;
let mut lines = Vec::new();
loop {
let mut global_bytes_read: usize = 0;
for entry in glob(self.pattern.to_str().expect("no ability to glob"))
.expect("Failed to read glob pattern")
{
if let Ok(path) = entry {
let entry = fp_map.entry(path.clone());
if let Ok(fw) = FileWatcher::new(&path) {
entry.or_insert(fw);
};
}
}
for (path, mut watcher) in fp_map.drain() {
let mut bytes_read: usize = 0;
while let Ok(sz) = watcher.read_line(&mut buffer) {
if sz > 0 {
bytes_read += sz;
lines.push(metric::LogLine::new(
path.to_str().expect("not a valid path"),
&buffer,
));
buffer.clear();
} else {
break;
}
if bytes_read > self.max_read_bytes {
break;
}
}
report_full_telemetry(
"cernan.sources.file.bytes_read",
bytes_read as f64,
Some(vec![
("file_path", path.to_str().expect("not a valid path")),
]),
);
if !watcher.dead() {
fp_map_alt.insert(path, watcher);
}
global_bytes_read = global_bytes_read.saturating_add(bytes_read);
}
for l in lines.drain(..) {
send(&mut chans, metric::Event::new_log(l));
}
mem::swap(&mut fp_map, &mut fp_map_alt);
if global_bytes_read == 0 {
let lim = backoff_cap.saturating_mul(2);
if lim > 2_048 {
backoff_cap = 2_048;
} else {
backoff_cap = lim;
}
} else {
backoff_cap = 1;
}
let backoff = backoff_cap.saturating_sub(global_bytes_read);
let mut events = mio::Events::with_capacity(1024);
match poller.poll(
&mut events,
Some(time::Duration::from_millis(backoff as u64)),
) {
Err(e) => panic!(format!("Failed during poll {:?}", e)),
Ok(0) => {}
Ok(_num_events) => {
send(&mut chans, metric::Event::Shutdown);
return;
}
}
}
}
}