use crate::{RuntimeHostAdapter, RuntimeSessionLifecycle};
use chrono::{DateTime, Utc};
use everruns_core::atoms::{ActInput, AtomContext};
use everruns_core::error::{AgentLoopError, Result};
use everruns_core::events::TokenUsage;
use everruns_core::typed_id::{AgentId, ExecId, HarnessId, MessageId, SessionId, TurnId};
use everruns_core::{
Controls, ReasonResult, UserFacingError, UserFacingErrorContext,
classify_runtime_error_message, user_facing_error_codes,
};
use serde::{Deserialize, Serialize};
use tracing::{debug, info};
/// Serializable state carried between the steps of a single runtime turn.
///
/// `plan_next_host_turn` reads this state after each completed activity and
/// produces an updated copy for the next step.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RuntimeTurnState {
/// Organization id; passed to `session_store` lookups and lifecycle construction.
pub org_id: i64,
/// Session the turn runs in.
pub session_id: SessionId,
/// Harness id forwarded into `ActInput` and the turn-end hooks.
pub harness_id: HarnessId,
/// Optional agent id forwarded into `ActInput` and the turn-end hooks.
pub agent_id: Option<AgentId>,
/// Input message id forwarded into `AtomContext` and lifecycle events.
pub input_message_id: MessageId,
/// Turn id parsed from the `process_input` output; omitted from the serialized form when `None`.
#[serde(skip_serializing_if = "Option::is_none")]
pub turn_id: Option<TurnId>,
/// Response id of the latest reason step; reset to `None` by `process_input`.
#[serde(skip_serializing_if = "Option::is_none", default)]
pub previous_response_id: Option<String>,
/// Iteration counter; deserializes to 1 when absent and is incremented when the turn loops.
#[serde(default = "default_iteration")]
pub iteration: u32,
/// Optional request id, copied into `RuntimeActPlan`.
#[serde(skip_serializing_if = "Option::is_none", default)]
pub request_id: Option<String>,
/// Timestamp stamped by `process_input` when not already set; basis for `duration_ms`.
#[serde(skip_serializing_if = "Option::is_none", default)]
pub started_at: Option<DateTime<Utc>>,
/// Token usage accumulated over the reason results seen so far.
#[serde(skip_serializing_if = "Option::is_none", default)]
pub cumulative_usage: Option<TokenUsage>,
/// Number of tool calls requested by reason results so far.
#[serde(default)]
pub tool_call_count: u32,
/// Number of reason results summarized so far.
#[serde(default)]
pub llm_call_count: u32,
/// First recorded time-to-first-token value; later values do not overwrite it.
#[serde(skip_serializing_if = "Option::is_none", default)]
pub time_to_first_token_ms: Option<u64>,
/// `output_message_id` of the most recently summarized reason result.
#[serde(skip_serializing_if = "Option::is_none", default)]
pub final_message_id: Option<MessageId>,
/// Preview (at most 2000 characters) of the most recently summarized reason text.
#[serde(skip_serializing_if = "Option::is_none", default)]
pub final_answer_preview: Option<String>,
}
/// Serde default for `RuntimeTurnState::iteration` when the field is absent.
fn default_iteration() -> u32 {
    const FIRST_ITERATION: u32 = 1;
    FIRST_ITERATION
}
/// Everything needed to schedule an act step, as built by `plan_next_host_turn`
/// when a successful reason result contains tool calls.
#[derive(Debug, Clone)]
pub struct RuntimeActPlan {
/// Input handed to the act step (tool calls, tool definitions, context ids).
pub input: ActInput,
/// `response_id` of the reason result that requested the tool calls.
pub previous_response_id: Option<String>,
/// Iteration of the turn state at planning time.
pub iteration: u32,
/// Request id copied from the turn state.
pub request_id: Option<String>,
/// Turn state updated with the reason summary.
/// NOTE(review): presumably what the host resumes from once the act step finishes — confirm against the caller.
pub resume_state: Box<RuntimeTurnState>,
}
/// Next action chosen by `plan_next_host_turn` after an activity completes.
#[derive(Debug, Clone)]
pub enum RuntimeTurnPlan {
/// Run a reason step with the given state.
ScheduleReason(RuntimeTurnState),
/// Run an act step described by the plan.
ScheduleAct(RuntimeActPlan),
/// The turn is finished; `error` carries the reason result's error, or `None` when an act step reported `blocked`.
Complete { error: Option<String> },
/// Pause until tool results arrive; `resume` is the state with the iteration already incremented.
WaitForToolResults { resume: RuntimeTurnState },
}
/// Builds a preview of `text` limited to the first 2000 characters.
///
/// Returns `None` for an empty string. The limit counts `char`s, not bytes,
/// so multi-byte characters are never split.
fn preview_final_answer(text: &str) -> Option<String> {
    const PREVIEW_CHAR_LIMIT: usize = 2000;
    (!text.is_empty()).then(|| text.chars().take(PREVIEW_CHAR_LIMIT).collect::<String>())
}
/// Folds `next` into the running usage total in `current`.
///
/// When a total already exists it is updated in place through `TokenUsage::add`;
/// otherwise the total is initialized with a clone of `next`.
fn add_usage(current: &mut Option<TokenUsage>, next: &TokenUsage) {
    if let Some(total) = current.as_mut() {
        total.add(next);
    } else {
        *current = Some(next.clone());
    }
}
impl RuntimeTurnState {
    /// Returns a copy of this state updated with the metrics of `reason_result`.
    ///
    /// The copy has one more LLM call, the result's tool calls added to the
    /// tool-call counter, its usage folded into `cumulative_usage`, and the
    /// final message id / answer preview replaced by the result's values.
    /// `time_to_first_token_ms` is only filled in when it was still unset.
    fn with_reason_summary(&self, reason_result: &ReasonResult) -> Self {
        let mut next = self.clone();
        next.llm_call_count = next.llm_call_count.saturating_add(1);
        // Saturate instead of truncating with `as`: the counter uses saturating
        // arithmetic, so an out-of-range length must clamp rather than wrap.
        let new_tool_calls = u32::try_from(reason_result.tool_calls.len()).unwrap_or(u32::MAX);
        next.tool_call_count = next.tool_call_count.saturating_add(new_tool_calls);
        if let Some(usage) = &reason_result.usage {
            add_usage(&mut next.cumulative_usage, usage);
        }
        // Keep the first recorded value; later reason steps must not overwrite it.
        if next.time_to_first_token_ms.is_none() {
            next.time_to_first_token_ms = reason_result.time_to_first_token_ms;
        }
        next.final_message_id = reason_result.output_message_id;
        next.final_answer_preview = preview_final_answer(&reason_result.text);
        next
    }

