Skip to main content

hyperi_rustlib/worker/
ndjson.rs

// Project:   hyperi-rustlib
// File:      src/worker/ndjson.rs
// Purpose:   NDJSON batch splitting and parallel parsing utilities
// Language:  Rust
//
// License:   BUSL-1.1
// Copyright: (c) 2026 HYPERI PTY LIMITED

//! NDJSON (newline-delimited JSON) batch processing utilities.
//!
//! Splits NDJSON byte payloads into individual lines, which can then be parsed
//! in parallel via [`AdaptiveWorkerPool::process_batch`](crate::worker::AdaptiveWorkerPool::process_batch).
//!
//! This module does NOT depend on a specific JSON parser -- the caller supplies
//! the parse step as a closure, so `sonic-rs`, `serde_json`, or any other parser
//! can be used.
//!
//! ## Example
//!
//! ```rust,ignore
//! use hyperi_rustlib::worker::ndjson;
//!
//! let payload = b"{\"a\":1}\n{\"b\":2}\n{\"c\":3}\n";
//! let lines = ndjson::split_lines(payload);
//! assert_eq!(lines.len(), 3);
//!
//! // Parallel parse (`pool` is an `AdaptiveWorkerPool`)
//! let parsed = pool.process_batch(&lines, |line| {
//!     sonic_rs::from_slice::<Value>(line).map_err(|e| e.to_string())
//! });
//! ```

/// Split an NDJSON payload into individual line slices.
///
/// Lines are separated by `\n`. A single `\r` directly before a separator (or
/// at the very end of the payload) is dropped, so `\r\n` input works as well.
/// Lines that end up empty -- blank lines, and the empty tail left by a
/// trailing newline -- are omitted.
///
/// Zero-copy: every returned slice borrows from `payload`.
#[must_use]
pub fn split_lines(payload: &[u8]) -> Vec<&[u8]> {
    payload
        .split(|&byte| byte == b'\n')
        // Normalise `\r\n` endings by trimming exactly one trailing carriage return.
        .map(|segment| segment.strip_suffix(b"\r").unwrap_or(segment))
        .filter(|segment| !segment.is_empty())
        .collect()
}

/// Count the NDJSON lines in a payload without allocating.
///
/// Applies exactly the same rules as [`split_lines`]:
/// - lines are delimited by `\n`;
/// - a single `\r` before each delimiter (or at the end of the payload) is ignored;
/// - lines that are empty after that (blank lines, the tail after a trailing
///   newline) are not counted.
///
/// The result therefore equals `split_lines(payload).len()`, making it an exact
/// capacity when pre-sizing buffers before splitting. Returns 0 for an empty
/// payload.
#[must_use]
pub fn count_lines(payload: &[u8]) -> usize {
    payload
        .split(|&byte| byte == b'\n')
        // A segment is blank when it is empty or is a lone `\r` (what remains of
        // an empty `\r\n` line, or a stray trailing carriage return). Counting raw
        // newline bytes instead would over-count these relative to `split_lines`.
        .filter(|line| !line.is_empty() && *line != b"\r")
        .count()
}

#[cfg(test)]
mod tests {
    // Unit tests for `split_lines` and `count_lines`. Besides the basic `\n` /
    // `\r\n` cases, these pin the edge cases the docs promise: blank lines are
    // skipped, a stray trailing `\r` is dropped, and the returned slices borrow
    // from the input rather than copying it.
    use super::*;

    #[test]
    fn test_split_simple() {
        let payload = b"{\"a\":1}\n{\"b\":2}\n{\"c\":3}\n";
        let lines = split_lines(payload);
        assert_eq!(lines.len(), 3);
        assert_eq!(lines[0], b"{\"a\":1}");
        assert_eq!(lines[1], b"{\"b\":2}");
        assert_eq!(lines[2], b"{\"c\":3}");
    }

    #[test]
    fn test_split_no_trailing_newline() {
        let payload = b"{\"a\":1}\n{\"b\":2}";
        let lines = split_lines(payload);
        assert_eq!(lines.len(), 2);
        assert_eq!(lines[0], b"{\"a\":1}");
        assert_eq!(lines[1], b"{\"b\":2}");
    }

    #[test]
    fn test_split_single_line() {
        let payload = b"{\"x\":42}";
        let lines = split_lines(payload);
        assert_eq!(lines.len(), 1);
        assert_eq!(lines[0], b"{\"x\":42}");
    }

    #[test]
    fn test_split_empty() {
        let lines = split_lines(b"");
        assert!(lines.is_empty());
    }

    #[test]
    fn test_split_blank_lines_skipped() {
        let payload = b"{\"a\":1}\n\n{\"b\":2}\n\n";
        let lines = split_lines(payload);
        assert_eq!(lines.len(), 2);
        assert_eq!(lines[0], b"{\"a\":1}");
        assert_eq!(lines[1], b"{\"b\":2}");
    }

    #[test]
    fn test_split_only_separators() {
        // Nothing but newlines / carriage returns yields no lines at all.
        assert!(split_lines(b"\n\r\n\n\r").is_empty());
    }

    #[test]
    fn test_split_crlf() {
        let payload = b"{\"a\":1}\r\n{\"b\":2}\r\n";
        let lines = split_lines(payload);
        assert_eq!(lines.len(), 2);
        assert_eq!(lines[0], b"{\"a\":1}");
        assert_eq!(lines[1], b"{\"b\":2}");
    }

    #[test]
    fn test_split_crlf_blank_lines_and_trailing_cr() {
        // An empty `\r\n` line is skipped, and a final `\r` without a newline
        // is stripped from the last line.
        let payload = b"{\"a\":1}\r\n\r\n{\"b\":2}\r";
        let lines = split_lines(payload);
        assert_eq!(lines.len(), 2);
        assert_eq!(lines[0], b"{\"a\":1}");
        assert_eq!(lines[1], b"{\"b\":2}");
    }

    #[test]
    fn test_split_is_zero_copy() {
        let payload = b"{\"a\":1}\n{\"b\":2}\n";
        let lines = split_lines(payload);
        assert_eq!(lines[0].as_ptr(), payload.as_ptr());
        // `{"a":1}` is 7 bytes plus the newline, so the second line starts at 8.
        assert_eq!(lines[1].as_ptr(), payload[8..].as_ptr());
    }

    #[test]
    fn test_split_large_payload() {
        let mut payload = Vec::new();
        for i in 0..1000 {
            payload.extend_from_slice(format!("{{\"id\":{i}}}\n").as_bytes());
        }
        let lines = split_lines(&payload);
        assert_eq!(lines.len(), 1000);
        assert_eq!(lines[999], b"{\"id\":999}");
        assert_eq!(count_lines(&payload), lines.len());
    }

    #[test]
    fn test_count_lines_simple() {
        assert_eq!(count_lines(b"{}\n{}\n{}\n"), 3);
    }

    #[test]
    fn test_count_lines_no_trailing() {
        assert_eq!(count_lines(b"{}\n{}"), 2);
    }

    #[test]
    fn test_count_lines_empty() {
        assert_eq!(count_lines(b""), 0);
    }

    #[test]
    fn test_count_lines_single() {
        assert_eq!(count_lines(b"{}"), 1);
    }

    #[test]
    fn test_count_lines_crlf() {
        assert_eq!(count_lines(b"{}\r\n{}\r\n"), 2);
    }
}