Skip to main content

awaken_runtime/backend/
mod.rs

1//! Runtime execution backends and canonical request/result types.
2
3mod local;
4
5use std::sync::Arc;
6
7use async_trait::async_trait;
8use awaken_contract::contract::event::AgentEvent;
9use awaken_contract::contract::event_sink::EventSink;
10use awaken_contract::contract::identity::RunIdentity;
11use awaken_contract::contract::lifecycle::{RunStatus, TerminationReason};
12use awaken_contract::contract::message::{Message, Role, gen_message_id};
13use awaken_contract::contract::storage::{
14    MessageSeqRange, RunMessageInput, RunMessageOutput, RunOutcome, RunRecord, RunWaitingState,
15    ThreadRunStore, WaitingReason,
16};
17use awaken_contract::contract::suspension::ToolCallResume;
18use awaken_contract::contract::tool::ToolDescriptor;
19use awaken_contract::now_ms;
20use awaken_contract::registry_spec::RemoteEndpoint;
21use awaken_contract::state::PersistedState;
22use futures::channel::mpsc;
23use serde::{Deserialize, Serialize};
24use serde_json::{Value, json};
25
26use crate::cancellation::CancellationToken;
27use crate::inbox::{InboxReceiver, InboxSender};
28use crate::loop_runner::{AgentLoopError, AgentRunResult};
29use crate::phase::PhaseRuntime;
30use crate::registry::{ExecutionResolver, ResolvedBackendAgent, ResolvedExecution};
31
32pub use local::LocalBackend;
33
/// Key under which a run's structured backend output is stored in the
/// `extensions` map of its persisted state.
const BACKEND_OUTPUT_STATE_KEY: &str = "__runtime_backend_output";
35
/// Lineage of the invocation that spawned a backend run.
///
/// Every link is optional; the `Default` value carries no lineage at all.
#[derive(Debug, Clone, Default)]
pub struct BackendParentContext {
    /// Identifier of the parent run, if any.
    pub parent_run_id: Option<String>,
    /// Identifier of the parent thread, if any.
    pub parent_thread_id: Option<String>,
    /// Identifier of the parent tool call, if any.
    pub parent_tool_call_id: Option<String>,
}
43
44/// Cooperative runtime controls exposed to a backend implementation.
45#[derive(Default)]
46pub struct BackendControl {
47    pub cancellation_token: Option<CancellationToken>,
48    pub decision_rx: Option<mpsc::UnboundedReceiver<Vec<(String, ToolCallResume)>>>,
49}
50
/// Ways a backend can be interrupted once execution has started.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum BackendCancellationCapability {
    /// Neither cooperative-token nor remote-abort interruption is supported.
    None,
    /// Only cooperative-token interruption is supported.
    CooperativeToken,
    /// Only remote-abort interruption is supported.
    RemoteAbort,
    /// Both cooperative-token and remote-abort interruption are supported.
    CooperativeTokenAndRemoteAbort,
}

impl BackendCancellationCapability {
    /// Returns `true` for the variants that include cooperative-token support.
    #[must_use]
    pub const fn supports_cooperative_token(self) -> bool {
        match self {
            Self::CooperativeToken | Self::CooperativeTokenAndRemoteAbort => true,
            Self::None | Self::RemoteAbort => false,
        }
    }

