//! stream.rs — Tokio-based Streaming Transpiler
//!
//! Delivers document chunks to the LLM before full processing completes,
//! minimizing TTFT (Time-To-First-Token).
//!
//! # Pipeline
//! ```text
//! AsyncRead → IncrementalParser → AdaptiveCompressor → StreamingRenderer
//!                                        ↑
//!                              Switches to Compressed at 80% budget usage
//! ```
//!
//! # Symbol substitution in streaming
//!
//! The single-pass streaming pipeline cannot discover all domain terms before the
//! stream starts, so [`SymbolDict`] **remains empty by default**. Use
//! [`StreamingTranspiler::with_dict`] to inject a pre-populated dictionary when
//! domain terms are known in advance.
//!
//! # Token counting
//!
//! By default, [`estimate_tokens`] uses a Unicode-script heuristic (chars-per-token).
//! Enable the `tiktoken` Cargo feature for accurate `cl100k_base` counting:
//!
//! ```toml
//! [dependencies]
//! llm-transpiler = { features = ["tiktoken"] }
//! ```

30use std::pin::Pin;
31
32use futures::Stream;
33use tokio::sync::mpsc;
34use tokio_stream::wrappers::ReceiverStream;
35
36use crate::compressor::{AdaptiveCompressor, CompressionConfig};
37use crate::ir::{DocNode, FidelityLevel, IRDocument};
38use crate::renderer::render_node;
39use crate::symbol::SymbolDict;

// ────────────────────────────────────────────────
// 1. Chunk type
// ────────────────────────────────────────────────

/// A single output unit produced by the streaming transpiler.
///
/// Chunks are delivered in ascending `sequence` order over the stream returned
/// by `StreamingTranspiler::transpile`; the terminating chunk is flagged via
/// `is_final`.
#[derive(Debug, Clone)]
pub struct TranspileChunk {
    /// Transmission sequence number (0-based).
    pub sequence: usize,
    /// Rendered text fragment.
    pub content: String,
    /// Approximate token count for this chunk, as computed by [`estimate_tokens`].
    ///
    /// Uses the `tiktoken` feature (cl100k_base) when enabled; otherwise falls back
    /// to the Unicode-script character heuristic.
    pub token_count: usize,
    /// Whether this is the final chunk.
    pub is_final: bool,
}
60
61impl TranspileChunk {
62    fn new(sequence: usize, content: String, is_final: bool) -> Self {
63        let token_count = estimate_tokens(&content);
64        Self {
65            sequence,
66            content,
67            token_count,
68            is_final,
69        }
70    }
71}

// ────────────────────────────────────────────────
// 2. Token estimation
// ────────────────────────────────────────────────

/// Returns the approximate token count for `text`.
///
/// ## Feature: `tiktoken` (accurate)
/// When the `tiktoken` Cargo feature is enabled, uses OpenAI's `cl100k_base` tokenizer
/// (GPT-4 / GPT-3.5-turbo vocabulary, also a reasonable approximation for Claude).
/// The tokenizer is initialised once and cached in a `OnceLock`.
///
/// ## Default (heuristic)
/// Applies a chars-per-token weight based on each character's Unicode script range,
/// sums `1/cpt`, then takes the ceiling. **Not accurate for real models** — intended
/// only as a lightweight approximation for systems that cannot carry the tiktoken
/// dependency. For production use, enable the `tiktoken` feature.
///
/// | Script range | chars/token |
/// |-------------|-------------|
/// | Hiragana / Katakana / CJK / Hangul | 2 |
/// | Arabic / Devanagari / Bengali / Tamil / Thai | 3 |
/// | Emoji | 2 |
/// | Latin and everything else | 4 |
///
/// The result is always at least 1, including for empty input.
pub fn estimate_tokens(text: &str) -> usize {
    #[cfg(feature = "tiktoken")]
    {
        use std::sync::OnceLock;
        // Building the cl100k_base tables is expensive; cache them process-wide.
        static BPE: OnceLock<tiktoken_rs::CoreBPE> = OnceLock::new();
        let bpe = BPE.get_or_init(|| tiktoken_rs::cl100k_base().expect("cl100k_base init failed"));
        bpe.encode_ordinary(text).len().max(1)
    }

    #[cfg(not(feature = "tiktoken"))]
    {
        // Σ 1/cpt over all chars, rounded up; floored at 1 so callers can rely
        // on a non-zero count even for "".
        let total: f64 = text
            .chars()
            .map(|c| (chars_per_token(c) as f64).recip())
            .sum();
        (total.ceil() as usize).max(1)
    }
}

