// llm_transpile/stream.rs
1//! stream.rs — Tokio-based Streaming Transpiler
2//!
3//! Delivers document chunks to the LLM before full processing completes,
4//! minimizing TTFT (Time-To-First-Token).
5//!
6//! # Pipeline
7//! ```text
8//! AsyncRead → IncrementalParser → AdaptiveCompressor → StreamingRenderer
9//! ↑
10//! Switches to Compressed at 80% budget usage
11//! ```
12//!
13//! # Symbol substitution in streaming
14//!
15//! The single-pass streaming pipeline cannot discover all domain terms before the
16//! stream starts, so [`SymbolDict`] **remains empty by default**. Use
17//! [`StreamingTranspiler::with_dict`] to inject a pre-populated dictionary when
18//! domain terms are known in advance.
19//!
20//! # Token counting
21//!
22//! By default, [`estimate_tokens`] uses a Unicode-script heuristic (chars-per-token).
23//! Enable the `tiktoken` Cargo feature for accurate `cl100k_base` counting:
24//!
25//! ```toml
26//! [dependencies]
27//! llm-transpiler = { features = ["tiktoken"] }
28//! ```
29
30use std::pin::Pin;
31
32use futures::Stream;
33use tokio::sync::mpsc;
34use tokio_stream::wrappers::ReceiverStream;
35
36use crate::compressor::{AdaptiveCompressor, CompressionConfig};
37use crate::ir::{DocNode, FidelityLevel, IRDocument};
38use crate::renderer::render_node;
39use crate::symbol::SymbolDict;
40
41// ────────────────────────────────────────────────
42// 1. Chunk type
43// ────────────────────────────────────────────────
44
/// A single output unit produced by the streaming transpiler.
#[derive(Debug, Clone)]
pub struct TranspileChunk {
    /// Transmission sequence number (0-based; the header chunk is always 0).
    pub sequence: usize,
    /// Rendered text fragment.
    pub content: String,
    /// Approximate token count for this chunk, computed from `content`.
    ///
    /// Uses the `tiktoken` feature (cl100k_base) when enabled; otherwise falls back
    /// to the Unicode-script character heuristic.
    pub token_count: usize,
    /// Whether this is the final chunk of the stream (exactly one chunk sets this).
    pub is_final: bool,
}
60
61impl TranspileChunk {
62 fn new(sequence: usize, content: String, is_final: bool) -> Self {
63 let token_count = estimate_tokens(&content);
64 Self {
65 sequence,
66 content,
67 token_count,
68 is_final,
69 }
70 }
71}
72
73// ────────────────────────────────────────────────
74// 2. Token estimation
75// ────────────────────────────────────────────────
76
/// Returns the approximate token count for `text`.
///
/// ## Feature: `tiktoken` (accurate)
/// When the `tiktoken` Cargo feature is enabled, uses OpenAI's `cl100k_base` tokenizer
/// (GPT-4 / GPT-3.5-turbo vocabulary, also a reasonable approximation for Claude).
/// The tokenizer is initialised once and cached in a `OnceLock`.
///
/// ## Default (heuristic)
/// Weights every character by its Unicode script range (chars-per-token),
/// accumulates the reciprocals, and rounds up. **Not accurate for real models** —
/// intended only as a lightweight approximation for systems that cannot carry the
/// tiktoken dependency. For production use, enable the `tiktoken` feature.
///
/// | Script range | chars/token |
/// |-------------|-------------|
/// | Hiragana / Katakana / CJK / Hangul | 2 |
/// | Arabic / Devanagari / Bengali / Tamil / Thai | 3 |
/// | Emoji | 2 |
/// | Latin and everything else | 4 |
///
/// The result is always at least 1, even for an empty string.
pub fn estimate_tokens(text: &str) -> usize {
    #[cfg(feature = "tiktoken")]
    {
        use std::sync::OnceLock;
        static BPE: OnceLock<tiktoken_rs::CoreBPE> = OnceLock::new();
        let bpe = BPE.get_or_init(|| tiktoken_rs::cl100k_base().expect("cl100k_base init failed"));
        bpe.encode_ordinary(text).len().max(1)
    }

    #[cfg(not(feature = "tiktoken"))]
    {
        let fractional: f64 = text
            .chars()
            .map(|ch| (chars_per_token(ch) as f64).recip())
            .sum();
        (fractional.ceil() as usize).max(1)
    }
}

