#![allow(dead_code)]
use ratatui::style::{Modifier, Style};
use ratatui::text::{Line, Span};
use std::time::Instant;
use unicode_width::UnicodeWidthStr;
use crate::palette;
pub mod chunking;
pub mod commit_tick;
pub mod line_buffer;
pub use chunking::{AdaptiveChunkingPolicy, ChunkingMode};
pub use commit_tick::{StreamChunker, run_commit_tick};
pub use line_buffer::LineBuffer;
/// Accumulates streamed text and hands it back in newline-terminated portions.
///
/// Text is appended to an internal buffer; everything up to the last newline
/// can be committed, and whatever remains is released when the collector is
/// finalized.
#[derive(Debug, Clone)]
pub struct MarkdownStreamCollector {
    /// Text pushed so far that has not been committed or finalized.
    buffer: String,
    /// Bookkeeping counter; within this file it is only ever reset to zero.
    committed_line_count: usize,
    /// Wrap width in columns used when rendering; `None` falls back to 80.
    width: Option<usize>,
    /// `true` from construction until the collector is finalized.
    is_streaming: bool,
    /// Selects the dimmed, italic "thinking" style for rendered lines.
    is_thinking: bool,
}
impl Default for MarkdownStreamCollector {
fn default() -> Self {
Self::new(None, false)
}
}
impl MarkdownStreamCollector {
    /// Creates an empty collector that starts out in the streaming state.
    ///
    /// * `width` - wrap width in columns used by the rendering methods;
    ///   `None` falls back to 80.
    /// * `is_thinking` - when `true`, rendered lines use the dimmed, italic
    ///   warning-coloured style instead of the default style.
    pub fn new(width: Option<usize>, is_thinking: bool) -> Self {
        Self {
            buffer: String::new(),
            committed_line_count: 0,
            width,
            is_streaming: true,
            is_thinking,
        }
    }

    /// Appends `content` to the pending buffer without committing anything.
    pub fn push(&mut self, content: &str) {
        self.buffer.push_str(content);
    }

    /// Returns the text that has been pushed but not yet committed.
    pub fn current_content(&self) -> &str {
        &self.buffer
    }

    /// Returns `true` when the pending buffer contains at least one newline.
    pub fn has_complete_lines(&self) -> bool {
        self.buffer.contains('\n')
    }