/// Returns the chars-per-token weight for `c` based on its Unicode codepoint range.
///
/// Not compiled when the `tiktoken` feature is active.
#[cfg(not(feature = "tiktoken"))]
fn chars_per_token(c: char) -> u32 {
    match c as u32 {
        0x3040..=0x30FF => 2,   // Hiragana / Katakana
        0x3400..=0x4DBF => 2,   // CJK Extension A
        0x4E00..=0x9FFF => 2,   // CJK Unified Ideographs (BMP)
        0xF900..=0xFAFF => 2,   // CJK Compatibility Ideographs
        0xAC00..=0xD7FF => 2,   // Hangul Syllables (tail U+D7B0–D7FF is Jamo Extended-B)
        0x1100..=0x11FF => 2,   // Hangul Jamo
        0xA960..=0xA97F => 2,   // Hangul Jamo Extended-A
        0x20000..=0x2A6DF => 2, // CJK Extension B
        0x2A700..=0x2CEAF => 2, // CJK Extensions C–E
        0x2CEB0..=0x2EBEF => 2, // CJK Extension F
        0x2EBF0..=0x2EE5F => 2, // CJK Extension I (previously unhandled → fell through to 4)
        0x30000..=0x323AF => 2, // CJK Extensions G–H
        0x0600..=0x06FF => 3,   // Arabic
        0x0750..=0x077F => 3,   // Arabic Supplement
        0x0900..=0x097F => 3,   // Devanagari
        0x0980..=0x09FF => 3,   // Bengali
        0x0A00..=0x0A7F => 3,   // Gurmukhi
        0x0B80..=0x0BFF => 3,   // Tamil
        0x0E00..=0x0E7F => 3,   // Thai
        // Emoji: ~1–2 tokens per char per GPT-4 → approximate as cpt=2
        0x1F300..=0x1F9FF => 2, // Misc Symbols & Pictographs, Emoticons, Supplemental Symbols
        0x1FA00..=0x1FAFF => 2, // Symbols and Pictographs Extended-A
        _ => 4,                 // Latin and other scripts
    }
}

// ────────────────────────────────────────────────
// 3. StreamingTranspiler
// ────────────────────────────────────────────────

