Skip to main content

sanitize_engine/processor/
jsonl_proc.rs

1//! NDJSON / JSON Lines structured processor.
2//!
3//! Processes files where each non-empty line is an independent JSON object
4//! (Newline-Delimited JSON, also called JSON Lines). Unlike the [`JsonProcessor`](crate::processor::json_proc::JsonProcessor),
//! this processor never builds a full in-memory parse tree for the whole file —
//! each line is parsed, walked, serialised, and written out independently, so
//! parse-tree memory is bounded by the longest line rather than by the total
//! input size.
8//!
9//! When used via the CLI with a matching profile, the processor is invoked
10//! through the streaming path: the file is opened as a reader and processed
11//! line-by-line without `fs::read` loading it into a `Vec<u8>` first. This
12//! makes GB-scale NDJSON log files practical to sanitize.
13//!
14//! # Options
15//!
16//! | Key | Values | Default | Description |
17//! |-----|--------|---------|-------------|
//! | `skip_invalid` | `"true"` / `"false"` | `"false"` | Pass malformed lines through unchanged instead of returning an error. Passed-through lines are **not** sanitized. Useful for mixed log files that interleave plain-text lines with JSON. |
19//! | `compact` | `"true"` / `"false"` | `"true"` | Serialise each output line as compact JSON. Set to `"false"` only for debugging — pretty-printed NDJSON is non-standard. |
20//!
21//! # Example profile entry
22//!
23//! ```yaml
24//! - processor: jsonl
25//!   extensions: [".jsonl", ".ndjson", ".log"]
26//!   options:
27//!     skip_invalid: "true"
28//!   fields:
29//!     - pattern: "*.email"
30//!       category: email
31//!     - pattern: "*.password"
32//!       category: "custom:password"
33//! ```
34
35use crate::error::{Result, SanitizeError};
36use crate::processor::json_proc::walk_json;
37use crate::processor::{FileTypeProfile, Processor};
38use crate::store::MappingStore;
39use serde_json::Value;
40use std::io::{self, BufRead, BufReader, Write};
41
/// Structured processor for NDJSON / JSON Lines files.
///
/// This is a field-less unit struct: all configuration comes from the
/// [`FileTypeProfile`] passed to each [`Processor`] method, so one value can be
/// copied and shared freely.
#[derive(Debug, Clone, Copy, Default)]
pub struct JsonLinesProcessor;
44
45impl JsonLinesProcessor {
46    /// Core line-by-line processing logic, shared by both `process` and
47    /// `process_stream`. Reads from any `BufRead` source and writes to any
48    /// `Write` sink.
49    fn process_lines(
50        reader: impl BufRead,
51        writer: &mut dyn Write,
52        profile: &FileTypeProfile,
53        store: &MappingStore,
54    ) -> Result<()> {
55        let skip_invalid = profile
56            .options
57            .get("skip_invalid")
58            .is_some_and(|v| v == "true");
59
60        let compact = profile
61            .options
62            .get("compact")
63            .map_or(true, |v| v != "false");
64
65        for (line_no, line_result) in reader.lines().enumerate() {
66            let raw_line = line_result?;
67
68            if raw_line.trim().is_empty() {
69                continue;
70            }
71
72            let mut value: Value = match serde_json::from_str(&raw_line) {
73                Ok(v) => v,
74                Err(e) => {
75                    if skip_invalid {
76                        writer.write_all(raw_line.as_bytes())?;
77                        writer.write_all(b"\n")?;
78                        continue;
79                    }
80                    return Err(SanitizeError::ParseError {
81                        format: "JSONL".into(),
82                        message: format!("line {}: {}", line_no + 1, e),
83                    });
84                }
85            };
86
87            walk_json(&mut value, "", profile, store, 0)?;
88
89            let serialised = if compact {
90                serde_json::to_vec(&value)
91            } else {
92                serde_json::to_vec_pretty(&value)
93            }
94            .map_err(|e| {
95                SanitizeError::IoError(std::io::Error::other(format!("JSONL serialize error: {e}")))
96            })?;
97
98            writer.write_all(&serialised)?;
99            writer.write_all(b"\n")?;
100        }
101
102        Ok(())
103    }
104}
105
106impl Processor for JsonLinesProcessor {
107    fn name(&self) -> &'static str {
108        "jsonl"
109    }
110
111    fn can_handle(&self, content: &[u8], profile: &FileTypeProfile) -> bool {
112        if profile.processor == "jsonl" {
113            return true;
114        }
115        // Heuristic: first non-empty line starts with `{` and there are
116        // multiple lines — distinguishes NDJSON from a single-object JSON file.
117        let Ok(text) = std::str::from_utf8(content) else {
118            return false;
119        };
120        let mut lines = text.lines().filter(|l| !l.trim().is_empty());
121        let first = match lines.next() {
122            Some(l) => l.trim_start(),
123            None => return false,
124        };
125        first.starts_with('{') && lines.next().is_some()
126    }
127
128    fn supports_streaming(&self) -> bool {
129        true
130    }
131
132    fn process(
133        &self,
134        content: &[u8],
135        profile: &FileTypeProfile,
136        store: &MappingStore,
137    ) -> Result<Vec<u8>> {
138        // Validate UTF-8 upfront so the error points at the file, not a line.
139        std::str::from_utf8(content).map_err(|e| SanitizeError::ParseError {
140            format: "JSONL".into(),
141            message: format!("invalid UTF-8: {}", e),
142        })?;
143        let mut output = Vec::with_capacity(content.len());
144        Self::process_lines(BufReader::new(content), &mut output, profile, store)?;
145        Ok(output)
146    }
147
148    fn process_stream(
149        &self,
150        reader: &mut dyn io::Read,
151        writer: &mut dyn io::Write,
152        profile: &FileTypeProfile,
153        store: &MappingStore,
154    ) -> Result<()> {
155        Self::process_lines(BufReader::new(reader), writer, profile, store)
156    }
157}
158
#[cfg(test)]
mod tests {
    use super::*;
    use crate::category::Category;
    use crate::generator::HmacGenerator;
    use crate::processor::profile::FieldRule;
    use std::sync::Arc;