    /// Milliseconds elapsed between `started_at` and now.
    ///
    /// Returns `None` when `started_at` is unset or when the elapsed time is
    /// negative and therefore does not fit in a `u64`.
    fn duration_ms(&self) -> Option<u64> {
        self.started_at
            .map(|started_at| Utc::now().signed_duration_since(started_at))
            .and_then(|duration| u64::try_from(duration.num_milliseconds()).ok())
    }
}
/// Picks the user-facing error for a failed reason result.
///
/// Both the result text and, when present, the explicit error string are
/// classified. The text classification is used when there is no error string,
/// when the error string classifies as `PROCESSING_ERROR`, or when both share a
/// code but only the text classification carries fields. Otherwise the
/// classification of the error string is returned.
fn classify_reason_failure(reason_result: &ReasonResult) -> UserFacingError {
    let context = UserFacingErrorContext::default();
    let from_text = classify_runtime_error_message(&reason_result.text, &context);
    match reason_result.error.as_deref() {
        None => from_text,
        Some(message) => {
            let from_error = classify_runtime_error_message(message, &context);
            let error_is_generic = from_error.code == user_facing_error_codes::PROCESSING_ERROR;
            let text_is_richer = from_error.code == from_text.code
                && from_error.fields.is_empty()
                && !from_text.fields.is_empty();
            if error_is_generic || text_is_richer {
                from_text
            } else {
                from_error
            }
        }
    }
}
/// Decides what the host should do after `completed_activity` has finished.
///
/// Behavior per activity name:
/// - `"process_input"`: reads `turn_id` from `output` (ignored if missing or
///   unparsable), resets `previous_response_id` and `iteration`, stamps
///   `started_at` when unset, and schedules a reason step.
/// - `"reason"`: deserializes `output` into a `ReasonResult`. A successful
///   result with tool calls schedules an act step; a successful result while
///   `pending_user_message_count > 0` schedules another reason step; anything
///   else emits the completion or failure lifecycle events, fires the turn-end
///   hooks and returns `Complete`.
/// - `"act"`: returns `Complete` when `output.blocked` is true, returns
///   `WaitForToolResults` when `output.waiting_for_tool_results` is true and the
///   session's `setup_connection` hint is enabled, and otherwise schedules the
///   next reason step with the iteration incremented.
///
/// # Errors
/// Returns `AgentLoopError::Internal` when the reason output cannot be
/// deserialized, propagates session-store errors from the act planning path,
/// and returns a config error for an unknown activity name.
pub async fn plan_next_host_turn<A: RuntimeHostAdapter>(
adapter: &A,
completed_activity: &str,
state: &RuntimeTurnState,
output: &serde_json::Value,
pending_user_message_count: usize,
) -> Result<RuntimeTurnPlan> {
match completed_activity {
"process_input" => {
// A missing, non-string or unparsable `turn_id` silently becomes `None`.
let turn_id: Option<TurnId> = output
.get("turn_id")
.and_then(|value| value.as_str())
.and_then(|value| value.parse().ok());
// Start of a turn: reset the response chain and iteration, keep an existing start time.
let next = RuntimeTurnState {
turn_id,
previous_response_id: None,
iteration: 1,
started_at: state.started_at.or_else(|| Some(Utc::now())),
..state.clone()
};
debug!(session_id = %state.session_id, turn_id = ?turn_id, "planned reason step");
Ok(RuntimeTurnPlan::ScheduleReason(next))
}
"reason" => {
let reason_result: ReasonResult = serde_json::from_value(output.clone())
.map_err(|error| AgentLoopError::Internal(error.into()))?;
let response_id = reason_result.response_id.clone();
// Metrics are folded in up front so every branch below sees the same totals.
let summarized_state = state.with_reason_summary(&reason_result);
// Branch 1: successful result with tool calls -> schedule the act step.
if reason_result.has_tool_calls && reason_result.success {
// A missing session yields no blueprint id; a store error is propagated.
let session_blueprint_id = adapter
.session_store(state.org_id)
.get_session(state.session_id)
.await?
.and_then(|session| session.blueprint_id);
let plan = RuntimeActPlan {
input: ActInput {
org_id: Some(state.org_id),
context: AtomContext {
session_id: state.session_id,
// Falls back to the default `TurnId` when the state has none.
turn_id: state.turn_id.unwrap_or_default(),
input_message_id: state.input_message_id,
exec_id: ExecId::new(),
},
harness_id: state.harness_id,
agent_id: state.agent_id,
tool_calls: reason_result.tool_calls,
tool_definitions: reason_result.tool_definitions,
locale: reason_result.locale,
blueprint_id: session_blueprint_id,
network_access: reason_result.network_access,
parallel_tool_calls: None,
},
previous_response_id: response_id,
iteration: state.iteration,
request_id: state.request_id.clone(),
resume_state: Box::new(summarized_state),
};
return Ok(RuntimeTurnPlan::ScheduleAct(plan));
}
// Branch 2: successful result but user messages arrived meanwhile -> reason again.
if reason_result.success && pending_user_message_count > 0 {
if pending_user_message_count > 1 {
info!(
session_id = %state.session_id,
pending_user_message_count,
"multiple steering messages arrived during turn"
);
}
let next = RuntimeTurnState {
previous_response_id: response_id,
iteration: state.iteration.saturating_add(1),
..summarized_state
};
return Ok(RuntimeTurnPlan::ScheduleReason(next));
}
// Branch 3: the turn ends here, either completed or failed.
let lifecycle =
RuntimeSessionLifecycle::new(adapter.clone(), state.org_id, state.session_id);
let turn_id = state.turn_id.unwrap_or_default();
if reason_result.success {
// Success: emit turn-completed first, then session-idled.
lifecycle
.emit_turn_completed(
state.input_message_id,
everruns_core::events::TurnCompletedData {
turn_id,
iterations: state.iteration,
duration_ms: summarized_state.duration_ms(),
usage: summarized_state.cumulative_usage.clone(),
input_content: None,
final_message_id: summarized_state.final_message_id,
final_answer_preview: summarized_state.final_answer_preview.clone(),
time_to_first_token_ms: summarized_state.time_to_first_token_ms,
tool_call_count: Some(summarized_state.tool_call_count),
llm_call_count: Some(summarized_state.llm_call_count),
status: Some("completed".to_string()),
},
)
.await;
lifecycle
.emit_session_idled(
turn_id,
state.input_message_id,
Some(state.iteration),
summarized_state.cumulative_usage.clone(),
)
.await;
} else {
// Failure: report the result text together with its classified user-facing error.
let user_error = classify_reason_failure(&reason_result);
lifecycle
.turn_failed(
turn_id,
state.input_message_id,
&reason_result.text,
Some(&user_error),
)
.await;
}
// Turn-end hooks fire for both outcomes, after the events above.
lifecycle
.fire_turn_end_hooks(
state.harness_id,
state.agent_id,
turn_id,
reason_result.success,
)
.await;
Ok(RuntimeTurnPlan::Complete {
error: reason_result.error,
})
}
"act" => {
// A blocked act step ends the turn with no error and emits no lifecycle events here.
if output
.get("blocked")
.and_then(|value| value.as_bool())
.unwrap_or(false)
{
return Ok(RuntimeTurnPlan::Complete { error: None });
}
let waiting_for_tool_results = output
.get("waiting_for_tool_results")
.and_then(|value| value.as_bool())
.unwrap_or(false);
// `&&` short-circuits: the session is only looked up when the act step is waiting.
let should_pause_for_tool_results = waiting_for_tool_results
&& setup_connection_hint_enabled(adapter, state.org_id, state.session_id).await;
let next = RuntimeTurnState {
iteration: state.iteration.saturating_add(1),
..state.clone()
};
if should_pause_for_tool_results {
let lifecycle =
RuntimeSessionLifecycle::new(adapter.clone(), state.org_id, state.session_id);
lifecycle.waiting_for_tool_results().await;
return Ok(RuntimeTurnPlan::WaitForToolResults { resume: next });
}
// Waiting was requested but the hint is off: log it and keep the turn going.
if waiting_for_tool_results {
info!(
session_id = %state.session_id,
"setup_connection hint absent, continuing turn instead of pausing"
);
}
Ok(RuntimeTurnPlan::ScheduleReason(next))
}
other => Err(AgentLoopError::config(format!(
"Unknown activity type completed: {other}"
))),
}
}
/// Reports whether the session's resolved hints set `setup_connection` to `true`.
///
/// Any lookup failure, a missing session, a missing hint, or a non-boolean
/// hint value all yield `false`.
async fn setup_connection_hint_enabled<A: RuntimeHostAdapter>(
    adapter: &A,
    org_id: i64,
    session_id: SessionId,
) -> bool {
    if let Ok(Some(session)) = adapter.session_store(org_id).get_session(session_id).await {
        let resolved = Controls::resolve_hints(session.hints.as_ref(), None);
        let flag = resolved
            .get("setup_connection")
            .and_then(|value| value.as_bool());
        return flag.unwrap_or(false);
    }
    // Store error or unknown session: treat the hint as disabled.
    false
}