Skip to main content

sanitize_engine/processor/
jsonl_proc.rs

1//! NDJSON / JSON Lines structured processor.
2//!
3//! Processes files where each non-empty line is an independent JSON object
4//! (Newline-Delimited JSON, also called JSON Lines). Unlike the [`JsonProcessor`](crate::processor::json_proc::JsonProcessor),
5//! this processor never builds a full in-memory parse tree for the whole file —
6//! each line is parsed, walked, serialised, and written out independently,
7//! keeping per-line memory overhead constant regardless of input size.
8//!
9//! When used via the CLI with a matching profile, the processor is invoked
10//! through the streaming path: the file is opened as a reader and processed
11//! line-by-line without `fs::read` loading it into a `Vec<u8>` first. This
12//! makes GB-scale NDJSON log files practical to sanitize.
13//!
14//! # Options
15//!
16//! | Key | Values | Default | Description |
17//! |-----|--------|---------|-------------|
18//! | `skip_invalid` | `"true"` / `"false"` | `"false"` | Pass malformed lines through unchanged instead of returning an error. Useful for mixed log files that interleave plain-text lines with JSON. |
19//! | `compact` | `"true"` / `"false"` | `"true"` | Serialise each output line as compact JSON. Set to `"false"` only for debugging — pretty-printed NDJSON is non-standard. |
20//!
21//! # Example profile entry
22//!
23//! ```yaml
24//! - processor: jsonl
25//!   extensions: [".jsonl", ".ndjson", ".log"]
26//!   options:
27//!     skip_invalid: "true"
28//!   fields:
29//!     - pattern: "*.email"
30//!       category: email
31//!     - pattern: "*.password"
32//!       category: "custom:password"
33//! ```
34
35use crate::error::{Result, SanitizeError};
36use crate::processor::json_proc::walk_json;
37use crate::processor::{FileTypeProfile, Processor};
38use crate::store::MappingStore;
39use serde_json::Value;
40use std::io::{self, BufRead, BufReader, Write};
41
/// Stateless processor that sanitizes NDJSON / JSON Lines input.
///
/// The type carries no fields; all behaviour lives in its [`Processor`]
/// implementation, which handles the input one line at a time.
pub struct JsonLinesProcessor;
44
45impl JsonLinesProcessor {
46    /// Core line-by-line processing logic, shared by both `process` and
47    /// `process_stream`. Reads from any `BufRead` source and writes to any
48    /// `Write` sink.
49    fn process_lines(
50        reader: impl BufRead,
51        writer: &mut dyn Write,
52        profile: &FileTypeProfile,
53        store: &MappingStore,
54    ) -> Result<()> {
55        let skip_invalid = profile
56            .options
57            .get("skip_invalid")
58            .is_some_and(|v| v == "true");
59
60        let compact = profile
61            .options
62            .get("compact")
63            .map_or(true, |v| v != "false");
64
65        for (line_no, line_result) in reader.lines().enumerate() {
66            let raw_line = line_result?;
67
68            if raw_line.trim().is_empty() {
69                continue;
70            }
71
72            let mut value: Value = match serde_json::from_str(&raw_line) {
73                Ok(v) => v,
74                Err(e) => {
75                    if skip_invalid {
76                        writer.write_all(raw_line.as_bytes())?;
77                        writer.write_all(b"\n")?;
78                        continue;
79                    }
80                    return Err(SanitizeError::ParseError {
81                        format: "JSONL".into(),
82                        message: format!("line {}: {}", line_no + 1, e),
83                    });
84                }
85            };
86
87            walk_json(&mut value, "", profile, store, 0)?;
88
89            let serialised = if compact {
90                serde_json::to_vec(&value)
91            } else {
92                serde_json::to_vec_pretty(&value)
93            }
94            .map_err(|e| SanitizeError::IoError(format!("JSONL serialize error: {}", e)))?;
95
96            writer.write_all(&serialised)?;
97            writer.write_all(b"\n")?;
98        }
99
100        Ok(())
101    }
102}
103
104impl Processor for JsonLinesProcessor {
105    fn name(&self) -> &'static str {
106        "jsonl"
107    }
108
109    fn can_handle(&self, content: &[u8], profile: &FileTypeProfile) -> bool {
110        if profile.processor == "jsonl" {
111            return true;
112        }
113        // Heuristic: first non-empty line starts with `{` and there are
114        // multiple lines — distinguishes NDJSON from a single-object JSON file.
115        let Ok(text) = std::str::from_utf8(content) else {
116            return false;
117        };
118        let mut lines = text.lines().filter(|l| !l.trim().is_empty());
119        let first = match lines.next() {
120            Some(l) => l.trim_start(),
121            None => return false,
122        };
123        first.starts_with('{') && lines.next().is_some()
124    }
125
126    fn supports_streaming(&self) -> bool {
127        true
128    }
129
130    fn process(
131        &self,
132        content: &[u8],
133        profile: &FileTypeProfile,
134        store: &MappingStore,
135    ) -> Result<Vec<u8>> {
136        // Validate UTF-8 upfront so the error points at the file, not a line.
137        std::str::from_utf8(content).map_err(|e| SanitizeError::ParseError {
138            format: "JSONL".into(),
139            message: format!("invalid UTF-8: {}", e),
140        })?;
141        let mut output = Vec::with_capacity(content.len());
142        Self::process_lines(BufReader::new(content), &mut output, profile, store)?;
143        Ok(output)
144    }
145
146    fn process_stream(
147        &self,
148        reader: &mut dyn io::Read,
149        writer: &mut dyn io::Write,
150        profile: &FileTypeProfile,
151        store: &MappingStore,
152    ) -> Result<()> {
153        Self::process_lines(BufReader::new(reader), writer, profile, store)
154    }
155}
156
#[cfg(test)]
mod tests {
    use super::*;
    use crate::category::Category;
    use crate::generator::HmacGenerator;
    use crate::processor::profile::FieldRule;
    use std::sync::Arc;

