Skip to main content

everruns_runtime/
host.rs

1// Shared host orchestration for embedded and durable execution hosts.
2// Decision: everruns-runtime owns worker-facing turn phase execution so
3// durable/server-backed hosts reuse the same input/reason/act wiring.
4
5use async_trait::async_trait;
6use everruns_core::atoms::{
7    ActAtom, ActInput, ActResult, Atom, InputAtom, InputAtomInput, InputAtomResult, ReasonAtom,
8    ReasonInput, ReasonResult,
9};
10use everruns_core::capabilities::{SystemPromptContext, collect_capabilities_with_configs};
11use everruns_core::events::{
12    EventContext, EventRequest, OutputMessageCompletedData, SessionActivatedData, SessionIdledData,
13    TurnCompletedData, TurnFailedData, TurnStartedData,
14};
15use everruns_core::message::{ContentPart, Message};
16use everruns_core::message_retriever::MessageRetriever;
17use everruns_core::platform_store::PlatformStore;
18use everruns_core::session::SessionStatus;
19use everruns_core::traits::{
20    AgentStore, BudgetChecker, EventEmitter, HarnessStore, ImageArtifactStore, ImageResolver,
21    LeasedResourceStore, LlmProviderStore, ModelWithProvider, PaymentAuthority,
22    ProviderCredentialStore, SessionFileSystem, SessionMutator, SessionResourceRegistry,
23    SessionScheduleStore, SessionSqlDbStoreRef, SessionStorageStore, SessionStore,
24    UserConnectionResolver,
25};
26use everruns_core::typed_id::{AgentId, HarnessId, MessageId, SessionId, TurnId};
27use everruns_core::{
28    Agent, CapabilityRegistry, CapabilityStatus, DependencyBlocker, DriverRegistry, EgressService,
29    Harness, Session, TokenUsage, ToolDefinition, ToolRegistry, UserFacingError, UtilityLlmService,
30    assemble_turn_context, org_public_id_from_internal, resolve_runtime_capabilities,
31};
32use std::sync::Arc;
33use tracing::warn;
34
35/// Turn context loaded in one batched call for runtime host execution.
36#[derive(Debug, Clone)]
37pub struct RuntimeHostTurnContext {
38    pub agent: Option<Agent>,
39    pub session: Session,
40    pub messages: Vec<Message>,
41    pub model: Option<ModelWithProvider>,
42    pub mcp_tool_definitions: Vec<ToolDefinition>,
43}
44
45/// Public adapter contract for server-backed or durable runtime hosts.
46///
47/// `everruns-runtime` owns shared host orchestration for both embedded and
48/// durable execution. That includes phase execution (`input -> reason -> act`),
49/// lifecycle emission, and the generic turn-strategy decisions used by durable
50/// or custom hosts.
51///
52/// Host crates implement this trait to provide persistence, session-lifecycle
53/// plumbing, event delivery, and their own orchestration backend. The durable
54/// engine itself remains outside this crate.
55#[async_trait]
56pub trait RuntimeHostAdapter: Send + Sync + Clone + 'static {
57    async fn get_agent(
58        &self,
59        org_id: i64,
60        agent_id: AgentId,
61    ) -> everruns_core::error::Result<Option<Agent>>;
62
63    async fn get_harness(
64        &self,
65        org_id: i64,
66        harness_id: HarnessId,
67    ) -> everruns_core::error::Result<Option<Harness>>;
68
69    async fn set_session_status(
70        &self,
71        org_id: i64,
72        session_id: SessionId,
73        status: SessionStatus,
74    ) -> everruns_core::error::Result<Session>;
75
76    async fn load_turn_context(
77        &self,
78        org_id: i64,
79        session_id: SessionId,
80    ) -> everruns_core::error::Result<RuntimeHostTurnContext>;
81
82    fn capability_registry(&self) -> CapabilityRegistry;
83
84    fn driver_registry(&self) -> DriverRegistry;
85
86    fn harness_store(&self, org_id: i64) -> Arc<dyn HarnessStore>;
87
88    fn agent_store(&self, org_id: i64) -> Arc<dyn AgentStore>;
89
90    fn session_store(&self, org_id: i64) -> Arc<dyn SessionStore>;
91
92    fn session_mutator(&self, org_id: i64) -> Arc<dyn SessionMutator>;
93
94    fn provider_store(&self, org_id: i64) -> Arc<dyn LlmProviderStore>;
95
96    fn message_store(&self) -> Arc<dyn MessageRetriever>;
97
98    fn event_emitter(&self) -> Arc<dyn EventEmitter>;
99
100    fn file_store(&self) -> Arc<dyn SessionFileSystem>;
101
102    fn image_resolver(&self, _org_id: i64) -> Option<Arc<dyn ImageResolver>> {
103        None
104    }
105
106    fn image_artifact_store(&self, _org_id: i64) -> Option<Arc<dyn ImageArtifactStore>> {
107        None
108    }
109
110    fn provider_credential_store(&self, _org_id: i64) -> Option<Arc<dyn ProviderCredentialStore>> {
111        None
112    }
113
114    fn utility_llm_service(&self) -> Option<Arc<dyn UtilityLlmService>> {
115        None
116    }
117
118    fn egress_service(&self) -> Option<Arc<dyn EgressService>> {
119        None
120    }
121
122    fn storage_store(&self) -> Option<Arc<dyn SessionStorageStore>> {
123        None
124    }
125
126    fn connection_resolver(&self) -> Option<Arc<dyn UserConnectionResolver>> {
127        None
128    }
129
130    fn sqldb_store(&self) -> Option<SessionSqlDbStoreRef> {
131        None
132    }
133
134    fn leased_resource_store(&self) -> Option<Arc<dyn LeasedResourceStore>> {
135        None
136    }
137
138    fn session_resource_registry(&self) -> Option<Arc<dyn SessionResourceRegistry>> {
139        None
140    }
141
142    fn session_task_registry(
143        &self,
144    ) -> Option<Arc<dyn everruns_core::session_task::SessionTaskRegistry>> {
145        None
146    }
147
148    fn schedule_store(&self, _org_id: i64) -> Option<Arc<dyn SessionScheduleStore>> {
149        None
150    }
151
152    fn platform_store(
153        &self,
154        _org_id: i64,
155        _session_id: SessionId,
156    ) -> Option<Arc<dyn PlatformStore>> {
157        None
158    }
159
160    fn budget_checker(
161        &self,
162        _org_id: i64,
163        _agent_id: Option<AgentId>,
164    ) -> Option<Arc<dyn BudgetChecker>> {
165        None
166    }
167
168    fn payment_authority(
169        &self,
170        _org_id: i64,
171        _agent_id: Option<AgentId>,
172    ) -> Option<Arc<dyn PaymentAuthority>> {
173        None
174    }
175
176    /// Per-org outbound tool-call rate limiter (TM-TOOL-009).
177    /// Default: `None` (no rate limiting — suitable for in-process / test environments).
178    fn outbound_tool_rate_limiter(
179        &self,
180        _org_id: i64,
181    ) -> Option<Arc<dyn everruns_core::OutboundToolRateLimiter>> {
182        None
183    }
184
185    /// Per-turn durable tool result store for act-activity idempotency (EVE-530).
186    /// Default: `None` (no durable claim/settle — every execution runs tools fresh).
187    fn durable_tool_result_store(&self) -> Option<Arc<dyn everruns_core::DurableToolResultStore>> {
188        None
189    }
190
191    /// Durable subagent spawn handle store for reattach on reclaim (EVE-535).
192    /// Default: `None` (no spawn dedup — dev/test mode or hosts without durable execution).
193    fn subagent_spawn_store(&self) -> Option<Arc<dyn everruns_core::SubagentSpawnStore>> {
194        None
195    }
196
197    /// Stream-liveness heartbeater for the Reason activity (EVE-531).
198    /// Default: `None` (no heartbeats sent — durable workers supply one).
199    fn stream_heartbeater(&self) -> Option<Arc<dyn everruns_core::StreamHeartbeater>> {
200        None
201    }
202
203    /// Partial-stream store for ContinuePartial recovery (EVE-532).
204    /// Default: `None` (no recovery; in-memory and dev hosts use this default).
205    fn partial_stream_store(&self) -> Option<Arc<dyn everruns_core::PartialStreamStore>> {
206        None
207    }
208
209    /// Provider stall timeout for the Reason activity (EVE-531).
210    /// Default: `None` (use built-in 120s default).
211    fn provider_stall_timeout(&self) -> Option<std::time::Duration> {
212        None
213    }
214
215    /// MCP executor routing `mcp_*` tool calls for this session, if the host
216    /// configures MCP (specs/runtime-mcp.md D4). Default: `None`, so hosts
217    /// without scoped MCP servers keep the plain tool registry unchanged.
218    async fn mcp_executor(
219        &self,
220        _org_id: i64,
221        _session_id: SessionId,
222    ) -> Option<Arc<everruns_mcp::McpExecutor>> {
223        None
224    }
225}
226
227struct RuntimeExecutionCapabilities {
228    tool_registry: ToolRegistry,
229    post_tool_hooks: Vec<Arc<dyn everruns_core::PostToolExecHook>>,
230    pre_tool_hooks: Vec<Arc<dyn everruns_core::atoms::PreToolUseHook>>,
231    tool_call_hooks: Vec<Arc<dyn everruns_core::ToolCallHook>>,
232}
233
234/// Collect and finalize user-hook specs for a session from its resolved
235/// capability configs, plus the shared bash dispatcher used to run them.
236///
237/// This is the single place hook specs are gathered so every firing point —
238/// the act path (`load_execution_capabilities`) and the lifecycle firing
239/// points (`execute_reason_activity` for `user_prompt_submit`, turn completion
240/// for `turn_end`, and the server session paths) — applies identical
241/// `finalize_hook_specs` semantics: `{capability_id}:` namespace stamping,
242/// stable default ids, and `disabled_contributions` muting (TM-HOOK-004).
243fn finalize_specs_from_configs(
244    resolved_capability_configs: &[everruns_core::capability_types::AgentCapabilityConfig],
245    capability_registry: &CapabilityRegistry,
246) -> Vec<everruns_core::user_hook_types::UserHookSpec> {
247    let mut hook_contributions: Vec<(String, Vec<everruns_core::user_hook_types::UserHookSpec>)> =
248        Vec::new();
249    let mut disabled_contributions: Vec<String> = Vec::new();
250    for config in resolved_capability_configs {
251        let Some(capability) = capability_registry.get(config.capability_id()) else {
252            continue;
253        };
254        let specs = capability.user_hooks_with_config(&config.config);
255        if !specs.is_empty() {
256            hook_contributions.push((config.capability_id().to_string(), specs));
257        }
258        if config.capability_id() == "user_hooks" {
259            disabled_contributions.extend(
260                everruns_core::capabilities::user_hooks::disabled_contributions(&config.config),
261            );
262        }
263    }
264    everruns_core::hook_adapter::finalize_hook_specs(hook_contributions, &disabled_contributions)
265}
266
267/// Resolve a session's capability configs and collect finalized hook specs.
268/// Used by the lifecycle firing points, which need specs outside the act path.
269/// Returns `(specs, dispatcher)`; `specs` is empty when the session has no
270/// hook-contributing capabilities.
271async fn collect_lifecycle_hook_specs<A: RuntimeHostAdapter>(
272    adapter: &A,
273    org_id: i64,
274    session_id: SessionId,
275    harness_id: HarnessId,
276    agent_id: Option<AgentId>,
277) -> everruns_core::error::Result<(
278    Vec<everruns_core::user_hook_types::UserHookSpec>,
279    Arc<dyn everruns_core::hook_executor::BashHookDispatcher>,
280)> {
281    let capability_registry = adapter.capability_registry();
282    let harness_chain = adapter
283        .harness_store(org_id)
284        .get_harness_chain(harness_id)
285        .await?;
286    if harness_chain.is_empty() {
287        return Err(everruns_core::error::AgentLoopError::harness_not_found(
288            harness_id,
289        ));
290    }
291    let session = adapter
292        .session_store(org_id)
293        .get_session(session_id)
294        .await?
295        .ok_or_else(|| everruns_core::error::AgentLoopError::session_not_found(session_id))?;
296    let agent = match agent_id {
297        Some(agent_id) => adapter.agent_store(org_id).get_agent(agent_id).await?,
298        None => None,
299    };
300    let resolved = resolve_runtime_capabilities(
301        &harness_chain,
302        agent.as_ref(),
303        &session,
304        &capability_registry,
305    );
306    let specs =
307        finalize_specs_from_configs(&resolved.resolved_capability_configs, &capability_registry);
308    let dispatcher: Arc<dyn everruns_core::hook_executor::BashHookDispatcher> = Arc::new(
309        everruns_core::hook_dispatch::BashkitShellHookDispatcher::new(adapter.file_store()),
310    );
311    Ok((specs, dispatcher))
312}
313
314async fn load_execution_capabilities<A: RuntimeHostAdapter>(
315    adapter: &A,
316    org_id: i64,
317    session_id: SessionId,
318    harness_id: HarnessId,
319    agent_id: Option<AgentId>,
320    locale: Option<String>,
321    blueprint_id: Option<&str>,
322) -> everruns_core::error::Result<RuntimeExecutionCapabilities> {
323    let capability_registry = adapter.capability_registry();
324    if let Some(blueprint_id) = blueprint_id {
325        let mut registry = ToolRegistry::with_defaults();
326        let blueprint = capability_registry.blueprint(blueprint_id).ok_or_else(|| {
327            everruns_core::error::AgentLoopError::config(format!(
328                "Blueprint \"{blueprint_id}\" not found in registry"
329            ))
330        })?;
331        for tool in blueprint.tools {
332            registry.register_boxed(tool);
333        }
334        return Ok(RuntimeExecutionCapabilities {
335            tool_registry: registry,
336            post_tool_hooks: Vec::new(),
337            pre_tool_hooks: Vec::new(),
338            tool_call_hooks: Vec::new(),
339        });
340    }
341
342    let harness_chain = adapter
343        .harness_store(org_id)
344        .get_harness_chain(harness_id)
345        .await?;
346    if harness_chain.is_empty() {
347        return Err(everruns_core::error::AgentLoopError::harness_not_found(
348            harness_id,
349        ));
350    }
351
352    let session = adapter
353        .session_store(org_id)
354        .get_session(session_id)
355        .await?
356        .ok_or_else(|| everruns_core::error::AgentLoopError::session_not_found(session_id))?;
357
358    let agent_store = adapter.agent_store(org_id);
359    let agent = match agent_id {
360        Some(agent_id) => Some(
361            agent_store
362                .get_agent(agent_id)
363                .await?
364                .ok_or_else(|| everruns_core::error::AgentLoopError::agent_not_found(agent_id))?,
365        ),
366        None => None,
367    };
368
369    let resolved = resolve_runtime_capabilities(
370        &harness_chain,
371        agent.as_ref(),
372        &session,
373        &capability_registry,
374    );
375    // Executor (act) path: this builds the worker-side tool registry, not the
376    // model-visible tool list. The model is left unset, so a model-adaptive
377    // capability like `auto_tool_search` resolves to its provider-agnostic
378    // client-side mechanism here. That registers the `tool_search` tool in the
379    // executor, which is a harmless superset: on native models the reason path
380    // never shows that tool to the model, so it is simply never called.
381    let prompt_ctx = SystemPromptContext {
382        session_id,
383        locale: locale.or(session.locale.clone()),
384        file_store: Some(adapter.file_store()),
385        model: None,
386    };
387    let collected = collect_capabilities_with_configs(
388        &resolved.resolved_capability_configs,
389        &capability_registry,
390        &prompt_ctx,
391    )
392    .await;
393
394    let mut registry = ToolRegistry::with_defaults();
395    for tool in collected.tools {
396        registry.register_boxed(tool);
397    }
398
399    // Only `Available` capabilities contribute hooks, matching
400    // `collect_capabilities_with_configs` (which skips non-available
401    // capabilities). This keeps a `ComingSoon`/unavailable capability from
402    // affecting execution via any of its hook seams.
403    let mut post_tool_hooks: Vec<Arc<dyn everruns_core::PostToolExecHook>> = resolved
404        .resolved_capability_configs
405        .iter()
406        .flat_map(|config| {
407            capability_registry
408                .get(config.capability_id())
409                .filter(|capability| capability.status() == CapabilityStatus::Available)
410                .map(|capability| capability.post_tool_exec_hooks())
411                .unwrap_or_default()
412        })
413        .collect();
414
415    // User-hook contributions (see `specs/user-hooks.md`). `finalize_specs_from_configs`
416    // gathers specs across every resolved capability — both the user-facing
417    // `user_hooks` capability and any capability that bundles hooks — and applies
418    // `finalize_hook_specs` (namespace stamping, stable ids, `disabled_contributions`
419    // muting; TM-HOOK-004). The same helper backs the lifecycle firing points so
420    // every event finalizes specs identically.
421    let user_hook_specs =
422        finalize_specs_from_configs(&resolved.resolved_capability_configs, &capability_registry);
423    // Capability-contributed pre-tool hooks run first (e.g. approval gating),
424    // then user-hook (`PreToolUse`) specs. The first hook to block wins.
425    let mut pre_tool_hooks: Vec<Arc<dyn everruns_core::atoms::PreToolUseHook>> = resolved
426        .resolved_capability_configs
427        .iter()
428        .flat_map(|config| {
429            capability_registry
430                .get(config.capability_id())
431                .filter(|capability| capability.status() == CapabilityStatus::Available)
432                .map(|capability| capability.pre_tool_use_hooks())
433                .unwrap_or_default()
434        })
435        .collect();
436    if !user_hook_specs.is_empty() {
437        let dispatcher: Arc<dyn everruns_core::hook_executor::BashHookDispatcher> = Arc::new(
438            everruns_core::hook_dispatch::BashkitShellHookDispatcher::new(adapter.file_store()),
439        );
440        post_tool_hooks.extend(everruns_core::hook_adapter::build_post_tool_use_hooks(
441            &user_hook_specs,
442            dispatcher.clone(),
443        ));
444        pre_tool_hooks.extend(everruns_core::hook_adapter::build_pre_tool_use_hooks(
445            &user_hook_specs,
446            dispatcher,
447        ));
448    }
449
450    let tool_call_hooks = resolved
451        .resolved_capability_configs
452        .iter()
453        .flat_map(|config| {
454            capability_registry
455                .get(config.capability_id())
456                .filter(|capability| capability.status() == CapabilityStatus::Available)
457                .map(|capability| capability.tool_call_hooks())
458                .unwrap_or_default()
459        })
460        .collect();
461
462    Ok(RuntimeExecutionCapabilities {
463        tool_registry: registry,
464        post_tool_hooks,
465        pre_tool_hooks,
466        tool_call_hooks,
467    })
468}
469
470/// Shared lifecycle helper for runtime-backed hosts.
471pub struct RuntimeSessionLifecycle<A: RuntimeHostAdapter> {
472    adapter: A,
473    org_id: i64,
474    session_id: SessionId,
475}
476
477impl<A: RuntimeHostAdapter> RuntimeSessionLifecycle<A> {
478    pub fn new(adapter: A, org_id: i64, session_id: SessionId) -> Self {
479        Self {
480            adapter,
481            org_id,
482            session_id,
483        }
484    }
485
486    async fn set_session_status(&self, status: SessionStatus, action: &'static str) {
487        if let Err(error) = self
488            .adapter
489            .set_session_status(self.org_id, self.session_id, status)
490            .await
491        {
492            warn!(
493                session_id = %self.session_id,
494                org_id = self.org_id,
495                action,
496                %error,
497                "runtime host lifecycle status update failed"
498            );
499        }
500    }
501
502    async fn emit_event(&self, request: EventRequest) {
503        let event_type = request.event_type.clone();
504        if let Err(error) = self.adapter.event_emitter().emit(request).await {
505            warn!(
506                session_id = %self.session_id,
507                org_id = self.org_id,
508                event_type,
509                %error,
510                "runtime host lifecycle event emission failed"
511            );
512        }
513    }
514
515    pub async fn turn_started(&self, turn_id: TurnId, input_message_id: MessageId) {
516        let input_content = self
517            .adapter
518            .message_store()
519            .get(self.session_id, input_message_id)
520            .await
521            .ok()
522            .flatten()
523            .map(|message| message.content_to_llm_string());
524
525        self.set_session_status(SessionStatus::Active, "turn_started")
526            .await;
527
528        self.emit_event(EventRequest::new(
529            self.session_id,
530            EventContext::turn(turn_id, input_message_id),
531            SessionActivatedData {
532                turn_id,
533                input_message_id,
534            },
535        ))
536        .await;
537
538        self.emit_event(EventRequest::new(
539            self.session_id,
540            EventContext::turn(turn_id, input_message_id),
541            TurnStartedData {
542                turn_id,
543                input_message_id,
544                input_content,
545            },
546        ))
547        .await;
548    }
549
550    pub async fn emit_turn_completed(&self, input_message_id: MessageId, data: TurnCompletedData) {
551        let turn_id = data.turn_id;
552        self.emit_event(EventRequest::new(
553            self.session_id,
554            EventContext::turn(turn_id, input_message_id),
555            data,
556        ))
557        .await;
558    }
559
560    pub async fn emit_session_idled(
561        &self,
562        turn_id: TurnId,
563        input_message_id: MessageId,
564        iterations: Option<u32>,
565        usage: Option<TokenUsage>,
566    ) {
567        self.set_session_status(SessionStatus::Idle, "emit_session_idled")
568            .await;
569
570        self.emit_event(EventRequest::new(
571            self.session_id,
572            EventContext::turn(turn_id, input_message_id),
573            SessionIdledData {
574                turn_id,
575                iterations,
576                usage,
577            },
578        ))
579        .await;
580    }
581
582    pub async fn turn_completed(
583        &self,
584        turn_id: TurnId,
585        input_message_id: MessageId,
586        iterations: u32,
587        usage: Option<TokenUsage>,
588        input_content: Option<String>,
589    ) {
590        self.emit_turn_completed(
591            input_message_id,
592            TurnCompletedData {
593                turn_id,
594                iterations,
595                duration_ms: None,
596                usage: usage.clone(),
597                input_content,
598                final_message_id: None,
599                final_answer_preview: None,
600                time_to_first_token_ms: None,
601                tool_call_count: None,
602                llm_call_count: None,
603                status: Some("completed".to_string()),
604            },
605        )
606        .await;
607        self.emit_session_idled(turn_id, input_message_id, Some(iterations), usage)
608            .await;
609    }
610
611    /// Fire `turn_end` lifecycle hooks (advisory). Collects the session's hook
612    /// specs and runs every `turn_end` hook; failures are logged, never fatal.
613    /// `harness_id`/`agent_id` are required to resolve the capability chain.
614    pub async fn fire_turn_end_hooks(
615        &self,
616        harness_id: HarnessId,
617        agent_id: Option<AgentId>,
618        turn_id: TurnId,
619        success: bool,
620    ) {
621        let (specs, dispatcher) = match collect_lifecycle_hook_specs(
622            &self.adapter,
623            self.org_id,
624            self.session_id,
625            harness_id,
626            agent_id,
627        )
628        .await
629        {
630            Ok(pair) => pair,
631            Err(error) => {
632                warn!(
633                    session_id = %self.session_id,
634                    %error,
635                    "failed to collect turn_end hook specs; skipping"
636                );
637                return;
638            }
639        };
640        let hooks = everruns_core::lifecycle_hooks::build_turn_lifecycle_hooks(
641            &specs,
642            everruns_core::user_hook_types::HookEvent::TurnEnd,
643            dispatcher,
644        );
645        if hooks.is_empty() {
646            return;
647        }
648        let ctx = everruns_core::lifecycle_hooks::TurnHookContext {
649            session_id: self.session_id,
650            turn_id: Some(turn_id),
651            org_id: org_public_id_from_internal(self.org_id).parse().ok(),
652            agent_id: agent_id.map(|a| a.to_string()),
653        };
654        everruns_core::lifecycle_hooks::run_turn_end_hooks(
655            &hooks,
656            &ctx,
657            serde_json::json!({ "success": success }),
658        )
659        .await;
660    }
661
662    /// Abort a turn because a `user_prompt_submit` hook returned `Block`.
663    /// Reuses the dependency-blocked failure shape: emit a user-facing message
664    /// carrying the hook's `user_message` (or `reason`), then mark the turn
665    /// failed and idle the session.
666    pub async fn user_prompt_blocked(
667        &self,
668        turn_id: TurnId,
669        input_message_id: MessageId,
670        reason: &str,
671        user_message: Option<&str>,
672    ) {
673        let user_error =
674            UserFacingError::new(everruns_core::user_facing_error_codes::BLOCKED_BY_HOOK);
675        let shown = user_message.unwrap_or(reason);
676        let mut error_message = Message::assistant(shown);
677        let mut metadata = std::collections::HashMap::new();
678        user_error.apply_to_message_metadata(&mut metadata);
679        error_message.metadata = Some(metadata);
680
681        self.emit_event(EventRequest::new(
682            self.session_id,
683            EventContext::turn(turn_id, input_message_id),
684            OutputMessageCompletedData::new(error_message).with_user_facing_error(&user_error),
685        ))
686        .await;
687
688        self.turn_failed(turn_id, input_message_id, reason, Some(&user_error))
689            .await;
690    }
691
692    pub async fn turn_failed(
693        &self,
694        turn_id: TurnId,
695        input_message_id: MessageId,
696        error: &str,
697        user_error: Option<&UserFacingError>,
698    ) {
699        self.set_session_status(SessionStatus::Idle, "turn_failed")
700            .await;
701
702        self.emit_event(EventRequest::new(
703            self.session_id,
704            EventContext::turn(turn_id, input_message_id),
705            {
706                let mut data = TurnFailedData {
707                    turn_id,
708                    error: error.to_string(),
709                    error_code: None,
710                    error_fields: None,
711                };
712                if let Some(user_error) = user_error {
713                    user_error.apply_to_event_fields(&mut data.error_code, &mut data.error_fields);
714                }
715                data
716            },
717        ))
718        .await;
719
720        self.emit_event(EventRequest::new(
721            self.session_id,
722            EventContext::turn(turn_id, input_message_id),
723            SessionIdledData {
724                turn_id,
725                iterations: None,
726                usage: None,
727            },
728        ))
729        .await;
730    }
731
732    pub async fn waiting_for_tool_results(&self) {
733        self.set_session_status(
734            SessionStatus::WaitingForToolResults,
735            "waiting_for_tool_results",
736        )
737        .await;
738    }
739
740    pub async fn dependency_blocked(
741        &self,
742        turn_id: TurnId,
743        input_message_id: MessageId,
744        blocker: DependencyBlocker,
745    ) {
746        let user_error = UserFacingError::new(blocker.error_code())
747            .with_field(
748                "dependency",
749                match blocker {
750                    DependencyBlocker::HarnessArchived | DependencyBlocker::HarnessDeleted => {
751                        "harness"
752                    }
753                    DependencyBlocker::AgentArchived | DependencyBlocker::AgentDeleted => "agent",
754                },
755            )
756            .with_field(
757                "state",
758                match blocker {
759                    DependencyBlocker::HarnessArchived | DependencyBlocker::AgentArchived => {
760                        "archived"
761                    }
762                    DependencyBlocker::HarnessDeleted | DependencyBlocker::AgentDeleted => {
763                        "deleted"
764                    }
765                },
766            );
767        let mut error_message = Message::assistant(blocker.message());
768        let mut metadata = std::collections::HashMap::new();
769        user_error.apply_to_message_metadata(&mut metadata);
770        error_message.metadata = Some(metadata);
771
772        self.emit_event(EventRequest::new(
773            self.session_id,
774            EventContext::turn(turn_id, input_message_id),
775            OutputMessageCompletedData::new(error_message).with_user_facing_error(&user_error),
776        ))
777        .await;
778
779        self.turn_failed(
780            turn_id,
781            input_message_id,
782            blocker.message(),
783            Some(&user_error),
784        )
785        .await;
786    }
787}
788
789pub async fn detect_dependency_blocker<A: RuntimeHostAdapter>(
790    adapter: &A,
791    org_id: i64,
792    harness_id: HarnessId,
793    agent_id: Option<AgentId>,
794) -> everruns_core::error::Result<Option<DependencyBlocker>> {
795    let harness_store = adapter.harness_store(org_id);
796    let agent_store = adapter.agent_store(org_id);
797    everruns_core::detect_dependency_blocker(
798        harness_store.as_ref(),
799        agent_store.as_ref(),
800        harness_id,
801        agent_id,
802    )
803    .await
804}
805
806pub async fn execute_input_activity<A: RuntimeHostAdapter>(
807    adapter: &A,
808    org_id: i64,
809    input: InputAtomInput,
810) -> everruns_core::error::Result<InputAtomResult> {
811    RuntimeSessionLifecycle::new(adapter.clone(), org_id, input.context.session_id)
812        .turn_started(input.context.turn_id, input.context.input_message_id)
813        .await;
814
815    let atom = InputAtom::new(adapter.message_store());
816    atom.execute(input).await
817}
818
819/// Collect `user_prompt_submit` hooks for this turn and run them against the
820/// inbound user message text. Returns `None` when the session has no such
821/// hooks (the common case — no overhead beyond the spec collection, which is
822/// skipped early). Errors loading specs are logged and treated as "no hooks"
823/// so a hook-collection failure never blocks a turn that wasn't asking to be
824/// hooked.
825struct UserPromptHookResult {
826    decision: everruns_core::lifecycle_hooks::UserPromptDecision,
827    original_message: String,
828}
829
830async fn run_user_prompt_submit_for_turn<A: RuntimeHostAdapter>(
831    adapter: &A,
832    org_id: i64,
833    input: &ReasonInput,
834) -> everruns_core::error::Result<Option<UserPromptHookResult>> {
835    let (specs, dispatcher) = match collect_lifecycle_hook_specs(
836        adapter,
837        org_id,
838        input.context.session_id,
839        input.harness_id,
840        input.agent_id,
841    )
842    .await
843    {
844        Ok(pair) => pair,
845        Err(error) => {
846            warn!(
847                session_id = %input.context.session_id,
848                %error,
849                "failed to collect user_prompt_submit hook specs; continuing without them"
850            );
851            return Ok(None);
852        }
853    };
854    let hooks = everruns_core::lifecycle_hooks::build_turn_lifecycle_hooks(
855        &specs,
856        everruns_core::user_hook_types::HookEvent::UserPromptSubmit,
857        dispatcher,
858    );
859    if hooks.is_empty() {
860        return Ok(None);
861    }
862
863    let message_text = adapter
864        .message_store()
865        .get(input.context.session_id, input.context.input_message_id)
866        .await
867        .ok()
868        .flatten()
869        .map(|m| m.content_to_llm_string())
870        .unwrap_or_default();
871
872    let ctx = everruns_core::lifecycle_hooks::TurnHookContext {
873        session_id: input.context.session_id,
874        turn_id: Some(input.context.turn_id),
875        org_id: org_public_id_from_internal(org_id).parse().ok(),
876        agent_id: input.agent_id.map(|a| a.to_string()),
877    };
878    let original_message = message_text.clone();
879    let decision =
880        everruns_core::lifecycle_hooks::run_user_prompt_submit_hooks(&hooks, &ctx, message_text)
881            .await;
882    Ok(Some(UserPromptHookResult {
883        decision,
884        original_message,
885    }))
886}
887
888pub async fn execute_reason_activity<A: RuntimeHostAdapter>(
889    adapter: &A,
890    org_id: i64,
891    input: ReasonInput,
892) -> everruns_core::error::Result<ReasonResult> {
893    if let Some(blocker) =
894        detect_dependency_blocker(adapter, org_id, input.harness_id, input.agent_id).await?
895    {
896        RuntimeSessionLifecycle::new(adapter.clone(), org_id, input.context.session_id)
897            .dependency_blocked(
898                input.context.turn_id,
899                input.context.input_message_id,
900                blocker,
901            )
902            .await;
903        return Ok(ReasonResult {
904            success: false,
905            text: blocker.message().to_string(),
906            tool_calls: vec![],
907            has_tool_calls: false,
908            tool_definitions: vec![],
909            max_iterations: everruns_core::runtime_agent::default_max_iterations(),
910            error: Some("dependency_unavailable".to_string()),
911            usage: None,
912            output_message_id: None,
913            time_to_first_token_ms: None,
914            response_id: None,
915            locale: None,
916            network_access: None,
917        });
918    }
919
920    // user_prompt_submit hook (see `specs/user-hooks.md`). Fires once per turn,
921    // on the first reason iteration, before the LLM is consulted — the closest
922    // choke point to "inbound user message accepted, before reason" that both
923    // the in-process loop and the durable worker share. A `Block` aborts the
924    // turn by reusing the same failure path as `dependency_blocked`: emit a
925    // user-facing message + turn.failed, idle the session, and return a
926    // non-success `ReasonResult` so no LLM/act work runs.
927    let mut user_prompt_message_override = None;
928    if input.iteration <= 1
929        && let Some(hook_result) = run_user_prompt_submit_for_turn(adapter, org_id, &input).await?
930    {
931        match hook_result.decision {
932            everruns_core::lifecycle_hooks::UserPromptDecision::Block {
933                reason,
934                user_message,
935            } => {
936                RuntimeSessionLifecycle::new(adapter.clone(), org_id, input.context.session_id)
937                    .user_prompt_blocked(
938                        input.context.turn_id,
939                        input.context.input_message_id,
940                        &reason,
941                        user_message.as_deref(),
942                    )
943                    .await;
944                return Ok(ReasonResult {
945                    success: false,
946                    text: user_message.unwrap_or_else(|| reason.clone()),
947                    tool_calls: vec![],
948                    has_tool_calls: false,
949                    tool_definitions: vec![],
950                    max_iterations: everruns_core::runtime_agent::default_max_iterations(),
951                    error: Some("blocked_by_user_prompt_hook".to_string()),
952                    usage: None,
953                    output_message_id: None,
954                    time_to_first_token_ms: None,
955                    response_id: None,
956                    locale: None,
957                    network_access: None,
958                });
959            }
960            everruns_core::lifecycle_hooks::UserPromptDecision::Continue { message } => {
961                if message != hook_result.original_message {
962                    user_prompt_message_override = Some(message);
963                }
964            }
965        }
966    }
967
968    let turn_context = adapter
969        .load_turn_context(org_id, input.context.session_id)
970        .await?;
971
972    let mut atom = ReasonAtom::new(
973        adapter.harness_store(org_id),
974        adapter.agent_store(org_id),
975        adapter.session_store(org_id),
976        adapter.message_store(),
977        adapter.provider_store(org_id),
978        adapter.capability_registry(),
979        adapter.driver_registry(),
980        adapter.event_emitter(),
981    )
982    .with_file_store(adapter.file_store());
983    if let Some(image_resolver) = adapter.image_resolver(org_id) {
984        atom = atom.with_image_resolver(image_resolver);
985    }
986    if let Some(hb) = adapter.stream_heartbeater() {
987        atom = atom.with_stream_heartbeater(hb);
988    }
989    if let Some(timeout) = adapter.provider_stall_timeout() {
990        atom = atom.with_provider_stall_timeout(timeout);
991    }
992    if let Some(store) = adapter.partial_stream_store() {
993        atom = atom.with_partial_stream_store(store);
994    }
995    if let Some(store) = adapter.durable_tool_result_store() {
996        atom = atom.with_durable_tool_result_store(store);
997    }
998
999    let input = ReasonInput {
1000        mcp_tool_definitions: turn_context.mcp_tool_definitions,
1001        ..input
1002    };
1003
1004    if let Some(message_override) = user_prompt_message_override {
1005        let mut assembled = assemble_turn_context(
1006            adapter.harness_store(org_id).as_ref(),
1007            adapter.agent_store(org_id).as_ref(),
1008            adapter.session_store(org_id).as_ref(),
1009            adapter.message_store().as_ref(),
1010            adapter.provider_store(org_id).as_ref(),
1011            &adapter.capability_registry(),
1012            input.context.session_id,
1013            input.harness_id,
1014            input.agent_id,
1015            &input.mcp_tool_definitions,
1016            Some(adapter.file_store()),
1017        )
1018        .await?;
1019
1020        let message = assembled
1021            .messages
1022            .iter_mut()
1023            .find(|message| message.id == input.context.input_message_id)
1024            .ok_or_else(|| {
1025                everruns_core::error::AgentLoopError::config(
1026                    "user_prompt_submit mutation: input message not found in assembled context",
1027                )
1028            })?;
1029
1030        // user_prompt_submit mutations are enforcement controls for the
1031        // provider-bound prompt. Apply them to the assembled context only
1032        // so persisted user history remains an audit record of the input.
1033        // Preserve non-text parts (images, files); replace only text parts.
1034        message
1035            .content
1036            .retain(|part| !matches!(part, ContentPart::Text(_)));
1037        message
1038            .content
1039            .insert(0, ContentPart::text(message_override));
1040
1041        return atom.execute_with_assembled_context(input, assembled).await;
1042    }
1043
1044    atom.execute(input).await
1045}
1046
1047pub async fn execute_act_activity<A: RuntimeHostAdapter>(
1048    adapter: &A,
1049    input: ActInput,
1050) -> everruns_core::error::Result<ActResult> {
1051    let org_id = input.org_id.ok_or_else(|| {
1052        everruns_core::error::AgentLoopError::config(
1053            "ActInput.org_id must be set for runtime host execution",
1054        )
1055    })?;
1056
1057    if let Some(blocker) =
1058        detect_dependency_blocker(adapter, org_id, input.harness_id, input.agent_id).await?
1059    {
1060        RuntimeSessionLifecycle::new(adapter.clone(), org_id, input.context.session_id)
1061            .dependency_blocked(
1062                input.context.turn_id,
1063                input.context.input_message_id,
1064                blocker,
1065            )
1066            .await;
1067        return Ok(ActResult {
1068            results: vec![],
1069            completed: true,
1070            success_count: 0,
1071            error_count: 1,
1072            waiting_for_tool_results: false,
1073            blocked: true,
1074            client_tool_calls: vec![],
1075            client_tool_definitions: vec![],
1076        });
1077    }
1078
1079    let execution_capabilities = load_execution_capabilities(
1080        adapter,
1081        org_id,
1082        input.context.session_id,
1083        input.harness_id,
1084        input.agent_id,
1085        input.locale.clone(),
1086        input.blueprint_id.as_deref(),
1087    )
1088    .await?;
1089    let mut tool_registry = execution_capabilities.tool_registry;
1090
1091    // Register the session's MCP tools as first-class registry tools, so they
1092    // execute through the regular `ToolExecutor` path and are visible to
1093    // everything that introspects the registry (spawn_background, tool_search,
1094    // openai_tool_search namespaces, ...). The turn's tool definitions already
1095    // include the discovered MCP tools, so no re-discovery is needed; the host's
1096    // MCP executor supplies execution (specs/runtime-mcp.md D5).
1097    if let Some(mcp) = adapter.mcp_executor(org_id, input.context.session_id).await {
1098        let invoker: Arc<dyn everruns_core::McpToolInvoker> = mcp;
1099        for tool in everruns_core::build_mcp_proxy_tools(&input.tool_definitions, invoker) {
1100            tool_registry.register_boxed(tool);
1101        }
1102    }
1103
1104    let builtin_tool_registry = Arc::new(tool_registry.clone());
1105    let executor: Arc<dyn everruns_core::traits::ToolExecutor> = Arc::new(tool_registry);
1106
1107    let mut atom =
1108        ActAtom::with_file_store(executor, adapter.event_emitter(), adapter.file_store())
1109            .with_session_store(adapter.session_store(org_id))
1110            .with_session_mutator(adapter.session_mutator(org_id))
1111            .with_agent_store(adapter.agent_store(org_id))
1112            .with_tool_registry(builtin_tool_registry)
1113            .with_org_id(
1114                org_public_id_from_internal(org_id)
1115                    .parse()
1116                    .expect("internal org id converts to valid public org id"),
1117            )
1118            .with_capability_registry(adapter.capability_registry())
1119            .with_post_tool_hooks(execution_capabilities.post_tool_hooks)
1120            .with_pre_tool_hooks(execution_capabilities.pre_tool_hooks)
1121            .with_tool_call_hooks(execution_capabilities.tool_call_hooks);
1122
1123    if let Some(storage_store) = adapter.storage_store() {
1124        atom = atom.with_storage_store(storage_store);
1125    }
1126    if let Some(image_store) = adapter.image_artifact_store(org_id) {
1127        atom = atom.with_image_store(image_store);
1128    }
1129    if let Some(provider_credential_store) = adapter.provider_credential_store(org_id) {
1130        atom = atom.with_provider_credential_store(provider_credential_store);
1131    }
1132    if let Some(utility_llm_service) = adapter.utility_llm_service() {
1133        atom = atom.with_utility_llm_service(utility_llm_service);
1134    }
1135    if let Some(egress_service) = adapter.egress_service() {
1136        atom = atom.with_egress_service(egress_service);
1137    }
1138    if let Some(connection_resolver) = adapter.connection_resolver() {
1139        atom = atom.with_connection_resolver(connection_resolver);
1140    }
1141    if let Some(sqldb_store) = adapter.sqldb_store() {
1142        atom = atom.with_sqldb_store(sqldb_store);
1143    }
1144    if let Some(leased_resource_store) = adapter.leased_resource_store() {
1145        atom = atom.with_leased_resource_store(leased_resource_store);
1146    }
1147    if let Some(registry) = adapter.session_resource_registry() {
1148        atom = atom.with_session_resource_registry(registry);
1149    }
1150    if let Some(registry) = adapter.session_task_registry() {
1151        atom = atom.with_session_task_registry(registry);
1152    }
1153    if let Some(schedule_store) = adapter.schedule_store(org_id) {
1154        atom = atom.with_schedule_store(schedule_store);
1155    }
1156    if let Some(platform_store) = adapter.platform_store(org_id, input.context.session_id) {
1157        atom = atom.with_platform_store(platform_store);
1158    }
1159    if let Some(budget_checker) = adapter.budget_checker(org_id, input.agent_id) {
1160        atom = atom.with_budget_checker(budget_checker);
1161    }
1162    if let Some(payment_authority) = adapter.payment_authority(org_id, input.agent_id) {
1163        atom = atom.with_payment_authority(payment_authority);
1164    }
1165    if let Some(limiter) = adapter.outbound_tool_rate_limiter(org_id) {
1166        atom = atom.with_outbound_tool_rate_limiter(limiter);
1167    }
1168    if let Some(store) = adapter.durable_tool_result_store() {
1169        atom = atom.with_durable_tool_result_store(store);
1170    }
1171    if let Some(store) = adapter.subagent_spawn_store() {
1172        atom = atom.with_subagent_spawn_store(store);
1173    }
1174
1175    atom.execute(input).await
1176}