/// Returns the chars-per-token weight for a character's Unicode codepoint range.
///
/// Not compiled when the `tiktoken` feature is active.
#[cfg(not(feature = "tiktoken"))]
fn chars_per_token(ch: char) -> u32 {
    match ch as u32 {
        0x3040..=0x30FF => 2,   // Hiragana / Katakana
        0x3400..=0x4DBF => 2,   // CJK Extension A
        0x4E00..=0x9FFF => 2,   // CJK Unified Ideographs (BMP)
        0xF900..=0xFAFF => 2,   // CJK Compatibility Ideographs
        0xAC00..=0xD7FF => 2,   // Hangul Syllables (tail U+D7B0–D7FF is Jamo Extended-B)
        0x1100..=0x11FF => 2,   // Hangul Jamo
        0xA960..=0xA97F => 2,   // Hangul Jamo Extended-A
        0x20000..=0x2A6DF => 2, // CJK Extension B
        0x2A700..=0x2CEAF => 2, // CJK Extensions C–E
        0x2CEB0..=0x2EBEF => 2, // CJK Extension F
        0x30000..=0x323AF => 2, // CJK Extensions G–H
        0x0600..=0x06FF => 3,   // Arabic
        0x0750..=0x077F => 3,   // Arabic Supplement
        0x0900..=0x097F => 3,   // Devanagari
        0x0980..=0x09FF => 3,   // Bengali
        0x0A00..=0x0A7F => 3,   // Gurmukhi
        0x0B80..=0x0BFF => 3,   // Tamil
        0x0E00..=0x0E7F => 3,   // Thai
        // Emoji: ~1–2 tokens per char per GPT-4 → approximate as cpt=2
        0x1F300..=0x1F9FF => 2, // Misc Symbols & Pictographs, Emoticons, Supplemental Symbols
        0x1FA00..=0x1FAFF => 2, // Symbols and Pictographs Extended-A
        _ => 4,                 // Latin and other scripts
    }
}
147
148// ────────────────────────────────────────────────
149// 3. StreamingTranspiler
150// ────────────────────────────────────────────────
151
/// Default mpsc channel buffer size used when none is specified via
/// [`StreamingTranspiler::with_channel_size`]. Bounds the number of
/// in-flight chunks and therefore peak memory of the producer task.
const DEFAULT_CHANNEL_BUFFER: usize = 32;
155
/// Tokio channel-based streaming transpiler.
///
/// # Symbol dictionary injection
///
/// By default the streaming path uses an **empty** [`SymbolDict`] because a
/// single-pass stream cannot discover domain terms before it starts. Use
/// [`StreamingTranspiler::with_dict`] to supply a pre-populated dictionary
/// when terms are known in advance:
///
/// ```rust,no_run
/// use llm_transpile::{FidelityLevel, SymbolDict, StreamingTranspiler};
///
/// let mut dict = SymbolDict::new();
/// dict.intern("large language model").unwrap();
/// dict.intern("retrieval-augmented generation").unwrap();
///
/// let transpiler = StreamingTranspiler::with_dict(4096, FidelityLevel::Semantic, dict);
/// ```
pub struct StreamingTranspiler {
    /// Batch compressor applied once over all body nodes before rendering.
    compressor: AdaptiveCompressor,
    /// Approximate token budget for the whole output stream.
    budget: usize,
    /// Requested starting fidelity; may be downgraded to `Compressed` at 80% usage.
    fidelity: FidelityLevel,
    /// Pre-populated symbol dictionary used during streaming.
    /// Empty by default; populate via `with_dict()` for domain-specific compression.
    dict: SymbolDict,
    /// Tokio mpsc channel buffer size.
    /// Controls backpressure: the spawned producer task blocks when this many
    /// chunks are in flight and the consumer has not yet polled them.
    /// Larger values reduce producer stalls at the cost of higher memory usage;
    /// smaller values increase backpressure and bound memory.
    channel_buffer: usize,
}
188
189impl StreamingTranspiler {
190 /// Creates a new transpiler with an empty symbol dictionary.
191 pub fn new(budget: usize, fidelity: FidelityLevel) -> Self {
192 Self {
193 compressor: AdaptiveCompressor::new(),
194 budget,
195 fidelity,
196 dict: SymbolDict::new(),
197 channel_buffer: DEFAULT_CHANNEL_BUFFER,
198 }
199 }
200
201 /// Creates a transpiler with a **pre-populated** symbol dictionary.
202 ///
203 /// Domain terms already registered in `dict` will be substituted with PUA
204 /// symbols during streaming and the `<D>` block will be emitted in the first
205 /// chunk. This is the recommended path when the document vocabulary is known
206 /// before streaming begins.
207 ///
208 /// # Example
209 /// ```rust,no_run
210 /// use llm_transpile::{FidelityLevel, SymbolDict, StreamingTranspiler};
211 ///
212 /// let mut dict = SymbolDict::new();
213 /// dict.intern("transformer model").unwrap();
214 ///
215 /// let transpiler = StreamingTranspiler::with_dict(8192, FidelityLevel::Semantic, dict);
216 /// ```
217 pub fn with_dict(budget: usize, fidelity: FidelityLevel, dict: SymbolDict) -> Self {
218 Self {
219 compressor: AdaptiveCompressor::new(),
220 budget,
221 fidelity,
222 dict,
223 channel_buffer: DEFAULT_CHANNEL_BUFFER,
224 }
225 }
226
227 /// Sets the Tokio mpsc channel buffer size and returns `self` for chaining.
228 ///
229 /// The internal pipeline uses a bounded `mpsc` channel to deliver chunks to
230 /// the caller. The buffer size controls **backpressure**: when `n` chunks are
231 /// already queued and the consumer has not polled them yet, the producer task
232 /// will `.await` before sending the next chunk. This bounds peak memory usage
233 /// to roughly `n` in-flight chunks at a time.
234 ///
235 /// - **Larger `n`**: fewer producer stalls, higher peak memory.
236 /// - **Smaller `n`**: tighter backpressure, lower memory footprint.
237 ///
238 /// Defaults to `32` when not set.
239 ///
240 /// # Example
241 /// ```rust,no_run
242 /// use llm_transpile::{FidelityLevel, StreamingTranspiler};
243 ///
244 /// let transpiler = StreamingTranspiler::new(4096, FidelityLevel::Semantic)
245 /// .with_channel_size(8);
246 /// ```
247 pub fn with_channel_size(mut self, n: usize) -> Self {
248 // Tokio's mpsc::channel requires a capacity of at least 1; clamp to prevent panic.
249 self.channel_buffer = n.max(1);
250 self
251 }
252
253 /// Converts an `IRDocument` into a chunk stream.
254 ///
255 /// The first chunk always contains `<D>` (if non-empty) + `<H><B>`.
256 /// Automatically switches to `Compressed` fidelity when 80% of the budget is reached.
257 pub fn transpile(
258 self,
259 doc: IRDocument,
260 ) -> Pin<Box<dyn Stream<Item = Result<TranspileChunk, StreamError>> + Send>> {
261 let (tx, rx) =
262 mpsc::channel::<Result<TranspileChunk, StreamError>>(self.channel_buffer);
263 let stream = ReceiverStream::new(rx);
264
265 tokio::spawn(async move {
266 if let Err(e) = Self::run_pipeline(
267 doc,
268 self.budget,
269 self.fidelity,
270 self.compressor,
271 self.dict,
272 tx,
273 )
274 .await
275 {
276 // Error already sent over the channel; ignore at spawn level.
277 let _ = e;
278 }
279 });
280
281 Box::pin(stream)
282 }
283
284 async fn run_pipeline(
285 doc: IRDocument,
286 budget: usize,
287 fidelity: FidelityLevel,
288 compressor: AdaptiveCompressor,
289 dict: SymbolDict,
290 tx: mpsc::Sender<Result<TranspileChunk, StreamError>>,
291 ) -> Result<(), StreamError> {
292 let mut accumulated_tokens: usize = 0;
293 let mut sequence: usize = 0;
294
295 // ── Chunk 0: header (always first) ──────────────────────────────
296 let header_content = build_header_chunk(&doc, &dict);
297 accumulated_tokens += estimate_tokens(&header_content);
298
299 let total_nodes = doc.nodes.len();
300 let is_final_header = total_nodes == 0;
301
302 tx.send(Ok(TranspileChunk::new(
303 sequence,
304 header_content,
305 is_final_header,
306 )))
307 .await
308 .map_err(|_| StreamError::ChannelClosed)?;
309 sequence += 1;
310
311 if is_final_header {
312 return Ok(());
313 }
314
315 // ── Stream body nodes ────────────────────────────────────────────
316 let body_nodes: Vec<DocNode> = doc
317 .nodes
318 .into_iter()
319 .filter(|n| !matches!(n, crate::ir::DocNode::Metadata { .. }))
320 .collect();
321
322 // ── Batch compression (single pass over all body nodes) ──────────
323 // Compute effective fidelity once based on post-header token usage,
324 // then compress the entire batch in one call — O(N+M) Aho-Corasick
325 // pass instead of N separate passes, and no per-node Vec allocation.
326 let usage_after_header = if budget > 0 {
327 accumulated_tokens as f64 / budget as f64
328 } else {
329 1.0
330 };
331 let batch_fidelity = if fidelity != FidelityLevel::Lossless && usage_after_header >= 0.80 {
332 FidelityLevel::Compressed
333 } else {
334 fidelity
335 };
336 let batch_cfg = CompressionConfig {
337 budget,
338 current_tokens: accumulated_tokens,
339 fidelity: batch_fidelity,
340 };
341 let body_nodes = compressor.compress(body_nodes, &batch_cfg);
342
343 let body_len = body_nodes.len();
344 for (idx, node) in body_nodes.into_iter().enumerate() {
345 let is_last = body_len > 0 && idx == body_len - 1;
346
347 // Compression was already applied to the full batch above; render directly.
348 let chunk_text = render_node(&node, &dict);
349
350 if chunk_text.is_empty() {
351 continue; // Skip nodes entirely eliminated by compression
352 }
353
354 // Force final chunk when budget is exceeded
355 let tokens = estimate_tokens(&chunk_text);
356 accumulated_tokens += tokens;
357 let force_final = budget > 0 && accumulated_tokens >= budget;
358 let is_final = is_last || force_final;
359
360 // Append </B> closing tag to the final chunk
361 let content = if is_final {
362 format!("{}\n</B>", chunk_text.trim())
363 } else {
364 chunk_text
365 };
366
367 // TranspileChunk::new re-calls estimate_tokens internally, so
368 // token_count is recalculated based on content (including the </B> tag).
369 // accumulated_tokens is based on chunk_text — within acceptable error margin.
370 tx.send(Ok(TranspileChunk::new(sequence, content, is_final)))
371 .await
372 .map_err(|_| StreamError::ChannelClosed)?;
373 sequence += 1;
374
375 if force_final {
376 break;
377 }
378 }
379
380 // Guard for the edge case where body nodes existed but the final chunk was never sent
381 // (all nodes eliminated by compression)
382 if sequence == 1 {
383 tx.send(Ok(TranspileChunk::new(sequence, "</B>".to_string(), true)))
384 .await
385 .map_err(|_| StreamError::ChannelClosed)?;
386 }
387
388 Ok(())
389 }
390}
391
392// ────────────────────────────────────────────────
393// 4. Helper functions
394// ────────────────────────────────────────────────
395
396/// Builds the document header chunk text (`<D>?<H><B>` opening).
397fn build_header_chunk(doc: &IRDocument, dict: &SymbolDict) -> String {
398 let dict_block = dict.render_dict_header();
399 let yaml = crate::renderer::build_yaml_header(doc);
400
401 let mut out = String::new();
402 if !dict_block.is_empty() {
403 out.push_str(&dict_block);
404 }
405 if !yaml.is_empty() {
406 out.push_str("<H>\n");
407 out.push_str(yaml.trim());
408 out.push_str("\n</H>\n");
409 }
410 out.push_str("<B>");
411 out
412}
413
414// ────────────────────────────────────────────────
415// 5. Error type
416// ────────────────────────────────────────────────
417
/// Streaming transpile error.
#[derive(Debug, thiserror::Error)]
pub enum StreamError {
    /// The consumer dropped the receiving half of the chunk channel before
    /// the producer pipeline finished sending.
    #[error("stream channel closed")]
    ChannelClosed,

