1use crate::{RuntimeHostAdapter, RuntimeSessionLifecycle};
5use chrono::{DateTime, Utc};
6use everruns_core::atoms::{ActInput, AtomContext};
7use everruns_core::error::{AgentLoopError, Result};
8use everruns_core::events::TokenUsage;
9use everruns_core::typed_id::{AgentId, ExecId, HarnessId, MessageId, SessionId, TurnId};
10use everruns_core::{
11 Controls, ReasonResult, UserFacingError, UserFacingErrorContext,
12 classify_runtime_error_message, user_facing_error_codes,
13};
14use serde::{Deserialize, Serialize};
15use tracing::{debug, info};
16
17#[derive(Debug, Clone, Serialize, Deserialize)]
26pub struct RuntimeTurnState {
27 pub org_id: i64,
28 pub session_id: SessionId,
29 pub harness_id: HarnessId,
30 pub agent_id: Option<AgentId>,
31 pub input_message_id: MessageId,
32 #[serde(skip_serializing_if = "Option::is_none")]
33 pub turn_id: Option<TurnId>,
34 #[serde(skip_serializing_if = "Option::is_none", default)]
35 pub previous_response_id: Option<String>,
36 #[serde(default = "default_iteration")]
37 pub iteration: u32,
38 #[serde(skip_serializing_if = "Option::is_none", default)]
39 pub request_id: Option<String>,
40 #[serde(skip_serializing_if = "Option::is_none", default)]
41 pub started_at: Option<DateTime<Utc>>,
42 #[serde(skip_serializing_if = "Option::is_none", default)]
43 pub cumulative_usage: Option<TokenUsage>,
44 #[serde(default)]
45 pub tool_call_count: u32,
46 #[serde(default)]
47 pub llm_call_count: u32,
48 #[serde(skip_serializing_if = "Option::is_none", default)]
49 pub time_to_first_token_ms: Option<u64>,
50 #[serde(skip_serializing_if = "Option::is_none", default)]
51 pub final_message_id: Option<MessageId>,
52 #[serde(skip_serializing_if = "Option::is_none", default)]
53 pub final_answer_preview: Option<String>,
54}
55
56fn default_iteration() -> u32 {
57 1
58}
59
60#[derive(Debug, Clone)]
64pub struct RuntimeActPlan {
65 pub input: ActInput,
66 pub previous_response_id: Option<String>,
67 pub iteration: u32,
68 pub request_id: Option<String>,
69 pub resume_state: Box<RuntimeTurnState>,
70}
71
72#[derive(Debug, Clone)]
78pub enum RuntimeTurnPlan {
79 ScheduleReason(RuntimeTurnState),
80 ScheduleAct(RuntimeActPlan),
81 Complete { error: Option<String> },
82 WaitForToolResults { resume: RuntimeTurnState },
83}
84
85fn preview_final_answer(text: &str) -> Option<String> {
86 if text.is_empty() {
87 return None;
88 }
89
90 Some(text.chars().take(2000).collect())
91}
92
93fn add_usage(current: &mut Option<TokenUsage>, next: &TokenUsage) {
94 match current {
95 Some(current) => current.add(next),
96 None => *current = Some(next.clone()),
97 }
98}
99
100impl RuntimeTurnState {
101 fn with_reason_summary(&self, reason_result: &ReasonResult) -> Self {
102 let mut next = self.clone();
103 next.llm_call_count = next.llm_call_count.saturating_add(1);
104 next.tool_call_count = next
105 .tool_call_count
106 .saturating_add(reason_result.tool_calls.len() as u32);
107 if let Some(usage) = &reason_result.usage {
108 add_usage(&mut next.cumulative_usage, usage);
109 }
110 if next.time_to_first_token_ms.is_none() {
111 next.time_to_first_token_ms = reason_result.time_to_first_token_ms;
112 }
113 next.final_message_id = reason_result.output_message_id;
114 next.final_answer_preview = preview_final_answer(&reason_result.text);
115 next
116 }
117
118 fn duration_ms(&self) -> Option<u64> {
119 self.started_at
120 .map(|started_at| Utc::now().signed_duration_since(started_at))
121 .and_then(|duration| u64::try_from(duration.num_milliseconds()).ok())
122 }
123}
124
125fn classify_reason_failure(reason_result: &ReasonResult) -> UserFacingError {
126 let from_text =
127 classify_runtime_error_message(&reason_result.text, &UserFacingErrorContext::default());
128
129 let Some(error) = reason_result.error.as_deref() else {
130 return from_text;
131 };
132
133 let from_error = classify_runtime_error_message(error, &UserFacingErrorContext::default());
134
135 if from_error.code == user_facing_error_codes::PROCESSING_ERROR {
136 return from_text;
137 }
138
139 if from_error.code == from_text.code
140 && from_error.fields.is_empty()
141 && !from_text.fields.is_empty()
142 {
143 return from_text;
144 }
145
146 from_error
147}
148
149pub async fn plan_next_host_turn<A: RuntimeHostAdapter>(
166 adapter: &A,
167 completed_activity: &str,
168 state: &RuntimeTurnState,
169 output: &serde_json::Value,
170 pending_user_message_count: usize,
171) -> Result<RuntimeTurnPlan> {
172 match completed_activity {
173 "process_input" => {
174 let turn_id: Option<TurnId> = output
175 .get("turn_id")
176 .and_then(|value| value.as_str())
177 .and_then(|value| value.parse().ok());
178 let next = RuntimeTurnState {
179 turn_id,
180 previous_response_id: None,
181 iteration: 1,
182 started_at: state.started_at.or_else(|| Some(Utc::now())),
183 ..state.clone()
184 };
185 debug!(session_id = %state.session_id, turn_id = ?turn_id, "planned reason step");
186 Ok(RuntimeTurnPlan::ScheduleReason(next))
187 }
188 "reason" => {
189 let reason_result: ReasonResult = serde_json::from_value(output.clone())
190 .map_err(|error| AgentLoopError::Internal(error.into()))?;
191 let response_id = reason_result.response_id.clone();
192 let summarized_state = state.with_reason_summary(&reason_result);
193
194 if reason_result.has_tool_calls && reason_result.success {
195 let session_blueprint_id = adapter
196 .session_store(state.org_id)
197 .get_session(state.session_id)
198 .await?
199 .and_then(|session| session.blueprint_id);
200 let plan = RuntimeActPlan {
201 input: ActInput {
202 org_id: Some(state.org_id),
203 context: AtomContext {
204 session_id: state.session_id,
205 turn_id: state.turn_id.unwrap_or_default(),
206 input_message_id: state.input_message_id,
207 exec_id: ExecId::new(),
208 },
209 harness_id: state.harness_id,
210 agent_id: state.agent_id,
211 tool_calls: reason_result.tool_calls,
212 tool_definitions: reason_result.tool_definitions,
213 locale: reason_result.locale,
214 blueprint_id: session_blueprint_id,
215 network_access: reason_result.network_access,
216 parallel_tool_calls: None,
220 },
221 previous_response_id: response_id,
222 iteration: state.iteration,
223 request_id: state.request_id.clone(),
224 resume_state: Box::new(summarized_state),
225 };
226 return Ok(RuntimeTurnPlan::ScheduleAct(plan));
227 }
228
229 if reason_result.success && pending_user_message_count > 0 {
230 if pending_user_message_count > 1 {
231 info!(
232 session_id = %state.session_id,
233 pending_user_message_count,
234 "multiple steering messages arrived during turn"
235 );
236 }
237
238 let next = RuntimeTurnState {
239 previous_response_id: response_id,
240 iteration: state.iteration.saturating_add(1),
241 ..summarized_state
242 };
243 return Ok(RuntimeTurnPlan::ScheduleReason(next));
244 }
245
246 let lifecycle =
247 RuntimeSessionLifecycle::new(adapter.clone(), state.org_id, state.session_id);
248 let turn_id = state.turn_id.unwrap_or_default();
249
250 if reason_result.success {
251 lifecycle
252 .emit_turn_completed(
253 state.input_message_id,
254 everruns_core::events::TurnCompletedData {
255 turn_id,
256 iterations: state.iteration,
257 duration_ms: summarized_state.duration_ms(),
258 usage: summarized_state.cumulative_usage.clone(),
259 input_content: None,
260 final_message_id: summarized_state.final_message_id,
261 final_answer_preview: summarized_state.final_answer_preview.clone(),
262 time_to_first_token_ms: summarized_state.time_to_first_token_ms,
263 tool_call_count: Some(summarized_state.tool_call_count),
264 llm_call_count: Some(summarized_state.llm_call_count),
265 status: Some("completed".to_string()),
266 },
267 )
268 .await;
269 lifecycle
270 .emit_session_idled(
271 turn_id,
272 state.input_message_id,
273 Some(state.iteration),
274 summarized_state.cumulative_usage.clone(),
275 )
276 .await;
277 } else {
278 let user_error = classify_reason_failure(&reason_result);
279 lifecycle
280 .turn_failed(
281 turn_id,
282 state.input_message_id,
283 &reason_result.text,
284 Some(&user_error),
285 )
286 .await;
287 }
288
289 lifecycle
292 .fire_turn_end_hooks(
293 state.harness_id,
294 state.agent_id,
295 turn_id,
296 reason_result.success,
297 )
298 .await;
299
300 Ok(RuntimeTurnPlan::Complete {
301 error: reason_result.error,
302 })
303 }
304 "act" => {
305 if output
306 .get("blocked")
307 .and_then(|value| value.as_bool())
308 .unwrap_or(false)
309 {
310 return Ok(RuntimeTurnPlan::Complete { error: None });
311 }
312
313 let waiting_for_tool_results = output
314 .get("waiting_for_tool_results")
315 .and_then(|value| value.as_bool())
316 .unwrap_or(false);
317 let should_pause_for_tool_results = waiting_for_tool_results
318 && setup_connection_hint_enabled(adapter, state.org_id, state.session_id).await;
319
320 let next = RuntimeTurnState {
321 iteration: state.iteration.saturating_add(1),
322 ..state.clone()
323 };
324
325 if should_pause_for_tool_results {
326 let lifecycle =
327 RuntimeSessionLifecycle::new(adapter.clone(), state.org_id, state.session_id);
328 lifecycle.waiting_for_tool_results().await;
329 return Ok(RuntimeTurnPlan::WaitForToolResults { resume: next });
330 }
331
332 if waiting_for_tool_results {
333 info!(
334 session_id = %state.session_id,
335 "setup_connection hint absent, continuing turn instead of pausing"
336 );
337 }
338
339 Ok(RuntimeTurnPlan::ScheduleReason(next))
340 }
341 other => Err(AgentLoopError::config(format!(
342 "Unknown activity type completed: {other}"
343 ))),
344 }
345}
346
347async fn setup_connection_hint_enabled<A: RuntimeHostAdapter>(
348 adapter: &A,
349 org_id: i64,
350 session_id: SessionId,
351) -> bool {
352 match adapter.session_store(org_id).get_session(session_id).await {
353 Ok(Some(session)) => {
354 let hints = Controls::resolve_hints(session.hints.as_ref(), None);
355 hints
356 .get("setup_connection")
357 .and_then(|value| value.as_bool())
358 .unwrap_or(false)
359 }
360 _ => false,
361 }
362}