synapse_pingora/dlp/
stream.rs1use super::scanner::{DlpConfig, DlpConfigError, DlpScanner, ScanResult, SensitiveDataType};
7use std::collections::HashSet;
8use std::sync::Arc;

/// Trailing bytes kept between `update` calls by `StreamingScanner::new` (1 KiB).
const DEFAULT_OVERLAP_SIZE: usize = 1 << 10;

/// Default cap on the internal buffer (retained overlap + incoming chunk): 16 MiB.
const DEFAULT_MAX_BUFFER_SIZE: usize = 16 << 20;

/// Extra bytes added on top of `DlpConfig::max_pattern_length()` by
/// `StreamingScanner::with_auto_overlap`.
const OVERLAP_SAFETY_MARGIN: usize = 64;

19#[derive(Debug, Clone, PartialEq, Eq)]
21pub enum StreamingError {
22 BufferOverflow {
24 current: usize,
25 incoming: usize,
26 max: usize,
27 },
28 Config(DlpConfigError),
30}
31
32impl std::fmt::Display for StreamingError {
33 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
34 match self {
35 Self::BufferOverflow {
36 current,
37 incoming,
38 max,
39 } => {
40 write!(
41 f,
42 "buffer overflow: current {} + incoming {} > max {}",
43 current, incoming, max
44 )
45 }
46 Self::Config(e) => write!(f, "config error: {}", e),
47 }
48 }
49}
50
51impl std::error::Error for StreamingError {}
52
53impl From<DlpConfigError> for StreamingError {
54 fn from(e: DlpConfigError) -> Self {
55 Self::Config(e)
56 }
57}
58
59pub struct StreamingScanner {
61 scanner: Arc<DlpScanner>,
62 buffer: Vec<u8>,
63 overlap_size: usize,
64 max_buffer_size: usize,
65 bytes_shifted: usize,
67 accumulated_results: ScanResult,
68 seen_matches: HashSet<(SensitiveDataType, usize)>,
70}
71
72impl StreamingScanner {
73 pub fn new(scanner: Arc<DlpScanner>) -> Self {
75 Self {
76 scanner,
77 buffer: Vec::with_capacity(DEFAULT_OVERLAP_SIZE * 2),
78 overlap_size: DEFAULT_OVERLAP_SIZE,
79 max_buffer_size: DEFAULT_MAX_BUFFER_SIZE,
80 bytes_shifted: 0,
81 accumulated_results: ScanResult::default(),
82 seen_matches: HashSet::new(),
83 }
84 }
85
86 pub fn with_auto_overlap(scanner: Arc<DlpScanner>, config: &DlpConfig) -> Self {
88 let overlap = config.max_pattern_length() + OVERLAP_SAFETY_MARGIN;
89 Self::new(scanner).with_overlap(overlap)
90 }
91
92 pub fn with_overlap(mut self, size: usize) -> Self {
94 self.overlap_size = size;
95 self
96 }
97
98 pub fn with_max_buffer_size(mut self, size: usize) -> Self {
101 self.max_buffer_size = size;
102 self
103 }
104
105 pub fn bytes_processed(&self) -> usize {
107 self.bytes_shifted + self.buffer.len()
108 }
109
110 pub fn update(&mut self, chunk: &[u8]) -> Result<(), StreamingError> {
114 let new_size = self.buffer.len() + chunk.len();
116 if new_size > self.max_buffer_size {
117 return Err(StreamingError::BufferOverflow {
118 current: self.buffer.len(),
119 incoming: chunk.len(),
120 max: self.max_buffer_size,
121 });
122 }
123
124 let prev_len = self.buffer.len();
126
127 self.buffer.extend_from_slice(chunk);
129
130 let result = self.scanner.scan_bytes(&self.buffer);
132
133 if result.has_matches {
135 for m in result.matches {
136 let abs_start = self.bytes_shifted + m.start;
138 let abs_end = self.bytes_shifted + m.end;
139
140 let match_key = (m.data_type, abs_end);
143 if self.seen_matches.contains(&match_key) {
144 continue;
145 }
146
147 if m.end > prev_len {
151 self.seen_matches.insert(match_key);
152
153 let mut new_match = m;
155 new_match.stream_offset = Some(abs_start);
156 new_match.start = abs_start;
158 new_match.end = abs_end;
159
160 self.accumulated_results.matches.push(new_match);
161 self.accumulated_results.match_count += 1;
162 self.accumulated_results.has_matches = true;
163 }
164 }
165 }
166
167 if self.buffer.len() > self.overlap_size {
169 let keep_start = self.buffer.len() - self.overlap_size;
170
171 self.bytes_shifted += keep_start;
173
174 self.buffer.drain(0..keep_start);
176 }
177
178 Ok(())
179 }
180
181 #[must_use = "final scan results should be processed"]
183 pub fn finish(mut self) -> ScanResult {
184 self.accumulated_results.content_length = self.bytes_shifted + self.buffer.len();
186 self.accumulated_results.scanned = true;
187 self.accumulated_results
188 }
189}
190
#[cfg(test)]
mod tests {
    use super::*;
    use crate::dlp::DlpConfig;

