1mod local;
4
5use std::sync::Arc;
6
7use async_trait::async_trait;
8use awaken_contract::contract::event::AgentEvent;
9use awaken_contract::contract::event_sink::EventSink;
10use awaken_contract::contract::identity::RunIdentity;
11use awaken_contract::contract::lifecycle::{RunStatus, TerminationReason};
12use awaken_contract::contract::message::{Message, Role, gen_message_id};
13use awaken_contract::contract::storage::{
14 MessageSeqRange, RunMessageInput, RunMessageOutput, RunOutcome, RunRecord, RunWaitingState,
15 ThreadRunStore, WaitingReason,
16};
17use awaken_contract::contract::suspension::ToolCallResume;
18use awaken_contract::contract::tool::ToolDescriptor;
19use awaken_contract::now_ms;
20use awaken_contract::registry_spec::RemoteEndpoint;
21use awaken_contract::state::PersistedState;
22use futures::channel::mpsc;
23use serde::{Deserialize, Serialize};
24use serde_json::{Value, json};
25
26use crate::cancellation::CancellationToken;
27use crate::inbox::{InboxReceiver, InboxSender};
28use crate::loop_runner::{AgentLoopError, AgentRunResult};
29use crate::phase::PhaseRuntime;
30use crate::registry::{ExecutionResolver, ResolvedBackendAgent, ResolvedExecution};
31
32pub use local::LocalBackend;
33
/// Key under which a serialized [`BackendRunOutput`] is stored in
/// `PersistedState::extensions` (see `state_with_backend_output`).
const BACKEND_OUTPUT_STATE_KEY: &str = "__runtime_backend_output";
35
/// Optional identifiers of the parent a backend run is attached to.
///
/// Every field defaults to `None` via the derived `Default`.
#[derive(Debug, Clone, Default)]
pub struct BackendParentContext {
    /// Identifier of the parent run, when there is one.
    pub parent_run_id: Option<String>,
    /// Identifier of the parent thread, when there is one.
    pub parent_thread_id: Option<String>,
    /// Identifier of the parent tool call, when there is one.
    pub parent_tool_call_id: Option<String>,
}
43
/// Control handles passed along with a backend run request.
///
/// Both handles are optional and default to `None`.
#[derive(Default)]
pub struct BackendControl {
    /// Token handed to the backend for cancellation, if any.
    pub cancellation_token: Option<CancellationToken>,
    /// Receiver of batches of `(String, ToolCallResume)` pairs. Its mere
    /// presence counts as a request for the "decisions" feature in
    /// [`BackendCapabilities::unsupported_root_features`].
    // NOTE(review): the `String` is presumably a tool-call id; confirm against the sender.
    pub decision_rx: Option<mpsc::UnboundedReceiver<Vec<(String, ToolCallResume)>>>,
}
50
/// Cancellation mechanisms a backend declares support for.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum BackendCancellationCapability {
    /// Neither mechanism is supported.
    None,
    /// Only the cooperative token is supported.
    CooperativeToken,
    /// Only remote abort is supported.
    RemoteAbort,
    /// Both the cooperative token and remote abort are supported.
    CooperativeTokenAndRemoteAbort,
}

impl BackendCancellationCapability {
    /// Returns `true` for the two variants that include the cooperative token.
    #[must_use]
    pub const fn supports_cooperative_token(self) -> bool {
        match self {
            Self::CooperativeToken | Self::CooperativeTokenAndRemoteAbort => true,
            Self::None | Self::RemoteAbort => false,
        }
    }

