sanitize_engine/processor/
jsonl_proc.rs1use crate::error::{Result, SanitizeError};
36use crate::processor::json_proc::walk_json;
37use crate::processor::{FileTypeProfile, Processor};
38use crate::store::MappingStore;
39use serde_json::Value;
40use std::io::{self, BufRead, BufReader, Write};
41
/// Processor for JSON Lines input: each non-blank line is parsed, sanitized
/// and re-serialized as an independent JSON document.
///
/// The type is a stateless unit struct, so it is free to copy, compare by
/// construction and create via [`Default`].
#[derive(Debug, Clone, Copy, Default)]
pub struct JsonLinesProcessor;
44
45impl JsonLinesProcessor {
46 fn process_lines(
50 reader: impl BufRead,
51 writer: &mut dyn Write,
52 profile: &FileTypeProfile,
53 store: &MappingStore,
54 ) -> Result<()> {
55 let skip_invalid = profile
56 .options
57 .get("skip_invalid")
58 .is_some_and(|v| v == "true");
59
60 let compact = profile
61 .options
62 .get("compact")
63 .map_or(true, |v| v != "false");
64
65 for (line_no, line_result) in reader.lines().enumerate() {
66 let raw_line = line_result?;
67
68 if raw_line.trim().is_empty() {
69 continue;
70 }
71
72 let mut value: Value = match serde_json::from_str(&raw_line) {
73 Ok(v) => v,
74 Err(e) => {
75 if skip_invalid {
76 writer.write_all(raw_line.as_bytes())?;
77 writer.write_all(b"\n")?;
78 continue;
79 }
80 return Err(SanitizeError::ParseError {
81 format: "JSONL".into(),
82 message: format!("line {}: {}", line_no + 1, e),
83 });
84 }
85 };
86
87 walk_json(&mut value, "", profile, store, 0)?;
88
89 let serialised = if compact {
90 serde_json::to_vec(&value)
91 } else {
92 serde_json::to_vec_pretty(&value)
93 }
94 .map_err(|e| {
95 SanitizeError::IoError(std::io::Error::other(format!("JSONL serialize error: {e}")))
96 })?;
97
98 writer.write_all(&serialised)?;
99 writer.write_all(b"\n")?;
100 }
101
102 Ok(())
103 }
104}
105
106impl Processor for JsonLinesProcessor {
107 fn name(&self) -> &'static str {
108 "jsonl"
109 }
110
111 fn can_handle(&self, content: &[u8], profile: &FileTypeProfile) -> bool {
112 if profile.processor == "jsonl" {
113 return true;
114 }
115 let Ok(text) = std::str::from_utf8(content) else {
118 return false;
119 };
120 let mut lines = text.lines().filter(|l| !l.trim().is_empty());
121 let first = match lines.next() {
122 Some(l) => l.trim_start(),
123 None => return false,
124 };
125 first.starts_with('{') && lines.next().is_some()
126 }
127
128 fn supports_streaming(&self) -> bool {
129 true
130 }
131
132 fn process(
133 &self,
134 content: &[u8],
135 profile: &FileTypeProfile,
136 store: &MappingStore,
137 ) -> Result<Vec<u8>> {
138 std::str::from_utf8(content).map_err(|e| SanitizeError::ParseError {
140 format: "JSONL".into(),
141 message: format!("invalid UTF-8: {}", e),
142 })?;
143 let mut output = Vec::with_capacity(content.len());
144 Self::process_lines(BufReader::new(content), &mut output, profile, store)?;
145 Ok(output)
146 }
147
148 fn process_stream(
149 &self,
150 reader: &mut dyn io::Read,
151 writer: &mut dyn io::Write,
152 profile: &FileTypeProfile,
153 store: &MappingStore,
154 ) -> Result<()> {
155 Self::process_lines(BufReader::new(reader), writer, profile, store)
156 }
157}
158
#[cfg(test)]
mod tests {
    use super::*;
    use crate::category::Category;
    use crate::generator::HmacGenerator;
    use crate::processor::profile::FieldRule;
    use std::sync::Arc;

    /// Builds a mapping store backed by an HMAC generator with a fixed key.
    fn make_store() -> MappingStore {
        // Not named `gen`: that identifier is reserved from edition 2024 on.
        let generator = Arc::new(HmacGenerator::new([42u8; 32]));
        MappingStore::new(generator, None)
    }

    /// Builds a `jsonl` profile with compact output and the given field rules.
    fn make_profile(fields: Vec<FieldRule>) -> FileTypeProfile {
        FileTypeProfile::new("jsonl", fields).with_option("compact", "true")
    }

    /// Rule that marks the `email` field for replacement.
    fn email_rule() -> FieldRule {
        FieldRule::new("email").with_category(Category::Email)
    }

    /// Parses every line of `output` as JSON, panicking on the first failure.
    fn parse_lines(output: &[u8]) -> Vec<Value> {
        std::str::from_utf8(output)
            .expect("output must be valid UTF-8")
            .lines()
            .map(|line| serde_json::from_str(line).expect("every output line must be valid JSON"))
            .collect()
    }

