1use crate::{RuntimeHostAdapter, RuntimeSessionLifecycle};
5use chrono::{DateTime, Utc};
6use everruns_core::atoms::{ActInput, AtomContext};
7use everruns_core::error::{AgentLoopError, Result};
8use everruns_core::events::TokenUsage;
9use everruns_core::typed_id::{AgentId, ExecId, HarnessId, MessageId, SessionId, TurnId};
10use everruns_core::{
11 Controls, ReasonResult, UserFacingError, UserFacingErrorContext,
12 classify_runtime_error_message, user_facing_error_codes,
13};
14use serde::{Deserialize, Serialize};
15use tracing::{debug, info};
16
17#[derive(Debug, Clone, Serialize, Deserialize)]
26pub struct RuntimeTurnState {
27 pub org_id: i64,
28 pub session_id: SessionId,
29 pub harness_id: HarnessId,
30 pub agent_id: Option<AgentId>,
31 pub input_message_id: MessageId,
32 #[serde(skip_serializing_if = "Option::is_none")]
33 pub turn_id: Option<TurnId>,
34 #[serde(skip_serializing_if = "Option::is_none", default)]
35 pub previous_response_id: Option<String>,
36 #[serde(default = "default_iteration")]
37 pub iteration: u32,
38 #[serde(skip_serializing_if = "Option::is_none", default)]
39 pub request_id: Option<String>,
40 #[serde(skip_serializing_if = "Option::is_none", default)]
41 pub started_at: Option<DateTime<Utc>>,
42 #[serde(skip_serializing_if = "Option::is_none", default)]
43 pub cumulative_usage: Option<TokenUsage>,
44 #[serde(default)]
45 pub tool_call_count: u32,
46 #[serde(default)]
47 pub llm_call_count: u32,
48 #[serde(skip_serializing_if = "Option::is_none", default)]
49 pub time_to_first_token_ms: Option<u64>,
50 #[serde(skip_serializing_if = "Option::is_none", default)]
51 pub final_message_id: Option<MessageId>,
52 #[serde(skip_serializing_if = "Option::is_none", default)]
53 pub final_answer_preview: Option<String>,
54}
55
/// Serde default for [`RuntimeTurnState::iteration`]: iterations are counted from 1.
fn default_iteration() -> u32 {
    const FIRST_ITERATION: u32 = 1;
    FIRST_ITERATION
}
59
60#[derive(Debug, Clone)]
64pub struct RuntimeActPlan {
65 pub input: ActInput,
66 pub previous_response_id: Option<String>,
67 pub iteration: u32,
68 pub request_id: Option<String>,
69 pub resume_state: Box<RuntimeTurnState>,
70}
71
72#[derive(Debug, Clone)]
78pub enum RuntimeTurnPlan {
79 ScheduleReason(RuntimeTurnState),
80 ScheduleAct(RuntimeActPlan),
81 Complete { error: Option<String> },
82 WaitForToolResults { resume: RuntimeTurnState },
83}
84
/// Builds a short preview of `text` for turn summaries.
///
/// Returns `None` for an empty string; otherwise returns the first 2000
/// characters (Unicode scalar values, not bytes) of `text`.
fn preview_final_answer(text: &str) -> Option<String> {
    const PREVIEW_CHAR_LIMIT: usize = 2000;

    (!text.is_empty()).then(|| text.chars().take(PREVIEW_CHAR_LIMIT).collect())
}
92
93fn add_usage(current: &mut Option<TokenUsage>, next: &TokenUsage) {
94 match current {
95 Some(current) => current.add(next),
96 None => *current = Some(next.clone()),
97 }
98}
99
100impl RuntimeTurnState {
101 fn with_reason_summary(&self, reason_result: &ReasonResult) -> Self {
102 let mut next = self.clone();
103 next.llm_call_count = next.llm_call_count.saturating_add(1);
104 next.tool_call_count = next
105 .tool_call_count
106 .saturating_add(reason_result.tool_calls.len() as u32);
107 if let Some(usage) = &reason_result.usage {
108 add_usage(&mut next.cumulative_usage, usage);
109 }
110 if next.time_to_first_token_ms.is_none() {
111 next.time_to_first_token_ms = reason_result.time_to_first_token_ms;
112 }
113 next.final_message_id = reason_result.output_message_id;
114 next.final_answer_preview = preview_final_answer(&reason_result.text);
115 next
116 }
117
118 fn duration_ms(&self) -> Option<u64> {
119 self.started_at
120 .map(|started_at| Utc::now().signed_duration_since(started_at))
121 .and_then(|duration| u64::try_from(duration.num_milliseconds()).ok())
122 }
123}
124
125fn classify_reason_failure(reason_result: &ReasonResult) -> UserFacingError {
126 if let Some(user_error) = &reason_result.user_facing_error {
130 return user_error.clone();
131 }
132
133 let from_text =
134 classify_runtime_error_message(&reason_result.text, &UserFacingErrorContext::default());
135
136 let Some(error) = reason_result.error.as_deref() else {
137 return from_text;
138 };
139
140 let from_error = classify_runtime_error_message(error, &UserFacingErrorContext::default());
141
142 if from_error.code == user_facing_error_codes::PROCESSING_ERROR {
143 return from_text;
144 }
145
146 if from_error.code == from_text.code
147 && from_error.fields.is_empty()
148 && !from_text.fields.is_empty()
149 {
150 return from_text;
151 }
152
153 from_error
154}
155
156pub async fn plan_next_host_turn<A: RuntimeHostAdapter>(
173 adapter: &A,
174 completed_activity: &str,
175 state: &RuntimeTurnState,
176 output: &serde_json::Value,
177 pending_user_message_count: usize,
178) -> Result<RuntimeTurnPlan> {
179 match completed_activity {
180 "process_input" => {
181 let turn_id: Option<TurnId> = output
182 .get("turn_id")
183 .and_then(|value| value.as_str())
184 .and_then(|value| value.parse().ok());
185 let next = RuntimeTurnState {
186 turn_id,
187 previous_response_id: None,
188 iteration: 1,
189 started_at: state.started_at.or_else(|| Some(Utc::now())),
190 ..state.clone()
191 };
192 debug!(session_id = %state.session_id, turn_id = ?turn_id, "planned reason step");
193 Ok(RuntimeTurnPlan::ScheduleReason(next))
194 }
195 "reason" => {
196 let reason_result: ReasonResult = serde_json::from_value(output.clone())
197 .map_err(|error| AgentLoopError::Internal(error.into()))?;
198 let response_id = reason_result.response_id.clone();
199 let summarized_state = state.with_reason_summary(&reason_result);
200
201 if reason_result.has_tool_calls && reason_result.success {
202 let session = adapter
203 .session_store(state.org_id)
204 .get_session(state.session_id)
205 .await?;
206 let session_blueprint_id = session.as_ref().and_then(|s| s.blueprint_id.clone());
207 let workspace_id = session.as_ref().map(|s| s.workspace_id);
210 let plan = RuntimeActPlan {
211 input: ActInput {
212 org_id: Some(state.org_id),
213 context: AtomContext {
214 session_id: state.session_id,
215 turn_id: state.turn_id.unwrap_or_default(),
216 input_message_id: state.input_message_id,
217 exec_id: ExecId::new(),
218 workspace_id,
219 },
220 harness_id: state.harness_id,
221 agent_id: state.agent_id,
222 tool_calls: reason_result.tool_calls,
223 tool_definitions: reason_result.tool_definitions,
224 locale: reason_result.locale,
225 blueprint_id: session_blueprint_id,
226 network_access: reason_result.network_access,
227 parallel_tool_calls: None,
231 },
232 previous_response_id: response_id,
233 iteration: state.iteration,
234 request_id: state.request_id.clone(),
235 resume_state: Box::new(summarized_state),
236 };
237 return Ok(RuntimeTurnPlan::ScheduleAct(plan));
238 }
239
240 if reason_result.success && pending_user_message_count > 0 {
241 if pending_user_message_count > 1 {
242 info!(
243 session_id = %state.session_id,
244 pending_user_message_count,
245 "multiple steering messages arrived during turn"
246 );
247 }
248
249 let next = RuntimeTurnState {
250 previous_response_id: response_id,
251 iteration: state.iteration.saturating_add(1),
252 ..summarized_state
253 };
254 return Ok(RuntimeTurnPlan::ScheduleReason(next));
255 }
256
257 let lifecycle =
258 RuntimeSessionLifecycle::new(adapter.clone(), state.org_id, state.session_id);
259 let turn_id = state.turn_id.unwrap_or_default();
260
261 if reason_result.success {
262 lifecycle
263 .emit_turn_completed(
264 state.input_message_id,
265 everruns_core::events::TurnCompletedData {
266 turn_id,
267 iterations: state.iteration,
268 duration_ms: summarized_state.duration_ms(),
269 usage: summarized_state.cumulative_usage.clone(),
270 input_content: None,
271 final_message_id: summarized_state.final_message_id,
272 final_answer_preview: summarized_state.final_answer_preview.clone(),
273 time_to_first_token_ms: summarized_state.time_to_first_token_ms,
274 tool_call_count: Some(summarized_state.tool_call_count),
275 llm_call_count: Some(summarized_state.llm_call_count),
276 status: Some("completed".to_string()),
277 },
278 )
279 .await;
280 lifecycle
281 .emit_session_idled(
282 turn_id,
283 state.input_message_id,
284 Some(state.iteration),
285 summarized_state.cumulative_usage.clone(),
286 )
287 .await;
288 } else {
289 let user_error = classify_reason_failure(&reason_result);
290 lifecycle
291 .turn_failed_with_disclosure(
292 turn_id,
293 state.input_message_id,
294 &reason_result.text,
295 Some(&user_error),
296 reason_result.error_disclosure,
297 )
298 .await;
299 }
300
301 lifecycle
304 .fire_turn_end_hooks(
305 state.harness_id,
306 state.agent_id,
307 turn_id,
308 reason_result.success,
309 )
310 .await;
311
312 Ok(RuntimeTurnPlan::Complete {
313 error: reason_result.error,
314 })
315 }
316 "act" => {
317 if output
318 .get("blocked")
319 .and_then(|value| value.as_bool())
320 .unwrap_or(false)
321 {
322 return Ok(RuntimeTurnPlan::Complete { error: None });
323 }
324
325 let waiting_for_tool_results = output
326 .get("waiting_for_tool_results")
327 .and_then(|value| value.as_bool())
328 .unwrap_or(false);
329 let should_pause_for_tool_results = waiting_for_tool_results
330 && setup_connection_hint_enabled(adapter, state.org_id, state.session_id).await;
331
332 let next = RuntimeTurnState {
333 iteration: state.iteration.saturating_add(1),
334 ..state.clone()
335 };
336
337 if should_pause_for_tool_results {
338 let lifecycle =
339 RuntimeSessionLifecycle::new(adapter.clone(), state.org_id, state.session_id);
340 lifecycle.waiting_for_tool_results().await;
341 return Ok(RuntimeTurnPlan::WaitForToolResults { resume: next });
342 }
343
344 if waiting_for_tool_results {
345 info!(
346 session_id = %state.session_id,
347 "setup_connection hint absent, continuing turn instead of pausing"
348 );
349 }
350
351 Ok(RuntimeTurnPlan::ScheduleReason(next))
352 }
353 other => Err(AgentLoopError::config(format!(
354 "Unknown activity type completed: {other}"
355 ))),
356 }
357}
358
359async fn setup_connection_hint_enabled<A: RuntimeHostAdapter>(
360 adapter: &A,
361 org_id: i64,
362 session_id: SessionId,
363) -> bool {
364 match adapter.session_store(org_id).get_session(session_id).await {
365 Ok(Some(session)) => {
366 let hints = Controls::resolve_hints(session.hints.as_ref(), None);
367 hints
368 .get("setup_connection")
369 .and_then(|value| value.as_bool())
370 .unwrap_or(false)
371 }
372 _ => false,
373 }
374}