Skip to main content

netsky_core/
jsonl.rs

1use std::fs::{self, File};
2use std::io::{self, Write};
3use std::path::Path;
4
5use fs4::fs_std::FileExt;
6use serde::Serialize;
7use serde_json::Value;
8
9/// Append exactly one serialized JSON value plus a newline to a JSONL file.
10///
11/// The append is guarded by an exclusive cross-process file lock, uses
12/// `O_APPEND`, and calls `sync_all` before releasing the lock.
13pub fn append_json_line<T: Serialize>(path: impl AsRef<Path>, value: &T) -> io::Result<()> {
14    let line = serde_json::to_string(value).map_err(io::Error::other)?;
15    append_serialized_json_line(path, &line)
16}
17
18/// Append one pre-serialized JSON value plus a newline to a JSONL file.
19///
20/// The caller owns JSON serialization. This function rejects embedded
21/// newlines so one call cannot create more than one JSONL record.
22pub fn append_serialized_json_line(path: impl AsRef<Path>, line: &str) -> io::Result<()> {
23    if line.contains('\n') || line.contains('\r') {
24        return Err(io::Error::new(
25            io::ErrorKind::InvalidInput,
26            "jsonl line contains newline",
27        ));
28    }
29    let path = path.as_ref();
30    if let Some(parent) = path.parent() {
31        fs::create_dir_all(parent)?;
32    }
33    let mut file = fs::OpenOptions::new()
34        .create(true)
35        .append(true)
36        .open(path)?;
37    file.lock_exclusive()?;
38    let result = (|| {
39        writeln!(file, "{line}")?;
40        file.sync_all()
41    })();
42    let unlock = file.unlock();
43    result.and(unlock)
44}
45
46/// Read JSONL records, tolerating one malformed crash-tail record.
47///
48/// A malformed final line is skipped only when the file does not end in a
49/// newline. Malformed middle lines are yielded as parse errors.
50pub fn read_records(
51    path: impl AsRef<Path>,
52) -> io::Result<impl Iterator<Item = serde_json::Result<Value>>> {
53    let path = path.as_ref();
54    let body = fs::read_to_string(path)?;
55    Ok(parse_records(path, &body).into_iter())
56}
57
/// Atomically replace a file with fsynced contents.
///
/// The data is written to a uniquely named sibling temp file, fsynced, and
/// renamed over the destination. The parent directory is then fsynced so the
/// rename itself is durable. Missing parent directories are created first.
///
/// The temp name combines the process id with a per-process counter, so
/// concurrent callers in the same process never share (and truncate) each
/// other's temp file. If writing or renaming fails, the temp file is removed
/// on a best-effort basis.
///
/// # Errors
///
/// Returns any I/O error from creating directories, writing or syncing the
/// temp file, renaming it into place, or syncing the parent directory.
pub fn write_file_atomic(path: impl AsRef<Path>, body: &str) -> io::Result<()> {
    use std::sync::atomic::{AtomicU64, Ordering};

    // Distinguishes temp files created by different calls within one process.
    static TMP_SEQ: AtomicU64 = AtomicU64::new(0);

    let path = path.as_ref();
    // `Path::parent` yields `Some("")` for a bare relative name such as
    // `state.json`; `File::open("")` fails, so use the current directory.
    let parent = path.parent().map(|dir| {
        if dir.as_os_str().is_empty() {
            Path::new(".")
        } else {
            dir
        }
    });
    if let Some(dir) = parent {
        fs::create_dir_all(dir)?;
    }

    let tmp = path.with_extension(format!(
        "{}.tmp.{}.{}",
        path.extension()
            .and_then(|ext| ext.to_str())
            .unwrap_or("json"),
        std::process::id(),
        TMP_SEQ.fetch_add(1, Ordering::Relaxed)
    ));

    let staged = (|| {
        let mut file = fs::OpenOptions::new()
            .create(true)
            .write(true)
            .truncate(true)
            .open(&tmp)?;
        file.write_all(body.as_bytes())?;
        file.sync_all()?;
        // Close the handle before the rename.
        drop(file);
        fs::rename(&tmp, path)
    })();
    if let Err(error) = staged {
        // Best effort: don't leave an orphaned temp file behind.
        let _ = fs::remove_file(&tmp);
        return Err(error);
    }

    if let Some(dir) = parent {
        File::open(dir)?.sync_all()?;
    }
    Ok(())
}
90
91fn parse_records(path: &Path, body: &str) -> Vec<serde_json::Result<Value>> {
92    let missing_final_newline = !body.is_empty() && !body.ends_with('\n');
93    let mut lines = body.lines().peekable();
94    let mut out = Vec::new();
95    let mut line_number = 0;
96
97    while let Some(line) = lines.next() {
98        line_number += 1;
99        let is_final_line = lines.peek().is_none();
100        let line = line.trim();
101        if line.is_empty() {
102            continue;
103        }
104        match serde_json::from_str(line) {
105            Ok(value) => out.push(Ok(value)),
106            Err(error) if missing_final_newline && is_final_line => {
107                tracing::debug!(
108                    path = %path.display(),
109                    line = line_number,
110                    error = %error,
111                    "skipping malformed trailing jsonl record"
112                );
113            }
114            Err(error) => out.push(Err(error)),
115        }
116    }
117
118    out
119}
120
#[cfg(test)]
mod tests {
    use super::*;
    use std::collections::HashSet;
    use std::process::Command;
    use std::process::Stdio;

    #[derive(Serialize)]
    struct TestRecord {
        worker: usize,
        seq: usize,
        payload: String,
    }

    const WORKERS: usize = 16;
    const RECORDS_PER_WORKER: usize = 25;