    /// Parsing the input failed; the payload is a human-readable reason.
    #[error("parse failed: {0}")]
    Parse(String),

    /// The input exceeded the maximum allowed size; the payload is the limit in bytes.
    #[error("input exceeds maximum allowed size of {0} bytes")]
    InputTooLarge(usize),
}
430
431// ────────────────────────────────────────────────
432// 6. Unit tests
433// ────────────────────────────────────────────────
434
#[cfg(test)]
mod tests {
    use super::*;
    use crate::ir::DocNode;
    use futures::StreamExt;

    /// Builds a test document: one `Metadata` (title) node followed by one
    /// `Para` per entry in `paras`, with linearly decreasing importance.
    fn make_doc(fidelity: FidelityLevel, paras: &[&str]) -> IRDocument {
        let mut doc = IRDocument::new(fidelity, None);
        doc.push(DocNode::Metadata {
            key: "title".into(),
            value: "스트리밍 테스트".into(),
        });
        for (i, &text) in paras.iter().enumerate() {
            doc.push(DocNode::Para {
                text: text.into(),
                importance: 1.0 - (i as f32 * 0.1),
            });
        }
        doc
    }

    /// Chunk 0 must carry the `<H>` header and the `<B>` body opener.
    #[tokio::test]
    async fn first_chunk_contains_header() {
        let doc = make_doc(FidelityLevel::Semantic, &["첫 번째 단락"]);
        let transpiler = StreamingTranspiler::new(10_000, FidelityLevel::Semantic);
        let mut stream = transpiler.transpile(doc);

        let first = stream.next().await.unwrap().unwrap();
        assert_eq!(first.sequence, 0);
        assert!(
            first.content.contains("<H>"),
            "first chunk must contain the header"
        );
        assert!(
            first.content.contains("<B>"),
            "first chunk must contain the <B> opening"
        );
    }

