Skip to main content

everruns_runtime/
host.rs

1// Shared host orchestration for embedded and durable execution hosts.
2// Decision: everruns-runtime owns worker-facing turn phase execution so
3// durable/server-backed hosts reuse the same input/reason/act wiring.
4
5use async_trait::async_trait;
6use everruns_core::atoms::{
7    ActAtom, ActInput, ActResult, Atom, InputAtom, InputAtomInput, InputAtomResult, ReasonAtom,
8    ReasonInput, ReasonResult,
9};
10use everruns_core::capabilities::{SystemPromptContext, collect_capabilities_with_configs};
11use everruns_core::events::{
12    EventContext, EventRequest, OutputMessageCompletedData, SessionActivatedData, SessionIdledData,
13    TurnCompletedData, TurnFailedData, TurnStartedData,
14};
15use everruns_core::message::{ContentPart, Message};
16use everruns_core::message_retriever::MessageRetriever;
17use everruns_core::platform_store::PlatformStore;
18use everruns_core::session::SessionStatus;
19use everruns_core::traits::{
20    AgentStore, BudgetChecker, EventEmitter, HarnessStore, ImageArtifactStore, ImageResolver,
21    LeasedResourceStore, PaymentAuthority, ProviderCredentialStore, ProviderStore, ResolvedModel,
22    SessionFileSystem, SessionMutator, SessionResourceRegistry, SessionScheduleStore,
23    SessionSqlDbStoreRef, SessionStorageStore, SessionStore, UserConnectionResolver,
24};
25use everruns_core::typed_id::{AgentId, HarnessId, MessageId, SessionId, TurnId};
26use everruns_core::vector_store::KnowledgeIndexSearch;
27use everruns_core::{
28    Agent, CapabilityRegistry, CapabilityStatus, DependencyBlocker, DriverRegistry, EgressService,
29    ErrorDisclosure, Harness, Session, TokenUsage, ToolDefinition, ToolRegistry, UserFacingError,
30    UtilityLlmService, assemble_turn_context, org_public_id_from_internal,
31    resolve_runtime_capabilities,
32};
33use std::sync::Arc;
34use tracing::warn;
35
36/// Turn context loaded in one batched call for runtime host execution.
37#[derive(Debug, Clone)]
38pub struct RuntimeHostTurnContext {
39    pub agent: Option<Agent>,
40    pub session: Session,
41    pub messages: Vec<Message>,
42    pub model: Option<ResolvedModel>,
43    pub mcp_tool_definitions: Vec<ToolDefinition>,
44}
45
46/// Public adapter contract for server-backed or durable runtime hosts.
47///
48/// `everruns-runtime` owns shared host orchestration for both embedded and
49/// durable execution. That includes phase execution (`input -> reason -> act`),
50/// lifecycle emission, and the generic turn-strategy decisions used by durable
51/// or custom hosts.
52///
53/// Host crates implement this trait to provide persistence, session-lifecycle
54/// plumbing, event delivery, and their own orchestration backend. The durable
55/// engine itself remains outside this crate.
56#[async_trait]
57pub trait RuntimeHostAdapter: Send + Sync + Clone + 'static {
58    async fn get_agent(
59        &self,
60        org_id: i64,
61        agent_id: AgentId,
62    ) -> everruns_core::error::Result<Option<Agent>>;
63
64    async fn get_harness(
65        &self,
66        org_id: i64,
67        harness_id: HarnessId,
68    ) -> everruns_core::error::Result<Option<Harness>>;
69
70    async fn set_session_status(
71        &self,
72        org_id: i64,
73        session_id: SessionId,
74        status: SessionStatus,
75    ) -> everruns_core::error::Result<Session>;
76
77    async fn load_turn_context(
78        &self,
79        org_id: i64,
80        session_id: SessionId,
81    ) -> everruns_core::error::Result<RuntimeHostTurnContext>;
82
83    fn capability_registry(&self) -> CapabilityRegistry;
84
85    fn driver_registry(&self) -> DriverRegistry;
86
87    fn harness_store(&self, org_id: i64) -> Arc<dyn HarnessStore>;
88
89    fn agent_store(&self, org_id: i64) -> Arc<dyn AgentStore>;
90
91    fn session_store(&self, org_id: i64) -> Arc<dyn SessionStore>;
92
93    fn session_mutator(&self, org_id: i64) -> Arc<dyn SessionMutator>;
94
95    fn provider_store(&self, org_id: i64) -> Arc<dyn ProviderStore>;
96
97    fn message_store(&self) -> Arc<dyn MessageRetriever>;
98
99    fn event_emitter(&self) -> Arc<dyn EventEmitter>;
100
101    fn file_store(&self) -> Arc<dyn SessionFileSystem>;
102
103    fn image_resolver(&self, _org_id: i64) -> Option<Arc<dyn ImageResolver>> {
104        None
105    }
106
107    fn image_artifact_store(&self, _org_id: i64) -> Option<Arc<dyn ImageArtifactStore>> {
108        None
109    }
110
111    fn provider_credential_store(&self, _org_id: i64) -> Option<Arc<dyn ProviderCredentialStore>> {
112        None
113    }
114
115    fn utility_llm_service(&self) -> Option<Arc<dyn UtilityLlmService>> {
116        None
117    }
118
119    fn egress_service(&self) -> Option<Arc<dyn EgressService>> {
120        None
121    }
122
123    fn storage_store(&self) -> Option<Arc<dyn SessionStorageStore>> {
124        None
125    }
126
127    /// Knowledge store backing the `search_knowledge` tool. Default: none.
128    fn knowledge_store(&self) -> Option<Arc<dyn everruns_core::traits::KnowledgeStore>> {
129        None
130    }
131
132    fn connection_resolver(&self) -> Option<Arc<dyn UserConnectionResolver>> {
133        None
134    }
135
136    fn sqldb_store(&self) -> Option<SessionSqlDbStoreRef> {
137        None
138    }
139
140    fn leased_resource_store(&self) -> Option<Arc<dyn LeasedResourceStore>> {
141        None
142    }
143
144    fn session_resource_registry(&self) -> Option<Arc<dyn SessionResourceRegistry>> {
145        None
146    }
147
148    fn session_task_registry(
149        &self,
150    ) -> Option<Arc<dyn everruns_core::session_task::SessionTaskRegistry>> {
151        None
152    }
153
154    fn schedule_store(&self, _org_id: i64) -> Option<Arc<dyn SessionScheduleStore>> {
155        None
156    }
157
158    fn platform_store(
159        &self,
160        _org_id: i64,
161        _session_id: SessionId,
162    ) -> Option<Arc<dyn PlatformStore>> {
163        None
164    }
165
166    /// Get the Knowledge Index search service for the `search_index` tool.
167    /// Org-scoped; returns None when retrieval is not available (e.g. gRPC
168    /// workers without a search RPC, or in-memory test backends).
169    fn knowledge_index_search(&self, _org_id: i64) -> Option<Arc<dyn KnowledgeIndexSearch>> {
170        None
171    }
172
173    fn budget_checker(
174        &self,
175        _org_id: i64,
176        _agent_id: Option<AgentId>,
177    ) -> Option<Arc<dyn BudgetChecker>> {
178        None
179    }
180
181    fn payment_authority(
182        &self,
183        _org_id: i64,
184        _agent_id: Option<AgentId>,
185    ) -> Option<Arc<dyn PaymentAuthority>> {
186        None
187    }
188
189    /// Per-org outbound tool-call rate limiter (TM-TOOL-009).
190    /// Default: `None` (no rate limiting — suitable for in-process / test environments).
191    fn outbound_tool_rate_limiter(
192        &self,
193        _org_id: i64,
194    ) -> Option<Arc<dyn everruns_core::OutboundToolRateLimiter>> {
195        None
196    }
197
198    /// Per-turn durable tool result store for act-activity idempotency (EVE-530).
199    /// Default: `None` (no durable claim/settle — every execution runs tools fresh).
200    fn durable_tool_result_store(&self) -> Option<Arc<dyn everruns_core::DurableToolResultStore>> {
201        None
202    }
203
204    /// Durable subagent spawn handle store for reattach on reclaim (EVE-535).
205    /// Default: `None` (no spawn dedup — dev/test mode or hosts without durable execution).
206    fn subagent_spawn_store(&self) -> Option<Arc<dyn everruns_core::SubagentSpawnStore>> {
207        None
208    }
209
210    /// Stream-liveness heartbeater for the Reason activity (EVE-531).
211    /// Default: `None` (no heartbeats sent — durable workers supply one).
212    fn stream_heartbeater(&self) -> Option<Arc<dyn everruns_core::StreamHeartbeater>> {
213        None
214    }
215
216    /// Partial-stream store for ContinuePartial recovery (EVE-532).
217    /// Default: `None` (no recovery; in-memory and dev hosts use this default).
218    fn partial_stream_store(&self) -> Option<Arc<dyn everruns_core::PartialStreamStore>> {
219        None
220    }
221
222    /// Live, turn-scoped reasoning-effort handle for the given session (EVE-595).
223    ///
224    /// When a host returns a handle, the Reason activity re-reads it on every
225    /// LLM step and the Act activity hands the same instance to each tool's
226    /// `ToolContext`. A tool can then change effort mid-turn and have subsequent
227    /// LLM steps in the same turn observe it. Hosts MUST return the *same*
228    /// handle instance for a session across reason/act activities of one turn.
229    /// Default: `None` (effort is resolved solely from message controls).
230    fn reasoning_effort_handle(
231        &self,
232        _session_id: SessionId,
233    ) -> Option<everruns_core::ReasoningEffortHandle> {
234        None
235    }
236
237    /// Provider stall timeout for the Reason activity (EVE-531).
238    /// Default: `None` (use built-in 120s default).
239    fn provider_stall_timeout(&self) -> Option<std::time::Duration> {
240        None
241    }
242
243    /// MCP executor routing `mcp_*` tool calls for this session, if the host
244    /// configures MCP (specs/runtime-mcp.md D4). Default: `None`, so hosts
245    /// without scoped MCP servers keep the plain tool registry unchanged.
246    async fn mcp_executor(
247        &self,
248        _org_id: i64,
249        _session_id: SessionId,
250    ) -> Option<Arc<everruns_mcp::McpExecutor>> {
251        None
252    }
253}
254
255struct RuntimeExecutionCapabilities {
256    tool_registry: ToolRegistry,
257    post_tool_hooks: Vec<Arc<dyn everruns_core::PostToolExecHook>>,
258    pre_tool_hooks: Vec<Arc<dyn everruns_core::atoms::PreToolUseHook>>,
259    tool_call_hooks: Vec<Arc<dyn everruns_core::ToolCallHook>>,
260}
261
262/// Collect and finalize user-hook specs for a session from its resolved
263/// capability configs, plus the shared bash dispatcher used to run them.
264///
265/// This is the single place hook specs are gathered so every firing point —
266/// the act path (`load_execution_capabilities`) and the lifecycle firing
267/// points (`execute_reason_activity` for `user_prompt_submit`, turn completion
268/// for `turn_end`, and the server session paths) — applies identical
269/// `finalize_hook_specs` semantics: `{capability_id}:` namespace stamping,
270/// stable default ids, and `disabled_contributions` muting (TM-HOOK-004).
271fn finalize_specs_from_configs(
272    resolved_capability_configs: &[everruns_core::capability_types::AgentCapabilityConfig],
273    capability_registry: &CapabilityRegistry,
274) -> Vec<everruns_core::user_hook_types::UserHookSpec> {
275    let mut hook_contributions: Vec<(String, Vec<everruns_core::user_hook_types::UserHookSpec>)> =
276        Vec::new();
277    let mut disabled_contributions: Vec<String> = Vec::new();
278    for config in resolved_capability_configs {
279        let Some(capability) = capability_registry.get(config.capability_id()) else {
280            continue;
281        };
282        let specs = capability.user_hooks_with_config(&config.config);
283        if !specs.is_empty() {
284            hook_contributions.push((config.capability_id().to_string(), specs));
285        }
286        if config.capability_id() == "user_hooks" {
287            disabled_contributions.extend(
288                everruns_core::capabilities::user_hooks::disabled_contributions(&config.config),
289            );
290        }
291    }
292    everruns_core::hook_adapter::finalize_hook_specs(hook_contributions, &disabled_contributions)
293}
294
295/// Resolve a session's capability configs and collect finalized hook specs.
296/// Used by the lifecycle firing points, which need specs outside the act path.
297/// Returns `(specs, dispatcher)`; `specs` is empty when the session has no
298/// hook-contributing capabilities.
299async fn collect_lifecycle_hook_specs<A: RuntimeHostAdapter>(
300    adapter: &A,
301    org_id: i64,
302    session_id: SessionId,
303    harness_id: HarnessId,
304    agent_id: Option<AgentId>,
305) -> everruns_core::error::Result<(
306    Vec<everruns_core::user_hook_types::UserHookSpec>,
307    Arc<dyn everruns_core::hook_executor::BashHookDispatcher>,
308)> {
309    let capability_registry = adapter.capability_registry();
310    let harness_chain = adapter
311        .harness_store(org_id)
312        .get_harness_chain(harness_id)
313        .await?;
314    if harness_chain.is_empty() {
315        return Err(everruns_core::error::AgentLoopError::harness_not_found(
316            harness_id,
317        ));
318    }
319    let session = adapter
320        .session_store(org_id)
321        .get_session(session_id)
322        .await?
323        .ok_or_else(|| everruns_core::error::AgentLoopError::session_not_found(session_id))?;
324    let agent = match agent_id {
325        Some(agent_id) => adapter.agent_store(org_id).get_agent(agent_id).await?,
326        None => None,
327    };
328    let resolved = resolve_runtime_capabilities(
329        &harness_chain,
330        agent.as_ref(),
331        &session,
332        &capability_registry,
333    );
334    let specs =
335        finalize_specs_from_configs(&resolved.resolved_capability_configs, &capability_registry);
336    let dispatcher: Arc<dyn everruns_core::hook_executor::BashHookDispatcher> = Arc::new(
337        everruns_core::hook_dispatch::BashkitShellHookDispatcher::new(adapter.file_store()),
338    );
339    Ok((specs, dispatcher))
340}
341
342async fn load_execution_capabilities<A: RuntimeHostAdapter>(
343    adapter: &A,
344    org_id: i64,
345    session_id: SessionId,
346    harness_id: HarnessId,
347    agent_id: Option<AgentId>,
348    locale: Option<String>,
349    blueprint_id: Option<&str>,
350) -> everruns_core::error::Result<RuntimeExecutionCapabilities> {
351    let capability_registry = adapter.capability_registry();
352    if let Some(blueprint_id) = blueprint_id {
353        let mut registry = ToolRegistry::with_defaults();
354        let blueprint = capability_registry.blueprint(blueprint_id).ok_or_else(|| {
355            everruns_core::error::AgentLoopError::config(format!(
356                "Blueprint \"{blueprint_id}\" not found in registry"
357            ))
358        })?;
359        for tool in blueprint.tools {
360            registry.register_boxed(tool);
361        }
362        return Ok(RuntimeExecutionCapabilities {
363            tool_registry: registry,
364            post_tool_hooks: Vec::new(),
365            pre_tool_hooks: Vec::new(),
366            tool_call_hooks: Vec::new(),
367        });
368    }
369
370    let harness_chain = adapter
371        .harness_store(org_id)
372        .get_harness_chain(harness_id)
373        .await?;
374    if harness_chain.is_empty() {
375        return Err(everruns_core::error::AgentLoopError::harness_not_found(
376            harness_id,
377        ));
378    }
379
380    let session = adapter
381        .session_store(org_id)
382        .get_session(session_id)
383        .await?
384        .ok_or_else(|| everruns_core::error::AgentLoopError::session_not_found(session_id))?;
385
386    let agent_store = adapter.agent_store(org_id);
387    let agent = match agent_id {
388        Some(agent_id) => Some(
389            agent_store
390                .get_agent(agent_id)
391                .await?
392                .ok_or_else(|| everruns_core::error::AgentLoopError::agent_not_found(agent_id))?,
393        ),
394        None => None,
395    };
396
397    let resolved = resolve_runtime_capabilities(
398        &harness_chain,
399        agent.as_ref(),
400        &session,
401        &capability_registry,
402    );
403    // Executor (act) path: this builds the worker-side tool registry, not the
404    // model-visible tool list. The model is left unset, so a model-adaptive
405    // capability like `auto_tool_search` resolves to its provider-agnostic
406    // client-side mechanism here. That registers the `tool_search` tool in the
407    // executor, which is a harmless superset: on native models the reason path
408    // never shows that tool to the model, so it is simply never called.
409    let prompt_ctx = SystemPromptContext {
410        session_id,
411        locale: locale.or(session.locale.clone()),
412        // Pin system-prompt file reads to the session's workspace (the default
413        // 1:1 case is a transparent pass-through), then resolve through the
414        // mount resolver (EVE-660): `/workspace` is a mount + cwd.
415        file_store: Some(everruns_core::MountFs::wrap(
416            everruns_core::WorkspaceScopedFileSystem::wrap(
417                adapter.file_store(),
418                session.workspace_id,
419            ),
420        )),
421        model: None,
422    };
423    let collected = collect_capabilities_with_configs(
424        &resolved.resolved_capability_configs,
425        &capability_registry,
426        &prompt_ctx,
427    )
428    .await;
429
430    let mut registry = ToolRegistry::with_defaults();
431    for tool in collected.tools {
432        registry.register_boxed(tool);
433    }
434
435    // Only `Available` capabilities contribute hooks, matching
436    // `collect_capabilities_with_configs` (which skips non-available
437    // capabilities). This keeps a `ComingSoon`/unavailable capability from
438    // affecting execution via any of its hook seams.
439    let mut post_tool_hooks: Vec<Arc<dyn everruns_core::PostToolExecHook>> = resolved
440        .resolved_capability_configs
441        .iter()
442        .flat_map(|config| {
443            capability_registry
444                .get(config.capability_id())
445                .filter(|capability| capability.status() == CapabilityStatus::Available)
446                .map(|capability| capability.post_tool_exec_hooks_with_config(&config.config))
447                .unwrap_or_default()
448        })
449        .collect();
450    // Tool-output guardrails must inspect the original result before other
451    // capability hooks can persist or compact it into secondary surfaces.
452    post_tool_hooks.sort_by_key(|hook| hook.priority());
453
454    // User-hook contributions (see `specs/user-hooks.md`). `finalize_specs_from_configs`
455    // gathers specs across every resolved capability — both the user-facing
456    // `user_hooks` capability and any capability that bundles hooks — and applies
457    // `finalize_hook_specs` (namespace stamping, stable ids, `disabled_contributions`
458    // muting; TM-HOOK-004). The same helper backs the lifecycle firing points so
459    // every event finalizes specs identically.
460    let user_hook_specs =
461        finalize_specs_from_configs(&resolved.resolved_capability_configs, &capability_registry);
462    // Capability-contributed pre-tool hooks run first (e.g. approval gating),
463    // then user-hook (`PreToolUse`) specs. The first hook to block wins.
464    let mut pre_tool_hooks: Vec<Arc<dyn everruns_core::atoms::PreToolUseHook>> = resolved
465        .resolved_capability_configs
466        .iter()
467        .flat_map(|config| {
468            capability_registry
469                .get(config.capability_id())
470                .filter(|capability| capability.status() == CapabilityStatus::Available)
471                .map(|capability| capability.pre_tool_use_hooks_with_config(&config.config))
472                .unwrap_or_default()
473        })
474        .collect();
475    if !user_hook_specs.is_empty() {
476        let dispatcher: Arc<dyn everruns_core::hook_executor::BashHookDispatcher> = Arc::new(
477            everruns_core::hook_dispatch::BashkitShellHookDispatcher::new(adapter.file_store()),
478        );
479        post_tool_hooks.extend(everruns_core::hook_adapter::build_post_tool_use_hooks(
480            &user_hook_specs,
481            dispatcher.clone(),
482        ));
483        pre_tool_hooks.extend(everruns_core::hook_adapter::build_pre_tool_use_hooks(
484            &user_hook_specs,
485            dispatcher,
486        ));
487    }
488
489    // Use the hook list assembled by `collect_capabilities_with_configs` as the
490    // single source of truth. It already contains every explicit capability
491    // `tool_call_hooks()` followed by the generated `CapabilityNarrationHook`
492    // adapters — one per collected capability plus any auto-activated
493    // cross-cutting capability such as `background_execution`. Re-deriving only
494    // the explicit subset here dropped capability-owned narration, so tools fell
495    // back to generic `Ran {display_name}` lines (EVE-601). Explicit hooks stay
496    // first in this list, so model-authored narration (`human_intent`) keeps its
497    // precedence over default `Tool::narrate()`, and only available capabilities
498    // contributed because collection skips non-available ones.
499    let tool_call_hooks = collected.tool_call_hooks;
500
501    Ok(RuntimeExecutionCapabilities {
502        tool_registry: registry,
503        post_tool_hooks,
504        pre_tool_hooks,
505        tool_call_hooks,
506    })
507}
508
509/// Shared lifecycle helper for runtime-backed hosts.
510pub struct RuntimeSessionLifecycle<A: RuntimeHostAdapter> {
511    adapter: A,
512    org_id: i64,
513    session_id: SessionId,
514}
515
516impl<A: RuntimeHostAdapter> RuntimeSessionLifecycle<A> {
517    pub fn new(adapter: A, org_id: i64, session_id: SessionId) -> Self {
518        Self {
519            adapter,
520            org_id,
521            session_id,
522        }
523    }
524
525    async fn set_session_status(&self, status: SessionStatus, action: &'static str) {
526        if let Err(error) = self
527            .adapter
528            .set_session_status(self.org_id, self.session_id, status)
529            .await
530        {
531            warn!(
532                session_id = %self.session_id,
533                org_id = self.org_id,
534                action,
535                %error,
536                "runtime host lifecycle status update failed"
537            );
538        }
539    }
540
541    async fn emit_event(&self, request: EventRequest) {
542        let event_type = request.event_type.clone();
543        if let Err(error) = self.adapter.event_emitter().emit(request).await {
544            warn!(
545                session_id = %self.session_id,
546                org_id = self.org_id,
547                event_type,
548                %error,
549                "runtime host lifecycle event emission failed"
550            );
551        }
552    }
553
554    pub async fn turn_started(&self, turn_id: TurnId, input_message_id: MessageId) {
555        let input_content = self
556            .adapter
557            .message_store()
558            .get(self.session_id, input_message_id)
559            .await
560            .ok()
561            .flatten()
562            .map(|message| message.content_to_llm_string());
563
564        self.set_session_status(SessionStatus::Active, "turn_started")
565            .await;
566
567        self.emit_event(EventRequest::new(
568            self.session_id,
569            EventContext::turn(turn_id, input_message_id),
570            SessionActivatedData {
571                turn_id,
572                input_message_id,
573            },
574        ))
575        .await;
576
577        self.emit_event(EventRequest::new(
578            self.session_id,
579            EventContext::turn(turn_id, input_message_id),
580            TurnStartedData {
581                turn_id,
582                input_message_id,
583                input_content,
584            },
585        ))
586        .await;
587    }
588
589    pub async fn emit_turn_completed(&self, input_message_id: MessageId, data: TurnCompletedData) {
590        let turn_id = data.turn_id;
591        self.emit_event(EventRequest::new(
592            self.session_id,
593            EventContext::turn(turn_id, input_message_id),
594            data,
595        ))
596        .await;
597    }
598
599    pub async fn emit_session_idled(
600        &self,
601        turn_id: TurnId,
602        input_message_id: MessageId,
603        iterations: Option<u32>,
604        usage: Option<TokenUsage>,
605    ) {
606        self.set_session_status(SessionStatus::Idle, "emit_session_idled")
607            .await;
608
609        self.emit_event(EventRequest::new(
610            self.session_id,
611            EventContext::turn(turn_id, input_message_id),
612            SessionIdledData {
613                turn_id,
614                iterations,
615                usage,
616            },
617        ))
618        .await;
619    }
620
621    pub async fn turn_completed(
622        &self,
623        turn_id: TurnId,
624        input_message_id: MessageId,
625        iterations: u32,
626        usage: Option<TokenUsage>,
627        input_content: Option<String>,
628    ) {
629        self.emit_turn_completed(
630            input_message_id,
631            TurnCompletedData {
632                turn_id,
633                iterations,
634                duration_ms: None,
635                usage: usage.clone(),
636                input_content,
637                final_message_id: None,
638                final_answer_preview: None,
639                time_to_first_token_ms: None,
640                tool_call_count: None,
641                llm_call_count: None,
642                status: Some("completed".to_string()),
643            },
644        )
645        .await;
646        self.emit_session_idled(turn_id, input_message_id, Some(iterations), usage)
647            .await;
648    }
649
650    /// Turn was deliberately sealed (EVE-534): emit `turn.sealed` + a
651    /// user-facing message + `session.idled`, and idle the session.
652    ///
653    /// Distinct from `turn_completed` (success) and `turn_failed` (error). The
654    /// session returns to `idle` so the UI unblocks; the Sealed state is
655    /// observable via the `turn.sealed` event and its `reason`.
656    pub async fn turn_sealed(
657        &self,
658        turn_id: TurnId,
659        input_message_id: MessageId,
660        reason: &str,
661        iterations: u32,
662        usage: Option<TokenUsage>,
663    ) {
664        let context = EventContext::turn(turn_id, input_message_id);
665
666        self.emit_event(EventRequest::new(
667            self.session_id,
668            context.clone(),
669            everruns_core::events::TurnSealedData {
670                turn_id,
671                reason: reason.to_string(),
672                detail: None,
673                iterations: Some(iterations),
674                usage: usage.clone(),
675            },
676        ))
677        .await;
678
679        self.emit_session_idled(turn_id, input_message_id, Some(iterations), usage)
680            .await;
681    }
682
683    /// Fire `turn_end` lifecycle hooks (advisory). Collects the session's hook
684    /// specs and runs every `turn_end` hook; failures are logged, never fatal.
685    /// `harness_id`/`agent_id` are required to resolve the capability chain.
686    pub async fn fire_turn_end_hooks(
687        &self,
688        harness_id: HarnessId,
689        agent_id: Option<AgentId>,
690        turn_id: TurnId,
691        success: bool,
692    ) {
693        let (specs, dispatcher) = match collect_lifecycle_hook_specs(
694            &self.adapter,
695            self.org_id,
696            self.session_id,
697            harness_id,
698            agent_id,
699        )
700        .await
701        {
702            Ok(pair) => pair,
703            Err(error) => {
704                warn!(
705                    session_id = %self.session_id,
706                    %error,
707                    "failed to collect turn_end hook specs; skipping"
708                );
709                return;
710            }
711        };
712        let hooks = everruns_core::lifecycle_hooks::build_turn_lifecycle_hooks(
713            &specs,
714            everruns_core::user_hook_types::HookEvent::TurnEnd,
715            dispatcher,
716        );
717        if hooks.is_empty() {
718            return;
719        }
720        let ctx = everruns_core::lifecycle_hooks::TurnHookContext {
721            session_id: self.session_id,
722            turn_id: Some(turn_id),
723            org_id: org_public_id_from_internal(self.org_id).parse().ok(),
724            agent_id: agent_id.map(|a| a.to_string()),
725        };
726        everruns_core::lifecycle_hooks::run_turn_end_hooks(
727            &hooks,
728            &ctx,
729            serde_json::json!({ "success": success }),
730        )
731        .await;
732    }
733
734    /// Abort a turn because a `user_prompt_submit` hook returned `Block`.
735    /// Reuses the dependency-blocked failure shape: emit a user-facing message
736    /// carrying the hook's `user_message` (or `reason`), then mark the turn
737    /// failed and idle the session.
738    pub async fn user_prompt_blocked(
739        &self,
740        turn_id: TurnId,
741        input_message_id: MessageId,
742        reason: &str,
743        user_message: Option<&str>,
744    ) {
745        let user_error =
746            UserFacingError::new(everruns_core::user_facing_error_codes::BLOCKED_BY_HOOK);
747        let shown = user_message.unwrap_or(reason);
748        let mut error_message = Message::assistant(shown);
749        let mut metadata = std::collections::HashMap::new();
750        user_error.apply_to_message_metadata(&mut metadata);
751        error_message.metadata = Some(metadata);
752
753        self.emit_event(EventRequest::new(
754            self.session_id,
755            EventContext::turn(turn_id, input_message_id),
756            OutputMessageCompletedData::new(error_message).with_user_facing_error(&user_error),
757        ))
758        .await;
759
760        self.turn_failed(turn_id, input_message_id, reason, Some(&user_error))
761            .await;
762    }
763
764    pub async fn turn_failed(
765        &self,
766        turn_id: TurnId,
767        input_message_id: MessageId,
768        error: &str,
769        user_error: Option<&UserFacingError>,
770    ) {
771        self.turn_failed_with_disclosure(turn_id, input_message_id, error, user_error, None)
772            .await;
773    }
774
775    /// `turn_failed` with the applied error-disclosure mode recorded on the
776    /// event. `user_error` (and the `error` text shown alongside it) must
777    /// already be disclosure-filtered by the caller.
778    pub async fn turn_failed_with_disclosure(
779        &self,
780        turn_id: TurnId,
781        input_message_id: MessageId,
782        error: &str,
783        user_error: Option<&UserFacingError>,
784        disclosure: Option<ErrorDisclosure>,
785    ) {
786        self.set_session_status(SessionStatus::Idle, "turn_failed")
787            .await;
788
789        self.emit_event(EventRequest::new(
790            self.session_id,
791            EventContext::turn(turn_id, input_message_id),
792            {
793                let mut data = TurnFailedData {
794                    turn_id,
795                    error: error.to_string(),
796                    error_code: None,
797                    error_fields: None,
798                    error_disclosure: disclosure.map(|mode| mode.as_str().to_string()),
799                };
800                if let Some(user_error) = user_error {
801                    user_error.apply_to_event_fields(&mut data.error_code, &mut data.error_fields);
802                }
803                data
804            },
805        ))
806        .await;
807
808        self.emit_event(EventRequest::new(
809            self.session_id,
810            EventContext::turn(turn_id, input_message_id),
811            SessionIdledData {
812                turn_id,
813                iterations: None,
814                usage: None,
815            },
816        ))
817        .await;
818    }
819
820    pub async fn waiting_for_tool_results(&self) {
821        self.set_session_status(
822            SessionStatus::WaitingForToolResults,
823            "waiting_for_tool_results",
824        )
825        .await;
826    }
827
828    pub async fn dependency_blocked(
829        &self,
830        turn_id: TurnId,
831        input_message_id: MessageId,
832        blocker: DependencyBlocker,
833    ) {
834        let user_error = UserFacingError::new(blocker.error_code())
835            .with_field(
836                "dependency",
837                match blocker {
838                    DependencyBlocker::HarnessArchived | DependencyBlocker::HarnessDeleted => {
839                        "harness"
840                    }
841                    DependencyBlocker::AgentArchived | DependencyBlocker::AgentDeleted => "agent",
842                },
843            )
844            .with_field(
845                "state",
846                match blocker {
847                    DependencyBlocker::HarnessArchived | DependencyBlocker::AgentArchived => {
848                        "archived"
849                    }
850                    DependencyBlocker::HarnessDeleted | DependencyBlocker::AgentDeleted => {
851                        "deleted"
852                    }
853                },
854            );
855        let mut error_message = Message::assistant(blocker.message());
856        let mut metadata = std::collections::HashMap::new();
857        user_error.apply_to_message_metadata(&mut metadata);
858        error_message.metadata = Some(metadata);
859
860        self.emit_event(EventRequest::new(
861            self.session_id,
862            EventContext::turn(turn_id, input_message_id),
863            OutputMessageCompletedData::new(error_message).with_user_facing_error(&user_error),
864        ))
865        .await;
866
867        self.turn_failed(
868            turn_id,
869            input_message_id,
870            blocker.message(),
871            Some(&user_error),
872        )
873        .await;
874    }
875}
876
877pub async fn detect_dependency_blocker<A: RuntimeHostAdapter>(
878    adapter: &A,
879    org_id: i64,
880    harness_id: HarnessId,
881    agent_id: Option<AgentId>,
882) -> everruns_core::error::Result<Option<DependencyBlocker>> {
883    let harness_store = adapter.harness_store(org_id);
884    let agent_store = adapter.agent_store(org_id);
885    everruns_core::detect_dependency_blocker(
886        harness_store.as_ref(),
887        agent_store.as_ref(),
888        harness_id,
889        agent_id,
890    )
891    .await
892}
893
894pub async fn execute_input_activity<A: RuntimeHostAdapter>(
895    adapter: &A,
896    org_id: i64,
897    input: InputAtomInput,
898) -> everruns_core::error::Result<InputAtomResult> {
899    RuntimeSessionLifecycle::new(adapter.clone(), org_id, input.context.session_id)
900        .turn_started(input.context.turn_id, input.context.input_message_id)
901        .await;
902
903    let atom = InputAtom::new(adapter.message_store());
904    atom.execute(input).await
905}
906
907/// Collect `user_prompt_submit` hooks for this turn and run them against the
908/// inbound user message text. Returns `None` when the session has no such
909/// hooks (the common case — no overhead beyond the spec collection, which is
910/// skipped early). Errors loading specs are logged and treated as "no hooks"
911/// so a hook-collection failure never blocks a turn that wasn't asking to be
912/// hooked.
913struct UserPromptHookResult {
914    decision: everruns_core::lifecycle_hooks::UserPromptDecision,
915    original_message: String,
916}
917
918async fn run_user_prompt_submit_for_turn<A: RuntimeHostAdapter>(
919    adapter: &A,
920    org_id: i64,
921    input: &ReasonInput,
922) -> everruns_core::error::Result<Option<UserPromptHookResult>> {
923    let (specs, dispatcher) = match collect_lifecycle_hook_specs(
924        adapter,
925        org_id,
926        input.context.session_id,
927        input.harness_id,
928        input.agent_id,
929    )
930    .await
931    {
932        Ok(pair) => pair,
933        Err(error) => {
934            warn!(
935                session_id = %input.context.session_id,
936                %error,
937                "failed to collect user_prompt_submit hook specs; continuing without them"
938            );
939            return Ok(None);
940        }
941    };
942    let hooks = everruns_core::lifecycle_hooks::build_turn_lifecycle_hooks(
943        &specs,
944        everruns_core::user_hook_types::HookEvent::UserPromptSubmit,
945        dispatcher,
946    );
947    if hooks.is_empty() {
948        return Ok(None);
949    }
950
951    let message_text = adapter
952        .message_store()
953        .get(input.context.session_id, input.context.input_message_id)
954        .await
955        .ok()
956        .flatten()
957        .map(|m| m.content_to_llm_string())
958        .unwrap_or_default();
959
960    let ctx = everruns_core::lifecycle_hooks::TurnHookContext {
961        session_id: input.context.session_id,
962        turn_id: Some(input.context.turn_id),
963        org_id: org_public_id_from_internal(org_id).parse().ok(),
964        agent_id: input.agent_id.map(|a| a.to_string()),
965    };
966    let original_message = message_text.clone();
967    let decision =
968        everruns_core::lifecycle_hooks::run_user_prompt_submit_hooks(&hooks, &ctx, message_text)
969            .await;
970    Ok(Some(UserPromptHookResult {
971        decision,
972        original_message,
973    }))
974}
975
976pub async fn execute_reason_activity<A: RuntimeHostAdapter>(
977    adapter: &A,
978    org_id: i64,
979    input: ReasonInput,
980) -> everruns_core::error::Result<ReasonResult> {
981    if let Some(blocker) =
982        detect_dependency_blocker(adapter, org_id, input.harness_id, input.agent_id).await?
983    {
984        RuntimeSessionLifecycle::new(adapter.clone(), org_id, input.context.session_id)
985            .dependency_blocked(
986                input.context.turn_id,
987                input.context.input_message_id,
988                blocker,
989            )
990            .await;
991        return Ok(ReasonResult {
992            success: false,
993            text: blocker.message().to_string(),
994            tool_calls: vec![],
995            has_tool_calls: false,
996            tool_definitions: vec![],
997            max_iterations: everruns_core::runtime_agent::default_max_iterations(),
998            error: Some("dependency_unavailable".to_string()),
999            user_facing_error: None,
1000            error_disclosure: None,
1001            usage: None,
1002            output_message_id: None,
1003            time_to_first_token_ms: None,
1004            response_id: None,
1005            locale: None,
1006            network_access: None,
1007            parallel_tool_calls: None,
1008        });
1009    }
1010
1011    // user_prompt_submit hook (see `specs/user-hooks.md`). Fires once per turn,
1012    // on the first reason iteration, before the LLM is consulted — the closest
1013    // choke point to "inbound user message accepted, before reason" that both
1014    // the in-process loop and the durable worker share. A `Block` aborts the
1015    // turn by reusing the same failure path as `dependency_blocked`: emit a
1016    // user-facing message + turn.failed, idle the session, and return a
1017    // non-success `ReasonResult` so no LLM/act work runs.
1018    let mut user_prompt_message_override = None;
1019    if input.iteration <= 1
1020        && let Some(hook_result) = run_user_prompt_submit_for_turn(adapter, org_id, &input).await?
1021    {
1022        match hook_result.decision {
1023            everruns_core::lifecycle_hooks::UserPromptDecision::Block {
1024                reason,
1025                user_message,
1026            } => {
1027                RuntimeSessionLifecycle::new(adapter.clone(), org_id, input.context.session_id)
1028                    .user_prompt_blocked(
1029                        input.context.turn_id,
1030                        input.context.input_message_id,
1031                        &reason,
1032                        user_message.as_deref(),
1033                    )
1034                    .await;
1035                return Ok(ReasonResult {
1036                    success: false,
1037                    text: user_message.unwrap_or_else(|| reason.clone()),
1038                    tool_calls: vec![],
1039                    has_tool_calls: false,
1040                    tool_definitions: vec![],
1041                    max_iterations: everruns_core::runtime_agent::default_max_iterations(),
1042                    error: Some("blocked_by_user_prompt_hook".to_string()),
1043                    user_facing_error: None,
1044                    error_disclosure: None,
1045                    usage: None,
1046                    output_message_id: None,
1047                    time_to_first_token_ms: None,
1048                    response_id: None,
1049                    locale: None,
1050                    network_access: None,
1051                    parallel_tool_calls: None,
1052                });
1053            }
1054            everruns_core::lifecycle_hooks::UserPromptDecision::Continue { message } => {
1055                if message != hook_result.original_message {
1056                    user_prompt_message_override = Some(message);
1057                }
1058            }
1059        }
1060    }
1061
1062    let turn_context = adapter
1063        .load_turn_context(org_id, input.context.session_id)
1064        .await?;
1065
1066    let mut atom = ReasonAtom::new(
1067        adapter.harness_store(org_id),
1068        adapter.agent_store(org_id),
1069        adapter.session_store(org_id),
1070        adapter.message_store(),
1071        adapter.provider_store(org_id),
1072        adapter.capability_registry(),
1073        adapter.driver_registry(),
1074        adapter.event_emitter(),
1075    )
1076    .with_file_store(adapter.file_store());
1077    if let Some(image_resolver) = adapter.image_resolver(org_id) {
1078        atom = atom.with_image_resolver(image_resolver);
1079    }
1080    if let Some(hb) = adapter.stream_heartbeater() {
1081        atom = atom.with_stream_heartbeater(hb);
1082    }
1083    if let Some(timeout) = adapter.provider_stall_timeout() {
1084        atom = atom.with_provider_stall_timeout(timeout);
1085    }
1086    if let Some(store) = adapter.partial_stream_store() {
1087        atom = atom.with_partial_stream_store(store);
1088    }
1089    if let Some(store) = adapter.durable_tool_result_store() {
1090        atom = atom.with_durable_tool_result_store(store);
1091    }
1092    if let Some(handle) = adapter.reasoning_effort_handle(input.context.session_id) {
1093        atom = atom.with_reasoning_effort_handle(handle);
1094    }
1095    if let Some(utility_llm_service) = adapter.utility_llm_service() {
1096        atom = atom.with_utility_llm_service(utility_llm_service);
1097    }
1098
1099    let input = ReasonInput {
1100        mcp_tool_definitions: turn_context.mcp_tool_definitions,
1101        ..input
1102    };
1103
1104    if let Some(message_override) = user_prompt_message_override {
1105        let mut assembled = assemble_turn_context(
1106            adapter.harness_store(org_id).as_ref(),
1107            adapter.agent_store(org_id).as_ref(),
1108            adapter.session_store(org_id).as_ref(),
1109            adapter.message_store().as_ref(),
1110            adapter.provider_store(org_id).as_ref(),
1111            &adapter.capability_registry(),
1112            input.context.session_id,
1113            input.harness_id,
1114            input.agent_id,
1115            &input.mcp_tool_definitions,
1116            Some(adapter.file_store()),
1117        )
1118        .await?;
1119
1120        let message = assembled
1121            .messages
1122            .iter_mut()
1123            .find(|message| message.id == input.context.input_message_id)
1124            .ok_or_else(|| {
1125                everruns_core::error::AgentLoopError::config(
1126                    "user_prompt_submit mutation: input message not found in assembled context",
1127                )
1128            })?;
1129
1130        // user_prompt_submit mutations are enforcement controls for the
1131        // provider-bound prompt. Apply them to the assembled context only
1132        // so persisted user history remains an audit record of the input.
1133        // Preserve non-text parts (images, files); replace only text parts.
1134        message
1135            .content
1136            .retain(|part| !matches!(part, ContentPart::Text(_)));
1137        message
1138            .content
1139            .insert(0, ContentPart::text(message_override));
1140
1141        return atom.execute_with_assembled_context(input, assembled).await;
1142    }
1143
1144    atom.execute(input).await
1145}
1146
1147pub async fn execute_act_activity<A: RuntimeHostAdapter>(
1148    adapter: &A,
1149    input: ActInput,
1150) -> everruns_core::error::Result<ActResult> {
1151    let org_id = input.org_id.ok_or_else(|| {
1152        everruns_core::error::AgentLoopError::config(
1153            "ActInput.org_id must be set for runtime host execution",
1154        )
1155    })?;
1156
1157    if let Some(blocker) =
1158        detect_dependency_blocker(adapter, org_id, input.harness_id, input.agent_id).await?
1159    {
1160        RuntimeSessionLifecycle::new(adapter.clone(), org_id, input.context.session_id)
1161            .dependency_blocked(
1162                input.context.turn_id,
1163                input.context.input_message_id,
1164                blocker,
1165            )
1166            .await;
1167        return Ok(ActResult {
1168            results: vec![],
1169            completed: true,
1170            success_count: 0,
1171            error_count: 1,
1172            waiting_for_tool_results: false,
1173            blocked: true,
1174            client_tool_calls: vec![],
1175            client_tool_definitions: vec![],
1176        });
1177    }
1178
1179    let execution_capabilities = load_execution_capabilities(
1180        adapter,
1181        org_id,
1182        input.context.session_id,
1183        input.harness_id,
1184        input.agent_id,
1185        input.locale.clone(),
1186        input.blueprint_id.as_deref(),
1187    )
1188    .await?;
1189    let mut tool_registry = execution_capabilities.tool_registry;
1190
1191    // Register the session's MCP tools as first-class registry tools, so they
1192    // execute through the regular `ToolExecutor` path and are visible to
1193    // everything that introspects the registry (spawn_background, tool_search,
1194    // openai_tool_search namespaces, ...). The turn's tool definitions already
1195    // include the discovered MCP tools, so no re-discovery is needed; the host's
1196    // MCP executor supplies execution (specs/runtime-mcp.md D5).
1197    // The MCP invoker is reused below for the guardrails `mcp` check, which
1198    // delegates a guardrail decision to an external endpoint over the same
1199    // scoped-MCP client/auth (specs/guardrails.md).
1200    let mut mcp_invoker: Option<Arc<dyn everruns_core::McpToolInvoker>> = None;
1201    if let Some(mcp) = adapter.mcp_executor(org_id, input.context.session_id).await {
1202        let invoker: Arc<dyn everruns_core::McpToolInvoker> = mcp;
1203        for tool in everruns_core::build_mcp_proxy_tools(&input.tool_definitions, invoker.clone()) {
1204            tool_registry.register_boxed(tool);
1205        }
1206        mcp_invoker = Some(Arc::new(everruns_core::ScopedMcpToolInvoker::new(
1207            &input.tool_definitions,
1208            invoker,
1209        )));
1210    }
1211
1212    let builtin_tool_registry = Arc::new(tool_registry.clone());
1213    let executor: Arc<dyn everruns_core::traits::ToolExecutor> = Arc::new(tool_registry);
1214
1215    let mut atom =
1216        ActAtom::with_file_store(executor, adapter.event_emitter(), adapter.file_store())
1217            .with_session_store(adapter.session_store(org_id))
1218            .with_session_mutator(adapter.session_mutator(org_id))
1219            .with_agent_store(adapter.agent_store(org_id))
1220            .with_tool_registry(builtin_tool_registry)
1221            .with_org_id(
1222                org_public_id_from_internal(org_id)
1223                    .parse()
1224                    .expect("internal org id converts to valid public org id"),
1225            )
1226            .with_capability_registry(adapter.capability_registry())
1227            .with_post_tool_hooks(execution_capabilities.post_tool_hooks)
1228            .with_pre_tool_hooks(execution_capabilities.pre_tool_hooks)
1229            .with_tool_call_hooks(execution_capabilities.tool_call_hooks);
1230
1231    if let Some(storage_store) = adapter.storage_store() {
1232        atom = atom.with_storage_store(storage_store);
1233    }
1234    if let Some(knowledge_store) = adapter.knowledge_store() {
1235        atom = atom.with_knowledge_store(knowledge_store);
1236    }
1237    if let Some(image_store) = adapter.image_artifact_store(org_id) {
1238        atom = atom.with_image_store(image_store);
1239    }
1240    if let Some(provider_credential_store) = adapter.provider_credential_store(org_id) {
1241        atom = atom.with_provider_credential_store(provider_credential_store);
1242    }
1243    if let Some(utility_llm_service) = adapter.utility_llm_service() {
1244        atom = atom.with_utility_llm_service(utility_llm_service);
1245    }
1246    if let Some(invoker) = mcp_invoker {
1247        atom = atom.with_mcp_invoker(invoker);
1248    }
1249    if let Some(egress_service) = adapter.egress_service() {
1250        atom = atom.with_egress_service(egress_service);
1251    }
1252    if let Some(connection_resolver) = adapter.connection_resolver() {
1253        atom = atom.with_connection_resolver(connection_resolver);
1254    }
1255    if let Some(sqldb_store) = adapter.sqldb_store() {
1256        atom = atom.with_sqldb_store(sqldb_store);
1257    }
1258    if let Some(leased_resource_store) = adapter.leased_resource_store() {
1259        atom = atom.with_leased_resource_store(leased_resource_store);
1260    }
1261    if let Some(registry) = adapter.session_resource_registry() {
1262        atom = atom.with_session_resource_registry(registry);
1263    }
1264    if let Some(registry) = adapter.session_task_registry() {
1265        atom = atom.with_session_task_registry(registry);
1266    }
1267    if let Some(schedule_store) = adapter.schedule_store(org_id) {
1268        atom = atom.with_schedule_store(schedule_store);
1269    }
1270    if let Some(platform_store) = adapter.platform_store(org_id, input.context.session_id) {
1271        atom = atom.with_platform_store(platform_store);
1272    }
1273    if let Some(knowledge_index_search) = adapter.knowledge_index_search(org_id) {
1274        atom = atom.with_knowledge_index_search(knowledge_index_search);
1275    }
1276    if let Some(budget_checker) = adapter.budget_checker(org_id, input.agent_id) {
1277        atom = atom.with_budget_checker(budget_checker);
1278    }
1279    if let Some(payment_authority) = adapter.payment_authority(org_id, input.agent_id) {
1280        atom = atom.with_payment_authority(payment_authority);
1281    }
1282    if let Some(limiter) = adapter.outbound_tool_rate_limiter(org_id) {
1283        atom = atom.with_outbound_tool_rate_limiter(limiter);
1284    }
1285    if let Some(store) = adapter.durable_tool_result_store() {
1286        atom = atom.with_durable_tool_result_store(store);
1287    }
1288    if let Some(store) = adapter.subagent_spawn_store() {
1289        atom = atom.with_subagent_spawn_store(store);
1290    }
1291    if let Some(handle) = adapter.reasoning_effort_handle(input.context.session_id) {
1292        atom = atom.with_reasoning_effort_handle(handle);
1293    }
1294
1295    atom.execute(input).await
1296}