1use crate::{RuntimeHostAdapter, RuntimeSessionLifecycle};
5use chrono::{DateTime, Utc};
6use everruns_core::atoms::{ActInput, AtomContext};
7use everruns_core::error::{AgentLoopError, Result};
8use everruns_core::events::TokenUsage;
9use everruns_core::typed_id::{AgentId, ExecId, HarnessId, MessageId, SessionId, TurnId};
10use everruns_core::{
11 Controls, ReasonResult, UserFacingError, UserFacingErrorContext,
12 classify_runtime_error_message, user_facing_error_codes,
13};
14use serde::{Deserialize, Serialize};
15use tracing::{debug, info};
16
17#[derive(Debug, Clone, Serialize, Deserialize)]
26pub struct RuntimeTurnState {
27 pub org_id: i64,
28 pub session_id: SessionId,
29 pub harness_id: HarnessId,
30 pub agent_id: Option<AgentId>,
31 pub input_message_id: MessageId,
32 #[serde(skip_serializing_if = "Option::is_none")]
33 pub turn_id: Option<TurnId>,
34 #[serde(skip_serializing_if = "Option::is_none", default)]
35 pub previous_response_id: Option<String>,
36 #[serde(default = "default_iteration")]
37 pub iteration: u32,
38 #[serde(skip_serializing_if = "Option::is_none", default)]
39 pub request_id: Option<String>,
40 #[serde(skip_serializing_if = "Option::is_none", default)]
41 pub started_at: Option<DateTime<Utc>>,
42 #[serde(skip_serializing_if = "Option::is_none", default)]
43 pub cumulative_usage: Option<TokenUsage>,
44 #[serde(default)]
45 pub tool_call_count: u32,
46 #[serde(default)]
47 pub llm_call_count: u32,
48 #[serde(skip_serializing_if = "Option::is_none", default)]
49 pub time_to_first_token_ms: Option<u64>,
50 #[serde(skip_serializing_if = "Option::is_none", default)]
51 pub final_message_id: Option<MessageId>,
52 #[serde(skip_serializing_if = "Option::is_none", default)]
53 pub final_answer_preview: Option<String>,
54}
55
56fn default_iteration() -> u32 {
57 1
58}
59
60#[derive(Debug, Clone)]
64pub struct RuntimeActPlan {
65 pub input: ActInput,
66 pub previous_response_id: Option<String>,
67 pub iteration: u32,
68 pub request_id: Option<String>,
69 pub resume_state: Box<RuntimeTurnState>,
70}
71
72#[derive(Debug, Clone)]
78pub enum RuntimeTurnPlan {
79 ScheduleReason(RuntimeTurnState),
80 ScheduleAct(RuntimeActPlan),
81 Complete { error: Option<String> },
82 WaitForToolResults { resume: RuntimeTurnState },
83}
84
85fn preview_final_answer(text: &str) -> Option<String> {
86 if text.is_empty() {
87 return None;
88 }
89
90 Some(text.chars().take(2000).collect())
91}
92
93fn add_usage(current: &mut Option<TokenUsage>, next: &TokenUsage) {
94 match current {
95 Some(current) => current.add(next),
96 None => *current = Some(next.clone()),
97 }
98}
99
100impl RuntimeTurnState {
101 fn with_reason_summary(&self, reason_result: &ReasonResult) -> Self {
102 let mut next = self.clone();
103 next.llm_call_count = next.llm_call_count.saturating_add(1);
104 next.tool_call_count = next
105 .tool_call_count
106 .saturating_add(reason_result.tool_calls.len() as u32);
107 if let Some(usage) = &reason_result.usage {
108 add_usage(&mut next.cumulative_usage, usage);
109 }
110 if next.time_to_first_token_ms.is_none() {
111 next.time_to_first_token_ms = reason_result.time_to_first_token_ms;
112 }
113 next.final_message_id = reason_result.output_message_id;
114 next.final_answer_preview = preview_final_answer(&reason_result.text);
115 next
116 }
117
118 fn duration_ms(&self) -> Option<u64> {
119 self.started_at
120 .map(|started_at| Utc::now().signed_duration_since(started_at))
121 .and_then(|duration| u64::try_from(duration.num_milliseconds()).ok())
122 }
123}
124
125fn classify_reason_failure(reason_result: &ReasonResult) -> UserFacingError {
126 let from_text =
127 classify_runtime_error_message(&reason_result.text, &UserFacingErrorContext::default());
128
129 let Some(error) = reason_result.error.as_deref() else {
130 return from_text;
131 };
132
133 let from_error = classify_runtime_error_message(error, &UserFacingErrorContext::default());
134
135 if from_error.code == user_facing_error_codes::PROCESSING_ERROR {
136 return from_text;
137 }
138
139 if from_error.code == from_text.code
140 && from_error.fields.is_empty()
141 && !from_text.fields.is_empty()
142 {
143 return from_text;
144 }
145
146 from_error
147}
148
149pub async fn plan_next_host_turn<A: RuntimeHostAdapter>(
166 adapter: &A,
167 completed_activity: &str,
168 state: &RuntimeTurnState,
169 output: &serde_json::Value,
170 pending_user_message_count: usize,
171) -> Result<RuntimeTurnPlan> {
172 match completed_activity {
173 "process_input" => {
174 let turn_id: Option<TurnId> = output
175 .get("turn_id")
176 .and_then(|value| value.as_str())
177 .and_then(|value| value.parse().ok());
178 let next = RuntimeTurnState {
179 turn_id,
180 previous_response_id: None,
181 iteration: 1,
182 started_at: state.started_at.or_else(|| Some(Utc::now())),
183 ..state.clone()
184 };
185 debug!(session_id = %state.session_id, turn_id = ?turn_id, "planned reason step");
186 Ok(RuntimeTurnPlan::ScheduleReason(next))
187 }
188 "reason" => {
189 let reason_result: ReasonResult = serde_json::from_value(output.clone())
190 .map_err(|error| AgentLoopError::Internal(error.into()))?;
191 let response_id = reason_result.response_id.clone();
192 let summarized_state = state.with_reason_summary(&reason_result);
193
194 if reason_result.has_tool_calls && reason_result.success {
195 let session_blueprint_id = adapter
196 .session_store(state.org_id)
197 .get_session(state.session_id)
198 .await?
199 .and_then(|session| session.blueprint_id);
200 let plan = RuntimeActPlan {
201 input: ActInput {
202 org_id: Some(state.org_id),
203 context: AtomContext {
204 session_id: state.session_id,
205 turn_id: state.turn_id.unwrap_or_default(),
206 input_message_id: state.input_message_id,
207 exec_id: ExecId::new(),
208 },
209 harness_id: state.harness_id,
210 agent_id: state.agent_id,
211 tool_calls: reason_result.tool_calls,
212 tool_definitions: reason_result.tool_definitions,
213 locale: reason_result.locale,
214 blueprint_id: session_blueprint_id,
215 network_access: reason_result.network_access,
216 },
217 previous_response_id: response_id,
218 iteration: state.iteration,
219 request_id: state.request_id.clone(),
220 resume_state: Box::new(summarized_state),
221 };
222 return Ok(RuntimeTurnPlan::ScheduleAct(plan));
223 }
224
225 if reason_result.success && pending_user_message_count > 0 {
226 if pending_user_message_count > 1 {
227 info!(
228 session_id = %state.session_id,
229 pending_user_message_count,
230 "multiple steering messages arrived during turn"
231 );
232 }
233
234 let next = RuntimeTurnState {
235 previous_response_id: response_id,
236 iteration: state.iteration.saturating_add(1),
237 ..summarized_state
238 };
239 return Ok(RuntimeTurnPlan::ScheduleReason(next));
240 }
241
242 let lifecycle =
243 RuntimeSessionLifecycle::new(adapter.clone(), state.org_id, state.session_id);
244 let turn_id = state.turn_id.unwrap_or_default();
245
246 if reason_result.success {
247 lifecycle
248 .emit_turn_completed(
249 state.input_message_id,
250 everruns_core::events::TurnCompletedData {
251 turn_id,
252 iterations: state.iteration,
253 duration_ms: summarized_state.duration_ms(),
254 usage: summarized_state.cumulative_usage.clone(),
255 input_content: None,
256 final_message_id: summarized_state.final_message_id,
257 final_answer_preview: summarized_state.final_answer_preview.clone(),
258 time_to_first_token_ms: summarized_state.time_to_first_token_ms,
259 tool_call_count: Some(summarized_state.tool_call_count),
260 llm_call_count: Some(summarized_state.llm_call_count),
261 status: Some("completed".to_string()),
262 },
263 )
264 .await;
265 lifecycle
266 .emit_session_idled(
267 turn_id,
268 state.input_message_id,
269 Some(state.iteration),
270 summarized_state.cumulative_usage.clone(),
271 )
272 .await;
273 } else {
274 let user_error = classify_reason_failure(&reason_result);
275 lifecycle
276 .turn_failed(
277 turn_id,
278 state.input_message_id,
279 &reason_result.text,
280 Some(&user_error),
281 )
282 .await;
283 }
284
285 Ok(RuntimeTurnPlan::Complete {
286 error: reason_result.error,
287 })
288 }
289 "act" => {
290 if output
291 .get("blocked")
292 .and_then(|value| value.as_bool())
293 .unwrap_or(false)
294 {
295 return Ok(RuntimeTurnPlan::Complete { error: None });
296 }
297
298 let waiting_for_tool_results = output
299 .get("waiting_for_tool_results")
300 .and_then(|value| value.as_bool())
301 .unwrap_or(false);
302 let should_pause_for_tool_results = waiting_for_tool_results
303 && setup_connection_hint_enabled(adapter, state.org_id, state.session_id).await;
304
305 let next = RuntimeTurnState {
306 iteration: state.iteration.saturating_add(1),
307 ..state.clone()
308 };
309
310 if should_pause_for_tool_results {
311 let lifecycle =
312 RuntimeSessionLifecycle::new(adapter.clone(), state.org_id, state.session_id);
313 lifecycle.waiting_for_tool_results().await;
314 return Ok(RuntimeTurnPlan::WaitForToolResults { resume: next });
315 }
316
317 if waiting_for_tool_results {
318 info!(
319 session_id = %state.session_id,
320 "setup_connection hint absent, continuing turn instead of pausing"
321 );
322 }
323
324 Ok(RuntimeTurnPlan::ScheduleReason(next))
325 }
326 other => Err(AgentLoopError::config(format!(
327 "Unknown activity type completed: {other}"
328 ))),
329 }
330}
331
332async fn setup_connection_hint_enabled<A: RuntimeHostAdapter>(
333 adapter: &A,
334 org_id: i64,
335 session_id: SessionId,
336) -> bool {
337 match adapter.session_store(org_id).get_session(session_id).await {
338 Ok(Some(session)) => {
339 let hints = Controls::resolve_hints(session.hints.as_ref(), None);
340 hints
341 .get("setup_connection")
342 .and_then(|value| value.as_bool())
343 .unwrap_or(false)
344 }
345 _ => false,
346 }
347}