// tandem-server 0.4.23
//
// HTTP server for Tandem engine APIs
// Documentation
use futures::StreamExt;
use tandem_observability::{emit_event, ObservabilityEvent, ProcessKind};
use tandem_providers::{ChatMessage, StreamChunk, TokenUsage};
use tandem_types::ToolMode;
use tokio_util::sync::CancellationToken;
use tracing::Level;

use super::*;

pub(crate) async fn invoke_planner_provider(
    state: &AppState,
    session_id: &str,
    model: &tandem_types::ModelSpec,
    prompt: String,
    timeout_ms: u64,
) -> Result<String, tandem_plan_compiler::api::PlannerInvocationFailure> {
    let cancel = CancellationToken::new();
    emit_event(
        Level::INFO,
        ProcessKind::Engine,
        ObservabilityEvent {
            event: "provider.call.start",
            component: "workflow.planner",
            correlation_id: None,
            session_id: Some(session_id),
            run_id: None,
            message_id: None,
            provider_id: Some(model.provider_id.as_str()),
            model_id: Some(model.model_id.as_str()),
            status: Some("dispatch"),
            error_code: None,
            detail: Some("planner provider dispatch"),
        },
    );

    let planner_future = async {
        let messages = vec![ChatMessage {
            role: "user".to_string(),
            content: prompt,
            attachments: Vec::new(),
        }];
        let stream = state
            .providers
            .stream_for_provider(
                Some(model.provider_id.as_str()),
                Some(model.model_id.as_str()),
                messages,
                ToolMode::None,
                None,
                cancel.clone(),
            )
            .await
            .map_err(
                |error| tandem_plan_compiler::api::PlannerInvocationFailure {
                    reason:
                        super::workflow_planner_policy::classify_planner_provider_failure_reason(
                            &error.to_string(),
                        )
                        .to_string(),
                    detail: Some(truncate_text(&error.to_string(), 500)),
                },
            )?;
        tokio::pin!(stream);
        let mut output = String::new();
        let mut saw_first_delta = false;
        let mut usage: Option<TokenUsage> = None;
        while let Some(chunk) = stream.next().await {
            match chunk {
                Ok(StreamChunk::TextDelta(delta)) => {
                    if !saw_first_delta && !delta.trim().is_empty() {
                        saw_first_delta = true;
                        emit_event(
                            Level::INFO,
                            ProcessKind::Engine,
                            ObservabilityEvent {
                                event: "provider.call.first_byte",
                                component: "workflow.planner",
                                correlation_id: None,
                                session_id: Some(session_id),
                                run_id: None,
                                message_id: None,
                                provider_id: Some(model.provider_id.as_str()),
                                model_id: Some(model.model_id.as_str()),
                                status: Some("streaming"),
                                error_code: None,
                                detail: Some("first text delta"),
                            },
                        );
                    }
                    output.push_str(&delta);
                }
                Ok(StreamChunk::ReasoningDelta(delta)) => {
                    output.push_str(&delta);
                }
                Ok(StreamChunk::Done {
                    finish_reason: _,
                    usage: provider_usage,
                }) => {
                    usage = provider_usage;
                    break;
                }
                Ok(StreamChunk::ToolCallStart { .. })
                | Ok(StreamChunk::ToolCallDelta { .. })
                | Ok(StreamChunk::ToolCallEnd { .. }) => {}
                Err(error) => {
                    return Err(tandem_plan_compiler::api::PlannerInvocationFailure {
                        reason: super::workflow_planner_policy::classify_planner_provider_failure_reason(
                            &error.to_string(),
                        )
                        .to_string(),
                        detail: Some(truncate_text(&error.to_string(), 500)),
                    });
                }
            }
        }
        Ok::<(String, Option<TokenUsage>), tandem_plan_compiler::api::PlannerInvocationFailure>((
            output, usage,
        ))
    };

    match tokio::time::timeout(std::time::Duration::from_millis(timeout_ms), planner_future).await {
        Ok(Ok((output, usage))) => {
            let finish_detail = usage
                .as_ref()
                .map(|value| {
                    format!(
                        "planner stream complete (prompt={}, completion={})",
                        value.prompt_tokens, value.completion_tokens
                    )
                })
                .unwrap_or_else(|| "planner stream complete".to_string());
            emit_event(
                Level::INFO,
                ProcessKind::Engine,
                ObservabilityEvent {
                    event: "provider.call.finish",
                    component: "workflow.planner",
                    correlation_id: None,
                    session_id: Some(session_id),
                    run_id: None,
                    message_id: None,
                    provider_id: Some(model.provider_id.as_str()),
                    model_id: Some(model.model_id.as_str()),
                    status: Some("completed"),
                    error_code: None,
                    detail: Some(&finish_detail),
                },
            );
            Ok(output)
        }
        Ok(Err(error)) => {
            emit_event(
                Level::ERROR,
                ProcessKind::Engine,
                ObservabilityEvent {
                    event: "provider.call.error",
                    component: "workflow.planner",
                    correlation_id: None,
                    session_id: Some(session_id),
                    run_id: None,
                    message_id: None,
                    provider_id: Some(model.provider_id.as_str()),
                    model_id: Some(model.model_id.as_str()),
                    status: Some("failed"),
                    error_code: Some(error.reason.as_str()),
                    detail: error.detail.as_deref(),
                },
            );
            Err(error)
        }
        Err(_) => {
            cancel.cancel();
            emit_event(
                Level::WARN,
                ProcessKind::Engine,
                ObservabilityEvent {
                    event: "provider.call.error",
                    component: "workflow.planner",
                    correlation_id: None,
                    session_id: Some(session_id),
                    run_id: None,
                    message_id: None,
                    provider_id: Some(model.provider_id.as_str()),
                    model_id: Some(model.model_id.as_str()),
                    status: Some("failed"),
                    error_code: Some("timeout"),
                    detail: Some("workflow planner llm call timed out before completion"),
                },
            );
            Err(tandem_plan_compiler::api::PlannerInvocationFailure {
                reason: "timeout".to_string(),
                detail: Some("Workflow planner timed out before completion.".to_string()),
            })
        }
    }
}

/// Shortens `input` to at most `max_len` characters (Unicode scalar values, not bytes).
///
/// If characters had to be dropped, trailing whitespace is trimmed from the kept prefix and
/// `"..."` is appended. The result can therefore be up to `max_len + 3` characters long. Input
/// that already fits is returned unchanged.
fn truncate_text(input: &str, max_len: usize) -> String {
    // A character at index `max_len` exists only when the input is longer than the limit.
    // Its byte offset is exactly where the kept prefix ends.
    match input.char_indices().nth(max_len) {
        Some((cut, _)) => format!("{}...", input[..cut].trim_end()),
        None => input.to_string(),
    }
}