machi 0.8.1

A Web3-native AI Agent Framework
Documentation
//! Ollama stream parsing.

use serde::Deserialize;

use super::client::OllamaToolCall;
use crate::error::{LlmError, Result};
use crate::stream::{StopReason, StreamChunk};
use crate::usage::Usage;

/// Ollama streaming response chunk.
#[derive(Debug, Clone, Deserialize)]
struct OllamaStreamChunk {
    pub message: OllamaStreamMessage,
    pub done: bool,
    #[serde(default)]
    pub done_reason: Option<String>,
    #[serde(default)]
    pub prompt_eval_count: Option<u32>,
    #[serde(default)]
    pub eval_count: Option<u32>,
}

/// Ollama stream message.
#[derive(Debug, Clone, Deserialize)]
struct OllamaStreamMessage {
    #[serde(default)]
    pub content: String,
    #[serde(default)]
    pub tool_calls: Option<Vec<OllamaToolCall>>,
    #[serde(default)]
    pub thinking: Option<String>,
}

/// Parse a streaming response line from Ollama.
pub fn parse_stream_line(line: &str) -> Option<Result<StreamChunk>> {
    let line = line.trim();
    if line.is_empty() {
        return None;
    }

    match serde_json::from_str::<OllamaStreamChunk>(line) {
        Ok(chunk) => Some(Ok(convert_chunk(&chunk))),
        Err(e) => {
            tracing::warn!("Failed to parse Ollama chunk: {e}, line: {line}");
            Some(Err(LlmError::stream(format!("Parse error: {e}")).into()))
        }
    }
}

/// Generate a unique tool call ID.
fn generate_tool_call_id() -> String {
    format!("call_{}", uuid::Uuid::new_v4())
}

/// Convert an Ollama stream chunk to our format.
fn convert_chunk(chunk: &OllamaStreamChunk) -> StreamChunk {
    // Handle completion
    if chunk.done {
        let stop_reason = match chunk.done_reason.as_deref() {
            Some("length") => StopReason::Length,
            // "stop", None, and any other value defaults to Stop
            _ => StopReason::Stop,
        };

        // If there's usage info, return that first
        if let (Some(prompt_count), Some(eval_count)) = (chunk.prompt_eval_count, chunk.eval_count)
        {
            return StreamChunk::Usage(Usage::new(prompt_count, eval_count));
        }

        return StreamChunk::done(Some(stop_reason));
    }

    // Handle thinking/reasoning content from reasoning models
    if let Some(thinking) = &chunk.message.thinking
        && !thinking.is_empty()
    {
        return StreamChunk::reasoning(thinking);
    }

    // Handle tool calls
    if let Some(tool_calls) = &chunk.message.tool_calls
        && let Some((index, tc)) = tool_calls.iter().enumerate().next()
    {
        // For Ollama, tool calls come complete in one chunk
        return StreamChunk::tool_use_start(index, generate_tool_call_id(), &tc.function.name);
    }

    // Handle text content
    if !chunk.message.content.is_empty() {
        return StreamChunk::text(&chunk.message.content);
    }

    // Empty chunk
    StreamChunk::text("")
}