Skip to main content

fhp_tokenizer/
streaming.rs

1//! Streaming (chunk-based) tokenizer.
2//!
3//! [`StreamTokenizer`](crate::streaming::StreamTokenizer) processes input in arbitrary-sized chunks, carrying
4//! state across chunk boundaries. This enables parsing large files or
5//! network streams without loading the entire document into memory.
6
7use crate::extract::extract_tokens;
8use crate::structural::StructuralIndexer;
9use crate::token::Token;
10use fhp_core::tag::Tag;
11
/// Maximum size of the residual buffer.
///
/// When a chunk boundary falls in the middle of a tag, we buffer up to
/// this many bytes and prepend them to the next chunk.
///
/// NOTE: `feed`/`feed_str_with` exempt raw-text content (`<script>`,
/// `<style>`, …) from this cap — raw text has no safe split point until
/// its close tag, so the residual may legitimately grow past this size.
const MAX_RESIDUAL: usize = 4096;
17
/// A streaming tokenizer that processes input chunk by chunk.
///
/// Maintains internal state so that token boundaries that span chunk
/// boundaries are handled correctly. Call [`StreamTokenizer::feed`] for
/// each chunk, then [`StreamTokenizer::finish`] once to flush anything
/// still buffered.
///
/// # Example
///
/// ```
/// use fhp_tokenizer::streaming::StreamTokenizer;
/// use fhp_tokenizer::token::Token;
///
/// let mut tokenizer = StreamTokenizer::new();
/// let mut all_tokens: Vec<Token<'static>> = Vec::new();
///
/// let html = b"<div>hello</div>";
/// // Feed in small chunks.
/// let owned = tokenizer.feed(&html[..5]);
/// all_tokens.extend(owned);
/// let owned = tokenizer.feed(&html[5..]);
/// all_tokens.extend(owned);
/// let owned = tokenizer.finish();
/// all_tokens.extend(owned);
///
/// assert!(all_tokens.iter().any(|t| matches!(t, Token::OpenTag { .. })));
/// ```
pub struct StreamTokenizer {
    // Stateful structural pass that locates tag boundaries.
    indexer: StructuralIndexer,
    /// Residual bytes from the previous chunk (partial tag).
    residual: Vec<u8>,
    /// Reusable working buffer to avoid per-feed allocation.
    working: Vec<u8>,
}
50
/// Result of scanning a buffer for a safe chunk-split position.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
struct SplitScan {
    /// Byte offset just past the last complete construct (0 = no safe
    /// split point was found; the whole buffer must be carried over).
    split: usize,
    /// True when the scan ended inside a raw-text element (`<script>`,
    /// `<style>`, …) whose close tag has not been seen yet.
    in_raw_text_context: bool,
}
56
57impl StreamTokenizer {
58    /// Create a new streaming tokenizer.
59    pub fn new() -> Self {
60        Self {
61            indexer: StructuralIndexer::new(),
62            residual: Vec::with_capacity(256),
63            working: Vec::with_capacity(4096),
64        }
65    }
66
67    /// Feed a chunk of UTF-8 input and return any complete tokens.
68    ///
69    /// Tokens are returned as owned (`'static` lifetime) since the chunk
70    /// data may not live long enough. Text content is cloned into `Cow::Owned`.
71    pub fn feed(&mut self, chunk: &[u8]) -> Vec<Token<'static>> {
72        if chunk.is_empty() {
73            return Vec::new();
74        }
75
76        // Combine residual + new chunk into reusable working buffer.
77        // Take ownership to avoid borrow conflicts with process_chunk.
78        let mut working = std::mem::take(&mut self.working);
79        working.clear();
80        working.extend_from_slice(&self.residual);
81        working.extend_from_slice(chunk);
82        self.residual.clear();
83
84        // Find the last safe split point.
85        // Safe = end of a '>' that's not inside a string.
86        let scan = scan_safe_split(&working);
87        let split = scan.split;
88
89        if split == 0 {
90            // No complete tag boundary — buffer everything.
91            if working.len() > MAX_RESIDUAL && !scan.in_raw_text_context {
92                // Too large to buffer — force-process what we have.
93                let tokens = self.process_chunk(&working);
94                self.working = working;
95                return tokens;
96            }
97            // Swap: working becomes the residual, residual (now empty) becomes the
98            // reusable working buffer — no new allocation.
99            std::mem::swap(&mut self.residual, &mut working);
100            self.working = working;
101            return Vec::new();
102        }
103
104        // Process the safe portion.
105        let tokens = self.process_chunk(&working[..split]);
106
107        // Buffer the rest.
108        self.residual.extend_from_slice(&working[split..]);
109
110        // Return the working buffer for reuse.
111        self.working = working;
112
113        tokens
114    }
115
116    /// Feed a UTF-8 chunk and process complete tokens via callback without cloning.
117    ///
118    /// This path is intended for internal high-throughput consumers (e.g. tree
119    /// building) that can consume tokens immediately.
120    pub fn feed_str_with(&mut self, chunk: &str, mut on_token: impl FnMut(&Token<'_>)) {
121        if chunk.is_empty() {
122            return;
123        }
124
125        // Combine residual + new chunk into reusable working buffer.
126        let mut working = std::mem::take(&mut self.working);
127        working.clear();
128        working.extend_from_slice(&self.residual);
129        working.extend_from_slice(chunk.as_bytes());
130        self.residual.clear();
131
132        let scan = scan_safe_split(&working);
133        let split = scan.split;
134
135        if split == 0 {
136            if working.len() > MAX_RESIDUAL && !scan.in_raw_text_context {
137                // Too large to buffer — force-process what we have.
138                match std::str::from_utf8(&working) {
139                    Ok(text) => {
140                        let tokens = self.process_chunk_borrowed(text);
141                        for token in &tokens {
142                            on_token(token);
143                        }
144                    }
145                    Err(_) => {
146                        let text = String::from_utf8_lossy(&working).into_owned();
147                        let tokens = self.process_chunk_borrowed(&text);
148                        for token in &tokens {
149                            on_token(token);
150                        }
151                    }
152                }
153                self.working = working;
154                return;
155            }
156            std::mem::swap(&mut self.residual, &mut working);
157            self.working = working;
158            return;
159        }
160
161        match std::str::from_utf8(&working[..split]) {
162            Ok(text) => {
163                let tokens = self.process_chunk_borrowed(text);
164                for token in &tokens {
165                    on_token(token);
166                }
167            }
168            Err(_) => {
169                let text = String::from_utf8_lossy(&working[..split]).into_owned();
170                let tokens = self.process_chunk_borrowed(&text);
171                for token in &tokens {
172                    on_token(token);
173                }
174            }
175        }
176
177        self.residual.extend_from_slice(&working[split..]);
178        self.working = working;
179    }
180
181    /// Signal end of input and flush any remaining buffered data.
182    pub fn finish(&mut self) -> Vec<Token<'static>> {
183        if self.residual.is_empty() {
184            return Vec::new();
185        }
186        let remaining = std::mem::take(&mut self.residual);
187        self.process_chunk(&remaining)
188    }
189
190    /// Signal end of input and flush buffered tokens via callback without cloning.
191    pub fn finish_with(&mut self, mut on_token: impl FnMut(&Token<'_>)) {
192        if self.residual.is_empty() {
193            return;
194        }
195        let remaining = std::mem::take(&mut self.residual);
196        match std::str::from_utf8(&remaining) {
197            Ok(text) => {
198                let tokens = self.process_chunk_borrowed(text);
199                for token in &tokens {
200                    on_token(token);
201                }
202            }
203            Err(_) => {
204                let text = String::from_utf8_lossy(&remaining).into_owned();
205                let tokens = self.process_chunk_borrowed(&text);
206                for token in &tokens {
207                    on_token(token);
208                }
209            }
210        }
211    }
212
213    /// Process a complete chunk through the structural indexer + extractor.
214    fn process_chunk(&mut self, data: &[u8]) -> Vec<Token<'static>> {
215        match std::str::from_utf8(data) {
216            Ok(text) => {
217                let index = self.indexer.index(text.as_bytes());
218                let tokens = extract_tokens(text, &index);
219                tokens.into_iter().map(to_owned_token).collect()
220            }
221            Err(_) => {
222                let text = String::from_utf8_lossy(data).into_owned();
223                let index = self.indexer.index(text.as_bytes());
224                let tokens = extract_tokens(&text, &index);
225                tokens.into_iter().map(to_owned_token).collect()
226            }
227        }
228    }
229
230    /// Process a complete UTF-8 chunk and return borrowed tokens.
231    fn process_chunk_borrowed<'a>(&mut self, data: &'a str) -> Vec<Token<'a>> {
232        let index = self.indexer.index(data.as_bytes());
233        extract_tokens(data, &index)
234    }
235}
236
237impl Default for StreamTokenizer {
238    fn default() -> Self {
239        Self::new()
240    }
241}
242
/// Scan `data` for the last position at which it is safe to split the
/// stream: the byte just past a `>` that terminates a tag, doctype,
/// comment, or CDATA section outside any quoted attribute value.
///
/// Also reports whether the scan ended inside raw-text content (a
/// `<script>`/`<style>`-style element per `Tag::is_raw_text`), where the
/// caller must keep buffering because no safe split point exists until
/// the matching close tag.
fn scan_safe_split(data: &[u8]) -> SplitScan {
    // Byte-level state machine; one state per markup construct.
    #[derive(Clone, Copy)]
    enum Mode {
        /// Ordinary character data between tags (or raw-text content).
        Data,
        /// Inside `<...>`. `open` is the index of the `<`; `quote` is the
        /// active attribute-quote byte, if any; `raw_text_close` is set
        /// when this tag closes the current raw-text element.
        Tag {
            quote: Option<u8>,
            open: usize,
            raw_text_close: Option<Tag>,
        },
        /// Inside `<! ... >` (doctype or other markup declaration).
        Doctype {
            quote: Option<u8>,
        },
        /// Inside `<!-- ... -->`.
        Comment,
        /// Inside `<![CDATA[ ... ]]>`.
        CData,
    }

    let mut mode = Mode::Data;
    // Raw-text element currently open, if any; its content must not be
    // scanned for tags other than the matching close tag.
    let mut raw_text = None;
    let mut i = 0usize;
    let mut last_safe = 0usize;

    while i < data.len() {
        match mode {
            Mode::Data => {
                if let Some(tag) = raw_text {
                    // Inside raw text only a matching `</tag` re-enters
                    // tag mode; everything else (including `<`) is text.
                    if data[i] == b'<' && is_raw_text_close(data, i, tag) {
                        mode = Mode::Tag {
                            quote: None,
                            open: i,
                            raw_text_close: Some(tag),
                        };
                    }
                    i += 1;
                    continue;
                }

                if data[i] == b'<' {
                    // <!-- ... -->
                    if i + 3 < data.len() && &data[i..i + 4] == b"<!--" {
                        mode = Mode::Comment;
                        i += 4;
                        continue;
                    }

                    // <![CDATA[ ... ]]>
                    if i + 8 < data.len() && &data[i..i + 9] == b"<![CDATA[" {
                        mode = Mode::CData;
                        i += 9;
                        continue;
                    }

                    if i + 1 < data.len() {
                        let next = data[i + 1];
                        // <!DOCTYPE ...> or other <! ... >
                        if next == b'!' {
                            mode = Mode::Doctype { quote: None };
                            i += 2;
                            continue;
                        }
                        // Normal open/close tags. Ignore stray '<' in text.
                        if next == b'/'
                            || next.is_ascii_alphabetic()
                            || next == b'_'
                            || next == b'?'
                        {
                            mode = Mode::Tag {
                                quote: None,
                                open: i,
                                raw_text_close: None,
                            };
                            i += 1;
                            continue;
                        }
                    }
                }
                i += 1;
            }
            Mode::Tag {
                mut quote,
                open,
                raw_text_close,
            } => {
                if let Some(q) = quote {
                    // Inside a quoted attribute value: only the matching
                    // quote character is significant ('>' is literal).
                    if data[i] == q {
                        quote = None;
                    }
                    mode = Mode::Tag {
                        quote,
                        open,
                        raw_text_close,
                    };
                    i += 1;
                    continue;
                }
                match data[i] {
                    b'"' | b'\'' => {
                        mode = Mode::Tag {
                            quote: Some(data[i]),
                            open,
                            raw_text_close,
                        };
                        i += 1;
                    }
                    b'>' => {
                        if raw_text_close.is_some() {
                            // Close tag of the raw-text element: the
                            // whole element is now complete and safe.
                            last_safe = i + 1;
                            raw_text = None;
                        } else if let Some(tag) = raw_text_open_tag(&data[open + 1..i]) {
                            // Opening a raw-text element: NOT safe to
                            // split until its close tag is seen.
                            raw_text = Some(tag);
                        } else {
                            last_safe = i + 1;
                        }
                        mode = Mode::Data;
                        i += 1;
                    }
                    _ => i += 1,
                }
            }
            Mode::Doctype { mut quote } => {
                if let Some(q) = quote {
                    if data[i] == q {
                        quote = None;
                    }
                    mode = Mode::Doctype { quote };
                    i += 1;
                    continue;
                }
                match data[i] {
                    b'"' | b'\'' => {
                        mode = Mode::Doctype {
                            quote: Some(data[i]),
                        };
                        i += 1;
                    }
                    b'>' => {
                        last_safe = i + 1;
                        mode = Mode::Data;
                        i += 1;
                    }
                    _ => i += 1,
                }
            }
            Mode::Comment => {
                // Comments end only at "-->".
                if i + 2 < data.len()
                    && data[i] == b'-'
                    && data[i + 1] == b'-'
                    && data[i + 2] == b'>'
                {
                    last_safe = i + 3;
                    mode = Mode::Data;
                    i += 3;
                } else {
                    i += 1;
                }
            }
            Mode::CData => {
                // CDATA ends only at "]]>".
                if i + 2 < data.len()
                    && data[i] == b']'
                    && data[i + 1] == b']'
                    && data[i + 2] == b'>'
                {
                    last_safe = i + 3;
                    mode = Mode::Data;
                    i += 3;
                } else {
                    i += 1;
                }
            }
        }
    }

    SplitScan {
        split: last_safe,
        // Raw text is "open" both while scanning its content and while
        // partway through its (possibly truncated) close tag.
        in_raw_text_context: raw_text.is_some()
            || matches!(
                mode,
                Mode::Tag {
                    raw_text_close: Some(_),
                    ..
                }
            ),
    }
}
426
427fn raw_text_open_tag(tag_body: &[u8]) -> Option<Tag> {
428    if tag_body.is_empty() || tag_body[0] == b'/' {
429        return None;
430    }
431
432    let mut end = tag_body.len();
433    while end > 0 && tag_body[end - 1].is_ascii_whitespace() {
434        end -= 1;
435    }
436    if end == 0 || tag_body[end - 1] == b'/' {
437        return None;
438    }
439
440    let mut name_end = 0usize;
441    while name_end < end && !tag_body[name_end].is_ascii_whitespace() && tag_body[name_end] != b'/'
442    {
443        name_end += 1;
444    }
445    if name_end == 0 {
446        return None;
447    }
448
449    let tag = Tag::from_bytes(&tag_body[..name_end]);
450    tag.is_raw_text().then_some(tag)
451}
452
453fn is_raw_text_close(data: &[u8], pos: usize, tag: Tag) -> bool {
454    let remaining = &data[pos..];
455    if remaining.len() < 3 || remaining[1] != b'/' {
456        return false;
457    }
458
459    let tag_name = tag.as_str().unwrap_or("");
460    let name_len = tag_name.len();
461    if remaining.len() < 2 + name_len + 1 {
462        return false;
463    }
464
465    let candidate = &remaining[2..2 + name_len];
466    if !candidate.eq_ignore_ascii_case(tag_name.as_bytes()) {
467        return false;
468    }
469
470    let after = remaining[2 + name_len];
471    after == b'>' || after.is_ascii_whitespace()
472}
473
474/// Convert a borrowed token to an owned ('static) token.
475fn to_owned_token(token: Token<'_>) -> Token<'static> {
476    match token {
477        Token::OpenTag {
478            tag,
479            name,
480            attributes,
481            self_closing,
482        } => Token::OpenTag {
483            tag,
484            name: std::borrow::Cow::Owned(name.into_owned()),
485            attributes: attributes.into_iter().map(to_owned_attr).collect(),
486            self_closing,
487        },
488        Token::CloseTag { tag, name } => Token::CloseTag {
489            tag,
490            name: std::borrow::Cow::Owned(name.into_owned()),
491        },
492        Token::Text { content } => Token::Text {
493            content: std::borrow::Cow::Owned(content.into_owned()),
494        },
495        Token::Comment { content } => Token::Comment {
496            content: std::borrow::Cow::Owned(content.into_owned()),
497        },
498        Token::Doctype { content } => Token::Doctype {
499            content: std::borrow::Cow::Owned(content.into_owned()),
500        },
501        Token::CData { content } => Token::CData {
502            content: std::borrow::Cow::Owned(content.into_owned()),
503        },
504    }
505}
506
507/// Convert a borrowed attribute to owned.
508fn to_owned_attr(attr: crate::token::Attribute<'_>) -> crate::token::Attribute<'static> {
509    crate::token::Attribute {
510        name: std::borrow::Cow::Owned(attr.name.into_owned()),
511        value: attr.value.map(|v| std::borrow::Cow::Owned(v.into_owned())),
512    }
513}
514
#[cfg(test)]
mod tests {
    use super::*;

    /// A complete document fed in one call still tokenizes correctly.
    #[test]
    fn single_chunk() {
        let mut tok = StreamTokenizer::new();
        let tokens = tok.feed(b"<div>hello</div>");
        let final_tokens = tok.finish();

        let all: Vec<_> = tokens.into_iter().chain(final_tokens).collect();
        assert!(all.iter().any(|t| matches!(t, Token::OpenTag { .. })));
        assert!(all.iter().any(|t| matches!(t, Token::CloseTag { .. })));
    }

    /// Worst-case chunking: one byte per feed must still produce the
    /// same token kinds as a single feed.
    #[test]
    fn multi_chunk() {
        let html = b"<div>hello</div>";
        let mut tok = StreamTokenizer::new();
        let mut all = Vec::new();

        // Feed byte by byte.
        for &b in html.iter() {
            all.extend(tok.feed(&[b]));
        }
        all.extend(tok.finish());

        let has_open = all.iter().any(|t| matches!(t, Token::OpenTag { .. }));
        let has_close = all.iter().any(|t| matches!(t, Token::CloseTag { .. }));
        let has_text = all.iter().any(|t| matches!(t, Token::Text { .. }));

        assert!(has_open, "should have open tag");
        assert!(has_close, "should have close tag");
        assert!(has_text, "should have text");
    }

    /// Chunk size that splits both the tag and the quoted attribute.
    #[test]
    fn chunk_size_7() {
        let html = b"<div class=\"test\">hello world</div>";
        let mut tok = StreamTokenizer::new();
        let mut all = Vec::new();

        for chunk in html.chunks(7) {
            all.extend(tok.feed(chunk));
        }
        all.extend(tok.finish());

        assert!(all.iter().any(|t| matches!(t, Token::OpenTag { .. })));
        assert!(all.iter().any(|t| matches!(t, Token::CloseTag { .. })));
    }

    /// Larger chunks over a nested document produce all open tags.
    #[test]
    fn chunk_size_64() {
        let html = b"<html><head><title>Test</title></head><body><div class=\"main\"><p>Hello</p></div></body></html>";
        let mut tok = StreamTokenizer::new();
        let mut all = Vec::new();

        for chunk in html.chunks(64) {
            all.extend(tok.feed(chunk));
        }
        all.extend(tok.finish());

        let open_count = all
            .iter()
            .filter(|t| matches!(t, Token::OpenTag { .. }))
            .count();
        assert!(open_count >= 5, "should have multiple open tags");
    }

    /// Empty feeds are no-ops and must not disturb buffered state.
    #[test]
    fn empty_chunks() {
        let mut tok = StreamTokenizer::new();
        let t1 = tok.feed(b"");
        let t2 = tok.feed(b"<br/>");
        let t3 = tok.feed(b"");
        let t4 = tok.finish();

        let all: Vec<_> = t1.into_iter().chain(t2).chain(t3).chain(t4).collect();
        assert!(all.iter().any(|t| matches!(t, Token::OpenTag { .. })));
    }

    /// Split-point scanner: complete doc, partial tag, plain text.
    #[test]
    fn find_safe_split_basic() {
        assert_eq!(scan_safe_split(b"<div>hello</div>").split, 16);
        assert_eq!(scan_safe_split(b"<div>hello").split, 5);
        assert_eq!(scan_safe_split(b"hello").split, 0);
    }

    /// An open <script> with unterminated content must only be safe up
    /// to the tag before it, and must report raw-text context.
    #[test]
    fn find_safe_split_buffers_open_raw_text_context() {
        let scan = scan_safe_split(b"<div><script>if(a<b)");

        assert_eq!(scan.split, 5);
        assert!(scan.in_raw_text_context);
    }

    /// Raw-text content with a '<' inside, split across feeds, must be
    /// reassembled into a single text token between script open/close.
    #[test]
    fn raw_text_split_after_script_open() {
        let mut tok = StreamTokenizer::new();
        let mut all = Vec::new();

        all.extend(tok.feed(b"<script>"));
        all.extend(tok.feed(b"if(a<b)"));
        all.extend(tok.feed(b"{x()}</script>"));
        all.extend(tok.finish());

        let open_tags: Vec<_> = all
            .iter()
            .filter_map(|token| match token {
                Token::OpenTag { tag, .. } => Some(*tag),
                _ => None,
            })
            .collect();
        let close_tags: Vec<_> = all
            .iter()
            .filter_map(|token| match token {
                Token::CloseTag { tag, .. } => Some(*tag),
                _ => None,
            })
            .collect();
        let text: Vec<_> = all
            .iter()
            .filter_map(|token| match token {
                Token::Text { content } => Some(content.as_ref()),
                _ => None,
            })
            .collect();

        assert_eq!(open_tags, vec![Tag::Script]);
        assert_eq!(close_tags, vec![Tag::Script]);
        assert_eq!(text, vec!["if(a<b){x()}"]);
    }
}