Skip to main content

cloakrs_adapters/
logstream.rs

1//! Log stream adapter for line-by-line scanning.
2
3use crate::{scan_json_str, JsonScanOptions};
4use cloakrs_core::{PiiEntity, Result, Scanner};
5use serde::{Deserialize, Serialize};
6use std::io::{BufRead, Write};
7
8/// The practical shape detected for one log line.
9#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
10pub enum LogLineFormat {
11    /// Plain text or an unrecognized log format.
12    Plaintext,
13    /// A valid JSON log line.
14    Json,
15    /// A log line containing `key=value` fields.
16    KeyValue,
17}
18
19/// PII findings for one streamed log line.
20///
21/// Spans are byte offsets relative to the original line content.
22#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
23pub struct LogLineScanResult {
24    /// One-based line number.
25    pub line_number: usize,
26    /// Detected practical log format for the line.
27    pub format: LogLineFormat,
28    /// Findings detected on this line.
29    pub findings: Vec<PiiEntity>,
30    /// Masked line when the scanner has masking enabled.
31    pub masked_line: Option<String>,
32}
33
34/// Scans log text line by line.
35///
36/// # Examples
37///
38/// ```
39/// use cloakrs_adapters::scan_log_str;
40/// use cloakrs_core::{Confidence, EntityType, Locale, PiiEntity, Recognizer, Scanner, Span};
41///
42/// struct Email;
43/// impl Recognizer for Email {
44///     fn id(&self) -> &str { "email_test" }
45///     fn entity_type(&self) -> EntityType { EntityType::Email }
46///     fn supported_locales(&self) -> &[Locale] { &[] }
47///     fn scan(&self, text: &str) -> Vec<PiiEntity> {
48///         text.find('@').map(|_| PiiEntity {
49///             entity_type: EntityType::Email,
50///             span: Span::new(0, text.len()),
51///             text: text.to_string(),
52///             confidence: Confidence::new(0.9).unwrap(),
53///             recognizer_id: self.id().to_string(),
54///         }).into_iter().collect()
55///     }
56/// }
57///
58/// let scanner = Scanner::builder().recognizer(Email).build().unwrap();
59/// let result = scan_log_str("user=a@test\n", &scanner).unwrap();
60/// assert_eq!(result.lines.len(), 1);
61/// ```
62pub fn scan_log_str(input: &str, scanner: &Scanner) -> Result<LogStreamScanResult> {
63    let mut output = Vec::new();
64    let lines = mask_log_reader(input.as_bytes(), &mut output, scanner)?;
65    let masked_log = String::from_utf8(output)
66        .map_err(|error| cloakrs_core::CloakError::ConfigError(error.to_string()))?;
67    Ok(LogStreamScanResult { lines, masked_log })
68}
69
70/// Result of scanning a log stream.
71#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
72pub struct LogStreamScanResult {
73    /// Findings grouped by log line.
74    pub lines: Vec<LogLineScanResult>,
75    /// Masked log text.
76    pub masked_log: String,
77}
78
79/// Streams logs from a reader to a writer, masking each line immediately.
80///
81/// This function processes exactly one input line at a time.
82pub fn mask_log_reader<R, W>(
83    reader: R,
84    mut writer: W,
85    scanner: &Scanner,
86) -> Result<Vec<LogLineScanResult>>
87where
88    R: BufRead,
89    W: Write,
90{
91    let mut results = Vec::new();
92    for (index, line) in reader.lines().enumerate() {
93        let line = line?;
94        let result = scan_log_line(index + 1, &line, scanner)?;
95        writer.write_all(result.masked_line.as_deref().unwrap_or(&line).as_bytes())?;
96        writer.write_all(b"\n")?;
97        results.push(result);
98    }
99    Ok(results)
100}
101
102fn scan_log_line(line_number: usize, line: &str, scanner: &Scanner) -> Result<LogLineScanResult> {
103    if let Some(result) = scan_json_log_line(line_number, line, scanner)? {
104        return Ok(result);
105    }
106    if let Some(result) = scan_key_value_log_line(line_number, line, scanner)? {
107        return Ok(result);
108    }
109
110    let scan = scanner.scan(line)?;
111    Ok(LogLineScanResult {
112        line_number,
113        format: LogLineFormat::Plaintext,
114        findings: scan.findings,
115        masked_line: scan.masked_text,
116    })
117}
118
119fn scan_json_log_line(
120    line_number: usize,
121    line: &str,
122    scanner: &Scanner,
123) -> Result<Option<LogLineScanResult>> {
124    let trimmed = line.trim_start();
125    if !trimmed.starts_with('{') && !trimmed.starts_with('[') {
126        return Ok(None);
127    }
128
129    let Ok(result) = scan_json_str(line, scanner, &JsonScanOptions::default()) else {
130        return Ok(None);
131    };
132    let findings = result
133        .strings
134        .into_iter()
135        .flat_map(|string| string.findings)
136        .collect();
137    let masked_line = Some(serde_json::to_string(&result.masked_json)?);
138    Ok(Some(LogLineScanResult {
139        line_number,
140        format: LogLineFormat::Json,
141        findings,
142        masked_line,
143    }))
144}
145
146fn scan_key_value_log_line(
147    line_number: usize,
148    line: &str,
149    scanner: &Scanner,
150) -> Result<Option<LogLineScanResult>> {
151    let mut findings = Vec::new();
152    let mut replacements = Vec::new();
153    let mut saw_key_value = false;
154
155    for (value_start, value_end) in key_value_spans(line) {
156        saw_key_value = true;
157        let value = &line[value_start..value_end];
158        let scan = scanner.scan(value)?;
159        if scan.findings.is_empty() {
160            continue;
161        }
162
163        findings.extend(scan.findings.into_iter().map(|mut finding| {
164            finding.span.start += value_start;
165            finding.span.end += value_start;
166            finding
167        }));
168        if let Some(masked_value) = scan.masked_text {
169            replacements.push((value_start, value_end, masked_value));
170        }
171    }
172
173    if !saw_key_value {
174        return Ok(None);
175    }
176
177    let mut masked_line = line.to_string();
178    for (start, end, replacement) in replacements.into_iter().rev() {
179        masked_line.replace_range(start..end, &replacement);
180    }
181
182    Ok(Some(LogLineScanResult {
183        line_number,
184        format: LogLineFormat::KeyValue,
185        findings,
186        masked_line: Some(masked_line),
187    }))
188}
189
190fn key_value_spans(line: &str) -> Vec<(usize, usize)> {
191    let mut spans = Vec::new();
192    let mut index = 0;
193    while index < line.len() {
194        index = skip_whitespace(line, index);
195        if index >= line.len() {
196            break;
197        }
198
199        let token_start = index;
200        while index < line.len() {
201            let ch = line[index..].chars().next().unwrap_or_default();
202            if ch == '=' || ch.is_whitespace() {
203                break;
204            }
205            index += ch.len_utf8();
206        }
207
208        if index >= line.len() || !line[index..].starts_with('=') || index == token_start {
209            index = skip_token(line, index);
210            continue;
211        }
212
213        index += 1;
214        if index >= line.len() {
215            continue;
216        }
217
218        let quote = line[index..]
219            .chars()
220            .next()
221            .filter(|ch| *ch == '"' || *ch == '\'');
222        let value_start = if let Some(quote) = quote {
223            index += quote.len_utf8();
224            index
225        } else {
226            index
227        };
228
229        while index < line.len() {
230            let ch = line[index..].chars().next().unwrap_or_default();
231            if quote.is_some_and(|quote| ch == quote) || quote.is_none() && ch.is_whitespace() {
232                break;
233            }
234            if ch == '\\' {
235                index += ch.len_utf8();
236                if index < line.len() {
237                    let escaped = line[index..].chars().next().unwrap_or_default();
238                    index += escaped.len_utf8();
239                }
240                continue;
241            }
242            index += ch.len_utf8();
243        }
244
245        if value_start < index {
246            spans.push((value_start, index));
247        }
248        index = skip_token(line, index);
249    }
250    spans
251}
252
/// Returns the byte offset of the first non-whitespace character at or after `index`.
///
/// Yields `line.len()` when only whitespace remains, and returns `index`
/// unchanged when it is already at or beyond the end of the line.
fn skip_whitespace(line: &str, mut index: usize) -> usize {
    if index < line.len() {
        let rest = &line[index..];
        // The bytes removed by `trim_start` are exactly the leading whitespace run.
        index += rest.len() - rest.trim_start().len();
    }
    index
}
263
/// Advances past the rest of the current token and the one whitespace character after it.
///
/// Returns `line.len()` when no whitespace follows, and returns `index`
/// unchanged when it is already at or beyond the end of the line.
fn skip_token(line: &str, mut index: usize) -> usize {
    if index < line.len() {
        let rest = &line[index..];
        index += match rest.char_indices().find(|(_, ch)| ch.is_whitespace()) {
            // Stop just after the first whitespace character.
            Some((offset, separator)) => offset + separator.len_utf8(),
            None => rest.len(),
        };
    }
    index
}
274
#[cfg(test)]
mod tests {
    use super::*;
    use cloakrs_core::Locale;
    use cloakrs_patterns::default_registry;