    /// Returns `true` for the variants that include remote-abort support.
    #[must_use]
    pub const fn supports_remote_abort(self) -> bool {
        match self {
            Self::RemoteAbort | Self::CooperativeTokenAndRemoteAbort => true,
            Self::None | Self::CooperativeToken => false,
        }
    }
}
77
/// How a backend keeps state between root turns.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum BackendContinuationCapability {
    /// No continuation support; continuation requests are reported as
    /// unsupported by the capability checks in this module.
    None,
    /// State is kept in process (as the variant name indicates).
    InProcessState,
    /// State is kept by the remote side (as the variant name indicates).
    RemoteState,
}
85
/// Interrupted (waiting) states a backend can represent without flattening
/// them into errors.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum BackendWaitCapability {
    /// No waiting state can be represented.
    None,
    /// Only waiting-for-input can be represented.
    Input,
    /// Only waiting-for-auth can be represented.
    Auth,
    /// Both waiting-for-input and waiting-for-auth can be represented.
    InputAndAuth,
}
94
/// Transcript contract a backend consumes.
///
/// Variant meanings follow their names; none of them is interpreted by the
/// code in this module beyond being stored in [`BackendCapabilities`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum BackendTranscriptCapability {
    /// The backend consumes the full transcript.
    FullTranscript,
    /// The backend consumes incremental user messages and relies on remote state.
    IncrementalUserMessagesWithRemoteState,
    /// The backend consumes a single prompt.
    SinglePrompt,
}
102
/// Output shape a backend preserves.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum BackendOutputCapability {
    /// Only text is preserved.
    Text,
    /// Text and artifacts are preserved.
    TextAndArtifacts,
}
109
110/// Optional execution capabilities exposed by a backend implementation.
111#[derive(Debug, Clone, Copy, PartialEq, Eq)]
112pub struct BackendCapabilities {
113    pub cancellation: BackendCancellationCapability,
114    pub decisions: bool,
115    pub overrides: bool,
116    pub frontend_tools: bool,
117    pub continuation: BackendContinuationCapability,
118    pub waits: BackendWaitCapability,
119    pub transcript: BackendTranscriptCapability,
120    pub output: BackendOutputCapability,
121}
122
123impl BackendCapabilities {
124    #[must_use]
125    pub const fn full() -> Self {
126        Self {
127            cancellation: BackendCancellationCapability::CooperativeToken,
128            decisions: true,
129            overrides: true,
130            frontend_tools: true,
131            continuation: BackendContinuationCapability::InProcessState,
132            waits: BackendWaitCapability::InputAndAuth,
133            transcript: BackendTranscriptCapability::FullTranscript,
134            output: BackendOutputCapability::TextAndArtifacts,
135        }
136    }
137
138    #[must_use]
139    pub const fn remote_stateless_text() -> Self {
140        Self {
141            cancellation: BackendCancellationCapability::None,
142            decisions: false,
143            overrides: false,
144            frontend_tools: false,
145            continuation: BackendContinuationCapability::None,
146            waits: BackendWaitCapability::None,
147            transcript: BackendTranscriptCapability::SinglePrompt,
148            output: BackendOutputCapability::Text,
149        }
150    }
151
152    #[must_use]
153    pub fn unsupported_root_features(
154        &self,
155        request: &BackendRootRunRequest<'_>,
156    ) -> Vec<&'static str> {
157        let mut unsupported = Vec::new();
158        if (!request.decisions.is_empty() || request.control.decision_rx.is_some())
159            && !self.decisions
160        {
161            unsupported.push("decisions");
162        }
163        if request.overrides.is_some() && !self.overrides {
164            unsupported.push("overrides");
165        }
166        if !request.frontend_tools.is_empty() && !self.frontend_tools {
167            unsupported.push("frontend_tools");
168        }
169        if request.is_continuation && self.continuation == BackendContinuationCapability::None {
170            unsupported.push("continuation");
171        }
172        unsupported
173    }
174
175    #[must_use]
176    pub fn unsupported_delegate_features(
177        &self,
178        request: &BackendDelegateRunRequest<'_>,
179    ) -> Vec<&'static str> {
180        let mut unsupported = Vec::new();
181        if request.policy.persistence != BackendDelegatePersistence::Ephemeral {
182            unsupported.push("delegate_persistence");
183        }
184        if request.policy.continuation != BackendDelegateContinuation::Disabled
185            && self.continuation == BackendContinuationCapability::None
186        {
187            unsupported.push("continuation");
188        }
189        unsupported
190    }
191}
192
193impl Default for BackendCapabilities {
194    fn default() -> Self {
195        Self::full()
196    }
197}
198
199/// Root execution request shared by local and remote root execution.
200pub struct BackendRootRunRequest<'a> {
201    pub agent_id: &'a str,
202    pub messages: Vec<Message>,
203    pub new_messages: Vec<Message>,
204    pub sink: Arc<dyn EventSink>,
205    pub resolver: &'a dyn ExecutionResolver,
206    pub run_identity: RunIdentity,
207    pub checkpoint_store: Option<&'a dyn ThreadRunStore>,
208    pub control: BackendControl,
209    pub decisions: Vec<(String, ToolCallResume)>,
210    pub overrides: Option<awaken_contract::contract::inference::InferenceOverride>,
211    pub frontend_tools: Vec<ToolDescriptor>,
212    pub local: Option<BackendLocalRootContext<'a>>,
213    pub inbox: Option<InboxReceiver>,
214    pub is_continuation: bool,
215}
216
217/// Local-only dependencies carried by the root request context.
218#[derive(Clone, Copy)]
219pub struct BackendLocalRootContext<'a> {
220    pub phase_runtime: &'a PhaseRuntime,
221}
222
/// Persistence policy for a delegated execution.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum BackendDelegatePersistence {
    /// The only policy currently defined.
    Ephemeral,
}

/// Continuation policy for a delegated execution.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum BackendDelegateContinuation {
    /// The only policy currently defined.
    Disabled,
}

