forge-guardrails 0.1.0

Foundation types for an LLM-agent workflow framework
Documentation
use std::sync::Arc;

use futures_util::StreamExt;
use serde_json::Value;

use crate::clients::base::{ChunkType, LLMClient, LLMRequestOptions, LLMResponse};
use crate::core::tool_spec::ToolSpec;
use crate::error::StreamError;

use super::response_shape::text_response_result;
use super::HandlerResult;

/// Forward requests to LLM Client without tools or guardrails.
pub async fn run_passthrough<C: LLMClient + 'static>(
    client: &Arc<C>,
    serialized: &[Value],
    _tools: Option<Vec<ToolSpec>>,
    options: LLMRequestOptions,
    model_name: &str,
    stream: bool,
    stream_include_usage: bool,
) -> Result<HandlerResult, String> {
    if stream {
        return run_passthrough_stream(
            client,
            serialized,
            options,
            model_name,
            stream_include_usage,
        )
        .await;
    }

    let envelope = client
        .send_envelope_with_options(serialized.to_vec(), None, options)
        .await
        .map_err(|e| e.to_string())?;
    let usage = envelope.usage;
    let usage_details = envelope.usage_details;

    match envelope.response {
        LLMResponse::Text(text) => Ok(text_response_result(
            &text,
            model_name,
            stream,
            stream_include_usage,
            usage.as_ref(),
            usage_details.as_ref(),
        )),
        LLMResponse::ToolCalls(_) => {
            Err("backend returned tool calls for request without tools".to_string())
        }
    }
}

async fn run_passthrough_stream<C: LLMClient + 'static>(
    client: &Arc<C>,
    serialized: &[Value],
    options: LLMRequestOptions,
    model_name: &str,
    include_usage: bool,
) -> Result<HandlerResult, String> {
    let mut backend_stream = client
        .send_stream_with_options(serialized.to_vec(), None, options)
        .await
        .map_err(|e| e.to_string())?;
    let client = client.clone();
    let model_name = model_name.to_string();
    let stream = async_stream::stream! {
        let completion_id = crate::proxy::proxy::openai_stream_completion_id();
        let mut emitted_text = false;

        while let Some(chunk_result) = backend_stream.next().await {
            let chunk = match chunk_result {
                Ok(chunk) => chunk,
                Err(err) => {
                    yield Err(err);
                    return;
                }
            };

            match chunk.chunk_type {
                ChunkType::TextDelta => {
                    if !chunk.content.is_empty() {
                        yield Ok(crate::proxy::proxy::text_delta_sse_event(
                            &completion_id,
                            &model_name,
                            &chunk.content,
                            !emitted_text,
                            None,
                        ));
                        emitted_text = true;
                    }
                }
                ChunkType::Final => {
                    let final_usage = chunk.usage;
                    let final_usage_details = chunk.usage_details;
                    if !emitted_text {
                        let content = match chunk.response {
                            Some(LLMResponse::Text(text)) => text.content,
                            Some(LLMResponse::ToolCalls(_)) => {
                                yield Err(StreamError::new(
                                    "backend returned tool calls for request without tools",
                                ));
                                return;
                            }
                            None => String::new(),
                        };
                        yield Ok(crate::proxy::proxy::text_delta_sse_event(
                            &completion_id,
                            &model_name,
                            &content,
                            true,
                            None,
                        ));
                    }
                    let usage_json = if include_usage {
                        let usage = final_usage.or_else(|| client.last_usage());
                        let usage_details =
                            final_usage_details.or_else(|| client.last_usage_details());
                        usage.as_ref().map(|u| {
                            crate::proxy::proxy::usage_to_openai_json_with_details(
                                Some(u),
                                usage_details.as_ref(),
                            )
                        })
                    } else {
                        None
                    };
                    yield Ok(crate::proxy::proxy::final_sse_event(
                        &completion_id,
                        &model_name,
                        "stop",
                        usage_json.as_ref(),
                    ));
                    return;
                }
                ChunkType::ToolCallDelta => {
                    yield Err(StreamError::new(
                        "backend streamed tool calls for request without tools",
                    ));
                    return;
                }
                ChunkType::Retry => {}
            }
        }

        yield Err(StreamError::default());
    };

    Ok(HandlerResult::StreamBody(Box::pin(stream)))
}