use std::pin::Pin;
use futures::Stream;
use tokio::sync::mpsc;
use tokio_stream::wrappers::ReceiverStream;
use crate::compressor::{AdaptiveCompressor, CompressionConfig};
use crate::ir::{DocNode, FidelityLevel, IRDocument};
use crate::renderer::render_node;
use crate::symbol::SymbolDict;
/// One unit of streamed transpiler output.
///
/// Chunks are emitted in `sequence` order starting at 0 (the header chunk);
/// the stream is terminated by a single chunk with `is_final = true`.
#[derive(Debug, Clone)]
pub struct TranspileChunk {
/// Zero-based emission index of this chunk within its stream.
pub sequence: usize,
/// Rendered text payload (header, body node, or closing `</B>` marker).
pub content: String,
/// Estimated token count of `content`, as computed by `estimate_tokens`.
pub token_count: usize,
/// True only for the chunk that terminates the stream.
pub is_final: bool,
}
impl TranspileChunk {
    /// Builds a chunk, deriving its token count from `content`.
    fn new(sequence: usize, content: String, is_final: bool) -> Self {
        Self {
            // Evaluated before `content` is moved into the struct below.
            token_count: estimate_tokens(&content),
            sequence,
            content,
            is_final,
        }
    }
}
/// Heuristically estimates the LLM token count of `text`.
///
/// With the `tiktoken` feature enabled this defers to a lazily initialised
/// `cl100k_base` BPE; otherwise it falls back to a per-script
/// characters-per-token heuristic. Both paths return at least 1, even for
/// an empty string.
pub fn estimate_tokens(text: &str) -> usize {
    #[cfg(feature = "tiktoken")]
    {
        use std::sync::OnceLock;
        static BPE: OnceLock<tiktoken_rs::CoreBPE> = OnceLock::new();
        let encoder =
            BPE.get_or_init(|| tiktoken_rs::cl100k_base().expect("cl100k_base init failed"));
        encoder.encode_ordinary(text).len().max(1)
    }
    #[cfg(not(feature = "tiktoken"))]
    {
        // Each character contributes the reciprocal of its script's
        // chars-per-token ratio; round up so partial tokens still count.
        let estimate: f64 = text
            .chars()
            .map(|c| 1.0 / f64::from(chars_per_token(c)))
            .sum();
        (estimate.ceil() as usize).max(1)
    }
}

/// Rough characters-per-token ratio for the Unicode block `c` belongs to.
#[cfg(not(feature = "tiktoken"))]
fn chars_per_token(c: char) -> u32 {
    match c as u32 {
        // Kana, CJK ideographs (incl. extensions), Hangul: ~2 chars/token.
        0x3040..=0x30FF
        | 0x3400..=0x4DBF
        | 0x4E00..=0x9FFF
        | 0xF900..=0xFAFF
        | 0xAC00..=0xD7FF
        | 0x1100..=0x11FF
        | 0xA960..=0xA97F
        | 0x20000..=0x2A6DF
        | 0x2A700..=0x2CEAF
        | 0x2CEB0..=0x2EBEF
        | 0x30000..=0x323AF => 2,
        // Arabic and several Indic/Thai blocks: ~3 chars/token.
        0x0600..=0x06FF
        | 0x0750..=0x077F
        | 0x0900..=0x097F
        | 0x0980..=0x09FF
        | 0x0A00..=0x0A7F
        | 0x0B80..=0x0BFF
        | 0x0E00..=0x0E7F => 3,
        // Emoji blocks: ~2 chars/token.
        0x1F300..=0x1F9FF | 0x1FA00..=0x1FAFF => 2,
        // Latin and everything else: ~4 chars/token.
        _ => 4,
    }
}
/// Default capacity of the mpsc channel between the pipeline task and the
/// consumer stream; the producer back-pressures after this many unread chunks.
const DEFAULT_CHANNEL_BUFFER: usize = 32;
/// Converts an `IRDocument` into an async stream of `TranspileChunk`s,
/// compressing body nodes to fit a token budget.
pub struct StreamingTranspiler {
/// Batch compressor applied to body nodes before rendering.
compressor: AdaptiveCompressor,
/// Token budget for the whole output. A budget of 0 disables the
/// mid-stream early-final check but still triggers immediate fidelity
/// degradation (usage is treated as 100% after the header).
budget: usize,
/// Requested fidelity; downgraded to `Compressed` when the header alone
/// consumes >= 80% of the budget, unless it is `Lossless`.
fidelity: FidelityLevel,
/// Symbol dictionary used for the `<D>` header block and node rendering.
dict: SymbolDict,
/// Capacity of the chunk channel between pipeline task and consumer.
channel_buffer: usize,
}
impl StreamingTranspiler {
    /// Creates a transpiler with an empty symbol dictionary.
    pub fn new(budget: usize, fidelity: FidelityLevel) -> Self {
        // Delegate so the two constructors cannot drift apart.
        Self::with_dict(budget, fidelity, SymbolDict::new())
    }

