1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
use std::{io::Write, sync::Arc};
use serde_json::json;
use crate::{pipelining::StageReceiver, utils::Utils, Error};
pub fn jsonl_writer_loop(
input: StageReceiver,
output: &mut impl Write,
utils: Arc<Utils>,
) -> Result<(), Error> {
for evt in input.iter() {
let buf = json!(evt).to_string();
let result = output
.write_all(buf.as_bytes())
.and_then(|_| output.write_all(b"\n"));
match result {
Ok(_) => {
utils.track_sink_progress(&evt);
}
Err(err) => return Err(Box::new(err)),
}
}
Ok(())
}