Skip to main content

ollama_oxide/http/
streaming.rs

//! Streaming response types for NDJSON APIs (e.g. `POST /api/chat` with `stream: true`).
2
3use std::io::{BufRead, BufReader};
4
5use crate::{ChatResponse, Error, Result};
6
7/// Async stream of [`ChatResponse`] events from a streaming chat request.
8///
9/// Each [`next`](Self::next) yields one NDJSON line deserialized as [`ChatResponse`].
10/// When the server closes the body, [`next`](Self::next) returns `None`.
11///
12/// # Examples
13///
14/// ```no_run
15/// use ollama_oxide::{ChatMessage, ChatRequest, OllamaApiAsync, OllamaClient};
16///
17/// #[tokio::main]
18/// async fn main() -> ollama_oxide::Result<()> {
19///     let client = OllamaClient::default()?;
20///     let request = ChatRequest::new("qwen3:0.6b", [ChatMessage::user("Hi!")]);
21///     let stream = client.chat_stream(&request).await?;
22///     while let Some(event) = stream.next().await {
23///         let chunk = event?;
24///         if let Some(s) = chunk.content() {
25///             print!("{}", s);
26///         }
27///     }
28///     Ok(())
29/// }
30/// ```
31pub struct ChatStream {
32    rx: tokio::sync::Mutex<tokio::sync::mpsc::Receiver<Result<ChatResponse>>>,
33}
34
35impl ChatStream {
36    /// Wraps a channel receiver produced by the HTTP client streaming helper.
37    pub(crate) fn new(rx: tokio::sync::mpsc::Receiver<Result<ChatResponse>>) -> Self {
38        Self {
39            rx: tokio::sync::Mutex::new(rx),
40        }
41    }
42
43    /// Returns the next event, or `None` when the stream has ended.
44    pub async fn next(&self) -> Option<Result<ChatResponse>> {
45        self.rx.lock().await.recv().await
46    }
47
48    /// Collects all events into a vector, stopping on the first error.
49    pub async fn collect(self) -> Result<Vec<ChatResponse>> {
50        let mut out = Vec::new();
51        let mut rx = self.rx.into_inner();
52        while let Some(item) = rx.recv().await {
53            match item {
54                Ok(v) => out.push(v),
55                Err(e) => return Err(e),
56            }
57        }
58        Ok(out)
59    }
60}
61
62/// Blocking iterator over [`ChatResponse`] events from a streaming chat request.
63///
64/// Implements [`Iterator`] so you can use `for`/`while let` over events.
65///
66/// # Examples
67///
68/// ```no_run
69/// use ollama_oxide::{ChatMessage, ChatRequest, OllamaApiSync, OllamaClient};
70///
71/// fn main() -> ollama_oxide::Result<()> {
72///     let client = OllamaClient::default()?;
73///     let request = ChatRequest::new("qwen3:0.6b", [ChatMessage::user("Hi!")]);
74///     let stream = client.chat_stream_blocking(&request)?;
75///     for event in stream {
76///         let chunk = event?;
77///         if let Some(s) = chunk.content() {
78///             print!("{}", s);
79///         }
80///     }
81///     Ok(())
82/// }
83/// ```
84pub struct ChatStreamBlocking {
85    lines: std::io::Lines<BufReader<reqwest::blocking::Response>>,
86}
87
88impl ChatStreamBlocking {
89    /// Builds a line iterator over the blocking response body.
90    pub(crate) fn new(response: reqwest::blocking::Response) -> Self {
91        Self {
92            lines: BufReader::new(response).lines(),
93        }
94    }
95}
96
97impl Iterator for ChatStreamBlocking {
98    type Item = Result<ChatResponse>;
99
100    fn next(&mut self) -> Option<Self::Item> {
101        loop {
102            match self.lines.next() {
103                None => return None,
104                Some(Err(e)) => return Some(Err(Error::StreamError(e.to_string()))),
105                Some(Ok(line)) => {
106                    let trimmed = line.trim();
107                    if trimmed.is_empty() {
108                        continue;
109                    }
110                    return Some(
111                        serde_json::from_str::<ChatResponse>(trimmed)
112                            .map_err(|e| Error::StreamError(e.to_string())),
113                    );
114                }
115            }
116        }
117    }
118}