ollama_oxide/http/streaming.rs
1//! Streaming response types for NDJSON APIs (e.g. `POST /api/chat` with `stream: true`).
2
3use std::io::{BufRead, BufReader};
4
5use crate::{ChatResponse, Error, Result};
6
7/// Async stream of [`ChatResponse`] events from a streaming chat request.
8///
9/// Each [`next`](Self::next) yields one NDJSON line deserialized as [`ChatResponse`].
10/// When the server closes the body, [`next`](Self::next) returns `None`.
11///
12/// # Examples
13///
14/// ```no_run
15/// use ollama_oxide::{ChatMessage, ChatRequest, OllamaApiAsync, OllamaClient};
16///
17/// #[tokio::main]
18/// async fn main() -> ollama_oxide::Result<()> {
19/// let client = OllamaClient::default()?;
20/// let request = ChatRequest::new("qwen3:0.6b", [ChatMessage::user("Hi!")]);
21/// let stream = client.chat_stream(&request).await?;
22/// while let Some(event) = stream.next().await {
23/// let chunk = event?;
24/// if let Some(s) = chunk.content() {
25/// print!("{}", s);
26/// }
27/// }
28/// Ok(())
29/// }
30/// ```
31pub struct ChatStream {
32 rx: tokio::sync::Mutex<tokio::sync::mpsc::Receiver<Result<ChatResponse>>>,
33}
34
35impl ChatStream {
36 /// Wraps a channel receiver produced by the HTTP client streaming helper.
37 pub(crate) fn new(rx: tokio::sync::mpsc::Receiver<Result<ChatResponse>>) -> Self {
38 Self {
39 rx: tokio::sync::Mutex::new(rx),
40 }
41 }
42
43 /// Returns the next event, or `None` when the stream has ended.
44 pub async fn next(&self) -> Option<Result<ChatResponse>> {
45 self.rx.lock().await.recv().await
46 }
47
48 /// Collects all events into a vector, stopping on the first error.
49 pub async fn collect(self) -> Result<Vec<ChatResponse>> {
50 let mut out = Vec::new();
51 let mut rx = self.rx.into_inner();
52 while let Some(item) = rx.recv().await {
53 match item {
54 Ok(v) => out.push(v),
55 Err(e) => return Err(e),
56 }
57 }
58 Ok(out)
59 }
60}
61
62/// Blocking iterator over [`ChatResponse`] events from a streaming chat request.
63///
64/// Implements [`Iterator`] so you can use `for`/`while let` over events.
65///
66/// # Examples
67///
68/// ```no_run
69/// use ollama_oxide::{ChatMessage, ChatRequest, OllamaApiSync, OllamaClient};
70///
71/// fn main() -> ollama_oxide::Result<()> {
72/// let client = OllamaClient::default()?;
73/// let request = ChatRequest::new("qwen3:0.6b", [ChatMessage::user("Hi!")]);
74/// let stream = client.chat_stream_blocking(&request)?;
75/// for event in stream {
76/// let chunk = event?;
77/// if let Some(s) = chunk.content() {
78/// print!("{}", s);
79/// }
80/// }
81/// Ok(())
82/// }
83/// ```
84pub struct ChatStreamBlocking {
85 lines: std::io::Lines<BufReader<reqwest::blocking::Response>>,
86}
87
88impl ChatStreamBlocking {
89 /// Builds a line iterator over the blocking response body.
90 pub(crate) fn new(response: reqwest::blocking::Response) -> Self {
91 Self {
92 lines: BufReader::new(response).lines(),
93 }
94 }
95}
96
97impl Iterator for ChatStreamBlocking {
98 type Item = Result<ChatResponse>;
99
100 fn next(&mut self) -> Option<Self::Item> {
101 loop {
102 match self.lines.next() {
103 None => return None,
104 Some(Err(e)) => return Some(Err(Error::StreamError(e.to_string()))),
105 Some(Ok(line)) => {
106 let trimmed = line.trim();
107 if trimmed.is_empty() {
108 continue;
109 }
110 return Some(
111 serde_json::from_str::<ChatResponse>(trimmed)
112 .map_err(|e| Error::StreamError(e.to_string())),
113 );
114 }
115 }
116 }
117 }
118}