// roboticus-api 0.11.3
//
// HTTP routes, WebSocket, auth, rate limiting, and dashboard for the Roboticus agent runtime
// Documentation
//! `execute_inference_pipeline`: unified post-prepare pipeline used by all entry points.
//! Also contains `record_cost`.

use roboticus_core::InputAuthority;

use super::super::AppState;
use super::super::decomposition::DelegationProvenance;
use super::super::guard_registry::{GuardContext, guard_sets};
use super::super::intent_registry::IntentRegistry;
use super::cache::{check_cache, store_in_cache};
use super::guard_fallback::{deterministic_quality_fallback, is_conversational_affirmation};
use super::post_turn::post_turn_ingest;
use super::react_loop::run_inference_and_react;
use super::types::{PipelineResult, PreparedInference};

/// Unified post-prepare pipeline used by all entry points (API, streaming, channel).
///
/// Stages, in order:
///
/// 1. Semantic cache lookup. Skipped when the intent registry asks for a bypass or
///    when `prepared.previous_assistant` is set. A hit is re-checked by the cached
///    guard set; a hit that survives is recorded as a zero-token `"cached"` cost,
///    stored as the assistant message, and returned immediately. A hit that is
///    flagged is discarded and the pipeline continues with fresh inference.
/// 2. Inference + ReAct loop.
/// 3. Store the assistant message (inherits the user's topic tag).
/// 4. Record cost.
/// 5. Post-turn ingest.
/// 6. Cache store.
///
/// A `model_shift` event is published whenever the model that produced the
/// response differs from `prepared.model`.
///
/// Callers only need to handle session setup, input validation, and formatting
/// the final response.
///
/// # Errors
///
/// Returns `Err` with a descriptive message when the assistant message cannot be
/// appended to the session (cached or fresh path).
#[allow(clippy::too_many_arguments)] // central pipeline requires full request context
pub(crate) async fn execute_inference_pipeline(
    state: &AppState,
    prepared: &PreparedInference,
    session_id: &str,
    user_content: &str,
    turn_id: &str,
    authority: InputAuthority,
    channel_label: Option<&str>,
    delegation_provenance: &mut DelegationProvenance,
    topic_tag: Option<&str>,
) -> Result<PipelineResult, String> {
    // 1. Cache check — scoped to first-turn stateless queries only.
    //
    // Semantic response cache is DISABLED when:
    // - Any intent requires cache bypass (e.g., CurrentEvents, Execution)
    // - The session has active conversation context (turn > 1)
    //
    // Rationale: cached responses are context-free. In an active conversation,
    // the same words mean different things depending on prior turns. A cache
    // hit at turn 15 produces responses that ignore 14 turns of context,
    // causing infrastructure leaks, persona breaks, and off-topic responses.
    let registry = IntentRegistry::default_registry();
    let has_conversation_context = prepared.previous_assistant.is_some();
    let cached = if registry.should_bypass_cache(&prepared.intents) || has_conversation_context {
        if has_conversation_context {
            tracing::debug!("cache bypassed: active session context (turn > 1)");
        }
        None
    } else {
        check_cache(
            state,
            user_content,
            &prepared.cache_hash,
            prepared.query_embedding.as_deref(),
        )
        .await
    };

    if let Some(cached) = cached {
        // Scope the config read guard so it is released before further awaits.
        let agent_name = {
            let cfg = state.config.read().await;
            cfg.agent.name.clone()
        };
        // Apply the cached guard set — includes SubagentClaim + LiteraryQuoteRetry
        // which were missing from the original inline guards.
        let cached_semantic_scores = super::super::guard_registry::precompute_guard_scores(
            &state.semantic_classifier,
            &cached.content,
        )
        .await;
        // A failed sub-agent lookup degrades to "no known sub-agents".
        let subagent_names: Vec<String> = roboticus_db::agents::list_sub_agents(&state.db)
            .unwrap_or_default()
            .iter()
            .map(|a| a.name.to_ascii_lowercase())
            .collect();
        let cached_guard_ctx = GuardContext {
            user_prompt: user_content,
            intents: &prepared.intents,
            tool_results: &[],
            agent_name: &agent_name,
            resolved_model: &cached.model,
            delegation_provenance,
            previous_assistant: prepared.previous_assistant.as_deref(),
            prior_assistant_messages: &[],
            semantic_guard_scores: cached_semantic_scores,
            subagent_names,
        };
        let chain = guard_sets::cached();
        let guard_result = chain.apply(cached.content, &cached_guard_ctx);

        // If any guard requests a retry, the result is empty, or the guarded text
        // contains one of the known filter-placeholder phrases, discard the cache
        // hit and fall through to fresh inference.
        let discard_cache = guard_result.retry.is_some()
            || guard_result.content.trim().is_empty()
            || guard_result
                .content
                .contains("filtered internal execution protocol")
            || guard_result
                .content
                .contains("direct, user-facing response");

        if discard_cache {
            if is_conversational_affirmation(user_content) {
                // Affirmations need conversation context, not a canned response.
                tracing::info!("cache discard on affirmation — routing to fresh inference");
            } else {
                // NOTE(review): the deterministic fallback text is computed here but
                // never returned — every discarded hit continues to fresh inference
                // below. The call is kept only to preserve existing behaviour;
                // confirm whether it should be returned to the user or removed.
                let _ = deterministic_quality_fallback(user_content, &agent_name);
                tracing::warn!("discarding cache hit after guard chain flagged content");
            }
        } else {
            let guarded_cached_content = guard_result.content;
            let cached_provider_prefix =
                roboticus_core::model::provider_prefix(&cached.model).to_string();
            // Cache hits are recorded as zero-token, zero-cost, zero-latency usage.
            record_cost(
                state,
                &cached.model,
                &cached_provider_prefix,
                0,
                0,
                0.0,
                Some("cached"),
                true,
                Some(0),
                None,
                false,
                Some(turn_id),
            );
            let asst_id = roboticus_db::sessions::append_message_with_topic(
                &state.db,
                session_id,
                "assistant",
                &guarded_cached_content,
                topic_tag,
            )
            .map_err(|e| format!("failed to store cached response: {e}"))?;

            // The cached entry may have been produced by a different model than
            // the one selected for this turn.
            let model_shifted = cached.model != prepared.model;
            if model_shifted {
                publish_model_shift(
                    state,
                    turn_id,
                    session_id,
                    channel_label,
                    &prepared.model,
                    &cached.model,
                    "cache_hit",
                );
            }

            let mut result =
                PipelineResult::synthetic(guarded_cached_content, cached.model.clone(), asst_id);
            result.selected_model = prepared.model.clone();
            result.model_shift_from = model_shifted.then(|| prepared.model.clone());
            result.cached = true;
            result.tokens_saved = cached.tokens_saved;
            return Ok(result);
        }
    }

    // 2. Inference + ReAct loop
    let inference = run_inference_and_react(
        state,
        prepared,
        session_id,
        turn_id,
        authority,
        channel_label,
        delegation_provenance,
    )
    .await;

    // 3. Store assistant message (inherits user's topic tag)
    let asst_id = roboticus_db::sessions::append_message_with_topic(
        &state.db,
        session_id,
        "assistant",
        &inference.content,
        topic_tag,
    )
    .map_err(|e| format!("failed to store assistant response: {e}"))?;

    // 4. Record cost
    let executed_provider_prefix =
        roboticus_core::model::provider_prefix(&inference.model).to_string();
    record_cost(
        state,
        &inference.model,
        &executed_provider_prefix,
        inference.tokens_in,
        inference.tokens_out,
        inference.cost,
        None,
        false,
        Some(inference.latency_ms as i64),
        Some(inference.quality_score),
        inference.escalated,
        Some(turn_id),
    );

    // 5. Post-turn ingest (spawns background task)
    post_turn_ingest(
        state,
        session_id,
        user_content,
        &inference.content,
        &inference.tool_results,
        &prepared.system_prompt_hash,
    );

    // 6. Cache store
    store_in_cache(
        state,
        &prepared.cache_hash,
        user_content,
        &inference.content,
        &inference.model,
        inference.tokens_out,
        &prepared.intents,
    )
    .await;

    // The executed model can differ from the selected one (reported as "fallback").
    if inference.model != prepared.model {
        publish_model_shift(
            state,
            turn_id,
            session_id,
            channel_label,
            &prepared.model,
            &inference.model,
            "fallback",
        );
    }

    Ok(PipelineResult::from_inference(
        &inference,
        prepared.model.clone(),
        asst_id,
    ))
}