    /// Builds a scanner from the default recognizer registry for the US locale.
    fn scanner() -> Scanner {
        let builder = default_registry()
            .into_scanner_builder()
            .locale(Locale::US);
        builder.build().unwrap()
    }

    /// A line with no structure is classified as plaintext and masked.
    #[test]
    fn test_scan_log_str_plaintext_masks_line() {
        let scanner = scanner();
        let scanned = scan_log_str("login jane@example.com\n", &scanner).unwrap();
        let first = &scanned.lines[0];
        assert_eq!(first.format, LogLineFormat::Plaintext);
        assert!(scanned.masked_log.contains("[EMAIL]"));
    }

    /// A JSON object line is classified as JSON and its string value is masked.
    #[test]
    fn test_scan_log_str_json_line_masks_string_value() {
        let scanner = scanner();
        let input = r#"{"level":"info","email":"jane@example.com"}"#;
        let scanned = scan_log_str(input, &scanner).unwrap();
        let first = &scanned.lines[0];
        assert_eq!(first.format, LogLineFormat::Json);
        assert_eq!(first.findings.len(), 1);
        assert!(scanned.masked_log.contains(r#""email":"[EMAIL]""#));
    }

    /// A `key=value` line is classified as key/value and the value is masked in place.
    #[test]
    fn test_scan_log_str_key_value_line_detects_format() {
        let scanner = scanner();
        let scanned = scan_log_str("level=info email=jane@example.com", &scanner).unwrap();
        let first = &scanned.lines[0];
        assert_eq!(first.format, LogLineFormat::KeyValue);
        assert!(scanned.masked_log.contains("email=[EMAIL]"));
    }

    /// Every input line yields one result and one masked line in the output.
    #[test]
    fn test_mask_log_reader_writes_each_line() {
        let scanner = scanner();
        let input = "a jane@example.com\nb ops@example.com\n";
        let mut sink = Vec::new();
        let results = mask_log_reader(input.as_bytes(), &mut sink, &scanner).unwrap();
        let written = String::from_utf8(sink).unwrap();
        assert_eq!(results.len(), 2);
        assert_eq!(written.matches("[EMAIL]").count(), 2);
    }
}
321}