use bytes::Bytes;
use futures::Stream;
use std::pin::Pin;
use std::task::{Context, Poll};
use crate::errors::Result;
use crate::types::chat::{ChatResponse, ChatStreamEvent};
use crate::types::OllamaError;
pub struct ChatStreamParser<S>
where
S: Stream<Item = Result<Bytes>> + Send + Unpin,
{
inner: S,
buffer: Vec<u8>,
}
impl<S> ChatStreamParser<S>
where
S: Stream<Item = Result<Bytes>> + Send + Unpin,
{
pub fn new(stream: S) -> Self {
Self {
inner: stream,
buffer: Vec::new(),
}
}
fn parse_lines(&mut self) -> Option<Result<ChatStreamEvent>> {
loop {
let newline_pos = self.buffer.iter().position(|&b| b == b'\n')?;
let line_bytes = self.buffer.drain(..=newline_pos).collect::<Vec<u8>>(); let line_str = String::from_utf8_lossy(&line_bytes);
let line_str = line_str.trim();
if line_str.is_empty() {
continue; }
match serde_json::from_str::<ChatResponse>(line_str) {
Ok(event) => return Some(Ok(ChatStreamEvent::Message(event))),
Err(e) => match serde_json::from_str::<OllamaError>(line_str) {
Ok(error) => return Some(Ok(ChatStreamEvent::Error(error.error))),
Err(_) => {
return Some(Ok(ChatStreamEvent::Partial {
partial: line_str.to_string(),
error: e.to_string().into(),
}));
}
},
}
}
}
}
impl<S> Stream for ChatStreamParser<S>
where
S: Stream<Item = Result<Bytes>> + Send + Unpin,
{
type Item = Result<ChatStreamEvent>;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
loop {
if let Some(event) = self.parse_lines() {
return Poll::Ready(Some(event));
}
if self.buffer.is_empty() {
match Pin::new(&mut self.inner).poll_next(cx) {
Poll::Ready(Some(Ok(bytes))) => {
self.buffer.extend_from_slice(&bytes);
continue; }
Poll::Ready(Some(Err(e))) => return Poll::Ready(Some(Err(e))),
Poll::Ready(None) => return Poll::Ready(None), Poll::Pending => return Poll::Pending,
}
} else {
match Pin::new(&mut self.inner).poll_next(cx) {
Poll::Ready(Some(Ok(bytes))) => {
self.buffer.extend_from_slice(&bytes);
continue;
}
Poll::Ready(Some(Err(e))) => return Poll::Ready(Some(Err(e))),
Poll::Ready(None) => {
let content = String::from_utf8_lossy(&self.buffer).to_string();
self.buffer.clear();
if !content.trim().is_empty() {
return Poll::Ready(Some(Ok(ChatStreamEvent::Partial {
partial: content,
error: None,
})));
}
return Poll::Ready(None);
}
Poll::Pending => return Poll::Pending,
}
}
}
}
}