Skip to main content

synapse_pingora/dlp/
stream.rs

//! Streaming DLP Scanner
//!
//! Scans arbitrarily large streams of data without buffering the entire content
//! in memory. Patterns that cross chunk boundaries are handled by retaining an
//! overlap buffer (the tail of the previous data) between chunks.
5
6use super::scanner::{DlpConfig, DlpConfigError, DlpScanner, ScanResult, SensitiveDataType};
7use std::collections::HashSet;
8use std::sync::Arc;
9
/// Default number of trailing bytes carried from one chunk to the next (1 KiB).
///
/// This should exceed the longest pattern the scanner is expected to match.
const DEFAULT_OVERLAP_SIZE: usize = 1 << 10;

/// Default upper bound on the internal buffer, in bytes (16 MiB).
const DEFAULT_MAX_BUFFER_SIZE: usize = 16 << 20;

/// Extra bytes added on top of the longest configured pattern when the overlap
/// is derived automatically.
const OVERLAP_SAFETY_MARGIN: usize = 64;
18
19/// Error type for streaming scanner operations
20#[derive(Debug, Clone, PartialEq, Eq)]
21pub enum StreamingError {
22    /// Buffer would exceed maximum allowed size
23    BufferOverflow {
24        current: usize,
25        incoming: usize,
26        max: usize,
27    },
28    /// Configuration error
29    Config(DlpConfigError),
30}
31
32impl std::fmt::Display for StreamingError {
33    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
34        match self {
35            Self::BufferOverflow {
36                current,
37                incoming,
38                max,
39            } => {
40                write!(
41                    f,
42                    "buffer overflow: current {} + incoming {} > max {}",
43                    current, incoming, max
44                )
45            }
46            Self::Config(e) => write!(f, "config error: {}", e),
47        }
48    }
49}
50
51impl std::error::Error for StreamingError {}
52
53impl From<DlpConfigError> for StreamingError {
54    fn from(e: DlpConfigError) -> Self {
55        Self::Config(e)
56    }
57}
58
59/// Streaming wrapper for DlpScanner
60pub struct StreamingScanner {
61    scanner: Arc<DlpScanner>,
62    buffer: Vec<u8>,
63    overlap_size: usize,
64    max_buffer_size: usize,
65    /// Tracks bytes that have been fully processed and shifted out of buffer
66    bytes_shifted: usize,
67    accumulated_results: ScanResult,
68    /// Track seen matches by (data_type, absolute_end_position) to deduplicate
69    seen_matches: HashSet<(SensitiveDataType, usize)>,
70}
71
72impl StreamingScanner {
73    /// Create a new streaming scanner using an existing scanner configuration
74    pub fn new(scanner: Arc<DlpScanner>) -> Self {
75        Self {
76            scanner,
77            buffer: Vec::with_capacity(DEFAULT_OVERLAP_SIZE * 2),
78            overlap_size: DEFAULT_OVERLAP_SIZE,
79            max_buffer_size: DEFAULT_MAX_BUFFER_SIZE,
80            bytes_shifted: 0,
81            accumulated_results: ScanResult::default(),
82            seen_matches: HashSet::new(),
83        }
84    }
85
86    /// Create a streaming scanner with auto-calculated overlap based on pattern lengths
87    pub fn with_auto_overlap(scanner: Arc<DlpScanner>, config: &DlpConfig) -> Self {
88        let overlap = config.max_pattern_length() + OVERLAP_SAFETY_MARGIN;
89        Self::new(scanner).with_overlap(overlap)
90    }
91
92    /// Set custom overlap size (max pattern length to detect across chunks)
93    pub fn with_overlap(mut self, size: usize) -> Self {
94        self.overlap_size = size;
95        self
96    }
97
98    /// Set maximum buffer size (default 16MB)
99    /// Returns error if a chunk would cause buffer to exceed this limit
100    pub fn with_max_buffer_size(mut self, size: usize) -> Self {
101        self.max_buffer_size = size;
102        self
103    }
104
105    /// Get the current absolute stream position (bytes processed so far)
106    pub fn bytes_processed(&self) -> usize {
107        self.bytes_shifted + self.buffer.len()
108    }
109
110    /// Process a new chunk of data
111    ///
112    /// Returns error if the buffer would exceed max_buffer_size
113    pub fn update(&mut self, chunk: &[u8]) -> Result<(), StreamingError> {
114        // Check buffer limit before appending
115        let new_size = self.buffer.len() + chunk.len();
116        if new_size > self.max_buffer_size {
117            return Err(StreamingError::BufferOverflow {
118                current: self.buffer.len(),
119                incoming: chunk.len(),
120                max: self.max_buffer_size,
121            });
122        }
123
124        // Track how much data was in buffer before this chunk (the overlap region)
125        let prev_len = self.buffer.len();
126
127        // Append new chunk to buffer
128        self.buffer.extend_from_slice(chunk);
129
130        // Scan the current buffer
131        let result = self.scanner.scan_bytes(&self.buffer);
132
133        // Process matches
134        if result.has_matches {
135            for m in result.matches {
136                // Calculate absolute stream position for this match
137                let abs_start = self.bytes_shifted + m.start;
138                let abs_end = self.bytes_shifted + m.end;
139
140                // Deduplication: Skip if we've already seen this match
141                // A match is considered duplicate if same type ends at same absolute position
142                let match_key = (m.data_type, abs_end);
143                if self.seen_matches.contains(&match_key) {
144                    continue;
145                }
146
147                // Only report matches that end in the "new" part of the buffer
148                // (i.e., end index > prev_len relative to current buffer)
149                // This prevents reporting the same match twice when it's fully in overlap
150                if m.end > prev_len {
151                    self.seen_matches.insert(match_key);
152
153                    // Create match with absolute stream offset
154                    let mut new_match = m;
155                    new_match.stream_offset = Some(abs_start);
156                    // Update start/end to absolute positions
157                    new_match.start = abs_start;
158                    new_match.end = abs_end;
159
160                    self.accumulated_results.matches.push(new_match);
161                    self.accumulated_results.match_count += 1;
162                    self.accumulated_results.has_matches = true;
163                }
164            }
165        }
166
167        // Prepare buffer for next chunk: keep only overlap
168        if self.buffer.len() > self.overlap_size {
169            let keep_start = self.buffer.len() - self.overlap_size;
170
171            // Track how many bytes we're shifting out
172            self.bytes_shifted += keep_start;
173
174            // Keep only the tail
175            self.buffer.drain(0..keep_start);
176        }
177
178        Ok(())
179    }
180
181    /// Finish the stream and get final results
182    #[must_use = "final scan results should be processed"]
183    pub fn finish(mut self) -> ScanResult {
184        // Update total bytes scanned
185        self.accumulated_results.content_length = self.bytes_shifted + self.buffer.len();
186        self.accumulated_results.scanned = true;
187        self.accumulated_results
188    }
189}
190
#[cfg(test)]
mod tests {
    use super::*;
    use crate::dlp::DlpConfig;

