Skip to main content

everruns_runtime/
host.rs

1// Shared host orchestration for embedded and durable execution hosts.
2// Decision: everruns-runtime owns worker-facing turn phase execution so
3// durable/server-backed hosts reuse the same input/reason/act wiring.
4
5use async_trait::async_trait;
6use everruns_core::atoms::{
7    ActAtom, ActInput, ActResult, Atom, InputAtom, InputAtomInput, InputAtomResult, ReasonAtom,
8    ReasonInput, ReasonResult,
9};
10use everruns_core::capabilities::{SystemPromptContext, collect_capabilities_with_configs};
11use everruns_core::events::{
12    EventContext, EventRequest, OutputMessageCompletedData, SessionActivatedData, SessionIdledData,
13    TurnCompletedData, TurnFailedData, TurnStartedData,
14};
15use everruns_core::message::{ContentPart, Message};
16use everruns_core::message_retriever::MessageRetriever;
17use everruns_core::platform_store::PlatformStore;
18use everruns_core::session::SessionStatus;
19use everruns_core::traits::{
20    AgentStore, BudgetChecker, EventEmitter, HarnessStore, ImageArtifactStore, ImageResolver,
21    LeasedResourceStore, PaymentAuthority, ProviderCredentialStore, ProviderStore, ResolvedModel,
22    SessionFileSystem, SessionMutator, SessionResourceRegistry, SessionScheduleStore,
23    SessionSqlDbStoreRef, SessionStorageStore, SessionStore, UserConnectionResolver,
24};
25use everruns_core::typed_id::{AgentId, HarnessId, MessageId, SessionId, TurnId};
26use everruns_core::{
27    Agent, CapabilityRegistry, CapabilityStatus, DependencyBlocker, DriverRegistry, EgressService,
28    ErrorDisclosure, Harness, Session, TokenUsage, ToolDefinition, ToolRegistry, UserFacingError,
29    UtilityLlmService, assemble_turn_context, org_public_id_from_internal,
30    resolve_runtime_capabilities,
31};
32use std::sync::Arc;
33use tracing::warn;
34
35/// Turn context loaded in one batched call for runtime host execution.
36#[derive(Debug, Clone)]
37pub struct RuntimeHostTurnContext {
38    pub agent: Option<Agent>,
39    pub session: Session,
40    pub messages: Vec<Message>,
41    pub model: Option<ResolvedModel>,
42    pub mcp_tool_definitions: Vec<ToolDefinition>,
43}
44
45/// Public adapter contract for server-backed or durable runtime hosts.
46///
47/// `everruns-runtime` owns shared host orchestration for both embedded and
48/// durable execution. That includes phase execution (`input -> reason -> act`),
49/// lifecycle emission, and the generic turn-strategy decisions used by durable
50/// or custom hosts.
51///
52/// Host crates implement this trait to provide persistence, session-lifecycle
53/// plumbing, event delivery, and their own orchestration backend. The durable
54/// engine itself remains outside this crate.
55#[async_trait]
56pub trait RuntimeHostAdapter: Send + Sync + Clone + 'static {
57    async fn get_agent(
58        &self,
59        org_id: i64,
60        agent_id: AgentId,
61    ) -> everruns_core::error::Result<Option<Agent>>;
62
63    async fn get_harness(
64        &self,
65        org_id: i64,
66        harness_id: HarnessId,
67    ) -> everruns_core::error::Result<Option<Harness>>;
68
69    async fn set_session_status(
70        &self,
71        org_id: i64,
72        session_id: SessionId,
73        status: SessionStatus,
74    ) -> everruns_core::error::Result<Session>;
75
76    async fn load_turn_context(
77        &self,
78        org_id: i64,
79        session_id: SessionId,
80    ) -> everruns_core::error::Result<RuntimeHostTurnContext>;
81
82    fn capability_registry(&self) -> CapabilityRegistry;
83
84    fn driver_registry(&self) -> DriverRegistry;
85
86    fn harness_store(&self, org_id: i64) -> Arc<dyn HarnessStore>;
87
88    fn agent_store(&self, org_id: i64) -> Arc<dyn AgentStore>;
89
90    fn session_store(&self, org_id: i64) -> Arc<dyn SessionStore>;
91
92    fn session_mutator(&self, org_id: i64) -> Arc<dyn SessionMutator>;
93
94    fn provider_store(&self, org_id: i64) -> Arc<dyn ProviderStore>;
95
96    fn message_store(&self) -> Arc<dyn MessageRetriever>;
97
98    fn event_emitter(&self) -> Arc<dyn EventEmitter>;
99
100    fn file_store(&self) -> Arc<dyn SessionFileSystem>;
101
102    fn image_resolver(&self, _org_id: i64) -> Option<Arc<dyn ImageResolver>> {
103        None
104    }
105
106    fn image_artifact_store(&self, _org_id: i64) -> Option<Arc<dyn ImageArtifactStore>> {
107        None
108    }
109
110    fn provider_credential_store(&self, _org_id: i64) -> Option<Arc<dyn ProviderCredentialStore>> {
111        None
112    }
113
114    fn utility_llm_service(&self) -> Option<Arc<dyn UtilityLlmService>> {
115        None
116    }
117
118    fn egress_service(&self) -> Option<Arc<dyn EgressService>> {
119        None
120    }
121
122    fn storage_store(&self) -> Option<Arc<dyn SessionStorageStore>> {
123        None
124    }
125
126    fn connection_resolver(&self) -> Option<Arc<dyn UserConnectionResolver>> {
127        None
128    }
129
130    fn sqldb_store(&self) -> Option<SessionSqlDbStoreRef> {
131        None
132    }
133
134    fn leased_resource_store(&self) -> Option<Arc<dyn LeasedResourceStore>> {
135        None
136    }
137
138    fn session_resource_registry(&self) -> Option<Arc<dyn SessionResourceRegistry>> {
139        None
140    }
141
142    fn session_task_registry(
143        &self,
144    ) -> Option<Arc<dyn everruns_core::session_task::SessionTaskRegistry>> {
145        None
146    }
147
148    fn schedule_store(&self, _org_id: i64) -> Option<Arc<dyn SessionScheduleStore>> {
149        None
150    }
151
152    fn platform_store(
153        &self,
154        _org_id: i64,
155        _session_id: SessionId,
156    ) -> Option<Arc<dyn PlatformStore>> {
157        None
158    }
159
160    fn budget_checker(
161        &self,
162        _org_id: i64,
163        _agent_id: Option<AgentId>,
164    ) -> Option<Arc<dyn BudgetChecker>> {
165        None
166    }
167
168    fn payment_authority(
169        &self,
170        _org_id: i64,
171        _agent_id: Option<AgentId>,
172    ) -> Option<Arc<dyn PaymentAuthority>> {
173        None
174    }
175
176    /// Per-org outbound tool-call rate limiter (TM-TOOL-009).
177    /// Default: `None` (no rate limiting — suitable for in-process / test environments).
178    fn outbound_tool_rate_limiter(
179        &self,
180        _org_id: i64,
181    ) -> Option<Arc<dyn everruns_core::OutboundToolRateLimiter>> {
182        None
183    }
184
185    /// Per-turn durable tool result store for act-activity idempotency (EVE-530).
186    /// Default: `None` (no durable claim/settle — every execution runs tools fresh).
187    fn durable_tool_result_store(&self) -> Option<Arc<dyn everruns_core::DurableToolResultStore>> {
188        None
189    }
190
191    /// Durable subagent spawn handle store for reattach on reclaim (EVE-535).
192    /// Default: `None` (no spawn dedup — dev/test mode or hosts without durable execution).
193    fn subagent_spawn_store(&self) -> Option<Arc<dyn everruns_core::SubagentSpawnStore>> {
194        None
195    }
196
197    /// Stream-liveness heartbeater for the Reason activity (EVE-531).
198    /// Default: `None` (no heartbeats sent — durable workers supply one).
199    fn stream_heartbeater(&self) -> Option<Arc<dyn everruns_core::StreamHeartbeater>> {
200        None
201    }
202
203    /// Partial-stream store for ContinuePartial recovery (EVE-532).
204    /// Default: `None` (no recovery; in-memory and dev hosts use this default).
205    fn partial_stream_store(&self) -> Option<Arc<dyn everruns_core::PartialStreamStore>> {
206        None
207    }
208
209    /// Provider stall timeout for the Reason activity (EVE-531).
210    /// Default: `None` (use built-in 120s default).
211    fn provider_stall_timeout(&self) -> Option<std::time::Duration> {
212        None
213    }
214
215    /// MCP executor routing `mcp_*` tool calls for this session, if the host
216    /// configures MCP (specs/runtime-mcp.md D4). Default: `None`, so hosts
217    /// without scoped MCP servers keep the plain tool registry unchanged.
218    async fn mcp_executor(
219        &self,
220        _org_id: i64,
221        _session_id: SessionId,
222    ) -> Option<Arc<everruns_mcp::McpExecutor>> {
223        None
224    }
225}
226
227struct RuntimeExecutionCapabilities {
228    tool_registry: ToolRegistry,
229    post_tool_hooks: Vec<Arc<dyn everruns_core::PostToolExecHook>>,
230    pre_tool_hooks: Vec<Arc<dyn everruns_core::atoms::PreToolUseHook>>,
231    tool_call_hooks: Vec<Arc<dyn everruns_core::ToolCallHook>>,
232}
233
234/// Collect and finalize user-hook specs for a session from its resolved
235/// capability configs, plus the shared bash dispatcher used to run them.
236///
237/// This is the single place hook specs are gathered so every firing point —
238/// the act path (`load_execution_capabilities`) and the lifecycle firing
239/// points (`execute_reason_activity` for `user_prompt_submit`, turn completion
240/// for `turn_end`, and the server session paths) — applies identical
241/// `finalize_hook_specs` semantics: `{capability_id}:` namespace stamping,
242/// stable default ids, and `disabled_contributions` muting (TM-HOOK-004).
243fn finalize_specs_from_configs(
244    resolved_capability_configs: &[everruns_core::capability_types::AgentCapabilityConfig],
245    capability_registry: &CapabilityRegistry,
246) -> Vec<everruns_core::user_hook_types::UserHookSpec> {
247    let mut hook_contributions: Vec<(String, Vec<everruns_core::user_hook_types::UserHookSpec>)> =
248        Vec::new();
249    let mut disabled_contributions: Vec<String> = Vec::new();
250    for config in resolved_capability_configs {
251        let Some(capability) = capability_registry.get(config.capability_id()) else {
252            continue;
253        };
254        let specs = capability.user_hooks_with_config(&config.config);
255        if !specs.is_empty() {
256            hook_contributions.push((config.capability_id().to_string(), specs));
257        }
258        if config.capability_id() == "user_hooks" {
259            disabled_contributions.extend(
260                everruns_core::capabilities::user_hooks::disabled_contributions(&config.config),
261            );
262        }
263    }
264    everruns_core::hook_adapter::finalize_hook_specs(hook_contributions, &disabled_contributions)
265}
266
267/// Resolve a session's capability configs and collect finalized hook specs.
268/// Used by the lifecycle firing points, which need specs outside the act path.
269/// Returns `(specs, dispatcher)`; `specs` is empty when the session has no
270/// hook-contributing capabilities.
271async fn collect_lifecycle_hook_specs<A: RuntimeHostAdapter>(
272    adapter: &A,
273    org_id: i64,
274    session_id: SessionId,
275    harness_id: HarnessId,
276    agent_id: Option<AgentId>,
277) -> everruns_core::error::Result<(
278    Vec<everruns_core::user_hook_types::UserHookSpec>,
279    Arc<dyn everruns_core::hook_executor::BashHookDispatcher>,
280)> {
281    let capability_registry = adapter.capability_registry();
282    let harness_chain = adapter
283        .harness_store(org_id)
284        .get_harness_chain(harness_id)
285        .await?;
286    if harness_chain.is_empty() {
287        return Err(everruns_core::error::AgentLoopError::harness_not_found(
288            harness_id,
289        ));
290    }
291    let session = adapter
292        .session_store(org_id)
293        .get_session(session_id)
294        .await?
295        .ok_or_else(|| everruns_core::error::AgentLoopError::session_not_found(session_id))?;
296    let agent = match agent_id {
297        Some(agent_id) => adapter.agent_store(org_id).get_agent(agent_id).await?,
298        None => None,
299    };
300    let resolved = resolve_runtime_capabilities(
301        &harness_chain,
302        agent.as_ref(),
303        &session,
304        &capability_registry,
305    );
306    let specs =
307        finalize_specs_from_configs(&resolved.resolved_capability_configs, &capability_registry);
308    let dispatcher: Arc<dyn everruns_core::hook_executor::BashHookDispatcher> = Arc::new(
309        everruns_core::hook_dispatch::BashkitShellHookDispatcher::new(adapter.file_store()),
310    );
311    Ok((specs, dispatcher))
312}
313
314async fn load_execution_capabilities<A: RuntimeHostAdapter>(
315    adapter: &A,
316    org_id: i64,
317    session_id: SessionId,
318    harness_id: HarnessId,
319    agent_id: Option<AgentId>,
320    locale: Option<String>,
321    blueprint_id: Option<&str>,
322) -> everruns_core::error::Result<RuntimeExecutionCapabilities> {
323    let capability_registry = adapter.capability_registry();
324    if let Some(blueprint_id) = blueprint_id {
325        let mut registry = ToolRegistry::with_defaults();
326        let blueprint = capability_registry.blueprint(blueprint_id).ok_or_else(|| {
327            everruns_core::error::AgentLoopError::config(format!(
328                "Blueprint \"{blueprint_id}\" not found in registry"
329            ))
330        })?;
331        for tool in blueprint.tools {
332            registry.register_boxed(tool);
333        }
334        return Ok(RuntimeExecutionCapabilities {
335            tool_registry: registry,
336            post_tool_hooks: Vec::new(),
337            pre_tool_hooks: Vec::new(),
338            tool_call_hooks: Vec::new(),
339        });
340    }
341
342    let harness_chain = adapter
343        .harness_store(org_id)
344        .get_harness_chain(harness_id)
345        .await?;
346    if harness_chain.is_empty() {
347        return Err(everruns_core::error::AgentLoopError::harness_not_found(
348            harness_id,
349        ));
350    }
351
352    let session = adapter
353        .session_store(org_id)
354        .get_session(session_id)
355        .await?
356        .ok_or_else(|| everruns_core::error::AgentLoopError::session_not_found(session_id))?;
357
358    let agent_store = adapter.agent_store(org_id);
359    let agent = match agent_id {
360        Some(agent_id) => Some(
361            agent_store
362                .get_agent(agent_id)
363                .await?
364                .ok_or_else(|| everruns_core::error::AgentLoopError::agent_not_found(agent_id))?,
365        ),
366        None => None,
367    };
368
369    let resolved = resolve_runtime_capabilities(
370        &harness_chain,
371        agent.as_ref(),
372        &session,
373        &capability_registry,
374    );
375    // Executor (act) path: this builds the worker-side tool registry, not the
376    // model-visible tool list. The model is left unset, so a model-adaptive
377    // capability like `auto_tool_search` resolves to its provider-agnostic
378    // client-side mechanism here. That registers the `tool_search` tool in the
379    // executor, which is a harmless superset: on native models the reason path
380    // never shows that tool to the model, so it is simply never called.
381    let prompt_ctx = SystemPromptContext {
382        session_id,
383        locale: locale.or(session.locale.clone()),
384        // Pin system-prompt file reads to the session's workspace (the default
385        // 1:1 case is a transparent pass-through).
386        file_store: Some(everruns_core::WorkspaceScopedFileSystem::wrap(
387            adapter.file_store(),
388            session.workspace_id,
389        )),
390        model: None,
391    };
392    let collected = collect_capabilities_with_configs(
393        &resolved.resolved_capability_configs,
394        &capability_registry,
395        &prompt_ctx,
396    )
397    .await;
398
399    let mut registry = ToolRegistry::with_defaults();
400    for tool in collected.tools {
401        registry.register_boxed(tool);
402    }
403
404    // Only `Available` capabilities contribute hooks, matching
405    // `collect_capabilities_with_configs` (which skips non-available
406    // capabilities). This keeps a `ComingSoon`/unavailable capability from
407    // affecting execution via any of its hook seams.
408    let mut post_tool_hooks: Vec<Arc<dyn everruns_core::PostToolExecHook>> = resolved
409        .resolved_capability_configs
410        .iter()
411        .flat_map(|config| {
412            capability_registry
413                .get(config.capability_id())
414                .filter(|capability| capability.status() == CapabilityStatus::Available)
415                .map(|capability| capability.post_tool_exec_hooks_with_config(&config.config))
416                .unwrap_or_default()
417        })
418        .collect();
419
420    // User-hook contributions (see `specs/user-hooks.md`). `finalize_specs_from_configs`
421    // gathers specs across every resolved capability — both the user-facing
422    // `user_hooks` capability and any capability that bundles hooks — and applies
423    // `finalize_hook_specs` (namespace stamping, stable ids, `disabled_contributions`
424    // muting; TM-HOOK-004). The same helper backs the lifecycle firing points so
425    // every event finalizes specs identically.
426    let user_hook_specs =
427        finalize_specs_from_configs(&resolved.resolved_capability_configs, &capability_registry);
428    // Capability-contributed pre-tool hooks run first (e.g. approval gating),
429    // then user-hook (`PreToolUse`) specs. The first hook to block wins.
430    let mut pre_tool_hooks: Vec<Arc<dyn everruns_core::atoms::PreToolUseHook>> = resolved
431        .resolved_capability_configs
432        .iter()
433        .flat_map(|config| {
434            capability_registry
435                .get(config.capability_id())
436                .filter(|capability| capability.status() == CapabilityStatus::Available)
437                .map(|capability| capability.pre_tool_use_hooks_with_config(&config.config))
438                .unwrap_or_default()
439        })
440        .collect();
441    if !user_hook_specs.is_empty() {
442        let dispatcher: Arc<dyn everruns_core::hook_executor::BashHookDispatcher> = Arc::new(
443            everruns_core::hook_dispatch::BashkitShellHookDispatcher::new(adapter.file_store()),
444        );
445        post_tool_hooks.extend(everruns_core::hook_adapter::build_post_tool_use_hooks(
446            &user_hook_specs,
447            dispatcher.clone(),
448        ));
449        pre_tool_hooks.extend(everruns_core::hook_adapter::build_pre_tool_use_hooks(
450            &user_hook_specs,
451            dispatcher,
452        ));
453    }
454
455    let tool_call_hooks = resolved
456        .resolved_capability_configs
457        .iter()
458        .flat_map(|config| {
459            capability_registry
460                .get(config.capability_id())
461                .filter(|capability| capability.status() == CapabilityStatus::Available)
462                .map(|capability| capability.tool_call_hooks())
463                .unwrap_or_default()
464        })
465        .collect();
466
467    Ok(RuntimeExecutionCapabilities {
468        tool_registry: registry,
469        post_tool_hooks,
470        pre_tool_hooks,
471        tool_call_hooks,
472    })
473}
474
475/// Shared lifecycle helper for runtime-backed hosts.
476pub struct RuntimeSessionLifecycle<A: RuntimeHostAdapter> {
477    adapter: A,
478    org_id: i64,
479    session_id: SessionId,
480}
481
482impl<A: RuntimeHostAdapter> RuntimeSessionLifecycle<A> {
483    pub fn new(adapter: A, org_id: i64, session_id: SessionId) -> Self {
484        Self {
485            adapter,
486            org_id,
487            session_id,
488        }
489    }
490
491    async fn set_session_status(&self, status: SessionStatus, action: &'static str) {
492        if let Err(error) = self
493            .adapter
494            .set_session_status(self.org_id, self.session_id, status)
495            .await
496        {
497            warn!(
498                session_id = %self.session_id,
499                org_id = self.org_id,
500                action,
501                %error,
502                "runtime host lifecycle status update failed"
503            );
504        }
505    }
506
507    async fn emit_event(&self, request: EventRequest) {
508        let event_type = request.event_type.clone();
509        if let Err(error) = self.adapter.event_emitter().emit(request).await {
510            warn!(
511                session_id = %self.session_id,
512                org_id = self.org_id,
513                event_type,
514                %error,
515                "runtime host lifecycle event emission failed"
516            );
517        }
518    }
519
520    pub async fn turn_started(&self, turn_id: TurnId, input_message_id: MessageId) {
521        let input_content = self
522            .adapter
523            .message_store()
524            .get(self.session_id, input_message_id)
525            .await
526            .ok()
527            .flatten()
528            .map(|message| message.content_to_llm_string());
529
530        self.set_session_status(SessionStatus::Active, "turn_started")
531            .await;
532
533        self.emit_event(EventRequest::new(
534            self.session_id,
535            EventContext::turn(turn_id, input_message_id),
536            SessionActivatedData {
537                turn_id,
538                input_message_id,
539            },
540        ))
541        .await;
542
543        self.emit_event(EventRequest::new(
544            self.session_id,
545            EventContext::turn(turn_id, input_message_id),
546            TurnStartedData {
547                turn_id,
548                input_message_id,
549                input_content,
550            },
551        ))
552        .await;
553    }
554
555    pub async fn emit_turn_completed(&self, input_message_id: MessageId, data: TurnCompletedData) {
556        let turn_id = data.turn_id;
557        self.emit_event(EventRequest::new(
558            self.session_id,
559            EventContext::turn(turn_id, input_message_id),
560            data,
561        ))
562        .await;
563    }
564
565    pub async fn emit_session_idled(
566        &self,
567        turn_id: TurnId,
568        input_message_id: MessageId,
569        iterations: Option<u32>,
570        usage: Option<TokenUsage>,
571    ) {
572        self.set_session_status(SessionStatus::Idle, "emit_session_idled")
573            .await;
574
575        self.emit_event(EventRequest::new(
576            self.session_id,
577            EventContext::turn(turn_id, input_message_id),
578            SessionIdledData {
579                turn_id,
580                iterations,
581                usage,
582            },
583        ))
584        .await;
585    }
586
587    pub async fn turn_completed(
588        &self,
589        turn_id: TurnId,
590        input_message_id: MessageId,
591        iterations: u32,
592        usage: Option<TokenUsage>,
593        input_content: Option<String>,
594    ) {
595        self.emit_turn_completed(
596            input_message_id,
597            TurnCompletedData {
598                turn_id,
599                iterations,
600                duration_ms: None,
601                usage: usage.clone(),
602                input_content,
603                final_message_id: None,
604                final_answer_preview: None,
605                time_to_first_token_ms: None,
606                tool_call_count: None,
607                llm_call_count: None,
608                status: Some("completed".to_string()),
609            },
610        )
611        .await;
612        self.emit_session_idled(turn_id, input_message_id, Some(iterations), usage)
613            .await;
614    }
615
616    /// Fire `turn_end` lifecycle hooks (advisory). Collects the session's hook
617    /// specs and runs every `turn_end` hook; failures are logged, never fatal.
618    /// `harness_id`/`agent_id` are required to resolve the capability chain.
619    pub async fn fire_turn_end_hooks(
620        &self,
621        harness_id: HarnessId,
622        agent_id: Option<AgentId>,
623        turn_id: TurnId,
624        success: bool,
625    ) {
626        let (specs, dispatcher) = match collect_lifecycle_hook_specs(
627            &self.adapter,
628            self.org_id,
629            self.session_id,
630            harness_id,
631            agent_id,
632        )
633        .await
634        {
635            Ok(pair) => pair,
636            Err(error) => {
637                warn!(
638                    session_id = %self.session_id,
639                    %error,
640                    "failed to collect turn_end hook specs; skipping"
641                );
642                return;
643            }
644        };
645        let hooks = everruns_core::lifecycle_hooks::build_turn_lifecycle_hooks(
646            &specs,
647            everruns_core::user_hook_types::HookEvent::TurnEnd,
648            dispatcher,
649        );
650        if hooks.is_empty() {
651            return;
652        }
653        let ctx = everruns_core::lifecycle_hooks::TurnHookContext {
654            session_id: self.session_id,
655            turn_id: Some(turn_id),
656            org_id: org_public_id_from_internal(self.org_id).parse().ok(),
657            agent_id: agent_id.map(|a| a.to_string()),
658        };
659        everruns_core::lifecycle_hooks::run_turn_end_hooks(
660            &hooks,
661            &ctx,
662            serde_json::json!({ "success": success }),
663        )
664        .await;
665    }
666
667    /// Abort a turn because a `user_prompt_submit` hook returned `Block`.
668    /// Reuses the dependency-blocked failure shape: emit a user-facing message
669    /// carrying the hook's `user_message` (or `reason`), then mark the turn
670    /// failed and idle the session.
671    pub async fn user_prompt_blocked(
672        &self,
673        turn_id: TurnId,
674        input_message_id: MessageId,
675        reason: &str,
676        user_message: Option<&str>,
677    ) {
678        let user_error =
679            UserFacingError::new(everruns_core::user_facing_error_codes::BLOCKED_BY_HOOK);
680        let shown = user_message.unwrap_or(reason);
681        let mut error_message = Message::assistant(shown);
682        let mut metadata = std::collections::HashMap::new();
683        user_error.apply_to_message_metadata(&mut metadata);
684        error_message.metadata = Some(metadata);
685
686        self.emit_event(EventRequest::new(
687            self.session_id,
688            EventContext::turn(turn_id, input_message_id),
689            OutputMessageCompletedData::new(error_message).with_user_facing_error(&user_error),
690        ))
691        .await;
692
693        self.turn_failed(turn_id, input_message_id, reason, Some(&user_error))
694            .await;
695    }
696
697    pub async fn turn_failed(
698        &self,
699        turn_id: TurnId,
700        input_message_id: MessageId,
701        error: &str,
702        user_error: Option<&UserFacingError>,
703    ) {
704        self.turn_failed_with_disclosure(turn_id, input_message_id, error, user_error, None)
705            .await;
706    }
707
708    /// `turn_failed` with the applied error-disclosure mode recorded on the
709    /// event. `user_error` (and the `error` text shown alongside it) must
710    /// already be disclosure-filtered by the caller.
711    pub async fn turn_failed_with_disclosure(
712        &self,
713        turn_id: TurnId,
714        input_message_id: MessageId,
715        error: &str,
716        user_error: Option<&UserFacingError>,
717        disclosure: Option<ErrorDisclosure>,
718    ) {
719        self.set_session_status(SessionStatus::Idle, "turn_failed")
720            .await;
721
722        self.emit_event(EventRequest::new(
723            self.session_id,
724            EventContext::turn(turn_id, input_message_id),
725            {
726                let mut data = TurnFailedData {
727                    turn_id,
728                    error: error.to_string(),
729                    error_code: None,
730                    error_fields: None,
731                    error_disclosure: disclosure.map(|mode| mode.as_str().to_string()),
732                };
733                if let Some(user_error) = user_error {
734                    user_error.apply_to_event_fields(&mut data.error_code, &mut data.error_fields);
735                }
736                data
737            },
738        ))
739        .await;
740
741        self.emit_event(EventRequest::new(
742            self.session_id,
743            EventContext::turn(turn_id, input_message_id),
744            SessionIdledData {
745                turn_id,
746                iterations: None,
747                usage: None,
748            },
749        ))
750        .await;
751    }
752
753    pub async fn waiting_for_tool_results(&self) {
754        self.set_session_status(
755            SessionStatus::WaitingForToolResults,
756            "waiting_for_tool_results",
757        )
758        .await;
759    }
760
761    pub async fn dependency_blocked(
762        &self,
763        turn_id: TurnId,
764        input_message_id: MessageId,
765        blocker: DependencyBlocker,
766    ) {
767        let user_error = UserFacingError::new(blocker.error_code())
768            .with_field(
769                "dependency",
770                match blocker {
771                    DependencyBlocker::HarnessArchived | DependencyBlocker::HarnessDeleted => {
772                        "harness"
773                    }
774                    DependencyBlocker::AgentArchived | DependencyBlocker::AgentDeleted => "agent",
775                },
776            )
777            .with_field(
778                "state",
779                match blocker {
780                    DependencyBlocker::HarnessArchived | DependencyBlocker::AgentArchived => {
781                        "archived"
782                    }
783                    DependencyBlocker::HarnessDeleted | DependencyBlocker::AgentDeleted => {
784                        "deleted"
785                    }
786                },
787            );
788        let mut error_message = Message::assistant(blocker.message());
789        let mut metadata = std::collections::HashMap::new();
790        user_error.apply_to_message_metadata(&mut metadata);
791        error_message.metadata = Some(metadata);
792
793        self.emit_event(EventRequest::new(
794            self.session_id,
795            EventContext::turn(turn_id, input_message_id),
796            OutputMessageCompletedData::new(error_message).with_user_facing_error(&user_error),
797        ))
798        .await;
799
800        self.turn_failed(
801            turn_id,
802            input_message_id,
803            blocker.message(),
804            Some(&user_error),
805        )
806        .await;
807    }
808}
809
810pub async fn detect_dependency_blocker<A: RuntimeHostAdapter>(
811    adapter: &A,
812    org_id: i64,
813    harness_id: HarnessId,
814    agent_id: Option<AgentId>,
815) -> everruns_core::error::Result<Option<DependencyBlocker>> {
816    let harness_store = adapter.harness_store(org_id);
817    let agent_store = adapter.agent_store(org_id);
818    everruns_core::detect_dependency_blocker(
819        harness_store.as_ref(),
820        agent_store.as_ref(),
821        harness_id,
822        agent_id,
823    )
824    .await
825}
826
827pub async fn execute_input_activity<A: RuntimeHostAdapter>(
828    adapter: &A,
829    org_id: i64,
830    input: InputAtomInput,
831) -> everruns_core::error::Result<InputAtomResult> {
832    RuntimeSessionLifecycle::new(adapter.clone(), org_id, input.context.session_id)
833        .turn_started(input.context.turn_id, input.context.input_message_id)
834        .await;
835
836    let atom = InputAtom::new(adapter.message_store());
837    atom.execute(input).await
838}
839
840/// Collect `user_prompt_submit` hooks for this turn and run them against the
841/// inbound user message text. Returns `None` when the session has no such
842/// hooks (the common case — no overhead beyond the spec collection, which is
843/// skipped early). Errors loading specs are logged and treated as "no hooks"
844/// so a hook-collection failure never blocks a turn that wasn't asking to be
845/// hooked.
846struct UserPromptHookResult {
847    decision: everruns_core::lifecycle_hooks::UserPromptDecision,
848    original_message: String,
849}
850
851async fn run_user_prompt_submit_for_turn<A: RuntimeHostAdapter>(
852    adapter: &A,
853    org_id: i64,
854    input: &ReasonInput,
855) -> everruns_core::error::Result<Option<UserPromptHookResult>> {
856    let (specs, dispatcher) = match collect_lifecycle_hook_specs(
857        adapter,
858        org_id,
859        input.context.session_id,
860        input.harness_id,
861        input.agent_id,
862    )
863    .await
864    {
865        Ok(pair) => pair,
866        Err(error) => {
867            warn!(
868                session_id = %input.context.session_id,
869                %error,
870                "failed to collect user_prompt_submit hook specs; continuing without them"
871            );
872            return Ok(None);
873        }
874    };
875    let hooks = everruns_core::lifecycle_hooks::build_turn_lifecycle_hooks(
876        &specs,
877        everruns_core::user_hook_types::HookEvent::UserPromptSubmit,
878        dispatcher,
879    );
880    if hooks.is_empty() {
881        return Ok(None);
882    }
883
884    let message_text = adapter
885        .message_store()
886        .get(input.context.session_id, input.context.input_message_id)
887        .await
888        .ok()
889        .flatten()
890        .map(|m| m.content_to_llm_string())
891        .unwrap_or_default();
892
893    let ctx = everruns_core::lifecycle_hooks::TurnHookContext {
894        session_id: input.context.session_id,
895        turn_id: Some(input.context.turn_id),
896        org_id: org_public_id_from_internal(org_id).parse().ok(),
897        agent_id: input.agent_id.map(|a| a.to_string()),
898    };
899    let original_message = message_text.clone();
900    let decision =
901        everruns_core::lifecycle_hooks::run_user_prompt_submit_hooks(&hooks, &ctx, message_text)
902            .await;
903    Ok(Some(UserPromptHookResult {
904        decision,
905        original_message,
906    }))
907}
908
909pub async fn execute_reason_activity<A: RuntimeHostAdapter>(
910    adapter: &A,
911    org_id: i64,
912    input: ReasonInput,
913) -> everruns_core::error::Result<ReasonResult> {
914    if let Some(blocker) =
915        detect_dependency_blocker(adapter, org_id, input.harness_id, input.agent_id).await?
916    {
917        RuntimeSessionLifecycle::new(adapter.clone(), org_id, input.context.session_id)
918            .dependency_blocked(
919                input.context.turn_id,
920                input.context.input_message_id,
921                blocker,
922            )
923            .await;
924        return Ok(ReasonResult {
925            success: false,
926            text: blocker.message().to_string(),
927            tool_calls: vec![],
928            has_tool_calls: false,
929            tool_definitions: vec![],
930            max_iterations: everruns_core::runtime_agent::default_max_iterations(),
931            error: Some("dependency_unavailable".to_string()),
932            user_facing_error: None,
933            error_disclosure: None,
934            usage: None,
935            output_message_id: None,
936            time_to_first_token_ms: None,
937            response_id: None,
938            locale: None,
939            network_access: None,
940        });
941    }
942
943    // user_prompt_submit hook (see `specs/user-hooks.md`). Fires once per turn,
944    // on the first reason iteration, before the LLM is consulted — the closest
945    // choke point to "inbound user message accepted, before reason" that both
946    // the in-process loop and the durable worker share. A `Block` aborts the
947    // turn by reusing the same failure path as `dependency_blocked`: emit a
948    // user-facing message + turn.failed, idle the session, and return a
949    // non-success `ReasonResult` so no LLM/act work runs.
950    let mut user_prompt_message_override = None;
951    if input.iteration <= 1
952        && let Some(hook_result) = run_user_prompt_submit_for_turn(adapter, org_id, &input).await?
953    {
954        match hook_result.decision {
955            everruns_core::lifecycle_hooks::UserPromptDecision::Block {
956                reason,
957                user_message,
958            } => {
959                RuntimeSessionLifecycle::new(adapter.clone(), org_id, input.context.session_id)
960                    .user_prompt_blocked(
961                        input.context.turn_id,
962                        input.context.input_message_id,
963                        &reason,
964                        user_message.as_deref(),
965                    )
966                    .await;
967                return Ok(ReasonResult {
968                    success: false,
969                    text: user_message.unwrap_or_else(|| reason.clone()),
970                    tool_calls: vec![],
971                    has_tool_calls: false,
972                    tool_definitions: vec![],
973                    max_iterations: everruns_core::runtime_agent::default_max_iterations(),
974                    error: Some("blocked_by_user_prompt_hook".to_string()),
975                    user_facing_error: None,
976                    error_disclosure: None,
977                    usage: None,
978                    output_message_id: None,
979                    time_to_first_token_ms: None,
980                    response_id: None,
981                    locale: None,
982                    network_access: None,
983                });
984            }
985            everruns_core::lifecycle_hooks::UserPromptDecision::Continue { message } => {
986                if message != hook_result.original_message {
987                    user_prompt_message_override = Some(message);
988                }
989            }
990        }
991    }
992
993    let turn_context = adapter
994        .load_turn_context(org_id, input.context.session_id)
995        .await?;
996
997    let mut atom = ReasonAtom::new(
998        adapter.harness_store(org_id),
999        adapter.agent_store(org_id),
1000        adapter.session_store(org_id),
1001        adapter.message_store(),
1002        adapter.provider_store(org_id),
1003        adapter.capability_registry(),
1004        adapter.driver_registry(),
1005        adapter.event_emitter(),
1006    )
1007    .with_file_store(adapter.file_store());
1008    if let Some(image_resolver) = adapter.image_resolver(org_id) {
1009        atom = atom.with_image_resolver(image_resolver);
1010    }
1011    if let Some(hb) = adapter.stream_heartbeater() {
1012        atom = atom.with_stream_heartbeater(hb);
1013    }
1014    if let Some(timeout) = adapter.provider_stall_timeout() {
1015        atom = atom.with_provider_stall_timeout(timeout);
1016    }
1017    if let Some(store) = adapter.partial_stream_store() {
1018        atom = atom.with_partial_stream_store(store);
1019    }
1020    if let Some(store) = adapter.durable_tool_result_store() {
1021        atom = atom.with_durable_tool_result_store(store);
1022    }
1023
1024    let input = ReasonInput {
1025        mcp_tool_definitions: turn_context.mcp_tool_definitions,
1026        ..input
1027    };
1028
1029    if let Some(message_override) = user_prompt_message_override {
1030        let mut assembled = assemble_turn_context(
1031            adapter.harness_store(org_id).as_ref(),
1032            adapter.agent_store(org_id).as_ref(),
1033            adapter.session_store(org_id).as_ref(),
1034            adapter.message_store().as_ref(),
1035            adapter.provider_store(org_id).as_ref(),
1036            &adapter.capability_registry(),
1037            input.context.session_id,
1038            input.harness_id,
1039            input.agent_id,
1040            &input.mcp_tool_definitions,
1041            Some(adapter.file_store()),
1042        )
1043        .await?;
1044
1045        let message = assembled
1046            .messages
1047            .iter_mut()
1048            .find(|message| message.id == input.context.input_message_id)
1049            .ok_or_else(|| {
1050                everruns_core::error::AgentLoopError::config(
1051                    "user_prompt_submit mutation: input message not found in assembled context",
1052                )
1053            })?;
1054
1055        // user_prompt_submit mutations are enforcement controls for the
1056        // provider-bound prompt. Apply them to the assembled context only
1057        // so persisted user history remains an audit record of the input.
1058        // Preserve non-text parts (images, files); replace only text parts.
1059        message
1060            .content
1061            .retain(|part| !matches!(part, ContentPart::Text(_)));
1062        message
1063            .content
1064            .insert(0, ContentPart::text(message_override));
1065
1066        return atom.execute_with_assembled_context(input, assembled).await;
1067    }
1068
1069    atom.execute(input).await
1070}
1071
1072pub async fn execute_act_activity<A: RuntimeHostAdapter>(
1073    adapter: &A,
1074    input: ActInput,
1075) -> everruns_core::error::Result<ActResult> {
1076    let org_id = input.org_id.ok_or_else(|| {
1077        everruns_core::error::AgentLoopError::config(
1078            "ActInput.org_id must be set for runtime host execution",
1079        )
1080    })?;
1081
1082    if let Some(blocker) =
1083        detect_dependency_blocker(adapter, org_id, input.harness_id, input.agent_id).await?
1084    {
1085        RuntimeSessionLifecycle::new(adapter.clone(), org_id, input.context.session_id)
1086            .dependency_blocked(
1087                input.context.turn_id,
1088                input.context.input_message_id,
1089                blocker,
1090            )
1091            .await;
1092        return Ok(ActResult {
1093            results: vec![],
1094            completed: true,
1095            success_count: 0,
1096            error_count: 1,
1097            waiting_for_tool_results: false,
1098            blocked: true,
1099            client_tool_calls: vec![],
1100            client_tool_definitions: vec![],
1101        });
1102    }
1103
1104    let execution_capabilities = load_execution_capabilities(
1105        adapter,
1106        org_id,
1107        input.context.session_id,
1108        input.harness_id,
1109        input.agent_id,
1110        input.locale.clone(),
1111        input.blueprint_id.as_deref(),
1112    )
1113    .await?;
1114    let mut tool_registry = execution_capabilities.tool_registry;
1115
1116    // Register the session's MCP tools as first-class registry tools, so they
1117    // execute through the regular `ToolExecutor` path and are visible to
1118    // everything that introspects the registry (spawn_background, tool_search,
1119    // openai_tool_search namespaces, ...). The turn's tool definitions already
1120    // include the discovered MCP tools, so no re-discovery is needed; the host's
1121    // MCP executor supplies execution (specs/runtime-mcp.md D5).
1122    if let Some(mcp) = adapter.mcp_executor(org_id, input.context.session_id).await {
1123        let invoker: Arc<dyn everruns_core::McpToolInvoker> = mcp;
1124        for tool in everruns_core::build_mcp_proxy_tools(&input.tool_definitions, invoker) {
1125            tool_registry.register_boxed(tool);
1126        }
1127    }
1128
1129    let builtin_tool_registry = Arc::new(tool_registry.clone());
1130    let executor: Arc<dyn everruns_core::traits::ToolExecutor> = Arc::new(tool_registry);
1131
1132    let mut atom =
1133        ActAtom::with_file_store(executor, adapter.event_emitter(), adapter.file_store())
1134            .with_session_store(adapter.session_store(org_id))
1135            .with_session_mutator(adapter.session_mutator(org_id))
1136            .with_agent_store(adapter.agent_store(org_id))
1137            .with_tool_registry(builtin_tool_registry)
1138            .with_org_id(
1139                org_public_id_from_internal(org_id)
1140                    .parse()
1141                    .expect("internal org id converts to valid public org id"),
1142            )
1143            .with_capability_registry(adapter.capability_registry())
1144            .with_post_tool_hooks(execution_capabilities.post_tool_hooks)
1145            .with_pre_tool_hooks(execution_capabilities.pre_tool_hooks)
1146            .with_tool_call_hooks(execution_capabilities.tool_call_hooks);
1147
1148    if let Some(storage_store) = adapter.storage_store() {
1149        atom = atom.with_storage_store(storage_store);
1150    }
1151    if let Some(image_store) = adapter.image_artifact_store(org_id) {
1152        atom = atom.with_image_store(image_store);
1153    }
1154    if let Some(provider_credential_store) = adapter.provider_credential_store(org_id) {
1155        atom = atom.with_provider_credential_store(provider_credential_store);
1156    }
1157    if let Some(utility_llm_service) = adapter.utility_llm_service() {
1158        atom = atom.with_utility_llm_service(utility_llm_service);
1159    }
1160    if let Some(egress_service) = adapter.egress_service() {
1161        atom = atom.with_egress_service(egress_service);
1162    }
1163    if let Some(connection_resolver) = adapter.connection_resolver() {
1164        atom = atom.with_connection_resolver(connection_resolver);
1165    }
1166    if let Some(sqldb_store) = adapter.sqldb_store() {
1167        atom = atom.with_sqldb_store(sqldb_store);
1168    }
1169    if let Some(leased_resource_store) = adapter.leased_resource_store() {
1170        atom = atom.with_leased_resource_store(leased_resource_store);
1171    }
1172    if let Some(registry) = adapter.session_resource_registry() {
1173        atom = atom.with_session_resource_registry(registry);
1174    }
1175    if let Some(registry) = adapter.session_task_registry() {
1176        atom = atom.with_session_task_registry(registry);
1177    }
1178    if let Some(schedule_store) = adapter.schedule_store(org_id) {
1179        atom = atom.with_schedule_store(schedule_store);
1180    }
1181    if let Some(platform_store) = adapter.platform_store(org_id, input.context.session_id) {
1182        atom = atom.with_platform_store(platform_store);
1183    }
1184    if let Some(budget_checker) = adapter.budget_checker(org_id, input.agent_id) {
1185        atom = atom.with_budget_checker(budget_checker);
1186    }
1187    if let Some(payment_authority) = adapter.payment_authority(org_id, input.agent_id) {
1188        atom = atom.with_payment_authority(payment_authority);
1189    }
1190    if let Some(limiter) = adapter.outbound_tool_rate_limiter(org_id) {
1191        atom = atom.with_outbound_tool_rate_limiter(limiter);
1192    }
1193    if let Some(store) = adapter.durable_tool_result_store() {
1194        atom = atom.with_durable_tool_result_store(store);
1195    }
1196    if let Some(store) = adapter.subagent_spawn_store() {
1197        atom = atom.with_subagent_spawn_store(store);
1198    }
1199
1200    atom.execute(input).await
1201}