/// Explicit policy applied to delegated agent tool calls.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct BackendDelegatePolicy {
    /// Persistence policy for the delegate run.
    pub persistence: BackendDelegatePersistence,
    /// Continuation policy for the delegate run.
    pub continuation: BackendDelegateContinuation,
}

/// The default policy is ephemeral persistence with continuation disabled.
impl Default for BackendDelegatePolicy {
    fn default() -> Self {
        let persistence = BackendDelegatePersistence::Ephemeral;
        let continuation = BackendDelegateContinuation::Disabled;
        Self {
            persistence,
            continuation,
        }
    }
}
250
251/// Delegate execution request. Delegates are explicitly child invocations.
252pub struct BackendDelegateRunRequest<'a> {
253    pub agent_id: &'a str,
254    pub messages: Vec<Message>,
255    pub new_messages: Vec<Message>,
256    pub sink: Arc<dyn EventSink>,
257    pub resolver: &'a dyn ExecutionResolver,
258    pub parent: BackendParentContext,
259    pub control: BackendControl,
260    pub policy: BackendDelegatePolicy,
261}
262
263/// Best-effort abort request for an in-flight backend execution.
264pub struct BackendAbortRequest<'a> {
265    pub agent_id: &'a str,
266    pub run_identity: &'a RunIdentity,
267    pub parent: Option<&'a BackendParentContext>,
268    pub persisted_state: Option<&'a PersistedState>,
269    pub is_continuation: bool,
270}
271
272/// Structured output preserved by a backend result.
273#[derive(Debug, Clone, Default, Serialize, Deserialize, PartialEq)]
274pub struct BackendRunOutput {
275    #[serde(default, skip_serializing_if = "Option::is_none")]
276    pub text: Option<String>,
277    #[serde(default, skip_serializing_if = "Vec::is_empty")]
278    pub artifacts: Vec<BackendOutputArtifact>,
279    #[serde(default, skip_serializing_if = "Option::is_none")]
280    pub raw: Option<Value>,
281}
282
283impl BackendRunOutput {
284    #[must_use]
285    pub fn from_text(text: Option<String>) -> Self {
286        Self {
287            text,
288            artifacts: Vec::new(),
289            raw: None,
290        }
291    }
292
293    #[must_use]
294    pub fn text_or<'a>(&'a self, fallback: &'a Option<String>) -> Option<String> {
295        self.text.clone().or_else(|| fallback.clone())
296    }
297}
298
299/// Backend artifact in a transport-neutral shape.
300#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
301pub struct BackendOutputArtifact {
302    #[serde(default, skip_serializing_if = "Option::is_none")]
303    pub id: Option<String>,
304    #[serde(default, skip_serializing_if = "Option::is_none")]
305    pub name: Option<String>,
306    #[serde(default, skip_serializing_if = "Option::is_none")]
307    pub media_type: Option<String>,
308    pub content: Value,
309}
310
311/// Result of executing an agent through a runtime backend.
312#[derive(Debug, Clone)]
313pub struct BackendRunResult {
314    pub agent_id: String,
315    pub status: BackendRunStatus,
316    pub termination: TerminationReason,
317    pub status_reason: Option<String>,
318    pub response: Option<String>,
319    pub output: BackendRunOutput,
320    pub steps: usize,
321    pub run_id: Option<String>,
322    pub inbox: Option<InboxSender>,
323    pub state: Option<PersistedState>,
324}
325
/// Status a backend reports when a run stops.
#[derive(Debug, Clone)]
pub enum BackendRunStatus {
    /// The run completed.
    Completed,
    /// The run is waiting for input, with an optional detail message.
    WaitingInput(Option<String>),
    /// The run is waiting for auth, with an optional detail message.
    WaitingAuth(Option<String>),
    /// The run is suspended, with an optional detail message.
    Suspended(Option<String>),
    /// The run failed with the given message.
    Failed(String),
    /// The run was cancelled.
    Cancelled,
    /// The run timed out.
    Timeout,
}

