Skip to main content

everruns_runtime/
host.rs

1// Shared host orchestration for embedded and durable execution hosts.
2// Decision: everruns-runtime owns worker-facing turn phase execution so
3// durable/server-backed hosts reuse the same input/reason/act wiring.
4
5use async_trait::async_trait;
6use everruns_core::atoms::{
7    ActAtom, ActInput, ActResult, Atom, InputAtom, InputAtomInput, InputAtomResult, ReasonAtom,
8    ReasonInput, ReasonResult,
9};
10use everruns_core::capabilities::{SystemPromptContext, collect_capabilities_with_configs};
11use everruns_core::events::{
12    EventContext, EventRequest, OutputMessageCompletedData, SessionActivatedData, SessionIdledData,
13    TurnCompletedData, TurnFailedData, TurnStartedData,
14};
15use everruns_core::message::{ContentPart, Message};
16use everruns_core::message_retriever::MessageRetriever;
17use everruns_core::platform_store::PlatformStore;
18use everruns_core::session::SessionStatus;
19use everruns_core::traits::{
20    AgentStore, BudgetChecker, EventEmitter, HarnessStore, ImageArtifactStore, ImageResolver,
21    LeasedResourceStore, PaymentAuthority, ProviderCredentialStore, ProviderStore, ResolvedModel,
22    SessionFileSystem, SessionMutator, SessionResourceRegistry, SessionScheduleStore,
23    SessionSqlDbStoreRef, SessionStorageStore, SessionStore, UserConnectionResolver,
24};
25use everruns_core::typed_id::{AgentId, HarnessId, MessageId, SessionId, TurnId};
26use everruns_core::vector_store::KnowledgeIndexSearch;
27use everruns_core::{
28    Agent, CapabilityRegistry, CapabilityStatus, DependencyBlocker, DriverRegistry, EgressService,
29    ErrorDisclosure, Harness, Session, TokenUsage, ToolDefinition, ToolRegistry, UserFacingError,
30    UtilityLlmService, assemble_turn_context, org_public_id_from_internal,
31    resolve_runtime_capabilities,
32};
33use std::sync::Arc;
34use tracing::warn;
35
/// Turn context loaded in one batched call for runtime host execution.
///
/// This is the value returned by `RuntimeHostAdapter::load_turn_context`.
#[derive(Debug, Clone)]
pub struct RuntimeHostTurnContext {
    /// Agent associated with the session, if any.
    // NOTE(review): presumably `None` for sessions that run without an agent — confirm.
    pub agent: Option<Agent>,
    /// Session the turn executes in.
    pub session: Session,
    /// Messages loaded for the turn.
    // NOTE(review): ordering and windowing are decided by the host; not visible here.
    pub messages: Vec<Message>,
    /// Resolved model for the turn, if one was resolved.
    pub model: Option<ResolvedModel>,
    /// Tool definitions for MCP-provided tools (going by the field name).
    // NOTE(review): confirm how these relate to `RuntimeHostAdapter::mcp_executor`.
    pub mcp_tool_definitions: Vec<ToolDefinition>,
}
45
/// Public adapter contract for server-backed or durable runtime hosts.
///
/// `everruns-runtime` owns shared host orchestration for both embedded and
/// durable execution. That includes phase execution (`input -> reason -> act`),
/// lifecycle emission, and the generic turn-strategy decisions used by durable
/// or custom hosts.
///
/// Host crates implement this trait to provide persistence, session-lifecycle
/// plumbing, event delivery, and their own orchestration backend. The durable
/// engine itself remains outside this crate.
///
/// Required methods cover the stores and registries every host must supply.
/// Every method with a default body returns `None`, so hosts opt in to optional
/// services by overriding them.
#[async_trait]
pub trait RuntimeHostAdapter: Send + Sync + Clone + 'static {
    /// Fetch the agent `agent_id` within org `org_id`.
    // NOTE(review): `Ok(None)` presumably means "no such agent" — confirm with implementers.
    async fn get_agent(
        &self,
        org_id: i64,
        agent_id: AgentId,
    ) -> everruns_core::error::Result<Option<Agent>>;

    /// Fetch the harness `harness_id` within org `org_id`.
    // NOTE(review): `Ok(None)` presumably means "no such harness" — confirm with implementers.
    async fn get_harness(
        &self,
        org_id: i64,
        harness_id: HarnessId,
    ) -> everruns_core::error::Result<Option<Harness>>;

    /// Apply `status` to the session and return a `Session`
    /// (presumably the updated record — confirm with implementers).
    async fn set_session_status(
        &self,
        org_id: i64,
        session_id: SessionId,
        status: SessionStatus,
    ) -> everruns_core::error::Result<Session>;

    /// Load the [`RuntimeHostTurnContext`] for `session_id` in org `org_id`.
    async fn load_turn_context(
        &self,
        org_id: i64,
        session_id: SessionId,
    ) -> everruns_core::error::Result<RuntimeHostTurnContext>;

    /// Capability registry; this file uses it to resolve capability configs
    /// into tools and hooks.
    fn capability_registry(&self) -> CapabilityRegistry;

    /// Driver registry supplied by the host.
    fn driver_registry(&self) -> DriverRegistry;

    /// Harness store for `org_id`; this file uses it to load the harness chain.
    fn harness_store(&self, org_id: i64) -> Arc<dyn HarnessStore>;

    /// Agent store for `org_id`.
    fn agent_store(&self, org_id: i64) -> Arc<dyn AgentStore>;

    /// Session store for `org_id`.
    fn session_store(&self, org_id: i64) -> Arc<dyn SessionStore>;

    /// Session mutator for `org_id`.
    fn session_mutator(&self, org_id: i64) -> Arc<dyn SessionMutator>;

    /// Provider store for `org_id`.
    fn provider_store(&self, org_id: i64) -> Arc<dyn ProviderStore>;

    /// Message retriever; the lifecycle helper uses it to read a turn's input message.
    fn message_store(&self) -> Arc<dyn MessageRetriever>;

    /// Event emitter used for lifecycle events.
    fn event_emitter(&self) -> Arc<dyn EventEmitter>;

    /// Session file system; this file hands it to the bash hook dispatcher and
    /// to system-prompt file reads.
    fn file_store(&self) -> Arc<dyn SessionFileSystem>;

    /// Optional image resolver for an org. Default: `None`.
    fn image_resolver(&self, _org_id: i64) -> Option<Arc<dyn ImageResolver>> {
        None
    }

    /// Optional image artifact store for an org. Default: `None`.
    fn image_artifact_store(&self, _org_id: i64) -> Option<Arc<dyn ImageArtifactStore>> {
        None
    }

    /// Optional provider credential store for an org. Default: `None`.
    fn provider_credential_store(&self, _org_id: i64) -> Option<Arc<dyn ProviderCredentialStore>> {
        None
    }

    /// Optional utility LLM service. Default: `None`.
    fn utility_llm_service(&self) -> Option<Arc<dyn UtilityLlmService>> {
        None
    }

    /// Optional egress service. Default: `None`.
    fn egress_service(&self) -> Option<Arc<dyn EgressService>> {
        None
    }

    /// Optional session storage store. Default: `None`.
    fn storage_store(&self) -> Option<Arc<dyn SessionStorageStore>> {
        None
    }

    /// Knowledge store backing the `search_knowledge` tool. Default: none.
    fn knowledge_store(&self) -> Option<Arc<dyn everruns_core::traits::KnowledgeStore>> {
        None
    }

    /// Optional user connection resolver. Default: `None`.
    fn connection_resolver(&self) -> Option<Arc<dyn UserConnectionResolver>> {
        None
    }

    /// Optional session SQL database store. Default: `None`.
    fn sqldb_store(&self) -> Option<SessionSqlDbStoreRef> {
        None
    }

    /// Optional leased resource store. Default: `None`.
    fn leased_resource_store(&self) -> Option<Arc<dyn LeasedResourceStore>> {
        None
    }

    /// Optional session resource registry. Default: `None`.
    fn session_resource_registry(&self) -> Option<Arc<dyn SessionResourceRegistry>> {
        None
    }

    /// Optional session task registry. Default: `None`.
    fn session_task_registry(
        &self,
    ) -> Option<Arc<dyn everruns_core::session_task::SessionTaskRegistry>> {
        None
    }

    /// Optional session schedule store for an org. Default: `None`.
    fn schedule_store(&self, _org_id: i64) -> Option<Arc<dyn SessionScheduleStore>> {
        None
    }

    /// Optional platform store scoped to an org and session. Default: `None`.
    fn platform_store(
        &self,
        _org_id: i64,
        _session_id: SessionId,
    ) -> Option<Arc<dyn PlatformStore>> {
        None
    }

    /// Get the Knowledge Index search service for the `search_index` tool.
    /// Org-scoped; returns None when retrieval is not available (e.g. gRPC
    /// workers without a search RPC, or in-memory test backends).
    fn knowledge_index_search(&self, _org_id: i64) -> Option<Arc<dyn KnowledgeIndexSearch>> {
        None
    }

    /// Optional budget checker for an org and (optionally) an agent. Default: `None`.
    fn budget_checker(
        &self,
        _org_id: i64,
        _agent_id: Option<AgentId>,
    ) -> Option<Arc<dyn BudgetChecker>> {
        None
    }

    /// Optional payment authority for an org and (optionally) an agent. Default: `None`.
    fn payment_authority(
        &self,
        _org_id: i64,
        _agent_id: Option<AgentId>,
    ) -> Option<Arc<dyn PaymentAuthority>> {
        None
    }

    /// Per-org outbound tool-call rate limiter (TM-TOOL-009).
    /// Default: `None` (no rate limiting — suitable for in-process / test environments).
    fn outbound_tool_rate_limiter(
        &self,
        _org_id: i64,
    ) -> Option<Arc<dyn everruns_core::OutboundToolRateLimiter>> {
        None
    }

    /// Per-turn durable tool result store for act-activity idempotency (EVE-530).
    /// Default: `None` (no durable claim/settle — every execution runs tools fresh).
    fn durable_tool_result_store(&self) -> Option<Arc<dyn everruns_core::DurableToolResultStore>> {
        None
    }

    /// Durable subagent spawn handle store for reattach on reclaim (EVE-535).
    /// Default: `None` (no spawn dedup — dev/test mode or hosts without durable execution).
    fn subagent_spawn_store(&self) -> Option<Arc<dyn everruns_core::SubagentSpawnStore>> {
        None
    }

    /// Stream-liveness heartbeater for the Reason activity (EVE-531).
    /// Default: `None` (no heartbeats sent — durable workers supply one).
    fn stream_heartbeater(&self) -> Option<Arc<dyn everruns_core::StreamHeartbeater>> {
        None
    }

    /// Partial-stream store for ContinuePartial recovery (EVE-532).
    /// Default: `None` (no recovery; in-memory and dev hosts use this default).
    fn partial_stream_store(&self) -> Option<Arc<dyn everruns_core::PartialStreamStore>> {
        None
    }

    /// Live, turn-scoped reasoning-effort handle for the given session (EVE-595).
    ///
    /// When a host returns a handle, the Reason activity re-reads it on every
    /// LLM step and the Act activity hands the same instance to each tool's
    /// `ToolContext`. A tool can then change effort mid-turn and have subsequent
    /// LLM steps in the same turn observe it. Hosts MUST return the *same*
    /// handle instance for a session across reason/act activities of one turn.
    /// Default: `None` (effort is resolved solely from message controls).
    fn reasoning_effort_handle(
        &self,
        _session_id: SessionId,
    ) -> Option<everruns_core::ReasoningEffortHandle> {
        None
    }

    /// Provider stall timeout for the Reason activity (EVE-531).
    /// Default: `None` (use built-in 120s default).
    fn provider_stall_timeout(&self) -> Option<std::time::Duration> {
        None
    }

    /// MCP executor routing `mcp_*` tool calls for this session, if the host
    /// configures MCP (specs/runtime-mcp.md D4). Default: `None`, so hosts
    /// without scoped MCP servers keep the plain tool registry unchanged.
    async fn mcp_executor(
        &self,
        _org_id: i64,
        _session_id: SessionId,
    ) -> Option<Arc<everruns_mcp::McpExecutor>> {
        None
    }
}
254
/// Executor-side capability bundle produced by `load_execution_capabilities`:
/// the tool registry plus the hook lists assembled for a session.
struct RuntimeExecutionCapabilities {
    /// Default tools plus the tools contributed by a blueprint or by the
    /// session's collected capabilities.
    tool_registry: ToolRegistry,
    /// Post-tool-execution hooks: capability hooks sorted by priority, followed
    /// by user-hook adapters when the session has user-hook specs.
    post_tool_hooks: Vec<Arc<dyn everruns_core::PostToolExecHook>>,
    /// Pre-tool-use hooks: capability hooks first, then user-hook adapters.
    pre_tool_hooks: Vec<Arc<dyn everruns_core::atoms::PreToolUseHook>>,
    /// Tool-call hooks taken from capability collection.
    tool_call_hooks: Vec<Arc<dyn everruns_core::ToolCallHook>>,
}
261
262/// Collect and finalize user-hook specs for a session from its resolved
263/// capability configs, plus the shared bash dispatcher used to run them.
264///
265/// This is the single place hook specs are gathered so every firing point —
266/// the act path (`load_execution_capabilities`) and the lifecycle firing
267/// points (`execute_reason_activity` for `user_prompt_submit`, turn completion
268/// for `turn_end`, and the server session paths) — applies identical
269/// `finalize_hook_specs` semantics: `{capability_id}:` namespace stamping,
270/// stable default ids, and `disabled_contributions` muting (TM-HOOK-004).
271fn finalize_specs_from_configs(
272    resolved_capability_configs: &[everruns_core::capability_types::AgentCapabilityConfig],
273    capability_registry: &CapabilityRegistry,
274) -> Vec<everruns_core::user_hook_types::UserHookSpec> {
275    let mut hook_contributions: Vec<(String, Vec<everruns_core::user_hook_types::UserHookSpec>)> =
276        Vec::new();
277    let mut disabled_contributions: Vec<String> = Vec::new();
278    for config in resolved_capability_configs {
279        let Some(capability) = capability_registry.get(config.capability_id()) else {
280            continue;
281        };
282        let specs = capability.user_hooks_with_config(&config.config);
283        if !specs.is_empty() {
284            hook_contributions.push((config.capability_id().to_string(), specs));
285        }
286        if config.capability_id() == "user_hooks" {
287            disabled_contributions.extend(
288                everruns_core::capabilities::user_hooks::disabled_contributions(&config.config),
289            );
290        }
291    }
292    everruns_core::hook_adapter::finalize_hook_specs(hook_contributions, &disabled_contributions)
293}
294
295/// Resolve a session's capability configs and collect finalized hook specs.
296/// Used by the lifecycle firing points, which need specs outside the act path.
297/// Returns `(specs, dispatcher)`; `specs` is empty when the session has no
298/// hook-contributing capabilities.
299async fn collect_lifecycle_hook_specs<A: RuntimeHostAdapter>(
300    adapter: &A,
301    org_id: i64,
302    session_id: SessionId,
303    harness_id: HarnessId,
304    agent_id: Option<AgentId>,
305) -> everruns_core::error::Result<(
306    Vec<everruns_core::user_hook_types::UserHookSpec>,
307    Arc<dyn everruns_core::hook_executor::BashHookDispatcher>,
308)> {
309    let capability_registry = adapter.capability_registry();
310    let harness_chain = adapter
311        .harness_store(org_id)
312        .get_harness_chain(harness_id)
313        .await?;
314    if harness_chain.is_empty() {
315        return Err(everruns_core::error::AgentLoopError::harness_not_found(
316            harness_id,
317        ));
318    }
319    let session = adapter
320        .session_store(org_id)
321        .get_session(session_id)
322        .await?
323        .ok_or_else(|| everruns_core::error::AgentLoopError::session_not_found(session_id))?;
324    let agent = match agent_id {
325        Some(agent_id) => adapter.agent_store(org_id).get_agent(agent_id).await?,
326        None => None,
327    };
328    let resolved = resolve_runtime_capabilities(
329        &harness_chain,
330        agent.as_ref(),
331        &session,
332        &capability_registry,
333    );
334    let specs =
335        finalize_specs_from_configs(&resolved.resolved_capability_configs, &capability_registry);
336    let dispatcher: Arc<dyn everruns_core::hook_executor::BashHookDispatcher> = Arc::new(
337        everruns_core::hook_dispatch::BashkitShellHookDispatcher::new(adapter.file_store()),
338    );
339    Ok((specs, dispatcher))
340}
341
342async fn load_execution_capabilities<A: RuntimeHostAdapter>(
343    adapter: &A,
344    org_id: i64,
345    session_id: SessionId,
346    harness_id: HarnessId,
347    agent_id: Option<AgentId>,
348    locale: Option<String>,
349    blueprint_id: Option<&str>,
350) -> everruns_core::error::Result<RuntimeExecutionCapabilities> {
351    let capability_registry = adapter.capability_registry();
352    if let Some(blueprint_id) = blueprint_id {
353        let mut registry = ToolRegistry::with_defaults();
354        let blueprint = capability_registry.blueprint(blueprint_id).ok_or_else(|| {
355            everruns_core::error::AgentLoopError::config(format!(
356                "Blueprint \"{blueprint_id}\" not found in registry"
357            ))
358        })?;
359        for tool in blueprint.tools {
360            registry.register_boxed(tool);
361        }
362        return Ok(RuntimeExecutionCapabilities {
363            tool_registry: registry,
364            post_tool_hooks: Vec::new(),
365            pre_tool_hooks: Vec::new(),
366            tool_call_hooks: Vec::new(),
367        });
368    }
369
370    let harness_chain = adapter
371        .harness_store(org_id)
372        .get_harness_chain(harness_id)
373        .await?;
374    if harness_chain.is_empty() {
375        return Err(everruns_core::error::AgentLoopError::harness_not_found(
376            harness_id,
377        ));
378    }
379
380    let session = adapter
381        .session_store(org_id)
382        .get_session(session_id)
383        .await?
384        .ok_or_else(|| everruns_core::error::AgentLoopError::session_not_found(session_id))?;
385
386    let agent_store = adapter.agent_store(org_id);
387    let agent = match agent_id {
388        Some(agent_id) => Some(
389            agent_store
390                .get_agent(agent_id)
391                .await?
392                .ok_or_else(|| everruns_core::error::AgentLoopError::agent_not_found(agent_id))?,
393        ),
394        None => None,
395    };
396
397    let resolved = resolve_runtime_capabilities(
398        &harness_chain,
399        agent.as_ref(),
400        &session,
401        &capability_registry,
402    );
403    // Executor (act) path: this builds the worker-side tool registry, not the
404    // model-visible tool list. The model is left unset, so a model-adaptive
405    // capability like `auto_tool_search` resolves to its provider-agnostic
406    // client-side mechanism here. That registers the `tool_search` tool in the
407    // executor, which is a harmless superset: on native models the reason path
408    // never shows that tool to the model, so it is simply never called.
409    let prompt_ctx = SystemPromptContext {
410        session_id,
411        locale: locale.or(session.locale.clone()),
412        // Pin system-prompt file reads to the session's workspace (the default
413        // 1:1 case is a transparent pass-through).
414        file_store: Some(everruns_core::WorkspaceScopedFileSystem::wrap(
415            adapter.file_store(),
416            session.workspace_id,
417        )),
418        model: None,
419    };
420    let collected = collect_capabilities_with_configs(
421        &resolved.resolved_capability_configs,
422        &capability_registry,
423        &prompt_ctx,
424    )
425    .await;
426
427    let mut registry = ToolRegistry::with_defaults();
428    for tool in collected.tools {
429        registry.register_boxed(tool);
430    }
431
432    // Only `Available` capabilities contribute hooks, matching
433    // `collect_capabilities_with_configs` (which skips non-available
434    // capabilities). This keeps a `ComingSoon`/unavailable capability from
435    // affecting execution via any of its hook seams.
436    let mut post_tool_hooks: Vec<Arc<dyn everruns_core::PostToolExecHook>> = resolved
437        .resolved_capability_configs
438        .iter()
439        .flat_map(|config| {
440            capability_registry
441                .get(config.capability_id())
442                .filter(|capability| capability.status() == CapabilityStatus::Available)
443                .map(|capability| capability.post_tool_exec_hooks_with_config(&config.config))
444                .unwrap_or_default()
445        })
446        .collect();
447    // Tool-output guardrails must inspect the original result before other
448    // capability hooks can persist or compact it into secondary surfaces.
449    post_tool_hooks.sort_by_key(|hook| hook.priority());
450
451    // User-hook contributions (see `specs/user-hooks.md`). `finalize_specs_from_configs`
452    // gathers specs across every resolved capability — both the user-facing
453    // `user_hooks` capability and any capability that bundles hooks — and applies
454    // `finalize_hook_specs` (namespace stamping, stable ids, `disabled_contributions`
455    // muting; TM-HOOK-004). The same helper backs the lifecycle firing points so
456    // every event finalizes specs identically.
457    let user_hook_specs =
458        finalize_specs_from_configs(&resolved.resolved_capability_configs, &capability_registry);
459    // Capability-contributed pre-tool hooks run first (e.g. approval gating),
460    // then user-hook (`PreToolUse`) specs. The first hook to block wins.
461    let mut pre_tool_hooks: Vec<Arc<dyn everruns_core::atoms::PreToolUseHook>> = resolved
462        .resolved_capability_configs
463        .iter()
464        .flat_map(|config| {
465            capability_registry
466                .get(config.capability_id())
467                .filter(|capability| capability.status() == CapabilityStatus::Available)
468                .map(|capability| capability.pre_tool_use_hooks_with_config(&config.config))
469                .unwrap_or_default()
470        })
471        .collect();
472    if !user_hook_specs.is_empty() {
473        let dispatcher: Arc<dyn everruns_core::hook_executor::BashHookDispatcher> = Arc::new(
474            everruns_core::hook_dispatch::BashkitShellHookDispatcher::new(adapter.file_store()),
475        );
476        post_tool_hooks.extend(everruns_core::hook_adapter::build_post_tool_use_hooks(
477            &user_hook_specs,
478            dispatcher.clone(),
479        ));
480        pre_tool_hooks.extend(everruns_core::hook_adapter::build_pre_tool_use_hooks(
481            &user_hook_specs,
482            dispatcher,
483        ));
484    }
485
486    // Use the hook list assembled by `collect_capabilities_with_configs` as the
487    // single source of truth. It already contains every explicit capability
488    // `tool_call_hooks()` followed by the generated `CapabilityNarrationHook`
489    // adapters — one per collected capability plus any auto-activated
490    // cross-cutting capability such as `background_execution`. Re-deriving only
491    // the explicit subset here dropped capability-owned narration, so tools fell
492    // back to generic `Ran {display_name}` lines (EVE-601). Explicit hooks stay
493    // first in this list, so model-authored narration (`human_intent`) keeps its
494    // precedence over default `Tool::narrate()`, and only available capabilities
495    // contributed because collection skips non-available ones.
496    let tool_call_hooks = collected.tool_call_hooks;
497
498    Ok(RuntimeExecutionCapabilities {
499        tool_registry: registry,
500        post_tool_hooks,
501        pre_tool_hooks,
502        tool_call_hooks,
503    })
504}
505
/// Shared lifecycle helper for runtime-backed hosts.
///
/// Bound to a single session in a single org: its methods update that
/// session's status and emit lifecycle events through the adapter.
pub struct RuntimeSessionLifecycle<A: RuntimeHostAdapter> {
    /// Host adapter providing status updates, event emission, and stores.
    adapter: A,
    /// Internal org id that the session belongs to.
    org_id: i64,
    /// Session that this helper drives.
    session_id: SessionId,
}
512
513impl<A: RuntimeHostAdapter> RuntimeSessionLifecycle<A> {
    /// Create a lifecycle helper for `session_id` in org `org_id`, taking
    /// ownership of `adapter`. Performs no I/O.
    pub fn new(adapter: A, org_id: i64, session_id: SessionId) -> Self {
        Self {
            adapter,
            org_id,
            session_id,
        }
    }
521
522    async fn set_session_status(&self, status: SessionStatus, action: &'static str) {
523        if let Err(error) = self
524            .adapter
525            .set_session_status(self.org_id, self.session_id, status)
526            .await
527        {
528            warn!(
529                session_id = %self.session_id,
530                org_id = self.org_id,
531                action,
532                %error,
533                "runtime host lifecycle status update failed"
534            );
535        }
536    }
537
538    async fn emit_event(&self, request: EventRequest) {
539        let event_type = request.event_type.clone();
540        if let Err(error) = self.adapter.event_emitter().emit(request).await {
541            warn!(
542                session_id = %self.session_id,
543                org_id = self.org_id,
544                event_type,
545                %error,
546                "runtime host lifecycle event emission failed"
547            );
548        }
549    }
550
551    pub async fn turn_started(&self, turn_id: TurnId, input_message_id: MessageId) {
552        let input_content = self
553            .adapter
554            .message_store()
555            .get(self.session_id, input_message_id)
556            .await
557            .ok()
558            .flatten()
559            .map(|message| message.content_to_llm_string());
560
561        self.set_session_status(SessionStatus::Active, "turn_started")
562            .await;
563
564        self.emit_event(EventRequest::new(
565            self.session_id,
566            EventContext::turn(turn_id, input_message_id),
567            SessionActivatedData {
568                turn_id,
569                input_message_id,
570            },
571        ))
572        .await;
573
574        self.emit_event(EventRequest::new(
575            self.session_id,
576            EventContext::turn(turn_id, input_message_id),
577            TurnStartedData {
578                turn_id,
579                input_message_id,
580                input_content,
581            },
582        ))
583        .await;
584    }
585
586    pub async fn emit_turn_completed(&self, input_message_id: MessageId, data: TurnCompletedData) {
587        let turn_id = data.turn_id;
588        self.emit_event(EventRequest::new(
589            self.session_id,
590            EventContext::turn(turn_id, input_message_id),
591            data,
592        ))
593        .await;
594    }
595
596    pub async fn emit_session_idled(
597        &self,
598        turn_id: TurnId,
599        input_message_id: MessageId,
600        iterations: Option<u32>,
601        usage: Option<TokenUsage>,
602    ) {
603        self.set_session_status(SessionStatus::Idle, "emit_session_idled")
604            .await;
605
606        self.emit_event(EventRequest::new(
607            self.session_id,
608            EventContext::turn(turn_id, input_message_id),
609            SessionIdledData {
610                turn_id,
611                iterations,
612                usage,
613            },
614        ))
615        .await;
616    }
617
618    pub async fn turn_completed(
619        &self,
620        turn_id: TurnId,
621        input_message_id: MessageId,
622        iterations: u32,
623        usage: Option<TokenUsage>,
624        input_content: Option<String>,
625    ) {
626        self.emit_turn_completed(
627            input_message_id,
628            TurnCompletedData {
629                turn_id,
630                iterations,
631                duration_ms: None,
632                usage: usage.clone(),
633                input_content,
634                final_message_id: None,
635                final_answer_preview: None,
636                time_to_first_token_ms: None,
637                tool_call_count: None,
638                llm_call_count: None,
639                status: Some("completed".to_string()),
640            },
641        )
642        .await;
643        self.emit_session_idled(turn_id, input_message_id, Some(iterations), usage)
644            .await;
645    }
646
647    /// Turn was deliberately sealed (EVE-534): emit `turn.sealed` + a
648    /// user-facing message + `session.idled`, and idle the session.
649    ///
650    /// Distinct from `turn_completed` (success) and `turn_failed` (error). The
651    /// session returns to `idle` so the UI unblocks; the Sealed state is
652    /// observable via the `turn.sealed` event and its `reason`.
653    pub async fn turn_sealed(
654        &self,
655        turn_id: TurnId,
656        input_message_id: MessageId,
657        reason: &str,
658        iterations: u32,
659        usage: Option<TokenUsage>,
660    ) {
661        let context = EventContext::turn(turn_id, input_message_id);
662
663        self.emit_event(EventRequest::new(
664            self.session_id,
665            context.clone(),
666            everruns_core::events::TurnSealedData {
667                turn_id,
668                reason: reason.to_string(),
669                detail: None,
670                iterations: Some(iterations),
671                usage: usage.clone(),
672            },
673        ))
674        .await;
675
676        self.emit_session_idled(turn_id, input_message_id, Some(iterations), usage)
677            .await;
678    }
679
680    /// Fire `turn_end` lifecycle hooks (advisory). Collects the session's hook
681    /// specs and runs every `turn_end` hook; failures are logged, never fatal.
682    /// `harness_id`/`agent_id` are required to resolve the capability chain.
683    pub async fn fire_turn_end_hooks(
684        &self,
685        harness_id: HarnessId,
686        agent_id: Option<AgentId>,
687        turn_id: TurnId,
688        success: bool,
689    ) {
690        let (specs, dispatcher) = match collect_lifecycle_hook_specs(
691            &self.adapter,
692            self.org_id,
693            self.session_id,
694            harness_id,
695            agent_id,
696        )
697        .await
698        {
699            Ok(pair) => pair,
700            Err(error) => {
701                warn!(
702                    session_id = %self.session_id,
703                    %error,
704                    "failed to collect turn_end hook specs; skipping"
705                );
706                return;
707            }
708        };
709        let hooks = everruns_core::lifecycle_hooks::build_turn_lifecycle_hooks(
710            &specs,
711            everruns_core::user_hook_types::HookEvent::TurnEnd,
712            dispatcher,
713        );
714        if hooks.is_empty() {
715            return;
716        }
717        let ctx = everruns_core::lifecycle_hooks::TurnHookContext {
718            session_id: self.session_id,
719            turn_id: Some(turn_id),
720            org_id: org_public_id_from_internal(self.org_id).parse().ok(),
721            agent_id: agent_id.map(|a| a.to_string()),
722        };
723        everruns_core::lifecycle_hooks::run_turn_end_hooks(
724            &hooks,
725            &ctx,
726            serde_json::json!({ "success": success }),
727        )
728        .await;
729    }
730
731    /// Abort a turn because a `user_prompt_submit` hook returned `Block`.
732    /// Reuses the dependency-blocked failure shape: emit a user-facing message
733    /// carrying the hook's `user_message` (or `reason`), then mark the turn
734    /// failed and idle the session.
735    pub async fn user_prompt_blocked(
736        &self,
737        turn_id: TurnId,
738        input_message_id: MessageId,
739        reason: &str,
740        user_message: Option<&str>,
741    ) {
742        let user_error =
743            UserFacingError::new(everruns_core::user_facing_error_codes::BLOCKED_BY_HOOK);
744        let shown = user_message.unwrap_or(reason);
745        let mut error_message = Message::assistant(shown);
746        let mut metadata = std::collections::HashMap::new();
747        user_error.apply_to_message_metadata(&mut metadata);
748        error_message.metadata = Some(metadata);
749
750        self.emit_event(EventRequest::new(
751            self.session_id,
752            EventContext::turn(turn_id, input_message_id),
753            OutputMessageCompletedData::new(error_message).with_user_facing_error(&user_error),
754        ))
755        .await;
756
757        self.turn_failed(turn_id, input_message_id, reason, Some(&user_error))
758            .await;
759    }
760
761    pub async fn turn_failed(
762        &self,
763        turn_id: TurnId,
764        input_message_id: MessageId,
765        error: &str,
766        user_error: Option<&UserFacingError>,
767    ) {
768        self.turn_failed_with_disclosure(turn_id, input_message_id, error, user_error, None)
769            .await;
770    }
771
772    /// `turn_failed` with the applied error-disclosure mode recorded on the
773    /// event. `user_error` (and the `error` text shown alongside it) must
774    /// already be disclosure-filtered by the caller.
775    pub async fn turn_failed_with_disclosure(
776        &self,
777        turn_id: TurnId,
778        input_message_id: MessageId,
779        error: &str,
780        user_error: Option<&UserFacingError>,
781        disclosure: Option<ErrorDisclosure>,
782    ) {
783        self.set_session_status(SessionStatus::Idle, "turn_failed")
784            .await;
785
786        self.emit_event(EventRequest::new(
787            self.session_id,
788            EventContext::turn(turn_id, input_message_id),
789            {
790                let mut data = TurnFailedData {
791                    turn_id,
792                    error: error.to_string(),
793                    error_code: None,
794                    error_fields: None,
795                    error_disclosure: disclosure.map(|mode| mode.as_str().to_string()),
796                };
797                if let Some(user_error) = user_error {
798                    user_error.apply_to_event_fields(&mut data.error_code, &mut data.error_fields);
799                }
800                data
801            },
802        ))
803        .await;
804
805        self.emit_event(EventRequest::new(
806            self.session_id,
807            EventContext::turn(turn_id, input_message_id),
808            SessionIdledData {
809                turn_id,
810                iterations: None,
811                usage: None,
812            },
813        ))
814        .await;
815    }
816
817    pub async fn waiting_for_tool_results(&self) {
818        self.set_session_status(
819            SessionStatus::WaitingForToolResults,
820            "waiting_for_tool_results",
821        )
822        .await;
823    }
824
825    pub async fn dependency_blocked(
826        &self,
827        turn_id: TurnId,
828        input_message_id: MessageId,
829        blocker: DependencyBlocker,
830    ) {
831        let user_error = UserFacingError::new(blocker.error_code())
832            .with_field(
833                "dependency",
834                match blocker {
835                    DependencyBlocker::HarnessArchived | DependencyBlocker::HarnessDeleted => {
836                        "harness"
837                    }
838                    DependencyBlocker::AgentArchived | DependencyBlocker::AgentDeleted => "agent",
839                },
840            )
841            .with_field(
842                "state",
843                match blocker {
844                    DependencyBlocker::HarnessArchived | DependencyBlocker::AgentArchived => {
845                        "archived"
846                    }
847                    DependencyBlocker::HarnessDeleted | DependencyBlocker::AgentDeleted => {
848                        "deleted"
849                    }
850                },
851            );
852        let mut error_message = Message::assistant(blocker.message());
853        let mut metadata = std::collections::HashMap::new();
854        user_error.apply_to_message_metadata(&mut metadata);
855        error_message.metadata = Some(metadata);
856
857        self.emit_event(EventRequest::new(
858            self.session_id,
859            EventContext::turn(turn_id, input_message_id),
860            OutputMessageCompletedData::new(error_message).with_user_facing_error(&user_error),
861        ))
862        .await;
863
864        self.turn_failed(
865            turn_id,
866            input_message_id,
867            blocker.message(),
868            Some(&user_error),
869        )
870        .await;
871    }
872}
873
874pub async fn detect_dependency_blocker<A: RuntimeHostAdapter>(
875    adapter: &A,
876    org_id: i64,
877    harness_id: HarnessId,
878    agent_id: Option<AgentId>,
879) -> everruns_core::error::Result<Option<DependencyBlocker>> {
880    let harness_store = adapter.harness_store(org_id);
881    let agent_store = adapter.agent_store(org_id);
882    everruns_core::detect_dependency_blocker(
883        harness_store.as_ref(),
884        agent_store.as_ref(),
885        harness_id,
886        agent_id,
887    )
888    .await
889}
890
891pub async fn execute_input_activity<A: RuntimeHostAdapter>(
892    adapter: &A,
893    org_id: i64,
894    input: InputAtomInput,
895) -> everruns_core::error::Result<InputAtomResult> {
896    RuntimeSessionLifecycle::new(adapter.clone(), org_id, input.context.session_id)
897        .turn_started(input.context.turn_id, input.context.input_message_id)
898        .await;
899
900    let atom = InputAtom::new(adapter.message_store());
901    atom.execute(input).await
902}
903
904/// Collect `user_prompt_submit` hooks for this turn and run them against the
905/// inbound user message text. Returns `None` when the session has no such
906/// hooks (the common case — no overhead beyond the spec collection, which is
907/// skipped early). Errors loading specs are logged and treated as "no hooks"
908/// so a hook-collection failure never blocks a turn that wasn't asking to be
909/// hooked.
910struct UserPromptHookResult {
911    decision: everruns_core::lifecycle_hooks::UserPromptDecision,
912    original_message: String,
913}
914
915async fn run_user_prompt_submit_for_turn<A: RuntimeHostAdapter>(
916    adapter: &A,
917    org_id: i64,
918    input: &ReasonInput,
919) -> everruns_core::error::Result<Option<UserPromptHookResult>> {
920    let (specs, dispatcher) = match collect_lifecycle_hook_specs(
921        adapter,
922        org_id,
923        input.context.session_id,
924        input.harness_id,
925        input.agent_id,
926    )
927    .await
928    {
929        Ok(pair) => pair,
930        Err(error) => {
931            warn!(
932                session_id = %input.context.session_id,
933                %error,
934                "failed to collect user_prompt_submit hook specs; continuing without them"
935            );
936            return Ok(None);
937        }
938    };
939    let hooks = everruns_core::lifecycle_hooks::build_turn_lifecycle_hooks(
940        &specs,
941        everruns_core::user_hook_types::HookEvent::UserPromptSubmit,
942        dispatcher,
943    );
944    if hooks.is_empty() {
945        return Ok(None);
946    }
947
948    let message_text = adapter
949        .message_store()
950        .get(input.context.session_id, input.context.input_message_id)
951        .await
952        .ok()
953        .flatten()
954        .map(|m| m.content_to_llm_string())
955        .unwrap_or_default();
956
957    let ctx = everruns_core::lifecycle_hooks::TurnHookContext {
958        session_id: input.context.session_id,
959        turn_id: Some(input.context.turn_id),
960        org_id: org_public_id_from_internal(org_id).parse().ok(),
961        agent_id: input.agent_id.map(|a| a.to_string()),
962    };
963    let original_message = message_text.clone();
964    let decision =
965        everruns_core::lifecycle_hooks::run_user_prompt_submit_hooks(&hooks, &ctx, message_text)
966            .await;
967    Ok(Some(UserPromptHookResult {
968        decision,
969        original_message,
970    }))
971}
972
973pub async fn execute_reason_activity<A: RuntimeHostAdapter>(
974    adapter: &A,
975    org_id: i64,
976    input: ReasonInput,
977) -> everruns_core::error::Result<ReasonResult> {
978    if let Some(blocker) =
979        detect_dependency_blocker(adapter, org_id, input.harness_id, input.agent_id).await?
980    {
981        RuntimeSessionLifecycle::new(adapter.clone(), org_id, input.context.session_id)
982            .dependency_blocked(
983                input.context.turn_id,
984                input.context.input_message_id,
985                blocker,
986            )
987            .await;
988        return Ok(ReasonResult {
989            success: false,
990            text: blocker.message().to_string(),
991            tool_calls: vec![],
992            has_tool_calls: false,
993            tool_definitions: vec![],
994            max_iterations: everruns_core::runtime_agent::default_max_iterations(),
995            error: Some("dependency_unavailable".to_string()),
996            user_facing_error: None,
997            error_disclosure: None,
998            usage: None,
999            output_message_id: None,
1000            time_to_first_token_ms: None,
1001            response_id: None,
1002            locale: None,
1003            network_access: None,
1004            parallel_tool_calls: None,
1005        });
1006    }
1007
1008    // user_prompt_submit hook (see `specs/user-hooks.md`). Fires once per turn,
1009    // on the first reason iteration, before the LLM is consulted — the closest
1010    // choke point to "inbound user message accepted, before reason" that both
1011    // the in-process loop and the durable worker share. A `Block` aborts the
1012    // turn by reusing the same failure path as `dependency_blocked`: emit a
1013    // user-facing message + turn.failed, idle the session, and return a
1014    // non-success `ReasonResult` so no LLM/act work runs.
1015    let mut user_prompt_message_override = None;
1016    if input.iteration <= 1
1017        && let Some(hook_result) = run_user_prompt_submit_for_turn(adapter, org_id, &input).await?
1018    {
1019        match hook_result.decision {
1020            everruns_core::lifecycle_hooks::UserPromptDecision::Block {
1021                reason,
1022                user_message,
1023            } => {
1024                RuntimeSessionLifecycle::new(adapter.clone(), org_id, input.context.session_id)
1025                    .user_prompt_blocked(
1026                        input.context.turn_id,
1027                        input.context.input_message_id,
1028                        &reason,
1029                        user_message.as_deref(),
1030                    )
1031                    .await;
1032                return Ok(ReasonResult {
1033                    success: false,
1034                    text: user_message.unwrap_or_else(|| reason.clone()),
1035                    tool_calls: vec![],
1036                    has_tool_calls: false,
1037                    tool_definitions: vec![],
1038                    max_iterations: everruns_core::runtime_agent::default_max_iterations(),
1039                    error: Some("blocked_by_user_prompt_hook".to_string()),
1040                    user_facing_error: None,
1041                    error_disclosure: None,
1042                    usage: None,
1043                    output_message_id: None,
1044                    time_to_first_token_ms: None,
1045                    response_id: None,
1046                    locale: None,
1047                    network_access: None,
1048                    parallel_tool_calls: None,
1049                });
1050            }
1051            everruns_core::lifecycle_hooks::UserPromptDecision::Continue { message } => {
1052                if message != hook_result.original_message {
1053                    user_prompt_message_override = Some(message);
1054                }
1055            }
1056        }
1057    }
1058
1059    let turn_context = adapter
1060        .load_turn_context(org_id, input.context.session_id)
1061        .await?;
1062
1063    let mut atom = ReasonAtom::new(
1064        adapter.harness_store(org_id),
1065        adapter.agent_store(org_id),
1066        adapter.session_store(org_id),
1067        adapter.message_store(),
1068        adapter.provider_store(org_id),
1069        adapter.capability_registry(),
1070        adapter.driver_registry(),
1071        adapter.event_emitter(),
1072    )
1073    .with_file_store(adapter.file_store());
1074    if let Some(image_resolver) = adapter.image_resolver(org_id) {
1075        atom = atom.with_image_resolver(image_resolver);
1076    }
1077    if let Some(hb) = adapter.stream_heartbeater() {
1078        atom = atom.with_stream_heartbeater(hb);
1079    }
1080    if let Some(timeout) = adapter.provider_stall_timeout() {
1081        atom = atom.with_provider_stall_timeout(timeout);
1082    }
1083    if let Some(store) = adapter.partial_stream_store() {
1084        atom = atom.with_partial_stream_store(store);
1085    }
1086    if let Some(store) = adapter.durable_tool_result_store() {
1087        atom = atom.with_durable_tool_result_store(store);
1088    }
1089    if let Some(handle) = adapter.reasoning_effort_handle(input.context.session_id) {
1090        atom = atom.with_reasoning_effort_handle(handle);
1091    }
1092
1093    let input = ReasonInput {
1094        mcp_tool_definitions: turn_context.mcp_tool_definitions,
1095        ..input
1096    };
1097
1098    if let Some(message_override) = user_prompt_message_override {
1099        let mut assembled = assemble_turn_context(
1100            adapter.harness_store(org_id).as_ref(),
1101            adapter.agent_store(org_id).as_ref(),
1102            adapter.session_store(org_id).as_ref(),
1103            adapter.message_store().as_ref(),
1104            adapter.provider_store(org_id).as_ref(),
1105            &adapter.capability_registry(),
1106            input.context.session_id,
1107            input.harness_id,
1108            input.agent_id,
1109            &input.mcp_tool_definitions,
1110            Some(adapter.file_store()),
1111        )
1112        .await?;
1113
1114        let message = assembled
1115            .messages
1116            .iter_mut()
1117            .find(|message| message.id == input.context.input_message_id)
1118            .ok_or_else(|| {
1119                everruns_core::error::AgentLoopError::config(
1120                    "user_prompt_submit mutation: input message not found in assembled context",
1121                )
1122            })?;
1123
1124        // user_prompt_submit mutations are enforcement controls for the
1125        // provider-bound prompt. Apply them to the assembled context only
1126        // so persisted user history remains an audit record of the input.
1127        // Preserve non-text parts (images, files); replace only text parts.
1128        message
1129            .content
1130            .retain(|part| !matches!(part, ContentPart::Text(_)));
1131        message
1132            .content
1133            .insert(0, ContentPart::text(message_override));
1134
1135        return atom.execute_with_assembled_context(input, assembled).await;
1136    }
1137
1138    atom.execute(input).await
1139}
1140
1141pub async fn execute_act_activity<A: RuntimeHostAdapter>(
1142    adapter: &A,
1143    input: ActInput,
1144) -> everruns_core::error::Result<ActResult> {
1145    let org_id = input.org_id.ok_or_else(|| {
1146        everruns_core::error::AgentLoopError::config(
1147            "ActInput.org_id must be set for runtime host execution",
1148        )
1149    })?;
1150
1151    if let Some(blocker) =
1152        detect_dependency_blocker(adapter, org_id, input.harness_id, input.agent_id).await?
1153    {
1154        RuntimeSessionLifecycle::new(adapter.clone(), org_id, input.context.session_id)
1155            .dependency_blocked(
1156                input.context.turn_id,
1157                input.context.input_message_id,
1158                blocker,
1159            )
1160            .await;
1161        return Ok(ActResult {
1162            results: vec![],
1163            completed: true,
1164            success_count: 0,
1165            error_count: 1,
1166            waiting_for_tool_results: false,
1167            blocked: true,
1168            client_tool_calls: vec![],
1169            client_tool_definitions: vec![],
1170        });
1171    }
1172
1173    let execution_capabilities = load_execution_capabilities(
1174        adapter,
1175        org_id,
1176        input.context.session_id,
1177        input.harness_id,
1178        input.agent_id,
1179        input.locale.clone(),
1180        input.blueprint_id.as_deref(),
1181    )
1182    .await?;
1183    let mut tool_registry = execution_capabilities.tool_registry;
1184
1185    // Register the session's MCP tools as first-class registry tools, so they
1186    // execute through the regular `ToolExecutor` path and are visible to
1187    // everything that introspects the registry (spawn_background, tool_search,
1188    // openai_tool_search namespaces, ...). The turn's tool definitions already
1189    // include the discovered MCP tools, so no re-discovery is needed; the host's
1190    // MCP executor supplies execution (specs/runtime-mcp.md D5).
1191    // The MCP invoker is reused below for the guardrails `mcp` check, which
1192    // delegates a guardrail decision to an external endpoint over the same
1193    // scoped-MCP client/auth (specs/guardrails.md).
1194    let mut mcp_invoker: Option<Arc<dyn everruns_core::McpToolInvoker>> = None;
1195    if let Some(mcp) = adapter.mcp_executor(org_id, input.context.session_id).await {
1196        let invoker: Arc<dyn everruns_core::McpToolInvoker> = mcp;
1197        for tool in everruns_core::build_mcp_proxy_tools(&input.tool_definitions, invoker.clone()) {
1198            tool_registry.register_boxed(tool);
1199        }
1200        mcp_invoker = Some(Arc::new(everruns_core::ScopedMcpToolInvoker::new(
1201            &input.tool_definitions,
1202            invoker,
1203        )));
1204    }
1205
1206    let builtin_tool_registry = Arc::new(tool_registry.clone());
1207    let executor: Arc<dyn everruns_core::traits::ToolExecutor> = Arc::new(tool_registry);
1208
1209    let mut atom =
1210        ActAtom::with_file_store(executor, adapter.event_emitter(), adapter.file_store())
1211            .with_session_store(adapter.session_store(org_id))
1212            .with_session_mutator(adapter.session_mutator(org_id))
1213            .with_agent_store(adapter.agent_store(org_id))
1214            .with_tool_registry(builtin_tool_registry)
1215            .with_org_id(
1216                org_public_id_from_internal(org_id)
1217                    .parse()
1218                    .expect("internal org id converts to valid public org id"),
1219            )
1220            .with_capability_registry(adapter.capability_registry())
1221            .with_post_tool_hooks(execution_capabilities.post_tool_hooks)
1222            .with_pre_tool_hooks(execution_capabilities.pre_tool_hooks)
1223            .with_tool_call_hooks(execution_capabilities.tool_call_hooks);
1224
1225    if let Some(storage_store) = adapter.storage_store() {
1226        atom = atom.with_storage_store(storage_store);
1227    }
1228    if let Some(knowledge_store) = adapter.knowledge_store() {
1229        atom = atom.with_knowledge_store(knowledge_store);
1230    }
1231    if let Some(image_store) = adapter.image_artifact_store(org_id) {
1232        atom = atom.with_image_store(image_store);
1233    }
1234    if let Some(provider_credential_store) = adapter.provider_credential_store(org_id) {
1235        atom = atom.with_provider_credential_store(provider_credential_store);
1236    }
1237    if let Some(utility_llm_service) = adapter.utility_llm_service() {
1238        atom = atom.with_utility_llm_service(utility_llm_service);
1239    }
1240    if let Some(invoker) = mcp_invoker {
1241        atom = atom.with_mcp_invoker(invoker);
1242    }
1243    if let Some(egress_service) = adapter.egress_service() {
1244        atom = atom.with_egress_service(egress_service);
1245    }
1246    if let Some(connection_resolver) = adapter.connection_resolver() {
1247        atom = atom.with_connection_resolver(connection_resolver);
1248    }
1249    if let Some(sqldb_store) = adapter.sqldb_store() {
1250        atom = atom.with_sqldb_store(sqldb_store);
1251    }
1252    if let Some(leased_resource_store) = adapter.leased_resource_store() {
1253        atom = atom.with_leased_resource_store(leased_resource_store);
1254    }
1255    if let Some(registry) = adapter.session_resource_registry() {
1256        atom = atom.with_session_resource_registry(registry);
1257    }
1258    if let Some(registry) = adapter.session_task_registry() {
1259        atom = atom.with_session_task_registry(registry);
1260    }
1261    if let Some(schedule_store) = adapter.schedule_store(org_id) {
1262        atom = atom.with_schedule_store(schedule_store);
1263    }
1264    if let Some(platform_store) = adapter.platform_store(org_id, input.context.session_id) {
1265        atom = atom.with_platform_store(platform_store);
1266    }
1267    if let Some(knowledge_index_search) = adapter.knowledge_index_search(org_id) {
1268        atom = atom.with_knowledge_index_search(knowledge_index_search);
1269    }
1270    if let Some(budget_checker) = adapter.budget_checker(org_id, input.agent_id) {
1271        atom = atom.with_budget_checker(budget_checker);
1272    }
1273    if let Some(payment_authority) = adapter.payment_authority(org_id, input.agent_id) {
1274        atom = atom.with_payment_authority(payment_authority);
1275    }
1276    if let Some(limiter) = adapter.outbound_tool_rate_limiter(org_id) {
1277        atom = atom.with_outbound_tool_rate_limiter(limiter);
1278    }
1279    if let Some(store) = adapter.durable_tool_result_store() {
1280        atom = atom.with_durable_tool_result_store(store);
1281    }
1282    if let Some(store) = adapter.subagent_spawn_store() {
1283        atom = atom.with_subagent_spawn_store(store);
1284    }
1285    if let Some(handle) = adapter.reasoning_effort_handle(input.context.session_id) {
1286        atom = atom.with_reasoning_effort_handle(handle);
1287    }
1288
1289    atom.execute(input).await
1290}