agentix 0.15.0

Multi-provider LLM client for Rust — streaming, non-streaming, tool calls, MCP, DeepSeek, OpenAI, Anthropic, Gemini
Documentation
pub mod request;
pub mod response;

use eventsource_stream::Eventsource;
use futures::{StreamExt, stream::BoxStream};
use tracing::debug;

use crate::config::AgentConfig;
use crate::error::ApiError;
use crate::msg::LlmEvent;
use crate::provider::{PostConfig, post_streaming, post_json};
use crate::request::{Message, ToolCall};
use crate::raw::shared::ToolDefinition;
use crate::types::{CompleteResponse, FinishReason, PartialToolCall, StreamBufs, ToolCallChunk, UsageStats};

use response::Response;

/// Starts a streaming `streamGenerateContent` call (SSE transport) and returns
/// the reply as a boxed stream of [`LlmEvent`]s.
///
/// Stream behaviour, in order:
/// * every SSE event whose payload deserializes into a `Response` is handed to
///   `parse_chunk`, and the resulting events are yielded one by one;
/// * a payload equal to `[DONE]` stops reading;
/// * a payload that fails to deserialize is logged at debug level and skipped;
/// * an error from the SSE stream is yielded as `LlmEvent::Error` and stops
///   reading;
/// * once reading stops (for any of the reasons above), every buffered tool
///   call from `finalize` is yielded as `LlmEvent::ToolCall`, followed by a
///   final `LlmEvent::Done`.
///
/// # Errors
/// Returns whatever error `post_streaming` reports while opening the request.
pub(crate) async fn stream_gemini(
    token:    &str,
    http:     &reqwest::Client,
    config:   &AgentConfig,
    messages: &[Message],
    tools:    &[ToolDefinition],
) -> Result<BoxStream<'static, LlmEvent>, ApiError> {
    let req = request::build_gemini_request(config, messages, tools);
    let url = format!(
        "{}/models/{}:streamGenerateContent?alt=sse",
        config.base_url.trim_end_matches('/'),
        config.model,
    );

    // NOTE(review): `use_query_key` presumably makes the helper send the token
    // as a URL query parameter instead of a header — confirm in `post_streaming`.
    let post_cfg = PostConfig {
        use_query_key:  true,
        auth_header:    None,
        extra_headers:  &[],
        max_retries:    config.max_retries,
        retry_delay_ms: config.retry_delay_ms,
    };
    let resp = post_streaming(http, &url, &req, token, &post_cfg).await?;

    let events = async_stream::stream! {
        let mut bufs = StreamBufs::new();
        let mut source = resp.bytes_stream().eventsource();

        while let Some(item) = source.next().await {
            let event = match item {
                Ok(event) => event,
                Err(stream_err) => {
                    yield LlmEvent::Error(stream_err.to_string());
                    break;
                }
            };

            if event.data == "[DONE]" {
                break;
            }

            match serde_json::from_str::<Response>(&event.data) {
                Ok(chunk) => {
                    for llm_event in parse_chunk(chunk, &mut bufs) {
                        yield llm_event;
                    }
                }
                Err(parse_err) => {
                    debug!(data = %event.data, error = %parse_err, "gemini chunk parse failed");
                }
            }
        }

        // Tool calls are only surfaced as complete calls once reading has ended.
        for call in finalize(&mut bufs) {
            yield LlmEvent::ToolCall(call);
        }
        yield LlmEvent::Done;
    };

    Ok(events.boxed())
}