    /// Returns `true` for the two variants that include remote abort.
    #[must_use]
    pub const fn supports_remote_abort(self) -> bool {
        match self {
            Self::RemoteAbort | Self::CooperativeTokenAndRemoteAbort => true,
            Self::None | Self::CooperativeToken => false,
        }
    }
}
77
/// Continuation support a backend declares.
///
/// `None` is the value the capability checks in [`BackendCapabilities`]
/// treat as "continuation unsupported".
// NOTE(review): the exact meaning of `InProcessState` vs `RemoteState` is not
// visible in this file; presumably it is where continuation state lives.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum BackendContinuationCapability {
    None,
    InProcessState,
    RemoteState,
}
85
/// Kinds of waiting states a backend declares it can enter.
// NOTE(review): presumably corresponds to `BackendRunStatus::WaitingInput` /
// `WaitingAuth`; nothing in this file consumes the value, so confirm at call sites.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum BackendWaitCapability {
    None,
    Input,
    Auth,
    InputAndAuth,
}
94
/// How much of the conversation transcript a backend declares it consumes.
// NOTE(review): variant semantics are inferred from their names only; this
// file never branches on the value.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum BackendTranscriptCapability {
    FullTranscript,
    IncrementalUserMessagesWithRemoteState,
    SinglePrompt,
}
102
/// Output shapes a backend declares it can produce.
// NOTE(review): presumably mirrors the `text` / `artifacts` fields of
// `BackendRunOutput`; this file never branches on the value.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum BackendOutputCapability {
    Text,
    TextAndArtifacts,
}
109
/// Feature set declared by an execution backend.
///
/// Requests are checked against it by `unsupported_root_features` and
/// `unsupported_delegate_features`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct BackendCapabilities {
    /// Supported cancellation mechanisms.
    pub cancellation: BackendCancellationCapability,
    /// Whether requests carrying decisions (or a decision receiver) are accepted.
    pub decisions: bool,
    /// Whether requests carrying inference overrides are accepted.
    pub overrides: bool,
    /// Whether requests carrying frontend tool descriptors are accepted.
    pub frontend_tools: bool,
    /// Continuation support; `None` rejects continuation requests.
    pub continuation: BackendContinuationCapability,
    /// Waiting states the backend can enter.
    pub waits: BackendWaitCapability,
    /// Transcript handling declared by the backend.
    pub transcript: BackendTranscriptCapability,
    /// Output shapes declared by the backend.
    pub output: BackendOutputCapability,
}
122
123impl BackendCapabilities {
124 #[must_use]
125 pub const fn full() -> Self {
126 Self {
127 cancellation: BackendCancellationCapability::CooperativeToken,
128 decisions: true,
129 overrides: true,
130 frontend_tools: true,
131 continuation: BackendContinuationCapability::InProcessState,
132 waits: BackendWaitCapability::InputAndAuth,
133 transcript: BackendTranscriptCapability::FullTranscript,
134 output: BackendOutputCapability::TextAndArtifacts,
135 }
136 }
137
138 #[must_use]
139 pub const fn remote_stateless_text() -> Self {
140 Self {
141 cancellation: BackendCancellationCapability::None,
142 decisions: false,
143 overrides: false,
144 frontend_tools: false,
145 continuation: BackendContinuationCapability::None,
146 waits: BackendWaitCapability::None,
147 transcript: BackendTranscriptCapability::SinglePrompt,
148 output: BackendOutputCapability::Text,
149 }
150 }
151
152 #[must_use]
153 pub fn unsupported_root_features(
154 &self,
155 request: &BackendRootRunRequest<'_>,
156 ) -> Vec<&'static str> {
157 let mut unsupported = Vec::new();
158 if (!request.decisions.is_empty() || request.control.decision_rx.is_some())
159 && !self.decisions
160 {
161 unsupported.push("decisions");
162 }
163 if request.overrides.is_some() && !self.overrides {
164 unsupported.push("overrides");
165 }
166 if !request.frontend_tools.is_empty() && !self.frontend_tools {
167 unsupported.push("frontend_tools");
168 }
169 if request.is_continuation && self.continuation == BackendContinuationCapability::None {
170 unsupported.push("continuation");
171 }
172 unsupported
173 }
174
175 #[must_use]
176 pub fn unsupported_delegate_features(
177 &self,
178 request: &BackendDelegateRunRequest<'_>,
179 ) -> Vec<&'static str> {
180 let mut unsupported = Vec::new();
181 if request.policy.persistence != BackendDelegatePersistence::Ephemeral {
182 unsupported.push("delegate_persistence");
183 }
184 if request.policy.continuation != BackendDelegateContinuation::Disabled
185 && self.continuation == BackendContinuationCapability::None
186 {
187 unsupported.push("continuation");
188 }
189 unsupported
190 }
191}
192
193impl Default for BackendCapabilities {
194 fn default() -> Self {
195 Self::full()
196 }
197}
198
/// Everything a backend receives to execute a root (top-level) run.
pub struct BackendRootRunRequest<'a> {
    /// Identifier of the agent being run; used in validation error messages.
    pub agent_id: &'a str,
    /// Messages for the run. `execute_remote_root_lifecycle` clones these as
    /// the base of the persisted message log.
    pub messages: Vec<Message>,
    // NOTE(review): presumably the subset of `messages` that is new for this
    // run; not read anywhere in this file.
    pub new_messages: Vec<Message>,
    /// Sink that receives the run's `AgentEvent`s.
    pub sink: Arc<dyn EventSink>,
    /// Resolver made available to the backend.
    pub resolver: &'a dyn ExecutionResolver,
    /// Identity (thread, run, agent, parent, trace) of this run.
    pub run_identity: RunIdentity,
    /// Optional store used to load and checkpoint run records.
    pub checkpoint_store: Option<&'a dyn ThreadRunStore>,
    /// Cancellation / decision control handles.
    pub control: BackendControl,
    /// Decisions supplied up front; non-empty requires the "decisions" capability.
    pub decisions: Vec<(String, ToolCallResume)>,
    /// Inference overrides; `Some` requires the "overrides" capability.
    pub overrides: Option<awaken_contract::contract::inference::InferenceOverride>,
    /// Frontend tool descriptors; non-empty requires the "frontend_tools" capability.
    pub frontend_tools: Vec<ToolDescriptor>,
    /// Extra context carried for local execution, if any.
    pub local: Option<BackendLocalRootContext<'a>>,
    /// Optional inbox receiver handed to the backend.
    pub inbox: Option<InboxReceiver>,
    /// Whether this request continues an earlier run; requires continuation support.
    pub is_continuation: bool,
}
216
/// Borrowed context attached to a root request via `BackendRootRunRequest::local`.
#[derive(Clone, Copy)]
pub struct BackendLocalRootContext<'a> {
    /// Phase runtime borrowed for the duration of the request.
    pub phase_runtime: &'a PhaseRuntime,
}
222
/// Persistence policy for delegated runs.
///
/// `Ephemeral` is currently the only variant; `unsupported_delegate_features`
/// reports any other value as unsupported.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum BackendDelegatePersistence {
    Ephemeral,
}
228
/// Continuation policy for delegated runs.
///
/// `Disabled` is currently the only variant.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum BackendDelegateContinuation {
    Disabled,
}
234
235#[derive(Debug, Clone, Copy, PartialEq, Eq)]
237pub struct BackendDelegatePolicy {
238 pub persistence: BackendDelegatePersistence,
239 pub continuation: BackendDelegateContinuation,
240}
241
242impl Default for BackendDelegatePolicy {
243 fn default() -> Self {
244 Self {
245 persistence: BackendDelegatePersistence::Ephemeral,
246 continuation: BackendDelegateContinuation::Disabled,
247 }
248 }
249}
250
/// Everything a backend receives to execute a delegated run.
pub struct BackendDelegateRunRequest<'a> {
    /// Identifier of the agent being run; used in validation error messages.
    pub agent_id: &'a str,
    /// Messages for the delegated run.
    pub messages: Vec<Message>,
    // NOTE(review): presumably the subset of `messages` that is new for this
    // run; not read anywhere in this file.
    pub new_messages: Vec<Message>,
    /// Sink that receives the run's `AgentEvent`s.
    pub sink: Arc<dyn EventSink>,
    /// Resolver made available to the backend.
    pub resolver: &'a dyn ExecutionResolver,
    /// Identifiers of the parent this run is attached to.
    pub parent: BackendParentContext,
    /// Cancellation / decision control handles.
    pub control: BackendControl,
    /// Persistence and continuation policy, checked by
    /// `BackendCapabilities::unsupported_delegate_features`.
    pub policy: BackendDelegatePolicy,
}
262
/// Arguments passed to [`ExecutionBackend::abort`].
pub struct BackendAbortRequest<'a> {
    /// Identifier of the agent whose run is being aborted.
    pub agent_id: &'a str,
    /// Identity of the run being aborted.
    pub run_identity: &'a RunIdentity,
    /// Parent context, if any (`execute_remote_root_lifecycle` passes `None`).
    pub parent: Option<&'a BackendParentContext>,
    /// Most recent persisted state known for the run, if any.
    pub persisted_state: Option<&'a PersistedState>,
    /// Whether the run being aborted was a continuation.
    pub is_continuation: bool,
}
271
/// Output produced by a backend run.
///
/// When serialized, `None` fields and an empty artifact list are omitted;
/// when deserialized, missing fields take their defaults.
#[derive(Debug, Clone, Default, Serialize, Deserialize, PartialEq)]
pub struct BackendRunOutput {
    /// Textual output, if any.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub text: Option<String>,
    /// Artifacts attached to the output.
    #[serde(default, skip_serializing_if = "Vec::is_empty")]
    pub artifacts: Vec<BackendOutputArtifact>,
    /// Arbitrary JSON payload.
    // NOTE(review): presumably the backend's unprocessed response; confirm with producers.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub raw: Option<Value>,
}
282
283impl BackendRunOutput {
284 #[must_use]
285 pub fn from_text(text: Option<String>) -> Self {
286 Self {
287 text,
288 artifacts: Vec::new(),
289 raw: None,
290 }
291 }
292
293 #[must_use]
294 pub fn text_or<'a>(&'a self, fallback: &'a Option<String>) -> Option<String> {
295 self.text.clone().or_else(|| fallback.clone())
296 }
297}
298
/// A single artifact attached to a [`BackendRunOutput`].
///
/// Optional fields are omitted from the serialized form when `None`;
/// `content` is always present.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct BackendOutputArtifact {
    /// Optional artifact identifier.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub id: Option<String>,
    /// Optional artifact name.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub name: Option<String>,
    /// Optional media type of `content`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub media_type: Option<String>,
    /// Artifact payload as JSON.
    pub content: Value,
}
310
/// Result returned by a backend after executing a root or delegated run.
#[derive(Debug, Clone)]
pub struct BackendRunResult {
    /// Identifier of the agent that ran.
    pub agent_id: String,
    /// Backend-level status of the run.
    pub status: BackendRunStatus,
    /// Termination reason reported for the run.
    pub termination: TerminationReason,
    /// Optional status reason; when set, `finish_remote_root_run` uses it in
    /// place of the reason derived from `status`.
    pub status_reason: Option<String>,
    /// Response text, used as a fallback when `output.text` is `None`.
    pub response: Option<String>,
    /// Structured output of the run.
    pub output: BackendRunOutput,
    /// Number of steps the run took.
    pub steps: usize,
    /// Run identifier reported by the backend, if any.
    pub run_id: Option<String>,
    /// Inbox sender returned by the backend, if any.
    pub inbox: Option<InboxSender>,
    /// State to persist; `execute_remote_root_lifecycle` falls back to the
    /// previous state when this is `None`.
    pub state: Option<PersistedState>,
}
325
/// Status of a run as reported at the backend boundary.
#[derive(Debug, Clone)]
pub enum BackendRunStatus {
    /// The run finished.
    Completed,
    /// The run is waiting for input; carries an optional message.
    WaitingInput(Option<String>),
    /// The run is waiting for authorization; carries an optional message.
    WaitingAuth(Option<String>),
    /// The run is suspended; carries an optional message.
    Suspended(Option<String>),
    /// The run failed with the given message.
    Failed(String),
    /// The run was cancelled.
    Cancelled,
    /// The run timed out.
    Timeout,
}