    /// Store backed by a fixed HMAC key, so separately constructed stores
    /// produce identical replacements.
    fn make_store() -> MappingStore {
        let generator = Arc::new(HmacGenerator::new([42u8; 32]));
        MappingStore::new(generator, None)
    }

    /// `jsonl` profile with compact output and the given field rules.
    fn make_profile(fields: Vec<FieldRule>) -> FileTypeProfile {
        FileTypeProfile::new("jsonl", fields).with_option("compact", "true")
    }

    /// Profile that sanitises top-level `email` fields.
    fn email_profile() -> FileTypeProfile {
        make_profile(vec![FieldRule::new("email").with_category(Category::Email)])
    }

    /// Splits processor output into lines, asserting it is valid UTF-8.
    fn output_lines(bytes: &[u8]) -> Vec<&str> {
        std::str::from_utf8(bytes).unwrap().lines().collect()
    }

    #[test]
    fn replaces_matched_fields_across_lines() {
        let store = make_store();
        let proc = JsonLinesProcessor;
        let input = b"{\"email\":\"a@b.com\",\"level\":\"info\"}\n{\"email\":\"c@d.com\",\"level\":\"warn\"}\n";

        let result = proc.process(input, &email_profile(), &store).unwrap();
        let lines = output_lines(&result);

        assert_eq!(lines.len(), 2);
        let v0: Value = serde_json::from_str(lines[0]).unwrap();
        let v1: Value = serde_json::from_str(lines[1]).unwrap();
        assert_ne!(v0["email"].as_str().unwrap(), "a@b.com");
        assert_ne!(v1["email"].as_str().unwrap(), "c@d.com");
        assert_eq!(v0["level"].as_str().unwrap(), "info");
        assert_eq!(v1["level"].as_str().unwrap(), "warn");
    }

    #[test]
    fn process_stream_matches_process() {
        let input = b"{\"email\":\"a@b.com\"}\n{\"email\":\"c@d.com\"}\n";
        let profile = email_profile();
        let proc = JsonLinesProcessor;

        let from_process = proc.process(input, &profile, &make_store()).unwrap();

        // Fresh store with the same seed, so replacements must be identical.
        let mut reader = io::Cursor::new(input);
        let mut from_stream = Vec::new();
        proc.process_stream(&mut reader, &mut from_stream, &profile, &make_store())
            .unwrap();

        assert_eq!(from_process, from_stream);
    }

    #[test]
    fn glob_suffix_pattern() {
        let store = make_store();
        let proc = JsonLinesProcessor;
        let input = b"{\"db\":{\"password\":\"pw1\"},\"name\":\"app\"}\n";
        let profile = make_profile(vec![
            FieldRule::new("*.password").with_category(Category::Custom("pw".into()))
        ]);

        let result = proc.process(input, &profile, &store).unwrap();
        // `from_slice` accepts trailing whitespace, including the final newline.
        let v: Value = serde_json::from_slice(&result).unwrap();
        assert_ne!(v["db"]["password"].as_str().unwrap(), "pw1");
        assert_eq!(v["name"].as_str().unwrap(), "app");
    }