    /// Draining the stream must end with a chunk flagged `is_final`.
    #[tokio::test]
    async fn last_chunk_is_marked_final() {
        let doc = make_doc(FidelityLevel::Semantic, &["단락A", "단락B"]);
        let transpiler = StreamingTranspiler::new(10_000, FidelityLevel::Semantic);
        let mut stream = transpiler.transpile(doc);

        let mut last_chunk = None;
        while let Some(chunk) = stream.next().await {
            last_chunk = Some(chunk.unwrap());
        }
        let last = last_chunk.expect("at least one chunk must exist");
        assert!(last.is_final, "last chunk must have is_final=true");
    }

    /// A budget smaller than the content must cut the stream short with a
    /// single force-final chunk; no further chunks may follow it.
    #[tokio::test]
    async fn budget_triggers_force_final() {
        // Extremely low budget → force-final on the first body chunk
        let doc = make_doc(
            FidelityLevel::Semantic,
            &["긴 내용 단락1", "긴 내용 단락2", "긴 내용 단락3"],
        );
        let transpiler = StreamingTranspiler::new(5, FidelityLevel::Semantic); // 5-token budget
        let chunks: Vec<_> = transpiler.transpile(doc).collect::<Vec<_>>().await;

        let finals: Vec<_> = chunks
            .iter()
            .filter(|c| c.as_ref().unwrap().is_final)
            .collect();
        assert_eq!(finals.len(), 1, "exactly one chunk must have is_final=true");
    }

