// fhp_tree/streaming.rs
//! Streaming and incremental HTML parsing.
//!
//! [`StreamParser`](crate::streaming::StreamParser) builds a DOM tree incrementally from byte chunks,
//! handling encoding detection automatically. It buffers the first 1 KB
//! of input (matching the meta prescan limit) before detecting encoding,
//! then decodes and tokenizes all data into the tree.
//!
//! [`EarlyStopParser`](crate::streaming::EarlyStopParser) adds predicate-based early termination — the parse
//! stops as soon as a matching node is found, saving work on large documents.
//!
//! # Example
//!
//! ```
//! use fhp_tree::streaming::{StreamParser, parse_stream};
//!
//! let html = b"<div><p>Hello</p></div>";
//! let doc = parse_stream(html.chunks(7)).unwrap();
//! assert_eq!(doc.root().text_content(), "Hello");
//! ```
20
21use encoding_rs::Encoding;
22use fhp_core::error::EncodingError;
23use fhp_tokenizer::streaming::StreamTokenizer;
24
25use crate::builder::TreeBuilder;
26use crate::node::{Node, NodeId};
27use crate::{Document, HtmlError, MAX_INPUT_SIZE};
28
/// Maximum bytes to buffer before encoding detection (matches the HTML
/// meta prescan limit of 1 KB).
const PRESCAN_LIMIT: usize = 1024;
31
32/// Status returned by [`EarlyStopParser::feed`].
33pub enum ParseStatus {
34    /// More data is needed to satisfy the predicate.
35    NeedMore,
36    /// A node matching the predicate was found.
37    Found(NodeId),
38    /// Parsing finished without finding a match.
39    Done(Document),
40}
41
42impl core::fmt::Debug for ParseStatus {
43    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
44        match self {
45            Self::NeedMore => write!(f, "NeedMore"),
46            Self::Found(id) => write!(f, "Found({id:?})"),
47            Self::Done(doc) => write!(f, "Done(Document {{ nodes: {} }})", doc.node_count()),
48        }
49    }
50}
51
/// A streaming HTML parser that builds a DOM tree incrementally.
///
/// Buffers the first 1 KB of input for encoding detection (matching the
/// HTML spec's meta prescan limit), then decodes and tokenizes all
/// subsequent chunks into the tree.
///
/// # Example
///
/// ```
/// use fhp_tree::streaming::StreamParser;
///
/// let mut parser = StreamParser::new();
/// parser.feed(b"<div>");
/// parser.feed(b"<p>Hello</p>");
/// parser.feed(b"</div>");
/// let doc = parser.finish().unwrap();
/// assert_eq!(doc.root().text_content(), "Hello");
/// ```
pub struct StreamParser {
    /// Incremental tokenizer fed with decoded UTF-8 text.
    tokenizer: StreamTokenizer,
    /// Tree builder that consumes tokens and grows the node arena.
    builder: TreeBuilder,
    /// Stateful decoder; `None` until encoding detection has run.
    decoder: Option<encoding_rs::Decoder>,
    /// Encoding chosen by detection; used for decode-error reporting.
    detected_encoding: Option<&'static Encoding>,
    /// Buffered initial bytes before encoding detection.
    initial_buf: Vec<u8>,
    /// Whether encoding has been detected and initial buffer flushed.
    encoding_detected: bool,
    /// Total raw input bytes received.
    seen_input_size: usize,
    /// Maximum allowed input size.
    max_input_size: usize,
    /// Input exceeded `max_input_size`.
    input_too_large: bool,
    /// First decoding error seen during streaming decode.
    decode_error: Option<EncodingError>,
    /// Approximate input byte offset consumed by decoder.
    decoded_input_offset: usize,
}
90
91impl StreamParser {
92    /// Create a new streaming parser.
93    pub fn new() -> Self {
94        Self {
95            tokenizer: StreamTokenizer::new(),
96            builder: TreeBuilder::new(),
97            decoder: None,
98            detected_encoding: None,
99            initial_buf: Vec::with_capacity(PRESCAN_LIMIT),
100            encoding_detected: false,
101            seen_input_size: 0,
102            max_input_size: MAX_INPUT_SIZE,
103            input_too_large: false,
104            decode_error: None,
105            decoded_input_offset: 0,
106        }
107    }
108
109    /// Feed a chunk of raw bytes into the parser.
110    ///
111    /// Initial chunks are buffered until 1 KB is reached for encoding
112    /// detection. After that, each chunk is decoded and processed
113    /// immediately.
114    pub fn feed(&mut self, chunk: &[u8]) {
115        if chunk.is_empty() || self.input_too_large {
116            return;
117        }
118
119        self.seen_input_size = self.seen_input_size.saturating_add(chunk.len());
120        if self.seen_input_size > self.max_input_size {
121            self.input_too_large = true;
122            return;
123        }
124
125        if !self.encoding_detected {
126            self.initial_buf.extend_from_slice(chunk);
127            if self.initial_buf.len() >= PRESCAN_LIMIT {
128                self.flush_initial_buf();
129            }
130            return;
131        }
132
133        let text = self.decode_bytes(chunk, false);
134        self.process_text(&text);
135    }
136
137    /// Finish parsing and return the completed document.
138    ///
139    /// If encoding hasn't been detected yet (total input < 1 KB), detection
140    /// happens now.
141    pub fn finish(mut self) -> Result<Document, HtmlError> {
142        if self.input_too_large {
143            return Err(HtmlError::InputTooLarge {
144                size: self.seen_input_size,
145                max: self.max_input_size,
146            });
147        }
148
149        // If encoding was never detected, do it now with whatever we have.
150        if !self.encoding_detected {
151            self.flush_initial_buf();
152        }
153
154        if let Some(err) = self.decode_error.take() {
155            return Err(HtmlError::Encoding(err));
156        }
157
158        // Signal end-of-stream to the decoder for any trailing bytes.
159        let trailing = self.decode_bytes(&[], true);
160        if !trailing.is_empty() {
161            self.process_text(&trailing);
162        }
163
164        if let Some(err) = self.decode_error.take() {
165            return Err(HtmlError::Encoding(err));
166        }
167
168        // Flush the tokenizer.
169        {
170            let tokenizer = &mut self.tokenizer;
171            let builder = &mut self.builder;
172            tokenizer.finish_with(|token| {
173                builder.process(token);
174            });
175        }
176
177        let (arena, root) = self.builder.finish();
178        Ok(Document { arena, root })
179    }
180
181    /// Detect encoding from the initial buffer and process its contents.
182    fn flush_initial_buf(&mut self) {
183        let buf = std::mem::take(&mut self.initial_buf);
184        let encoding = fhp_encoding::detect(&buf);
185        let bom_len = bom_length(&buf, encoding);
186        self.decoder = Some(encoding.new_decoder_without_bom_handling());
187        self.detected_encoding = Some(encoding);
188        self.encoding_detected = true;
189
190        let data = &buf[bom_len..];
191        if !data.is_empty() {
192            let text = self.decode_bytes(data, false);
193            self.process_text(&text);
194        }
195    }
196
197    /// Decode raw bytes to a UTF-8 string using the stateful decoder.
198    fn decode_bytes(&mut self, bytes: &[u8], last: bool) -> String {
199        let decoder = self.decoder.as_mut().expect("decoder not initialized");
200        let max_len = decoder
201            .max_utf8_buffer_length(bytes.len())
202            .unwrap_or(bytes.len() * 4 + 4);
203        let mut output = String::with_capacity(max_len);
204
205        let mut pos = 0;
206        loop {
207            let (result, read, had_errors) =
208                decoder.decode_to_string(&bytes[pos..], &mut output, last);
209            if had_errors && self.decode_error.is_none() {
210                let encoding = self.detected_encoding.unwrap_or(encoding_rs::UTF_8);
211                self.decode_error = Some(EncodingError::MalformedInput {
212                    encoding: encoding.name(),
213                    offset: self.decoded_input_offset.saturating_add(pos),
214                });
215            }
216            pos += read;
217            match result {
218                encoding_rs::CoderResult::InputEmpty => break,
219                encoding_rs::CoderResult::OutputFull => {
220                    let additional = decoder
221                        .max_utf8_buffer_length(bytes.len() - pos)
222                        .unwrap_or(16);
223                    output.reserve(additional);
224                }
225            }
226        }
227
228        self.decoded_input_offset = self.decoded_input_offset.saturating_add(bytes.len());
229        output
230    }
231
232    /// Feed decoded text to the tokenizer, then process tokens with the builder.
233    fn process_text(&mut self, text: &str) {
234        let tokenizer = &mut self.tokenizer;
235        let builder = &mut self.builder;
236        tokenizer.feed_str_with(text, |token| {
237            builder.process(token);
238        });
239    }
240}
241
242impl Default for StreamParser {
243    fn default() -> Self {
244        Self::new()
245    }
246}
247
/// A streaming parser that stops as soon as a predicate matches a node.
///
/// Useful for extracting a single element from a large HTML document
/// without parsing the entire thing.
///
/// # Example
///
/// ```
/// use fhp_tree::streaming::EarlyStopParser;
/// use fhp_tree::streaming::ParseStatus;
/// use fhp_core::tag::Tag;
///
/// let mut parser = EarlyStopParser::stop_when(|node| node.tag == Tag::A);
/// let html = b"<div><p>text</p><a href=\"#\">link</a><span>more</span></div>";
///
/// match parser.feed(html) {
///     ParseStatus::Found(_id) => {
///         // Found the <a> tag — no need to parse <span>more</span>.
///     }
///     _ => panic!("expected Found"),
/// }
/// ```
pub struct EarlyStopParser {
    /// Incremental tokenizer fed with decoded UTF-8 text.
    tokenizer: StreamTokenizer,
    /// Tree builder that consumes tokens and grows the node arena.
    builder: TreeBuilder,
    /// Stateful decoder; `None` until the first non-empty chunk arrives.
    decoder: Option<encoding_rs::Decoder>,
    /// Whether encoding has been detected (eagerly, from the first chunk).
    encoding_detected: bool,
    /// Predicate tested against every newly created node.
    predicate: Box<dyn Fn(&Node) -> bool>,
    /// Id of the first matching node, once found.
    found: Option<NodeId>,
}
278
279impl EarlyStopParser {
280    /// Create a parser that stops when `predicate` returns `true` for a node.
281    pub fn stop_when(predicate: impl Fn(&Node) -> bool + 'static) -> Self {
282        Self {
283            tokenizer: StreamTokenizer::new(),
284            builder: TreeBuilder::new(),
285            decoder: None,
286            encoding_detected: false,
287            predicate: Box::new(predicate),
288            found: None,
289        }
290    }
291
292    /// Feed a chunk of raw bytes and check the predicate against new nodes.
293    ///
294    /// Returns [`ParseStatus::Found`] as soon as a matching node is created,
295    /// or [`ParseStatus::NeedMore`] if the predicate hasn't matched yet.
296    ///
297    /// Encoding is detected eagerly from the first non-empty chunk to
298    /// minimise latency. For accurate meta-charset detection with small
299    /// chunks, use [`StreamParser`] instead.
300    pub fn feed(&mut self, chunk: &[u8]) -> ParseStatus {
301        if let Some(id) = self.found {
302            return ParseStatus::Found(id);
303        }
304
305        if chunk.is_empty() {
306            return ParseStatus::NeedMore;
307        }
308
309        // Detect encoding eagerly from the first chunk.
310        if !self.encoding_detected {
311            let encoding = fhp_encoding::detect(chunk);
312            let bom_len = bom_length(chunk, encoding);
313            self.decoder = Some(encoding.new_decoder_without_bom_handling());
314            self.encoding_detected = true;
315
316            let data = &chunk[bom_len..];
317            if data.is_empty() {
318                return ParseStatus::NeedMore;
319            }
320            let text = self.decode_bytes(data, false);
321            return self.process_and_check(&text);
322        }
323
324        let text = self.decode_bytes(chunk, false);
325        self.process_and_check(&text)
326    }
327
328    /// Finish parsing and return the document.
329    ///
330    /// If the predicate was already matched, returns [`ParseStatus::Found`].
331    /// Otherwise returns [`ParseStatus::Done`] with the complete document.
332    pub fn finish(mut self) -> ParseStatus {
333        if let Some(id) = self.found {
334            return ParseStatus::Found(id);
335        }
336
337        // If nothing was ever fed, initialise a UTF-8 decoder.
338        if !self.encoding_detected {
339            self.decoder = Some(encoding_rs::UTF_8.new_decoder_without_bom_handling());
340            self.encoding_detected = true;
341        }
342
343        // Signal end-of-stream to decoder.
344        let trailing = self.decode_bytes(&[], true);
345        if !trailing.is_empty() {
346            if let ParseStatus::Found(id) = self.process_and_check(&trailing) {
347                return ParseStatus::Found(id);
348            }
349        }
350
351        // Flush tokenizer.
352        {
353            let tokenizer = &mut self.tokenizer;
354            let builder = &mut self.builder;
355            let predicate = &self.predicate;
356            let found = &mut self.found;
357            tokenizer.finish_with(|token| {
358                if found.is_some() {
359                    return;
360                }
361                if let Some(node_id) = builder.process(token) {
362                    let node = builder.arena.get(node_id);
363                    if predicate(node) {
364                        *found = Some(node_id);
365                    }
366                }
367            });
368        }
369        if let Some(id) = self.found {
370            return ParseStatus::Found(id);
371        }
372
373        let (arena, root) = self.builder.finish();
374        ParseStatus::Done(Document { arena, root })
375    }
376
377    /// Decode, tokenize, build, and check predicate. Returns Found or NeedMore.
378    fn process_and_check(&mut self, text: &str) -> ParseStatus {
379        {
380            let tokenizer = &mut self.tokenizer;
381            let builder = &mut self.builder;
382            let predicate = &self.predicate;
383            let found = &mut self.found;
384            tokenizer.feed_str_with(text, |token| {
385                if found.is_some() {
386                    return;
387                }
388                if let Some(node_id) = builder.process(token) {
389                    let node = builder.arena.get(node_id);
390                    if predicate(node) {
391                        *found = Some(node_id);
392                    }
393                }
394            });
395        }
396        match self.found {
397            Some(id) => ParseStatus::Found(id),
398            None => ParseStatus::NeedMore,
399        }
400    }
401
402    /// Decode raw bytes using the stateful decoder.
403    fn decode_bytes(&mut self, bytes: &[u8], last: bool) -> String {
404        let decoder = self.decoder.as_mut().expect("decoder not initialized");
405        let max_len = decoder
406            .max_utf8_buffer_length(bytes.len())
407            .unwrap_or(bytes.len() * 4 + 4);
408        let mut output = String::with_capacity(max_len);
409
410        let mut pos = 0;
411        loop {
412            let (result, read, _had_errors) =
413                decoder.decode_to_string(&bytes[pos..], &mut output, last);
414            pos += read;
415            match result {
416                encoding_rs::CoderResult::InputEmpty => break,
417                encoding_rs::CoderResult::OutputFull => {
418                    let additional = decoder
419                        .max_utf8_buffer_length(bytes.len() - pos)
420                        .unwrap_or(16);
421                    output.reserve(additional);
422                }
423            }
424        }
425
426        output
427    }
428}
429
430/// Parse an HTML document from an iterator of byte chunks.
431///
432/// Detects encoding from the first 1 KB and builds the tree incrementally.
433///
434/// # Example
435///
436/// ```
437/// use fhp_tree::streaming::parse_stream;
438///
439/// let html = b"<div><p>Hello</p></div>";
440/// let doc = parse_stream(html.chunks(64)).unwrap();
441/// assert_eq!(doc.root().text_content(), "Hello");
442/// ```
443pub fn parse_stream<'a>(chunks: impl Iterator<Item = &'a [u8]>) -> Result<Document, HtmlError> {
444    let mut parser = StreamParser::new();
445    for chunk in chunks {
446        parser.feed(chunk);
447    }
448    parser.finish()
449}
450
451/// Determine the BOM length to strip for a given encoding.
452fn bom_length(input: &[u8], encoding: &'static Encoding) -> usize {
453    if encoding == encoding_rs::UTF_8
454        && input.len() >= 3
455        && input[0] == 0xEF
456        && input[1] == 0xBB
457        && input[2] == 0xBF
458    {
459        return 3;
460    }
461    if encoding == encoding_rs::UTF_16LE && input.len() >= 2 && input[0] == 0xFF && input[1] == 0xFE
462    {
463        return 2;
464    }
465    if encoding == encoding_rs::UTF_16BE && input.len() >= 2 && input[0] == 0xFE && input[1] == 0xFF
466    {
467        return 2;
468    }
469    0
470}
471
#[cfg(test)]
mod tests {
    use super::*;
    use fhp_core::tag::Tag;

