1use ::std::fmt;
2use ::std::fmt::Debug;
4use ::std::io;
5use ::std::io::Write;
6use std::process::exit;
7
8use ::async_std::sync::Arc;
9use ::async_std::sync::Mutex;
10use ::async_std::sync::MutexGuard;
11use ::async_trait::async_trait;
12use ::log::debug;
13use ::log::warn;
14use ::regex::Regex;
15use ::smallvec::SmallVec;
16use futures::future::join;
17
18#[async_trait]
19pub trait LineWriter: Debug + Send {
20 async fn write_line(&mut self, line: impl AsRef<str> + Send);
21
22 async fn write_all_lines<S: AsRef<str> + Send>(
23 &mut self,
24 lines: impl Iterator<Item = S> + Send,
25 ) {
26 for line in lines {
27 self.write_line(line).await
28 }
29 }
30}
31
/// Line writer backed by any [`std::io::Write`] implementation.
#[derive(Debug)]
pub struct StdWriter<W>
where
    W: Write + Unpin + Send,
{
    /// The underlying byte sink.
    writer: W,
}
37
38impl<W: Write + Unpin + Send> StdWriter<W> {
39 pub fn of(writer: W) -> Self {
40 StdWriter { writer }
41 }
42}
43
44impl StdWriter<io::Stdout> {
45 pub fn stdout() -> Self {
46 StdWriter::of(io::stdout())
47 }
48}
49
50impl StdWriter<io::Stderr> {
51 pub fn stderr() -> Self {
52 StdWriter::of(io::stderr())
53 }
54}
55
56impl <W: Write + Unpin + Send> StdWriter<W> {
57 fn write_unless_broken_pipe(&mut self, data: &[u8]) {
58 let res = self.writer.write(data);
59 match res {
60 Ok(write_len) => assert_eq!(data.len(), write_len, "did not write all bytes"),
61 Err(err) => {
62 if err.kind() == io::ErrorKind::BrokenPipe {
63 debug!("broken pipe while writing");
64 warn!("exiting because of broken pipe"); exit(0);
66 }
67 panic!("error while writing");
68 }
69 }
70 }
71}
72
73#[async_trait]
74impl<W: Write + Unpin + Send + Debug> LineWriter for StdWriter<W> {
75 async fn write_line(&mut self, line: impl AsRef<str> + Send) {
76 let bytes = line.as_ref().as_bytes();
77 self.write_unless_broken_pipe(bytes);
78 self.write_unless_broken_pipe(&[b'\n']);
79 }
80}
81
/// Line writer that keeps every written line in memory.
#[derive(Debug)]
pub struct VecWriter {
    /// Lines received so far, in the order they were written.
    lines: Vec<String>,
}
86
87impl VecWriter {
88 pub fn new() -> Self {
89 VecWriter { lines: vec![] }
90 }
91
92 pub fn get(self) -> Vec<String> {
93 self.lines
94 }
95}
96
97impl Default for VecWriter {
98 fn default() -> Self {
99 Self::new()
100 }
101}
102
103#[async_trait]
104impl LineWriter for VecWriter {
105 async fn write_line(&mut self, line: impl AsRef<str> + Send) {
106 self.lines.push(line.as_ref().to_owned())
107 }
108}
109
110#[derive(Debug)]
111pub struct CollectorWriter {
112 lines: LineContainer,
113}
114
115#[derive(Debug, Clone)]
116pub struct LineContainer {
117 lines: Arc<Mutex<Vec<String>>>,
118}
119
120impl LineContainer {
121 pub async fn snapshot(&self) -> MutexGuard<'_, Vec<String>> {
122 self.lines.lock().await
123 }
124}
125
126impl CollectorWriter {
127 pub fn new() -> Self {
128 CollectorWriter {
129 lines: LineContainer {
130 lines: Arc::new(Mutex::new(vec![])),
131 },
132 }
133 }
134
135 pub fn lines(&self) -> LineContainer {
136 self.lines.clone()
137 }
138}
139
140impl Default for CollectorWriter {
141 fn default() -> Self {
142 Self::new()
143 }
144}
145
146#[async_trait]
147impl LineWriter for CollectorWriter {
148 async fn write_line(&mut self, line: impl AsRef<str> + Send) {
149 self.lines.lines.lock().await.push(line.as_ref().to_owned())
150 }
151}
152
/// Line writer that remembers only the first line written to it.
#[derive(Debug)]
pub struct FirstItemWriter {
    /// The first line received, or `None` if nothing has been written yet.
    line: Option<String>,
}
157
158impl FirstItemWriter {
159 pub fn new() -> Self {
160 FirstItemWriter { line: None }
161 }
162
163 pub fn get(self) -> Option<String> {
164 self.line
165 }
166}
167
168impl Default for FirstItemWriter {
169 fn default() -> Self {
170 Self::new()
171 }
172}
173
174#[async_trait]
175impl LineWriter for FirstItemWriter {
176 async fn write_line(&mut self, line: impl AsRef<str> + Send) {
177 self.line.get_or_insert_with(|| line.as_ref().to_owned());
178 }
179}
180
/// Line writer with no state; its `write_line` implementation does nothing.
#[derive(Debug)]
pub struct DiscardWriter();
183
184impl DiscardWriter {
185 pub fn new() -> Self {
186 DiscardWriter()
187 }
188}
189
190impl Default for DiscardWriter {
191 fn default() -> Self {
192 Self::new()
193 }
194}
195
196#[async_trait]
197impl LineWriter for DiscardWriter {
198 async fn write_line(&mut self, _line: impl AsRef<str> + Send) {
199 }
201}
202
203#[derive(Debug)]
205pub struct TeeWriter<'a, W1: LineWriter, W2: LineWriter> {
206 first: &'a mut W1,
207 second: &'a mut W2,
208}
209
210impl<'a, W1: LineWriter, W2: LineWriter> TeeWriter<'a, W1, W2> {
211 pub fn new(first: &'a mut W1, second: &'a mut W2) -> Self {
212 TeeWriter { first, second }
213 }
214}
215
216#[async_trait]
217impl<'a, W1: LineWriter, W2: LineWriter> LineWriter for TeeWriter<'a, W1, W2> {
218 async fn write_line(&mut self, line: impl AsRef<str> + Send) {
219 let line = line.as_ref();
220 let _: ((), ()) = join(self.first.write_line(line), self.second.write_line(line),).await;
221 }
222}
223
224#[derive(Debug)]
226pub struct FunnelWriter<'a, W: LineWriter> {
227 name: &'a str,
228 delegate: Arc<Mutex<&'a mut W>>,
229}
230
231#[derive(Debug)]
232pub struct FunnelFactory<'a, W: LineWriter> {
233 delegate: Arc<Mutex<&'a mut W>>,
234}
235
236impl<'a, W: LineWriter> FunnelFactory<'a, W> {
237 pub fn new(delegate: &'a mut W) -> Self {
238 FunnelFactory {
239 delegate: Arc::new(Mutex::new(delegate)),
240 }
241 }
242
243 pub fn writer(&self, name: &'a str) -> FunnelWriter<'a, W> {
244 FunnelWriter {
245 name,
246 delegate: self.delegate.clone(),
247 }
248 }
249}
250
251#[async_trait]
252impl<'a, W: LineWriter> LineWriter for FunnelWriter<'a, W> {
253 async fn write_line(&mut self, line: impl AsRef<str> + Send) {
254 let line = line.as_ref();
255 let mut dw = self.delegate.lock().await;
256 if self.name.is_empty() {
257 dw.write_line(format!("{}", line)).await;
258 } else {
259 dw.write_line(format!("[{}] {}", self.name, line)).await;
260 }
261 }
262}
263
264pub struct RegexWatcherWriter<F: Fn(&str) + Send> {
266 patterns: SmallVec<[Regex; 1]>,
267 action: F,
268}
269
270impl<F: Fn(&str) + Send> fmt::Debug for RegexWatcherWriter<F> {
271 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
272 write!(
273 f,
274 "RegexWatcherWriter{{patterns={:?},action=fn}}",
275 self.patterns
276 )
277 }
278}
279
280impl<F: Fn(&str) + Send> RegexWatcherWriter<F> {
281 pub fn new(patterns: impl Into<SmallVec<[Regex; 1]>>, action: F) -> Self {
282 RegexWatcherWriter {
283 patterns: patterns.into(),
284 action,
285 }
286 }
287}
288
289#[async_trait]
290impl<F: Fn(&str) + Send> LineWriter for RegexWatcherWriter<F> {
291 async fn write_line(&mut self, line: impl AsRef<str> + Send) {
292 let line = line.as_ref();
293 for pattern in &self.patterns {
294 if pattern.is_match(line) {
295 debug!("pattern {} matched for line {}", pattern, line);
296 (self.action)(line)
297 }
298 }
299 }
300}
301
302#[derive(Debug)]
303pub struct PrefixWriter<'a, W: LineWriter> {
304 delegate: &'a mut W,
305 prefix: String,
306}
307
308impl<'a, W: LineWriter> PrefixWriter<'a, W> {
309 pub fn new(delegate: &'a mut W, prefix: String) -> Self {
310 PrefixWriter { delegate, prefix }
311 }
312}
313
314#[async_trait]
315impl<'a, W: LineWriter> LineWriter for PrefixWriter<'a, W> {
316 async fn write_line(&mut self, line: impl AsRef<str> + Send) {
317 let mut prefixed_line = line.as_ref().to_owned();
318 prefixed_line.insert_str(0, &self.prefix);
319 self.delegate.write_line(&prefixed_line).await
320 }
321}
322
#[cfg(test)]
mod tests {
    use super::*;

    /// `StdWriter::stdout()` must be callable without the caller spelling
    /// out the generic parameter of `StdWriter`.
    #[test]
    fn can_create_writer_without_type_annotation() {
        let _ = StdWriter::stdout();
    }
}