/// Default mpsc channel buffer size used when none is specified via
/// [`StreamingTranspiler::with_channel_size`].
///
/// 32 in-flight chunks trades producer stalls against peak memory; see
/// `with_channel_size` for the backpressure semantics.
const DEFAULT_CHANNEL_BUFFER: usize = 32;
155
/// Tokio channel-based streaming transpiler.
///
/// # Symbol dictionary injection
///
/// By default the streaming path uses an **empty** [`SymbolDict`] because a
/// single-pass stream cannot discover domain terms before it starts. Use
/// [`StreamingTranspiler::with_dict`] to supply a pre-populated dictionary
/// when terms are known in advance:
///
/// ```rust,no_run
/// use llm_transpile::{FidelityLevel, SymbolDict, StreamingTranspiler};
///
/// let mut dict = SymbolDict::new();
/// dict.intern("large language model").unwrap();
/// dict.intern("retrieval-augmented generation").unwrap();
///
/// let transpiler = StreamingTranspiler::with_dict(4096, FidelityLevel::Semantic, dict);
/// ```
pub struct StreamingTranspiler {
    /// Compression pipeline applied to the body nodes in a single batch pass.
    compressor: AdaptiveCompressor,
    /// Output token budget. When > 0, the pipeline force-finalizes the stream
    /// once the accumulated token estimate reaches it.
    budget: usize,
    /// Requested fidelity; may be downgraded to `Compressed` when ≥80% of the
    /// budget is already consumed by the header.
    fidelity: FidelityLevel,
    /// Pre-populated symbol dictionary used during streaming.
    /// Empty by default; populate via `with_dict()` for domain-specific compression.
    dict: SymbolDict,
    /// Tokio mpsc channel buffer size.
    /// Controls backpressure: the spawned producer task blocks when this many
    /// chunks are in flight and the consumer has not yet polled them.
    /// Larger values reduce producer stalls at the cost of higher memory usage;
    /// smaller values increase backpressure and bound memory.
    channel_buffer: usize,
}
188
189impl StreamingTranspiler {
190    /// Creates a new transpiler with an empty symbol dictionary.
191    pub fn new(budget: usize, fidelity: FidelityLevel) -> Self {
192        Self {
193            compressor: AdaptiveCompressor::new(),
194            budget,
195            fidelity,
196            dict: SymbolDict::new(),
197            channel_buffer: DEFAULT_CHANNEL_BUFFER,
198        }
199    }
200
201    /// Creates a transpiler with a **pre-populated** symbol dictionary.
202    ///
203    /// Domain terms already registered in `dict` will be substituted with PUA
204    /// symbols during streaming and the `<D>` block will be emitted in the first
205    /// chunk. This is the recommended path when the document vocabulary is known
206    /// before streaming begins.
207    ///
208    /// # Example
209    /// ```rust,no_run
210    /// use llm_transpile::{FidelityLevel, SymbolDict, StreamingTranspiler};
211    ///
212    /// let mut dict = SymbolDict::new();
213    /// dict.intern("transformer model").unwrap();
214    ///
215    /// let transpiler = StreamingTranspiler::with_dict(8192, FidelityLevel::Semantic, dict);
216    /// ```
217    pub fn with_dict(budget: usize, fidelity: FidelityLevel, dict: SymbolDict) -> Self {
218        Self {
219            compressor: AdaptiveCompressor::new(),
220            budget,
221            fidelity,
222            dict,
223            channel_buffer: DEFAULT_CHANNEL_BUFFER,
224        }
225    }
226
227    /// Sets the Tokio mpsc channel buffer size and returns `self` for chaining.
228    ///
229    /// The internal pipeline uses a bounded `mpsc` channel to deliver chunks to
230    /// the caller. The buffer size controls **backpressure**: when `n` chunks are
231    /// already queued and the consumer has not polled them yet, the producer task
232    /// will `.await` before sending the next chunk. This bounds peak memory usage
233    /// to roughly `n` in-flight chunks at a time.
234    ///
235    /// - **Larger `n`**: fewer producer stalls, higher peak memory.
236    /// - **Smaller `n`**: tighter backpressure, lower memory footprint.
237    ///
238    /// Defaults to `32` when not set.
239    ///
240    /// # Example
241    /// ```rust,no_run
242    /// use llm_transpile::{FidelityLevel, StreamingTranspiler};
243    ///
244    /// let transpiler = StreamingTranspiler::new(4096, FidelityLevel::Semantic)
245    ///     .with_channel_size(8);
246    /// ```
247    pub fn with_channel_size(mut self, n: usize) -> Self {
248        // Tokio's mpsc::channel requires a capacity of at least 1; clamp to prevent panic.
249        self.channel_buffer = n.max(1);
250        self
251    }
252
253    /// Converts an `IRDocument` into a chunk stream.
254    ///
255    /// The first chunk always contains `<D>` (if non-empty) + `<H><B>`.
256    /// Automatically switches to `Compressed` fidelity when 80% of the budget is reached.
257    pub fn transpile(
258        self,
259        doc: IRDocument,
260    ) -> Pin<Box<dyn Stream<Item = Result<TranspileChunk, StreamError>> + Send>> {
261        let (tx, rx) =
262            mpsc::channel::<Result<TranspileChunk, StreamError>>(self.channel_buffer);
263        let stream = ReceiverStream::new(rx);
264
265        tokio::spawn(async move {
266            if let Err(e) = Self::run_pipeline(
267                doc,
268                self.budget,
269                self.fidelity,
270                self.compressor,
271                self.dict,
272                tx,
273            )
274            .await
275            {
276                // Error already sent over the channel; ignore at spawn level.
277                let _ = e;
278            }
279        });
280
281        Box::pin(stream)
282    }
283
284    async fn run_pipeline(
285        doc: IRDocument,
286        budget: usize,
287        fidelity: FidelityLevel,
288        compressor: AdaptiveCompressor,
289        dict: SymbolDict,
290        tx: mpsc::Sender<Result<TranspileChunk, StreamError>>,
291    ) -> Result<(), StreamError> {
292        let mut accumulated_tokens: usize = 0;
293        let mut sequence: usize = 0;
294
295        // ── Chunk 0: header (always first) ──────────────────────────────
296        let header_content = build_header_chunk(&doc, &dict);
297        accumulated_tokens += estimate_tokens(&header_content);
298
299        let total_nodes = doc.nodes.len();
300        let is_final_header = total_nodes == 0;
301
302        tx.send(Ok(TranspileChunk::new(
303            sequence,
304            header_content,
305            is_final_header,
306        )))
307        .await
308        .map_err(|_| StreamError::ChannelClosed)?;
309        sequence += 1;
310
311        if is_final_header {
312            return Ok(());
313        }
314
315        // ── Stream body nodes ────────────────────────────────────────────
316        let body_nodes: Vec<DocNode> = doc
317            .nodes
318            .into_iter()
319            .filter(|n| !matches!(n, crate::ir::DocNode::Metadata { .. }))
320            .collect();
321
322        // ── Batch compression (single pass over all body nodes) ──────────
323        // Compute effective fidelity once based on post-header token usage,
324        // then compress the entire batch in one call — O(N+M) Aho-Corasick
325        // pass instead of N separate passes, and no per-node Vec allocation.
326        let usage_after_header = if budget > 0 {
327            accumulated_tokens as f64 / budget as f64
328        } else {
329            1.0
330        };
331        let batch_fidelity = if fidelity != FidelityLevel::Lossless && usage_after_header >= 0.80 {
332            FidelityLevel::Compressed
333        } else {
334            fidelity
335        };
336        let batch_cfg = CompressionConfig {
337            budget,
338            current_tokens: accumulated_tokens,
339            fidelity: batch_fidelity,
340        };
341        let body_nodes = compressor.compress(body_nodes, &batch_cfg);
342
343        let body_len = body_nodes.len();
344        for (idx, node) in body_nodes.into_iter().enumerate() {
345            let is_last = body_len > 0 && idx == body_len - 1;
346
347            // Compression was already applied to the full batch above; render directly.
348            let chunk_text = render_node(&node, &dict);
349
350            if chunk_text.is_empty() {
351                continue; // Skip nodes entirely eliminated by compression
352            }
353
354            // Force final chunk when budget is exceeded
355            let tokens = estimate_tokens(&chunk_text);
356            accumulated_tokens += tokens;
357            let force_final = budget > 0 && accumulated_tokens >= budget;
358            let is_final = is_last || force_final;
359
360            // Append </B> closing tag to the final chunk
361            let content = if is_final {
362                format!("{}\n</B>", chunk_text.trim())
363            } else {
364                chunk_text
365            };
366
367            // TranspileChunk::new re-calls estimate_tokens internally, so
368            // token_count is recalculated based on content (including the </B> tag).
369            // accumulated_tokens is based on chunk_text — within acceptable error margin.
370            tx.send(Ok(TranspileChunk::new(sequence, content, is_final)))
371                .await
372                .map_err(|_| StreamError::ChannelClosed)?;
373            sequence += 1;
374
375            if force_final {
376                break;
377            }
378        }
379
380        // Guard for the edge case where body nodes existed but the final chunk was never sent
381        // (all nodes eliminated by compression)
382        if sequence == 1 {
383            tx.send(Ok(TranspileChunk::new(sequence, "</B>".to_string(), true)))
384                .await
385                .map_err(|_| StreamError::ChannelClosed)?;
386        }
387
388        Ok(())
389    }
390}