    /// Spawns `WORKERS` copies of this test binary. Each copy appends
    /// `RECORDS_PER_WORKER` records to one shared file. The test then checks
    /// that every line parses and that each `(worker, seq)` pair was written
    /// exactly once with its expected payload.
    ///
    /// The test re-executes itself: when `NETSKY_JSONL_TEST_PATH` is set, it
    /// acts as a child writer and returns early.
    #[test]
    fn concurrent_process_writers_produce_valid_jsonl() {
        if let Ok(path) = std::env::var("NETSKY_JSONL_TEST_PATH") {
            let worker = std::env::var("NETSKY_JSONL_TEST_WORKER")
                .expect("child writer needs NETSKY_JSONL_TEST_WORKER")
                .parse::<usize>()
                .expect("NETSKY_JSONL_TEST_WORKER must be an integer");
            for seq in 0..RECORDS_PER_WORKER {
                append_json_line(
                    &path,
                    &TestRecord {
                        worker,
                        seq,
                        payload: format!("worker-{worker}-seq-{seq}"),
                    },
                )
                .unwrap();
            }
            return;
        }

        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("events.jsonl");
        let exe = std::env::current_exe().unwrap();
        let mut children = Vec::new();

        for worker in 0..WORKERS {
            children.push(
                Command::new(&exe)
                    // The filter must name this test exactly so each child runs
                    // only the writer branch above.
                    .arg("--exact")
                    .arg("jsonl::tests::concurrent_process_writers_produce_valid_jsonl")
                    .arg("--quiet")
                    .env("NETSKY_JSONL_TEST_PATH", &path)
                    .env("NETSKY_JSONL_TEST_WORKER", worker.to_string())
                    .stdout(Stdio::null())
                    .stderr(Stdio::null())
                    .spawn()
                    .unwrap(),
            );
        }

        for mut child in children {
            let status = child.wait().unwrap();
            assert!(status.success(), "child writer exited {status}");
        }

        let body = fs::read_to_string(&path).unwrap();
        assert!(body.ends_with('\n'), "every record must be newline-terminated");
        let lines: Vec<_> = body.lines().collect();
        assert_eq!(lines.len(), WORKERS * RECORDS_PER_WORKER);

        let mut seen = HashSet::new();
        for line in lines {
            let value: serde_json::Value = serde_json::from_str(line).unwrap();
            let worker = value["worker"].as_u64().expect("worker is an integer");
            let seq = value["seq"].as_u64().expect("seq is an integer");
            assert_eq!(value["payload"], format!("worker-{worker}-seq-{seq}"));
            assert!(
                seen.insert((worker, seq)),
                "duplicate record worker={worker} seq={seq}"
            );
        }
    }

    #[test]
    fn append_serialized_json_line_rejects_embedded_newlines() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("events.jsonl");
        for bad in ["{\"a\":1}\n{\"b\":2}", "{\"a\":1}\r"] {
            let error = append_serialized_json_line(&path, bad).unwrap_err();
            assert_eq!(error.kind(), io::ErrorKind::InvalidInput);
        }
        // Rejection happens before the file is created.
        assert!(!path.exists());
    }

    #[test]
    fn read_records_skips_one_malformed_trailing_record_without_newline() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("events.jsonl");
        fs::write(
            &path,
            concat!("{\"seq\":1}\n", "{\"seq\":2}\n", "{\"seq\":"),
        )
        .unwrap();

        let records = read_records(&path)
            .unwrap()
            .collect::<serde_json::Result<Vec<_>>>()
            .unwrap();

        assert_eq!(records.len(), 2);
        assert_eq!(records[0]["seq"], 1);
        assert_eq!(records[1]["seq"], 2);
    }

    #[test]
    fn read_records_yields_valid_final_record_without_newline() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("events.jsonl");
        fs::write(&path, "{\"seq\":1}\n{\"seq\":2}").unwrap();

        let records = read_records(&path)
            .unwrap()
            .collect::<serde_json::Result<Vec<_>>>()
            .unwrap();

        assert_eq!(records.len(), 2);
        assert_eq!(records[1]["seq"], 2);
    }

    #[test]
    fn read_records_keeps_middle_parse_errors() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("events.jsonl");
        fs::write(&path, "{\"seq\":1}\n{\"seq\":\n{\"seq\":2}\n").unwrap();

        let mut records = read_records(&path).unwrap();
        assert!(records.next().unwrap().is_ok());
        assert!(records.next().unwrap().is_err());
        assert!(records.next().unwrap().is_ok());
        assert!(records.next().is_none());
    }

    #[test]
    fn read_records_handles_empty_and_missing_files() {
        let dir = tempfile::tempdir().unwrap();
        let empty = dir.path().join("empty.jsonl");
        fs::write(&empty, "").unwrap();
        assert_eq!(read_records(&empty).unwrap().count(), 0);

        let missing = dir.path().join("missing.jsonl");
        assert_eq!(
            read_records(&missing).err().map(|error| error.kind()),
            Some(io::ErrorKind::NotFound)
        );
    }

    #[test]
    fn write_file_atomic_replaces_contents_without_leaving_temp_files() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("nested").join("state.json");
        write_file_atomic(&path, "{\"v\":1}").unwrap();
        write_file_atomic(&path, "{\"v\":2}").unwrap();
        assert_eq!(fs::read_to_string(&path).unwrap(), "{\"v\":2}");

        let names: Vec<_> = fs::read_dir(path.parent().unwrap())
            .unwrap()
            .map(|entry| entry.unwrap().file_name())
            .collect();
        assert_eq!(names, vec![std::ffi::OsString::from("state.json")]);
    }
}