// everruns_runtime/turn_strategy.rs
//
// Shared turn-strategy planning for embedded, durable, and custom hosts.
// Decision: everruns-runtime stays durable-agnostic and returns generic next-step plans.

4use crate::{RuntimeHostAdapter, RuntimeSessionLifecycle};
5use chrono::{DateTime, Utc};
6use everruns_core::atoms::{ActInput, AtomContext};
7use everruns_core::error::{AgentLoopError, Result};
8use everruns_core::events::TokenUsage;
9use everruns_core::typed_id::{AgentId, ExecId, HarnessId, MessageId, SessionId, TurnId};
10use everruns_core::{
11    Controls, ReasonResult, UserFacingError, UserFacingErrorContext,
12    classify_runtime_error_message, user_facing_error_codes,
13};
14use serde::{Deserialize, Serialize};
15use tracing::{debug, info};
16
17/// Host-owned state carried across turn phases.
18///
19/// Durable hosts can persist this between activities; in-memory hosts can hold
20/// it directly in memory. The type itself is runtime-level and has no durable
21/// engine coupling.
22///
23/// Hosts are expected to serialize this however they want. `everruns-runtime`
24/// only defines the fields required to resume the next semantic step.
25#[derive(Debug, Clone, Serialize, Deserialize)]
26pub struct RuntimeTurnState {
27    pub org_id: i64,
28    pub session_id: SessionId,
29    pub harness_id: HarnessId,
30    pub agent_id: Option<AgentId>,
31    pub input_message_id: MessageId,
32    #[serde(skip_serializing_if = "Option::is_none")]
33    pub turn_id: Option<TurnId>,
34    #[serde(skip_serializing_if = "Option::is_none", default)]
35    pub previous_response_id: Option<String>,
36    #[serde(default = "default_iteration")]
37    pub iteration: u32,
38    #[serde(skip_serializing_if = "Option::is_none", default)]
39    pub request_id: Option<String>,
40    #[serde(skip_serializing_if = "Option::is_none", default)]
41    pub started_at: Option<DateTime<Utc>>,
42    #[serde(skip_serializing_if = "Option::is_none", default)]
43    pub cumulative_usage: Option<TokenUsage>,
44    #[serde(default)]
45    pub tool_call_count: u32,
46    #[serde(default)]
47    pub llm_call_count: u32,
48    #[serde(skip_serializing_if = "Option::is_none", default)]
49    pub time_to_first_token_ms: Option<u64>,
50    #[serde(skip_serializing_if = "Option::is_none", default)]
51    pub final_message_id: Option<MessageId>,
52    #[serde(skip_serializing_if = "Option::is_none", default)]
53    pub final_answer_preview: Option<String>,
54}
55
/// Serde default for `RuntimeTurnState::iteration`: a turn begins at iteration one.
fn default_iteration() -> u32 {
    const FIRST_ITERATION: u32 = 1;
    FIRST_ITERATION
}
59
60/// Runtime-owned act scheduling payload.
61///
62/// Hosts enqueue or execute this immediately using their own worker model.
63#[derive(Debug, Clone)]
64pub struct RuntimeActPlan {
65    pub input: ActInput,
66    pub previous_response_id: Option<String>,
67    pub iteration: u32,
68    pub request_id: Option<String>,
69    pub resume_state: Box<RuntimeTurnState>,
70}
71
72/// Generic next-step decision for a host turn.
73///
74/// This intentionally stops at the semantic boundary:
75/// - runtime decides what should happen next
76/// - the host decides how to persist, enqueue, retry, or resume it
77#[derive(Debug, Clone)]
78pub enum RuntimeTurnPlan {
79    ScheduleReason(RuntimeTurnState),
80    ScheduleAct(RuntimeActPlan),
81    Complete { error: Option<String> },
82    WaitForToolResults { resume: RuntimeTurnState },
83}
84
/// Builds the stored preview of a final answer.
///
/// Returns `None` for empty text; otherwise the first 2000 characters
/// (Unicode scalar values, not bytes) of `text`.
fn preview_final_answer(text: &str) -> Option<String> {
    const PREVIEW_CHAR_LIMIT: usize = 2000;

    if text.is_empty() {
        return None;
    }

    // Byte offset of the first character past the limit, or the end of the
    // text when it is short enough to keep whole.
    let cut = text
        .char_indices()
        .nth(PREVIEW_CHAR_LIMIT)
        .map_or(text.len(), |(offset, _)| offset);

    Some(text[..cut].to_owned())
}
92
93fn add_usage(current: &mut Option<TokenUsage>, next: &TokenUsage) {
94    match current {
95        Some(current) => current.add(next),
96        None => *current = Some(next.clone()),
97    }
98}
99
100impl RuntimeTurnState {
101    fn with_reason_summary(&self, reason_result: &ReasonResult) -> Self {
102        let mut next = self.clone();
103        next.llm_call_count = next.llm_call_count.saturating_add(1);
104        next.tool_call_count = next
105            .tool_call_count
106            .saturating_add(reason_result.tool_calls.len() as u32);
107        if let Some(usage) = &reason_result.usage {
108            add_usage(&mut next.cumulative_usage, usage);
109        }
110        if next.time_to_first_token_ms.is_none() {
111            next.time_to_first_token_ms = reason_result.time_to_first_token_ms;
112        }
113        next.final_message_id = reason_result.output_message_id;
114        next.final_answer_preview = preview_final_answer(&reason_result.text);
115        next
116    }
117
118    fn duration_ms(&self) -> Option<u64> {
119        self.started_at
120            .map(|started_at| Utc::now().signed_duration_since(started_at))
121            .and_then(|duration| u64::try_from(duration.num_milliseconds()).ok())
122    }
123}
124
125fn classify_reason_failure(reason_result: &ReasonResult) -> UserFacingError {
126    let from_text =
127        classify_runtime_error_message(&reason_result.text, &UserFacingErrorContext::default());
128
129    let Some(error) = reason_result.error.as_deref() else {
130        return from_text;
131    };
132
133    let from_error = classify_runtime_error_message(error, &UserFacingErrorContext::default());
134
135    if from_error.code == user_facing_error_codes::PROCESSING_ERROR {
136        return from_text;
137    }
138
139    if from_error.code == from_text.code
140        && from_error.fields.is_empty()
141        && !from_text.fields.is_empty()
142    {
143        return from_text;
144    }
145
146    from_error
147}
148
149/// Determine the next host step after an activity finishes.
150///
151/// The host provides:
152/// - `completed_activity`: which phase just finished
153/// - `state`: host-carried turn state
154/// - `output`: serialized activity output
155/// - `pending_user_message_count`: number of queued steering messages already consumed by the host
156///
157/// Runtime owns the semantic decision. Hosts translate the returned plan into
158/// their own queueing / persistence model.
159///
160/// Typical host mapping:
161/// - `ScheduleReason` => enqueue or invoke a reason phase with the returned state
162/// - `ScheduleAct` => enqueue or invoke an act phase with the returned payload
163/// - `WaitForToolResults` => persist the resume state until external tool input arrives
164/// - `Complete` => mark the host-owned workflow/session turn complete
165pub async fn plan_next_host_turn<A: RuntimeHostAdapter>(
166    adapter: &A,
167    completed_activity: &str,
168    state: &RuntimeTurnState,
169    output: &serde_json::Value,
170    pending_user_message_count: usize,
171) -> Result<RuntimeTurnPlan> {
172    match completed_activity {
173        "process_input" => {
174            let turn_id: Option<TurnId> = output
175                .get("turn_id")
176                .and_then(|value| value.as_str())
177                .and_then(|value| value.parse().ok());
178            let next = RuntimeTurnState {
179                turn_id,
180                previous_response_id: None,
181                iteration: 1,
182                started_at: state.started_at.or_else(|| Some(Utc::now())),
183                ..state.clone()
184            };
185            debug!(session_id = %state.session_id, turn_id = ?turn_id, "planned reason step");
186            Ok(RuntimeTurnPlan::ScheduleReason(next))
187        }
188        "reason" => {
189            let reason_result: ReasonResult = serde_json::from_value(output.clone())
190                .map_err(|error| AgentLoopError::Internal(error.into()))?;
191            let response_id = reason_result.response_id.clone();
192            let summarized_state = state.with_reason_summary(&reason_result);
193
194            if reason_result.has_tool_calls && reason_result.success {
195                let session_blueprint_id = adapter
196                    .session_store(state.org_id)
197                    .get_session(state.session_id)
198                    .await?
199                    .and_then(|session| session.blueprint_id);
200                let plan = RuntimeActPlan {
201                    input: ActInput {
202                        org_id: Some(state.org_id),
203                        context: AtomContext {
204                            session_id: state.session_id,
205                            turn_id: state.turn_id.unwrap_or_default(),
206                            input_message_id: state.input_message_id,
207                            exec_id: ExecId::new(),
208                        },
209                        harness_id: state.harness_id,
210                        agent_id: state.agent_id,
211                        tool_calls: reason_result.tool_calls,
212                        tool_definitions: reason_result.tool_definitions,
213                        locale: reason_result.locale,
214                        blueprint_id: session_blueprint_id,
215                        network_access: reason_result.network_access,
216                    },
217                    previous_response_id: response_id,
218                    iteration: state.iteration,
219                    request_id: state.request_id.clone(),
220                    resume_state: Box::new(summarized_state),
221                };
222                return Ok(RuntimeTurnPlan::ScheduleAct(plan));
223            }
224
225            if reason_result.success && pending_user_message_count > 0 {
226                if pending_user_message_count > 1 {
227                    info!(
228                        session_id = %state.session_id,
229                        pending_user_message_count,
230                        "multiple steering messages arrived during turn"
231                    );
232                }
233
234                let next = RuntimeTurnState {
235                    previous_response_id: response_id,
236                    iteration: state.iteration.saturating_add(1),
237                    ..summarized_state
238                };
239                return Ok(RuntimeTurnPlan::ScheduleReason(next));
240            }
241
242            let lifecycle =
243                RuntimeSessionLifecycle::new(adapter.clone(), state.org_id, state.session_id);
244            let turn_id = state.turn_id.unwrap_or_default();
245
246            if reason_result.success {
247                lifecycle
248                    .emit_turn_completed(
249                        state.input_message_id,
250                        everruns_core::events::TurnCompletedData {
251                            turn_id,
252                            iterations: state.iteration,
253                            duration_ms: summarized_state.duration_ms(),
254                            usage: summarized_state.cumulative_usage.clone(),
255                            input_content: None,
256                            final_message_id: summarized_state.final_message_id,
257                            final_answer_preview: summarized_state.final_answer_preview.clone(),
258                            time_to_first_token_ms: summarized_state.time_to_first_token_ms,
259                            tool_call_count: Some(summarized_state.tool_call_count),
260                            llm_call_count: Some(summarized_state.llm_call_count),
261                            status: Some("completed".to_string()),
262                        },
263                    )
264                    .await;
265                lifecycle
266                    .emit_session_idled(
267                        turn_id,
268                        state.input_message_id,
269                        Some(state.iteration),
270                        summarized_state.cumulative_usage.clone(),
271                    )
272                    .await;
273            } else {
274                let user_error = classify_reason_failure(&reason_result);
275                lifecycle
276                    .turn_failed(
277                        turn_id,
278                        state.input_message_id,
279                        &reason_result.text,
280                        Some(&user_error),
281                    )
282                    .await;
283            }
284
285            Ok(RuntimeTurnPlan::Complete {
286                error: reason_result.error,
287            })
288        }
289        "act" => {
290            if output
291                .get("blocked")
292                .and_then(|value| value.as_bool())
293                .unwrap_or(false)
294            {
295                return Ok(RuntimeTurnPlan::Complete { error: None });
296            }
297
298            let waiting_for_tool_results = output
299                .get("waiting_for_tool_results")
300                .and_then(|value| value.as_bool())
301                .unwrap_or(false);
302            let should_pause_for_tool_results = waiting_for_tool_results
303                && setup_connection_hint_enabled(adapter, state.org_id, state.session_id).await;
304
305            let next = RuntimeTurnState {
306                iteration: state.iteration.saturating_add(1),
307                ..state.clone()
308            };
309
310            if should_pause_for_tool_results {
311                let lifecycle =
312                    RuntimeSessionLifecycle::new(adapter.clone(), state.org_id, state.session_id);
313                lifecycle.waiting_for_tool_results().await;
314                return Ok(RuntimeTurnPlan::WaitForToolResults { resume: next });
315            }
316
317            if waiting_for_tool_results {
318                info!(
319                    session_id = %state.session_id,
320                    "setup_connection hint absent, continuing turn instead of pausing"
321                );
322            }
323
324            Ok(RuntimeTurnPlan::ScheduleReason(next))
325        }
326        other => Err(AgentLoopError::config(format!(
327            "Unknown activity type completed: {other}"
328        ))),
329    }
330}
331
332async fn setup_connection_hint_enabled<A: RuntimeHostAdapter>(
333    adapter: &A,
334    org_id: i64,
335    session_id: SessionId,
336) -> bool {
337    match adapter.session_store(org_id).get_session(session_id).await {
338        Ok(Some(session)) => {
339            let hints = Controls::resolve_hints(session.hints.as_ref(), None);
340            hints
341                .get("setup_connection")
342                .and_then(|value| value.as_bool())
343                .unwrap_or(false)
344        }
345        _ => false,
346    }
347}