collet 0.1.0

Relentless agentic coding orchestrator with zero-drop agent loops
Documentation
use crate::api::models::ChatChunk;
use anyhow::Result;
use futures::StreamExt;
use tokio::sync::mpsc;

/// Events emitted during streaming.
///
/// `process_stream` sends these over an unbounded channel as it decodes each
/// SSE `data:` payload. For a single chunk the order is: reasoning text,
/// content text, tool-call events, then `Done` if the choice reports a
/// finish reason.
#[derive(Debug, Clone)]
pub enum StreamEvent {
    /// A text token was received (the text of the delta's `content`).
    Token(String),
    /// A reasoning/thinking token (preserved for thought continuity).
    Reasoning(String),
    /// A tool call is being accumulated.
    ///
    /// Sent when a tool-call delta carries an `id`. `name` is the empty string
    /// when that delta has no function name.
    ToolCallStart {
        index: u32,
        id: String,
        name: String,
    },
    /// Tool call arguments chunk.
    ///
    /// `index` is the tool call's index as reported by the delta, the same
    /// field that `ToolCallStart` carries.
    ToolCallArgs { index: u32, args_chunk: String },
    /// Stream completed, carrying the most recent token usage seen so far.
    ///
    /// All counters are zero when no chunk carrying usage has been received
    /// yet. `process_stream` sends this when a choice reports a finish reason,
    /// again on the `[DONE]` sentinel, and when the byte stream ends without
    /// one, so a consumer may receive it more than once per stream.
    Done {
        prompt_tokens: u32,
        completion_tokens: u32,
        cached_tokens: u32,
    },
    /// Error occurred.
    ///
    /// Carries a human-readable message. `process_stream` sends it when a
    /// `data:` payload fails to deserialize.
    // NOTE(review): the earlier note here said this also covers network errors,
    // but `process_stream` returns those as `Err` via `?` instead of sending
    // this event. It also said callers do not match on this variant yet;
    // confirm against the call sites before relying on either statement.
    Error(String),
}

/// Process an SSE streaming response and emit events.
///
/// Reads the response body chunk by chunk, splits it into SSE lines (both
/// `\n` and `\r\n` terminated), and forwards every `data:` payload to `tx` as
/// [`StreamEvent`]s. Blank lines, comment lines (starting with `:`), and
/// fields other than `data` are ignored.
///
/// Processing stops at the `[DONE]` sentinel or when the body ends; in both
/// cases a final [`StreamEvent::Done`] carrying the latest usage counters is
/// sent. Payloads that fail to deserialize are reported as
/// [`StreamEvent::Error`] and do not abort the stream.
///
/// Send failures (receiver dropped) are deliberately ignored.
///
/// # Errors
///
/// Returns an error if reading a chunk from the response body fails.
pub async fn process_stream(
    response: reqwest::Response,
    tx: mpsc::UnboundedSender<StreamEvent>,
) -> Result<()> {
    let mut stream = response.bytes_stream();
    // Buffer raw bytes rather than text: a multi-byte UTF-8 character can be
    // split across two network chunks, and decoding each chunk on its own
    // would replace both halves with U+FFFD. Splitting on the byte `\n` is
    // safe because 0x0A never occurs inside a multi-byte UTF-8 sequence.
    let mut buffer: Vec<u8> = Vec::with_capacity(4096);
    let mut usage = StreamUsage::default();

    while let Some(chunk) = stream.next().await {
        let chunk = chunk?;
        buffer.extend_from_slice(&chunk);

        // Walk every complete line currently buffered, then compact the
        // buffer once instead of shifting it after each line.
        let mut consumed = 0;
        while let Some(offset) = buffer[consumed..].iter().position(|&b| b == b'\n') {
            let line_end = consumed + offset;
            let finished = handle_sse_line(&buffer[consumed..line_end], &tx, &mut usage);
            consumed = line_end + 1;
            if finished {
                return Ok(());
            }
        }
        buffer.drain(..consumed);
    }

    // The body may end without a trailing newline; do not drop that last line.
    if !buffer.is_empty() && handle_sse_line(&buffer, &tx, &mut usage) {
        return Ok(());
    }

    let _ = tx.send(usage.done_event());
    Ok(())
}

/// Latest token usage reported by the stream; all zero until a chunk carries usage.
#[derive(Debug, Clone, Copy, Default)]
struct StreamUsage {
    prompt_tokens: u32,
    completion_tokens: u32,
    cached_tokens: u32,
}

