rusht/common/
write.rs

1use ::std::fmt;
2// using async caused deadlocks in concurrent mvn commands
3use ::std::fmt::Debug;
4use ::std::io;
5use ::std::io::Write;
6use std::process::exit;
7
8use ::async_std::sync::Arc;
9use ::async_std::sync::Mutex;
10use ::async_std::sync::MutexGuard;
11use ::async_trait::async_trait;
12use ::log::debug;
13use ::log::warn;
14use ::regex::Regex;
15use ::smallvec::SmallVec;
16use futures::future::join;
17
18#[async_trait]
19pub trait LineWriter: Debug + Send {
20    async fn write_line(&mut self, line: impl AsRef<str> + Send);
21
22    async fn write_all_lines<S: AsRef<str> + Send>(
23        &mut self,
24        lines: impl Iterator<Item = S> + Send,
25    ) {
26        for line in lines {
27            self.write_line(line).await
28        }
29    }
30}
31
32//TODO @mverleg: not called Std?
33#[derive(Debug)]
34pub struct StdWriter<W: Write + Unpin + Send> {
35    writer: W,
36}
37
38impl<W: Write + Unpin + Send> StdWriter<W> {
39    pub fn of(writer: W) -> Self {
40        StdWriter { writer }
41    }
42}
43
44impl StdWriter<io::Stdout> {
45    pub fn stdout() -> Self {
46        StdWriter::of(io::stdout())
47    }
48}
49
50impl StdWriter<io::Stderr> {
51    pub fn stderr() -> Self {
52        StdWriter::of(io::stderr())
53    }
54}
55
56impl <W: Write + Unpin + Send> StdWriter<W> {
57    fn write_unless_broken_pipe(&mut self, data: &[u8]) {
58        let res = self.writer.write(data);
59        match res {
60            Ok(write_len) => assert_eq!(data.len(), write_len, "did not write all bytes"),
61            Err(err) => {
62                if err.kind() == io::ErrorKind::BrokenPipe {
63                    debug!("broken pipe while writing");
64                    warn!("exiting because of broken pipe");  //TODO @mverleg: is this the right approach?
65                    exit(0);
66                }
67                panic!("error while writing");
68            }
69        }
70    }
71}
72
73#[async_trait]
74impl<W: Write + Unpin + Send + Debug> LineWriter for StdWriter<W> {
75    async fn write_line(&mut self, line: impl AsRef<str> + Send) {
76        let bytes = line.as_ref().as_bytes();
77        self.write_unless_broken_pipe(bytes);
78        self.write_unless_broken_pipe(&[b'\n']);
79    }
80}
81
82#[derive(Debug)]
83pub struct VecWriter {
84    lines: Vec<String>,
85}
86
87impl VecWriter {
88    pub fn new() -> Self {
89        VecWriter { lines: vec![] }
90    }
91
92    pub fn get(self) -> Vec<String> {
93        self.lines
94    }
95}
96
97impl Default for VecWriter {
98    fn default() -> Self {
99        Self::new()
100    }
101}
102
103#[async_trait]
104impl LineWriter for VecWriter {
105    async fn write_line(&mut self, line: impl AsRef<str> + Send) {
106        self.lines.push(line.as_ref().to_owned())
107    }
108}
109
110#[derive(Debug)]
111pub struct CollectorWriter {
112    lines: LineContainer,
113}
114
115#[derive(Debug, Clone)]
116pub struct LineContainer {
117    lines: Arc<Mutex<Vec<String>>>,
118}
119
120impl LineContainer {
121    pub async fn snapshot(&self) -> MutexGuard<'_, Vec<String>> {
122        self.lines.lock().await
123    }
124}
125
126impl CollectorWriter {
127    pub fn new() -> Self {
128        CollectorWriter {
129            lines: LineContainer {
130                lines: Arc::new(Mutex::new(vec![])),
131            },
132        }
133    }
134
135    pub fn lines(&self) -> LineContainer {
136        self.lines.clone()
137    }
138}
139
140impl Default for CollectorWriter {
141    fn default() -> Self {
142        Self::new()
143    }
144}
145
146#[async_trait]
147impl LineWriter for CollectorWriter {
148    async fn write_line(&mut self, line: impl AsRef<str> + Send) {
149        self.lines.lines.lock().await.push(line.as_ref().to_owned())
150    }
151}
152
153#[derive(Debug)]
154pub struct FirstItemWriter {
155    line: Option<String>,
156}
157
158impl FirstItemWriter {
159    pub fn new() -> Self {
160        FirstItemWriter { line: None }
161    }
162
163    pub fn get(self) -> Option<String> {
164        self.line
165    }
166}
167
168impl Default for FirstItemWriter {
169    fn default() -> Self {
170        Self::new()
171    }
172}
173
174#[async_trait]
175impl LineWriter for FirstItemWriter {
176    async fn write_line(&mut self, line: impl AsRef<str> + Send) {
177        self.line.get_or_insert_with(|| line.as_ref().to_owned());
178    }
179}
180
181#[derive(Debug)]
182pub struct DiscardWriter();
183
184impl DiscardWriter {
185    pub fn new() -> Self {
186        DiscardWriter()
187    }
188}
189
190impl Default for DiscardWriter {
191    fn default() -> Self {
192        Self::new()
193    }
194}
195
196#[async_trait]
197impl LineWriter for DiscardWriter {
198    async fn write_line(&mut self, _line: impl AsRef<str> + Send) {
199        // drop line
200    }
201}
202
203/// For every line written, send it to two other writers.
204#[derive(Debug)]
205pub struct TeeWriter<'a, W1: LineWriter, W2: LineWriter> {
206    first: &'a mut W1,
207    second: &'a mut W2,
208}
209
210impl<'a, W1: LineWriter, W2: LineWriter> TeeWriter<'a, W1, W2> {
211    pub fn new(first: &'a mut W1, second: &'a mut W2) -> Self {
212        TeeWriter { first, second }
213    }
214}
215
216#[async_trait]
217impl<'a, W1: LineWriter, W2: LineWriter> LineWriter for TeeWriter<'a, W1, W2> {
218    async fn write_line(&mut self, line: impl AsRef<str> + Send) {
219        let line = line.as_ref();
220        let _: ((), ()) = join(self.first.write_line(line), self.second.write_line(line),).await;
221    }
222}
223
224/// Several handles can send to the same writer asynchronously.
225#[derive(Debug)]
226pub struct FunnelWriter<'a, W: LineWriter> {
227    name: &'a str,
228    delegate: Arc<Mutex<&'a mut W>>,
229}
230
231#[derive(Debug)]
232pub struct FunnelFactory<'a, W: LineWriter> {
233    delegate: Arc<Mutex<&'a mut W>>,
234}
235
236impl<'a, W: LineWriter> FunnelFactory<'a, W> {
237    pub fn new(delegate: &'a mut W) -> Self {
238        FunnelFactory {
239            delegate: Arc::new(Mutex::new(delegate)),
240        }
241    }
242
243    pub fn writer(&self, name: &'a str) -> FunnelWriter<'a, W> {
244        FunnelWriter {
245            name,
246            delegate: self.delegate.clone(),
247        }
248    }
249}
250
251#[async_trait]
252impl<'a, W: LineWriter> LineWriter for FunnelWriter<'a, W> {
253    async fn write_line(&mut self, line: impl AsRef<str> + Send) {
254        let line = line.as_ref();
255        let mut dw = self.delegate.lock().await;
256        if self.name.is_empty() {
257            dw.write_line(format!("{}", line)).await;
258        } else {
259            dw.write_line(format!("[{}] {}", self.name, line)).await;
260        }
261    }
262}
263
264/// For every line written, send it to two other writers.
265pub struct RegexWatcherWriter<F: Fn(&str) + Send> {
266    patterns: SmallVec<[Regex; 1]>,
267    action: F,
268}
269
270impl<F: Fn(&str) + Send> fmt::Debug for RegexWatcherWriter<F> {
271    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
272        write!(
273            f,
274            "RegexWatcherWriter{{patterns={:?},action=fn}}",
275            self.patterns
276        )
277    }
278}
279
280impl<F: Fn(&str) + Send> RegexWatcherWriter<F> {
281    pub fn new(patterns: impl Into<SmallVec<[Regex; 1]>>, action: F) -> Self {
282        RegexWatcherWriter {
283            patterns: patterns.into(),
284            action,
285        }
286    }
287}
288
289#[async_trait]
290impl<F: Fn(&str) + Send> LineWriter for RegexWatcherWriter<F> {
291    async fn write_line(&mut self, line: impl AsRef<str> + Send) {
292        let line = line.as_ref();
293        for pattern in &self.patterns {
294            if pattern.is_match(line) {
295                debug!("pattern {} matched for line {}", pattern, line);
296                (self.action)(line)
297            }
298        }
299    }
300}
301
302#[derive(Debug)]
303pub struct PrefixWriter<'a, W: LineWriter> {
304    delegate: &'a mut W,
305    prefix: String,
306}
307
308impl<'a, W: LineWriter> PrefixWriter<'a, W> {
309    pub fn new(delegate: &'a mut W, prefix: String) -> Self {
310        PrefixWriter { delegate, prefix }
311    }
312}
313
314#[async_trait]
315impl<'a, W: LineWriter> LineWriter for PrefixWriter<'a, W> {
316    async fn write_line(&mut self, line: impl AsRef<str> + Send) {
317        let mut prefixed_line = line.as_ref().to_owned();
318        prefixed_line.insert_str(0, &self.prefix);
319        self.delegate.write_line(&prefixed_line).await
320    }
321}
322
323#[cfg(test)]
324mod tests {
325    use super::*;
326
327    #[test]
328    fn can_craete_writer_without_type_annotation() {
329        let _ = StdWriter::stdout();
330    }
331}