    /// A pre-populated dictionary must surface as a `<D>` block in chunk 0.
    #[tokio::test]
    async fn with_dict_emits_dict_block_in_first_chunk() {
        let mut dict = SymbolDict::new();
        dict.intern("대규모언어모델").unwrap();

        let doc = make_doc(FidelityLevel::Semantic, &["대규모언어모델 연구 동향"]);
        let transpiler = StreamingTranspiler::with_dict(10_000, FidelityLevel::Semantic, dict);
        let mut stream = transpiler.transpile(doc);

        let first = stream.next().await.unwrap().unwrap();
        assert!(
            first.content.contains("<D>"),
            "first chunk must contain the <D> dictionary block when dict is pre-populated"
        );
        assert!(
            first.content.contains("대규모언어모델"),
            "dictionary block must list the interned term"
        );
    }

    #[tokio::test]
    async fn custom_channel_size_streaming_works() {
        // Verify that a non-default channel buffer size (4) does not break
        // streaming correctness: all chunks arrive, the last is marked final,
        // and the closing </B> tag is present.
        let doc = make_doc(
            FidelityLevel::Semantic,
            &["단락 one", "단락 two", "단락 three"],
        );
        let transpiler = StreamingTranspiler::new(10_000, FidelityLevel::Semantic)
            .with_channel_size(4);

        let chunks: Vec<_> = transpiler
            .transpile(doc)
            .collect::<Vec<_>>()
            .await
            .into_iter()
            .map(|r| r.unwrap())
            .collect();

        assert!(!chunks.is_empty(), "must produce at least one chunk");

        let final_count = chunks.iter().filter(|c| c.is_final).count();
        assert_eq!(final_count, 1, "exactly one chunk must be final");

        let last = chunks.last().unwrap();
        assert!(last.is_final);
        assert!(last.content.contains("</B>"), "last chunk must close <B>");
    }