/// Publish a `model_shift` event on the event bus.
///
/// Emitted when the model that actually produced a response (`executed_model`)
/// differs from the one selected for the turn (`selected_model`). `reason` is the
/// short tag placed in the event's `"reason"` field; a missing `channel_label`
/// is reported as `"unknown"`.
fn publish_model_shift(
    state: &AppState,
    turn_id: &str,
    session_id: &str,
    channel_label: Option<&str>,
    selected_model: &str,
    executed_model: &str,
    reason: &str,
) {
    state.event_bus.publish(
        serde_json::json!({
            "type": "model_shift",
            "turn_id": turn_id,
            "session_id": session_id,
            "channel": channel_label.unwrap_or("unknown"),
            "selected_model": selected_model,
            "executed_model": executed_model,
            "reason": reason,
        })
        .to_string(),
    );
}

/// Record inference cost metrics.
#[allow(clippy::too_many_arguments)] // thin pass-through to roboticus_db::metrics
pub(crate) fn record_cost(
    state: &AppState,
    model: &str,
    provider_prefix: &str,
    tokens_in: i64,
    tokens_out: i64,
    cost: f64,
    variant: Option<&str>,
    cached: bool,
    latency_ms: Option<i64>,
    quality_score: Option<f64>,
    escalation: bool,
    turn_id: Option<&str>,
) {
    roboticus_db::metrics::record_inference_cost(
        &state.db,
        model,
        provider_prefix,
        tokens_in,
        tokens_out,
        cost,
        variant,
        cached,
        latency_ms,
        quality_score,
        escalation,
        turn_id,
    )
    .inspect_err(|e| tracing::warn!(error = %e, "failed to record inference cost"))
    .ok();
}