pub(crate) async fn complete_gemini(
    token:    &str,
    http:     &reqwest::Client,
    config:   &AgentConfig,
    messages: &[Message],
    tools:    &[ToolDefinition],
) -> Result<CompleteResponse, ApiError> {
    let req = request::build_gemini_request(config, messages, tools);
    let url = format!(
        "{}/models/{}:generateContent",
        config.base_url.trim_end_matches('/'),
        config.model,
    );

    let body = post_json(http, &url, &req, token, &PostConfig {
        use_query_key:  true,
        auth_header:    None,
        extra_headers:  &[],
        max_retries:    config.max_retries,
        retry_delay_ms: config.retry_delay_ms,
    }).await?;

    let raw: Response = serde_json::from_str(&body)
        .map_err(ApiError::Json)?;

    let mut content_buf = String::new();
    let mut tool_calls = Vec::new();
    let mut finish_reason = None;

    if let Some(candidate) = raw.candidates.and_then(|mut c| if c.is_empty() { None } else { Some(c.remove(0)) }) {
        finish_reason = candidate.finish_reason.as_deref().map(FinishReason::from);
        for part in candidate.content.parts {
            if let Some(t) = part.text.filter(|s| !s.is_empty()) {
                content_buf.push_str(&t);
            }
            if let Some(fc) = part.function_call {
                tool_calls.push(ToolCall {
                    id: fc.name.clone(),
                    name: fc.name,
                    arguments: serde_json::to_string(&fc.args).unwrap_or_default(),
                });
            }
        }
    }

    Ok(CompleteResponse {
        content: if content_buf.is_empty() { None } else { Some(content_buf) },
        reasoning: None,
        tool_calls,
        usage: raw.usage_metadata.map(UsageStats::from).unwrap_or_default(),
        finish_reason: finish_reason.unwrap_or_default(),
    })
}

/// Translates one streamed `Response` chunk into the events it implies,
/// updating `bufs` along the way.
///
/// * Usage metadata, when present, produces an `LlmEvent::Usage` first.
/// * Only the first candidate is inspected; with no candidate, nothing further
///   is produced.
/// * Each non-empty text part is appended to `bufs.content_buf` and emitted as
///   an `LlmEvent::Token`.
/// * Each function-call part emits two `LlmEvent::ToolCallChunk`s sharing the
///   same index (the number of tool calls buffered so far): one with an empty
///   delta, then one carrying the full serialized arguments. The call is then
///   pushed onto `bufs.tool_call_bufs`. The function name doubles as the id.
fn parse_chunk(chunk: Response, bufs: &mut StreamBufs) -> Vec<LlmEvent> {
    let mut out = Vec::new();

    if let Some(usage) = chunk.usage_metadata {
        out.push(LlmEvent::Usage(UsageStats::from(usage)));
    }

    let first = chunk.candidates.and_then(|list| list.into_iter().next());
    if let Some(candidate) = first {
        for part in candidate.content.parts {
            if let Some(text) = part.text.filter(|s| !s.is_empty()) {
                bufs.content_buf.push_str(&text);
                out.push(LlmEvent::Token(text));
            }

            if let Some(call) = part.function_call {
                let index = bufs.tool_call_bufs.len() as u32;
                // Serialization failure degrades to an empty argument string.
                let arguments = serde_json::to_string(&call.args).unwrap_or_default();
                let name = call.name;

                let chunk_event = |delta: String| {
                    LlmEvent::ToolCallChunk(ToolCallChunk {
                        id: name.clone(),
                        name: name.clone(),
                        delta,
                        index,
                    })
                };
                // First an empty-delta chunk, then the complete arguments.
                out.push(chunk_event(String::new()));
                out.push(chunk_event(arguments.clone()));

                bufs.tool_call_bufs.push(Some(PartialToolCall {
                    id: name.clone(),
                    name,
                    arguments,
                }));
            }
        }
    }

    out
}

/// Empties `bufs.tool_call_bufs` and converts every populated slot into a
/// finished [`ToolCall`], preserving order; `None` slots are dropped.
fn finalize(bufs: &mut StreamBufs) -> Vec<ToolCall> {
    bufs.tool_call_bufs
        .drain(..)
        .filter_map(|slot| {
            slot.map(|partial| ToolCall {
                id: partial.id,
                name: partial.name,
                arguments: partial.arguments,
            })
        })
        .collect()
}