    #[test]
    fn replaces_matched_fields_across_lines() {
        let store = make_store();
        let proc = JsonLinesProcessor;
        let input = b"{\"email\":\"a@b.com\",\"level\":\"info\"}\n{\"email\":\"c@d.com\",\"level\":\"warn\"}\n";
        let profile = make_profile(vec![email_rule()]);

        let result = proc.process(input, &profile, &store).unwrap();
        let records = parse_lines(&result);

        assert_eq!(records.len(), 2);
        assert_ne!(records[0]["email"].as_str().unwrap(), "a@b.com");
        assert_ne!(records[1]["email"].as_str().unwrap(), "c@d.com");
        assert_eq!(records[0]["level"].as_str().unwrap(), "info");
        assert_eq!(records[1]["level"].as_str().unwrap(), "warn");
    }

    #[test]
    fn process_stream_matches_process() {
        let input = b"{\"email\":\"a@b.com\"}\n{\"email\":\"c@d.com\"}\n";
        let profile = make_profile(vec![email_rule()]);
        let proc = JsonLinesProcessor;

        // Each path gets its own store so neither run can see mappings
        // recorded by the other.
        let buffered_store = make_store();
        let from_process = proc.process(input, &profile, &buffered_store).unwrap();

        let streaming_store = make_store();
        let mut reader = io::Cursor::new(input);
        let mut from_stream = Vec::new();
        proc.process_stream(&mut reader, &mut from_stream, &profile, &streaming_store)
            .unwrap();

        assert_eq!(from_process, from_stream);
    }

    #[test]
    fn glob_suffix_pattern() {
        let store = make_store();
        let proc = JsonLinesProcessor;
        let input = b"{\"db\":{\"password\":\"pw1\"},\"name\":\"app\"}\n";
        let profile = make_profile(vec![
            FieldRule::new("*.password").with_category(Category::Custom("pw".into()))
        ]);

        let result = proc.process(input, &profile, &store).unwrap();
        let records = parse_lines(&result);

        assert_eq!(records.len(), 1);
        assert_ne!(records[0]["db"]["password"].as_str().unwrap(), "pw1");
        assert_eq!(records[0]["name"].as_str().unwrap(), "app");
    }

    #[test]
    fn skip_invalid_passes_through_bad_lines() {
        let store = make_store();
        let proc = JsonLinesProcessor;
        let input = b"{\"email\":\"a@b.com\"}\nnot json at all\n{\"email\":\"c@d.com\"}\n";
        let profile = FileTypeProfile::new("jsonl", vec![email_rule()])
            .with_option("skip_invalid", "true")
            .with_option("compact", "true");

        let result = proc.process(input, &profile, &store).unwrap();
        let text = std::str::from_utf8(&result).unwrap();
        let lines: Vec<&str> = text.lines().collect();

        assert_eq!(lines.len(), 3);
        assert_eq!(lines[1], "not json at all");

        let before: Value = serde_json::from_str(lines[0]).unwrap();
        assert_ne!(before["email"].as_str().unwrap(), "a@b.com");

        // Processing must resume normally after the passed-through line.
        let after: Value = serde_json::from_str(lines[2]).unwrap();
        assert_ne!(after["email"].as_str().unwrap(), "c@d.com");
    }

    #[test]
    fn error_on_invalid_line_by_default() {
        let store = make_store();
        let proc = JsonLinesProcessor;
        let input = b"{\"email\":\"a@b.com\"}\nnot json\n";
        let profile = make_profile(vec![email_rule()]);

        match proc.process(input, &profile, &store) {
            Err(SanitizeError::ParseError { message, .. }) => {
                // The offending line is reported with a 1-based number.
                assert!(
                    message.starts_with("line 2:"),
                    "unexpected message: {}",
                    message
                );
            }
            Err(other) => panic!("expected a ParseError, got {:?}", other),
            Ok(_) => panic!("invalid JSONL must be rejected when skip_invalid is unset"),
        }
    }

    #[test]
    fn deterministic_same_value_same_replacement() {
        let store = make_store();
        let proc = JsonLinesProcessor;
        let input = b"{\"email\":\"a@b.com\"}\n{\"email\":\"a@b.com\"}\n";
        let profile = make_profile(vec![email_rule()]);

        let result = proc.process(input, &profile, &store).unwrap();
        let records = parse_lines(&result);

        assert_eq!(records.len(), 2);
        let first = records[0]["email"].as_str().unwrap();
        let second = records[1]["email"].as_str().unwrap();
        // Without this check the equality below would also hold if nothing
        // had been replaced at all.
        assert_ne!(first, "a@b.com");
        assert_eq!(first, second);
    }

    #[test]
    fn can_handle_heuristic_multi_line_json_objects() {
        let proc = JsonLinesProcessor;
        let profile = FileTypeProfile::new("yaml", vec![]);
        let input = b"{\"a\":1}\n{\"b\":2}\n";
        assert!(proc.can_handle(input, &profile));
    }

    #[test]
    fn can_handle_rejects_single_object() {
        let proc = JsonLinesProcessor;
        let profile = FileTypeProfile::new("yaml", vec![]);
        let input = b"{\"a\":1}";
        assert!(!proc.can_handle(input, &profile));
    }

    #[test]
    fn supports_streaming_is_true() {
        assert!(JsonLinesProcessor.supports_streaming());
    }
}