impl StreamUsage {
    /// Builds the `Done` event carrying the current counters.
    fn done_event(self) -> StreamEvent {
        StreamEvent::Done {
            prompt_tokens: self.prompt_tokens,
            completion_tokens: self.completion_tokens,
            cached_tokens: self.cached_tokens,
        }
    }
}

/// Handles one SSE line (without its `\n` terminator) and emits the resulting events.
///
/// Returns `true` when the line is the `[DONE]` sentinel, meaning the caller
/// should stop reading; the final `Done` event has already been sent then.
fn handle_sse_line(
    raw: &[u8],
    tx: &mpsc::UnboundedSender<StreamEvent>,
    usage: &mut StreamUsage,
) -> bool {
    // Only complete lines reach this point, so lossy decoding can only replace
    // bytes that were invalid in the source, never a character split by chunking.
    let decoded = String::from_utf8_lossy(raw);
    let line = decoded.trim_end_matches('\r');

    if line.is_empty() || line.starts_with(':') {
        return false;
    }

    let Some(data) = line.strip_prefix("data:") else {
        return false;
    };
    // The SSE format makes the single space after the colon optional.
    let data = data.strip_prefix(' ').unwrap_or(data);

    if data.trim() == "[DONE]" {
        let _ = tx.send(usage.done_event());
        return true;
    }
    // An empty `data:` field is legal SSE and carries no JSON to parse.
    if data.trim().is_empty() {
        return false;
    }

    let parsed = match serde_json::from_str::<ChatChunk>(data) {
        Ok(parsed) => parsed,
        Err(e) => {
            tracing::warn!("Failed to parse SSE chunk: {e}");
            let _ = tx.send(StreamEvent::Error(format!("Parse error: {e}")));
            return false;
        }
    };

    tracing::trace!(chunk_id = %parsed.id, "Streaming chunk received");
    // Capture usage from final chunk (Z.ai / OpenAI send it here).
    // cached_tokens may appear as top-level field or nested in
    // prompt_tokens_details depending on provider.
    if let Some(ref reported) = parsed.usage {
        tracing::trace!(total_tokens = reported.total_tokens, "Streaming usage");

        usage.prompt_tokens = reported.prompt_tokens;
        usage.completion_tokens = reported.completion_tokens;
        // Prefer nested details; fall back to top-level field.
        usage.cached_tokens = reported
            .prompt_tokens_details
            .as_ref()
            .map(|d| d.cached_tokens)
            .unwrap_or(reported.cached_tokens);
    }

    for choice in &parsed.choices {
        // Skip non-primary choices (index > 0) for single-output models.
        if choice.index > 0 {
            tracing::trace!(
                index = choice.index,
                "Skipping non-primary streaming choice"
            );
            continue;
        }
        if let Some(ref role) = choice.delta.role {
            tracing::trace!(role = %role, "Streaming delta role");
        }

        // Reasoning content (preserved thinking)
        if let Some(ref reasoning) = choice.delta.reasoning_content {
            let _ = tx.send(StreamEvent::Reasoning(reasoning.clone()));
        }

        // Text content
        if let Some(ref content) = choice.delta.content {
            let _ = tx.send(StreamEvent::Token(content.text_content()));
        }

        // Tool calls
        if let Some(ref tool_calls) = choice.delta.tool_calls {
            for tc in tool_calls {
                // An `id` marks the first delta of a tool call.
                if let Some(ref id) = tc.id {
                    let name = tc
                        .function
                        .as_ref()
                        .and_then(|f| f.name.clone())
                        .unwrap_or_default();
                    let _ = tx.send(StreamEvent::ToolCallStart {
                        index: tc.index,
                        id: id.clone(),
                        name,
                    });
                }
                if let Some(ref func) = tc.function
                    && let Some(ref args) = func.arguments
                {
                    let _ = tx.send(StreamEvent::ToolCallArgs {
                        index: tc.index,
                        args_chunk: args.clone(),
                    });
                }
            }
        }

        // NOTE(review): this early `Done` is kept for existing consumers. It is
        // followed by another `Done` at `[DONE]` or end of stream, and it may
        // carry zero counters if usage arrives in a later chunk.
        if choice.finish_reason.is_some() {
            let _ = tx.send(usage.done_event());
        }
    }

    false
}