Skip to main content

everruns_runtime/
turn_strategy.rs

1// Shared turn-strategy planning for embedded, durable, and custom hosts.
2// Decision: everruns-runtime stays durable-agnostic and returns generic next-step plans.
3
4use crate::{RuntimeHostAdapter, RuntimeSessionLifecycle};
5use chrono::{DateTime, Utc};
6use everruns_core::atoms::{ActInput, AtomContext};
7use everruns_core::error::{AgentLoopError, Result};
8use everruns_core::events::TokenUsage;
9use everruns_core::typed_id::{AgentId, ExecId, HarnessId, MessageId, SessionId, TurnId};
10use everruns_core::{
11    Controls, ReasonResult, UserFacingError, UserFacingErrorContext,
12    classify_runtime_error_message, user_facing_error_codes,
13};
14use serde::{Deserialize, Serialize};
15use tracing::{debug, info};
16
17/// Host-owned state carried across turn phases.
18///
19/// Durable hosts can persist this between activities; in-memory hosts can hold
20/// it directly in memory. The type itself is runtime-level and has no durable
21/// engine coupling.
22///
23/// Hosts are expected to serialize this however they want. `everruns-runtime`
24/// only defines the fields required to resume the next semantic step.
25#[derive(Debug, Clone, Serialize, Deserialize)]
26pub struct RuntimeTurnState {
27    pub org_id: i64,
28    pub session_id: SessionId,
29    pub harness_id: HarnessId,
30    pub agent_id: Option<AgentId>,
31    pub input_message_id: MessageId,
32    #[serde(skip_serializing_if = "Option::is_none")]
33    pub turn_id: Option<TurnId>,
34    #[serde(skip_serializing_if = "Option::is_none", default)]
35    pub previous_response_id: Option<String>,
36    #[serde(default = "default_iteration")]
37    pub iteration: u32,
38    #[serde(skip_serializing_if = "Option::is_none", default)]
39    pub request_id: Option<String>,
40    #[serde(skip_serializing_if = "Option::is_none", default)]
41    pub started_at: Option<DateTime<Utc>>,
42    #[serde(skip_serializing_if = "Option::is_none", default)]
43    pub cumulative_usage: Option<TokenUsage>,
44    #[serde(default)]
45    pub tool_call_count: u32,
46    #[serde(default)]
47    pub llm_call_count: u32,
48    #[serde(skip_serializing_if = "Option::is_none", default)]
49    pub time_to_first_token_ms: Option<u64>,
50    #[serde(skip_serializing_if = "Option::is_none", default)]
51    pub final_message_id: Option<MessageId>,
52    #[serde(skip_serializing_if = "Option::is_none", default)]
53    pub final_answer_preview: Option<String>,
54}
55
56fn default_iteration() -> u32 {
57    1
58}
59
60/// Runtime-owned act scheduling payload.
61///
62/// Hosts enqueue or execute this immediately using their own worker model.
63#[derive(Debug, Clone)]
64pub struct RuntimeActPlan {
65    pub input: ActInput,
66    pub previous_response_id: Option<String>,
67    pub iteration: u32,
68    pub request_id: Option<String>,
69    pub resume_state: Box<RuntimeTurnState>,
70}
71
72/// Generic next-step decision for a host turn.
73///
74/// This intentionally stops at the semantic boundary:
75/// - runtime decides what should happen next
76/// - the host decides how to persist, enqueue, retry, or resume it
77#[derive(Debug, Clone)]
78pub enum RuntimeTurnPlan {
79    ScheduleReason(RuntimeTurnState),
80    ScheduleAct(RuntimeActPlan),
81    Complete { error: Option<String> },
82    WaitForToolResults { resume: RuntimeTurnState },
83}
84
85fn preview_final_answer(text: &str) -> Option<String> {
86    if text.is_empty() {
87        return None;
88    }
89
90    Some(text.chars().take(2000).collect())
91}
92
93fn add_usage(current: &mut Option<TokenUsage>, next: &TokenUsage) {
94    match current {
95        Some(current) => current.add(next),
96        None => *current = Some(next.clone()),
97    }
98}
99
100impl RuntimeTurnState {
101    fn with_reason_summary(&self, reason_result: &ReasonResult) -> Self {
102        let mut next = self.clone();
103        next.llm_call_count = next.llm_call_count.saturating_add(1);
104        next.tool_call_count = next
105            .tool_call_count
106            .saturating_add(reason_result.tool_calls.len() as u32);
107        if let Some(usage) = &reason_result.usage {
108            add_usage(&mut next.cumulative_usage, usage);
109        }
110        if next.time_to_first_token_ms.is_none() {
111            next.time_to_first_token_ms = reason_result.time_to_first_token_ms;
112        }
113        next.final_message_id = reason_result.output_message_id;
114        next.final_answer_preview = preview_final_answer(&reason_result.text);
115        next
116    }
117
118    fn duration_ms(&self) -> Option<u64> {
119        self.started_at
120            .map(|started_at| Utc::now().signed_duration_since(started_at))
121            .and_then(|duration| u64::try_from(duration.num_milliseconds()).ok())
122    }
123}
124
125fn classify_reason_failure(reason_result: &ReasonResult) -> UserFacingError {
126    // The reason atom already classified and disclosure-filtered the failure.
127    // Reuse it so the turn.failed event matches what the session message
128    // showed; re-classifying strings here could leak past a generic mode.
129    if let Some(user_error) = &reason_result.user_facing_error {
130        return user_error.clone();
131    }
132
133    let from_text =
134        classify_runtime_error_message(&reason_result.text, &UserFacingErrorContext::default());
135
136    let Some(error) = reason_result.error.as_deref() else {
137        return from_text;
138    };
139
140    let from_error = classify_runtime_error_message(error, &UserFacingErrorContext::default());
141
142    if from_error.code == user_facing_error_codes::PROCESSING_ERROR {
143        return from_text;
144    }
145
146    if from_error.code == from_text.code
147        && from_error.fields.is_empty()
148        && !from_text.fields.is_empty()
149    {
150        return from_text;
151    }
152
153    from_error
154}
155
156/// Determine the next host step after an activity finishes.
157///
158/// The host provides:
159/// - `completed_activity`: which phase just finished
160/// - `state`: host-carried turn state
161/// - `output`: serialized activity output
162/// - `pending_user_message_count`: number of queued steering messages already consumed by the host
163///
164/// Runtime owns the semantic decision. Hosts translate the returned plan into
165/// their own queueing / persistence model.
166///
167/// Typical host mapping:
168/// - `ScheduleReason` => enqueue or invoke a reason phase with the returned state
169/// - `ScheduleAct` => enqueue or invoke an act phase with the returned payload
170/// - `WaitForToolResults` => persist the resume state until external tool input arrives
171/// - `Complete` => mark the host-owned workflow/session turn complete
172pub async fn plan_next_host_turn<A: RuntimeHostAdapter>(
173    adapter: &A,
174    completed_activity: &str,
175    state: &RuntimeTurnState,
176    output: &serde_json::Value,
177    pending_user_message_count: usize,
178) -> Result<RuntimeTurnPlan> {
179    match completed_activity {
180        "process_input" => {
181            let turn_id: Option<TurnId> = output
182                .get("turn_id")
183                .and_then(|value| value.as_str())
184                .and_then(|value| value.parse().ok());
185            let next = RuntimeTurnState {
186                turn_id,
187                previous_response_id: None,
188                iteration: 1,
189                started_at: state.started_at.or_else(|| Some(Utc::now())),
190                ..state.clone()
191            };
192            debug!(session_id = %state.session_id, turn_id = ?turn_id, "planned reason step");
193            Ok(RuntimeTurnPlan::ScheduleReason(next))
194        }
195        "reason" => {
196            let reason_result: ReasonResult = serde_json::from_value(output.clone())
197                .map_err(|error| AgentLoopError::Internal(error.into()))?;
198            let response_id = reason_result.response_id.clone();
199            let summarized_state = state.with_reason_summary(&reason_result);
200
201            if reason_result.has_tool_calls && reason_result.success {
202                let session = adapter
203                    .session_store(state.org_id)
204                    .get_session(state.session_id)
205                    .await?;
206                let session_blueprint_id = session.as_ref().and_then(|s| s.blueprint_id.clone());
207                // Attach the session's workspace so tool file I/O addresses the
208                // (possibly shared) workspace, not the session's own keyspace.
209                let workspace_id = session.as_ref().map(|s| s.workspace_id);
210                let plan = RuntimeActPlan {
211                    input: ActInput {
212                        org_id: Some(state.org_id),
213                        context: AtomContext {
214                            session_id: state.session_id,
215                            turn_id: state.turn_id.unwrap_or_default(),
216                            input_message_id: state.input_message_id,
217                            exec_id: ExecId::new(),
218                            workspace_id,
219                        },
220                        harness_id: state.harness_id,
221                        agent_id: state.agent_id,
222                        tool_calls: reason_result.tool_calls,
223                        tool_definitions: reason_result.tool_definitions,
224                        locale: reason_result.locale,
225                        blueprint_id: session_blueprint_id,
226                        network_access: reason_result.network_access,
227                        // Request-level parallel tool calling preference, carried
228                        // from agent config through the reason path (EVE-598).
229                        parallel_tool_calls: reason_result.parallel_tool_calls,
230                    },
231                    previous_response_id: response_id,
232                    iteration: state.iteration,
233                    request_id: state.request_id.clone(),
234                    resume_state: Box::new(summarized_state),
235                };
236                return Ok(RuntimeTurnPlan::ScheduleAct(plan));
237            }
238
239            if reason_result.success && pending_user_message_count > 0 {
240                if pending_user_message_count > 1 {
241                    info!(
242                        session_id = %state.session_id,
243                        pending_user_message_count,
244                        "multiple steering messages arrived during turn"
245                    );
246                }
247
248                let next = RuntimeTurnState {
249                    previous_response_id: response_id,
250                    iteration: state.iteration.saturating_add(1),
251                    ..summarized_state
252                };
253                return Ok(RuntimeTurnPlan::ScheduleReason(next));
254            }
255
256            let lifecycle =
257                RuntimeSessionLifecycle::new(adapter.clone(), state.org_id, state.session_id);
258            let turn_id = state.turn_id.unwrap_or_default();
259
260            if reason_result.success {
261                lifecycle
262                    .emit_turn_completed(
263                        state.input_message_id,
264                        everruns_core::events::TurnCompletedData {
265                            turn_id,
266                            iterations: state.iteration,
267                            duration_ms: summarized_state.duration_ms(),
268                            usage: summarized_state.cumulative_usage.clone(),
269                            input_content: None,
270                            final_message_id: summarized_state.final_message_id,
271                            final_answer_preview: summarized_state.final_answer_preview.clone(),
272                            time_to_first_token_ms: summarized_state.time_to_first_token_ms,
273                            tool_call_count: Some(summarized_state.tool_call_count),
274                            llm_call_count: Some(summarized_state.llm_call_count),
275                            status: Some("completed".to_string()),
276                        },
277                    )
278                    .await;
279                lifecycle
280                    .emit_session_idled(
281                        turn_id,
282                        state.input_message_id,
283                        Some(state.iteration),
284                        summarized_state.cumulative_usage.clone(),
285                    )
286                    .await;
287            } else {
288                let user_error = classify_reason_failure(&reason_result);
289                lifecycle
290                    .turn_failed_with_disclosure(
291                        turn_id,
292                        state.input_message_id,
293                        &reason_result.text,
294                        Some(&user_error),
295                        reason_result.error_disclosure,
296                    )
297                    .await;
298            }
299
300            // turn_end lifecycle hooks (advisory). Fired once the turn reaches a
301            // terminal reason outcome on the durable/strategy path.
302            lifecycle
303                .fire_turn_end_hooks(
304                    state.harness_id,
305                    state.agent_id,
306                    turn_id,
307                    reason_result.success,
308                )
309                .await;
310
311            Ok(RuntimeTurnPlan::Complete {
312                error: reason_result.error,
313            })
314        }
315        "act" => {
316            if output
317                .get("blocked")
318                .and_then(|value| value.as_bool())
319                .unwrap_or(false)
320            {
321                return Ok(RuntimeTurnPlan::Complete { error: None });
322            }
323
324            let waiting_for_tool_results = output
325                .get("waiting_for_tool_results")
326                .and_then(|value| value.as_bool())
327                .unwrap_or(false);
328            let should_pause_for_tool_results = waiting_for_tool_results
329                && setup_connection_hint_enabled(adapter, state.org_id, state.session_id).await;
330
331            let next = RuntimeTurnState {
332                iteration: state.iteration.saturating_add(1),
333                ..state.clone()
334            };
335
336            if should_pause_for_tool_results {
337                let lifecycle =
338                    RuntimeSessionLifecycle::new(adapter.clone(), state.org_id, state.session_id);
339                lifecycle.waiting_for_tool_results().await;
340                return Ok(RuntimeTurnPlan::WaitForToolResults { resume: next });
341            }
342
343            if waiting_for_tool_results {
344                info!(
345                    session_id = %state.session_id,
346                    "setup_connection hint absent, continuing turn instead of pausing"
347                );
348            }
349
350            Ok(RuntimeTurnPlan::ScheduleReason(next))
351        }
352        other => Err(AgentLoopError::config(format!(
353            "Unknown activity type completed: {other}"
354        ))),
355    }
356}
357
358async fn setup_connection_hint_enabled<A: RuntimeHostAdapter>(
359    adapter: &A,
360    org_id: i64,
361    session_id: SessionId,
362) -> bool {
363    match adapter.session_store(org_id).get_session(session_id).await {
364        Ok(Some(session)) => {
365            let hints = Controls::resolve_hints(session.hints.as_ref(), None);
366            hints
367                .get("setup_connection")
368                .and_then(|value| value.as_bool())
369                .unwrap_or(false)
370        }
371        _ => false,
372    }
373}