Skip to main content

fuzzy_regex/api/
streaming.rs

1//! Streaming API for fuzzy regex matching.
2//!
3//! This module provides types for processing text streams incrementally,
4//! allowing fuzzy regex matching on large files, network streams, or any
5//! byte source without loading everything into memory.
6//!
7//! # Example
8//!
9//! ```
10//! use fuzzy_regex::FuzzyRegex;
11//!
12//! let re = FuzzyRegex::new("(?:hello){e<=1}").unwrap();
13//! let mut stream = re.stream();
14//!
15//! // Feed chunks of data
16//! let chunk1 = b"This is a test with hel";
17//! let chunk2 = b"lo world in it";
18//!
19//! for m in stream.feed(chunk1) {
20//!     println!("Match at {}-{}", m.start(), m.end());
21//! }
22//! for m in stream.feed(chunk2) {
23//!     println!("Match at {}-{}", m.start(), m.end());
24//! }
25//!
26//! // Finish processing to get any remaining matches
27//! if let Some(m) = stream.finish() {
28//!     println!("Final match at {}-{}", m.start(), m.end());
29//! }
30//! ```
31
32use std::io::Read;
33
34use super::FuzzyRegex;
35use crate::engine::FuzzyBridge;
36
/// A match found during streaming search.
///
/// Offsets are byte positions within the entire stream, not within the chunk
/// that happened to contain the match. The range is half-open:
/// `start..end`.
#[derive(Debug, Clone, PartialEq)]
pub struct StreamingMatch {
    /// Stream offset of the first byte of the match.
    start: usize,
    /// Stream offset one past the last byte of the match.
    end: usize,
    /// Number of edits (edit distance) the match required.
    edits: u8,
    /// Similarity score (0.0 to 1.0).
    similarity: f32,
}

impl StreamingMatch {
    /// Create a new streaming match covering `start..end` in the stream.
    #[inline]
    pub(crate) fn new(start: usize, end: usize, edits: u8, similarity: f32) -> Self {
        Self {
            start,
            end,
            edits,
            similarity,
        }
    }

    /// Get the start byte offset in the stream.
    #[inline]
    #[must_use]
    pub fn start(&self) -> usize {
        self.start
    }

    /// Get the end byte offset in the stream (exclusive).
    #[inline]
    #[must_use]
    pub fn end(&self) -> usize {
        self.end
    }

    /// Get the number of edits (edit distance) for this match.
    #[inline]
    #[must_use]
    pub fn edits(&self) -> u8 {
        self.edits
    }

    /// Get the similarity score (0.0 to 1.0).
    #[inline]
    #[must_use]
    pub fn similarity(&self) -> f32 {
        self.similarity
    }

    /// Get the length of the match in bytes.
    #[inline]
    #[must_use]
    pub fn len(&self) -> usize {
        self.end - self.start
    }

