Skip to main content

everruns_runtime/
turn_strategy.rs

// Shared turn-strategy planning for embedded, durable, and custom hosts.
// Decision: everruns-runtime stays durable-agnostic and returns generic next-step plans.
3
4use crate::{RuntimeHostAdapter, RuntimeSessionLifecycle};
5use chrono::{DateTime, Utc};
6use everruns_core::atoms::{ActInput, AtomContext};
7use everruns_core::error::{AgentLoopError, Result};
8use everruns_core::events::TokenUsage;
9use everruns_core::typed_id::{AgentId, ExecId, HarnessId, MessageId, SessionId, TurnId};
10use everruns_core::{
11    Controls, ReasonResult, UserFacingError, UserFacingErrorContext,
12    classify_runtime_error_message, user_facing_error_codes,
13};
14use serde::{Deserialize, Serialize};
15use tracing::{debug, info};
16
17/// Host-owned state carried across turn phases.
18///
19/// Durable hosts can persist this between activities; in-memory hosts can hold
20/// it directly in memory. The type itself is runtime-level and has no durable
21/// engine coupling.
22///
23/// Hosts are expected to serialize this however they want. `everruns-runtime`
24/// only defines the fields required to resume the next semantic step.
25#[derive(Debug, Clone, Serialize, Deserialize)]
26pub struct RuntimeTurnState {
27    pub org_id: i64,
28    pub session_id: SessionId,
29    pub harness_id: HarnessId,
30    pub agent_id: Option<AgentId>,
31    pub input_message_id: MessageId,
32    #[serde(skip_serializing_if = "Option::is_none")]
33    pub turn_id: Option<TurnId>,
34    #[serde(skip_serializing_if = "Option::is_none", default)]
35    pub previous_response_id: Option<String>,
36    #[serde(default = "default_iteration")]
37    pub iteration: u32,
38    #[serde(skip_serializing_if = "Option::is_none", default)]
39    pub request_id: Option<String>,
40    #[serde(skip_serializing_if = "Option::is_none", default)]
41    pub started_at: Option<DateTime<Utc>>,
42    #[serde(skip_serializing_if = "Option::is_none", default)]
43    pub cumulative_usage: Option<TokenUsage>,
44    #[serde(default)]
45    pub tool_call_count: u32,
46    #[serde(default)]
47    pub llm_call_count: u32,
48    #[serde(skip_serializing_if = "Option::is_none", default)]
49    pub time_to_first_token_ms: Option<u64>,
50    #[serde(skip_serializing_if = "Option::is_none", default)]
51    pub final_message_id: Option<MessageId>,
52    #[serde(skip_serializing_if = "Option::is_none", default)]
53    pub final_answer_preview: Option<String>,
54}
55
/// Serde default for `RuntimeTurnState::iteration`: turns start at iteration 1.
fn default_iteration() -> u32 {
    const FIRST_ITERATION: u32 = 1;
    FIRST_ITERATION
}
59
60/// Runtime-owned act scheduling payload.
61///
62/// Hosts enqueue or execute this immediately using their own worker model.
63#[derive(Debug, Clone)]
64pub struct RuntimeActPlan {
65    pub input: ActInput,
66    pub previous_response_id: Option<String>,
67    pub iteration: u32,
68    pub request_id: Option<String>,
69    pub resume_state: Box<RuntimeTurnState>,
70}
71
72/// Generic next-step decision for a host turn.
73///
74/// This intentionally stops at the semantic boundary:
75/// - runtime decides what should happen next
76/// - the host decides how to persist, enqueue, retry, or resume it
77#[derive(Debug, Clone)]
78pub enum RuntimeTurnPlan {
79    ScheduleReason(RuntimeTurnState),
80    ScheduleAct(RuntimeActPlan),
81    Complete { error: Option<String> },
82    WaitForToolResults { resume: RuntimeTurnState },
83}
84
/// Builds the final-answer preview: the first 2000 characters of `text`,
/// or `None` when `text` is empty.
fn preview_final_answer(text: &str) -> Option<String> {
    const MAX_PREVIEW_CHARS: usize = 2000;

    (!text.is_empty()).then(|| {
        // Byte offset of the first character past the cap; the whole string if it is short enough.
        let cut = text
            .char_indices()
            .nth(MAX_PREVIEW_CHARS)
            .map_or(text.len(), |(offset, _)| offset);
        text[..cut].to_owned()
    })
}
92
93fn add_usage(current: &mut Option<TokenUsage>, next: &TokenUsage) {
94    match current {
95        Some(current) => current.add(next),
96        None => *current = Some(next.clone()),
97    }
98}
99
100impl RuntimeTurnState {
101    fn with_reason_summary(&self, reason_result: &ReasonResult) -> Self {
102        let mut next = self.clone();
103        next.llm_call_count = next.llm_call_count.saturating_add(1);
104        next.tool_call_count = next
105            .tool_call_count
106            .saturating_add(reason_result.tool_calls.len() as u32);
107        if let Some(usage) = &reason_result.usage {
108            add_usage(&mut next.cumulative_usage, usage);
109        }
110        if next.time_to_first_token_ms.is_none() {
111            next.time_to_first_token_ms = reason_result.time_to_first_token_ms;
112        }
113        next.final_message_id = reason_result.output_message_id;
114        next.final_answer_preview = preview_final_answer(&reason_result.text);
115        next
116    }
117
118    fn duration_ms(&self) -> Option<u64> {
119        self.started_at
120            .map(|started_at| Utc::now().signed_duration_since(started_at))
121            .and_then(|duration| u64::try_from(duration.num_milliseconds()).ok())
122    }
123}
124
125fn classify_reason_failure(reason_result: &ReasonResult) -> UserFacingError {
126    let from_text =
127        classify_runtime_error_message(&reason_result.text, &UserFacingErrorContext::default());
128
129    let Some(error) = reason_result.error.as_deref() else {
130        return from_text;
131    };
132
133    let from_error = classify_runtime_error_message(error, &UserFacingErrorContext::default());
134
135    if from_error.code == user_facing_error_codes::PROCESSING_ERROR {
136        return from_text;
137    }
138
139    if from_error.code == from_text.code
140        && from_error.fields.is_empty()
141        && !from_text.fields.is_empty()
142    {
143        return from_text;
144    }
145
146    from_error
147}
148
149/// Determine the next host step after an activity finishes.
150///
151/// The host provides:
152/// - `completed_activity`: which phase just finished
153/// - `state`: host-carried turn state
154/// - `output`: serialized activity output
155/// - `pending_user_message_count`: number of queued steering messages already consumed by the host
156///
157/// Runtime owns the semantic decision. Hosts translate the returned plan into
158/// their own queueing / persistence model.
159///
160/// Typical host mapping:
161/// - `ScheduleReason` => enqueue or invoke a reason phase with the returned state
162/// - `ScheduleAct` => enqueue or invoke an act phase with the returned payload
163/// - `WaitForToolResults` => persist the resume state until external tool input arrives
164/// - `Complete` => mark the host-owned workflow/session turn complete
165pub async fn plan_next_host_turn<A: RuntimeHostAdapter>(
166    adapter: &A,
167    completed_activity: &str,
168    state: &RuntimeTurnState,
169    output: &serde_json::Value,
170    pending_user_message_count: usize,
171) -> Result<RuntimeTurnPlan> {
172    match completed_activity {
173        "process_input" => {
174            let turn_id: Option<TurnId> = output
175                .get("turn_id")
176                .and_then(|value| value.as_str())
177                .and_then(|value| value.parse().ok());
178            let next = RuntimeTurnState {
179                turn_id,
180                previous_response_id: None,
181                iteration: 1,
182                started_at: state.started_at.or_else(|| Some(Utc::now())),
183                ..state.clone()
184            };
185            debug!(session_id = %state.session_id, turn_id = ?turn_id, "planned reason step");
186            Ok(RuntimeTurnPlan::ScheduleReason(next))
187        }
188        "reason" => {
189            let reason_result: ReasonResult = serde_json::from_value(output.clone())
190                .map_err(|error| AgentLoopError::Internal(error.into()))?;
191            let response_id = reason_result.response_id.clone();
192            let summarized_state = state.with_reason_summary(&reason_result);
193
194            if reason_result.has_tool_calls && reason_result.success {
195                let session_blueprint_id = adapter
196                    .session_store(state.org_id)
197                    .get_session(state.session_id)
198                    .await?
199                    .and_then(|session| session.blueprint_id);
200                let plan = RuntimeActPlan {
201                    input: ActInput {
202                        org_id: Some(state.org_id),
203                        context: AtomContext {
204                            session_id: state.session_id,
205                            turn_id: state.turn_id.unwrap_or_default(),
206                            input_message_id: state.input_message_id,
207                            exec_id: ExecId::new(),
208                        },
209                        harness_id: state.harness_id,
210                        agent_id: state.agent_id,
211                        tool_calls: reason_result.tool_calls,
212                        tool_definitions: reason_result.tool_definitions,
213                        locale: reason_result.locale,
214                        blueprint_id: session_blueprint_id,
215                        network_access: reason_result.network_access,
216                        // Request-level `parallel_tool_calls` is not yet plumbed
217                        // into the reason path; the act scheduler defaults to its
218                        // class-aware concurrent schedule.
219                        parallel_tool_calls: None,
220                    },
221                    previous_response_id: response_id,
222                    iteration: state.iteration,
223                    request_id: state.request_id.clone(),
224                    resume_state: Box::new(summarized_state),
225                };
226                return Ok(RuntimeTurnPlan::ScheduleAct(plan));
227            }
228
229            if reason_result.success && pending_user_message_count > 0 {
230                if pending_user_message_count > 1 {
231                    info!(
232                        session_id = %state.session_id,
233                        pending_user_message_count,
234                        "multiple steering messages arrived during turn"
235                    );
236                }
237
238                let next = RuntimeTurnState {
239                    previous_response_id: response_id,
240                    iteration: state.iteration.saturating_add(1),
241                    ..summarized_state
242                };
243                return Ok(RuntimeTurnPlan::ScheduleReason(next));
244            }
245
246            let lifecycle =
247                RuntimeSessionLifecycle::new(adapter.clone(), state.org_id, state.session_id);
248            let turn_id = state.turn_id.unwrap_or_default();
249
250            if reason_result.success {
251                lifecycle
252                    .emit_turn_completed(
253                        state.input_message_id,
254                        everruns_core::events::TurnCompletedData {
255                            turn_id,
256                            iterations: state.iteration,
257                            duration_ms: summarized_state.duration_ms(),
258                            usage: summarized_state.cumulative_usage.clone(),
259                            input_content: None,
260                            final_message_id: summarized_state.final_message_id,
261                            final_answer_preview: summarized_state.final_answer_preview.clone(),
262                            time_to_first_token_ms: summarized_state.time_to_first_token_ms,
263                            tool_call_count: Some(summarized_state.tool_call_count),
264                            llm_call_count: Some(summarized_state.llm_call_count),
265                            status: Some("completed".to_string()),
266                        },
267                    )
268                    .await;
269                lifecycle
270                    .emit_session_idled(
271                        turn_id,
272                        state.input_message_id,
273                        Some(state.iteration),
274                        summarized_state.cumulative_usage.clone(),
275                    )
276                    .await;
277            } else {
278                let user_error = classify_reason_failure(&reason_result);
279                lifecycle
280                    .turn_failed(
281                        turn_id,
282                        state.input_message_id,
283                        &reason_result.text,
284                        Some(&user_error),
285                    )
286                    .await;
287            }
288
289            // turn_end lifecycle hooks (advisory). Fired once the turn reaches a
290            // terminal reason outcome on the durable/strategy path.
291            lifecycle
292                .fire_turn_end_hooks(
293                    state.harness_id,
294                    state.agent_id,
295                    turn_id,
296                    reason_result.success,
297                )
298                .await;
299
300            Ok(RuntimeTurnPlan::Complete {
301                error: reason_result.error,
302            })
303        }
304        "act" => {
305            if output
306                .get("blocked")
307                .and_then(|value| value.as_bool())
308                .unwrap_or(false)
309            {
310                return Ok(RuntimeTurnPlan::Complete { error: None });
311            }
312
313            let waiting_for_tool_results = output
314                .get("waiting_for_tool_results")
315                .and_then(|value| value.as_bool())
316                .unwrap_or(false);
317            let should_pause_for_tool_results = waiting_for_tool_results
318                && setup_connection_hint_enabled(adapter, state.org_id, state.session_id).await;
319
320            let next = RuntimeTurnState {
321                iteration: state.iteration.saturating_add(1),
322                ..state.clone()
323            };
324
325            if should_pause_for_tool_results {
326                let lifecycle =
327                    RuntimeSessionLifecycle::new(adapter.clone(), state.org_id, state.session_id);
328                lifecycle.waiting_for_tool_results().await;
329                return Ok(RuntimeTurnPlan::WaitForToolResults { resume: next });
330            }
331
332            if waiting_for_tool_results {
333                info!(
334                    session_id = %state.session_id,
335                    "setup_connection hint absent, continuing turn instead of pausing"
336                );
337            }
338
339            Ok(RuntimeTurnPlan::ScheduleReason(next))
340        }
341        other => Err(AgentLoopError::config(format!(
342            "Unknown activity type completed: {other}"
343        ))),
344    }
345}
346
347async fn setup_connection_hint_enabled<A: RuntimeHostAdapter>(
348    adapter: &A,
349    org_id: i64,
350    session_id: SessionId,
351) -> bool {
352    match adapter.session_store(org_id).get_session(session_id).await {
353        Ok(Some(session)) => {
354            let hints = Controls::resolve_hints(session.hints.as_ref(), None);
355            hints
356                .get("setup_connection")
357                .and_then(|value| value.as_bool())
358                .unwrap_or(false)
359        }
360        _ => false,
361    }
362}