    /// Commits every newline-terminated line and returns it rendered.
    ///
    /// Returns an empty vector when the buffer holds no complete line.
    pub fn commit_complete_lines(&mut self) -> Vec<Line<'static>> {
        let committed = self.commit_complete_text();
        if committed.is_empty() {
            return Vec::new();
        }
        self.render_lines(&committed)
    }

    /// Removes and returns everything up to and including the last newline.
    ///
    /// The text after the last newline stays in the buffer. Returns an empty
    /// string (and leaves the buffer untouched) when there is no newline.
    pub fn commit_complete_text(&mut self) -> String {
        // `rfind` on an empty buffer is `None`, so no separate emptiness check
        // is needed.
        let Some(last_newline_idx) = self.buffer.rfind('\n') else {
            return String::new();
        };
        // '\n' is a single byte, so `last_newline_idx + 1` is a char boundary.
        // Split in place so only the incomplete tail is copied.
        let tail = self.buffer.split_off(last_newline_idx + 1);
        self.committed_line_count = 0;
        std::mem::replace(&mut self.buffer, tail)
    }

    /// Ends streaming and returns whatever is left in the buffer, rendered.
    ///
    /// Returns an empty vector when nothing was pending.
    pub fn finalize(&mut self) -> Vec<Line<'static>> {
        let remaining = self.finalize_text();
        if remaining.is_empty() {
            return Vec::new();
        }
        self.render_lines(&remaining)
    }

    /// Ends streaming and returns the remaining buffered text, leaving the
    /// buffer empty.
    pub fn finalize_text(&mut self) -> String {
        self.is_streaming = false;
        if self.buffer.is_empty() {
            return String::new();
        }
        self.committed_line_count = 0;
        // Move the buffer out instead of cloning and then clearing it.
        std::mem::take(&mut self.buffer)
    }

    /// Renders the entire pending buffer without consuming it.
    pub fn all_lines(&self) -> Vec<Line<'static>> {
        self.render_lines(&self.buffer)
    }

    /// Turns `content` into styled, wrapped lines.
    ///
    /// Each logical line is wrapped with `wrap_line` at the configured width
    /// (80 when unset). When `content` ends with a newline an extra empty line
    /// is appended.
    fn render_lines(&self, content: &str) -> Vec<Line<'static>> {
        let width = self.width.unwrap_or(80);
        let style = if self.is_thinking {
            Style::default()
                .fg(palette::STATUS_WARNING)
                .add_modifier(Modifier::DIM | Modifier::ITALIC)
        } else {
            Style::default()
        };

        let mut lines: Vec<Line<'static>> = content
            .lines()
            .flat_map(|line| wrap_line(line, width))
            .map(|row| Line::from(Span::styled(row, style)))
            .collect();
        if content.ends_with('\n') {
            lines.push(Line::from(""));
        }
        lines
    }

    /// Returns `true` until `finalize` or `finalize_text` has been called.
    pub fn is_streaming(&self) -> bool {
        self.is_streaming
    }

    /// Returns the length of the pending buffer in bytes.
    pub fn buffer_len(&self) -> usize {
        self.buffer.len()
    }

    /// Discards any pending text; the streaming flag is left unchanged.
    pub fn clear(&mut self) {
        self.buffer.clear();
        self.committed_line_count = 0;
    }
}
fn wrap_line(line: &str, width: usize) -> Vec<String> {
if line.is_empty() {
return vec![String::new()];
}
let mut result = Vec::new();
let mut current_line = String::new();
let mut current_width = 0;
for word in line.split_whitespace() {
let word_width = word.width();
if current_width == 0 {
current_line = word.to_string();
current_width = word_width;
} else if current_width + 1 + word_width <= width {
current_line.push(' ');
current_line.push_str(word);
current_width += 1 + word_width;
} else {
result.push(current_line);
current_line = word.to_string();
current_width = word_width;
}
}
if !current_line.is_empty() {
result.push(current_line);
}
if result.is_empty() {
vec![String::new()]
} else {
result
}
}
/// Per-block streaming pipeline state owned by `StreamingState`.
#[derive(Debug, Default)]
struct BlockState {
    /// Line gate used for incoming content when `bypass_gate` is `false`.
    line_buffer: LineBuffer,
    /// When `true`, pushed content goes straight to `chunker`, skipping
    /// `line_buffer` and `collector`.
    bypass_gate: bool,
    /// Collects gated text into complete lines; also records whether this is
    /// a thinking block.
    collector: MarkdownStreamCollector,
    /// Queue of text awaiting release by commit ticks.
    chunker: StreamChunker,
    /// Chunking policy handed to `run_commit_tick` for this block.
    policy: AdaptiveChunkingPolicy,
}
/// Tracks every content block of a streamed response, addressed by index.
#[derive(Debug, Default)]
pub struct StreamingState {
    /// Block slots by index; `None` marks an index that was never started.
    blocks: Vec<Option<BlockState>>,
    /// `true` while at least one block is still streaming.
    pub is_active: bool,
    /// Raw text pushed to non-thinking blocks since the last reset.
    pub accumulated_text: String,
    /// Raw text pushed to thinking blocks since the last reset.
    pub accumulated_thinking: String,
}
impl StreamingState {
    /// Creates an idle state with no blocks.
    pub fn new() -> Self {
        Self::default()
    }

    /// Starts (or restarts) a regular text block at `index`.
    ///
    /// Any block already stored at `index` is replaced. `width` is forwarded
    /// to the block's collector as its wrap width.
    pub fn start_text(&mut self, index: usize, width: Option<usize>) {
        self.start_block(index, width, false);
    }

    /// Starts (or restarts) a thinking block at `index`.
    ///
    /// Any block already stored at `index` is replaced. `width` is forwarded
    /// to the block's collector as its wrap width.
    pub fn start_thinking(&mut self, index: usize, width: Option<usize>) {
        self.start_block(index, width, true);
    }

    /// Installs a fresh block at `index` and marks the state as active.
    fn start_block(&mut self, index: usize, width: Option<usize>, is_thinking: bool) {
        self.ensure_capacity(index);
        self.blocks[index] = Some(BlockState {
            line_buffer: LineBuffer::new(),
            bypass_gate: true,
            collector: MarkdownStreamCollector::new(width, is_thinking),
            chunker: StreamChunker::new(),
            policy: AdaptiveChunkingPolicy::new(),
        });
        self.is_active = true;
    }

