use std::io::{self, BufWriter, Write};
use std::path::Path;
use serde::Serialize;
use crate::error::{AtomwriteError, ErrorJson};
/// Buffered sink for newline-delimited JSON output.
///
/// Owns a [`BufWriter`] around the caller-supplied writer `W`; all output
/// produced through this type goes through that buffer.
pub struct NdjsonWriter<W>
where
    W: Write,
{
    /// Buffered handle to the underlying output stream.
    writer: BufWriter<W>,
}
impl<W: Write> NdjsonWriter<W> {
pub fn new(inner: W) -> Self {
Self {
writer: BufWriter::with_capacity(crate::constants::BUF_CAPACITY, inner),
}
}
pub fn write_event<T: Serialize>(&mut self, event: &T) -> anyhow::Result<()> {
match serde_json::to_writer(&mut self.writer, event) {
Ok(()) => {}
Err(e) if is_broken_pipe(&e) => {
return Err(crate::error::AtomwriteError::BrokenPipe.into());
}
Err(e) => return Err(e.into()),
}
match self.writer.write_all(b"\n") {
Ok(()) => {}
Err(e) if e.kind() == io::ErrorKind::BrokenPipe => {
return Err(crate::error::AtomwriteError::BrokenPipe.into());
}
Err(e) => return Err(e.into()),
}
match self.writer.flush() {
Ok(()) => {}
Err(e) if e.kind() == io::ErrorKind::BrokenPipe => {
return Err(crate::error::AtomwriteError::BrokenPipe.into());
}
Err(e) => return Err(e.into()),
}
Ok(())
}
pub fn write_error(&mut self, err: &AtomwriteError, path: Option<&Path>) -> anyhow::Result<()> {
let mut json = ErrorJson::from_error(err);
if json.path.is_none() {
json.path = path.map(|p| p.display().to_string());
}
self.write_event(&json)
}
pub fn flush(&mut self) -> anyhow::Result<()> {
self.writer.flush().map_err(|e| e.into())
}
}
/// Writes `err` to `out` as a single JSON line and flushes `out`.
///
/// The error is converted with [`ErrorJson::from_error`]. If the result has no
/// path, `path` (rendered with [`Path::display`]) is used instead.
///
/// # Errors
///
/// Returns the serialization or I/O error from the first step that fails.
#[cold]
pub fn write_error_json(
    out: &mut impl Write,
    err: &AtomwriteError,
    path: Option<&Path>,
) -> anyhow::Result<()> {
    let mut payload = ErrorJson::from_error(err);
    payload.path = payload
        .path
        .take()
        .or_else(|| path.map(|p| p.display().to_string()));

    serde_json::to_writer(&mut *out, &payload)?;
    // Terminate the line, then flush; the flush is skipped if the newline
    // write fails.
    out.write_all(b"\n").and_then(|()| out.flush())?;
    Ok(())
}
/// Reads one line (including its trailing `\n`, if any) from `reader` into
/// `buf`, rejecting lines longer than `max_bytes`.
///
/// `buf` is cleared first. At most `max_bytes + 1` bytes of the line are ever
/// held in memory: an over-long line is detected as soon as the limit is
/// crossed, and the remainder of that line is consumed from `reader` and
/// discarded so the next call starts at the following line.
///
/// Returns the number of bytes read; `0` means end of input.
///
/// # Errors
///
/// * [`std::io::ErrorKind::InvalidData`] if the line is longer than
///   `max_bytes` bytes, or if it is not valid UTF-8.
/// * Any I/O error reported by `reader`.
///
/// `buf` is left empty whenever an error is returned.
pub fn read_limited_line(
    reader: &mut impl std::io::BufRead,
    buf: &mut String,
    max_bytes: usize,
) -> std::io::Result<usize> {
    buf.clear();

    // Allow one byte past the limit so that "exactly max_bytes" and
    // "too long" can be told apart without buffering the whole line.
    let cap = u64::try_from(max_bytes)
        .unwrap_or(u64::MAX)
        .saturating_add(1);

    // Read raw bytes (reusing the caller's allocation) rather than calling
    // `read_line` on the capped reader: the cap may fall in the middle of a
    // multi-byte character, which must be reported as "too long" and not as
    // a UTF-8 error.
    let mut bytes = std::mem::take(buf).into_bytes();
    let mut capped = std::io::Read::take(&mut *reader, cap);
    std::io::BufRead::read_until(&mut capped, b'\n', &mut bytes)?;

    if bytes.len() > max_bytes {
        let mut line_len = bytes.len();
        // If the newline has not been seen yet, skip the rest of the line so
        // the stream stays aligned on line boundaries.
        if bytes.last() != Some(&b'\n') {
            line_len = line_len.saturating_add(discard_rest_of_line(reader)?);
        }
        return Err(std::io::Error::new(
            std::io::ErrorKind::InvalidData,
            format!(
                "NDJSON line exceeds maximum size of {} bytes ({} bytes read)",
                max_bytes, line_len
            ),
        ));
    }

    match String::from_utf8(bytes) {
        Ok(line) => {
            *buf = line;
            Ok(buf.len())
        }
        Err(_) => Err(std::io::Error::new(
            std::io::ErrorKind::InvalidData,
            "stream did not contain valid UTF-8",
        )),
    }
}

/// Consumes bytes from `reader` up to and including the next `\n` (or end of
/// input) without storing them, returning how many bytes were skipped.
fn discard_rest_of_line(reader: &mut impl std::io::BufRead) -> std::io::Result<usize> {
    let mut discarded = 0usize;
    loop {
        let (used, found_newline) = {
            let chunk = match reader.fill_buf() {
                Ok(chunk) => chunk,
                Err(e) if e.kind() == std::io::ErrorKind::Interrupted => continue,
                Err(e) => return Err(e),
            };
            match chunk.iter().position(|&b| b == b'\n') {
                Some(idx) => (idx + 1, true),
                None => (chunk.len(), false),
            }
        };
        reader.consume(used);
        discarded = discarded.saturating_add(used);
        // `used == 0` means `fill_buf` returned an empty slice: end of input.
        if found_newline || used == 0 {
            return Ok(discarded);
        }
    }
}
/// Returns `true` when `err` reports an I/O error kind of
/// [`io::ErrorKind::BrokenPipe`], and `false` for every other error.
fn is_broken_pipe(err: &serde_json::Error) -> bool {
    matches!(err.io_error_kind(), Some(io::ErrorKind::BrokenPipe))
}