Skip to main content

everruns_runtime/
turn_strategy.rs

1// Shared turn-strategy planning for embedded, durable, and custom hosts.
2// Decision: everruns-runtime stays durable-agnostic and returns generic next-step plans.
3
4use crate::{RuntimeHostAdapter, RuntimeSessionLifecycle};
5use chrono::{DateTime, Utc};
6use everruns_core::atoms::{ActInput, AtomContext};
7use everruns_core::error::{AgentLoopError, Result};
8use everruns_core::events::TokenUsage;
9use everruns_core::typed_id::{AgentId, ExecId, HarnessId, MessageId, SessionId, TurnId};
10use everruns_core::{
11    Controls, ReasonResult, UserFacingError, UserFacingErrorContext,
12    classify_runtime_error_message, user_facing_error_codes,
13};
14use serde::{Deserialize, Serialize};
15use tracing::{debug, info};
16
17/// Host-owned state carried across turn phases.
18///
19/// Durable hosts can persist this between activities; in-memory hosts can hold
20/// it directly in memory. The type itself is runtime-level and has no durable
21/// engine coupling.
22///
23/// Hosts are expected to serialize this however they want. `everruns-runtime`
24/// only defines the fields required to resume the next semantic step.
25#[derive(Debug, Clone, Serialize, Deserialize)]
26pub struct RuntimeTurnState {
27    pub org_id: i64,
28    pub session_id: SessionId,
29    pub harness_id: HarnessId,
30    pub agent_id: Option<AgentId>,
31    pub input_message_id: MessageId,
32    #[serde(skip_serializing_if = "Option::is_none")]
33    pub turn_id: Option<TurnId>,
34    #[serde(skip_serializing_if = "Option::is_none", default)]
35    pub previous_response_id: Option<String>,
36    #[serde(default = "default_iteration")]
37    pub iteration: u32,
38    #[serde(skip_serializing_if = "Option::is_none", default)]
39    pub request_id: Option<String>,
40    #[serde(skip_serializing_if = "Option::is_none", default)]
41    pub started_at: Option<DateTime<Utc>>,
42    #[serde(skip_serializing_if = "Option::is_none", default)]
43    pub cumulative_usage: Option<TokenUsage>,
44    #[serde(default)]
45    pub tool_call_count: u32,
46    #[serde(default)]
47    pub llm_call_count: u32,
48    #[serde(skip_serializing_if = "Option::is_none", default)]
49    pub time_to_first_token_ms: Option<u64>,
50    #[serde(skip_serializing_if = "Option::is_none", default)]
51    pub final_message_id: Option<MessageId>,
52    #[serde(skip_serializing_if = "Option::is_none", default)]
53    pub final_answer_preview: Option<String>,
54}
55
/// Serde default for `RuntimeTurnState::iteration`: iterations are 1-based.
fn default_iteration() -> u32 {
    const FIRST_ITERATION: u32 = 1;
    FIRST_ITERATION
}
59
60/// Runtime-owned act scheduling payload.
61///
62/// Hosts enqueue or execute this immediately using their own worker model.
63#[derive(Debug, Clone)]
64pub struct RuntimeActPlan {
65    pub input: ActInput,
66    pub previous_response_id: Option<String>,
67    pub iteration: u32,
68    pub request_id: Option<String>,
69    pub resume_state: Box<RuntimeTurnState>,
70}
71
72/// Generic next-step decision for a host turn.
73///
74/// This intentionally stops at the semantic boundary:
75/// - runtime decides what should happen next
76/// - the host decides how to persist, enqueue, retry, or resume it
77#[derive(Debug, Clone)]
78pub enum RuntimeTurnPlan {
79    ScheduleReason(RuntimeTurnState),
80    ScheduleAct(RuntimeActPlan),
81    Complete { error: Option<String> },
82    WaitForToolResults { resume: RuntimeTurnState },
83}
84
/// Build a short preview of the final answer text.
///
/// Returns `None` for empty text; otherwise returns at most the first 2000
/// characters (Unicode scalar values, not bytes) of `text`.
fn preview_final_answer(text: &str) -> Option<String> {
    const PREVIEW_MAX_CHARS: usize = 2000;

    (!text.is_empty()).then(|| text.chars().take(PREVIEW_MAX_CHARS).collect::<String>())
}
92
93fn add_usage(current: &mut Option<TokenUsage>, next: &TokenUsage) {
94    match current {
95        Some(current) => current.add(next),
96        None => *current = Some(next.clone()),
97    }
98}
99
100impl RuntimeTurnState {
101    fn with_reason_summary(&self, reason_result: &ReasonResult) -> Self {
102        let mut next = self.clone();
103        next.llm_call_count = next.llm_call_count.saturating_add(1);
104        next.tool_call_count = next
105            .tool_call_count
106            .saturating_add(reason_result.tool_calls.len() as u32);
107        if let Some(usage) = &reason_result.usage {
108            add_usage(&mut next.cumulative_usage, usage);
109        }
110        if next.time_to_first_token_ms.is_none() {
111            next.time_to_first_token_ms = reason_result.time_to_first_token_ms;
112        }
113        next.final_message_id = reason_result.output_message_id;
114        next.final_answer_preview = preview_final_answer(&reason_result.text);
115        next
116    }
117
118    fn duration_ms(&self) -> Option<u64> {
119        self.started_at
120            .map(|started_at| Utc::now().signed_duration_since(started_at))
121            .and_then(|duration| u64::try_from(duration.num_milliseconds()).ok())
122    }
123}
124
125fn classify_reason_failure(reason_result: &ReasonResult) -> UserFacingError {
126    // The reason atom already classified and disclosure-filtered the failure.
127    // Reuse it so the turn.failed event matches what the session message
128    // showed; re-classifying strings here could leak past a generic mode.
129    if let Some(user_error) = &reason_result.user_facing_error {
130        return user_error.clone();
131    }
132
133    let from_text =
134        classify_runtime_error_message(&reason_result.text, &UserFacingErrorContext::default());
135
136    let Some(error) = reason_result.error.as_deref() else {
137        return from_text;
138    };
139
140    let from_error = classify_runtime_error_message(error, &UserFacingErrorContext::default());
141
142    if from_error.code == user_facing_error_codes::PROCESSING_ERROR {
143        return from_text;
144    }
145
146    if from_error.code == from_text.code
147        && from_error.fields.is_empty()
148        && !from_text.fields.is_empty()
149    {
150        return from_text;
151    }
152
153    from_error
154}
155
156/// Determine the next host step after an activity finishes.
157///
158/// The host provides:
159/// - `completed_activity`: which phase just finished
160/// - `state`: host-carried turn state
161/// - `output`: serialized activity output
162/// - `pending_user_message_count`: number of queued steering messages already consumed by the host
163///
164/// Runtime owns the semantic decision. Hosts translate the returned plan into
165/// their own queueing / persistence model.
166///
167/// Typical host mapping:
168/// - `ScheduleReason` => enqueue or invoke a reason phase with the returned state
169/// - `ScheduleAct` => enqueue or invoke an act phase with the returned payload
170/// - `WaitForToolResults` => persist the resume state until external tool input arrives
171/// - `Complete` => mark the host-owned workflow/session turn complete
172pub async fn plan_next_host_turn<A: RuntimeHostAdapter>(
173    adapter: &A,
174    completed_activity: &str,
175    state: &RuntimeTurnState,
176    output: &serde_json::Value,
177    pending_user_message_count: usize,
178) -> Result<RuntimeTurnPlan> {
179    match completed_activity {
180        "process_input" => {
181            let turn_id: Option<TurnId> = output
182                .get("turn_id")
183                .and_then(|value| value.as_str())
184                .and_then(|value| value.parse().ok());
185            let next = RuntimeTurnState {
186                turn_id,
187                previous_response_id: None,
188                iteration: 1,
189                started_at: state.started_at.or_else(|| Some(Utc::now())),
190                ..state.clone()
191            };
192            debug!(session_id = %state.session_id, turn_id = ?turn_id, "planned reason step");
193            Ok(RuntimeTurnPlan::ScheduleReason(next))
194        }
195        "reason" => {
196            let reason_result: ReasonResult = serde_json::from_value(output.clone())
197                .map_err(|error| AgentLoopError::Internal(error.into()))?;
198            let response_id = reason_result.response_id.clone();
199            let summarized_state = state.with_reason_summary(&reason_result);
200
201            if reason_result.has_tool_calls && reason_result.success {
202                let session = adapter
203                    .session_store(state.org_id)
204                    .get_session(state.session_id)
205                    .await?;
206                let session_blueprint_id = session.as_ref().and_then(|s| s.blueprint_id.clone());
207                // Attach the session's workspace so tool file I/O addresses the
208                // (possibly shared) workspace, not the session's own keyspace.
209                let workspace_id = session.as_ref().map(|s| s.workspace_id);
210                let plan = RuntimeActPlan {
211                    input: ActInput {
212                        org_id: Some(state.org_id),
213                        context: AtomContext {
214                            session_id: state.session_id,
215                            turn_id: state.turn_id.unwrap_or_default(),
216                            input_message_id: state.input_message_id,
217                            exec_id: ExecId::new(),
218                            workspace_id,
219                        },
220                        harness_id: state.harness_id,
221                        agent_id: state.agent_id,
222                        tool_calls: reason_result.tool_calls,
223                        tool_definitions: reason_result.tool_definitions,
224                        locale: reason_result.locale,
225                        blueprint_id: session_blueprint_id,
226                        network_access: reason_result.network_access,
227                        // Request-level `parallel_tool_calls` is not yet plumbed
228                        // into the reason path; the act scheduler defaults to its
229                        // class-aware concurrent schedule.
230                        parallel_tool_calls: None,
231                    },
232                    previous_response_id: response_id,
233                    iteration: state.iteration,
234                    request_id: state.request_id.clone(),
235                    resume_state: Box::new(summarized_state),
236                };
237                return Ok(RuntimeTurnPlan::ScheduleAct(plan));
238            }
239
240            if reason_result.success && pending_user_message_count > 0 {
241                if pending_user_message_count > 1 {
242                    info!(
243                        session_id = %state.session_id,
244                        pending_user_message_count,
245                        "multiple steering messages arrived during turn"
246                    );
247                }
248
249                let next = RuntimeTurnState {
250                    previous_response_id: response_id,
251                    iteration: state.iteration.saturating_add(1),
252                    ..summarized_state
253                };
254                return Ok(RuntimeTurnPlan::ScheduleReason(next));
255            }
256
257            let lifecycle =
258                RuntimeSessionLifecycle::new(adapter.clone(), state.org_id, state.session_id);
259            let turn_id = state.turn_id.unwrap_or_default();
260
261            if reason_result.success {
262                lifecycle
263                    .emit_turn_completed(
264                        state.input_message_id,
265                        everruns_core::events::TurnCompletedData {
266                            turn_id,
267                            iterations: state.iteration,
268                            duration_ms: summarized_state.duration_ms(),
269                            usage: summarized_state.cumulative_usage.clone(),
270                            input_content: None,
271                            final_message_id: summarized_state.final_message_id,
272                            final_answer_preview: summarized_state.final_answer_preview.clone(),
273                            time_to_first_token_ms: summarized_state.time_to_first_token_ms,
274                            tool_call_count: Some(summarized_state.tool_call_count),
275                            llm_call_count: Some(summarized_state.llm_call_count),
276                            status: Some("completed".to_string()),
277                        },
278                    )
279                    .await;
280                lifecycle
281                    .emit_session_idled(
282                        turn_id,
283                        state.input_message_id,
284                        Some(state.iteration),
285                        summarized_state.cumulative_usage.clone(),
286                    )
287                    .await;
288            } else {
289                let user_error = classify_reason_failure(&reason_result);
290                lifecycle
291                    .turn_failed_with_disclosure(
292                        turn_id,
293                        state.input_message_id,
294                        &reason_result.text,
295                        Some(&user_error),
296                        reason_result.error_disclosure,
297                    )
298                    .await;
299            }
300
301            // turn_end lifecycle hooks (advisory). Fired once the turn reaches a
302            // terminal reason outcome on the durable/strategy path.
303            lifecycle
304                .fire_turn_end_hooks(
305                    state.harness_id,
306                    state.agent_id,
307                    turn_id,
308                    reason_result.success,
309                )
310                .await;
311
312            Ok(RuntimeTurnPlan::Complete {
313                error: reason_result.error,
314            })
315        }
316        "act" => {
317            if output
318                .get("blocked")
319                .and_then(|value| value.as_bool())
320                .unwrap_or(false)
321            {
322                return Ok(RuntimeTurnPlan::Complete { error: None });
323            }
324
325            let waiting_for_tool_results = output
326                .get("waiting_for_tool_results")
327                .and_then(|value| value.as_bool())
328                .unwrap_or(false);
329            let should_pause_for_tool_results = waiting_for_tool_results
330                && setup_connection_hint_enabled(adapter, state.org_id, state.session_id).await;
331
332            let next = RuntimeTurnState {
333                iteration: state.iteration.saturating_add(1),
334                ..state.clone()
335            };
336
337            if should_pause_for_tool_results {
338                let lifecycle =
339                    RuntimeSessionLifecycle::new(adapter.clone(), state.org_id, state.session_id);
340                lifecycle.waiting_for_tool_results().await;
341                return Ok(RuntimeTurnPlan::WaitForToolResults { resume: next });
342            }
343
344            if waiting_for_tool_results {
345                info!(
346                    session_id = %state.session_id,
347                    "setup_connection hint absent, continuing turn instead of pausing"
348                );
349            }
350
351            Ok(RuntimeTurnPlan::ScheduleReason(next))
352        }
353        other => Err(AgentLoopError::config(format!(
354            "Unknown activity type completed: {other}"
355        ))),
356    }
357}
358
359async fn setup_connection_hint_enabled<A: RuntimeHostAdapter>(
360    adapter: &A,
361    org_id: i64,
362    session_id: SessionId,
363) -> bool {
364    match adapter.session_store(org_id).get_session(session_id).await {
365        Ok(Some(session)) => {
366            let hints = Controls::resolve_hints(session.hints.as_ref(), None);
367            hints
368                .get("setup_connection")
369                .and_then(|value| value.as_bool())
370                .unwrap_or(false)
371        }
372        _ => false,
373    }
374}