llm 1.3.8

A Rust library unifying multiple LLM backends.
Documentation
use futures::StreamExt;
use tokio::sync::mpsc;
use tokio_util::sync::CancellationToken;

use llm::chat::StreamResponse;
use llm::error::LLMError;

use crate::conversation::ToolInvocation;
use crate::runtime::{AppEvent, StreamEvent};

use super::helpers::{flush_text, flush_text_if_needed};
use super::manager::StreamRequest;

pub async fn stream_struct(
    request: &StreamRequest,
    sender: &mpsc::Sender<AppEvent>,
    cancel: &CancellationToken,
) -> Result<(), LLMError> {
    let mut stream = request
        .provider
        .chat_stream_struct(&request.messages)
        .await?;
    let mut buffer = String::new();
    while let Some(chunk) = stream.next().await {
        if cancel.is_cancelled() {
            return Ok(());
        }
        handle_chunk(chunk?, request, sender, &mut buffer).await;
    }
    flush_text(&mut buffer, request, sender).await;
    Ok(())
}

async fn handle_chunk(
    chunk: StreamResponse,
    request: &StreamRequest,
    sender: &mpsc::Sender<AppEvent>,
    buffer: &mut String,
) {
    handle_usage(&chunk, request, sender).await;
    handle_delta(&chunk, request, sender, buffer).await;
}

async fn handle_usage(
    chunk: &StreamResponse,
    request: &StreamRequest,
    sender: &mpsc::Sender<AppEvent>,
) {
    let Some(usage) = chunk.usage.clone() else {
        return;
    };
    let event = StreamEvent::Usage {
        conversation_id: request.conversation_id,
        message_id: request.message_id,
        usage,
    };
    let _ = sender.send(AppEvent::Stream(event)).await;
}

async fn handle_delta(
    chunk: &StreamResponse,
    request: &StreamRequest,
    sender: &mpsc::Sender<AppEvent>,
    buffer: &mut String,
) {
    let Some(choice) = chunk.choices.first() else {
        return;
    };
    if let Some(content) = &choice.delta.content {
        buffer.push_str(content);
        flush_text_if_needed(buffer.len(), buffer, request, sender).await;
    }
    if let Some(tool_calls) = &choice.delta.tool_calls {
        for call in tool_calls {
            let invocation = ToolInvocation::from_call(call);
            let event = StreamEvent::ToolCallComplete {
                conversation_id: request.conversation_id,
                invocation,
            };
            let _ = sender.send(AppEvent::Stream(event)).await;
        }
    }
}