use roboticus_core::InputAuthority;
use super::super::AppState;
use super::super::decomposition::DelegationProvenance;
use super::super::guard_registry::{GuardContext, guard_sets};
use super::super::intent_registry::IntentRegistry;
use super::cache::{check_cache, store_in_cache};
use super::guard_fallback::{deterministic_quality_fallback, is_conversational_affirmation};
use super::post_turn::post_turn_ingest;
use super::react_loop::run_inference_and_react;
use super::types::{PipelineResult, PreparedInference};
#[allow(clippy::too_many_arguments)] pub(crate) async fn execute_inference_pipeline(
state: &AppState,
prepared: &PreparedInference,
session_id: &str,
user_content: &str,
turn_id: &str,
authority: InputAuthority,
channel_label: Option<&str>,
delegation_provenance: &mut DelegationProvenance,
topic_tag: Option<&str>,
) -> Result<PipelineResult, String> {
let registry = IntentRegistry::default_registry();
let has_conversation_context = prepared.previous_assistant.is_some();
let cached = if registry.should_bypass_cache(&prepared.intents) || has_conversation_context {
if has_conversation_context {
tracing::debug!("cache bypassed: active session context (turn > 1)");
}
None
} else {
check_cache(
state,
user_content,
&prepared.cache_hash,
prepared.query_embedding.as_deref(),
)
.await
};
if let Some(cached) = cached {
let agent_name = {
let cfg = state.config.read().await;
cfg.agent.name.clone()
};
let cached_semantic_scores = super::super::guard_registry::precompute_guard_scores(
&state.semantic_classifier,
&cached.content,
)
.await;
let subagent_names: Vec<String> = roboticus_db::agents::list_sub_agents(&state.db)
.unwrap_or_default()
.iter()
.map(|a| a.name.to_ascii_lowercase())
.collect();
let cached_guard_ctx = GuardContext {
user_prompt: user_content,
intents: &prepared.intents,
tool_results: &[],
agent_name: &agent_name,
resolved_model: &cached.model,
delegation_provenance,
previous_assistant: prepared.previous_assistant.as_deref(),
prior_assistant_messages: &[],
semantic_guard_scores: cached_semantic_scores,
subagent_names,
};
let chain = guard_sets::cached();
let guard_result = chain.apply(cached.content, &cached_guard_ctx);
let discard_cache = guard_result.retry.is_some()
|| guard_result.content.trim().is_empty()
|| guard_result
.content
.contains("filtered internal execution protocol")
|| guard_result
.content
.contains("direct, user-facing response");
let guarded_cached_content = if discard_cache {
if is_conversational_affirmation(user_content) {
tracing::info!("cache discard on affirmation — routing to fresh inference");
String::new()
} else {
deterministic_quality_fallback(user_content, &agent_name)
}
} else {
guard_result.content
};
if discard_cache {
if !is_conversational_affirmation(user_content) {
tracing::warn!("discarding cache hit after guard chain flagged content");
}
} else {
let cached_provider_prefix =
roboticus_core::model::provider_prefix(&cached.model).to_string();
record_cost(
state,
&cached.model,
&cached_provider_prefix,
0,
0,
0.0,
Some("cached"),
true,
Some(0),
None,
false,
Some(turn_id),
);
let asst_id = roboticus_db::sessions::append_message_with_topic(
&state.db,
session_id,
"assistant",
&guarded_cached_content,
topic_tag,
)
.map_err(|e| format!("failed to store cached response: {e}"))?;
if cached.model != prepared.model {
state.event_bus.publish(
serde_json::json!({
"type": "model_shift",
"turn_id": turn_id,
"session_id": session_id,
"channel": channel_label.unwrap_or("unknown"),
"selected_model": prepared.model,
"executed_model": cached.model,
"reason": "cache_hit",
})
.to_string(),
);
}
let mut result =
PipelineResult::synthetic(guarded_cached_content, cached.model.clone(), asst_id);
result.selected_model = prepared.model.clone();
result.model_shift_from = if cached.model != prepared.model {
Some(prepared.model.clone())
} else {
None
};
result.cached = true;
result.tokens_saved = cached.tokens_saved;
return Ok(result);
}
}
let inference = run_inference_and_react(
state,
prepared,
session_id,
turn_id,
authority,
channel_label,
delegation_provenance,
)
.await;
let asst_id = roboticus_db::sessions::append_message_with_topic(
&state.db,
session_id,
"assistant",
&inference.content,
topic_tag,
)
.map_err(|e| format!("failed to store assistant response: {e}"))?;
let executed_provider_prefix =
roboticus_core::model::provider_prefix(&inference.model).to_string();
record_cost(
state,
&inference.model,
&executed_provider_prefix,
inference.tokens_in,
inference.tokens_out,
inference.cost,
None,
false,
Some(inference.latency_ms as i64),
Some(inference.quality_score),
inference.escalated,
Some(turn_id),
);
post_turn_ingest(
state,
session_id,
user_content,
&inference.content,
&inference.tool_results,
&prepared.system_prompt_hash,
);
store_in_cache(
state,
&prepared.cache_hash,
user_content,
&inference.content,
&inference.model,
inference.tokens_out,
&prepared.intents,
)
.await;
if inference.model != prepared.model {
state.event_bus.publish(
serde_json::json!({
"type": "model_shift",
"turn_id": turn_id,
"session_id": session_id,
"channel": channel_label.unwrap_or("unknown"),
"selected_model": prepared.model,
"executed_model": inference.model,
"reason": "fallback",
})
.to_string(),
);
}
Ok(PipelineResult::from_inference(
&inference,
prepared.model.clone(),
asst_id,
))
}
#[allow(clippy::too_many_arguments)] pub(crate) fn record_cost(
state: &AppState,
model: &str,
provider_prefix: &str,
tokens_in: i64,
tokens_out: i64,
cost: f64,
variant: Option<&str>,
cached: bool,
latency_ms: Option<i64>,
quality_score: Option<f64>,
escalation: bool,
turn_id: Option<&str>,
) {
roboticus_db::metrics::record_inference_cost(
&state.db,
model,
provider_prefix,
tokens_in,
tokens_out,
cost,
variant,
cached,
latency_ms,
quality_score,
escalation,
turn_id,
)
.inspect_err(|e| tracing::warn!(error = %e, "failed to record inference cost"))
.ok();
}