Skip to main content

everruns_runtime/
turn_strategy.rs

1// Shared turn-strategy planning for embedded, durable, and custom hosts.
2// Decision: everruns-runtime stays durable-agnostic and returns generic next-step plans.
3
4use crate::{RuntimeHostAdapter, RuntimeSessionLifecycle};
5use everruns_core::atoms::{ActInput, AtomContext};
6use everruns_core::error::{AgentLoopError, Result};
7use everruns_core::typed_id::{AgentId, ExecId, HarnessId, MessageId, SessionId, TurnId};
8use everruns_core::{
9    Controls, ReasonResult, UserFacingError, UserFacingErrorContext,
10    classify_runtime_error_message, user_facing_error_codes,
11};
12use serde::{Deserialize, Serialize};
13use tracing::{debug, info};
14
15/// Host-owned state carried across turn phases.
16///
17/// Durable hosts can persist this between activities; in-memory hosts can hold
18/// it directly in memory. The type itself is runtime-level and has no durable
19/// engine coupling.
20///
21/// Hosts are expected to serialize this however they want. `everruns-runtime`
22/// only defines the fields required to resume the next semantic step.
23#[derive(Debug, Clone, Serialize, Deserialize)]
24pub struct RuntimeTurnState {
25    pub org_id: i64,
26    pub session_id: SessionId,
27    pub harness_id: HarnessId,
28    pub agent_id: Option<AgentId>,
29    pub input_message_id: MessageId,
30    #[serde(skip_serializing_if = "Option::is_none")]
31    pub turn_id: Option<TurnId>,
32    #[serde(skip_serializing_if = "Option::is_none", default)]
33    pub previous_response_id: Option<String>,
34    #[serde(default = "default_iteration")]
35    pub iteration: u32,
36    #[serde(skip_serializing_if = "Option::is_none", default)]
37    pub request_id: Option<String>,
38}
39
/// Serde fallback for the `iteration` field: a turn starts counting at one.
fn default_iteration() -> u32 {
    const FIRST_ITERATION: u32 = 1;
    FIRST_ITERATION
}
43
44/// Runtime-owned act scheduling payload.
45///
46/// Hosts enqueue or execute this immediately using their own worker model.
47#[derive(Debug, Clone)]
48pub struct RuntimeActPlan {
49    pub input: ActInput,
50    pub previous_response_id: Option<String>,
51    pub iteration: u32,
52    pub request_id: Option<String>,
53}
54
55/// Generic next-step decision for a host turn.
56///
57/// This intentionally stops at the semantic boundary:
58/// - runtime decides what should happen next
59/// - the host decides how to persist, enqueue, retry, or resume it
60#[derive(Debug, Clone)]
61pub enum RuntimeTurnPlan {
62    ScheduleReason(RuntimeTurnState),
63    ScheduleAct(RuntimeActPlan),
64    Complete { error: Option<String> },
65    WaitForToolResults { resume: RuntimeTurnState },
66}
67
68fn classify_reason_failure(reason_result: &ReasonResult) -> UserFacingError {
69    let from_text =
70        classify_runtime_error_message(&reason_result.text, &UserFacingErrorContext::default());
71
72    let Some(error) = reason_result.error.as_deref() else {
73        return from_text;
74    };
75
76    let from_error = classify_runtime_error_message(error, &UserFacingErrorContext::default());
77
78    if from_error.code == user_facing_error_codes::PROCESSING_ERROR {
79        return from_text;
80    }
81
82    if from_error.code == from_text.code
83        && from_error.fields.is_empty()
84        && !from_text.fields.is_empty()
85    {
86        return from_text;
87    }
88
89    from_error
90}
91
92/// Determine the next host step after an activity finishes.
93///
94/// The host provides:
95/// - `completed_activity`: which phase just finished
96/// - `state`: host-carried turn state
97/// - `output`: serialized activity output
98/// - `pending_user_message_count`: number of queued steering messages already consumed by the host
99///
100/// Runtime owns the semantic decision. Hosts translate the returned plan into
101/// their own queueing / persistence model.
102///
103/// Typical host mapping:
104/// - `ScheduleReason` => enqueue or invoke a reason phase with the returned state
105/// - `ScheduleAct` => enqueue or invoke an act phase with the returned payload
106/// - `WaitForToolResults` => persist the resume state until external tool input arrives
107/// - `Complete` => mark the host-owned workflow/session turn complete
108pub async fn plan_next_host_turn<A: RuntimeHostAdapter>(
109    adapter: &A,
110    completed_activity: &str,
111    state: &RuntimeTurnState,
112    output: &serde_json::Value,
113    pending_user_message_count: usize,
114) -> Result<RuntimeTurnPlan> {
115    match completed_activity {
116        "process_input" => {
117            let turn_id: Option<TurnId> = output
118                .get("turn_id")
119                .and_then(|value| value.as_str())
120                .and_then(|value| value.parse().ok());
121            let next = RuntimeTurnState {
122                turn_id,
123                previous_response_id: None,
124                iteration: 1,
125                ..state.clone()
126            };
127            debug!(session_id = %state.session_id, turn_id = ?turn_id, "planned reason step");
128            Ok(RuntimeTurnPlan::ScheduleReason(next))
129        }
130        "reason" => {
131            let reason_result: ReasonResult = serde_json::from_value(output.clone())
132                .map_err(|error| AgentLoopError::Internal(error.into()))?;
133            let response_id = reason_result.response_id.clone();
134
135            if reason_result.has_tool_calls && reason_result.success {
136                let session_blueprint_id = adapter
137                    .session_store(state.org_id)
138                    .get_session(state.session_id)
139                    .await?
140                    .and_then(|session| session.blueprint_id);
141                let plan = RuntimeActPlan {
142                    input: ActInput {
143                        org_id: Some(state.org_id),
144                        context: AtomContext {
145                            session_id: state.session_id,
146                            turn_id: state.turn_id.unwrap_or_default(),
147                            input_message_id: state.input_message_id,
148                            exec_id: ExecId::new(),
149                        },
150                        harness_id: state.harness_id,
151                        agent_id: state.agent_id,
152                        tool_calls: reason_result.tool_calls,
153                        tool_definitions: reason_result.tool_definitions,
154                        locale: reason_result.locale,
155                        blueprint_id: session_blueprint_id,
156                        network_access: reason_result.network_access,
157                    },
158                    previous_response_id: response_id,
159                    iteration: state.iteration,
160                    request_id: state.request_id.clone(),
161                };
162                return Ok(RuntimeTurnPlan::ScheduleAct(plan));
163            }
164
165            if reason_result.success && pending_user_message_count > 0 {
166                if pending_user_message_count > 1 {
167                    info!(
168                        session_id = %state.session_id,
169                        pending_user_message_count,
170                        "multiple steering messages arrived during turn"
171                    );
172                }
173
174                let next = RuntimeTurnState {
175                    previous_response_id: response_id,
176                    iteration: state.iteration.saturating_add(1),
177                    ..state.clone()
178                };
179                return Ok(RuntimeTurnPlan::ScheduleReason(next));
180            }
181
182            let lifecycle =
183                RuntimeSessionLifecycle::new(adapter.clone(), state.org_id, state.session_id);
184            let turn_id = state.turn_id.unwrap_or_default();
185
186            if reason_result.success {
187                lifecycle
188                    .emit_turn_completed(
189                        turn_id,
190                        state.input_message_id,
191                        state.iteration,
192                        reason_result.usage.clone(),
193                        None,
194                    )
195                    .await;
196                lifecycle
197                    .emit_session_idled(
198                        turn_id,
199                        state.input_message_id,
200                        Some(state.iteration),
201                        reason_result.usage.clone(),
202                    )
203                    .await;
204            } else {
205                let user_error = classify_reason_failure(&reason_result);
206                lifecycle
207                    .turn_failed(
208                        turn_id,
209                        state.input_message_id,
210                        &reason_result.text,
211                        Some(&user_error),
212                    )
213                    .await;
214            }
215
216            Ok(RuntimeTurnPlan::Complete {
217                error: reason_result.error,
218            })
219        }
220        "act" => {
221            if output
222                .get("blocked")
223                .and_then(|value| value.as_bool())
224                .unwrap_or(false)
225            {
226                return Ok(RuntimeTurnPlan::Complete { error: None });
227            }
228
229            let waiting_for_tool_results = output
230                .get("waiting_for_tool_results")
231                .and_then(|value| value.as_bool())
232                .unwrap_or(false);
233            let should_pause_for_tool_results = waiting_for_tool_results
234                && setup_connection_hint_enabled(adapter, state.org_id, state.session_id).await;
235
236            let next = RuntimeTurnState {
237                iteration: state.iteration.saturating_add(1),
238                ..state.clone()
239            };
240
241            if should_pause_for_tool_results {
242                let lifecycle =
243                    RuntimeSessionLifecycle::new(adapter.clone(), state.org_id, state.session_id);
244                lifecycle.waiting_for_tool_results().await;
245                return Ok(RuntimeTurnPlan::WaitForToolResults { resume: next });
246            }
247
248            if waiting_for_tool_results {
249                info!(
250                    session_id = %state.session_id,
251                    "setup_connection hint absent, continuing turn instead of pausing"
252                );
253            }
254
255            Ok(RuntimeTurnPlan::ScheduleReason(next))
256        }
257        other => Err(AgentLoopError::config(format!(
258            "Unknown activity type completed: {other}"
259        ))),
260    }
261}
262
263async fn setup_connection_hint_enabled<A: RuntimeHostAdapter>(
264    adapter: &A,
265    org_id: i64,
266    session_id: SessionId,
267) -> bool {
268    match adapter.session_store(org_id).get_session(session_id).await {
269        Ok(Some(session)) => {
270            let hints = Controls::resolve_hints(session.hints.as_ref(), None);
271            hints
272                .get("setup_connection")
273                .and_then(|value| value.as_bool())
274                .unwrap_or(false)
275        }
276        _ => false,
277    }
278}