Skip to main content

stream_chunkrec/
lib.rs

//! # stream-chunkrec
//!
//! Recombine streaming token deltas (bytes from a server-sent-event
//! body, an HTTP chunked transfer, or any other framed source) into
//! valid UTF-8 text.
//!
//! Streams that arrive byte-by-byte can split a multi-byte UTF-8
//! sequence across two chunks. Naively decoding each chunk with
//! `String::from_utf8_lossy(chunk)` turns the split fragments into
//! replacement characters.
//!
//! This crate buffers the trailing partial codepoint between pushes
//! and emits only the bytes that decode to whole characters.
//!
//! ## Example
//!
//! ```
//! use stream_chunkrec::Recombiner;
//! let mut r = Recombiner::new();
//! // "café" = 63 61 66 C3 A9
//! assert_eq!(r.push(&[0x63, 0x61, 0x66, 0xC3]), "caf"); // C3 is incomplete
//! assert_eq!(r.push(&[0xA9]), "é"); // completes the codepoint
//! assert_eq!(r.flush(), "");
//! ```
24
25#![deny(missing_docs)]
26
/// UTF-8-safe streaming recombiner.
///
/// Feed raw byte chunks to [`Recombiner::push`] as they arrive and call
/// [`Recombiner::flush`] once the stream ends.
#[derive(Debug, Default, Clone)]
pub struct Recombiner {
    /// Start of a codepoint that was cut off by the end of the last chunk.
    pending: Vec<u8>,
}

impl Recombiner {
    /// Build an empty recombiner.
    pub fn new() -> Self {
        Self::default()
    }

    /// Push the next chunk and return the text it completes.
    ///
    /// Bytes that decode to whole codepoints are returned immediately. A
    /// trailing fragment that could still become a valid codepoint is
    /// buffered (at most 3 bytes) and completed by a later push. Byte
    /// sequences that can never be valid UTF-8 are replaced with U+FFFD
    /// right away, the same way [`String::from_utf8_lossy`] replaces them.
    /// As a result, bad input neither stalls the stream nor grows the buffer.
    ///
    /// Concatenating every `push` result followed by [`Recombiner::flush`]
    /// yields the same text as `String::from_utf8_lossy` over the whole
    /// byte stream.
    #[must_use]
    pub fn push(&mut self, bytes: &[u8]) -> String {
        self.pending.extend_from_slice(bytes);
        let (text, consumed) = decode_settled_prefix(&self.pending);
        // Keep only the unfinished tail (if any) for the next push.
        self.pending.drain(..consumed);
        text
    }

    /// Flush the buffered fragment at end of stream.
    ///
    /// Whatever is still pending can no longer be completed, so it is
    /// replaced with U+FFFD via [`String::from_utf8_lossy`]. Returns an
    /// empty string when nothing is buffered. The buffer is empty
    /// afterwards, so the recombiner can be reused for another stream.
    pub fn flush(&mut self) -> String {
        let text = String::from_utf8_lossy(&self.pending).into_owned();
        self.pending.clear();
        text
    }

    /// Number of bytes currently buffered: the start of an incomplete
    /// codepoint, so never more than 3.
    pub fn pending(&self) -> usize {
        self.pending.len()
    }
}

/// Decode the longest prefix of `bytes` whose meaning is already settled.
///
/// Returns the decoded text and the number of input bytes it consumed.
/// Valid UTF-8 is copied through. Each invalid sequence reported by
/// [`std::str::Utf8Error::error_len`] becomes one U+FFFD, and decoding
/// resumes after it. Decoding stops only at a trailing sequence that is
/// incomplete but could still become valid once more bytes arrive; those
/// bytes are left unconsumed.
fn decode_settled_prefix(bytes: &[u8]) -> (String, usize) {
    let mut text = String::with_capacity(bytes.len());
    let mut rest = bytes;
    loop {
        let err = match std::str::from_utf8(rest) {
            Ok(valid) => {
                text.push_str(valid);
                return (text, bytes.len());
            }
            Err(err) => err,
        };
        let (valid, after) = rest.split_at(err.valid_up_to());
        text.push_str(
            std::str::from_utf8(valid).expect("valid_up_to() always marks a valid UTF-8 prefix"),
        );
        match err.error_len() {
            // A sequence that is invalid no matter what follows: replace it
            // and keep decoding after it.
            Some(bad_len) => {
                text.push(char::REPLACEMENT_CHARACTER);
                rest = &after[bad_len..];
            }
            // The input ends mid-sequence: leave it for the next chunk.
            None => return (text, bytes.len() - after.len()),
        }
    }
}