    /// Check if the match is empty (covers zero bytes).
    #[inline]
    #[must_use]
    pub fn is_empty(&self) -> bool {
        self.start == self.end
    }
}
102
103/// A streaming matcher for incremental fuzzy regex matching.
104///
105/// This type maintains state across multiple `feed()` calls, allowing
106/// matches to span chunk boundaries.
107///
108/// # Example
109///
110/// ```
111/// use fuzzy_regex::FuzzyRegex;
112///
113/// let re = FuzzyRegex::new("(?:hello){e<=1}").unwrap();
114/// let mut stream = re.stream();
115///
116/// // Process data in chunks
117/// for chunk in [b"hel".as_slice(), b"lo world".as_slice()] {
118///     for m in stream.feed(chunk) {
119///         println!("Found match at offset {}", m.start());
120///     }
121/// }
122/// ```
123pub struct StreamingMatcher<'r> {
124    /// Reference to the compiled regex.
125    regex: &'r FuzzyRegex,
126    /// Buffer for potential cross-boundary matches.
127    buffer: Vec<u8>,
128    /// Total bytes processed (global offset).
129    global_offset: usize,
130    /// Maximum buffer size needed (`pattern_len` + `max_edits`).
131    max_buffer_size: usize,
132    /// Similarity threshold.
133    threshold: f32,
134    /// Collected matches from the last feed.
135    pending_matches: Vec<StreamingMatch>,
136}
137
138impl<'r> StreamingMatcher<'r> {
139    /// Create a new streaming matcher.
140    pub(crate) fn new(regex: &'r FuzzyRegex, threshold: f32) -> Self {
141        // Calculate buffer size based on pattern characteristics
142        let max_buffer_size =
143            regex.max_pattern_len().unwrap_or(64) + regex.max_edits().unwrap_or(2) as usize + 4; // Extra for UTF-8 boundaries
144
145        Self {
146            regex,
147            buffer: Vec::with_capacity(max_buffer_size),
148            global_offset: 0,
149            max_buffer_size,
150            threshold,
151            pending_matches: Vec::new(),
152        }
153    }
154
155    /// Feed a chunk of bytes into the matcher.
156    ///
157    /// Returns an iterator over matches found in this chunk (including
158    /// matches that span from the previous chunk).
159    pub fn feed(&mut self, chunk: &[u8]) -> FeedMatches<'_> {
160        self.pending_matches.clear();
161
162        if chunk.is_empty() {
163            return FeedMatches {
164                matches: &self.pending_matches,
165                index: 0,
166            };
167        }
168
169        // Combine buffer with new chunk for cross-boundary matching
170        let search_data: Vec<u8>;
171        let buffer_len = self.buffer.len();
172        let search_offset: usize;
173
174        if buffer_len > 0 {
175            // Prepend buffer to chunk
176            search_data = [&self.buffer[..], chunk].concat();
177            search_offset = self.global_offset - buffer_len;
178        } else {
179            search_data = chunk.to_vec();
180            search_offset = self.global_offset;
181        }
182
183        // Perform search on combined data
184        self.search_bytes(&search_data, search_offset, buffer_len);
185
186        // Update buffer for next chunk - keep last N bytes for cross-boundary matches
187        self.buffer.clear();
188        let keep_bytes = self.max_buffer_size.min(chunk.len());
189        if keep_bytes > 0 {
190            let start = chunk.len() - keep_bytes;
191            self.buffer.extend_from_slice(&chunk[start..]);
192        }
193
194        // Update global offset
195        self.global_offset += chunk.len();
196
197        FeedMatches {
198            matches: &self.pending_matches,
199            index: 0,
200        }
201    }
202
203    /// Signal end of stream and return any final match.
204    ///
205    /// Call this after all data has been fed to handle matches at the
206    /// end of the stream.
207    pub fn finish(&mut self) -> Option<StreamingMatch> {
208        if self.buffer.is_empty() {
209            return None;
210        }
211
212        // Search remaining buffer data
213        self.pending_matches.clear();
214        let search_offset = self.global_offset - self.buffer.len();
215        let buffer_copy = self.buffer.clone();
216        self.search_bytes(&buffer_copy, search_offset, 0);
217
218        self.buffer.clear();
219        self.pending_matches.pop()
220    }
221
222    /// Reset the matcher state for reuse.
223    pub fn reset(&mut self) {
224        self.buffer.clear();
225        self.global_offset = 0;
226        self.pending_matches.clear();
227    }
228
229    /// Get the current position in the stream (total bytes processed).
230    #[inline]
231    #[must_use]
232    pub fn position(&self) -> usize {
233        self.global_offset
234    }
235
236    /// Search bytes and collect matches.
237    fn search_bytes(&mut self, data: &[u8], base_offset: usize, skip_before: usize) {
238        // Use the fuzzy bridge for streaming search if available
239        if let Some(bridge) = self.regex.fuzzy_bridge() {
240            self.search_with_bridge(bridge, data, base_offset, skip_before);
241        } else {
242            // Fall back to string-based search
243            if let Ok(text) = std::str::from_utf8(data) {
244                self.search_string_fallback(text, base_offset, skip_before);
245            }
246        }
247    }
248
249    /// Search using the fuzzy bridge (Bitap streaming).
250    fn search_with_bridge(
251        &mut self,
252        bridge: &FuzzyBridge,
253        data: &[u8],
254        base_offset: usize,
255        skip_before: usize,
256    ) {
257        // Use multi-pattern streaming search
258        // Returns (pattern_idx, start, result) where result.end is the actual end
259        if let Some((_pattern_idx, start, result)) =
260            bridge.find_first_multi_pattern_individual(data, self.threshold, &[0])
261        {
262            // Include matches that:
263            // - End after skip_before (spans into new data), OR
264            // - Start at/after skip_before (entirely in new data)
265            // Skip matches that end within the buffer (already processed)
266            if result.end > skip_before {
267                self.pending_matches.push(StreamingMatch::new(
268                    base_offset + start,
269                    base_offset + result.end,
270                    result.total_edits(),
271                    result.similarity,
272                ));
273            }
274        }
275    }
276
277    /// Fallback search using string API.
278    fn search_string_fallback(&mut self, text: &str, base_offset: usize, skip_before: usize) {
279        if let Some(m) = self.regex.find(text) {
280            // Include matches that end after skip_before (spans into new data)
281            if m.end() > skip_before {
282                self.pending_matches.push(StreamingMatch::new(
283                    base_offset + m.start(),
284                    base_offset + m.end(),
285                    0,
286                    1.0,
287                ));
288            }
289        }
290    }
291
292    /// Process a reader, yielding matches.
293    ///
294    /// Reads the reader in chunks and yields matches as they are found.
295    ///
296    /// # Example
297    ///
298    /// ```no_run
299    /// use fuzzy_regex::FuzzyRegex;
300    /// use std::fs::File;
301    /// use std::io::BufReader;
302    ///
303    /// let re = FuzzyRegex::new("(?:hello){e<=1}").unwrap();
304    /// let mut stream = re.stream();
305    ///
306    /// let file = File::open("large_file.txt").unwrap();
307    /// for m in stream.search_reader(BufReader::new(file)) {
308    ///     println!("Match at {}-{}", m.start(), m.end());
309    /// }
310    /// ```
311    pub fn search_reader<R: Read>(self, reader: R) -> ReaderMatches<'r, R> {
312        ReaderMatches::new(self, reader)
313    }
314}
315
316/// Iterator over matches from a single `feed()` call.
317pub struct FeedMatches<'a> {
318    matches: &'a [StreamingMatch],
319    index: usize,
320}
321
322impl Iterator for FeedMatches<'_> {
323    type Item = StreamingMatch;
324
325    fn next(&mut self) -> Option<Self::Item> {
326        if self.index < self.matches.len() {
327            let m = self.matches[self.index].clone();
328            self.index += 1;
329            Some(m)
330        } else {
331            None
332        }
333    }
334}
335
336impl ExactSizeIterator for FeedMatches<'_> {
337    fn len(&self) -> usize {
338        self.matches.len() - self.index
339    }
340}
341
342/// Iterator over matches from a reader.
343pub struct ReaderMatches<'r, R: Read> {
344    matcher: StreamingMatcher<'r>,
345    reader: R,
346    buffer: Vec<u8>,
347    chunk_size: usize,
348    pending: Vec<StreamingMatch>,
349    pending_index: usize,
350    finished: bool,
351}
352
353impl<'r, R: Read> ReaderMatches<'r, R> {
354    fn new(matcher: StreamingMatcher<'r>, reader: R) -> Self {
355        let chunk_size = 8192; // 8KB chunks
356        Self {
357            matcher,
358            reader,
359            buffer: vec![0u8; chunk_size],
360            chunk_size,
361            pending: Vec::new(),
362            pending_index: 0,
363            finished: false,
364        }
365    }
366
367    /// Set the chunk size for reading.
368    #[must_use]
369    pub fn with_chunk_size(mut self, size: usize) -> Self {
370        self.chunk_size = size;
371        self.buffer = vec![0u8; size];
372        self
373    }
374}
375
376impl<R: Read> Iterator for ReaderMatches<'_, R> {
377    type Item = StreamingMatch;
378
379    fn next(&mut self) -> Option<Self::Item> {
380        loop {
381            // Return pending matches first
382            if self.pending_index < self.pending.len() {
383                let m = self.pending[self.pending_index].clone();
384                self.pending_index += 1;
385                return Some(m);
386            }
387
388            if self.finished {
389                return None;
390            }
391
392            // Read next chunk
393            match self.reader.read(&mut self.buffer) {
394                Ok(0) => {
395                    // End of stream
396                    self.finished = true;
397                    if let Some(m) = self.matcher.finish() {
398                        return Some(m);
399                    }
400                    return None;
401                }
402                Ok(n) => {
403                    // Process chunk
404                    self.pending.clear();
405                    self.pending_index = 0;
406                    for m in self.matcher.feed(&self.buffer[..n]) {
407                        self.pending.push(m);
408                    }
409                }
410                Err(_) => {
411                    self.finished = true;
412                    return None;
413                }
414            }
415        }
416    }
417}
418
419/// Iterator over matches in a byte slice (non-streaming).
420pub struct ByteMatches<'r, 't> {
421    regex: &'r FuzzyRegex,
422    text: &'t [u8],
423    last_end: usize,
424}
425
426impl<'r, 't> ByteMatches<'r, 't> {
427    pub(crate) fn new(regex: &'r FuzzyRegex, text: &'t [u8]) -> Self {
428        Self {
429            regex,
430            text,
431            last_end: 0,
432        }
433    }
434}
435
436impl Iterator for ByteMatches<'_, '_> {
437    type Item = StreamingMatch;
438
439    fn next(&mut self) -> Option<Self::Item> {
440        if self.last_end >= self.text.len() {
441            return None;
442        }
443
444        // Try to search from last_end
445        let search_slice = &self.text[self.last_end..];
446
447        // Use fuzzy bridge if available
448        if let Some(bridge) = self.regex.fuzzy_bridge() {
449            // Returns (pattern_idx, start, result) where result.end is the actual end
450            if let Some((_pattern_idx, start, result)) =
451                bridge.find_first_multi_pattern_individual(search_slice, 0.0, &[0])
452            {
453                let abs_start = self.last_end + start;
454                let abs_end = self.last_end + result.end;
455                self.last_end = abs_end.max(self.last_end + 1);
456                return Some(StreamingMatch::new(
457                    abs_start,
458                    abs_end,
459                    result.total_edits(),
460                    result.similarity,
461                ));
462            }
463        } else {
464            // Fall back to string API
465            if let Ok(text) = std::str::from_utf8(search_slice)
466                && let Some(m) = self.regex.find(text)
467            {
468                let abs_start = self.last_end + m.start();
469                let abs_end = self.last_end + m.end();
470                self.last_end = abs_end.max(self.last_end + 1);
471                return Some(StreamingMatch::new(abs_start, abs_end, 0, 1.0));
472            }
473        }
474
475        None
476    }
477}