1use std::fs::{self, File};
2use std::io::{self, Write};
3use std::path::Path;
4
5use fs4::fs_std::FileExt;
6use serde::Serialize;
7use serde_json::Value;
8
9pub fn append_json_line<T: Serialize>(path: impl AsRef<Path>, value: &T) -> io::Result<()> {
14 let line = serde_json::to_string(value).map_err(io::Error::other)?;
15 append_serialized_json_line(path, &line)
16}
17
18pub fn append_serialized_json_line(path: impl AsRef<Path>, line: &str) -> io::Result<()> {
23 if line.contains('\n') || line.contains('\r') {
24 return Err(io::Error::new(
25 io::ErrorKind::InvalidInput,
26 "jsonl line contains newline",
27 ));
28 }
29 let path = path.as_ref();
30 if let Some(parent) = path.parent() {
31 fs::create_dir_all(parent)?;
32 }
33 let mut file = fs::OpenOptions::new()
34 .create(true)
35 .append(true)
36 .open(path)?;
37 file.lock_exclusive()?;
38 let result = (|| {
39 writeln!(file, "{line}")?;
40 file.sync_all()
41 })();
42 let unlock = file.unlock();
43 result.and(unlock)
44}
45
46pub fn read_records(
51 path: impl AsRef<Path>,
52) -> io::Result<impl Iterator<Item = serde_json::Result<Value>>> {
53 let path = path.as_ref();
54 let body = fs::read_to_string(path)?;
55 Ok(parse_records(path, &body).into_iter())
56}
57
/// Replaces the file at `path` with `body` by writing a sibling temp file and
/// renaming it over the destination.
///
/// Steps: create missing parent directories, write and sync
/// `<path>.<ext>.tmp.<pid>` (where `<ext>` is the existing extension, or
/// `json` when there is none or it is not valid UTF-8), rename it onto `path`,
/// then sync the parent directory.
///
/// # Errors
///
/// Returns the first I/O error encountered. When staging or the rename fails,
/// the temp file is removed on a best-effort basis before the error is
/// returned.
pub fn write_file_atomic(path: impl AsRef<Path>, body: &str) -> io::Result<()> {
    let path = path.as_ref();
    // `Path::parent` yields `Some("")` for a bare relative name such as
    // "state.json". An empty path cannot be opened, so map it to "." to keep
    // the directory sync below from failing after a successful rename.
    let parent = path.parent().map(|dir| {
        if dir.as_os_str().is_empty() {
            Path::new(".")
        } else {
            dir
        }
    });
    if let Some(parent) = parent {
        fs::create_dir_all(parent)?;
    }
    let tmp = path.with_extension(format!(
        "{}.tmp.{}",
        path.extension()
            .and_then(|ext| ext.to_str())
            .unwrap_or("json"),
        std::process::id()
    ));

    let staged = (|| -> io::Result<()> {
        let mut file = fs::OpenOptions::new()
            .create(true)
            .write(true)
            .truncate(true)
            .open(&tmp)?;
        file.write_all(body.as_bytes())?;
        file.sync_all()?;
        // Close the handle before renaming.
        drop(file);
        fs::rename(&tmp, path)
    })();
    if let Err(error) = staged {
        // Best effort: do not leave a stray temp file behind on failure.
        let _ = fs::remove_file(&tmp);
        return Err(error);
    }

    if let Some(parent) = parent {
        // Sync the directory so the rename itself is flushed.
        File::open(parent)?.sync_all()?;
    }
    Ok(())
}
90
91fn parse_records(path: &Path, body: &str) -> Vec<serde_json::Result<Value>> {
92 let missing_final_newline = !body.is_empty() && !body.ends_with('\n');
93 let mut lines = body.lines().peekable();
94 let mut out = Vec::new();
95 let mut line_number = 0;
96
97 while let Some(line) = lines.next() {
98 line_number += 1;
99 let is_final_line = lines.peek().is_none();
100 let line = line.trim();
101 if line.is_empty() {
102 continue;
103 }
104 match serde_json::from_str(line) {
105 Ok(value) => out.push(Ok(value)),
106 Err(error) if missing_final_newline && is_final_line => {
107 tracing::debug!(
108 path = %path.display(),
109 line = line_number,
110 error = %error,
111 "skipping malformed trailing jsonl record"
112 );
113 }
114 Err(error) => out.push(Err(error)),
115 }
116 }
117
118 out
119}
120
#[cfg(test)]
mod tests {
    use super::*;
    use std::process::Command;
    use std::process::Stdio;

