sanitize_engine/processor/
jsonl_proc.rs1use crate::error::{Result, SanitizeError};
36use crate::processor::json_proc::walk_json;
37use crate::processor::{FileTypeProfile, Processor};
38use crate::store::MappingStore;
39use serde_json::Value;
40use std::io::{self, BufRead, BufReader, Write};
41
/// Processor for JSON Lines (JSONL) content, where every non-blank line is an
/// independent JSON document.
///
/// The type is a field-less unit struct, so it carries no state of its own;
/// all per-run configuration is passed in through the profile and mapping
/// store arguments of its methods. Because it is zero-sized it is cheap to
/// copy, and `Default` simply yields the unit value.
#[derive(Debug, Clone, Copy, Default)]
pub struct JsonLinesProcessor;
44
45impl JsonLinesProcessor {
46 fn process_lines(
50 reader: impl BufRead,
51 writer: &mut dyn Write,
52 profile: &FileTypeProfile,
53 store: &MappingStore,
54 ) -> Result<()> {
55 let skip_invalid = profile
56 .options
57 .get("skip_invalid")
58 .is_some_and(|v| v == "true");
59
60 let compact = profile
61 .options
62 .get("compact")
63 .map_or(true, |v| v != "false");
64
65 for (line_no, line_result) in reader.lines().enumerate() {
66 let raw_line = line_result?;
67
68 if raw_line.trim().is_empty() {
69 continue;
70 }
71
72 let mut value: Value = match serde_json::from_str(&raw_line) {
73 Ok(v) => v,
74 Err(e) => {
75 if skip_invalid {
76 writer.write_all(raw_line.as_bytes())?;
77 writer.write_all(b"\n")?;
78 continue;
79 }
80 return Err(SanitizeError::ParseError {
81 format: "JSONL".into(),
82 message: format!("line {}: {}", line_no + 1, e),
83 });
84 }
85 };
86
87 walk_json(&mut value, "", profile, store, 0)?;
88
89 let serialised = if compact {
90 serde_json::to_vec(&value)
91 } else {
92 serde_json::to_vec_pretty(&value)
93 }
94 .map_err(|e| SanitizeError::IoError(format!("JSONL serialize error: {}", e)))?;
95
96 writer.write_all(&serialised)?;
97 writer.write_all(b"\n")?;
98 }
99
100 Ok(())
101 }
102}
103
104impl Processor for JsonLinesProcessor {
105 fn name(&self) -> &'static str {
106 "jsonl"
107 }
108
109 fn can_handle(&self, content: &[u8], profile: &FileTypeProfile) -> bool {
110 if profile.processor == "jsonl" {
111 return true;
112 }
113 let Ok(text) = std::str::from_utf8(content) else {
116 return false;
117 };
118 let mut lines = text.lines().filter(|l| !l.trim().is_empty());
119 let first = match lines.next() {
120 Some(l) => l.trim_start(),
121 None => return false,
122 };
123 first.starts_with('{') && lines.next().is_some()
124 }
125
126 fn supports_streaming(&self) -> bool {
127 true
128 }
129
130 fn process(
131 &self,
132 content: &[u8],
133 profile: &FileTypeProfile,
134 store: &MappingStore,
135 ) -> Result<Vec<u8>> {
136 std::str::from_utf8(content).map_err(|e| SanitizeError::ParseError {
138 format: "JSONL".into(),
139 message: format!("invalid UTF-8: {}", e),
140 })?;
141 let mut output = Vec::with_capacity(content.len());
142 Self::process_lines(BufReader::new(content), &mut output, profile, store)?;
143 Ok(output)
144 }
145
146 fn process_stream(
147 &self,
148 reader: &mut dyn io::Read,
149 writer: &mut dyn io::Write,
150 profile: &FileTypeProfile,
151 store: &MappingStore,
152 ) -> Result<()> {
153 Self::process_lines(BufReader::new(reader), writer, profile, store)
154 }
155}
156
#[cfg(test)]
mod tests {
    use super::*;
    use crate::category::Category;
    use crate::generator::HmacGenerator;
    use crate::processor::profile::FieldRule;
    use std::sync::Arc;

    /// Creates a mapping store driven by an HMAC generator with a fixed key,
    /// so every store built here is configured identically.
    fn make_store() -> MappingStore {
        let generator = Arc::new(HmacGenerator::new([42u8; 32]));
        MappingStore::new(generator, None)
    }

    /// Creates a `jsonl` profile with the given rules and compact output.
    fn make_profile(fields: Vec<FieldRule>) -> FileTypeProfile {
        FileTypeProfile::new("jsonl", fields).with_option("compact", "true")
    }

    /// The rule set shared by most tests: treat `email` as an e-mail address.
    fn email_rules() -> Vec<FieldRule> {
        vec![FieldRule::new("email").with_category(Category::Email)]
    }