    /// Creates a transpiler that renders against a pre-populated dictionary.
    pub fn with_dict(budget: usize, fidelity: FidelityLevel, dict: SymbolDict) -> Self {
        Self {
            compressor: AdaptiveCompressor::new(),
            budget,
            fidelity,
            dict,
            channel_buffer: DEFAULT_CHANNEL_BUFFER,
        }
    }

    /// Overrides the chunk-channel capacity (clamped to at least 1, since an
    /// mpsc channel cannot have capacity 0).
    pub fn with_channel_size(mut self, n: usize) -> Self {
        self.channel_buffer = n.max(1);
        self
    }

    /// Consumes the transpiler and `doc`, returning a stream of chunks.
    ///
    /// The pipeline runs on a spawned tokio task. Dropping the returned
    /// stream closes the receiver, which stops the task at its next send.
    pub fn transpile(
        self,
        doc: IRDocument,
    ) -> Pin<Box<dyn Stream<Item = Result<TranspileChunk, StreamError>> + Send>> {
        let Self {
            compressor,
            budget,
            fidelity,
            dict,
            channel_buffer,
        } = self;
        let (tx, rx) = mpsc::channel::<Result<TranspileChunk, StreamError>>(channel_buffer);
        let stream = ReceiverStream::new(rx);
        tokio::spawn(async move {
            // The only error `run_pipeline` returns is `ChannelClosed`,
            // which means the consumer dropped the stream — there is no one
            // left to report to, so the error is intentionally discarded.
            let _ = Self::run_pipeline(doc, budget, fidelity, compressor, dict, tx).await;
        });
        Box::pin(stream)
    }