    #[test]
    fn estimate_tokens_nonzero() {
        assert!(estimate_tokens("hello world") > 0);
        assert!(estimate_tokens("") == 1); // min=1 guard
    }

    #[test]
    fn estimate_tokens_empty_is_one() {
        assert_eq!(estimate_tokens(""), 1);
    }

    #[test]
    fn estimate_tokens_latin_positive() {
        assert!(estimate_tokens("hello") > 0);
    }

    // Heuristic-only test: exact token values differ under the tiktoken feature.
    #[test]
    #[cfg(not(feature = "tiktoken"))]
    fn estimate_tokens_cjk_more_than_latin_same_char_count() {
        // CJK 5 chars: 5 * (1/2) = 2.5 → ceil → 3 tokens
        // Latin 5 chars: 5 * (1/4) = 1.25 → ceil → 2 tokens
        // CJK token count > Latin token count
        let cjk = estimate_tokens("こんにちは"); // Hiragana, 5 chars
        let latin = estimate_tokens("hello"); // Latin, 5 chars
        assert!(
            cjk > latin,
            "CJK 5 chars ({cjk}) must have more tokens than Latin 5 chars ({latin})"
        );
    }

    // Heuristic-only test: exact token values differ under the tiktoken feature.
    #[test]
    #[cfg(not(feature = "tiktoken"))]
    fn estimate_tokens_hangul_more_than_latin() {
        // Hangul 4 chars: 4 * (1/2) = 2.0 → ceil → 2 tokens
        // Latin 4 chars: 4 * (1/4) = 1.0 → ceil → 1 token
        let hangul = estimate_tokens("안녕하세");
        let latin = estimate_tokens("hell");
        assert!(
            hangul > latin,
            "Hangul ({hangul}) must have more tokens than Latin ({latin})"
        );
    }

    /// One character from each script family must never round down to zero.
    #[test]
    fn estimate_tokens_never_zero_for_nonempty() {
        for text in &["a", "안", "あ", "ع", "क", "ก"] {
            assert!(
                estimate_tokens(text) >= 1,
                "'{text}' must be at least 1 token"
            );
        }
    }

    /// Batch compression regression: with many identical paragraphs,
    /// deduplication (DeduplicateAndLinearize stage) should fire and reduce
    /// the chunk count compared to the raw paragraph count.
    /// This test is only meaningful if compression fires (high budget usage).
    #[tokio::test]
    async fn batch_compression_deduplicates_identical_paras() {
        // 5 identical paragraphs + 95% budget consumed → DeduplicateAndLinearize fires.
        // After batch compression only 1 unique paragraph should remain, producing
        // exactly 2 chunks: the header (seq=0) + 1 body chunk (seq=1, is_final).
        let mut doc = IRDocument::new(FidelityLevel::Semantic, None);
        doc.push(DocNode::Metadata {
            key: "title".into(),
            value: "배치 압축 테스트".into(),
        });
        // Use high-budget-usage to trigger DeduplicateAndLinearize
        let para_text = "identical content paragraph.";
        for _ in 0..5 {
            doc.push(DocNode::Para {
                text: para_text.into(),
                importance: 1.0,
            });
        }

        // budget=10: header already uses ~5 tokens → usage >80% → Compressed mode
        let transpiler = StreamingTranspiler::new(10, FidelityLevel::Semantic);
        let chunks: Vec<_> = transpiler
            .transpile(doc)
            .collect::<Vec<_>>()
            .await
            .into_iter()
            .map(|r| r.unwrap())
            .collect();

        assert!(!chunks.is_empty(), "must produce at least one chunk");
        let final_count = chunks.iter().filter(|c| c.is_final).count();
        assert_eq!(final_count, 1, "exactly one chunk must be final");
    }
}
645}