    /// Payload appended by each writer process in the concurrency test.
    #[derive(Serialize)]
    struct TestRecord {
        worker: usize,
        seq: usize,
        payload: String,
    }

    /// Spawns 16 copies of this test binary that each append 25 records to
    /// the same file, then checks that every line is a well-formed record.
    #[test]
    fn concurrent_process_writers_produce_valid_jsonl() {
        // Child mode: when the path variable is set, this invocation is one
        // of the spawned writers and only appends its records.
        if let Ok(path) = std::env::var("NETSKY_JSONL_TEST_PATH") {
            let worker: usize = std::env::var("NETSKY_JSONL_TEST_WORKER")
                .unwrap()
                .parse()
                .unwrap();
            (0..25).for_each(|seq| {
                let record = TestRecord {
                    worker,
                    seq,
                    payload: format!("worker-{worker}-seq-{seq}"),
                };
                append_json_line(&path, &record).unwrap();
            });
            return;
        }

        let scratch = tempfile::tempdir().unwrap();
        let log_path = scratch.path().join("events.jsonl");
        let test_binary = std::env::current_exe().unwrap();

        // Start every writer before waiting on any of them so they overlap.
        let writers: Vec<_> = (0..16)
            .map(|worker| {
                Command::new(&test_binary)
                    .args([
                        "--exact",
                        "jsonl::tests::concurrent_process_writers_produce_valid_jsonl",
                        "--quiet",
                    ])
                    .env("NETSKY_JSONL_TEST_PATH", &log_path)
                    .env("NETSKY_JSONL_TEST_WORKER", worker.to_string())
                    .stdout(Stdio::null())
                    .stderr(Stdio::null())
                    .spawn()
                    .unwrap()
            })
            .collect();

        for mut writer in writers {
            let status = writer.wait().unwrap();
            assert!(status.success(), "child writer exited {status}");
        }

        let contents = fs::read_to_string(log_path).unwrap();
        assert_eq!(contents.lines().count(), 16 * 25);
        for raw in contents.lines() {
            let record: serde_json::Value = serde_json::from_str(raw).unwrap();
            assert!(record["worker"].is_number());
            assert!(record["seq"].is_number());
            assert!(record["payload"].is_string());
        }
    }

    /// A truncated last record with no trailing newline is dropped silently.
    #[test]
    fn read_records_skips_one_malformed_trailing_record_without_newline() {
        let scratch = tempfile::tempdir().unwrap();
        let log_path = scratch.path().join("events.jsonl");
        fs::write(&log_path, "{\"seq\":1}\n{\"seq\":2}\n{\"seq\":").unwrap();

        let parsed: Vec<serde_json::Value> = read_records(&log_path)
            .unwrap()
            .collect::<serde_json::Result<_>>()
            .unwrap();

        assert_eq!(parsed.len(), 2);
        assert_eq!(parsed[0]["seq"], 1);
        assert_eq!(parsed[1]["seq"], 2);
    }

    /// A complete last record is still returned when the newline is missing.
    #[test]
    fn read_records_yields_valid_final_record_without_newline() {
        let scratch = tempfile::tempdir().unwrap();
        let log_path = scratch.path().join("events.jsonl");
        fs::write(&log_path, "{\"seq\":1}\n{\"seq\":2}").unwrap();

        let parsed: Vec<serde_json::Value> = read_records(&log_path)
            .unwrap()
            .collect::<serde_json::Result<_>>()
            .unwrap();

        assert_eq!(parsed.len(), 2);
        assert_eq!(parsed[1]["seq"], 2);
    }

    /// A malformed record in the middle of the file is reported, not hidden.
    #[test]
    fn read_records_keeps_middle_parse_errors() {
        let scratch = tempfile::tempdir().unwrap();
        let log_path = scratch.path().join("events.jsonl");
        fs::write(&log_path, "{\"seq\":1}\n{\"seq\":\n{\"seq\":2}\n").unwrap();

        let mut parsed = read_records(&log_path).unwrap();
        assert!(parsed.next().unwrap().is_ok());
        assert!(parsed.next().unwrap().is_err());
        assert!(parsed.next().unwrap().is_ok());
        assert!(parsed.next().is_none());
    }
}