// ────────────────────────────────────────────────
// 4. Helper functions
// ────────────────────────────────────────────────

396/// Builds the document header chunk text (`<D>?<H><B>` opening).
397fn build_header_chunk(doc: &IRDocument, dict: &SymbolDict) -> String {
398    let dict_block = dict.render_dict_header();
399    let yaml = crate::renderer::build_yaml_header(doc);
400
401    let mut out = String::new();
402    if !dict_block.is_empty() {
403        out.push_str(&dict_block);
404    }
405    if !yaml.is_empty() {
406        out.push_str("<H>\n");
407        out.push_str(yaml.trim());
408        out.push_str("\n</H>\n");
409    }
410    out.push_str("<B>");
411    out
412}

// ────────────────────────────────────────────────
// 5. Error type
// ────────────────────────────────────────────────

/// Streaming transpile error.
#[derive(Debug, thiserror::Error)]
pub enum StreamError {
    /// The consumer dropped the receiving end of the chunk stream before the
    /// producer finished sending.
    #[error("stream channel closed")]
    ChannelClosed,

    /// Parsing the input failed; the payload carries the parser's message.
    #[error("parse failed: {0}")]
    Parse(String),

    /// Input exceeded the size limit; the payload is the limit in bytes.
    #[error("input exceeds maximum allowed size of {0} bytes")]
    InputTooLarge(usize),
}

