llmkit-ollama 0.1.0

Ollama (local Llama/Mistral) provider adapter for llmkit-rs
Documentation
//! Ollama NDJSON → [`StreamDelta`] parsing.
//!
//! Ollama streams newline-delimited JSON objects (not SSE). Each line is a
//! [`ChatResponseBody`]; the final one has `done: true` and carries the token
//! counts.

use futures::StreamExt;
use llmkit_core::{ChatStream, LlmError, StreamDelta, TokenUsage};

use crate::types::ChatResponseBody;

/// Turns an Ollama chat HTTP response into a [`ChatStream`] of [`StreamDelta`]s.
///
/// The response body is consumed incrementally as newline-delimited JSON.
/// Every non-blank line is deserialized as a [`ChatResponseBody`] and mapped
/// as follows:
///
/// * non-empty `message.content` → [`StreamDelta::Text`];
/// * each entry of `message.tool_calls` → one [`StreamDelta::ToolCall`] with
///   `id: None`, the function name, and the arguments rendered with
///   `to_string()` as a single `input_delta`;
/// * `done: true` → the prompt/eval token counts are recorded (a missing
///   count is treated as `0`) and reading stops.
///
/// The stream finishes with [`StreamDelta::Done`] carrying the recorded usage
/// (default usage if no `done: true` object was seen).
///
/// # Errors
///
/// * A transport error while reading the body yields [`LlmError::Stream`] and
///   ends the stream immediately, without a trailing `Done`.
/// * A line that is not valid JSON for [`ChatResponseBody`] yields
///   [`LlmError::Stream`] (`"malformed ndjson: …"`) and parsing continues with
///   the next line.
pub(crate) fn parse(resp: reqwest::Response) -> ChatStream {
    let stream = async_stream::stream! {
        let mut bytes = resp.bytes_stream();
        let mut buf: Vec<u8> = Vec::new();
        let mut usage = TokenUsage::default();
        let mut eof = false;

        'outer: while !eof {
            match bytes.next().await {
                Some(Ok(chunk)) => buf.extend_from_slice(&chunk),
                Some(Err(e)) => {
                    yield Err(LlmError::Stream(e.to_string()));
                    return;
                }
                None => {
                    eof = true;
                    // The body may end without a trailing newline. Terminate
                    // the leftover bytes so the final object (usually the
                    // `done: true` record with the token counts) goes through
                    // the normal line path instead of being dropped.
                    if !buf.is_empty() {
                        buf.push(b'\n');
                    }
                }
            }

            // Drain whole lines from the buffer.
            while let Some(pos) = buf.iter().position(|&b| b == b'\n') {
                let line: Vec<u8> = buf.drain(..=pos).collect();
                let line = &line[..line.len() - 1]; // strip '\n'

                // Skip blank lines, including ones that only hold a stray
                // '\r' or spaces; they are separators, not malformed records.
                if line.iter().all(u8::is_ascii_whitespace) {
                    continue;
                }

                let obj: ChatResponseBody = match serde_json::from_slice(line) {
                    Ok(o) => o,
                    Err(e) => {
                        // Report the bad line but keep going: later lines may
                        // still be valid.
                        yield Err(LlmError::Stream(format!("malformed ndjson: {e}")));
                        continue;
                    }
                };

                if let Some(msg) = &obj.message {
                    if !msg.content.is_empty() {
                        yield Ok(StreamDelta::Text(msg.content.clone()));
                    }
                    if let Some(calls) = &msg.tool_calls {
                        for c in calls {
                            // The whole argument payload is emitted as one
                            // delta, with no call id attached.
                            yield Ok(StreamDelta::ToolCall {
                                id: None,
                                name: Some(c.function.name.clone()),
                                input_delta: c.function.arguments.to_string(),
                            });
                        }
                    }
                }

                if obj.done {
                    usage = TokenUsage::new(
                        obj.prompt_eval_count.unwrap_or(0),
                        obj.eval_count.unwrap_or(0),
                    );
                    // Nothing after the final record is of interest.
                    break 'outer;
                }
            }
        }

        yield Ok(StreamDelta::Done { usage });
    };

    Box::pin(stream)
}