Skip to main content

everruns_runtime/
host.rs

1// Shared host orchestration for embedded and durable execution hosts.
2// Decision: everruns-runtime owns worker-facing turn phase execution so
3// durable/server-backed hosts reuse the same input/reason/act wiring.
4
5use async_trait::async_trait;
6use everruns_core::atoms::{
7    ActAtom, ActInput, ActResult, Atom, InputAtom, InputAtomInput, InputAtomResult, ReasonAtom,
8    ReasonInput, ReasonResult,
9};
10use everruns_core::capabilities::{SystemPromptContext, collect_capabilities_with_configs};
11use everruns_core::events::{
12    EventContext, EventRequest, OutputMessageCompletedData, SessionActivatedData, SessionIdledData,
13    TurnCompletedData, TurnFailedData, TurnStartedData,
14};
15use everruns_core::message::Message;
16use everruns_core::message_retriever::MessageRetriever;
17use everruns_core::platform_store::PlatformStore;
18use everruns_core::session::SessionStatus;
19use everruns_core::traits::{
20    AgentStore, BudgetChecker, EventEmitter, HarnessStore, ImageArtifactStore, ImageResolver,
21    LeasedResourceStore, LlmProviderStore, ModelWithProvider, PaymentAuthority,
22    ProviderCredentialStore, SessionFileSystem, SessionMutator, SessionResourceRegistry,
23    SessionScheduleStore, SessionSqlDbStoreRef, SessionStorageStore, SessionStore,
24    UserConnectionResolver,
25};
26use everruns_core::typed_id::{AgentId, HarnessId, MessageId, SessionId, TurnId};
27use everruns_core::{
28    Agent, CapabilityRegistry, DependencyBlocker, DriverRegistry, Harness, Session, TokenUsage,
29    ToolDefinition, ToolRegistry, UserFacingError, org_public_id_from_internal,
30    resolve_runtime_capabilities,
31};
32use std::sync::Arc;
33use tracing::warn;
34
35/// Turn context loaded in one batched call for runtime host execution.
36#[derive(Debug, Clone)]
37pub struct RuntimeHostTurnContext {
38    pub agent: Option<Agent>,
39    pub session: Session,
40    pub messages: Vec<Message>,
41    pub model: Option<ModelWithProvider>,
42    pub mcp_tool_definitions: Vec<ToolDefinition>,
43}
44
45/// Public adapter contract for server-backed or durable runtime hosts.
46///
47/// `everruns-runtime` owns shared host orchestration for both embedded and
48/// durable execution. That includes phase execution (`input -> reason -> act`),
49/// lifecycle emission, and the generic turn-strategy decisions used by durable
50/// or custom hosts.
51///
52/// Host crates implement this trait to provide persistence, session-lifecycle
53/// plumbing, event delivery, and their own orchestration backend. The durable
54/// engine itself remains outside this crate.
55#[async_trait]
56pub trait RuntimeHostAdapter: Send + Sync + Clone + 'static {
57    async fn get_agent(
58        &self,
59        org_id: i64,
60        agent_id: AgentId,
61    ) -> everruns_core::error::Result<Option<Agent>>;
62
63    async fn get_harness(
64        &self,
65        org_id: i64,
66        harness_id: HarnessId,
67    ) -> everruns_core::error::Result<Option<Harness>>;
68
69    async fn set_session_status(
70        &self,
71        org_id: i64,
72        session_id: SessionId,
73        status: SessionStatus,
74    ) -> everruns_core::error::Result<Session>;
75
76    async fn load_turn_context(
77        &self,
78        org_id: i64,
79        session_id: SessionId,
80    ) -> everruns_core::error::Result<RuntimeHostTurnContext>;
81
82    fn capability_registry(&self) -> CapabilityRegistry;
83
84    fn driver_registry(&self) -> DriverRegistry;
85
86    fn harness_store(&self, org_id: i64) -> Arc<dyn HarnessStore>;
87
88    fn agent_store(&self, org_id: i64) -> Arc<dyn AgentStore>;
89
90    fn session_store(&self, org_id: i64) -> Arc<dyn SessionStore>;
91
92    fn session_mutator(&self, org_id: i64) -> Arc<dyn SessionMutator>;
93
94    fn provider_store(&self, org_id: i64) -> Arc<dyn LlmProviderStore>;
95
96    fn message_store(&self) -> Arc<dyn MessageRetriever>;
97
98    fn event_emitter(&self) -> Arc<dyn EventEmitter>;
99
100    fn file_store(&self) -> Arc<dyn SessionFileSystem>;
101
102    fn image_resolver(&self, _org_id: i64) -> Option<Arc<dyn ImageResolver>> {
103        None
104    }
105
106    fn image_artifact_store(&self, _org_id: i64) -> Option<Arc<dyn ImageArtifactStore>> {
107        None
108    }
109
110    fn provider_credential_store(&self, _org_id: i64) -> Option<Arc<dyn ProviderCredentialStore>> {
111        None
112    }
113
114    fn storage_store(&self) -> Option<Arc<dyn SessionStorageStore>> {
115        None
116    }
117
118    fn memory_store(&self, _org_id: i64) -> Option<Arc<dyn everruns_core::MemoryStoreBackend>> {
119        None
120    }
121
122    fn connection_resolver(&self) -> Option<Arc<dyn UserConnectionResolver>> {
123        None
124    }
125
126    fn sqldb_store(&self) -> Option<SessionSqlDbStoreRef> {
127        None
128    }
129
130    fn leased_resource_store(&self) -> Option<Arc<dyn LeasedResourceStore>> {
131        None
132    }
133
134    fn session_resource_registry(&self) -> Option<Arc<dyn SessionResourceRegistry>> {
135        None
136    }
137
138    fn schedule_store(&self, _org_id: i64) -> Option<Arc<dyn SessionScheduleStore>> {
139        None
140    }
141
142    fn platform_store(
143        &self,
144        _org_id: i64,
145        _session_id: SessionId,
146    ) -> Option<Arc<dyn PlatformStore>> {
147        None
148    }
149
150    fn budget_checker(
151        &self,
152        _org_id: i64,
153        _agent_id: Option<AgentId>,
154    ) -> Option<Arc<dyn BudgetChecker>> {
155        None
156    }
157
158    fn payment_authority(
159        &self,
160        _org_id: i64,
161        _agent_id: Option<AgentId>,
162    ) -> Option<Arc<dyn PaymentAuthority>> {
163        None
164    }
165}
166
167struct RuntimeExecutionCapabilities {
168    tool_registry: ToolRegistry,
169    post_tool_hooks: Vec<Arc<dyn everruns_core::PostToolExecHook>>,
170    tool_call_hooks: Vec<Arc<dyn everruns_core::ToolCallHook>>,
171}
172
173async fn load_execution_capabilities<A: RuntimeHostAdapter>(
174    adapter: &A,
175    org_id: i64,
176    session_id: SessionId,
177    harness_id: HarnessId,
178    agent_id: Option<AgentId>,
179    locale: Option<String>,
180    blueprint_id: Option<&str>,
181) -> everruns_core::error::Result<RuntimeExecutionCapabilities> {
182    let capability_registry = adapter.capability_registry();
183    if let Some(blueprint_id) = blueprint_id {
184        let mut registry = ToolRegistry::with_defaults();
185        let blueprint = capability_registry.blueprint(blueprint_id).ok_or_else(|| {
186            everruns_core::error::AgentLoopError::config(format!(
187                "Blueprint \"{blueprint_id}\" not found in registry"
188            ))
189        })?;
190        for tool in blueprint.tools {
191            registry.register_boxed(tool);
192        }
193        return Ok(RuntimeExecutionCapabilities {
194            tool_registry: registry,
195            post_tool_hooks: Vec::new(),
196            tool_call_hooks: Vec::new(),
197        });
198    }
199
200    let harness_chain = adapter
201        .harness_store(org_id)
202        .get_harness_chain(harness_id)
203        .await?;
204    if harness_chain.is_empty() {
205        return Err(everruns_core::error::AgentLoopError::harness_not_found(
206            harness_id,
207        ));
208    }
209
210    let session = adapter
211        .session_store(org_id)
212        .get_session(session_id)
213        .await?
214        .ok_or_else(|| everruns_core::error::AgentLoopError::session_not_found(session_id))?;
215
216    let agent_store = adapter.agent_store(org_id);
217    let agent = match agent_id {
218        Some(agent_id) => Some(
219            agent_store
220                .get_agent(agent_id)
221                .await?
222                .ok_or_else(|| everruns_core::error::AgentLoopError::agent_not_found(agent_id))?,
223        ),
224        None => None,
225    };
226
227    let resolved = resolve_runtime_capabilities(
228        &harness_chain,
229        agent.as_ref(),
230        &session,
231        &capability_registry,
232    );
233    let prompt_ctx = SystemPromptContext {
234        session_id,
235        locale: locale.or(session.locale.clone()),
236        file_store: Some(adapter.file_store()),
237    };
238    let collected = collect_capabilities_with_configs(
239        &resolved.resolved_capability_configs,
240        &capability_registry,
241        &prompt_ctx,
242    )
243    .await;
244
245    let mut registry = ToolRegistry::with_defaults();
246    for tool in collected.tools {
247        registry.register_boxed(tool);
248    }
249
250    let post_tool_hooks = resolved
251        .resolved_capability_configs
252        .iter()
253        .flat_map(|config| {
254            capability_registry
255                .get(config.capability_id())
256                .map(|capability| capability.post_tool_exec_hooks())
257                .unwrap_or_default()
258        })
259        .collect();
260    let tool_call_hooks = resolved
261        .resolved_capability_configs
262        .iter()
263        .flat_map(|config| {
264            capability_registry
265                .get(config.capability_id())
266                .map(|capability| capability.tool_call_hooks())
267                .unwrap_or_default()
268        })
269        .collect();
270
271    Ok(RuntimeExecutionCapabilities {
272        tool_registry: registry,
273        post_tool_hooks,
274        tool_call_hooks,
275    })
276}
277
278/// Shared lifecycle helper for runtime-backed hosts.
279pub struct RuntimeSessionLifecycle<A: RuntimeHostAdapter> {
280    adapter: A,
281    org_id: i64,
282    session_id: SessionId,
283}
284
285impl<A: RuntimeHostAdapter> RuntimeSessionLifecycle<A> {
286    pub fn new(adapter: A, org_id: i64, session_id: SessionId) -> Self {
287        Self {
288            adapter,
289            org_id,
290            session_id,
291        }
292    }
293
294    async fn set_session_status(&self, status: SessionStatus, action: &'static str) {
295        if let Err(error) = self
296            .adapter
297            .set_session_status(self.org_id, self.session_id, status)
298            .await
299        {
300            warn!(
301                session_id = %self.session_id,
302                org_id = self.org_id,
303                action,
304                %error,
305                "runtime host lifecycle status update failed"
306            );
307        }
308    }
309
310    async fn emit_event(&self, request: EventRequest) {
311        let event_type = request.event_type.clone();
312        if let Err(error) = self.adapter.event_emitter().emit(request).await {
313            warn!(
314                session_id = %self.session_id,
315                org_id = self.org_id,
316                event_type,
317                %error,
318                "runtime host lifecycle event emission failed"
319            );
320        }
321    }
322
323    pub async fn turn_started(&self, turn_id: TurnId, input_message_id: MessageId) {
324        let input_content = self
325            .adapter
326            .message_store()
327            .get(self.session_id, input_message_id)
328            .await
329            .ok()
330            .flatten()
331            .map(|message| message.content_to_llm_string());
332
333        self.set_session_status(SessionStatus::Active, "turn_started")
334            .await;
335
336        self.emit_event(EventRequest::new(
337            self.session_id,
338            EventContext::turn(turn_id, input_message_id),
339            SessionActivatedData {
340                turn_id,
341                input_message_id,
342            },
343        ))
344        .await;
345
346        self.emit_event(EventRequest::new(
347            self.session_id,
348            EventContext::turn(turn_id, input_message_id),
349            TurnStartedData {
350                turn_id,
351                input_message_id,
352                input_content,
353            },
354        ))
355        .await;
356    }
357
358    pub async fn emit_turn_completed(
359        &self,
360        turn_id: TurnId,
361        input_message_id: MessageId,
362        iterations: u32,
363        usage: Option<TokenUsage>,
364        input_content: Option<String>,
365    ) {
366        self.emit_event(EventRequest::new(
367            self.session_id,
368            EventContext::turn(turn_id, input_message_id),
369            TurnCompletedData {
370                turn_id,
371                iterations,
372                duration_ms: None,
373                usage,
374                input_content,
375            },
376        ))
377        .await;
378    }
379
380    pub async fn emit_session_idled(
381        &self,
382        turn_id: TurnId,
383        input_message_id: MessageId,
384        iterations: Option<u32>,
385        usage: Option<TokenUsage>,
386    ) {
387        self.set_session_status(SessionStatus::Idle, "emit_session_idled")
388            .await;
389
390        self.emit_event(EventRequest::new(
391            self.session_id,
392            EventContext::turn(turn_id, input_message_id),
393            SessionIdledData {
394                turn_id,
395                iterations,
396                usage,
397            },
398        ))
399        .await;
400    }
401
402    pub async fn turn_completed(
403        &self,
404        turn_id: TurnId,
405        input_message_id: MessageId,
406        iterations: u32,
407        usage: Option<TokenUsage>,
408        input_content: Option<String>,
409    ) {
410        self.emit_turn_completed(
411            turn_id,
412            input_message_id,
413            iterations,
414            usage.clone(),
415            input_content,
416        )
417        .await;
418        self.emit_session_idled(turn_id, input_message_id, Some(iterations), usage)
419            .await;
420    }
421
422    pub async fn turn_failed(
423        &self,
424        turn_id: TurnId,
425        input_message_id: MessageId,
426        error: &str,
427        user_error: Option<&UserFacingError>,
428    ) {
429        self.set_session_status(SessionStatus::Idle, "turn_failed")
430            .await;
431
432        self.emit_event(EventRequest::new(
433            self.session_id,
434            EventContext::turn(turn_id, input_message_id),
435            {
436                let mut data = TurnFailedData {
437                    turn_id,
438                    error: error.to_string(),
439                    error_code: None,
440                    error_fields: None,
441                };
442                if let Some(user_error) = user_error {
443                    user_error.apply_to_event_fields(&mut data.error_code, &mut data.error_fields);
444                }
445                data
446            },
447        ))
448        .await;
449
450        self.emit_event(EventRequest::new(
451            self.session_id,
452            EventContext::turn(turn_id, input_message_id),
453            SessionIdledData {
454                turn_id,
455                iterations: None,
456                usage: None,
457            },
458        ))
459        .await;
460    }
461
462    pub async fn waiting_for_tool_results(&self) {
463        self.set_session_status(
464            SessionStatus::WaitingForToolResults,
465            "waiting_for_tool_results",
466        )
467        .await;
468    }
469
470    pub async fn dependency_blocked(
471        &self,
472        turn_id: TurnId,
473        input_message_id: MessageId,
474        blocker: DependencyBlocker,
475    ) {
476        let user_error = UserFacingError::new(blocker.error_code())
477            .with_field(
478                "dependency",
479                match blocker {
480                    DependencyBlocker::HarnessArchived | DependencyBlocker::HarnessDeleted => {
481                        "harness"
482                    }
483                    DependencyBlocker::AgentArchived | DependencyBlocker::AgentDeleted => "agent",
484                },
485            )
486            .with_field(
487                "state",
488                match blocker {
489                    DependencyBlocker::HarnessArchived | DependencyBlocker::AgentArchived => {
490                        "archived"
491                    }
492                    DependencyBlocker::HarnessDeleted | DependencyBlocker::AgentDeleted => {
493                        "deleted"
494                    }
495                },
496            );
497        let mut error_message = Message::assistant(blocker.message());
498        let mut metadata = std::collections::HashMap::new();
499        user_error.apply_to_message_metadata(&mut metadata);
500        error_message.metadata = Some(metadata);
501
502        self.emit_event(EventRequest::new(
503            self.session_id,
504            EventContext::turn(turn_id, input_message_id),
505            OutputMessageCompletedData::new(error_message).with_user_facing_error(&user_error),
506        ))
507        .await;
508
509        self.turn_failed(
510            turn_id,
511            input_message_id,
512            blocker.message(),
513            Some(&user_error),
514        )
515        .await;
516    }
517}
518
519pub async fn detect_dependency_blocker<A: RuntimeHostAdapter>(
520    adapter: &A,
521    org_id: i64,
522    harness_id: HarnessId,
523    agent_id: Option<AgentId>,
524) -> everruns_core::error::Result<Option<DependencyBlocker>> {
525    let harness_store = adapter.harness_store(org_id);
526    let agent_store = adapter.agent_store(org_id);
527    everruns_core::detect_dependency_blocker(
528        harness_store.as_ref(),
529        agent_store.as_ref(),
530        harness_id,
531        agent_id,
532    )
533    .await
534}
535
536pub async fn execute_input_activity<A: RuntimeHostAdapter>(
537    adapter: &A,
538    org_id: i64,
539    input: InputAtomInput,
540) -> everruns_core::error::Result<InputAtomResult> {
541    RuntimeSessionLifecycle::new(adapter.clone(), org_id, input.context.session_id)
542        .turn_started(input.context.turn_id, input.context.input_message_id)
543        .await;
544
545    let atom = InputAtom::new(adapter.message_store());
546    atom.execute(input).await
547}
548
549pub async fn execute_reason_activity<A: RuntimeHostAdapter>(
550    adapter: &A,
551    org_id: i64,
552    input: ReasonInput,
553) -> everruns_core::error::Result<ReasonResult> {
554    if let Some(blocker) =
555        detect_dependency_blocker(adapter, org_id, input.harness_id, input.agent_id).await?
556    {
557        RuntimeSessionLifecycle::new(adapter.clone(), org_id, input.context.session_id)
558            .dependency_blocked(
559                input.context.turn_id,
560                input.context.input_message_id,
561                blocker,
562            )
563            .await;
564        return Ok(ReasonResult {
565            success: false,
566            text: blocker.message().to_string(),
567            tool_calls: vec![],
568            has_tool_calls: false,
569            tool_definitions: vec![],
570            max_iterations: everruns_core::runtime_agent::default_max_iterations(),
571            error: Some("dependency_unavailable".to_string()),
572            usage: None,
573            response_id: None,
574            locale: None,
575            network_access: None,
576        });
577    }
578
579    let turn_context = adapter
580        .load_turn_context(org_id, input.context.session_id)
581        .await?;
582
583    let mut atom = ReasonAtom::new(
584        adapter.harness_store(org_id),
585        adapter.agent_store(org_id),
586        adapter.session_store(org_id),
587        adapter.message_store(),
588        adapter.provider_store(org_id),
589        adapter.capability_registry(),
590        adapter.driver_registry(),
591        adapter.event_emitter(),
592    )
593    .with_file_store(adapter.file_store());
594    if let Some(image_resolver) = adapter.image_resolver(org_id) {
595        atom = atom.with_image_resolver(image_resolver);
596    }
597
598    atom.execute(ReasonInput {
599        mcp_tool_definitions: turn_context.mcp_tool_definitions,
600        ..input
601    })
602    .await
603}
604
605pub async fn execute_act_activity<A: RuntimeHostAdapter>(
606    adapter: &A,
607    input: ActInput,
608) -> everruns_core::error::Result<ActResult> {
609    let org_id = input.org_id.ok_or_else(|| {
610        everruns_core::error::AgentLoopError::config(
611            "ActInput.org_id must be set for runtime host execution",
612        )
613    })?;
614
615    if let Some(blocker) =
616        detect_dependency_blocker(adapter, org_id, input.harness_id, input.agent_id).await?
617    {
618        RuntimeSessionLifecycle::new(adapter.clone(), org_id, input.context.session_id)
619            .dependency_blocked(
620                input.context.turn_id,
621                input.context.input_message_id,
622                blocker,
623            )
624            .await;
625        return Ok(ActResult {
626            results: vec![],
627            completed: true,
628            success_count: 0,
629            error_count: 1,
630            waiting_for_tool_results: false,
631            blocked: true,
632            client_tool_calls: vec![],
633            client_tool_definitions: vec![],
634        });
635    }
636
637    let execution_capabilities = load_execution_capabilities(
638        adapter,
639        org_id,
640        input.context.session_id,
641        input.harness_id,
642        input.agent_id,
643        input.locale.clone(),
644        input.blueprint_id.as_deref(),
645    )
646    .await?;
647    let tool_registry = execution_capabilities.tool_registry;
648    let builtin_tool_registry = Arc::new(tool_registry.clone());
649
650    let mut atom =
651        ActAtom::with_file_store(tool_registry, adapter.event_emitter(), adapter.file_store())
652            .with_session_store(adapter.session_store(org_id))
653            .with_session_mutator(adapter.session_mutator(org_id))
654            .with_agent_store(adapter.agent_store(org_id))
655            .with_tool_registry(builtin_tool_registry)
656            .with_org_id(
657                org_public_id_from_internal(org_id)
658                    .parse()
659                    .expect("internal org id converts to valid public org id"),
660            )
661            .with_capability_registry(adapter.capability_registry())
662            .with_post_tool_hooks(execution_capabilities.post_tool_hooks)
663            .with_tool_call_hooks(execution_capabilities.tool_call_hooks);
664
665    if let Some(storage_store) = adapter.storage_store() {
666        atom = atom.with_storage_store(storage_store);
667    }
668    if let Some(image_store) = adapter.image_artifact_store(org_id) {
669        atom = atom.with_image_store(image_store);
670    }
671    if let Some(provider_credential_store) = adapter.provider_credential_store(org_id) {
672        atom = atom.with_provider_credential_store(provider_credential_store);
673    }
674    if let Some(memory_store) = adapter.memory_store(org_id) {
675        atom = atom.with_memory_store(memory_store);
676    }
677    if let Some(connection_resolver) = adapter.connection_resolver() {
678        atom = atom.with_connection_resolver(connection_resolver);
679    }
680    if let Some(sqldb_store) = adapter.sqldb_store() {
681        atom = atom.with_sqldb_store(sqldb_store);
682    }
683    if let Some(leased_resource_store) = adapter.leased_resource_store() {
684        atom = atom.with_leased_resource_store(leased_resource_store);
685    }
686    if let Some(registry) = adapter.session_resource_registry() {
687        atom = atom.with_session_resource_registry(registry);
688    }
689    if let Some(schedule_store) = adapter.schedule_store(org_id) {
690        atom = atom.with_schedule_store(schedule_store);
691    }
692    if let Some(platform_store) = adapter.platform_store(org_id, input.context.session_id) {
693        atom = atom.with_platform_store(platform_store);
694    }
695    if let Some(budget_checker) = adapter.budget_checker(org_id, input.agent_id) {
696        atom = atom.with_budget_checker(budget_checker);
697    }
698    if let Some(payment_authority) = adapter.payment_authority(org_id, input.agent_id) {
699        atom = atom.with_payment_authority(payment_authority);
700    }
701
702    atom.execute(input).await
703}