    /// Fresh mapping store backed by an HMAC generator with a fixed key.
    fn make_store() -> MappingStore {
        let generator = Arc::new(HmacGenerator::new([42u8; 32]));
        MappingStore::new(generator, None)
    }

    /// `jsonl` profile with the given field rules and compact output.
    fn make_profile(fields: Vec<FieldRule>) -> FileTypeProfile {
        FileTypeProfile::new("jsonl", fields).with_option("compact", "true")
    }

    /// Rule matching the `email` key with the e-mail category.
    fn email_rule() -> FieldRule {
        FieldRule::new("email").with_category(Category::Email)
    }

    /// Splits processor output into its text lines.
    fn as_lines(output: &[u8]) -> Vec<&str> {
        std::str::from_utf8(output).unwrap().lines().collect()
    }

    /// Parses one output line as JSON, panicking on failure.
    fn parse(line: &str) -> Value {
        serde_json::from_str(line).unwrap()
    }

    #[test]
    fn replaces_matched_fields_across_lines() {
        let profile = make_profile(vec![email_rule()]);
        let input = b"{\"email\":\"a@b.com\",\"level\":\"info\"}\n{\"email\":\"c@d.com\",\"level\":\"warn\"}\n";

        let output = JsonLinesProcessor
            .process(input, &profile, &make_store())
            .unwrap();
        let lines = as_lines(&output);
        assert_eq!(lines.len(), 2);

        let (first, second) = (parse(lines[0]), parse(lines[1]));
        // Matched fields are replaced ...
        assert_ne!(first["email"].as_str().unwrap(), "a@b.com");
        assert_ne!(second["email"].as_str().unwrap(), "c@d.com");
        // ... while unmatched fields survive untouched.
        assert_eq!(first["level"].as_str().unwrap(), "info");
        assert_eq!(second["level"].as_str().unwrap(), "warn");
    }

    #[test]
    fn process_stream_matches_process() {
        let profile = make_profile(vec![email_rule()]);
        let input = b"{\"email\":\"a@b.com\"}\n{\"email\":\"c@d.com\"}\n";

        // Buffered path.
        let buffered = JsonLinesProcessor
            .process(input, &profile, &make_store())
            .unwrap();

        // Streaming path, using a second store built from the same key.
        let mut reader: &[u8] = input;
        let mut streamed = Vec::new();
        JsonLinesProcessor
            .process_stream(&mut reader, &mut streamed, &profile, &make_store())
            .unwrap();

        assert_eq!(buffered, streamed);
    }

    #[test]
    fn glob_suffix_pattern() {
        let rule = FieldRule::new("*.password").with_category(Category::Custom("pw".into()));
        let profile = make_profile(vec![rule]);
        let input = b"{\"db\":{\"password\":\"pw1\"},\"name\":\"app\"}\n";

        let output = JsonLinesProcessor
            .process(input, &profile, &make_store())
            .unwrap();

        // Cut trailing ASCII whitespace (the final newline) before parsing.
        let end = output
            .iter()
            .rposition(|b| !b.is_ascii_whitespace())
            .map_or(0, |i| i + 1);
        let doc: Value = serde_json::from_slice(&output[..end]).unwrap();

        assert_ne!(doc["db"]["password"].as_str().unwrap(), "pw1");
        assert_eq!(doc["name"].as_str().unwrap(), "app");
    }

    #[test]
    fn skip_invalid_passes_through_bad_lines() {
        let profile = make_profile(vec![email_rule()]).with_option("skip_invalid", "true");
        let input = b"{\"email\":\"a@b.com\"}\nnot json at all\n{\"email\":\"c@d.com\"}\n";

        let output = JsonLinesProcessor
            .process(input, &profile, &make_store())
            .unwrap();
        let lines = as_lines(&output);

        assert_eq!(lines.len(), 3);
        assert_eq!(lines[1], "not json at all");
        assert_ne!(parse(lines[0])["email"].as_str().unwrap(), "a@b.com");
    }

    #[test]
    fn error_on_invalid_line_by_default() {
        let profile = make_profile(vec![email_rule()]);
        let input = b"{\"email\":\"a@b.com\"}\nnot json\n";

        let outcome = JsonLinesProcessor.process(input, &profile, &make_store());
        assert!(outcome.is_err());
    }

    #[test]
    fn deterministic_same_value_same_replacement() {
        let profile = make_profile(vec![email_rule()]);
        let input = b"{\"email\":\"a@b.com\"}\n{\"email\":\"a@b.com\"}\n";

        let output = JsonLinesProcessor
            .process(input, &profile, &make_store())
            .unwrap();
        let lines = as_lines(&output);
        let (first, second) = (parse(lines[0]), parse(lines[1]));

        assert_eq!(
            first["email"].as_str().unwrap(),
            second["email"].as_str().unwrap()
        );
    }

    #[test]
    fn can_handle_heuristic_multi_line_json_objects() {
        // The profile names another processor, so only the heuristic applies.
        let other = FileTypeProfile::new("yaml", vec![]);
        assert!(JsonLinesProcessor.can_handle(b"{\"a\":1}\n{\"b\":2}\n", &other));
    }

    #[test]
    fn can_handle_rejects_single_object() {
        let other = FileTypeProfile::new("yaml", vec![]);
        assert!(!JsonLinesProcessor.can_handle(b"{\"a\":1}", &other));
    }

    #[test]
    fn supports_streaming_is_true() {
        assert!(JsonLinesProcessor.supports_streaming());
    }
}