    /// Parses every line of `output` as a JSON value, panicking on failure.
    fn parse_records(output: &[u8]) -> Vec<Value> {
        std::str::from_utf8(output)
            .unwrap()
            .lines()
            .map(|line| serde_json::from_str(line).unwrap())
            .collect()
    }

    #[test]
    fn replaces_matched_fields_across_lines() {
        let input = b"{\"email\":\"a@b.com\",\"level\":\"info\"}\n{\"email\":\"c@d.com\",\"level\":\"warn\"}\n";
        let profile = make_profile(email_rules());
        let store = make_store();

        let output = JsonLinesProcessor
            .process(input, &profile, &store)
            .unwrap();
        let records = parse_records(&output);

        assert_eq!(records.len(), 2);
        // (original e-mail, untouched level) for each input line, in order.
        let expectations = [("a@b.com", "info"), ("c@d.com", "warn")];
        for (record, (original_email, level)) in records.iter().zip(expectations) {
            assert_ne!(record["email"].as_str().unwrap(), original_email);
            assert_eq!(record["level"].as_str().unwrap(), level);
        }
    }

    #[test]
    fn process_stream_matches_process() {
        let input = b"{\"email\":\"a@b.com\"}\n{\"email\":\"c@d.com\"}\n";
        let profile = make_profile(email_rules());
        let processor = JsonLinesProcessor;

        // Each path gets its own, identically configured store.
        let buffered = processor
            .process(input, &profile, &make_store())
            .unwrap();

        let mut streamed = Vec::new();
        let mut source = io::Cursor::new(input);
        processor
            .process_stream(&mut source, &mut streamed, &profile, &make_store())
            .unwrap();

        assert_eq!(buffered, streamed);
    }

    #[test]
    fn glob_suffix_pattern() {
        let input = b"{\"db\":{\"password\":\"pw1\"},\"name\":\"app\"}\n";
        let profile = make_profile(vec![
            FieldRule::new("*.password").with_category(Category::Custom("pw".into()))
        ]);

        let output = JsonLinesProcessor
            .process(input, &profile, &make_store())
            .unwrap();

        // Drop trailing ASCII whitespace (the line terminator) before parsing.
        let end = output
            .iter()
            .rposition(|byte| !byte.is_ascii_whitespace())
            .map_or(0, |last| last + 1);
        let record: Value = serde_json::from_slice(&output[..end]).unwrap();

        assert_ne!(record["db"]["password"].as_str().unwrap(), "pw1");
        assert_eq!(record["name"].as_str().unwrap(), "app");
    }

    #[test]
    fn skip_invalid_passes_through_bad_lines() {
        let input = b"{\"email\":\"a@b.com\"}\nnot json at all\n{\"email\":\"c@d.com\"}\n";
        let profile = FileTypeProfile::new("jsonl", email_rules())
            .with_option("skip_invalid", "true")
            .with_option("compact", "true");

        let output = JsonLinesProcessor
            .process(input, &profile, &make_store())
            .unwrap();
        let lines: Vec<&str> = std::str::from_utf8(&output).unwrap().lines().collect();

        assert_eq!(lines.len(), 3);
        assert_eq!(lines[1], "not json at all");
        let first: Value = serde_json::from_str(lines[0]).unwrap();
        assert_ne!(first["email"].as_str().unwrap(), "a@b.com");
    }

    #[test]
    fn error_on_invalid_line_by_default() {
        let input = b"{\"email\":\"a@b.com\"}\nnot json\n";
        let profile = make_profile(email_rules());

        let outcome = JsonLinesProcessor.process(input, &profile, &make_store());

        assert!(outcome.is_err());
    }

    #[test]
    fn deterministic_same_value_same_replacement() {
        let input = b"{\"email\":\"a@b.com\"}\n{\"email\":\"a@b.com\"}\n";
        let profile = make_profile(email_rules());

        let output = JsonLinesProcessor
            .process(input, &profile, &make_store())
            .unwrap();
        let records = parse_records(&output);

        assert_eq!(
            records[0]["email"].as_str().unwrap(),
            records[1]["email"].as_str().unwrap()
        );
    }

    #[test]
    fn can_handle_heuristic_multi_line_json_objects() {
        // The profile names another processor, so only the heuristic applies.
        let profile = FileTypeProfile::new("yaml", vec![]);
        assert!(JsonLinesProcessor.can_handle(b"{\"a\":1}\n{\"b\":2}\n", &profile));
    }

    #[test]
    fn can_handle_rejects_single_object() {
        let profile = FileTypeProfile::new("yaml", vec![]);
        assert!(!JsonLinesProcessor.can_handle(b"{\"a\":1}", &profile));
    }

    #[test]
    fn supports_streaming_is_true() {
        let processor = JsonLinesProcessor;
        assert!(processor.supports_streaming());
    }
}