Skip to main content

everruns_runtime/
host.rs

// Shared host orchestration for embedded and durable execution hosts.
// Decision: everruns-runtime owns worker-facing turn phase execution so
// durable/server-backed hosts reuse the same input/reason/act wiring.
4
5use async_trait::async_trait;
6use everruns_core::atoms::{
7    ActAtom, ActInput, ActResult, Atom, InputAtom, InputAtomInput, InputAtomResult, ReasonAtom,
8    ReasonInput, ReasonResult,
9};
10use everruns_core::capabilities::{SystemPromptContext, collect_capabilities_with_configs};
11use everruns_core::events::{
12    EventContext, EventRequest, OutputMessageCompletedData, SessionActivatedData, SessionIdledData,
13    TurnCompletedData, TurnFailedData, TurnStartedData,
14};
15use everruns_core::message::Message;
16use everruns_core::message_retriever::MessageRetriever;
17use everruns_core::platform_store::PlatformStore;
18use everruns_core::session::SessionStatus;
19use everruns_core::traits::{
20    AgentStore, BudgetChecker, EventEmitter, HarnessStore, ImageArtifactStore, ImageResolver,
21    LeasedResourceStore, LlmProviderStore, ModelWithProvider, PaymentAuthority,
22    ProviderCredentialStore, SessionFileSystem, SessionMutator, SessionResourceRegistry,
23    SessionScheduleStore, SessionSqlDbStoreRef, SessionStorageStore, SessionStore,
24    UserConnectionResolver,
25};
26use everruns_core::typed_id::{AgentId, HarnessId, MessageId, SessionId, TurnId};
27use everruns_core::{
28    Agent, CapabilityRegistry, DependencyBlocker, DriverRegistry, EgressService, Harness, Session,
29    TokenUsage, ToolDefinition, ToolRegistry, UserFacingError, UtilityLlmService,
30    org_public_id_from_internal, resolve_runtime_capabilities,
31};
32use std::sync::Arc;
33use tracing::warn;
34
35/// Turn context loaded in one batched call for runtime host execution.
36#[derive(Debug, Clone)]
37pub struct RuntimeHostTurnContext {
38    pub agent: Option<Agent>,
39    pub session: Session,
40    pub messages: Vec<Message>,
41    pub model: Option<ModelWithProvider>,
42    pub mcp_tool_definitions: Vec<ToolDefinition>,
43}
44
45/// Public adapter contract for server-backed or durable runtime hosts.
46///
47/// `everruns-runtime` owns shared host orchestration for both embedded and
48/// durable execution. That includes phase execution (`input -> reason -> act`),
49/// lifecycle emission, and the generic turn-strategy decisions used by durable
50/// or custom hosts.
51///
52/// Host crates implement this trait to provide persistence, session-lifecycle
53/// plumbing, event delivery, and their own orchestration backend. The durable
54/// engine itself remains outside this crate.
55#[async_trait]
56pub trait RuntimeHostAdapter: Send + Sync + Clone + 'static {
57    async fn get_agent(
58        &self,
59        org_id: i64,
60        agent_id: AgentId,
61    ) -> everruns_core::error::Result<Option<Agent>>;
62
63    async fn get_harness(
64        &self,
65        org_id: i64,
66        harness_id: HarnessId,
67    ) -> everruns_core::error::Result<Option<Harness>>;
68
69    async fn set_session_status(
70        &self,
71        org_id: i64,
72        session_id: SessionId,
73        status: SessionStatus,
74    ) -> everruns_core::error::Result<Session>;
75
76    async fn load_turn_context(
77        &self,
78        org_id: i64,
79        session_id: SessionId,
80    ) -> everruns_core::error::Result<RuntimeHostTurnContext>;
81
82    fn capability_registry(&self) -> CapabilityRegistry;
83
84    fn driver_registry(&self) -> DriverRegistry;
85
86    fn harness_store(&self, org_id: i64) -> Arc<dyn HarnessStore>;
87
88    fn agent_store(&self, org_id: i64) -> Arc<dyn AgentStore>;
89
90    fn session_store(&self, org_id: i64) -> Arc<dyn SessionStore>;
91
92    fn session_mutator(&self, org_id: i64) -> Arc<dyn SessionMutator>;
93
94    fn provider_store(&self, org_id: i64) -> Arc<dyn LlmProviderStore>;
95
96    fn message_store(&self) -> Arc<dyn MessageRetriever>;
97
98    fn event_emitter(&self) -> Arc<dyn EventEmitter>;
99
100    fn file_store(&self) -> Arc<dyn SessionFileSystem>;
101
102    fn image_resolver(&self, _org_id: i64) -> Option<Arc<dyn ImageResolver>> {
103        None
104    }
105
106    fn image_artifact_store(&self, _org_id: i64) -> Option<Arc<dyn ImageArtifactStore>> {
107        None
108    }
109
110    fn provider_credential_store(&self, _org_id: i64) -> Option<Arc<dyn ProviderCredentialStore>> {
111        None
112    }
113
114    fn utility_llm_service(&self) -> Option<Arc<dyn UtilityLlmService>> {
115        None
116    }
117
118    fn egress_service(&self) -> Option<Arc<dyn EgressService>> {
119        None
120    }
121
122    fn storage_store(&self) -> Option<Arc<dyn SessionStorageStore>> {
123        None
124    }
125
126    fn memory_store(&self, _org_id: i64) -> Option<Arc<dyn everruns_core::MemoryStoreBackend>> {
127        None
128    }
129
130    fn connection_resolver(&self) -> Option<Arc<dyn UserConnectionResolver>> {
131        None
132    }
133
134    fn sqldb_store(&self) -> Option<SessionSqlDbStoreRef> {
135        None
136    }
137
138    fn leased_resource_store(&self) -> Option<Arc<dyn LeasedResourceStore>> {
139        None
140    }
141
142    fn session_resource_registry(&self) -> Option<Arc<dyn SessionResourceRegistry>> {
143        None
144    }
145
146    fn schedule_store(&self, _org_id: i64) -> Option<Arc<dyn SessionScheduleStore>> {
147        None
148    }
149
150    fn platform_store(
151        &self,
152        _org_id: i64,
153        _session_id: SessionId,
154    ) -> Option<Arc<dyn PlatformStore>> {
155        None
156    }
157
158    fn budget_checker(
159        &self,
160        _org_id: i64,
161        _agent_id: Option<AgentId>,
162    ) -> Option<Arc<dyn BudgetChecker>> {
163        None
164    }
165
166    fn payment_authority(
167        &self,
168        _org_id: i64,
169        _agent_id: Option<AgentId>,
170    ) -> Option<Arc<dyn PaymentAuthority>> {
171        None
172    }
173}
174
175struct RuntimeExecutionCapabilities {
176    tool_registry: ToolRegistry,
177    post_tool_hooks: Vec<Arc<dyn everruns_core::PostToolExecHook>>,
178    tool_call_hooks: Vec<Arc<dyn everruns_core::ToolCallHook>>,
179}
180
181async fn load_execution_capabilities<A: RuntimeHostAdapter>(
182    adapter: &A,
183    org_id: i64,
184    session_id: SessionId,
185    harness_id: HarnessId,
186    agent_id: Option<AgentId>,
187    locale: Option<String>,
188    blueprint_id: Option<&str>,
189) -> everruns_core::error::Result<RuntimeExecutionCapabilities> {
190    let capability_registry = adapter.capability_registry();
191    if let Some(blueprint_id) = blueprint_id {
192        let mut registry = ToolRegistry::with_defaults();
193        let blueprint = capability_registry.blueprint(blueprint_id).ok_or_else(|| {
194            everruns_core::error::AgentLoopError::config(format!(
195                "Blueprint \"{blueprint_id}\" not found in registry"
196            ))
197        })?;
198        for tool in blueprint.tools {
199            registry.register_boxed(tool);
200        }
201        return Ok(RuntimeExecutionCapabilities {
202            tool_registry: registry,
203            post_tool_hooks: Vec::new(),
204            tool_call_hooks: Vec::new(),
205        });
206    }
207
208    let harness_chain = adapter
209        .harness_store(org_id)
210        .get_harness_chain(harness_id)
211        .await?;
212    if harness_chain.is_empty() {
213        return Err(everruns_core::error::AgentLoopError::harness_not_found(
214            harness_id,
215        ));
216    }
217
218    let session = adapter
219        .session_store(org_id)
220        .get_session(session_id)
221        .await?
222        .ok_or_else(|| everruns_core::error::AgentLoopError::session_not_found(session_id))?;
223
224    let agent_store = adapter.agent_store(org_id);
225    let agent = match agent_id {
226        Some(agent_id) => Some(
227            agent_store
228                .get_agent(agent_id)
229                .await?
230                .ok_or_else(|| everruns_core::error::AgentLoopError::agent_not_found(agent_id))?,
231        ),
232        None => None,
233    };
234
235    let resolved = resolve_runtime_capabilities(
236        &harness_chain,
237        agent.as_ref(),
238        &session,
239        &capability_registry,
240    );
241    let prompt_ctx = SystemPromptContext {
242        session_id,
243        locale: locale.or(session.locale.clone()),
244        file_store: Some(adapter.file_store()),
245    };
246    let collected = collect_capabilities_with_configs(
247        &resolved.resolved_capability_configs,
248        &capability_registry,
249        &prompt_ctx,
250    )
251    .await;
252
253    let mut registry = ToolRegistry::with_defaults();
254    for tool in collected.tools {
255        registry.register_boxed(tool);
256    }
257
258    let post_tool_hooks = resolved
259        .resolved_capability_configs
260        .iter()
261        .flat_map(|config| {
262            capability_registry
263                .get(config.capability_id())
264                .map(|capability| capability.post_tool_exec_hooks())
265                .unwrap_or_default()
266        })
267        .collect();
268    let tool_call_hooks = resolved
269        .resolved_capability_configs
270        .iter()
271        .flat_map(|config| {
272            capability_registry
273                .get(config.capability_id())
274                .map(|capability| capability.tool_call_hooks())
275                .unwrap_or_default()
276        })
277        .collect();
278
279    Ok(RuntimeExecutionCapabilities {
280        tool_registry: registry,
281        post_tool_hooks,
282        tool_call_hooks,
283    })
284}
285
286/// Shared lifecycle helper for runtime-backed hosts.
287pub struct RuntimeSessionLifecycle<A: RuntimeHostAdapter> {
288    adapter: A,
289    org_id: i64,
290    session_id: SessionId,
291}
292
293impl<A: RuntimeHostAdapter> RuntimeSessionLifecycle<A> {
294    pub fn new(adapter: A, org_id: i64, session_id: SessionId) -> Self {
295        Self {
296            adapter,
297            org_id,
298            session_id,
299        }
300    }
301
302    async fn set_session_status(&self, status: SessionStatus, action: &'static str) {
303        if let Err(error) = self
304            .adapter
305            .set_session_status(self.org_id, self.session_id, status)
306            .await
307        {
308            warn!(
309                session_id = %self.session_id,
310                org_id = self.org_id,
311                action,
312                %error,
313                "runtime host lifecycle status update failed"
314            );
315        }
316    }
317
318    async fn emit_event(&self, request: EventRequest) {
319        let event_type = request.event_type.clone();
320        if let Err(error) = self.adapter.event_emitter().emit(request).await {
321            warn!(
322                session_id = %self.session_id,
323                org_id = self.org_id,
324                event_type,
325                %error,
326                "runtime host lifecycle event emission failed"
327            );
328        }
329    }
330
331    pub async fn turn_started(&self, turn_id: TurnId, input_message_id: MessageId) {
332        let input_content = self
333            .adapter
334            .message_store()
335            .get(self.session_id, input_message_id)
336            .await
337            .ok()
338            .flatten()
339            .map(|message| message.content_to_llm_string());
340
341        self.set_session_status(SessionStatus::Active, "turn_started")
342            .await;
343
344        self.emit_event(EventRequest::new(
345            self.session_id,
346            EventContext::turn(turn_id, input_message_id),
347            SessionActivatedData {
348                turn_id,
349                input_message_id,
350            },
351        ))
352        .await;
353
354        self.emit_event(EventRequest::new(
355            self.session_id,
356            EventContext::turn(turn_id, input_message_id),
357            TurnStartedData {
358                turn_id,
359                input_message_id,
360                input_content,
361            },
362        ))
363        .await;
364    }
365
366    pub async fn emit_turn_completed(&self, input_message_id: MessageId, data: TurnCompletedData) {
367        let turn_id = data.turn_id;
368        self.emit_event(EventRequest::new(
369            self.session_id,
370            EventContext::turn(turn_id, input_message_id),
371            data,
372        ))
373        .await;
374    }
375
376    pub async fn emit_session_idled(
377        &self,
378        turn_id: TurnId,
379        input_message_id: MessageId,
380        iterations: Option<u32>,
381        usage: Option<TokenUsage>,
382    ) {
383        self.set_session_status(SessionStatus::Idle, "emit_session_idled")
384            .await;
385
386        self.emit_event(EventRequest::new(
387            self.session_id,
388            EventContext::turn(turn_id, input_message_id),
389            SessionIdledData {
390                turn_id,
391                iterations,
392                usage,
393            },
394        ))
395        .await;
396    }
397
398    pub async fn turn_completed(
399        &self,
400        turn_id: TurnId,
401        input_message_id: MessageId,
402        iterations: u32,
403        usage: Option<TokenUsage>,
404        input_content: Option<String>,
405    ) {
406        self.emit_turn_completed(
407            input_message_id,
408            TurnCompletedData {
409                turn_id,
410                iterations,
411                duration_ms: None,
412                usage: usage.clone(),
413                input_content,
414                final_message_id: None,
415                final_answer_preview: None,
416                time_to_first_token_ms: None,
417                tool_call_count: None,
418                llm_call_count: None,
419                status: Some("completed".to_string()),
420            },
421        )
422        .await;
423        self.emit_session_idled(turn_id, input_message_id, Some(iterations), usage)
424            .await;
425    }
426
427    pub async fn turn_failed(
428        &self,
429        turn_id: TurnId,
430        input_message_id: MessageId,
431        error: &str,
432        user_error: Option<&UserFacingError>,
433    ) {
434        self.set_session_status(SessionStatus::Idle, "turn_failed")
435            .await;
436
437        self.emit_event(EventRequest::new(
438            self.session_id,
439            EventContext::turn(turn_id, input_message_id),
440            {
441                let mut data = TurnFailedData {
442                    turn_id,
443                    error: error.to_string(),
444                    error_code: None,
445                    error_fields: None,
446                };
447                if let Some(user_error) = user_error {
448                    user_error.apply_to_event_fields(&mut data.error_code, &mut data.error_fields);
449                }
450                data
451            },
452        ))
453        .await;
454
455        self.emit_event(EventRequest::new(
456            self.session_id,
457            EventContext::turn(turn_id, input_message_id),
458            SessionIdledData {
459                turn_id,
460                iterations: None,
461                usage: None,
462            },
463        ))
464        .await;
465    }
466
467    pub async fn waiting_for_tool_results(&self) {
468        self.set_session_status(
469            SessionStatus::WaitingForToolResults,
470            "waiting_for_tool_results",
471        )
472        .await;
473    }
474
475    pub async fn dependency_blocked(
476        &self,
477        turn_id: TurnId,
478        input_message_id: MessageId,
479        blocker: DependencyBlocker,
480    ) {
481        let user_error = UserFacingError::new(blocker.error_code())
482            .with_field(
483                "dependency",
484                match blocker {
485                    DependencyBlocker::HarnessArchived | DependencyBlocker::HarnessDeleted => {
486                        "harness"
487                    }
488                    DependencyBlocker::AgentArchived | DependencyBlocker::AgentDeleted => "agent",
489                },
490            )
491            .with_field(
492                "state",
493                match blocker {
494                    DependencyBlocker::HarnessArchived | DependencyBlocker::AgentArchived => {
495                        "archived"
496                    }
497                    DependencyBlocker::HarnessDeleted | DependencyBlocker::AgentDeleted => {
498                        "deleted"
499                    }
500                },
501            );
502        let mut error_message = Message::assistant(blocker.message());
503        let mut metadata = std::collections::HashMap::new();
504        user_error.apply_to_message_metadata(&mut metadata);
505        error_message.metadata = Some(metadata);
506
507        self.emit_event(EventRequest::new(
508            self.session_id,
509            EventContext::turn(turn_id, input_message_id),
510            OutputMessageCompletedData::new(error_message).with_user_facing_error(&user_error),
511        ))
512        .await;
513
514        self.turn_failed(
515            turn_id,
516            input_message_id,
517            blocker.message(),
518            Some(&user_error),
519        )
520        .await;
521    }
522}
523
524pub async fn detect_dependency_blocker<A: RuntimeHostAdapter>(
525    adapter: &A,
526    org_id: i64,
527    harness_id: HarnessId,
528    agent_id: Option<AgentId>,
529) -> everruns_core::error::Result<Option<DependencyBlocker>> {
530    let harness_store = adapter.harness_store(org_id);
531    let agent_store = adapter.agent_store(org_id);
532    everruns_core::detect_dependency_blocker(
533        harness_store.as_ref(),
534        agent_store.as_ref(),
535        harness_id,
536        agent_id,
537    )
538    .await
539}
540
541pub async fn execute_input_activity<A: RuntimeHostAdapter>(
542    adapter: &A,
543    org_id: i64,
544    input: InputAtomInput,
545) -> everruns_core::error::Result<InputAtomResult> {
546    RuntimeSessionLifecycle::new(adapter.clone(), org_id, input.context.session_id)
547        .turn_started(input.context.turn_id, input.context.input_message_id)
548        .await;
549
550    let atom = InputAtom::new(adapter.message_store());
551    atom.execute(input).await
552}
553
554pub async fn execute_reason_activity<A: RuntimeHostAdapter>(
555    adapter: &A,
556    org_id: i64,
557    input: ReasonInput,
558) -> everruns_core::error::Result<ReasonResult> {
559    if let Some(blocker) =
560        detect_dependency_blocker(adapter, org_id, input.harness_id, input.agent_id).await?
561    {
562        RuntimeSessionLifecycle::new(adapter.clone(), org_id, input.context.session_id)
563            .dependency_blocked(
564                input.context.turn_id,
565                input.context.input_message_id,
566                blocker,
567            )
568            .await;
569        return Ok(ReasonResult {
570            success: false,
571            text: blocker.message().to_string(),
572            tool_calls: vec![],
573            has_tool_calls: false,
574            tool_definitions: vec![],
575            max_iterations: everruns_core::runtime_agent::default_max_iterations(),
576            error: Some("dependency_unavailable".to_string()),
577            usage: None,
578            output_message_id: None,
579            time_to_first_token_ms: None,
580            response_id: None,
581            locale: None,
582            network_access: None,
583        });
584    }
585
586    let turn_context = adapter
587        .load_turn_context(org_id, input.context.session_id)
588        .await?;
589
590    let mut atom = ReasonAtom::new(
591        adapter.harness_store(org_id),
592        adapter.agent_store(org_id),
593        adapter.session_store(org_id),
594        adapter.message_store(),
595        adapter.provider_store(org_id),
596        adapter.capability_registry(),
597        adapter.driver_registry(),
598        adapter.event_emitter(),
599    )
600    .with_file_store(adapter.file_store());
601    if let Some(image_resolver) = adapter.image_resolver(org_id) {
602        atom = atom.with_image_resolver(image_resolver);
603    }
604
605    atom.execute(ReasonInput {
606        mcp_tool_definitions: turn_context.mcp_tool_definitions,
607        ..input
608    })
609    .await
610}
611
612pub async fn execute_act_activity<A: RuntimeHostAdapter>(
613    adapter: &A,
614    input: ActInput,
615) -> everruns_core::error::Result<ActResult> {
616    let org_id = input.org_id.ok_or_else(|| {
617        everruns_core::error::AgentLoopError::config(
618            "ActInput.org_id must be set for runtime host execution",
619        )
620    })?;
621
622    if let Some(blocker) =
623        detect_dependency_blocker(adapter, org_id, input.harness_id, input.agent_id).await?
624    {
625        RuntimeSessionLifecycle::new(adapter.clone(), org_id, input.context.session_id)
626            .dependency_blocked(
627                input.context.turn_id,
628                input.context.input_message_id,
629                blocker,
630            )
631            .await;
632        return Ok(ActResult {
633            results: vec![],
634            completed: true,
635            success_count: 0,
636            error_count: 1,
637            waiting_for_tool_results: false,
638            blocked: true,
639            client_tool_calls: vec![],
640            client_tool_definitions: vec![],
641        });
642    }
643
644    let execution_capabilities = load_execution_capabilities(
645        adapter,
646        org_id,
647        input.context.session_id,
648        input.harness_id,
649        input.agent_id,
650        input.locale.clone(),
651        input.blueprint_id.as_deref(),
652    )
653    .await?;
654    let tool_registry = execution_capabilities.tool_registry;
655    let builtin_tool_registry = Arc::new(tool_registry.clone());
656
657    let mut atom =
658        ActAtom::with_file_store(tool_registry, adapter.event_emitter(), adapter.file_store())
659            .with_session_store(adapter.session_store(org_id))
660            .with_session_mutator(adapter.session_mutator(org_id))
661            .with_agent_store(adapter.agent_store(org_id))
662            .with_tool_registry(builtin_tool_registry)
663            .with_org_id(
664                org_public_id_from_internal(org_id)
665                    .parse()
666                    .expect("internal org id converts to valid public org id"),
667            )
668            .with_capability_registry(adapter.capability_registry())
669            .with_post_tool_hooks(execution_capabilities.post_tool_hooks)
670            .with_tool_call_hooks(execution_capabilities.tool_call_hooks);
671
672    if let Some(storage_store) = adapter.storage_store() {
673        atom = atom.with_storage_store(storage_store);
674    }
675    if let Some(image_store) = adapter.image_artifact_store(org_id) {
676        atom = atom.with_image_store(image_store);
677    }
678    if let Some(provider_credential_store) = adapter.provider_credential_store(org_id) {
679        atom = atom.with_provider_credential_store(provider_credential_store);
680    }
681    if let Some(utility_llm_service) = adapter.utility_llm_service() {
682        atom = atom.with_utility_llm_service(utility_llm_service);
683    }
684    if let Some(egress_service) = adapter.egress_service() {
685        atom = atom.with_egress_service(egress_service);
686    }
687    if let Some(memory_store) = adapter.memory_store(org_id) {
688        atom = atom.with_memory_store(memory_store);
689    }
690    if let Some(connection_resolver) = adapter.connection_resolver() {
691        atom = atom.with_connection_resolver(connection_resolver);
692    }
693    if let Some(sqldb_store) = adapter.sqldb_store() {
694        atom = atom.with_sqldb_store(sqldb_store);
695    }
696    if let Some(leased_resource_store) = adapter.leased_resource_store() {
697        atom = atom.with_leased_resource_store(leased_resource_store);
698    }
699    if let Some(registry) = adapter.session_resource_registry() {
700        atom = atom.with_session_resource_registry(registry);
701    }
702    if let Some(schedule_store) = adapter.schedule_store(org_id) {
703        atom = atom.with_schedule_store(schedule_store);
704    }
705    if let Some(platform_store) = adapter.platform_store(org_id, input.context.session_id) {
706        atom = atom.with_platform_store(platform_store);
707    }
708    if let Some(budget_checker) = adapter.budget_checker(org_id, input.agent_id) {
709        atom = atom.with_budget_checker(budget_checker);
710    }
711    if let Some(payment_authority) = adapter.payment_authority(org_id, input.agent_id) {
712        atom = atom.with_payment_authority(payment_authority);
713    }
714
715    atom.execute(input).await
716}