use std::fs::{self, File};
use std::io::{self, Write};
use std::path::Path;
use fs4::fs_std::FileExt;
use serde::Serialize;
use serde_json::Value;
/// Serializes `value` as compact JSON and appends it to `path` as a single JSONL record.
///
/// # Errors
/// A serialization failure is converted into an [`io::Error`] via `io::Error::other`;
/// any file-system failure is returned unchanged from [`append_serialized_json_line`].
pub fn append_json_line<T: Serialize>(path: impl AsRef<Path>, value: &T) -> io::Result<()> {
    match serde_json::to_string(value) {
        Ok(encoded) => append_serialized_json_line(path, &encoded),
        Err(err) => Err(io::Error::other(err)),
    }
}
/// Appends one pre-serialized JSON record (`line`) to the JSONL file at `path`.
///
/// Missing parent directories are created. The append happens while holding an
/// exclusive advisory lock on the file (`lock_exclusive`), and the file is
/// `sync_all`ed before the lock is released.
///
/// If the file is non-empty and its last byte is not `\n`, a newline is written
/// first. This covers a torn earlier append, or a file that ends with a record
/// but no newline. Without it, the new record would be glued onto the existing
/// tail, producing one malformed line that destroys both records.
///
/// # Errors
/// Returns `ErrorKind::InvalidInput` if `line` contains `\n` or `\r`. Otherwise
/// propagates I/O errors from directory creation, open, lock, read, write, sync
/// or unlock.
pub fn append_serialized_json_line(path: impl AsRef<Path>, line: &str) -> io::Result<()> {
    use std::io::{Read, Seek, SeekFrom};

    if line.contains('\n') || line.contains('\r') {
        return Err(io::Error::new(
            io::ErrorKind::InvalidInput,
            "jsonl line contains newline",
        ));
    }
    let path = path.as_ref();
    if let Some(parent) = path.parent() {
        fs::create_dir_all(parent)?;
    }
    // `read(true)` lets us inspect the current last byte. `append(true)` still
    // forces every write to the end of the file regardless of the cursor position.
    let mut file = fs::OpenOptions::new()
        .create(true)
        .read(true)
        .append(true)
        .open(path)?;
    file.lock_exclusive()?;
    let result = (|| {
        let mut record = String::with_capacity(line.len() + 2);
        // Inspected under the lock, so no cooperating writer can change the tail meanwhile.
        if file.metadata()?.len() > 0 {
            let mut last = [0u8; 1];
            file.seek(SeekFrom::End(-1))?;
            file.read_exact(&mut last)?;
            if last[0] != b'\n' {
                record.push('\n');
            }
        }
        record.push_str(line);
        record.push('\n');
        // One `write_all` instead of `writeln!`. `writeln!` issues separate writes
        // for the payload and the newline, widening the window for a torn record.
        file.write_all(record.as_bytes())?;
        file.sync_all()
    })();
    // Always attempt to unlock, even if the write failed. Report the first error.
    let unlock = file.unlock();
    result.and(unlock)
}
/// Reads and parses every JSONL record in `path`.
///
/// The whole file is read into memory and parsed eagerly. The returned iterator
/// yields one `serde_json::Result<Value>` per non-blank line, in file order. A
/// malformed final line with no trailing newline is treated as a torn append and
/// skipped rather than reported.
///
/// A torn append may also end partway through a multi-byte UTF-8 character.
/// Such an incomplete sequence at the very end of the file is dropped, so the
/// torn-tail tolerance still applies.
///
/// # Errors
/// Returns the I/O error if the file cannot be read. Returns
/// `ErrorKind::InvalidData` if the file contains invalid UTF-8 anywhere other
/// than an incomplete sequence at the end.
pub fn read_records(
    path: impl AsRef<Path>,
) -> io::Result<impl Iterator<Item = serde_json::Result<Value>>> {
    let path = path.as_ref();
    let bytes = fs::read(path)?;
    let body = match String::from_utf8(bytes) {
        Ok(body) => body,
        // `error_len() == None` means the input merely ended mid-character.
        // Only a torn final write can leave that behind (no newline follows it),
        // so keep the valid prefix and let `parse_records` skip the torn line.
        Err(err) if err.utf8_error().error_len().is_none() => {
            let valid = err.utf8_error().valid_up_to();
            let mut bytes = err.into_bytes();
            bytes.truncate(valid);
            String::from_utf8(bytes)
                .map_err(|err| io::Error::new(io::ErrorKind::InvalidData, err))?
        }
        Err(err) => return Err(io::Error::new(io::ErrorKind::InvalidData, err)),
    };
    Ok(parse_records(path, &body).into_iter())
}
/// Atomically replaces the contents of `path` with `body`.
///
/// The data is written to a sibling temporary file, `sync_all`ed, and renamed
/// over `path`. The containing directory is then opened and synced as well.
/// Missing parent directories are created first.
///
/// The temporary name is `<path>.<ext>.tmp.<pid>.<n>`, with `json` used when the
/// extension is missing or not UTF-8. `n` is a per-process counter, so concurrent
/// calls from different threads never share a temporary file. If writing, syncing
/// or renaming fails, the temporary file is removed on a best-effort basis.
///
/// A bare relative file name (whose `Path::parent` is empty) is resolved against
/// the current directory for the directory sync.
///
/// # Errors
/// Propagates I/O errors from directory creation, writing, syncing, renaming, or
/// the final directory sync.
pub fn write_file_atomic(path: impl AsRef<Path>, body: &str) -> io::Result<()> {
    use std::sync::atomic::{AtomicUsize, Ordering};

    // Distinguishes temporary files created by concurrent calls in this process.
    static TMP_SEQ: AtomicUsize = AtomicUsize::new(0);

    let path = path.as_ref();
    // `Path::parent` yields `Some("")` for a single-component relative path such as
    // `state.json`. `File::open("")` fails, so use the current directory instead.
    let parent = path.parent().map(|dir| {
        if dir.as_os_str().is_empty() {
            Path::new(".")
        } else {
            dir
        }
    });
    if let Some(dir) = parent {
        fs::create_dir_all(dir)?;
    }
    let tmp = path.with_extension(format!(
        "{}.tmp.{}.{}",
        path.extension()
            .and_then(|ext| ext.to_str())
            .unwrap_or("json"),
        std::process::id(),
        TMP_SEQ.fetch_add(1, Ordering::Relaxed),
    ));
    let staged = (|| {
        let mut file = fs::OpenOptions::new()
            .create(true)
            .write(true)
            .truncate(true)
            .open(&tmp)?;
        file.write_all(body.as_bytes())?;
        file.sync_all()?;
        // Close the handle before renaming (some platforms refuse to rename open files).
        drop(file);
        fs::rename(&tmp, path)
    })();
    if let Err(err) = staged {
        // Best effort: the original error is more useful than a cleanup failure.
        let _ = fs::remove_file(&tmp);
        return Err(err);
    }
    if let Some(dir) = parent {
        File::open(dir)?.sync_all()?;
    }
    Ok(())
}
/// Parses every non-blank line of `body` as a JSON value, preserving order.
///
/// Each line is trimmed before parsing, and blank lines are ignored. Parse
/// failures become `Err` entries, with one exception: a malformed *last* line in
/// a body that does not end with `\n` is treated as a torn append. That line is
/// logged at debug level (with `path` and its 1-based line number) and dropped.
fn parse_records(path: &Path, body: &str) -> Vec<serde_json::Result<Value>> {
    let torn_tail_possible = !(body.is_empty() || body.ends_with('\n'));
    let mut records = Vec::new();
    let mut numbered = body.lines().enumerate().peekable();
    while let Some((index, raw)) = numbered.next() {
        let last = numbered.peek().is_none();
        let text = raw.trim();
        if text.is_empty() {
            continue;
        }
        let parsed: serde_json::Result<Value> = serde_json::from_str(text);
        match parsed {
            Err(error) if torn_tail_possible && last => {
                tracing::debug!(
                    path = %path.display(),
                    line = index + 1,
                    error = %error,
                    "skipping malformed trailing jsonl record"
                );
            }
            other => records.push(other),
        }
    }
    records
}
#[cfg(test)]
mod tests {
use super::*;
use std::process::Command;
use std::process::Stdio;
// Record shape each child process appends in the concurrency test.
#[derive(Serialize)]
struct TestRecord {
worker: usize,
seq: usize,
payload: String,
}
// Runs 16 child processes that each append 25 records to one shared file.
// It then checks that the file has exactly 16 * 25 lines and that each line
// parses as a JSON object with the expected fields.
#[test]
fn concurrent_process_writers_produce_valid_jsonl() {
// Child mode: when this binary is re-invoked by the parent branch below, the
// env vars carry the shared file path and this worker's id. Write the
// records and return without running the parent logic.
if let Ok(path) = std::env::var("NETSKY_JSONL_TEST_PATH") {
let worker = std::env::var("NETSKY_JSONL_TEST_WORKER")
.unwrap()
.parse::<usize>()
.unwrap();
for seq in 0..25 {
append_json_line(
&path,
&TestRecord {
worker,
seq,
payload: format!("worker-{worker}-seq-{seq}"),
},
)
.unwrap();
}
return;
}
// Parent mode: re-run this same test binary, filtered with `--exact` to this
// test, once per worker, so the appends come from separate processes.
// NOTE(review): the filter string must match this test's full path in the
// crate. If the module moves, the children will not run this test and the
// parent's checks below fail.
let dir = tempfile::tempdir().unwrap();
let path = dir.path().join("events.jsonl");
let exe = std::env::current_exe().unwrap();
let mut children = Vec::new();
for worker in 0..16 {
children.push(
Command::new(&exe)
.arg("--exact")
.arg("jsonl::tests::concurrent_process_writers_produce_valid_jsonl")
.arg("--quiet")
.env("NETSKY_JSONL_TEST_PATH", &path)
.env("NETSKY_JSONL_TEST_WORKER", worker.to_string())
.stdout(Stdio::null())
.stderr(Stdio::null())
.spawn()
.unwrap(),
);
}
// Wait for every child, failing if any writer process failed.
for mut child in children {
let status = child.wait().unwrap();
assert!(status.success(), "child writer exited {status}");
}
// Every record must survive as its own complete, parseable line.
let body = fs::read_to_string(path).unwrap();
let lines: Vec<_> = body.lines().collect();
assert_eq!(lines.len(), 16 * 25);
for line in lines {
let value: serde_json::Value = serde_json::from_str(line).unwrap();
assert!(value["worker"].is_number());
assert!(value["seq"].is_number());
assert!(value["payload"].is_string());
}
}
// A truncated final record without a newline is dropped silently, not reported.
#[test]
fn read_records_skips_one_malformed_trailing_record_without_newline() {
let dir = tempfile::tempdir().unwrap();
let path = dir.path().join("events.jsonl");
fs::write(
&path,
concat!("{\"seq\":1}\n", "{\"seq\":2}\n", "{\"seq\":"),
)
.unwrap();
let records = read_records(&path)
.unwrap()
.collect::<serde_json::Result<Vec<_>>>()
.unwrap();
assert_eq!(records.len(), 2);
assert_eq!(records[0]["seq"], 1);
assert_eq!(records[1]["seq"], 2);
}
// A well-formed final record is still returned when the trailing newline is missing.
#[test]
fn read_records_yields_valid_final_record_without_newline() {
let dir = tempfile::tempdir().unwrap();
let path = dir.path().join("events.jsonl");
fs::write(&path, "{\"seq\":1}\n{\"seq\":2}").unwrap();
let records = read_records(&path)
.unwrap()
.collect::<serde_json::Result<Vec<_>>>()
.unwrap();
assert_eq!(records.len(), 2);
assert_eq!(records[1]["seq"], 2);
}
// Malformed lines that are not the final line are surfaced as `Err` entries, in order.
#[test]
fn read_records_keeps_middle_parse_errors() {
let dir = tempfile::tempdir().unwrap();
let path = dir.path().join("events.jsonl");
fs::write(&path, "{\"seq\":1}\n{\"seq\":\n{\"seq\":2}\n").unwrap();
let mut records = read_records(&path).unwrap();
assert!(records.next().unwrap().is_ok());
assert!(records.next().unwrap().is_err());
assert!(records.next().unwrap().is_ok());
assert!(records.next().is_none());
}
}