Skip to main content

everruns_runtime/
host.rs

1// Shared host orchestration for embedded and durable execution hosts.
2// Decision: everruns-runtime owns worker-facing turn phase execution so
3// durable/server-backed hosts reuse the same input/reason/act wiring.
4
5use async_trait::async_trait;
6use everruns_core::atoms::{
7    ActAtom, ActInput, ActResult, Atom, InputAtom, InputAtomInput, InputAtomResult, ReasonAtom,
8    ReasonInput, ReasonResult,
9};
10use everruns_core::capabilities::{SystemPromptContext, collect_capabilities_with_configs};
11use everruns_core::events::{
12    EventContext, EventRequest, OutputMessageCompletedData, SessionActivatedData, SessionIdledData,
13    TurnCompletedData, TurnFailedData, TurnStartedData,
14};
15use everruns_core::message::Message;
16use everruns_core::message_retriever::MessageRetriever;
17use everruns_core::platform_store::PlatformStore;
18use everruns_core::session::SessionStatus;
19use everruns_core::traits::{
20    AgentStore, BudgetChecker, EventEmitter, HarnessStore, ImageArtifactStore, ImageResolver,
21    LeasedResourceStore, LlmProviderStore, ModelWithProvider, PaymentAuthority,
22    ProviderCredentialStore, SessionFileSystem, SessionMutator, SessionResourceRegistry,
23    SessionScheduleStore, SessionSqlDbStoreRef, SessionStorageStore, SessionStore,
24    UserConnectionResolver,
25};
26use everruns_core::typed_id::{AgentId, HarnessId, MessageId, SessionId, TurnId};
27use everruns_core::{
28    Agent, CapabilityRegistry, DependencyBlocker, DriverRegistry, Harness, Session, TokenUsage,
29    ToolDefinition, ToolRegistry, UserFacingError, UtilityLlmService, org_public_id_from_internal,
30    resolve_runtime_capabilities,
31};
32use std::sync::Arc;
33use tracing::warn;
34
35/// Turn context loaded in one batched call for runtime host execution.
36#[derive(Debug, Clone)]
37pub struct RuntimeHostTurnContext {
38    pub agent: Option<Agent>,
39    pub session: Session,
40    pub messages: Vec<Message>,
41    pub model: Option<ModelWithProvider>,
42    pub mcp_tool_definitions: Vec<ToolDefinition>,
43}
44
45/// Public adapter contract for server-backed or durable runtime hosts.
46///
47/// `everruns-runtime` owns shared host orchestration for both embedded and
48/// durable execution. That includes phase execution (`input -> reason -> act`),
49/// lifecycle emission, and the generic turn-strategy decisions used by durable
50/// or custom hosts.
51///
52/// Host crates implement this trait to provide persistence, session-lifecycle
53/// plumbing, event delivery, and their own orchestration backend. The durable
54/// engine itself remains outside this crate.
55#[async_trait]
56pub trait RuntimeHostAdapter: Send + Sync + Clone + 'static {
57    async fn get_agent(
58        &self,
59        org_id: i64,
60        agent_id: AgentId,
61    ) -> everruns_core::error::Result<Option<Agent>>;
62
63    async fn get_harness(
64        &self,
65        org_id: i64,
66        harness_id: HarnessId,
67    ) -> everruns_core::error::Result<Option<Harness>>;
68
69    async fn set_session_status(
70        &self,
71        org_id: i64,
72        session_id: SessionId,
73        status: SessionStatus,
74    ) -> everruns_core::error::Result<Session>;
75
76    async fn load_turn_context(
77        &self,
78        org_id: i64,
79        session_id: SessionId,
80    ) -> everruns_core::error::Result<RuntimeHostTurnContext>;
81
82    fn capability_registry(&self) -> CapabilityRegistry;
83
84    fn driver_registry(&self) -> DriverRegistry;
85
86    fn harness_store(&self, org_id: i64) -> Arc<dyn HarnessStore>;
87
88    fn agent_store(&self, org_id: i64) -> Arc<dyn AgentStore>;
89
90    fn session_store(&self, org_id: i64) -> Arc<dyn SessionStore>;
91
92    fn session_mutator(&self, org_id: i64) -> Arc<dyn SessionMutator>;
93
94    fn provider_store(&self, org_id: i64) -> Arc<dyn LlmProviderStore>;
95
96    fn message_store(&self) -> Arc<dyn MessageRetriever>;
97
98    fn event_emitter(&self) -> Arc<dyn EventEmitter>;
99
100    fn file_store(&self) -> Arc<dyn SessionFileSystem>;
101
102    fn image_resolver(&self, _org_id: i64) -> Option<Arc<dyn ImageResolver>> {
103        None
104    }
105
106    fn image_artifact_store(&self, _org_id: i64) -> Option<Arc<dyn ImageArtifactStore>> {
107        None
108    }
109
110    fn provider_credential_store(&self, _org_id: i64) -> Option<Arc<dyn ProviderCredentialStore>> {
111        None
112    }
113
114    fn utility_llm_service(&self) -> Option<Arc<dyn UtilityLlmService>> {
115        None
116    }
117
118    fn storage_store(&self) -> Option<Arc<dyn SessionStorageStore>> {
119        None
120    }
121
122    fn memory_store(&self, _org_id: i64) -> Option<Arc<dyn everruns_core::MemoryStoreBackend>> {
123        None
124    }
125
126    fn connection_resolver(&self) -> Option<Arc<dyn UserConnectionResolver>> {
127        None
128    }
129
130    fn sqldb_store(&self) -> Option<SessionSqlDbStoreRef> {
131        None
132    }
133
134    fn leased_resource_store(&self) -> Option<Arc<dyn LeasedResourceStore>> {
135        None
136    }
137
138    fn session_resource_registry(&self) -> Option<Arc<dyn SessionResourceRegistry>> {
139        None
140    }
141
142    fn schedule_store(&self, _org_id: i64) -> Option<Arc<dyn SessionScheduleStore>> {
143        None
144    }
145
146    fn platform_store(
147        &self,
148        _org_id: i64,
149        _session_id: SessionId,
150    ) -> Option<Arc<dyn PlatformStore>> {
151        None
152    }
153
154    fn budget_checker(
155        &self,
156        _org_id: i64,
157        _agent_id: Option<AgentId>,
158    ) -> Option<Arc<dyn BudgetChecker>> {
159        None
160    }
161
162    fn payment_authority(
163        &self,
164        _org_id: i64,
165        _agent_id: Option<AgentId>,
166    ) -> Option<Arc<dyn PaymentAuthority>> {
167        None
168    }
169}
170
171struct RuntimeExecutionCapabilities {
172    tool_registry: ToolRegistry,
173    post_tool_hooks: Vec<Arc<dyn everruns_core::PostToolExecHook>>,
174    tool_call_hooks: Vec<Arc<dyn everruns_core::ToolCallHook>>,
175}
176
177async fn load_execution_capabilities<A: RuntimeHostAdapter>(
178    adapter: &A,
179    org_id: i64,
180    session_id: SessionId,
181    harness_id: HarnessId,
182    agent_id: Option<AgentId>,
183    locale: Option<String>,
184    blueprint_id: Option<&str>,
185) -> everruns_core::error::Result<RuntimeExecutionCapabilities> {
186    let capability_registry = adapter.capability_registry();
187    if let Some(blueprint_id) = blueprint_id {
188        let mut registry = ToolRegistry::with_defaults();
189        let blueprint = capability_registry.blueprint(blueprint_id).ok_or_else(|| {
190            everruns_core::error::AgentLoopError::config(format!(
191                "Blueprint \"{blueprint_id}\" not found in registry"
192            ))
193        })?;
194        for tool in blueprint.tools {
195            registry.register_boxed(tool);
196        }
197        return Ok(RuntimeExecutionCapabilities {
198            tool_registry: registry,
199            post_tool_hooks: Vec::new(),
200            tool_call_hooks: Vec::new(),
201        });
202    }
203
204    let harness_chain = adapter
205        .harness_store(org_id)
206        .get_harness_chain(harness_id)
207        .await?;
208    if harness_chain.is_empty() {
209        return Err(everruns_core::error::AgentLoopError::harness_not_found(
210            harness_id,
211        ));
212    }
213
214    let session = adapter
215        .session_store(org_id)
216        .get_session(session_id)
217        .await?
218        .ok_or_else(|| everruns_core::error::AgentLoopError::session_not_found(session_id))?;
219
220    let agent_store = adapter.agent_store(org_id);
221    let agent = match agent_id {
222        Some(agent_id) => Some(
223            agent_store
224                .get_agent(agent_id)
225                .await?
226                .ok_or_else(|| everruns_core::error::AgentLoopError::agent_not_found(agent_id))?,
227        ),
228        None => None,
229    };
230
231    let resolved = resolve_runtime_capabilities(
232        &harness_chain,
233        agent.as_ref(),
234        &session,
235        &capability_registry,
236    );
237    let prompt_ctx = SystemPromptContext {
238        session_id,
239        locale: locale.or(session.locale.clone()),
240        file_store: Some(adapter.file_store()),
241    };
242    let collected = collect_capabilities_with_configs(
243        &resolved.resolved_capability_configs,
244        &capability_registry,
245        &prompt_ctx,
246    )
247    .await;
248
249    let mut registry = ToolRegistry::with_defaults();
250    for tool in collected.tools {
251        registry.register_boxed(tool);
252    }
253
254    let post_tool_hooks = resolved
255        .resolved_capability_configs
256        .iter()
257        .flat_map(|config| {
258            capability_registry
259                .get(config.capability_id())
260                .map(|capability| capability.post_tool_exec_hooks())
261                .unwrap_or_default()
262        })
263        .collect();
264    let tool_call_hooks = resolved
265        .resolved_capability_configs
266        .iter()
267        .flat_map(|config| {
268            capability_registry
269                .get(config.capability_id())
270                .map(|capability| capability.tool_call_hooks())
271                .unwrap_or_default()
272        })
273        .collect();
274
275    Ok(RuntimeExecutionCapabilities {
276        tool_registry: registry,
277        post_tool_hooks,
278        tool_call_hooks,
279    })
280}
281
282/// Shared lifecycle helper for runtime-backed hosts.
283pub struct RuntimeSessionLifecycle<A: RuntimeHostAdapter> {
284    adapter: A,
285    org_id: i64,
286    session_id: SessionId,
287}
288
289impl<A: RuntimeHostAdapter> RuntimeSessionLifecycle<A> {
290    pub fn new(adapter: A, org_id: i64, session_id: SessionId) -> Self {
291        Self {
292            adapter,
293            org_id,
294            session_id,
295        }
296    }
297
298    async fn set_session_status(&self, status: SessionStatus, action: &'static str) {
299        if let Err(error) = self
300            .adapter
301            .set_session_status(self.org_id, self.session_id, status)
302            .await
303        {
304            warn!(
305                session_id = %self.session_id,
306                org_id = self.org_id,
307                action,
308                %error,
309                "runtime host lifecycle status update failed"
310            );
311        }
312    }
313
314    async fn emit_event(&self, request: EventRequest) {
315        let event_type = request.event_type.clone();
316        if let Err(error) = self.adapter.event_emitter().emit(request).await {
317            warn!(
318                session_id = %self.session_id,
319                org_id = self.org_id,
320                event_type,
321                %error,
322                "runtime host lifecycle event emission failed"
323            );
324        }
325    }
326
327    pub async fn turn_started(&self, turn_id: TurnId, input_message_id: MessageId) {
328        let input_content = self
329            .adapter
330            .message_store()
331            .get(self.session_id, input_message_id)
332            .await
333            .ok()
334            .flatten()
335            .map(|message| message.content_to_llm_string());
336
337        self.set_session_status(SessionStatus::Active, "turn_started")
338            .await;
339
340        self.emit_event(EventRequest::new(
341            self.session_id,
342            EventContext::turn(turn_id, input_message_id),
343            SessionActivatedData {
344                turn_id,
345                input_message_id,
346            },
347        ))
348        .await;
349
350        self.emit_event(EventRequest::new(
351            self.session_id,
352            EventContext::turn(turn_id, input_message_id),
353            TurnStartedData {
354                turn_id,
355                input_message_id,
356                input_content,
357            },
358        ))
359        .await;
360    }
361
362    pub async fn emit_turn_completed(&self, input_message_id: MessageId, data: TurnCompletedData) {
363        let turn_id = data.turn_id;
364        self.emit_event(EventRequest::new(
365            self.session_id,
366            EventContext::turn(turn_id, input_message_id),
367            data,
368        ))
369        .await;
370    }
371
372    pub async fn emit_session_idled(
373        &self,
374        turn_id: TurnId,
375        input_message_id: MessageId,
376        iterations: Option<u32>,
377        usage: Option<TokenUsage>,
378    ) {
379        self.set_session_status(SessionStatus::Idle, "emit_session_idled")
380            .await;
381
382        self.emit_event(EventRequest::new(
383            self.session_id,
384            EventContext::turn(turn_id, input_message_id),
385            SessionIdledData {
386                turn_id,
387                iterations,
388                usage,
389            },
390        ))
391        .await;
392    }
393
394    pub async fn turn_completed(
395        &self,
396        turn_id: TurnId,
397        input_message_id: MessageId,
398        iterations: u32,
399        usage: Option<TokenUsage>,
400        input_content: Option<String>,
401    ) {
402        self.emit_turn_completed(
403            input_message_id,
404            TurnCompletedData {
405                turn_id,
406                iterations,
407                duration_ms: None,
408                usage: usage.clone(),
409                input_content,
410                final_message_id: None,
411                final_answer_preview: None,
412                time_to_first_token_ms: None,
413                tool_call_count: None,
414                llm_call_count: None,
415                status: Some("completed".to_string()),
416            },
417        )
418        .await;
419        self.emit_session_idled(turn_id, input_message_id, Some(iterations), usage)
420            .await;
421    }
422
423    pub async fn turn_failed(
424        &self,
425        turn_id: TurnId,
426        input_message_id: MessageId,
427        error: &str,
428        user_error: Option<&UserFacingError>,
429    ) {
430        self.set_session_status(SessionStatus::Idle, "turn_failed")
431            .await;
432
433        self.emit_event(EventRequest::new(
434            self.session_id,
435            EventContext::turn(turn_id, input_message_id),
436            {
437                let mut data = TurnFailedData {
438                    turn_id,
439                    error: error.to_string(),
440                    error_code: None,
441                    error_fields: None,
442                };
443                if let Some(user_error) = user_error {
444                    user_error.apply_to_event_fields(&mut data.error_code, &mut data.error_fields);
445                }
446                data
447            },
448        ))
449        .await;
450
451        self.emit_event(EventRequest::new(
452            self.session_id,
453            EventContext::turn(turn_id, input_message_id),
454            SessionIdledData {
455                turn_id,
456                iterations: None,
457                usage: None,
458            },
459        ))
460        .await;
461    }
462
463    pub async fn waiting_for_tool_results(&self) {
464        self.set_session_status(
465            SessionStatus::WaitingForToolResults,
466            "waiting_for_tool_results",
467        )
468        .await;
469    }
470
471    pub async fn dependency_blocked(
472        &self,
473        turn_id: TurnId,
474        input_message_id: MessageId,
475        blocker: DependencyBlocker,
476    ) {
477        let user_error = UserFacingError::new(blocker.error_code())
478            .with_field(
479                "dependency",
480                match blocker {
481                    DependencyBlocker::HarnessArchived | DependencyBlocker::HarnessDeleted => {
482                        "harness"
483                    }
484                    DependencyBlocker::AgentArchived | DependencyBlocker::AgentDeleted => "agent",
485                },
486            )
487            .with_field(
488                "state",
489                match blocker {
490                    DependencyBlocker::HarnessArchived | DependencyBlocker::AgentArchived => {
491                        "archived"
492                    }
493                    DependencyBlocker::HarnessDeleted | DependencyBlocker::AgentDeleted => {
494                        "deleted"
495                    }
496                },
497            );
498        let mut error_message = Message::assistant(blocker.message());
499        let mut metadata = std::collections::HashMap::new();
500        user_error.apply_to_message_metadata(&mut metadata);
501        error_message.metadata = Some(metadata);
502
503        self.emit_event(EventRequest::new(
504            self.session_id,
505            EventContext::turn(turn_id, input_message_id),
506            OutputMessageCompletedData::new(error_message).with_user_facing_error(&user_error),
507        ))
508        .await;
509
510        self.turn_failed(
511            turn_id,
512            input_message_id,
513            blocker.message(),
514            Some(&user_error),
515        )
516        .await;
517    }
518}
519
520pub async fn detect_dependency_blocker<A: RuntimeHostAdapter>(
521    adapter: &A,
522    org_id: i64,
523    harness_id: HarnessId,
524    agent_id: Option<AgentId>,
525) -> everruns_core::error::Result<Option<DependencyBlocker>> {
526    let harness_store = adapter.harness_store(org_id);
527    let agent_store = adapter.agent_store(org_id);
528    everruns_core::detect_dependency_blocker(
529        harness_store.as_ref(),
530        agent_store.as_ref(),
531        harness_id,
532        agent_id,
533    )
534    .await
535}
536
537pub async fn execute_input_activity<A: RuntimeHostAdapter>(
538    adapter: &A,
539    org_id: i64,
540    input: InputAtomInput,
541) -> everruns_core::error::Result<InputAtomResult> {
542    RuntimeSessionLifecycle::new(adapter.clone(), org_id, input.context.session_id)
543        .turn_started(input.context.turn_id, input.context.input_message_id)
544        .await;
545
546    let atom = InputAtom::new(adapter.message_store());
547    atom.execute(input).await
548}
549
550pub async fn execute_reason_activity<A: RuntimeHostAdapter>(
551    adapter: &A,
552    org_id: i64,
553    input: ReasonInput,
554) -> everruns_core::error::Result<ReasonResult> {
555    if let Some(blocker) =
556        detect_dependency_blocker(adapter, org_id, input.harness_id, input.agent_id).await?
557    {
558        RuntimeSessionLifecycle::new(adapter.clone(), org_id, input.context.session_id)
559            .dependency_blocked(
560                input.context.turn_id,
561                input.context.input_message_id,
562                blocker,
563            )
564            .await;
565        return Ok(ReasonResult {
566            success: false,
567            text: blocker.message().to_string(),
568            tool_calls: vec![],
569            has_tool_calls: false,
570            tool_definitions: vec![],
571            max_iterations: everruns_core::runtime_agent::default_max_iterations(),
572            error: Some("dependency_unavailable".to_string()),
573            usage: None,
574            output_message_id: None,
575            time_to_first_token_ms: None,
576            response_id: None,
577            locale: None,
578            network_access: None,
579        });
580    }
581
582    let turn_context = adapter
583        .load_turn_context(org_id, input.context.session_id)
584        .await?;
585
586    let mut atom = ReasonAtom::new(
587        adapter.harness_store(org_id),
588        adapter.agent_store(org_id),
589        adapter.session_store(org_id),
590        adapter.message_store(),
591        adapter.provider_store(org_id),
592        adapter.capability_registry(),
593        adapter.driver_registry(),
594        adapter.event_emitter(),
595    )
596    .with_file_store(adapter.file_store());
597    if let Some(image_resolver) = adapter.image_resolver(org_id) {
598        atom = atom.with_image_resolver(image_resolver);
599    }
600
601    atom.execute(ReasonInput {
602        mcp_tool_definitions: turn_context.mcp_tool_definitions,
603        ..input
604    })
605    .await
606}
607
608pub async fn execute_act_activity<A: RuntimeHostAdapter>(
609    adapter: &A,
610    input: ActInput,
611) -> everruns_core::error::Result<ActResult> {
612    let org_id = input.org_id.ok_or_else(|| {
613        everruns_core::error::AgentLoopError::config(
614            "ActInput.org_id must be set for runtime host execution",
615        )
616    })?;
617
618    if let Some(blocker) =
619        detect_dependency_blocker(adapter, org_id, input.harness_id, input.agent_id).await?
620    {
621        RuntimeSessionLifecycle::new(adapter.clone(), org_id, input.context.session_id)
622            .dependency_blocked(
623                input.context.turn_id,
624                input.context.input_message_id,
625                blocker,
626            )
627            .await;
628        return Ok(ActResult {
629            results: vec![],
630            completed: true,
631            success_count: 0,
632            error_count: 1,
633            waiting_for_tool_results: false,
634            blocked: true,
635            client_tool_calls: vec![],
636            client_tool_definitions: vec![],
637        });
638    }
639
640    let execution_capabilities = load_execution_capabilities(
641        adapter,
642        org_id,
643        input.context.session_id,
644        input.harness_id,
645        input.agent_id,
646        input.locale.clone(),
647        input.blueprint_id.as_deref(),
648    )
649    .await?;
650    let tool_registry = execution_capabilities.tool_registry;
651    let builtin_tool_registry = Arc::new(tool_registry.clone());
652
653    let mut atom =
654        ActAtom::with_file_store(tool_registry, adapter.event_emitter(), adapter.file_store())
655            .with_session_store(adapter.session_store(org_id))
656            .with_session_mutator(adapter.session_mutator(org_id))
657            .with_agent_store(adapter.agent_store(org_id))
658            .with_tool_registry(builtin_tool_registry)
659            .with_org_id(
660                org_public_id_from_internal(org_id)
661                    .parse()
662                    .expect("internal org id converts to valid public org id"),
663            )
664            .with_capability_registry(adapter.capability_registry())
665            .with_post_tool_hooks(execution_capabilities.post_tool_hooks)
666            .with_tool_call_hooks(execution_capabilities.tool_call_hooks);
667
668    if let Some(storage_store) = adapter.storage_store() {
669        atom = atom.with_storage_store(storage_store);
670    }
671    if let Some(image_store) = adapter.image_artifact_store(org_id) {
672        atom = atom.with_image_store(image_store);
673    }
674    if let Some(provider_credential_store) = adapter.provider_credential_store(org_id) {
675        atom = atom.with_provider_credential_store(provider_credential_store);
676    }
677    if let Some(utility_llm_service) = adapter.utility_llm_service() {
678        atom = atom.with_utility_llm_service(utility_llm_service);
679    }
680    if let Some(memory_store) = adapter.memory_store(org_id) {
681        atom = atom.with_memory_store(memory_store);
682    }
683    if let Some(connection_resolver) = adapter.connection_resolver() {
684        atom = atom.with_connection_resolver(connection_resolver);
685    }
686    if let Some(sqldb_store) = adapter.sqldb_store() {
687        atom = atom.with_sqldb_store(sqldb_store);
688    }
689    if let Some(leased_resource_store) = adapter.leased_resource_store() {
690        atom = atom.with_leased_resource_store(leased_resource_store);
691    }
692    if let Some(registry) = adapter.session_resource_registry() {
693        atom = atom.with_session_resource_registry(registry);
694    }
695    if let Some(schedule_store) = adapter.schedule_store(org_id) {
696        atom = atom.with_schedule_store(schedule_store);
697    }
698    if let Some(platform_store) = adapter.platform_store(org_id, input.context.session_id) {
699        atom = atom.with_platform_store(platform_store);
700    }
701    if let Some(budget_checker) = adapter.budget_checker(org_id, input.agent_id) {
702        atom = atom.with_budget_checker(budget_checker);
703    }
704    if let Some(payment_authority) = adapter.payment_authority(org_id, input.agent_id) {
705        atom = atom.with_payment_authority(payment_authority);
706    }
707
708    atom.execute(input).await
709}