/// Renders the status as a lowercase label, followed by `: <detail>` when
/// the variant carries a message.
impl std::fmt::Display for BackendRunStatus {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Split every variant into its fixed label and optional detail text.
        let (label, detail) = match self {
            Self::Completed => ("completed", None),
            Self::WaitingInput(detail) => ("waiting_input", detail.as_deref()),
            Self::WaitingAuth(detail) => ("waiting_auth", detail.as_deref()),
            Self::Suspended(detail) => ("suspended", detail.as_deref()),
            Self::Failed(message) => ("failed", Some(message.as_str())),
            Self::Cancelled => ("cancelled", None),
            Self::Timeout => ("timeout", None),
        };
        match detail {
            Some(detail) => write!(f, "{label}: {detail}"),
            None => f.write_str(label),
        }
    }
}
354
355impl BackendRunStatus {
356    #[must_use]
357    pub fn durable_run_status(&self, termination: &TerminationReason) -> RunStatus {
358        match self {
359            Self::WaitingInput(_) | Self::WaitingAuth(_) | Self::Suspended(_) => RunStatus::Waiting,
360            Self::Completed => termination.to_run_status().0,
361            Self::Failed(_) | Self::Cancelled | Self::Timeout => RunStatus::Done,
362        }
363    }
364
365    #[must_use]
366    pub fn durable_status_reason(&self, termination: &TerminationReason) -> Option<String> {
367        match self {
368            Self::WaitingInput(_) => Some("input_required".to_string()),
369            Self::WaitingAuth(_) => Some("auth_required".to_string()),
370            Self::Suspended(_) => Some("suspended".to_string()),
371            Self::Timeout => Some("timeout".to_string()),
372            Self::Failed(_) => Some("error".to_string()),
373            Self::Cancelled => Some("cancelled".to_string()),
374            Self::Completed => termination.to_run_status().1,
375        }
376    }
377
378    #[must_use]
379    pub fn result_status_label(&self, termination: &TerminationReason) -> &'static str {
380        match self {
381            Self::Completed => run_status_label(termination.to_run_status().0),
382            Self::WaitingInput(_) => "waiting_input",
383            Self::WaitingAuth(_) => "waiting_auth",
384            Self::Suspended(_) => "suspended",
385            Self::Failed(_) => "failed",
386            Self::Cancelled => "cancelled",
387            Self::Timeout => "timeout",
388        }
389    }
390}
391
392/// Backend for executing an agent, either locally or through a remote transport.
393#[async_trait]
394pub trait ExecutionBackend: Send + Sync {
395    fn capabilities(&self) -> BackendCapabilities {
396        BackendCapabilities::remote_stateless_text()
397    }
398
399    async fn abort(&self, _request: BackendAbortRequest<'_>) -> Result<(), ExecutionBackendError> {
400        Ok(())
401    }
402
403    async fn execute_root(
404        &self,
405        _request: BackendRootRunRequest<'_>,
406    ) -> Result<BackendRunResult, ExecutionBackendError> {
407        Err(ExecutionBackendError::ExecutionFailed(
408            "backend does not support root execution".into(),
409        ))
410    }
411
412    async fn execute_delegate(
413        &self,
414        _request: BackendDelegateRunRequest<'_>,
415    ) -> Result<BackendRunResult, ExecutionBackendError> {
416        Err(ExecutionBackendError::ExecutionFailed(
417            "backend does not support delegated execution".into(),
418        ))
419    }
420}
421
422/// Factory for backend implementations backed by canonical `RemoteEndpoint` config.
423pub trait ExecutionBackendFactory: Send + Sync {
424    fn backend(&self) -> &str;
425
426    fn validate(&self, endpoint: &RemoteEndpoint) -> Result<(), ExecutionBackendFactoryError> {
427        self.build(endpoint).map(|_| ())
428    }
429
430    fn build(
431        &self,
432        endpoint: &RemoteEndpoint,
433    ) -> Result<Arc<dyn ExecutionBackend>, ExecutionBackendFactoryError>;
434}
435
436#[derive(Debug, thiserror::Error)]
437pub enum ExecutionBackendFactoryError {
438    #[error("{0}")]
439    InvalidConfig(String),
440}
441
442#[derive(Debug, thiserror::Error)]
443pub enum ExecutionBackendError {
444    #[error("agent not found: {0}")]
445    AgentNotFound(String),
446    #[error("execution failed: {0}")]
447    ExecutionFailed(String),
448    #[error("remote error: {0}")]
449    RemoteError(String),
450    #[error(transparent)]
451    Loop(#[from] AgentLoopError),
452}
453
454pub fn execution_capabilities(
455    execution: &ResolvedExecution,
456) -> Result<BackendCapabilities, ExecutionBackendError> {
457    match execution {
458        ResolvedExecution::Local(_) => Ok(LocalBackend::new().capabilities()),
459        ResolvedExecution::NonLocal(agent) => Ok(agent.backend()?.capabilities()),
460    }
461}
462
463pub fn validate_root_execution_request(
464    execution: &ResolvedExecution,
465    request: &BackendRootRunRequest<'_>,
466) -> Result<(), ExecutionBackendError> {
467    let unsupported = execution_capabilities(execution)?.unsupported_root_features(request);
468    if !unsupported.is_empty() {
469        return Err(ExecutionBackendError::ExecutionFailed(format!(
470            "agent '{}' backend does not support: {}",
471            request.agent_id,
472            unsupported.join(", ")
473        )));
474    }
475    Ok(())
476}
477
478pub fn validate_delegate_execution_request(
479    execution: &ResolvedExecution,
480    request: &BackendDelegateRunRequest<'_>,
481) -> Result<(), ExecutionBackendError> {
482    let unsupported = execution_capabilities(execution)?.unsupported_delegate_features(request);
483    if !unsupported.is_empty() {
484        return Err(ExecutionBackendError::ExecutionFailed(format!(
485            "agent '{}' backend does not support: {}",
486            request.agent_id,
487            unsupported.join(", ")
488        )));
489    }
490    Ok(())
491}
492
493pub async fn execute_resolved_delegate_execution(
494    execution: &ResolvedExecution,
495    request: BackendDelegateRunRequest<'_>,
496) -> Result<BackendRunResult, ExecutionBackendError> {
497    validate_delegate_execution_request(execution, &request)?;
498    match execution {
499        ResolvedExecution::Local(_) => LocalBackend::new().execute_delegate(request).await,
500        ResolvedExecution::NonLocal(agent) => agent.backend()?.execute_delegate(request).await,
501    }
502}
503
504/// Execute a remote root run including canonical runtime lifecycle events and persistence.
505pub async fn execute_remote_root_lifecycle(
506    agent: &ResolvedBackendAgent,
507    request: BackendRootRunRequest<'_>,
508    run_created_at: u64,
509    runtime_cancellation_token: CancellationToken,
510    previous_state: Option<PersistedState>,
511) -> Result<AgentRunResult, AgentLoopError> {
512    let backend = agent.backend().map_err(|error| {
513        AgentLoopError::RuntimeError(crate::RuntimeError::ResolveFailed {
514            message: error.to_string(),
515        })
516    })?;
517    let run_identity = request.run_identity.clone();
518    let sink = request.sink.clone();
519    let checkpoint_store = request.checkpoint_store;
520    let mut messages = request.messages.clone();
521    let input_message_count = messages.len();
522    let request_is_continuation = request.is_continuation;
523
524    sink.emit(AgentEvent::RunStart {
525        thread_id: run_identity.thread_id.clone(),
526        run_id: run_identity.run_id.clone(),
527        parent_run_id: run_identity.parent_run_id.clone(),
528        identity: Some(run_identity.clone()),
529    })
530    .await;
531    sink.emit(AgentEvent::StepStart {
532        message_id: gen_message_id(),
533    })
534    .await;
535
536    let execution_started_at = now_ms();
537    let backend_execution = backend.execute_root(request);
538    tokio::pin!(backend_execution);
539    let delegate_result = tokio::select! {
540        result = &mut backend_execution => {
541            match result {
542                Ok(result) => result,
543                Err(error) => {
544                    let error_message = remote_backend_error_message(error);
545                    let termination = TerminationReason::Error(error_message.clone());
546                    let latest_state = load_checkpoint_state(
547                        checkpoint_store,
548                        &run_identity.run_id,
549                        previous_state.clone(),
550                    )
551                    .await;
552                    return finish_remote_root_run(
553                        checkpoint_store,
554                        &run_identity.thread_id,
555                        &run_identity.run_id,
556                        &run_identity.agent_id,
557                        run_identity.parent_run_id.clone(),
558                        &run_identity,
559                        run_created_at,
560                        messages,
561                        input_message_count,
562                        BackendRunStatus::Failed(error_message),
563                        termination,
564                        None,
565                        0,
566                        String::new(),
567                        BackendRunOutput::default(),
568                        latest_state,
569                        &sink,
570                    )
571                    .await;
572                }
573            }
574        }
575        _ = runtime_cancellation_token.cancelled() => {
576            let latest_state = load_checkpoint_state(
577                checkpoint_store,
578                &run_identity.run_id,
579                previous_state.clone(),
580            )
581            .await;
582            if backend.capabilities().cancellation.supports_remote_abort()
583                && let Err(error) = backend
584                    .abort(BackendAbortRequest {
585                        agent_id: &run_identity.agent_id,
586                        run_identity: &run_identity,
587                        parent: None,
588                        persisted_state: latest_state.as_ref(),
589                        is_continuation: request_is_continuation,
590                    })
591                    .await
592            {
593                tracing::warn!(
594                    agent_id = %run_identity.agent_id,
595                    run_id = %run_identity.run_id,
596                    error = %error,
597                    "non-local backend abort hook failed after cancellation"
598                );
599            }
600            return finish_remote_root_run(
601                checkpoint_store,
602                &run_identity.thread_id,
603                &run_identity.run_id,
604                &run_identity.agent_id,
605                run_identity.parent_run_id.clone(),
606                &run_identity,
607                run_created_at,
608                messages,
609                input_message_count,
610                BackendRunStatus::Cancelled,
611                TerminationReason::Cancelled,
612                None,
613                0,
614                String::new(),
615                BackendRunOutput::default(),
616                latest_state,
617                &sink,
618            )
619            .await;
620        }
621    };
622
623    let termination = delegate_result.termination.clone();
624    let status_reason = delegate_result.status_reason.clone();
625    let mut output = delegate_result.output.clone();
626    let response = output
627        .text_or(&delegate_result.response)
628        .unwrap_or_default();
629    if output.text.is_none() && !response.is_empty() {
630        output.text = Some(response.clone());
631    }
632    let status = delegate_result.status;
633    let steps = delegate_result.steps;
634    let state = delegate_result.state.or(previous_state);
635    if !response.is_empty() {
636        sink.emit(AgentEvent::TextDelta {
637            delta: response.clone(),
638        })
639        .await;
640        messages.push(Message::assistant(response.clone()));
641    }
642
643    if matches!(
644        termination,
645        TerminationReason::NaturalEnd | TerminationReason::BehaviorRequested
646    ) {
647        sink.emit(AgentEvent::InferenceComplete {
648            model: agent.spec.model_id.clone(),
649            usage: None,
650            duration_ms: now_ms().saturating_sub(execution_started_at),
651        })
652        .await;
653    }
654
655    finish_remote_root_run(
656        checkpoint_store,
657        &run_identity.thread_id,
658        &run_identity.run_id,
659        &run_identity.agent_id,
660        run_identity.parent_run_id.clone(),
661        &run_identity,
662        run_created_at,
663        messages,
664        input_message_count,
665        status,
666        termination,
667        status_reason,
668        steps,
669        response,
670        output,
671        state,
672        &sink,
673    )
674    .await
675}
676
677async fn load_checkpoint_state(
678    storage: Option<&dyn ThreadRunStore>,
679    run_id: &str,
680    fallback: Option<PersistedState>,
681) -> Option<PersistedState> {
682    let Some(storage) = storage else {
683        return fallback;
684    };
685    match storage.load_run(run_id).await {
686        Ok(Some(run)) => run.state.or(fallback),
687        Ok(None) => fallback,
688        Err(error) => {
689            tracing::warn!(run_id, error = %error, "failed to load latest checkpoint state");
690            fallback
691        }
692    }
693}
694
695#[allow(clippy::too_many_arguments)]
696async fn finish_remote_root_run(
697    storage: Option<&dyn ThreadRunStore>,
698    thread_id: &str,
699    run_id: &str,
700    agent_id: &str,
701    parent_run_id: Option<String>,
702    run_identity: &RunIdentity,
703    run_created_at: u64,
704    messages: Vec<Message>,
705    input_message_count: usize,
706    backend_status: BackendRunStatus,
707    termination: TerminationReason,
708    status_reason_override: Option<String>,
709    steps: usize,
710    response: String,
711    output: BackendRunOutput,
712    state: Option<PersistedState>,
713    sink: &Arc<dyn EventSink>,
714) -> Result<AgentRunResult, AgentLoopError> {
715    let status = backend_status.durable_run_status(&termination);
716    let status_reason =
717        status_reason_override.or_else(|| backend_status.durable_status_reason(&termination));
718    let state = state_with_backend_output(state, &output);
719    let mut result_json = json!({
720        "response": response,
721        "status": backend_status.result_status_label(&termination),
722    });
723    if output != BackendRunOutput::default() {
724        result_json["output"] = serde_json::to_value(&output).unwrap_or(Value::Null);
725    }
726    if let Some(reason) = &status_reason {
727        result_json["status_reason"] = Value::String(reason.clone());
728    }
729
730    persist_remote_root_checkpoint(
731        storage,
732        thread_id,
733        run_id,
734        agent_id,
735        parent_run_id,
736        run_created_at,
737        &messages,
738        input_message_count,
739        status,
740        Some(termination.clone()),
741        status_reason.clone(),
742        (!response.is_empty()).then(|| response.clone()),
743        match &termination {
744            TerminationReason::Error(message) => Some(json!({ "message": message })),
745            _ => None,
746        },
747        run_identity,
748        steps,
749        state,
750    )
751    .await?;
752
753    sink.emit(AgentEvent::StepEnd).await;
754    sink.emit(AgentEvent::RunFinish {
755        thread_id: thread_id.to_string(),
756        run_id: run_id.to_string(),
757        identity: Some(run_identity.clone()),
758        result: Some(result_json),
759        termination: termination.clone(),
760    })
761    .await;
762
763    Ok(AgentRunResult {
764        run_id: run_id.to_string(),
765        response,
766        termination,
767        steps,
768    })
769}
770
771fn state_with_backend_output(
772    state: Option<PersistedState>,
773    output: &BackendRunOutput,
774) -> Option<PersistedState> {
775    if output == &BackendRunOutput::default() {
776        return state;
777    }
778
779    let mut state = state.unwrap_or(PersistedState {
780        revision: 0,
781        extensions: std::collections::HashMap::new(),
782    });
783    if let Ok(value) = serde_json::to_value(output) {
784        state
785            .extensions
786            .insert(BACKEND_OUTPUT_STATE_KEY.to_string(), value);
787    }
788    Some(state)
789}
790
791fn waiting_reason_from_backend_status(status_reason: Option<&str>) -> WaitingReason {
792    match status_reason {
793        Some("input_required" | "user_input_required") => WaitingReason::UserInput,
794        Some("auth_required" | "suspended") => WaitingReason::ToolPermission,
795        Some("awaiting_tasks") => WaitingReason::BackgroundTasks,
796        Some("rate_limit") => WaitingReason::RateLimit,
797        Some("manual_pause") => WaitingReason::ManualPause,
798        _ => WaitingReason::ExternalEvent,
799    }
800}
801
802#[allow(clippy::too_many_arguments)]
803async fn persist_remote_root_checkpoint(
804    storage: Option<&dyn ThreadRunStore>,
805    thread_id: &str,
806    run_id: &str,
807    agent_id: &str,
808    parent_run_id: Option<String>,
809    run_created_at: u64,
810    messages: &[Message],
811    input_message_count: usize,
812    status: RunStatus,
813    termination_reason: Option<TerminationReason>,
814    status_reason: Option<String>,
815    final_output: Option<String>,
816    error_payload: Option<Value>,
817    run_identity: &RunIdentity,
818    steps: usize,
819    state: Option<PersistedState>,
820) -> Result<(), AgentLoopError> {
821    let Some(storage) = storage else {
822        return Ok(());
823    };
824    let previous = storage
825        .load_run(run_id)
826        .await
827        .map_err(|error| AgentLoopError::StorageError(error.to_string()))?;
828    let created_at = previous
829        .as_ref()
830        .map(|record| record.created_at)
831        .unwrap_or(run_created_at / 1000);
832    let finished_at = status.is_terminal().then_some(now_ms() / 1000);
833    let outcome = termination_reason
834        .clone()
835        .map(|termination_reason| RunOutcome {
836            termination_reason,
837            final_output: final_output.clone(),
838            error_payload: error_payload.clone(),
839        });
840    let waiting = (status == RunStatus::Waiting).then(|| RunWaitingState {
841        reason: waiting_reason_from_backend_status(status_reason.as_deref()),
842        ticket_ids: Vec::new(),
843        tickets: Vec::new(),
844        since_dispatch_id: run_identity.trace.dispatch_id.clone(),
845        message: status_reason.clone(),
846    });
847    let (messages, input, output) = materialize_remote_message_log(
848        messages.to_vec(),
849        previous.as_ref(),
850        run_identity,
851        steps,
852        input_message_count,
853    );
854
855    let record = RunRecord {
856        run_id: run_id.to_string(),
857        thread_id: thread_id.to_string(),
858        agent_id: agent_id.to_string(),
859        parent_run_id,
860        request: previous.as_ref().and_then(|record| record.request.clone()),
861        input,
862        output,
863        status,
864        termination_reason,
865        final_output,
866        error_payload,
867        dispatch_id: run_identity.trace.dispatch_id.clone(),
868        session_id: run_identity.trace.session_id.clone(),
869        transport_request_id: run_identity.trace.transport_request_id.clone(),
870        waiting,
871        outcome,
872        created_at,
873        started_at: previous
874            .as_ref()
875            .and_then(|record| record.started_at)
876            .or(Some(run_created_at / 1000)),
877        finished_at,
878        updated_at: now_ms() / 1000,
879        steps,
880        input_tokens: 0,
881        output_tokens: 0,
882        state,
883    };
884    storage
885        .checkpoint(thread_id, &messages, &record)
886        .await
887        .map_err(|error| AgentLoopError::StorageError(error.to_string()))
888}
889
890fn materialize_remote_message_log(
891    mut messages: Vec<Message>,
892    previous: Option<&RunRecord>,
893    run_identity: &RunIdentity,
894    steps: usize,
895    input_message_count: usize,
896) -> (
897    Vec<Message>,
898    Option<RunMessageInput>,
899    Option<RunMessageOutput>,
900) {
901    let input = previous
902        .and_then(|record| record.input.clone())
903        .or_else(|| {
904            infer_remote_input_from_initial_messages(&run_identity.thread_id, input_message_count)
905        });
906    let output_start_seq = input
907        .as_ref()
908        .and_then(|input| input.range)
909        .map(|range| range.to_seq.saturating_add(1))
910        .unwrap_or(input_message_count as u64 + 1);
911    let step_index = (steps > 0).then_some(steps.saturating_sub(1) as u32);
912    let mut output_message_ids = Vec::new();
913    let mut output_from_seq = None;
914    let mut output_to_seq = None;
915    for (index, message) in messages.iter_mut().enumerate() {
916        let seq = index as u64 + 1;
917        if seq < output_start_seq || !is_remote_run_output_message(message) {
918            continue;
919        }
920        message.mark_produced_by(&run_identity.run_id, step_index);
921        output_from_seq.get_or_insert(seq);
922        output_to_seq = Some(seq);
923        if let Some(id) = message.id.clone() {
924            output_message_ids.push(id);
925        }
926    }
927    let output = if output_from_seq.is_none() {
928        previous.and_then(|record| record.output.clone())
929    } else {
930        Some(RunMessageOutput {
931            thread_id: run_identity.thread_id.clone(),
932            range: output_from_seq
933                .zip(output_to_seq)
934                .and_then(|(from, to)| MessageSeqRange::new(from, to)),
935            message_ids: output_message_ids,
936        })
937    };
938    (messages, input, output)
939}
940
941fn infer_remote_input_from_initial_messages(
942    thread_id: &str,
943    input_message_count: usize,
944) -> Option<RunMessageInput> {
945    if input_message_count == 0 {
946        return None;
947    }
948    let to_seq = input_message_count as u64;
949    Some(RunMessageInput {
950        thread_id: thread_id.to_string(),
951        range: MessageSeqRange::new(1, to_seq),
952        trigger_message_ids: Vec::new(),
953        selected_message_ids: Vec::new(),
954        context_policy: None,
955        compacted_snapshot_id: None,
956    })
957}
958
959fn is_remote_run_output_message(message: &Message) -> bool {
960    matches!(message.role, Role::Assistant | Role::Tool)
961}
962
963fn remote_backend_error_message(error: ExecutionBackendError) -> String {
964    match error {
965        ExecutionBackendError::AgentNotFound(message)
966        | ExecutionBackendError::ExecutionFailed(message)
967        | ExecutionBackendError::RemoteError(message) => message,
968        ExecutionBackendError::Loop(error) => error.to_string(),
969    }
970}
971
972fn run_status_label(status: RunStatus) -> &'static str {
973    match status {
974        RunStatus::Created => "created",
975        RunStatus::Running => "running",
976        RunStatus::Waiting => "waiting",
977        RunStatus::Done => "done",
978    }
979}
980
#[cfg(test)]
mod tests {
    use super::*;