/// Renders the status as a snake_case label, followed by `": <message>"`
/// when the variant carries a message.
impl std::fmt::Display for BackendRunStatus {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Split every variant into its fixed label and optional detail text.
        let (label, detail): (&str, Option<&str>) = match self {
            Self::Completed => ("completed", None),
            Self::WaitingInput(message) => ("waiting_input", message.as_deref()),
            Self::WaitingAuth(message) => ("waiting_auth", message.as_deref()),
            Self::Suspended(message) => ("suspended", message.as_deref()),
            Self::Failed(message) => ("failed", Some(message.as_str())),
            Self::Cancelled => ("cancelled", None),
            Self::Timeout => ("timeout", None),
        };
        match detail {
            Some(detail) => write!(f, "{label}: {detail}"),
            None => f.write_str(label),
        }
    }
}
354
355impl BackendRunStatus {
356 #[must_use]
357 pub fn durable_run_status(&self, termination: &TerminationReason) -> RunStatus {
358 match self {
359 Self::WaitingInput(_) | Self::WaitingAuth(_) | Self::Suspended(_) => RunStatus::Waiting,
360 Self::Completed => termination.to_run_status().0,
361 Self::Failed(_) | Self::Cancelled | Self::Timeout => RunStatus::Done,
362 }
363 }
364
365 #[must_use]
366 pub fn durable_status_reason(&self, termination: &TerminationReason) -> Option<String> {
367 match self {
368 Self::WaitingInput(_) => Some("input_required".to_string()),
369 Self::WaitingAuth(_) => Some("auth_required".to_string()),
370 Self::Suspended(_) => Some("suspended".to_string()),
371 Self::Timeout => Some("timeout".to_string()),
372 Self::Failed(_) => Some("error".to_string()),
373 Self::Cancelled => Some("cancelled".to_string()),
374 Self::Completed => termination.to_run_status().1,
375 }
376 }
377
378 #[must_use]
379 pub fn result_status_label(&self, termination: &TerminationReason) -> &'static str {
380 match self {
381 Self::Completed => run_status_label(termination.to_run_status().0),
382 Self::WaitingInput(_) => "waiting_input",
383 Self::WaitingAuth(_) => "waiting_auth",
384 Self::Suspended(_) => "suspended",
385 Self::Failed(_) => "failed",
386 Self::Cancelled => "cancelled",
387 Self::Timeout => "timeout",
388 }
389 }
390}
391
392#[async_trait]
394pub trait ExecutionBackend: Send + Sync {
395 fn capabilities(&self) -> BackendCapabilities {
396 BackendCapabilities::remote_stateless_text()
397 }
398
399 async fn abort(&self, _request: BackendAbortRequest<'_>) -> Result<(), ExecutionBackendError> {
400 Ok(())
401 }
402
403 async fn execute_root(
404 &self,
405 _request: BackendRootRunRequest<'_>,
406 ) -> Result<BackendRunResult, ExecutionBackendError> {
407 Err(ExecutionBackendError::ExecutionFailed(
408 "backend does not support root execution".into(),
409 ))
410 }
411
412 async fn execute_delegate(
413 &self,
414 _request: BackendDelegateRunRequest<'_>,
415 ) -> Result<BackendRunResult, ExecutionBackendError> {
416 Err(ExecutionBackendError::ExecutionFailed(
417 "backend does not support delegated execution".into(),
418 ))
419 }
420}
421
422pub trait ExecutionBackendFactory: Send + Sync {
424 fn backend(&self) -> &str;
425
426 fn validate(&self, endpoint: &RemoteEndpoint) -> Result<(), ExecutionBackendFactoryError> {
427 self.build(endpoint).map(|_| ())
428 }
429
430 fn build(
431 &self,
432 endpoint: &RemoteEndpoint,
433 ) -> Result<Arc<dyn ExecutionBackend>, ExecutionBackendFactoryError>;
434}
435
/// Errors reported by an [`ExecutionBackendFactory`].
#[derive(Debug, thiserror::Error)]
pub enum ExecutionBackendFactoryError {
    /// Invalid configuration; the message is displayed verbatim.
    #[error("{0}")]
    InvalidConfig(String),
}
441
/// Errors reported by execution backends and the helpers in this module.
#[derive(Debug, thiserror::Error)]
pub enum ExecutionBackendError {
    /// The requested agent could not be found.
    #[error("agent not found: {0}")]
    AgentNotFound(String),
    /// Execution failed; also used for rejected (unsupported) requests.
    #[error("execution failed: {0}")]
    ExecutionFailed(String),
    /// An error reported as coming from a remote.
    #[error("remote error: {0}")]
    RemoteError(String),
    /// An agent-loop error, displayed transparently.
    #[error(transparent)]
    Loop(#[from] AgentLoopError),
}
453
454pub fn execution_capabilities(
455 execution: &ResolvedExecution,
456) -> Result<BackendCapabilities, ExecutionBackendError> {
457 match execution {
458 ResolvedExecution::Local(_) => Ok(LocalBackend::new().capabilities()),
459 ResolvedExecution::NonLocal(agent) => Ok(agent.backend()?.capabilities()),
460 }
461}
462
463pub fn validate_root_execution_request(
464 execution: &ResolvedExecution,
465 request: &BackendRootRunRequest<'_>,
466) -> Result<(), ExecutionBackendError> {
467 let unsupported = execution_capabilities(execution)?.unsupported_root_features(request);
468 if !unsupported.is_empty() {
469 return Err(ExecutionBackendError::ExecutionFailed(format!(
470 "agent '{}' backend does not support: {}",
471 request.agent_id,
472 unsupported.join(", ")
473 )));
474 }
475 Ok(())
476}
477
478pub fn validate_delegate_execution_request(
479 execution: &ResolvedExecution,
480 request: &BackendDelegateRunRequest<'_>,
481) -> Result<(), ExecutionBackendError> {
482 let unsupported = execution_capabilities(execution)?.unsupported_delegate_features(request);
483 if !unsupported.is_empty() {
484 return Err(ExecutionBackendError::ExecutionFailed(format!(
485 "agent '{}' backend does not support: {}",
486 request.agent_id,
487 unsupported.join(", ")
488 )));
489 }
490 Ok(())
491}
492
493pub async fn execute_resolved_delegate_execution(
494 execution: &ResolvedExecution,
495 request: BackendDelegateRunRequest<'_>,
496) -> Result<BackendRunResult, ExecutionBackendError> {
497 validate_delegate_execution_request(execution, &request)?;
498 match execution {
499 ResolvedExecution::Local(_) => LocalBackend::new().execute_delegate(request).await,
500 ResolvedExecution::NonLocal(agent) => agent.backend()?.execute_delegate(request).await,
501 }
502}
503
/// Drives a complete root run on a non-local backend.
///
/// Emits `RunStart` and `StepStart`, then races the backend's
/// `execute_root` against `runtime_cancellation_token`. Whatever the
/// outcome (success, backend error, or cancellation), the run is finalized
/// through `finish_remote_root_run`, which persists a checkpoint and emits
/// the closing events.
///
/// # Errors
///
/// Returns `AgentLoopError::RuntimeError` when the agent's backend cannot be
/// resolved, and whatever error `finish_remote_root_run` reports. A backend
/// execution error is not returned as `Err`; it is recorded as a failed run.
pub async fn execute_remote_root_lifecycle(
    agent: &ResolvedBackendAgent,
    request: BackendRootRunRequest<'_>,
    run_created_at: u64,
    runtime_cancellation_token: CancellationToken,
    previous_state: Option<PersistedState>,
) -> Result<AgentRunResult, AgentLoopError> {
    let backend = agent.backend().map_err(|error| {
        AgentLoopError::RuntimeError(crate::RuntimeError::ResolveFailed {
            message: error.to_string(),
        })
    })?;
    // Copy out everything needed later: `request` is moved into the backend call.
    let run_identity = request.run_identity.clone();
    let sink = request.sink.clone();
    let checkpoint_store = request.checkpoint_store;
    let mut messages = request.messages.clone();
    let input_message_count = messages.len();
    let request_is_continuation = request.is_continuation;

    sink.emit(AgentEvent::RunStart {
        thread_id: run_identity.thread_id.clone(),
        run_id: run_identity.run_id.clone(),
        parent_run_id: run_identity.parent_run_id.clone(),
        identity: Some(run_identity.clone()),
    })
    .await;
    sink.emit(AgentEvent::StepStart {
        message_id: gen_message_id(),
    })
    .await;

    let execution_started_at = now_ms();
    let backend_execution = backend.execute_root(request);
    // Pinned so the future can be polled by mutable reference inside `select!`.
    tokio::pin!(backend_execution);
    let delegate_result = tokio::select! {
        result = &mut backend_execution => {
            match result {
                Ok(result) => result,
                Err(error) => {
                    // Backend failure: record the run as failed, with no response and zero steps.
                    let error_message = remote_backend_error_message(error);
                    let termination = TerminationReason::Error(error_message.clone());
                    let latest_state = load_checkpoint_state(
                        checkpoint_store,
                        &run_identity.run_id,
                        previous_state.clone(),
                    )
                    .await;
                    return finish_remote_root_run(
                        checkpoint_store,
                        &run_identity.thread_id,
                        &run_identity.run_id,
                        &run_identity.agent_id,
                        run_identity.parent_run_id.clone(),
                        &run_identity,
                        run_created_at,
                        messages,
                        input_message_count,
                        BackendRunStatus::Failed(error_message),
                        termination,
                        None,
                        0,
                        String::new(),
                        BackendRunOutput::default(),
                        latest_state,
                        &sink,
                    )
                    .await;
                }
            }
        }
        _ = runtime_cancellation_token.cancelled() => {
            // Cancellation won the race: load the latest checkpointed state first
            // so it can be handed to the abort hook and to the final record.
            let latest_state = load_checkpoint_state(
                checkpoint_store,
                &run_identity.run_id,
                previous_state.clone(),
            )
            .await;
            // The abort hook is only invoked for backends declaring remote abort;
            // a failing hook is logged and does not change the outcome.
            if backend.capabilities().cancellation.supports_remote_abort()
                && let Err(error) = backend
                    .abort(BackendAbortRequest {
                        agent_id: &run_identity.agent_id,
                        run_identity: &run_identity,
                        parent: None,
                        persisted_state: latest_state.as_ref(),
                        is_continuation: request_is_continuation,
                    })
                    .await
            {
                tracing::warn!(
                    agent_id = %run_identity.agent_id,
                    run_id = %run_identity.run_id,
                    error = %error,
                    "non-local backend abort hook failed after cancellation"
                );
            }
            return finish_remote_root_run(
                checkpoint_store,
                &run_identity.thread_id,
                &run_identity.run_id,
                &run_identity.agent_id,
                run_identity.parent_run_id.clone(),
                &run_identity,
                run_created_at,
                messages,
                input_message_count,
                BackendRunStatus::Cancelled,
                TerminationReason::Cancelled,
                None,
                0,
                String::new(),
                BackendRunOutput::default(),
                latest_state,
                &sink,
            )
            .await;
        }
    };

    let termination = delegate_result.termination.clone();
    let status_reason = delegate_result.status_reason.clone();
    let mut output = delegate_result.output.clone();
    // Response text: the output's text, else the result's `response`, else empty.
    let response = output
        .text_or(&delegate_result.response)
        .unwrap_or_default();
    // Backfill `output.text` when the text came from the fallback.
    if output.text.is_none() && !response.is_empty() {
        output.text = Some(response.clone());
    }
    let status = delegate_result.status;
    let steps = delegate_result.steps;
    // NOTE(review): unlike the error and cancellation paths, this path does not
    // consult the checkpoint store for a newer state — confirm that is intended.
    let state = delegate_result.state.or(previous_state);
    if !response.is_empty() {
        // The whole response is emitted as a single delta and appended to the log.
        sink.emit(AgentEvent::TextDelta {
            delta: response.clone(),
        })
        .await;
        messages.push(Message::assistant(response.clone()));
    }

    // `InferenceComplete` is only reported for these two termination reasons.
    if matches!(
        termination,
        TerminationReason::NaturalEnd | TerminationReason::BehaviorRequested
    ) {
        sink.emit(AgentEvent::InferenceComplete {
            model: agent.spec.model_id.clone(),
            usage: None,
            duration_ms: now_ms().saturating_sub(execution_started_at),
        })
        .await;
    }

    finish_remote_root_run(
        checkpoint_store,
        &run_identity.thread_id,
        &run_identity.run_id,
        &run_identity.agent_id,
        run_identity.parent_run_id.clone(),
        &run_identity,
        run_created_at,
        messages,
        input_message_count,
        status,
        termination,
        status_reason,
        steps,
        response,
        output,
        state,
        &sink,
    )
    .await
}
676
677async fn load_checkpoint_state(
678 storage: Option<&dyn ThreadRunStore>,
679 run_id: &str,
680 fallback: Option<PersistedState>,
681) -> Option<PersistedState> {
682 let Some(storage) = storage else {
683 return fallback;
684 };
685 match storage.load_run(run_id).await {
686 Ok(Some(run)) => run.state.or(fallback),
687 Ok(None) => fallback,
688 Err(error) => {
689 tracing::warn!(run_id, error = %error, "failed to load latest checkpoint state");
690 fallback
691 }
692 }
693}
694
695#[allow(clippy::too_many_arguments)]
696async fn finish_remote_root_run(
697 storage: Option<&dyn ThreadRunStore>,
698 thread_id: &str,
699 run_id: &str,
700 agent_id: &str,
701 parent_run_id: Option<String>,
702 run_identity: &RunIdentity,
703 run_created_at: u64,
704 messages: Vec<Message>,
705 input_message_count: usize,
706 backend_status: BackendRunStatus,
707 termination: TerminationReason,
708 status_reason_override: Option<String>,
709 steps: usize,
710 response: String,
711 output: BackendRunOutput,
712 state: Option<PersistedState>,
713 sink: &Arc<dyn EventSink>,
714) -> Result<AgentRunResult, AgentLoopError> {
715 let status = backend_status.durable_run_status(&termination);
716 let status_reason =
717 status_reason_override.or_else(|| backend_status.durable_status_reason(&termination));
718 let state = state_with_backend_output(state, &output);
719 let mut result_json = json!({
720 "response": response,
721 "status": backend_status.result_status_label(&termination),
722 });
723 if output != BackendRunOutput::default() {
724 result_json["output"] = serde_json::to_value(&output).unwrap_or(Value::Null);
725 }
726 if let Some(reason) = &status_reason {
727 result_json["status_reason"] = Value::String(reason.clone());
728 }
729
730 persist_remote_root_checkpoint(
731 storage,
732 thread_id,
733 run_id,
734 agent_id,
735 parent_run_id,
736 run_created_at,
737 &messages,
738 input_message_count,
739 status,
740 Some(termination.clone()),
741 status_reason.clone(),
742 (!response.is_empty()).then(|| response.clone()),
743 match &termination {
744 TerminationReason::Error(message) => Some(json!({ "message": message })),
745 _ => None,
746 },
747 run_identity,
748 steps,
749 state,
750 )
751 .await?;
752
753 sink.emit(AgentEvent::StepEnd).await;
754 sink.emit(AgentEvent::RunFinish {
755 thread_id: thread_id.to_string(),
756 run_id: run_id.to_string(),
757 identity: Some(run_identity.clone()),
758 result: Some(result_json),
759 termination: termination.clone(),
760 })
761 .await;
762
763 Ok(AgentRunResult {
764 run_id: run_id.to_string(),
765 response,
766 termination,
767 steps,
768 })
769}
770
771fn state_with_backend_output(
772 state: Option<PersistedState>,
773 output: &BackendRunOutput,
774) -> Option<PersistedState> {
775 if output == &BackendRunOutput::default() {
776 return state;
777 }
778
779 let mut state = state.unwrap_or(PersistedState {
780 revision: 0,
781 extensions: std::collections::HashMap::new(),
782 });
783 if let Ok(value) = serde_json::to_value(output) {
784 state
785 .extensions
786 .insert(BACKEND_OUTPUT_STATE_KEY.to_string(), value);
787 }
788 Some(state)
789}
790
791fn waiting_reason_from_backend_status(status_reason: Option<&str>) -> WaitingReason {
792 match status_reason {
793 Some("input_required" | "user_input_required") => WaitingReason::UserInput,
794 Some("auth_required" | "suspended") => WaitingReason::ToolPermission,
795 Some("awaiting_tasks") => WaitingReason::BackgroundTasks,
796 Some("rate_limit") => WaitingReason::RateLimit,
797 Some("manual_pause") => WaitingReason::ManualPause,
798 _ => WaitingReason::ExternalEvent,
799 }
800}
801
802#[allow(clippy::too_many_arguments)]
803async fn persist_remote_root_checkpoint(
804 storage: Option<&dyn ThreadRunStore>,
805 thread_id: &str,
806 run_id: &str,
807 agent_id: &str,
808 parent_run_id: Option<String>,
809 run_created_at: u64,
810 messages: &[Message],
811 input_message_count: usize,
812 status: RunStatus,
813 termination_reason: Option<TerminationReason>,
814 status_reason: Option<String>,
815 final_output: Option<String>,
816 error_payload: Option<Value>,
817 run_identity: &RunIdentity,
818 steps: usize,
819 state: Option<PersistedState>,
820) -> Result<(), AgentLoopError> {
821 let Some(storage) = storage else {
822 return Ok(());
823 };
824 let previous = storage
825 .load_run(run_id)
826 .await
827 .map_err(|error| AgentLoopError::StorageError(error.to_string()))?;
828 let created_at = previous
829 .as_ref()
830 .map(|record| record.created_at)
831 .unwrap_or(run_created_at / 1000);
832 let finished_at = status.is_terminal().then_some(now_ms() / 1000);
833 let outcome = termination_reason
834 .clone()
835 .map(|termination_reason| RunOutcome {
836 termination_reason,
837 final_output: final_output.clone(),
838 error_payload: error_payload.clone(),
839 });
840 let waiting = (status == RunStatus::Waiting).then(|| RunWaitingState {
841 reason: waiting_reason_from_backend_status(status_reason.as_deref()),
842 ticket_ids: Vec::new(),
843 tickets: Vec::new(),
844 since_dispatch_id: run_identity.trace.dispatch_id.clone(),
845 message: status_reason.clone(),
846 });
847 let (messages, input, output) = materialize_remote_message_log(
848 messages.to_vec(),
849 previous.as_ref(),
850 run_identity,
851 steps,
852 input_message_count,
853 );
854
855 let record = RunRecord {
856 run_id: run_id.to_string(),
857 thread_id: thread_id.to_string(),
858 agent_id: agent_id.to_string(),
859 parent_run_id,
860 request: previous.as_ref().and_then(|record| record.request.clone()),
861 input,
862 output,
863 status,
864 termination_reason,
865 final_output,
866 error_payload,
867 dispatch_id: run_identity.trace.dispatch_id.clone(),
868 session_id: run_identity.trace.session_id.clone(),
869 transport_request_id: run_identity.trace.transport_request_id.clone(),
870 waiting,
871 outcome,
872 created_at,
873 started_at: previous
874 .as_ref()
875 .and_then(|record| record.started_at)
876 .or(Some(run_created_at / 1000)),
877 finished_at,
878 updated_at: now_ms() / 1000,
879 steps,
880 input_tokens: 0,
881 output_tokens: 0,
882 state,
883 };
884 storage
885 .checkpoint(thread_id, &messages, &record)
886 .await
887 .map_err(|error| AgentLoopError::StorageError(error.to_string()))
888}
889
890fn materialize_remote_message_log(
891 mut messages: Vec<Message>,
892 previous: Option<&RunRecord>,
893 run_identity: &RunIdentity,
894 steps: usize,
895 input_message_count: usize,
896) -> (
897 Vec<Message>,
898 Option<RunMessageInput>,
899 Option<RunMessageOutput>,
900) {
901 let input = previous
902 .and_then(|record| record.input.clone())
903 .or_else(|| {
904 infer_remote_input_from_initial_messages(&run_identity.thread_id, input_message_count)
905 });
906 let output_start_seq = input
907 .as_ref()
908 .and_then(|input| input.range)
909 .map(|range| range.to_seq.saturating_add(1))
910 .unwrap_or(input_message_count as u64 + 1);
911 let step_index = (steps > 0).then_some(steps.saturating_sub(1) as u32);
912 let mut output_message_ids = Vec::new();
913 let mut output_from_seq = None;
914 let mut output_to_seq = None;
915 for (index, message) in messages.iter_mut().enumerate() {
916 let seq = index as u64 + 1;
917 if seq < output_start_seq || !is_remote_run_output_message(message) {
918 continue;
919 }
920 message.mark_produced_by(&run_identity.run_id, step_index);
921 output_from_seq.get_or_insert(seq);
922 output_to_seq = Some(seq);
923 if let Some(id) = message.id.clone() {
924 output_message_ids.push(id);
925 }
926 }
927 let output = if output_from_seq.is_none() {
928 previous.and_then(|record| record.output.clone())
929 } else {
930 Some(RunMessageOutput {
931 thread_id: run_identity.thread_id.clone(),
932 range: output_from_seq
933 .zip(output_to_seq)
934 .and_then(|(from, to)| MessageSeqRange::new(from, to)),
935 message_ids: output_message_ids,
936 })
937 };
938 (messages, input, output)
939}
940
941fn infer_remote_input_from_initial_messages(
942 thread_id: &str,
943 input_message_count: usize,
944) -> Option<RunMessageInput> {
945 if input_message_count == 0 {
946 return None;
947 }
948 let to_seq = input_message_count as u64;
949 Some(RunMessageInput {
950 thread_id: thread_id.to_string(),
951 range: MessageSeqRange::new(1, to_seq),
952 trigger_message_ids: Vec::new(),
953 selected_message_ids: Vec::new(),
954 context_policy: None,
955 compacted_snapshot_id: None,
956 })
957}
958
959fn is_remote_run_output_message(message: &Message) -> bool {
960 matches!(message.role, Role::Assistant | Role::Tool)
961}
962
963fn remote_backend_error_message(error: ExecutionBackendError) -> String {
964 match error {
965 ExecutionBackendError::AgentNotFound(message)
966 | ExecutionBackendError::ExecutionFailed(message)
967 | ExecutionBackendError::RemoteError(message) => message,
968 ExecutionBackendError::Loop(error) => error.to_string(),
969 }
970}
971
972fn run_status_label(status: RunStatus) -> &'static str {
973 match status {
974 RunStatus::Created => "created",
975 RunStatus::Running => "running",
976 RunStatus::Waiting => "waiting",
977 RunStatus::Done => "done",
978 }
979}
980
#[cfg(test)]
mod tests {
    use super::*;

    /// `Timeout` maps to its own durable status, reason and label regardless
    /// of the termination reason passed alongside it.
    #[test]
    fn backend_status_timeout_is_first_class_at_runtime_boundary() {
        let status = BackendRunStatus::Timeout;
        let termination = TerminationReason::Error("polling timeout exceeded".into());

        let durable = status.durable_run_status(&termination);
        assert_eq!(durable, RunStatus::Done);

        let reason = status.durable_status_reason(&termination);
        assert_eq!(reason.as_deref(), Some("timeout"));

        let label = status.result_status_label(&termination);
        assert_eq!(label, "timeout");
    }

    /// A waiting status takes precedence over an error termination reason.
    #[test]
    fn backend_status_waiting_is_first_class_at_runtime_boundary() {
        let status = BackendRunStatus::WaitingInput(Some("need details".into()));
        let termination = TerminationReason::Error("should not win".into());

        let durable = status.durable_run_status(&termination);
        assert_eq!(durable, RunStatus::Waiting);

        let reason = status.durable_status_reason(&termination);
        assert_eq!(reason.as_deref(), Some("input_required"));

        let label = status.result_status_label(&termination);
        assert_eq!(label, "waiting_input");
    }
}