    /// Drives the chunk pipeline: header first, then compressed body nodes,
    /// appending `</B>` to exactly one final chunk.
    ///
    /// # Errors
    /// Returns `StreamError::ChannelClosed` if the receiver is dropped.
    async fn run_pipeline(
        doc: IRDocument,
        budget: usize,
        fidelity: FidelityLevel,
        compressor: AdaptiveCompressor,
        dict: SymbolDict,
        tx: mpsc::Sender<Result<TranspileChunk, StreamError>>,
    ) -> Result<(), StreamError> {
        let mut accumulated_tokens: usize = 0;
        let mut sequence: usize = 0;

        // The header (dictionary block + YAML + `<B>` opener) always goes
        // out first; an empty document makes it the final chunk too.
        let header_content = build_header_chunk(&doc, &dict);
        accumulated_tokens += estimate_tokens(&header_content);
        let is_final_header = doc.nodes.is_empty();
        tx.send(Ok(TranspileChunk::new(
            sequence,
            header_content,
            is_final_header,
        )))
        .await
        .map_err(|_| StreamError::ChannelClosed)?;
        sequence += 1;
        if is_final_header {
            return Ok(());
        }

        // Metadata nodes are already represented in the header; only the
        // remaining nodes form the body.
        let body_nodes: Vec<DocNode> = doc
            .nodes
            .into_iter()
            .filter(|n| !matches!(n, DocNode::Metadata { .. }))
            .collect();

        // Degrade fidelity up-front when the header alone consumed >= 80%
        // of the budget (a zero budget counts as fully consumed). Lossless
        // output is never degraded.
        let usage_after_header = if budget > 0 {
            accumulated_tokens as f64 / budget as f64
        } else {
            1.0
        };
        let batch_fidelity = if fidelity != FidelityLevel::Lossless && usage_after_header >= 0.80 {
            FidelityLevel::Compressed
        } else {
            fidelity
        };
        let batch_cfg = CompressionConfig {
            budget,
            current_tokens: accumulated_tokens,
            fidelity: batch_fidelity,
        };
        let body_nodes = compressor.compress(body_nodes, &batch_cfg);

        let body_len = body_nodes.len();
        let mut final_sent = false;
        for (idx, node) in body_nodes.into_iter().enumerate() {
            let chunk_text = render_node(&node, &dict);
            if chunk_text.is_empty() {
                // NOTE: if this skips the *last* node, no chunk in the loop
                // carries `is_final`; the post-loop fallback closes the
                // stream in that case.
                continue;
            }
            accumulated_tokens += estimate_tokens(&chunk_text);
            // Stop early once the running total reaches the budget
            // (budget 0 means "never force-final here").
            let force_final = budget > 0 && accumulated_tokens >= budget;
            let is_final = idx == body_len - 1 || force_final;
            let content = if is_final {
                format!("{}\n</B>", chunk_text.trim())
            } else {
                chunk_text
            };
            tx.send(Ok(TranspileChunk::new(sequence, content, is_final)))
                .await
                .map_err(|_| StreamError::ChannelClosed)?;
            sequence += 1;
            if is_final {
                final_sent = true;
            }
            if force_final {
                break;
            }
        }

        // Fallback: every body node rendered empty, or the last one was
        // skipped after earlier chunks went out. Previously this only fired
        // when *no* body chunk was sent (`sequence == 1`), which could leave
        // the stream without a final chunk and with an unclosed `<B>`.
        if !final_sent {
            tx.send(Ok(TranspileChunk::new(sequence, "</B>".to_string(), true)))
                .await
                .map_err(|_| StreamError::ChannelClosed)?;
        }
        Ok(())
    }
}
/// Assembles the stream's opening chunk: an optional `<D>` dictionary block,
/// an optional `<H>…</H>` YAML header, and the `<B>` body opener.
fn build_header_chunk(doc: &IRDocument, dict: &SymbolDict) -> String {
    let mut header = String::new();
    let dict_block = dict.render_dict_header();
    if !dict_block.is_empty() {
        header.push_str(&dict_block);
    }
    let yaml = crate::renderer::build_yaml_header(doc);
    if !yaml.is_empty() {
        header.push_str("<H>\n");
        header.push_str(yaml.trim());
        header.push_str("\n</H>\n");
    }
    // The body opener is always emitted, even for an empty document.
    header.push_str("<B>");
    header
}
/// Errors surfaced through the transpile stream.
#[derive(Debug, thiserror::Error)]
pub enum StreamError {
/// The stream consumer dropped the receiver before the pipeline finished.
#[error("stream channel closed")]
ChannelClosed,
/// Parsing the input failed; payload is the parser's message.
/// NOTE(review): not constructed in this file — presumably raised by the
/// parsing front-end; confirm against callers.
#[error("parse failed: {0}")]
Parse(String),
/// The input exceeded the allowed size; payload is the limit in bytes.
/// NOTE(review): not constructed in this file — confirm against callers.
#[error("input exceeds maximum allowed size of {0} bytes")]
InputTooLarge(usize),
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::ir::DocNode;
    use futures::StreamExt;

    /// Builds a document carrying a metadata title plus one paragraph per
    /// entry of `paras`, with importance falling off by 0.1 per paragraph.
    fn make_doc(fidelity: FidelityLevel, paras: &[&str]) -> IRDocument {
        let mut doc = IRDocument::new(fidelity, None);
        doc.push(DocNode::Metadata {
            key: "title".into(),
            value: "스트리밍 테스트".into(),
        });
        for (i, &text) in paras.iter().enumerate() {
            doc.push(DocNode::Para {
                text: text.into(),
                importance: 1.0 - (i as f32 * 0.1),
            });
        }
        doc
    }

    #[tokio::test]
    async fn first_chunk_contains_header() {
        let doc = make_doc(FidelityLevel::Semantic, &["첫 번째 단락"]);
        let mut stream = StreamingTranspiler::new(10_000, FidelityLevel::Semantic).transpile(doc);
        let head = stream.next().await.unwrap().unwrap();
        assert_eq!(head.sequence, 0);
        assert!(
            head.content.contains("<H>"),
            "first chunk must contain the header"
        );
        assert!(
            head.content.contains("<B>"),
            "first chunk must contain the <B> opening"
        );
    }

    #[tokio::test]
    async fn last_chunk_is_marked_final() {
        let doc = make_doc(FidelityLevel::Semantic, &["단락A", "단락B"]);
        let chunks: Vec<_> = StreamingTranspiler::new(10_000, FidelityLevel::Semantic)
            .transpile(doc)
            .collect()
            .await;
        let last = chunks
            .into_iter()
            .last()
            .expect("at least one chunk must exist")
            .unwrap();
        assert!(last.is_final, "last chunk must have is_final=true");
    }