    #[test]
    fn stream_parser_single_chunk() {
        let mut parser = StreamParser::new();
        parser.feed(b"<div><p>Hello</p></div>");
        let doc = parser.finish().unwrap();
        assert_eq!(doc.root().text_content(), "Hello");
    }

    #[test]
    fn stream_parser_multiple_chunks() {
        // Tag boundaries split across chunks must still parse correctly.
        let mut parser = StreamParser::new();
        parser.feed(b"<div>");
        parser.feed(b"<p>Hello</p>");
        parser.feed(b"</div>");
        let doc = parser.finish().unwrap();
        assert_eq!(doc.root().text_content(), "Hello");
    }

    #[test]
    fn stream_parser_empty_chunks() {
        // Empty chunks are no-ops, before and after real data.
        let mut parser = StreamParser::new();
        parser.feed(b"");
        parser.feed(b"<p>ok</p>");
        parser.feed(b"");
        let doc = parser.finish().unwrap();
        assert_eq!(doc.root().text_content(), "ok");
    }

    #[test]
    fn stream_parser_byte_by_byte() {
        // Worst-case chunking: one byte at a time.
        let html = b"<div>hi</div>";
        let mut parser = StreamParser::new();
        for &b in html.iter() {
            parser.feed(&[b]);
        }
        let doc = parser.finish().unwrap();
        let text = doc.root().text_content();
        assert!(text.contains("hi"), "text: {text}");
    }