    #[test]
    fn skip_invalid_passes_through_bad_lines() {
        let store = make_store();
        let proc = JsonLinesProcessor;
        let input = b"{\"email\":\"a@b.com\"}\nnot json at all\n{\"email\":\"c@d.com\"}\n";
        let profile = email_profile().with_option("skip_invalid", "true");

        let result = proc.process(input, &profile, &store).unwrap();
        let lines = output_lines(&result);

        assert_eq!(lines.len(), 3);
        assert_eq!(lines[1], "not json at all");
        let v0: Value = serde_json::from_str(lines[0]).unwrap();
        assert_ne!(v0["email"].as_str().unwrap(), "a@b.com");
    }

    #[test]
    fn error_on_invalid_line_by_default() {
        let store = make_store();
        let proc = JsonLinesProcessor;
        let input = b"{\"email\":\"a@b.com\"}\nnot json\n";

        let err = proc.process(input, &email_profile(), &store).unwrap_err();
        assert!(matches!(
            &err,
            SanitizeError::ParseError { message, .. } if message.starts_with("line 2:")
        ));
    }

    #[test]
    fn stream_errors_on_invalid_line_by_default() {
        let proc = JsonLinesProcessor;
        let mut reader = io::Cursor::new(&b"{\"email\":\"a@b.com\"}\nnot json\n"[..]);
        let mut sink = Vec::new();
        assert!(proc
            .process_stream(&mut reader, &mut sink, &email_profile(), &make_store())
            .is_err());
    }

    #[test]
    fn crlf_terminators_and_blank_lines() {
        let store = make_store();
        let proc = JsonLinesProcessor;
        let input = b"{\"email\":\"a@b.com\"}\r\n\r\n   \n{\"email\":\"c@d.com\"}\r\n";

        let result = proc.process(input, &email_profile(), &store).unwrap();
        let text = std::str::from_utf8(&result).unwrap();

        // CRLF input is normalised to LF and blank lines are dropped.
        assert!(!text.contains('\r'));
        let lines = output_lines(&result);
        assert_eq!(lines.len(), 2);
        for line in lines {
            let v: Value = serde_json::from_str(line).unwrap();
            assert!(v["email"].is_string());
        }
    }

    #[test]
    fn compact_false_pretty_prints() {
        let store = make_store();
        let proc = JsonLinesProcessor;
        let profile = FileTypeProfile::new("jsonl", vec![]).with_option("compact", "false");

        let result = proc.process(b"{\"a\":1}\n", &profile, &store).unwrap();
        assert_eq!(result, b"{\n  \"a\": 1\n}\n");
    }

    #[test]
    fn invalid_utf8_is_rejected() {
        let store = make_store();
        let proc = JsonLinesProcessor;
        let input = b"{\"email\":\"\xff\"}\n";
        assert!(proc.process(input, &email_profile(), &store).is_err());
    }

    #[test]
    fn empty_input_yields_empty_output() {
        let store = make_store();
        let proc = JsonLinesProcessor;
        assert!(proc.process(b"", &email_profile(), &store).unwrap().is_empty());
        assert!(proc.process(b"\n\n", &email_profile(), &store).unwrap().is_empty());
    }

    #[test]
    fn deterministic_same_value_same_replacement() {
        let store = make_store();
        let proc = JsonLinesProcessor;
        let input = b"{\"email\":\"a@b.com\"}\n{\"email\":\"a@b.com\"}\n";

        let result = proc.process(input, &email_profile(), &store).unwrap();
        let lines = output_lines(&result);
        let v0: Value = serde_json::from_str(lines[0]).unwrap();
        let v1: Value = serde_json::from_str(lines[1]).unwrap();
        assert_eq!(v0["email"].as_str().unwrap(), v1["email"].as_str().unwrap());
    }

    #[test]
    fn can_handle_heuristic_multi_line_json_objects() {
        let proc = JsonLinesProcessor;
        let profile = FileTypeProfile::new("yaml", vec![]);
        let input = b"{\"a\":1}\n{\"b\":2}\n";
        assert!(proc.can_handle(input, &profile));
    }

    #[test]
    fn can_handle_rejects_single_object() {
        let proc = JsonLinesProcessor;
        let profile = FileTypeProfile::new("yaml", vec![]);
        let input = b"{\"a\":1}";
        assert!(!proc.can_handle(input, &profile));
    }

    #[test]
    fn supports_streaming_is_true() {
        assert!(JsonLinesProcessor.supports_streaming());
    }
}