use std::time::Duration;
use futures::stream::{BoxStream, StreamExt};
use serde_json::Value;
use super::base::{ChatCompletionChunk, ChunkChoice};
pub struct LLMGenerator;
impl LLMGenerator {
pub fn parse(line: &str) -> Option<ChatCompletionChunk> {
let trimmed = line.trim();
if trimmed.is_empty() {
return None;
}
let payload = trimmed
.strip_prefix("data:")
.map(|s| s.trim())
.unwrap_or(trimmed);
if payload == "[DONE]" {
return None;
}
serde_json::from_str::<ChatCompletionChunk>(payload).ok()
}
pub fn make_chunk(
content: &str,
model: &str,
chat_id: &str,
last: bool,
) -> ChatCompletionChunk {
let finish_reason = if last { Some("stop".to_string()) } else { None };
let delta = if last {
serde_json::json!({})
} else {
serde_json::json!({"content": content, "role": "assistant"})
};
ChatCompletionChunk {
id: chat_id.to_string(),
object: "chat.completion.chunk".to_string(),
created: chrono::Utc::now().timestamp(),
model: model.to_string(),
choices: vec![ChunkChoice {
index: 0,
delta,
finish_reason,
}],
extras: Default::default(),
}
}
pub fn process<'a>(
stream: BoxStream<'a, Value>,
_model: &str,
delay: Duration,
) -> BoxStream<'a, ChatCompletionChunk> {
let stream = stream.filter_map(move |v| async move {
let chunk: ChatCompletionChunk = serde_json::from_value(v).ok()?;
if !delay.is_zero() {
tokio::time::sleep(delay).await;
}
Some(chunk)
});
Box::pin(stream)
}
}
#[cfg(test)]
mod tests {
use super::*;
use futures::stream;
#[test]
fn parse_handles_done_sentinel() {
assert!(LLMGenerator::parse("data: [DONE]").is_none());
assert!(LLMGenerator::parse("").is_none());
}
#[test]
fn parse_decodes_valid_chunk() {
let line = r#"data: {"id":"c1","object":"chat.completion.chunk","created":1,"model":"gpt-4o","choices":[]}"#;
let parsed = LLMGenerator::parse(line).expect("parse");
assert_eq!(parsed.id, "c1");
assert_eq!(parsed.model, "gpt-4o");
}
#[test]
fn make_chunk_tags_last_with_stop() {
let c = LLMGenerator::make_chunk("hi", "gpt-4o", "cid", false);
assert_eq!(c.choices[0].finish_reason, None);
let c2 = LLMGenerator::make_chunk("", "gpt-4o", "cid", true);
assert_eq!(c2.choices[0].finish_reason.as_deref(), Some("stop"));
}
#[tokio::test]
async fn process_filters_unparseable() {
let raw = stream::iter(vec![
serde_json::json!({"id": "c1", "choices": []}),
serde_json::json!({"not": "a chunk"}),
serde_json::json!({"id": "c2", "choices": []}),
]);
let mut out = LLMGenerator::process(Box::pin(raw), "m", Duration::ZERO);
let first = out.next().await.unwrap();
assert_eq!(first.id, "c1");
let _ = out.next().await;
let _ = out.next().await;
}
}