    /// Feeds a streamed delta into the block at `index`.
    ///
    /// Does nothing when no block was started at `index`. Otherwise the raw
    /// `content` is appended to `accumulated_thinking` or `accumulated_text`
    /// (depending on the block kind) and then queued for commit ticks: directly
    /// when the block bypasses the line gate, or via the line buffer and
    /// collector (complete lines only) when it does not.
    pub fn push_content(&mut self, index: usize, content: &str) {
        let Some(Some(block)) = self.blocks.get_mut(index) else {
            return;
        };

        if block.collector.is_thinking {
            self.accumulated_thinking.push_str(content);
        } else {
            self.accumulated_text.push_str(content);
        }

        if block.bypass_gate {
            // Empty deltas are never forwarded to the chunker.
            if !content.is_empty() {
                block.chunker.push_delta(content);
            }
            return;
        }

        block.line_buffer.push(content);
        let downstream = block.line_buffer.take_committable();
        if downstream.is_empty() {
            return;
        }
        block.collector.push(&downstream);
        let committed = block.collector.commit_complete_text();
        if !committed.is_empty() {
            block.chunker.push_delta(&committed);
        }
    }

    /// Runs one commit tick for the block at `index` and returns the released
    /// text as styled lines (empty when nothing was released).
    pub fn commit_lines(&mut self, index: usize) -> Vec<Line<'static>> {
        let text = self.commit_text(index);
        self.styled_lines(index, &text)
    }

    /// Runs one commit tick for the block at `index`, timestamped with
    /// `Instant::now()`, and returns the text it released.
    ///
    /// Returns an empty string when no block was started at `index`.
    pub fn commit_text(&mut self, index: usize) -> String {
        let Some(Some(block)) = self.blocks.get_mut(index) else {
            return String::new();
        };
        let now = Instant::now();
        let out = run_commit_tick(&mut block.policy, &mut block.chunker, now);
        out.committed_text
    }

    /// Returns the current chunking mode of the block at `index`, if any.
    pub fn chunking_mode(&self, index: usize) -> Option<ChunkingMode> {
        self.blocks
            .get(index)
            .and_then(Option::as_ref)
            .map(|block| block.policy.mode())
    }

    /// Returns `true` when the block at `index` exists and its chunker reports
    /// a non-zero number of queued lines.
    pub fn has_pending_chunker_lines(&self, index: usize) -> bool {
        self.blocks
            .get(index)
            .and_then(Option::as_ref)
            .is_some_and(|block| block.chunker.queued_lines() > 0)
    }

    /// Finalizes the block at `index` and returns all of its remaining text as
    /// styled lines (empty when nothing was left).
    pub fn finalize_block(&mut self, index: usize) -> Vec<Line<'static>> {
        let text = self.finalize_block_text(index);
        self.styled_lines(index, &text)
    }

    /// Finalizes the block at `index` and returns all text not yet released.
    ///
    /// The line gate is flushed into the collector, complete lines are moved
    /// to the chunker, the chunker is drained, and the collector's unterminated
    /// tail is appended last. Afterwards `is_active` is recomputed. Returns an
    /// empty string when no block was started at `index`.
    pub fn finalize_block_text(&mut self, index: usize) -> String {
        let Some(Some(block)) = self.blocks.get_mut(index) else {
            return String::new();
        };

        let gate_tail = block.line_buffer.flush();
        if !gate_tail.is_empty() {
            block.collector.push(&gate_tail);
        }
        let post_flush = block.collector.commit_complete_text();
        if !post_flush.is_empty() {
            block.chunker.push_delta(&post_flush);
        }
        // Finalizing the collector also flips its streaming flag, which
        // `check_active` inspects below.
        let tail = block.collector.finalize_text();
        let mut out = block.chunker.drain_remaining();
        out.push_str(&tail);

        self.check_active();
        out
    }

    /// Finalizes every block slot and returns `(index, lines)` for each block
    /// that still had output. The state is marked inactive afterwards.
    pub fn finalize_all(&mut self) -> Vec<(usize, Vec<Line<'static>>)> {
        let finalized = (0..self.blocks.len())
            .filter_map(|index| {
                let lines = self.finalize_block(index);
                (!lines.is_empty()).then_some((index, lines))
            })
            .collect();
        self.is_active = false;
        finalized
    }

    /// Forwards the low-motion setting to the policy of every started block.
    pub fn set_low_motion(&mut self, low_motion: bool) {
        for block in self.blocks.iter_mut().flatten() {
            block.policy.set_low_motion(low_motion);
        }
    }

    /// Styles `text` for the block at `index`, one `Line` per text line.
    ///
    /// Thinking blocks use the dimmed, italic warning-coloured style; all
    /// others (including a missing block) use the default style. No wrapping
    /// is applied. When `text` ends with a newline an extra empty line is
    /// appended; empty `text` yields no lines.
    fn styled_lines(&self, index: usize, text: &str) -> Vec<Line<'static>> {
        if text.is_empty() {
            return Vec::new();
        }

        let is_thinking = self
            .blocks
            .get(index)
            .and_then(Option::as_ref)
            .is_some_and(|block| block.collector.is_thinking);
        let style = if is_thinking {
            Style::default()
                .fg(palette::STATUS_WARNING)
                .add_modifier(Modifier::DIM | Modifier::ITALIC)
        } else {
            Style::default()
        };

        let mut lines: Vec<Line<'static>> = text
            .lines()
            .map(|line| Line::from(Span::styled(line.to_string(), style)))
            .collect();
        if text.ends_with('\n') {
            lines.push(Line::from(""));
        }
        lines
    }

    /// Recomputes `is_active`: true while any block's collector is streaming.
    fn check_active(&mut self) {
        self.is_active = self
            .blocks
            .iter()
            .flatten()
            .any(|state| state.collector.is_streaming());
    }

    /// Grows `blocks` with empty slots so that `index` is a valid position.
    fn ensure_capacity(&mut self, index: usize) {
        if self.blocks.len() <= index {
            self.blocks.resize_with(index + 1, || None);
        }
    }

    /// Drops all blocks and accumulated text and marks the state inactive.
    pub fn reset(&mut self) {
        self.blocks.clear();
        self.is_active = false;
        self.accumulated_text.clear();
        self.accumulated_thinking.clear();
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Only newline-terminated text is committed; the unterminated tail is
    /// released by `finalize`.
    #[test]
    fn test_commit_complete_lines() {
        let mut collector = MarkdownStreamCollector::new(Some(80), false);

        collector.push("Hello ");
        let lines = collector.commit_complete_lines();
        assert!(lines.is_empty());

        collector.push("World\n");
        let lines = collector.commit_complete_lines();
        // One content line plus the empty line added for the trailing newline.
        assert_eq!(lines.len(), 2);

        collector.push("Second line");
        let lines = collector.commit_complete_lines();
        assert!(lines.is_empty());

        let lines = collector.finalize();
        assert_eq!(lines.len(), 1);
    }

    /// Wrapping breaks between words and keeps every row within the width.
    #[test]
    fn test_wrap_line() {
        let result = wrap_line("This is a long line that should be wrapped", 20);
        assert!(result.len() > 1);
        // Pin the exact greedy layout rather than only the row count.
        assert_eq!(
            result,
            vec!["This is a long line", "that should be", "wrapped"]
        );
    }

    /// Text blocks release output on commit ticks without waiting for a
    /// newline; the first two ticks are expected to yield one character each.
    #[test]
    fn assistant_text_streams_before_newline() {
        let mut state = StreamingState::new();
        state.start_text(0, None);
        state.push_content(0, "hello world");
        assert_eq!(state.commit_text(0), "h");
        assert_eq!(state.commit_text(0), "e");
        assert!(state.has_pending_chunker_lines(0));
    }

    /// Thinking blocks stream the same way as text blocks.
    #[test]
    fn thinking_text_streams_before_newline() {
        let mut state = StreamingState::new();
        state.start_thinking(0, None);
        state.push_content(0, "thinking deeply");
        assert_eq!(state.commit_text(0), "t");
        assert_eq!(state.commit_text(0), "h");
        assert!(state.has_pending_chunker_lines(0));
    }

    /// Finalizing returns whatever the commit ticks have not yet released.
    #[test]
    fn finalize_preserves_uncommitted_micro_chunks() {
        let mut state = StreamingState::new();
        state.start_text(0, None);
        state.push_content(0, "abc");
        assert_eq!(state.commit_text(0), "a");
        assert_eq!(state.finalize_block_text(0), "bc");
    }
}