    /// A timed-out backend run maps to a durable `Done` status while the status
    /// reason and result label both read `"timeout"`.
    #[test]
    fn backend_status_timeout_is_first_class_at_runtime_boundary() {
        let status = BackendRunStatus::Timeout;
        let termination = TerminationReason::Error("polling timeout exceeded".into());

        let durable_status = status.durable_run_status(&termination);
        assert_eq!(durable_status, RunStatus::Done);

        let durable_reason = status.durable_status_reason(&termination);
        assert_eq!(durable_reason.as_deref(), Some("timeout"));

        let label = status.result_status_label(&termination);
        assert_eq!(label, "timeout");
    }

    /// A backend run waiting for input maps to a durable `Waiting` status even when
    /// an error termination reason is supplied alongside it.
    #[test]
    fn backend_status_waiting_is_first_class_at_runtime_boundary() {
        let status = BackendRunStatus::WaitingInput(Some("need details".into()));
        let termination = TerminationReason::Error("should not win".into());

        let durable_status = status.durable_run_status(&termination);
        assert_eq!(durable_status, RunStatus::Waiting);

        let durable_reason = status.durable_status_reason(&termination);
        assert_eq!(durable_reason.as_deref(), Some("input_required"));

        let label = status.result_status_label(&termination);
        assert_eq!(label, "waiting_input");
    }
}