    #[test]
    fn parse_stream_convenience() {
        let html = b"<div><p>Hello</p></div>";
        let doc = parse_stream(html.chunks(7)).unwrap();
        assert_eq!(doc.root().text_content(), "Hello");
    }

    #[test]
    fn early_stop_finds_tag() {
        let mut parser = EarlyStopParser::stop_when(|node| node.tag == Tag::A);
        let html = b"<div><p>text</p><a href=\"#\">link</a><span>more</span></div>";
        match parser.feed(html) {
            ParseStatus::Found(_id) => {}
            other => panic!("expected Found, got {other:?}"),
        }
    }

    #[test]
    fn early_stop_not_found() {
        // No match: feed reports NeedMore, finish yields the full document.
        let mut parser = EarlyStopParser::stop_when(|node| node.tag == Tag::A);
        let html = b"<div><p>no links here</p></div>";
        match parser.feed(html) {
            ParseStatus::NeedMore => {}
            other => panic!("expected NeedMore, got {other:?}"),
        }
        match parser.finish() {
            ParseStatus::Done(doc) => {
                assert_eq!(doc.root().text_content(), "no links here");
            }
            other => panic!("expected Done, got {other:?}"),
        }
    }

    #[test]
    fn early_stop_multi_chunk() {
        let mut parser = EarlyStopParser::stop_when(|node| node.tag == Tag::A);
        assert!(matches!(
            parser.feed(b"<div><p>text</p>"),
            ParseStatus::NeedMore
        ));
        match parser.feed(b"<a href=\"#\">link</a></div>") {
            ParseStatus::Found(_id) => {}
            other => panic!("expected Found, got {other:?}"),
        }
    }

    #[test]
    fn bom_length_utf8() {
        assert_eq!(bom_length(b"\xEF\xBB\xBF<html>", encoding_rs::UTF_8), 3);
        assert_eq!(bom_length(b"<html>", encoding_rs::UTF_8), 0);
    }

    #[test]
    fn bom_length_utf16() {
        assert_eq!(bom_length(b"\xFF\xFE<\x00", encoding_rs::UTF_16LE), 2);
        assert_eq!(bom_length(b"\xFE\xFF\x00<", encoding_rs::UTF_16BE), 2);
    }

    #[test]
    fn stream_encoding_windows_1254() {
        // Meta-declared legacy encoding must survive chunked delivery.
        let mut html = b"<meta charset=\"windows-1254\"><body>".to_vec();
        html.extend_from_slice(&[0xFE, 0xF0]); // ş, ğ in windows-1254
        html.extend_from_slice(b"</body>");

        let doc = parse_stream(html.chunks(15)).unwrap();
        let text = doc.root().text_content();
        assert!(text.contains('ş'), "text: {text}");
        assert!(text.contains('ğ'), "text: {text}");
    }
}