// ────────────────────────────────────────────────
// 6. Unit tests
// ────────────────────────────────────────────────

435#[cfg(test)]
436mod tests {
437    use super::*;
438    use crate::ir::DocNode;
439    use futures::StreamExt;
440
441    fn make_doc(fidelity: FidelityLevel, paras: &[&str]) -> IRDocument {
442        let mut doc = IRDocument::new(fidelity, None);
443        doc.push(DocNode::Metadata {
444            key: "title".into(),
445            value: "스트리밍 테스트".into(),
446        });
447        for (i, &text) in paras.iter().enumerate() {
448            doc.push(DocNode::Para {
449                text: text.into(),
450                importance: 1.0 - (i as f32 * 0.1),
451            });
452        }
453        doc
454    }
455
456    #[tokio::test]
457    async fn first_chunk_contains_header() {
458        let doc = make_doc(FidelityLevel::Semantic, &["첫 번째 단락"]);
459        let transpiler = StreamingTranspiler::new(10_000, FidelityLevel::Semantic);
460        let mut stream = transpiler.transpile(doc);
461
462        let first = stream.next().await.unwrap().unwrap();
463        assert_eq!(first.sequence, 0);
464        assert!(
465            first.content.contains("<H>"),
466            "first chunk must contain the header"
467        );
468        assert!(
469            first.content.contains("<B>"),
470            "first chunk must contain the <B> opening"
471        );
472    }
473
474    #[tokio::test]
475    async fn last_chunk_is_marked_final() {
476        let doc = make_doc(FidelityLevel::Semantic, &["단락A", "단락B"]);
477        let transpiler = StreamingTranspiler::new(10_000, FidelityLevel::Semantic);
478        let mut stream = transpiler.transpile(doc);
479
480        let mut last_chunk = None;
481        while let Some(chunk) = stream.next().await {
482            last_chunk = Some(chunk.unwrap());
483        }
484        let last = last_chunk.expect("at least one chunk must exist");
485        assert!(last.is_final, "last chunk must have is_final=true");
486    }
487
488    #[tokio::test]
489    async fn budget_triggers_force_final() {
490        // Extremely low budget → force-final on the first body chunk
491        let doc = make_doc(
492            FidelityLevel::Semantic,
493            &["긴 내용 단락1", "긴 내용 단락2", "긴 내용 단락3"],
494        );
495        let transpiler = StreamingTranspiler::new(5, FidelityLevel::Semantic); // 5-token budget
496        let chunks: Vec<_> = transpiler.transpile(doc).collect::<Vec<_>>().await;
497
498        let finals: Vec<_> = chunks
499            .iter()
500            .filter(|c| c.as_ref().unwrap().is_final)
501            .collect();
502        assert_eq!(finals.len(), 1, "exactly one chunk must have is_final=true");
503    }
504
505    #[tokio::test]
506    async fn with_dict_emits_dict_block_in_first_chunk() {
507        let mut dict = SymbolDict::new();
508        dict.intern("대규모언어모델").unwrap();
509
510        let doc = make_doc(FidelityLevel::Semantic, &["대규모언어모델 연구 동향"]);
511        let transpiler = StreamingTranspiler::with_dict(10_000, FidelityLevel::Semantic, dict);
512        let mut stream = transpiler.transpile(doc);
513
514        let first = stream.next().await.unwrap().unwrap();
515        assert!(
516            first.content.contains("<D>"),
517            "first chunk must contain the <D> dictionary block when dict is pre-populated"
518        );
519        assert!(
520            first.content.contains("대규모언어모델"),
521            "dictionary block must list the interned term"
522        );
523    }
524
525    #[tokio::test]
526    async fn custom_channel_size_streaming_works() {
527        // Verify that a non-default channel buffer size (4) does not break
528        // streaming correctness: all chunks arrive, the last is marked final,
529        // and the closing </B> tag is present.
530        let doc = make_doc(
531            FidelityLevel::Semantic,
532            &["단락 one", "단락 two", "단락 three"],
533        );
534        let transpiler = StreamingTranspiler::new(10_000, FidelityLevel::Semantic)
535            .with_channel_size(4);
536
537        let chunks: Vec<_> = transpiler
538            .transpile(doc)
539            .collect::<Vec<_>>()
540            .await
541            .into_iter()
542            .map(|r| r.unwrap())
543            .collect();
544
545        assert!(!chunks.is_empty(), "must produce at least one chunk");
546
547        let final_count = chunks.iter().filter(|c| c.is_final).count();
548        assert_eq!(final_count, 1, "exactly one chunk must be final");
549
550        let last = chunks.last().unwrap();
551        assert!(last.is_final);
552        assert!(last.content.contains("</B>"), "last chunk must close <B>");
553    }
554
555    #[test]
556    fn estimate_tokens_nonzero() {
557        assert!(estimate_tokens("hello world") > 0);
558        assert!(estimate_tokens("") == 1); // min=1 guard
559    }
560
    /// Empty input must clamp to the 1-token floor.
    #[test]
    fn estimate_tokens_empty_is_one() {
        assert_eq!(estimate_tokens(""), 1);
    }

    /// Non-empty Latin input yields a positive count.
    #[test]
    fn estimate_tokens_latin_positive() {
        assert!(estimate_tokens("hello") > 0);
    }

    /// Heuristic-only: CJK weighs 2 chars/token vs Latin's 4, so equal-length
    /// CJK text must cost more tokens.
    #[test]
    #[cfg(not(feature = "tiktoken"))]
    fn estimate_tokens_cjk_more_than_latin_same_char_count() {
        // CJK 5 chars: 5 * (1/2) = 2.5 → ceil → 3 tokens
        // Latin 5 chars: 5 * (1/4) = 1.25 → ceil → 2 tokens
        // CJK token count > Latin token count
        let cjk = estimate_tokens("こんにちは"); // Hiragana, 5 chars
        let latin = estimate_tokens("hello"); // Latin, 5 chars
        assert!(
            cjk > latin,
            "CJK 5 chars ({cjk}) must have more tokens than Latin 5 chars ({latin})"
        );
    }

    /// Heuristic-only: Hangul (2 chars/token) outweighs Latin (4 chars/token).
    #[test]
    #[cfg(not(feature = "tiktoken"))]
    fn estimate_tokens_hangul_more_than_latin() {
        // Hangul 4 chars: 4 * (1/2) = 2.0 → ceil → 2 tokens
        // Latin 4 chars: 4 * (1/4) = 1.0 → ceil → 1 token
        let hangul = estimate_tokens("안녕하세");
        let latin = estimate_tokens("hell");
        assert!(
            hangul > latin,
            "Hangul ({hangul}) must have more tokens than Latin ({latin})"
        );
    }
597
598    #[test]
599    fn estimate_tokens_never_zero_for_nonempty() {
600        for text in &["a", "안", "あ", "ع", "क", "ก"] {
601            assert!(
602                estimate_tokens(text) >= 1,
603                "'{text}' must be at least 1 token"
604            );
605        }
606    }
607
    /// Batch compression regression: with many identical paragraphs,
    /// deduplication (DeduplicateAndLinearize stage) should fire and reduce
    /// the chunk count compared to the raw paragraph count.
    /// This test is only meaningful if compression fires (high budget usage).
    #[tokio::test]
    async fn batch_compression_deduplicates_identical_paras() {
        // 5 identical paragraphs + 95% budget consumed → DeduplicateAndLinearize fires.
        // After batch compression only 1 unique paragraph should remain, producing
        // exactly 2 chunks: the header (seq=0) + 1 body chunk (seq=1, is_final).
        let mut doc = IRDocument::new(FidelityLevel::Semantic, None);
        doc.push(DocNode::Metadata {
            key: "title".into(),
            value: "배치 압축 테스트".into(),
        });
        // Use high-budget-usage to trigger DeduplicateAndLinearize
        let para_text = "identical content paragraph.";
        for _ in 0..5 {
            doc.push(DocNode::Para {
                text: para_text.into(),
                importance: 1.0,
            });
        }

        // budget=10: header already uses ~5 tokens → usage >80% → Compressed mode
        let transpiler = StreamingTranspiler::new(10, FidelityLevel::Semantic);
        let chunks: Vec<_> = transpiler
            .transpile(doc)
            .collect::<Vec<_>>()
            .await
            .into_iter()
            .map(|r| r.unwrap())
            .collect();

        assert!(!chunks.is_empty(), "must produce at least one chunk");
        let final_count = chunks.iter().filter(|c| c.is_final).count();
        assert_eq!(final_count, 1, "exactly one chunk must be final");
    }
645}