Skip to main content

everruns_runtime/
host.rs

1// Shared host orchestration for embedded and durable execution hosts.
2// Decision: everruns-runtime owns worker-facing turn phase execution so
3// durable/server-backed hosts reuse the same input/reason/act wiring.
4
5use async_trait::async_trait;
6use everruns_core::atoms::{
7    ActAtom, ActInput, ActResult, Atom, InputAtom, InputAtomInput, InputAtomResult, ReasonAtom,
8    ReasonInput, ReasonResult,
9};
10use everruns_core::capabilities::{SystemPromptContext, collect_capabilities_with_configs};
11use everruns_core::events::{
12    EventContext, EventRequest, OutputMessageCompletedData, SessionActivatedData, SessionIdledData,
13    TurnCompletedData, TurnFailedData, TurnStartedData,
14};
15use everruns_core::message::Message;
16use everruns_core::message_retriever::MessageRetriever;
17use everruns_core::platform_store::PlatformStore;
18use everruns_core::session::SessionStatus;
19use everruns_core::traits::{
20    AgentStore, BudgetChecker, EventEmitter, HarnessStore, ImageArtifactStore, ImageResolver,
21    LeasedResourceStore, LlmProviderStore, ModelWithProvider, PaymentAuthority,
22    ProviderCredentialStore, SessionFileSystem, SessionMutator, SessionResourceRegistry,
23    SessionScheduleStore, SessionSqlDbStoreRef, SessionStorageStore, SessionStore,
24    UserConnectionResolver,
25};
26use everruns_core::typed_id::{AgentId, HarnessId, MessageId, SessionId, TurnId};
27use everruns_core::{
28    Agent, CapabilityRegistry, CapabilityStatus, DependencyBlocker, DriverRegistry, EgressService,
29    Harness, Session, TokenUsage, ToolDefinition, ToolRegistry, UserFacingError, UtilityLlmService,
30    org_public_id_from_internal, resolve_runtime_capabilities,
31};
32use std::sync::Arc;
33use tracing::warn;
34
35/// Turn context loaded in one batched call for runtime host execution.
36#[derive(Debug, Clone)]
37pub struct RuntimeHostTurnContext {
38    pub agent: Option<Agent>,
39    pub session: Session,
40    pub messages: Vec<Message>,
41    pub model: Option<ModelWithProvider>,
42    pub mcp_tool_definitions: Vec<ToolDefinition>,
43}
44
45/// Public adapter contract for server-backed or durable runtime hosts.
46///
47/// `everruns-runtime` owns shared host orchestration for both embedded and
48/// durable execution. That includes phase execution (`input -> reason -> act`),
49/// lifecycle emission, and the generic turn-strategy decisions used by durable
50/// or custom hosts.
51///
52/// Host crates implement this trait to provide persistence, session-lifecycle
53/// plumbing, event delivery, and their own orchestration backend. The durable
54/// engine itself remains outside this crate.
55#[async_trait]
56pub trait RuntimeHostAdapter: Send + Sync + Clone + 'static {
57    async fn get_agent(
58        &self,
59        org_id: i64,
60        agent_id: AgentId,
61    ) -> everruns_core::error::Result<Option<Agent>>;
62
63    async fn get_harness(
64        &self,
65        org_id: i64,
66        harness_id: HarnessId,
67    ) -> everruns_core::error::Result<Option<Harness>>;
68
69    async fn set_session_status(
70        &self,
71        org_id: i64,
72        session_id: SessionId,
73        status: SessionStatus,
74    ) -> everruns_core::error::Result<Session>;
75
76    async fn load_turn_context(
77        &self,
78        org_id: i64,
79        session_id: SessionId,
80    ) -> everruns_core::error::Result<RuntimeHostTurnContext>;
81
82    fn capability_registry(&self) -> CapabilityRegistry;
83
84    fn driver_registry(&self) -> DriverRegistry;
85
86    fn harness_store(&self, org_id: i64) -> Arc<dyn HarnessStore>;
87
88    fn agent_store(&self, org_id: i64) -> Arc<dyn AgentStore>;
89
90    fn session_store(&self, org_id: i64) -> Arc<dyn SessionStore>;
91
92    fn session_mutator(&self, org_id: i64) -> Arc<dyn SessionMutator>;
93
94    fn provider_store(&self, org_id: i64) -> Arc<dyn LlmProviderStore>;
95
96    fn message_store(&self) -> Arc<dyn MessageRetriever>;
97
98    fn event_emitter(&self) -> Arc<dyn EventEmitter>;
99
100    fn file_store(&self) -> Arc<dyn SessionFileSystem>;
101
102    fn image_resolver(&self, _org_id: i64) -> Option<Arc<dyn ImageResolver>> {
103        None
104    }
105
106    fn image_artifact_store(&self, _org_id: i64) -> Option<Arc<dyn ImageArtifactStore>> {
107        None
108    }
109
110    fn provider_credential_store(&self, _org_id: i64) -> Option<Arc<dyn ProviderCredentialStore>> {
111        None
112    }
113
114    fn utility_llm_service(&self) -> Option<Arc<dyn UtilityLlmService>> {
115        None
116    }
117
118    fn egress_service(&self) -> Option<Arc<dyn EgressService>> {
119        None
120    }
121
122    fn storage_store(&self) -> Option<Arc<dyn SessionStorageStore>> {
123        None
124    }
125
126    fn memory_store(&self, _org_id: i64) -> Option<Arc<dyn everruns_core::MemoryStoreBackend>> {
127        None
128    }
129
130    fn connection_resolver(&self) -> Option<Arc<dyn UserConnectionResolver>> {
131        None
132    }
133
134    fn sqldb_store(&self) -> Option<SessionSqlDbStoreRef> {
135        None
136    }
137
138    fn leased_resource_store(&self) -> Option<Arc<dyn LeasedResourceStore>> {
139        None
140    }
141
142    fn session_resource_registry(&self) -> Option<Arc<dyn SessionResourceRegistry>> {
143        None
144    }
145
146    fn schedule_store(&self, _org_id: i64) -> Option<Arc<dyn SessionScheduleStore>> {
147        None
148    }
149
150    fn platform_store(
151        &self,
152        _org_id: i64,
153        _session_id: SessionId,
154    ) -> Option<Arc<dyn PlatformStore>> {
155        None
156    }
157
158    fn budget_checker(
159        &self,
160        _org_id: i64,
161        _agent_id: Option<AgentId>,
162    ) -> Option<Arc<dyn BudgetChecker>> {
163        None
164    }
165
166    fn payment_authority(
167        &self,
168        _org_id: i64,
169        _agent_id: Option<AgentId>,
170    ) -> Option<Arc<dyn PaymentAuthority>> {
171        None
172    }
173
174    /// Per-org outbound tool-call rate limiter (TM-TOOL-009).
175    /// Default: `None` (no rate limiting — suitable for in-process / test environments).
176    fn outbound_tool_rate_limiter(
177        &self,
178        _org_id: i64,
179    ) -> Option<Arc<dyn everruns_core::OutboundToolRateLimiter>> {
180        None
181    }
182
183    /// MCP executor routing `mcp_*` tool calls for this session, if the host
184    /// configures MCP (specs/runtime-mcp.md D4). Default: `None`, so hosts
185    /// without scoped MCP servers keep the plain tool registry unchanged.
186    async fn mcp_executor(
187        &self,
188        _org_id: i64,
189        _session_id: SessionId,
190    ) -> Option<Arc<everruns_mcp::McpExecutor>> {
191        None
192    }
193}
194
195struct RuntimeExecutionCapabilities {
196    tool_registry: ToolRegistry,
197    post_tool_hooks: Vec<Arc<dyn everruns_core::PostToolExecHook>>,
198    pre_tool_hooks: Vec<Arc<dyn everruns_core::atoms::PreToolUseHook>>,
199    tool_call_hooks: Vec<Arc<dyn everruns_core::ToolCallHook>>,
200}
201
202/// Collect and finalize user-hook specs for a session from its resolved
203/// capability configs, plus the shared bash dispatcher used to run them.
204///
205/// This is the single place hook specs are gathered so every firing point —
206/// the act path (`load_execution_capabilities`) and the lifecycle firing
207/// points (`execute_reason_activity` for `user_prompt_submit`, turn completion
208/// for `turn_end`, and the server session paths) — applies identical
209/// `finalize_hook_specs` semantics: `{capability_id}:` namespace stamping,
210/// stable default ids, and `disabled_contributions` muting (TM-HOOK-004).
211fn finalize_specs_from_configs(
212    resolved_capability_configs: &[everruns_core::capability_types::AgentCapabilityConfig],
213    capability_registry: &CapabilityRegistry,
214) -> Vec<everruns_core::user_hook_types::UserHookSpec> {
215    let mut hook_contributions: Vec<(String, Vec<everruns_core::user_hook_types::UserHookSpec>)> =
216        Vec::new();
217    let mut disabled_contributions: Vec<String> = Vec::new();
218    for config in resolved_capability_configs {
219        let Some(capability) = capability_registry.get(config.capability_id()) else {
220            continue;
221        };
222        let specs = capability.user_hooks_with_config(&config.config);
223        if !specs.is_empty() {
224            hook_contributions.push((config.capability_id().to_string(), specs));
225        }
226        if config.capability_id() == "user_hooks" {
227            disabled_contributions.extend(
228                everruns_core::capabilities::user_hooks::disabled_contributions(&config.config),
229            );
230        }
231    }
232    everruns_core::hook_adapter::finalize_hook_specs(hook_contributions, &disabled_contributions)
233}
234
235/// Resolve a session's capability configs and collect finalized hook specs.
236/// Used by the lifecycle firing points, which need specs outside the act path.
237/// Returns `(specs, dispatcher)`; `specs` is empty when the session has no
238/// hook-contributing capabilities.
239async fn collect_lifecycle_hook_specs<A: RuntimeHostAdapter>(
240    adapter: &A,
241    org_id: i64,
242    session_id: SessionId,
243    harness_id: HarnessId,
244    agent_id: Option<AgentId>,
245) -> everruns_core::error::Result<(
246    Vec<everruns_core::user_hook_types::UserHookSpec>,
247    Arc<dyn everruns_core::hook_executor::BashHookDispatcher>,
248)> {
249    let capability_registry = adapter.capability_registry();
250    let harness_chain = adapter
251        .harness_store(org_id)
252        .get_harness_chain(harness_id)
253        .await?;
254    if harness_chain.is_empty() {
255        return Err(everruns_core::error::AgentLoopError::harness_not_found(
256            harness_id,
257        ));
258    }
259    let session = adapter
260        .session_store(org_id)
261        .get_session(session_id)
262        .await?
263        .ok_or_else(|| everruns_core::error::AgentLoopError::session_not_found(session_id))?;
264    let agent = match agent_id {
265        Some(agent_id) => adapter.agent_store(org_id).get_agent(agent_id).await?,
266        None => None,
267    };
268    let resolved = resolve_runtime_capabilities(
269        &harness_chain,
270        agent.as_ref(),
271        &session,
272        &capability_registry,
273    );
274    let specs =
275        finalize_specs_from_configs(&resolved.resolved_capability_configs, &capability_registry);
276    let dispatcher: Arc<dyn everruns_core::hook_executor::BashHookDispatcher> = Arc::new(
277        everruns_core::hook_dispatch::VirtualBashHookDispatcher::new(adapter.file_store()),
278    );
279    Ok((specs, dispatcher))
280}
281
282async fn load_execution_capabilities<A: RuntimeHostAdapter>(
283    adapter: &A,
284    org_id: i64,
285    session_id: SessionId,
286    harness_id: HarnessId,
287    agent_id: Option<AgentId>,
288    locale: Option<String>,
289    blueprint_id: Option<&str>,
290) -> everruns_core::error::Result<RuntimeExecutionCapabilities> {
291    let capability_registry = adapter.capability_registry();
292    if let Some(blueprint_id) = blueprint_id {
293        let mut registry = ToolRegistry::with_defaults();
294        let blueprint = capability_registry.blueprint(blueprint_id).ok_or_else(|| {
295            everruns_core::error::AgentLoopError::config(format!(
296                "Blueprint \"{blueprint_id}\" not found in registry"
297            ))
298        })?;
299        for tool in blueprint.tools {
300            registry.register_boxed(tool);
301        }
302        return Ok(RuntimeExecutionCapabilities {
303            tool_registry: registry,
304            post_tool_hooks: Vec::new(),
305            pre_tool_hooks: Vec::new(),
306            tool_call_hooks: Vec::new(),
307        });
308    }
309
310    let harness_chain = adapter
311        .harness_store(org_id)
312        .get_harness_chain(harness_id)
313        .await?;
314    if harness_chain.is_empty() {
315        return Err(everruns_core::error::AgentLoopError::harness_not_found(
316            harness_id,
317        ));
318    }
319
320    let session = adapter
321        .session_store(org_id)
322        .get_session(session_id)
323        .await?
324        .ok_or_else(|| everruns_core::error::AgentLoopError::session_not_found(session_id))?;
325
326    let agent_store = adapter.agent_store(org_id);
327    let agent = match agent_id {
328        Some(agent_id) => Some(
329            agent_store
330                .get_agent(agent_id)
331                .await?
332                .ok_or_else(|| everruns_core::error::AgentLoopError::agent_not_found(agent_id))?,
333        ),
334        None => None,
335    };
336
337    let resolved = resolve_runtime_capabilities(
338        &harness_chain,
339        agent.as_ref(),
340        &session,
341        &capability_registry,
342    );
343    // Executor (act) path: this builds the worker-side tool registry, not the
344    // model-visible tool list. The model is left unset, so a model-adaptive
345    // capability like `auto_tool_search` resolves to its provider-agnostic
346    // client-side mechanism here. That registers the `tool_search` tool in the
347    // executor, which is a harmless superset: on native models the reason path
348    // never shows that tool to the model, so it is simply never called.
349    let prompt_ctx = SystemPromptContext {
350        session_id,
351        locale: locale.or(session.locale.clone()),
352        file_store: Some(adapter.file_store()),
353        model: None,
354    };
355    let collected = collect_capabilities_with_configs(
356        &resolved.resolved_capability_configs,
357        &capability_registry,
358        &prompt_ctx,
359    )
360    .await;
361
362    let mut registry = ToolRegistry::with_defaults();
363    for tool in collected.tools {
364        registry.register_boxed(tool);
365    }
366
367    // Only `Available` capabilities contribute hooks, matching
368    // `collect_capabilities_with_configs` (which skips non-available
369    // capabilities). This keeps a `ComingSoon`/unavailable capability from
370    // affecting execution via any of its hook seams.
371    let mut post_tool_hooks: Vec<Arc<dyn everruns_core::PostToolExecHook>> = resolved
372        .resolved_capability_configs
373        .iter()
374        .flat_map(|config| {
375            capability_registry
376                .get(config.capability_id())
377                .filter(|capability| capability.status() == CapabilityStatus::Available)
378                .map(|capability| capability.post_tool_exec_hooks())
379                .unwrap_or_default()
380        })
381        .collect();
382
383    // User-hook contributions (see `specs/user-hooks.md`). `finalize_specs_from_configs`
384    // gathers specs across every resolved capability — both the user-facing
385    // `user_hooks` capability and any capability that bundles hooks — and applies
386    // `finalize_hook_specs` (namespace stamping, stable ids, `disabled_contributions`
387    // muting; TM-HOOK-004). The same helper backs the lifecycle firing points so
388    // every event finalizes specs identically.
389    let user_hook_specs =
390        finalize_specs_from_configs(&resolved.resolved_capability_configs, &capability_registry);
391    // Capability-contributed pre-tool hooks run first (e.g. approval gating),
392    // then user-hook (`PreToolUse`) specs. The first hook to block wins.
393    let mut pre_tool_hooks: Vec<Arc<dyn everruns_core::atoms::PreToolUseHook>> = resolved
394        .resolved_capability_configs
395        .iter()
396        .flat_map(|config| {
397            capability_registry
398                .get(config.capability_id())
399                .filter(|capability| capability.status() == CapabilityStatus::Available)
400                .map(|capability| capability.pre_tool_use_hooks())
401                .unwrap_or_default()
402        })
403        .collect();
404    if !user_hook_specs.is_empty() {
405        let dispatcher: Arc<dyn everruns_core::hook_executor::BashHookDispatcher> = Arc::new(
406            everruns_core::hook_dispatch::VirtualBashHookDispatcher::new(adapter.file_store()),
407        );
408        post_tool_hooks.extend(everruns_core::hook_adapter::build_post_tool_use_hooks(
409            &user_hook_specs,
410            dispatcher.clone(),
411        ));
412        pre_tool_hooks.extend(everruns_core::hook_adapter::build_pre_tool_use_hooks(
413            &user_hook_specs,
414            dispatcher,
415        ));
416    }
417
418    let tool_call_hooks = resolved
419        .resolved_capability_configs
420        .iter()
421        .flat_map(|config| {
422            capability_registry
423                .get(config.capability_id())
424                .filter(|capability| capability.status() == CapabilityStatus::Available)
425                .map(|capability| capability.tool_call_hooks())
426                .unwrap_or_default()
427        })
428        .collect();
429
430    Ok(RuntimeExecutionCapabilities {
431        tool_registry: registry,
432        post_tool_hooks,
433        pre_tool_hooks,
434        tool_call_hooks,
435    })
436}
437
438/// Shared lifecycle helper for runtime-backed hosts.
439pub struct RuntimeSessionLifecycle<A: RuntimeHostAdapter> {
440    adapter: A,
441    org_id: i64,
442    session_id: SessionId,
443}
444
445impl<A: RuntimeHostAdapter> RuntimeSessionLifecycle<A> {
446    pub fn new(adapter: A, org_id: i64, session_id: SessionId) -> Self {
447        Self {
448            adapter,
449            org_id,
450            session_id,
451        }
452    }
453
454    async fn set_session_status(&self, status: SessionStatus, action: &'static str) {
455        if let Err(error) = self
456            .adapter
457            .set_session_status(self.org_id, self.session_id, status)
458            .await
459        {
460            warn!(
461                session_id = %self.session_id,
462                org_id = self.org_id,
463                action,
464                %error,
465                "runtime host lifecycle status update failed"
466            );
467        }
468    }
469
470    async fn emit_event(&self, request: EventRequest) {
471        let event_type = request.event_type.clone();
472        if let Err(error) = self.adapter.event_emitter().emit(request).await {
473            warn!(
474                session_id = %self.session_id,
475                org_id = self.org_id,
476                event_type,
477                %error,
478                "runtime host lifecycle event emission failed"
479            );
480        }
481    }
482
483    pub async fn turn_started(&self, turn_id: TurnId, input_message_id: MessageId) {
484        let input_content = self
485            .adapter
486            .message_store()
487            .get(self.session_id, input_message_id)
488            .await
489            .ok()
490            .flatten()
491            .map(|message| message.content_to_llm_string());
492
493        self.set_session_status(SessionStatus::Active, "turn_started")
494            .await;
495
496        self.emit_event(EventRequest::new(
497            self.session_id,
498            EventContext::turn(turn_id, input_message_id),
499            SessionActivatedData {
500                turn_id,
501                input_message_id,
502            },
503        ))
504        .await;
505
506        self.emit_event(EventRequest::new(
507            self.session_id,
508            EventContext::turn(turn_id, input_message_id),
509            TurnStartedData {
510                turn_id,
511                input_message_id,
512                input_content,
513            },
514        ))
515        .await;
516    }
517
518    pub async fn emit_turn_completed(&self, input_message_id: MessageId, data: TurnCompletedData) {
519        let turn_id = data.turn_id;
520        self.emit_event(EventRequest::new(
521            self.session_id,
522            EventContext::turn(turn_id, input_message_id),
523            data,
524        ))
525        .await;
526    }
527
528    pub async fn emit_session_idled(
529        &self,
530        turn_id: TurnId,
531        input_message_id: MessageId,
532        iterations: Option<u32>,
533        usage: Option<TokenUsage>,
534    ) {
535        self.set_session_status(SessionStatus::Idle, "emit_session_idled")
536            .await;
537
538        self.emit_event(EventRequest::new(
539            self.session_id,
540            EventContext::turn(turn_id, input_message_id),
541            SessionIdledData {
542                turn_id,
543                iterations,
544                usage,
545            },
546        ))
547        .await;
548    }
549
550    pub async fn turn_completed(
551        &self,
552        turn_id: TurnId,
553        input_message_id: MessageId,
554        iterations: u32,
555        usage: Option<TokenUsage>,
556        input_content: Option<String>,
557    ) {
558        self.emit_turn_completed(
559            input_message_id,
560            TurnCompletedData {
561                turn_id,
562                iterations,
563                duration_ms: None,
564                usage: usage.clone(),
565                input_content,
566                final_message_id: None,
567                final_answer_preview: None,
568                time_to_first_token_ms: None,
569                tool_call_count: None,
570                llm_call_count: None,
571                status: Some("completed".to_string()),
572            },
573        )
574        .await;
575        self.emit_session_idled(turn_id, input_message_id, Some(iterations), usage)
576            .await;
577    }
578
579    /// Fire `turn_end` lifecycle hooks (advisory). Collects the session's hook
580    /// specs and runs every `turn_end` hook; failures are logged, never fatal.
581    /// `harness_id`/`agent_id` are required to resolve the capability chain.
582    pub async fn fire_turn_end_hooks(
583        &self,
584        harness_id: HarnessId,
585        agent_id: Option<AgentId>,
586        turn_id: TurnId,
587        success: bool,
588    ) {
589        let (specs, dispatcher) = match collect_lifecycle_hook_specs(
590            &self.adapter,
591            self.org_id,
592            self.session_id,
593            harness_id,
594            agent_id,
595        )
596        .await
597        {
598            Ok(pair) => pair,
599            Err(error) => {
600                warn!(
601                    session_id = %self.session_id,
602                    %error,
603                    "failed to collect turn_end hook specs; skipping"
604                );
605                return;
606            }
607        };
608        let hooks = everruns_core::lifecycle_hooks::build_turn_lifecycle_hooks(
609            &specs,
610            everruns_core::user_hook_types::HookEvent::TurnEnd,
611            dispatcher,
612        );
613        if hooks.is_empty() {
614            return;
615        }
616        let ctx = everruns_core::lifecycle_hooks::TurnHookContext {
617            session_id: self.session_id,
618            turn_id: Some(turn_id),
619            org_id: org_public_id_from_internal(self.org_id).parse().ok(),
620            agent_id: agent_id.map(|a| a.to_string()),
621        };
622        everruns_core::lifecycle_hooks::run_turn_end_hooks(
623            &hooks,
624            &ctx,
625            serde_json::json!({ "success": success }),
626        )
627        .await;
628    }
629
630    /// Abort a turn because a `user_prompt_submit` hook returned `Block`.
631    /// Reuses the dependency-blocked failure shape: emit a user-facing message
632    /// carrying the hook's `user_message` (or `reason`), then mark the turn
633    /// failed and idle the session.
634    pub async fn user_prompt_blocked(
635        &self,
636        turn_id: TurnId,
637        input_message_id: MessageId,
638        reason: &str,
639        user_message: Option<&str>,
640    ) {
641        let user_error =
642            UserFacingError::new(everruns_core::user_facing_error_codes::BLOCKED_BY_HOOK);
643        let shown = user_message.unwrap_or(reason);
644        let mut error_message = Message::assistant(shown);
645        let mut metadata = std::collections::HashMap::new();
646        user_error.apply_to_message_metadata(&mut metadata);
647        error_message.metadata = Some(metadata);
648
649        self.emit_event(EventRequest::new(
650            self.session_id,
651            EventContext::turn(turn_id, input_message_id),
652            OutputMessageCompletedData::new(error_message).with_user_facing_error(&user_error),
653        ))
654        .await;
655
656        self.turn_failed(turn_id, input_message_id, reason, Some(&user_error))
657            .await;
658    }
659
660    pub async fn turn_failed(
661        &self,
662        turn_id: TurnId,
663        input_message_id: MessageId,
664        error: &str,
665        user_error: Option<&UserFacingError>,
666    ) {
667        self.set_session_status(SessionStatus::Idle, "turn_failed")
668            .await;
669
670        self.emit_event(EventRequest::new(
671            self.session_id,
672            EventContext::turn(turn_id, input_message_id),
673            {
674                let mut data = TurnFailedData {
675                    turn_id,
676                    error: error.to_string(),
677                    error_code: None,
678                    error_fields: None,
679                };
680                if let Some(user_error) = user_error {
681                    user_error.apply_to_event_fields(&mut data.error_code, &mut data.error_fields);
682                }
683                data
684            },
685        ))
686        .await;
687
688        self.emit_event(EventRequest::new(
689            self.session_id,
690            EventContext::turn(turn_id, input_message_id),
691            SessionIdledData {
692                turn_id,
693                iterations: None,
694                usage: None,
695            },
696        ))
697        .await;
698    }
699
700    pub async fn waiting_for_tool_results(&self) {
701        self.set_session_status(
702            SessionStatus::WaitingForToolResults,
703            "waiting_for_tool_results",
704        )
705        .await;
706    }
707
708    pub async fn dependency_blocked(
709        &self,
710        turn_id: TurnId,
711        input_message_id: MessageId,
712        blocker: DependencyBlocker,
713    ) {
714        let user_error = UserFacingError::new(blocker.error_code())
715            .with_field(
716                "dependency",
717                match blocker {
718                    DependencyBlocker::HarnessArchived | DependencyBlocker::HarnessDeleted => {
719                        "harness"
720                    }
721                    DependencyBlocker::AgentArchived | DependencyBlocker::AgentDeleted => "agent",
722                },
723            )
724            .with_field(
725                "state",
726                match blocker {
727                    DependencyBlocker::HarnessArchived | DependencyBlocker::AgentArchived => {
728                        "archived"
729                    }
730                    DependencyBlocker::HarnessDeleted | DependencyBlocker::AgentDeleted => {
731                        "deleted"
732                    }
733                },
734            );
735        let mut error_message = Message::assistant(blocker.message());
736        let mut metadata = std::collections::HashMap::new();
737        user_error.apply_to_message_metadata(&mut metadata);
738        error_message.metadata = Some(metadata);
739
740        self.emit_event(EventRequest::new(
741            self.session_id,
742            EventContext::turn(turn_id, input_message_id),
743            OutputMessageCompletedData::new(error_message).with_user_facing_error(&user_error),
744        ))
745        .await;
746
747        self.turn_failed(
748            turn_id,
749            input_message_id,
750            blocker.message(),
751            Some(&user_error),
752        )
753        .await;
754    }
755}
756
757pub async fn detect_dependency_blocker<A: RuntimeHostAdapter>(
758    adapter: &A,
759    org_id: i64,
760    harness_id: HarnessId,
761    agent_id: Option<AgentId>,
762) -> everruns_core::error::Result<Option<DependencyBlocker>> {
763    let harness_store = adapter.harness_store(org_id);
764    let agent_store = adapter.agent_store(org_id);
765    everruns_core::detect_dependency_blocker(
766        harness_store.as_ref(),
767        agent_store.as_ref(),
768        harness_id,
769        agent_id,
770    )
771    .await
772}
773
774pub async fn execute_input_activity<A: RuntimeHostAdapter>(
775    adapter: &A,
776    org_id: i64,
777    input: InputAtomInput,
778) -> everruns_core::error::Result<InputAtomResult> {
779    RuntimeSessionLifecycle::new(adapter.clone(), org_id, input.context.session_id)
780        .turn_started(input.context.turn_id, input.context.input_message_id)
781        .await;
782
783    let atom = InputAtom::new(adapter.message_store());
784    atom.execute(input).await
785}
786
787/// Collect `user_prompt_submit` hooks for this turn and run them against the
788/// inbound user message text. Returns `None` when the session has no such
789/// hooks (the common case — no overhead beyond the spec collection, which is
790/// skipped early). Errors loading specs are logged and treated as "no hooks"
791/// so a hook-collection failure never blocks a turn that wasn't asking to be
792/// hooked.
793async fn run_user_prompt_submit_for_turn<A: RuntimeHostAdapter>(
794    adapter: &A,
795    org_id: i64,
796    input: &ReasonInput,
797) -> everruns_core::error::Result<Option<everruns_core::lifecycle_hooks::UserPromptDecision>> {
798    let (specs, dispatcher) = match collect_lifecycle_hook_specs(
799        adapter,
800        org_id,
801        input.context.session_id,
802        input.harness_id,
803        input.agent_id,
804    )
805    .await
806    {
807        Ok(pair) => pair,
808        Err(error) => {
809            warn!(
810                session_id = %input.context.session_id,
811                %error,
812                "failed to collect user_prompt_submit hook specs; continuing without them"
813            );
814            return Ok(None);
815        }
816    };
817    let hooks = everruns_core::lifecycle_hooks::build_turn_lifecycle_hooks(
818        &specs,
819        everruns_core::user_hook_types::HookEvent::UserPromptSubmit,
820        dispatcher,
821    );
822    if hooks.is_empty() {
823        return Ok(None);
824    }
825
826    let message_text = adapter
827        .message_store()
828        .get(input.context.session_id, input.context.input_message_id)
829        .await
830        .ok()
831        .flatten()
832        .map(|m| m.content_to_llm_string())
833        .unwrap_or_default();
834
835    let ctx = everruns_core::lifecycle_hooks::TurnHookContext {
836        session_id: input.context.session_id,
837        turn_id: Some(input.context.turn_id),
838        org_id: org_public_id_from_internal(org_id).parse().ok(),
839        agent_id: input.agent_id.map(|a| a.to_string()),
840    };
841    Ok(Some(
842        everruns_core::lifecycle_hooks::run_user_prompt_submit_hooks(&hooks, &ctx, message_text)
843            .await,
844    ))
845}
846
847pub async fn execute_reason_activity<A: RuntimeHostAdapter>(
848    adapter: &A,
849    org_id: i64,
850    input: ReasonInput,
851) -> everruns_core::error::Result<ReasonResult> {
852    if let Some(blocker) =
853        detect_dependency_blocker(adapter, org_id, input.harness_id, input.agent_id).await?
854    {
855        RuntimeSessionLifecycle::new(adapter.clone(), org_id, input.context.session_id)
856            .dependency_blocked(
857                input.context.turn_id,
858                input.context.input_message_id,
859                blocker,
860            )
861            .await;
862        return Ok(ReasonResult {
863            success: false,
864            text: blocker.message().to_string(),
865            tool_calls: vec![],
866            has_tool_calls: false,
867            tool_definitions: vec![],
868            max_iterations: everruns_core::runtime_agent::default_max_iterations(),
869            error: Some("dependency_unavailable".to_string()),
870            usage: None,
871            output_message_id: None,
872            time_to_first_token_ms: None,
873            response_id: None,
874            locale: None,
875            network_access: None,
876        });
877    }
878
879    // user_prompt_submit hook (see `specs/user-hooks.md`). Fires once per turn,
880    // on the first reason iteration, before the LLM is consulted — the closest
881    // choke point to "inbound user message accepted, before reason" that both
882    // the in-process loop and the durable worker share. A `Block` aborts the
883    // turn by reusing the same failure path as `dependency_blocked`: emit a
884    // user-facing message + turn.failed, idle the session, and return a
885    // non-success `ReasonResult` so no LLM/act work runs.
886    if input.iteration <= 1
887        && let Some(everruns_core::lifecycle_hooks::UserPromptDecision::Block {
888            reason,
889            user_message,
890        }) = run_user_prompt_submit_for_turn(adapter, org_id, &input).await?
891    {
892        RuntimeSessionLifecycle::new(adapter.clone(), org_id, input.context.session_id)
893            .user_prompt_blocked(
894                input.context.turn_id,
895                input.context.input_message_id,
896                &reason,
897                user_message.as_deref(),
898            )
899            .await;
900        return Ok(ReasonResult {
901            success: false,
902            text: user_message.unwrap_or_else(|| reason.clone()),
903            tool_calls: vec![],
904            has_tool_calls: false,
905            tool_definitions: vec![],
906            max_iterations: everruns_core::runtime_agent::default_max_iterations(),
907            error: Some("blocked_by_user_prompt_hook".to_string()),
908            usage: None,
909            output_message_id: None,
910            time_to_first_token_ms: None,
911            response_id: None,
912            locale: None,
913            network_access: None,
914        });
915    }
916
917    let turn_context = adapter
918        .load_turn_context(org_id, input.context.session_id)
919        .await?;
920
921    let mut atom = ReasonAtom::new(
922        adapter.harness_store(org_id),
923        adapter.agent_store(org_id),
924        adapter.session_store(org_id),
925        adapter.message_store(),
926        adapter.provider_store(org_id),
927        adapter.capability_registry(),
928        adapter.driver_registry(),
929        adapter.event_emitter(),
930    )
931    .with_file_store(adapter.file_store());
932    if let Some(image_resolver) = adapter.image_resolver(org_id) {
933        atom = atom.with_image_resolver(image_resolver);
934    }
935
936    atom.execute(ReasonInput {
937        mcp_tool_definitions: turn_context.mcp_tool_definitions,
938        ..input
939    })
940    .await
941}
942
943pub async fn execute_act_activity<A: RuntimeHostAdapter>(
944    adapter: &A,
945    input: ActInput,
946) -> everruns_core::error::Result<ActResult> {
947    let org_id = input.org_id.ok_or_else(|| {
948        everruns_core::error::AgentLoopError::config(
949            "ActInput.org_id must be set for runtime host execution",
950        )
951    })?;
952
953    if let Some(blocker) =
954        detect_dependency_blocker(adapter, org_id, input.harness_id, input.agent_id).await?
955    {
956        RuntimeSessionLifecycle::new(adapter.clone(), org_id, input.context.session_id)
957            .dependency_blocked(
958                input.context.turn_id,
959                input.context.input_message_id,
960                blocker,
961            )
962            .await;
963        return Ok(ActResult {
964            results: vec![],
965            completed: true,
966            success_count: 0,
967            error_count: 1,
968            waiting_for_tool_results: false,
969            blocked: true,
970            client_tool_calls: vec![],
971            client_tool_definitions: vec![],
972        });
973    }
974
975    let execution_capabilities = load_execution_capabilities(
976        adapter,
977        org_id,
978        input.context.session_id,
979        input.harness_id,
980        input.agent_id,
981        input.locale.clone(),
982        input.blueprint_id.as_deref(),
983    )
984    .await?;
985    let mut tool_registry = execution_capabilities.tool_registry;
986
987    // Register the session's MCP tools as first-class registry tools, so they
988    // execute through the regular `ToolExecutor` path and are visible to
989    // everything that introspects the registry (spawn_background, tool_search,
990    // openai_tool_search namespaces, ...). The turn's tool definitions already
991    // include the discovered MCP tools, so no re-discovery is needed; the host's
992    // MCP executor supplies execution (specs/runtime-mcp.md D5).
993    if let Some(mcp) = adapter.mcp_executor(org_id, input.context.session_id).await {
994        let invoker: Arc<dyn everruns_core::McpToolInvoker> = mcp;
995        for tool in everruns_core::build_mcp_proxy_tools(&input.tool_definitions, invoker) {
996            tool_registry.register_boxed(tool);
997        }
998    }
999
1000    let builtin_tool_registry = Arc::new(tool_registry.clone());
1001    let executor: Arc<dyn everruns_core::traits::ToolExecutor> = Arc::new(tool_registry);
1002
1003    let mut atom =
1004        ActAtom::with_file_store(executor, adapter.event_emitter(), adapter.file_store())
1005            .with_session_store(adapter.session_store(org_id))
1006            .with_session_mutator(adapter.session_mutator(org_id))
1007            .with_agent_store(adapter.agent_store(org_id))
1008            .with_tool_registry(builtin_tool_registry)
1009            .with_org_id(
1010                org_public_id_from_internal(org_id)
1011                    .parse()
1012                    .expect("internal org id converts to valid public org id"),
1013            )
1014            .with_capability_registry(adapter.capability_registry())
1015            .with_post_tool_hooks(execution_capabilities.post_tool_hooks)
1016            .with_pre_tool_hooks(execution_capabilities.pre_tool_hooks)
1017            .with_tool_call_hooks(execution_capabilities.tool_call_hooks);
1018
1019    if let Some(storage_store) = adapter.storage_store() {
1020        atom = atom.with_storage_store(storage_store);
1021    }
1022    if let Some(image_store) = adapter.image_artifact_store(org_id) {
1023        atom = atom.with_image_store(image_store);
1024    }
1025    if let Some(provider_credential_store) = adapter.provider_credential_store(org_id) {
1026        atom = atom.with_provider_credential_store(provider_credential_store);
1027    }
1028    if let Some(utility_llm_service) = adapter.utility_llm_service() {
1029        atom = atom.with_utility_llm_service(utility_llm_service);
1030    }
1031    if let Some(egress_service) = adapter.egress_service() {
1032        atom = atom.with_egress_service(egress_service);
1033    }
1034    if let Some(memory_store) = adapter.memory_store(org_id) {
1035        atom = atom.with_memory_store(memory_store);
1036    }
1037    if let Some(connection_resolver) = adapter.connection_resolver() {
1038        atom = atom.with_connection_resolver(connection_resolver);
1039    }
1040    if let Some(sqldb_store) = adapter.sqldb_store() {
1041        atom = atom.with_sqldb_store(sqldb_store);
1042    }
1043    if let Some(leased_resource_store) = adapter.leased_resource_store() {
1044        atom = atom.with_leased_resource_store(leased_resource_store);
1045    }
1046    if let Some(registry) = adapter.session_resource_registry() {
1047        atom = atom.with_session_resource_registry(registry);
1048    }
1049    if let Some(schedule_store) = adapter.schedule_store(org_id) {
1050        atom = atom.with_schedule_store(schedule_store);
1051    }
1052    if let Some(platform_store) = adapter.platform_store(org_id, input.context.session_id) {
1053        atom = atom.with_platform_store(platform_store);
1054    }
1055    if let Some(budget_checker) = adapter.budget_checker(org_id, input.agent_id) {
1056        atom = atom.with_budget_checker(budget_checker);
1057    }
1058    if let Some(payment_authority) = adapter.payment_authority(org_id, input.agent_id) {
1059        atom = atom.with_payment_authority(payment_authority);
1060    }
1061    if let Some(limiter) = adapter.outbound_tool_rate_limiter(org_id) {
1062        atom = atom.with_outbound_tool_rate_limiter(limiter);
1063    }
1064
1065    atom.execute(input).await
1066}