    #[tokio::test]
    async fn budget_triggers_force_final() {
        let doc = make_doc(
            FidelityLevel::Semantic,
            &["긴 내용 단락1", "긴 내용 단락2", "긴 내용 단락3"],
        );
        // A tiny budget forces the stream to finalize before all paragraphs
        // are emitted; there must still be exactly one final chunk.
        let chunks: Vec<_> = StreamingTranspiler::new(5, FidelityLevel::Semantic)
            .transpile(doc)
            .collect()
            .await;
        let final_count = chunks
            .iter()
            .filter(|c| c.as_ref().unwrap().is_final)
            .count();
        assert_eq!(final_count, 1, "exactly one chunk must have is_final=true");
    }

    #[tokio::test]
    async fn with_dict_emits_dict_block_in_first_chunk() {
        let mut dict = SymbolDict::new();
        dict.intern("대규모언어모델").unwrap();
        let doc = make_doc(FidelityLevel::Semantic, &["대규모언어모델 연구 동향"]);
        let mut stream =
            StreamingTranspiler::with_dict(10_000, FidelityLevel::Semantic, dict).transpile(doc);
        let head = stream.next().await.unwrap().unwrap();
        assert!(
            head.content.contains("<D>"),
            "first chunk must contain the <D> dictionary block when dict is pre-populated"
        );
        assert!(
            head.content.contains("대규모언어모델"),
            "dictionary block must list the interned term"
        );
    }

    #[tokio::test]
    async fn custom_channel_size_streaming_works() {
        let doc = make_doc(
            FidelityLevel::Semantic,
            &["단락 one", "단락 two", "단락 three"],
        );
        let chunks: Vec<TranspileChunk> = StreamingTranspiler::new(10_000, FidelityLevel::Semantic)
            .with_channel_size(4)
            .transpile(doc)
            .map(|r| r.unwrap())
            .collect()
            .await;
        assert!(!chunks.is_empty(), "must produce at least one chunk");
        assert_eq!(
            chunks.iter().filter(|c| c.is_final).count(),
            1,
            "exactly one chunk must be final"
        );
        let tail = chunks.last().unwrap();
        assert!(tail.is_final);
        assert!(tail.content.contains("</B>"), "last chunk must close <B>");
    }

    #[test]
    fn estimate_tokens_nonzero() {
        assert!(estimate_tokens("hello world") > 0);
        assert!(estimate_tokens("") == 1);
    }

    #[test]
    fn estimate_tokens_empty_is_one() {
        assert_eq!(estimate_tokens(""), 1);
    }

    #[test]
    fn estimate_tokens_latin_positive() {
        assert!(estimate_tokens("hello") > 0);
    }

    #[test]
    #[cfg(not(feature = "tiktoken"))]
    fn estimate_tokens_cjk_more_than_latin_same_char_count() {
        let cjk = estimate_tokens("こんにちは");
        let latin = estimate_tokens("hello");
        assert!(
            cjk > latin,
            "CJK 5 chars ({cjk}) must have more tokens than Latin 5 chars ({latin})"
        );
    }

    #[test]
    #[cfg(not(feature = "tiktoken"))]
    fn estimate_tokens_hangul_more_than_latin() {
        let hangul = estimate_tokens("안녕하세");
        let latin = estimate_tokens("hell");
        assert!(
            hangul > latin,
            "Hangul ({hangul}) must have more tokens than Latin ({latin})"
        );
    }

    #[test]
    fn estimate_tokens_never_zero_for_nonempty() {
        // One sample per script family covered by the heuristic.
        for text in ["a", "안", "あ", "ع", "क", "ก"] {
            assert!(
                estimate_tokens(text) >= 1,
                "'{text}' must be at least 1 token"
            );
        }
    }

    #[tokio::test]
    async fn batch_compression_deduplicates_identical_paras() {
        let mut doc = IRDocument::new(FidelityLevel::Semantic, None);
        doc.push(DocNode::Metadata {
            key: "title".into(),
            value: "배치 압축 테스트".into(),
        });
        let para_text = "identical content paragraph.";
        for _ in 0..5 {
            doc.push(DocNode::Para {
                text: para_text.into(),
                importance: 1.0,
            });
        }
        let chunks: Vec<TranspileChunk> = StreamingTranspiler::new(10, FidelityLevel::Semantic)
            .transpile(doc)
            .map(|r| r.unwrap())
            .collect()
            .await;
        assert!(!chunks.is_empty(), "must produce at least one chunk");
        assert_eq!(
            chunks.iter().filter(|c| c.is_final).count(),
            1,
            "exactly one chunk must be final"
        );
    }
}