    /// Shared scanner with DLP enabled and every other option at its default.
    fn enabled_scanner() -> Arc<DlpScanner> {
        let config = DlpConfig {
            enabled: true,
            ..Default::default()
        };
        Arc::new(DlpScanner::new(config))
    }

    #[test]
    fn test_streaming_split_pattern() {
        let mut stream = StreamingScanner::new(enabled_scanner());

        // "Credit card: 4532015112830366" split across chunks
        stream.update(b"Credit card: 45320151").unwrap();
        stream.update(b"12830366 is valid.").unwrap();

        let result = stream.finish();

        assert!(result.has_matches);
        assert_eq!(result.match_count, 1);
        assert_eq!(
            result.matches[0].data_type,
            crate::dlp::SensitiveDataType::CreditCard
        );
        // Should have stream offset set
        assert!(result.matches[0].stream_offset.is_some());
    }

    #[test]
    fn test_streaming_no_duplicates() {
        let mut stream = StreamingScanner::new(enabled_scanner());

        // The pattern fits in the first chunk but is retained in the overlap;
        // it must not be reported a second time.
        stream.update(b"Credit card: 4532015112830366 ").unwrap();
        stream.update(b"next chunk data").unwrap();

        let result = stream.finish();

        assert_eq!(result.match_count, 1, "Should detect exactly once");
        assert_eq!(result.matches.len(), 1);
    }

    #[test]
    fn test_buffer_overflow_protection() {
        let mut stream = StreamingScanner::new(enabled_scanner()).with_max_buffer_size(100);

        // First small chunk should work
        assert!(stream.update(b"small data").is_ok());

        // Large chunk should fail and report the exact sizes involved
        let large_chunk = vec![b'x'; 200];
        assert_eq!(
            stream.update(&large_chunk),
            Err(StreamingError::BufferOverflow {
                current: 10,
                incoming: 200,
                max: 100,
            })
        );

        // A rejected chunk must not be consumed
        assert_eq!(stream.bytes_processed(), 10);
    }

    #[test]
    fn test_buffer_limit_is_inclusive() {
        let mut stream = StreamingScanner::new(enabled_scanner()).with_max_buffer_size(100);

        stream.update(b"small data").unwrap();
        // 10 retained + 90 incoming == limit, which is allowed
        stream.update(&[b'x'; 90]).unwrap();

        assert_eq!(stream.bytes_processed(), 100);
    }

    #[test]
    fn test_stream_position_tracking() {
        let mut stream = StreamingScanner::new(enabled_scanner()).with_overlap(50);

        stream.update(b"prefix data here ").unwrap(); // 17 bytes
        stream.update(b"Card: 4532015112830366").unwrap(); // 22 bytes
        assert_eq!(stream.bytes_processed(), 39);

        let result = stream.finish();
        assert_eq!(result.content_length, 39);

        assert!(result.has_matches);
        let offset = result.matches[0]
            .stream_offset
            .expect("stream offset should be set");
        // The card number follows "prefix data here " (17 bytes) and "Card: " (6 bytes)
        assert!(offset >= 17, "Stream offset {} should be >= 17", offset);
    }

    #[test]
    fn test_bytes_processed_across_drains() {
        let mut stream = StreamingScanner::new(enabled_scanner()).with_overlap(4);

        stream.update(b"abcdefgh").unwrap();
        stream.update(b"ijkl").unwrap();

        // Bytes shifted out of the overlap window still count as processed
        assert_eq!(stream.bytes_processed(), 12);
        assert_eq!(stream.finish().content_length, 12);
    }

    #[test]
    fn test_auto_overlap() {
        let config = DlpConfig {
            enabled: true,
            custom_keywords: Some(vec!["VeryLongSecretKeyword123456789".to_string()]),
            ..Default::default()
        };
        let scanner = Arc::new(DlpScanner::new(config.clone()));
        let stream = StreamingScanner::with_auto_overlap(scanner, &config);

        // Auto overlap is exactly max_pattern_length + safety margin
        let expected = config.max_pattern_length() + OVERLAP_SAFETY_MARGIN;
        assert_eq!(
            stream.overlap_size, expected,
            "overlap {} should be {}",
            stream.overlap_size, expected
        );
    }
}