    /// Scanner with DLP enabled and every other option left at its default.
    fn enabled_scanner() -> Arc<DlpScanner> {
        let config = DlpConfig {
            enabled: true,
            ..Default::default()
        };
        Arc::new(DlpScanner::new(config))
    }

    #[test]
    fn test_streaming_split_pattern() {
        let mut stream = StreamingScanner::new(enabled_scanner());

        // The card number is split across the two chunks.
        stream.update(b"Credit card: 45320151").unwrap();
        stream.update(b"12830366 is valid.").unwrap();

        let result = stream.finish();

        assert!(result.has_matches);
        assert_eq!(result.match_count, 1);
        assert_eq!(
            result.matches[0].data_type,
            crate::dlp::SensitiveDataType::CreditCard
        );
        assert!(result.matches[0].stream_offset.is_some());
    }

    #[test]
    fn test_streaming_split_pattern_after_buffer_trim() {
        // A small overlap forces the buffer to be trimmed between updates, so
        // the split number can only be found through the retained overlap.
        let mut stream = StreamingScanner::new(enabled_scanner()).with_overlap(32);

        stream.update(&[b' '; 100]).unwrap();
        stream.update(b"Credit card: 45320151").unwrap();
        stream.update(b"12830366 is valid.").unwrap();
        assert_eq!(stream.bytes_processed(), 139);

        let result = stream.finish();

        assert_eq!(result.match_count, 1, "Should detect exactly once");
        assert_eq!(result.content_length, 139);
        assert_eq!(
            result.matches[0].data_type,
            crate::dlp::SensitiveDataType::CreditCard
        );
        let offset = result.matches[0]
            .stream_offset
            .expect("stream offset should be set");
        // 100 padding bytes + "Credit card: " (13 bytes) put the digits at 113.
        assert!(
            (100..=113).contains(&offset),
            "Stream offset {} should be in 100..=113",
            offset
        );
        assert_eq!(result.matches[0].start, offset);
    }

    #[test]
    fn test_streaming_no_duplicates() {
        let mut stream = StreamingScanner::new(enabled_scanner());

        // The full card sits in the first chunk; the second chunk must not
        // cause it to be reported again.
        let chunk1 = b"Credit card: 4532015112830366 ";
        let chunk2 = b"next chunk data";

        stream.update(chunk1).unwrap();
        stream.update(chunk2).unwrap();

        let result = stream.finish();

        assert_eq!(result.match_count, 1, "Should detect exactly once");
    }

    #[test]
    fn test_buffer_overflow_protection() {
        let mut stream = StreamingScanner::new(enabled_scanner()).with_max_buffer_size(100);

        assert!(stream.update(b"small data").is_ok());

        // 10 buffered bytes + 200 incoming exceeds the 100-byte cap.
        let large_chunk = vec![b'x'; 200];
        let result = stream.update(&large_chunk);
        assert_eq!(
            result,
            Err(StreamingError::BufferOverflow {
                current: 10,
                incoming: 200,
                max: 100,
            })
        );
        // A rejected chunk must not alter the stream state.
        assert_eq!(stream.bytes_processed(), 10);
    }

    #[test]
    fn test_stream_position_tracking() {
        let mut stream = StreamingScanner::new(enabled_scanner()).with_overlap(50);

        stream.update(b"prefix data here ").unwrap();
        stream.update(b"Card: 4532015112830366").unwrap();
        let result = stream.finish();

        assert!(result.has_matches);
        let credit_card = &result.matches[0];
        let offset = credit_card
            .stream_offset
            .expect("stream offset should be set");
        // The second chunk starts at 17; the card digits start at 17 + 6 = 23.
        assert!(offset >= 17, "Stream offset {} should be >= 17", offset);
        assert!(offset <= 23, "Stream offset {} should be <= 23", offset);
    }

    #[test]
    fn test_auto_overlap() {
        let config = DlpConfig {
            enabled: true,
            custom_keywords: Some(vec!["VeryLongSecretKeyword123456789".to_string()]),
            ..Default::default()
        };
        let scanner = Arc::new(DlpScanner::new(config.clone()));
        let stream = StreamingScanner::with_auto_overlap(scanner, &config);

        let expected_min = config.max_pattern_length() + OVERLAP_SAFETY_MARGIN;
        assert!(
            stream.overlap_size >= expected_min,
            "overlap {} should be >= {}",
            stream.overlap_size,
            expected_min
        );
    }
}