Skip to main content

everruns_runtime/
host.rs

1// Shared host orchestration for embedded and durable execution hosts.
2// Decision: everruns-runtime owns worker-facing turn phase execution so
3// durable/server-backed hosts reuse the same input/reason/act wiring.
4
5use async_trait::async_trait;
6use everruns_core::atoms::{
7    ActAtom, ActInput, ActResult, Atom, InputAtom, InputAtomInput, InputAtomResult, ReasonAtom,
8    ReasonInput, ReasonResult,
9};
10use everruns_core::capabilities::{SystemPromptContext, collect_capabilities_with_configs};
11use everruns_core::events::{
12    EventContext, EventRequest, OutputMessageCompletedData, SessionActivatedData, SessionIdledData,
13    TurnCompletedData, TurnFailedData, TurnStartedData,
14};
15use everruns_core::message::{ContentPart, Message};
16use everruns_core::message_retriever::MessageRetriever;
17use everruns_core::platform_store::PlatformStore;
18use everruns_core::session::SessionStatus;
19use everruns_core::traits::{
20    AgentStore, BudgetChecker, EventEmitter, HarnessStore, ImageArtifactStore, ImageResolver,
21    LeasedResourceStore, LlmProviderStore, ModelWithProvider, PaymentAuthority,
22    ProviderCredentialStore, SessionFileSystem, SessionMutator, SessionResourceRegistry,
23    SessionScheduleStore, SessionSqlDbStoreRef, SessionStorageStore, SessionStore,
24    UserConnectionResolver,
25};
26use everruns_core::typed_id::{AgentId, HarnessId, MessageId, SessionId, TurnId};
27use everruns_core::{
28    Agent, CapabilityRegistry, CapabilityStatus, DependencyBlocker, DriverRegistry, EgressService,
29    Harness, Session, TokenUsage, ToolDefinition, ToolRegistry, UserFacingError, UtilityLlmService,
30    assemble_turn_context, org_public_id_from_internal, resolve_runtime_capabilities,
31};
32use std::sync::Arc;
33use tracing::warn;
34
35/// Turn context loaded in one batched call for runtime host execution.
36#[derive(Debug, Clone)]
37pub struct RuntimeHostTurnContext {
38    pub agent: Option<Agent>,
39    pub session: Session,
40    pub messages: Vec<Message>,
41    pub model: Option<ModelWithProvider>,
42    pub mcp_tool_definitions: Vec<ToolDefinition>,
43}
44
45/// Public adapter contract for server-backed or durable runtime hosts.
46///
47/// `everruns-runtime` owns shared host orchestration for both embedded and
48/// durable execution. That includes phase execution (`input -> reason -> act`),
49/// lifecycle emission, and the generic turn-strategy decisions used by durable
50/// or custom hosts.
51///
52/// Host crates implement this trait to provide persistence, session-lifecycle
53/// plumbing, event delivery, and their own orchestration backend. The durable
54/// engine itself remains outside this crate.
55#[async_trait]
56pub trait RuntimeHostAdapter: Send + Sync + Clone + 'static {
57    async fn get_agent(
58        &self,
59        org_id: i64,
60        agent_id: AgentId,
61    ) -> everruns_core::error::Result<Option<Agent>>;
62
63    async fn get_harness(
64        &self,
65        org_id: i64,
66        harness_id: HarnessId,
67    ) -> everruns_core::error::Result<Option<Harness>>;
68
69    async fn set_session_status(
70        &self,
71        org_id: i64,
72        session_id: SessionId,
73        status: SessionStatus,
74    ) -> everruns_core::error::Result<Session>;
75
76    async fn load_turn_context(
77        &self,
78        org_id: i64,
79        session_id: SessionId,
80    ) -> everruns_core::error::Result<RuntimeHostTurnContext>;
81
82    fn capability_registry(&self) -> CapabilityRegistry;
83
84    fn driver_registry(&self) -> DriverRegistry;
85
86    fn harness_store(&self, org_id: i64) -> Arc<dyn HarnessStore>;
87
88    fn agent_store(&self, org_id: i64) -> Arc<dyn AgentStore>;
89
90    fn session_store(&self, org_id: i64) -> Arc<dyn SessionStore>;
91
92    fn session_mutator(&self, org_id: i64) -> Arc<dyn SessionMutator>;
93
94    fn provider_store(&self, org_id: i64) -> Arc<dyn LlmProviderStore>;
95
96    fn message_store(&self) -> Arc<dyn MessageRetriever>;
97
98    fn event_emitter(&self) -> Arc<dyn EventEmitter>;
99
100    fn file_store(&self) -> Arc<dyn SessionFileSystem>;
101
102    fn image_resolver(&self, _org_id: i64) -> Option<Arc<dyn ImageResolver>> {
103        None
104    }
105
106    fn image_artifact_store(&self, _org_id: i64) -> Option<Arc<dyn ImageArtifactStore>> {
107        None
108    }
109
110    fn provider_credential_store(&self, _org_id: i64) -> Option<Arc<dyn ProviderCredentialStore>> {
111        None
112    }
113
114    fn utility_llm_service(&self) -> Option<Arc<dyn UtilityLlmService>> {
115        None
116    }
117
118    fn egress_service(&self) -> Option<Arc<dyn EgressService>> {
119        None
120    }
121
122    fn storage_store(&self) -> Option<Arc<dyn SessionStorageStore>> {
123        None
124    }
125
126    fn memory_store(&self, _org_id: i64) -> Option<Arc<dyn everruns_core::MemoryStoreBackend>> {
127        None
128    }
129
130    fn connection_resolver(&self) -> Option<Arc<dyn UserConnectionResolver>> {
131        None
132    }
133
134    fn sqldb_store(&self) -> Option<SessionSqlDbStoreRef> {
135        None
136    }
137
138    fn leased_resource_store(&self) -> Option<Arc<dyn LeasedResourceStore>> {
139        None
140    }
141
142    fn session_resource_registry(&self) -> Option<Arc<dyn SessionResourceRegistry>> {
143        None
144    }
145
146    fn schedule_store(&self, _org_id: i64) -> Option<Arc<dyn SessionScheduleStore>> {
147        None
148    }
149
150    fn platform_store(
151        &self,
152        _org_id: i64,
153        _session_id: SessionId,
154    ) -> Option<Arc<dyn PlatformStore>> {
155        None
156    }
157
158    fn budget_checker(
159        &self,
160        _org_id: i64,
161        _agent_id: Option<AgentId>,
162    ) -> Option<Arc<dyn BudgetChecker>> {
163        None
164    }
165
166    fn payment_authority(
167        &self,
168        _org_id: i64,
169        _agent_id: Option<AgentId>,
170    ) -> Option<Arc<dyn PaymentAuthority>> {
171        None
172    }
173
174    /// Per-org outbound tool-call rate limiter (TM-TOOL-009).
175    /// Default: `None` (no rate limiting — suitable for in-process / test environments).
176    fn outbound_tool_rate_limiter(
177        &self,
178        _org_id: i64,
179    ) -> Option<Arc<dyn everruns_core::OutboundToolRateLimiter>> {
180        None
181    }
182
183    /// Per-turn durable tool result store for act-activity idempotency (EVE-530).
184    /// Default: `None` (no durable claim/settle — every execution runs tools fresh).
185    fn durable_tool_result_store(&self) -> Option<Arc<dyn everruns_core::DurableToolResultStore>> {
186        None
187    }
188
189    /// Stream-liveness heartbeater for the Reason activity (EVE-531).
190    /// Default: `None` (no heartbeats sent — durable workers supply one).
191    fn stream_heartbeater(&self) -> Option<Arc<dyn everruns_core::StreamHeartbeater>> {
192        None
193    }
194
195    /// Provider stall timeout for the Reason activity (EVE-531).
196    /// Default: `None` (use built-in 120s default).
197    fn provider_stall_timeout(&self) -> Option<std::time::Duration> {
198        None
199    }
200
201    /// MCP executor routing `mcp_*` tool calls for this session, if the host
202    /// configures MCP (specs/runtime-mcp.md D4). Default: `None`, so hosts
203    /// without scoped MCP servers keep the plain tool registry unchanged.
204    async fn mcp_executor(
205        &self,
206        _org_id: i64,
207        _session_id: SessionId,
208    ) -> Option<Arc<everruns_mcp::McpExecutor>> {
209        None
210    }
211}
212
213struct RuntimeExecutionCapabilities {
214    tool_registry: ToolRegistry,
215    post_tool_hooks: Vec<Arc<dyn everruns_core::PostToolExecHook>>,
216    pre_tool_hooks: Vec<Arc<dyn everruns_core::atoms::PreToolUseHook>>,
217    tool_call_hooks: Vec<Arc<dyn everruns_core::ToolCallHook>>,
218}
219
220/// Collect and finalize user-hook specs for a session from its resolved
221/// capability configs, plus the shared bash dispatcher used to run them.
222///
223/// This is the single place hook specs are gathered so every firing point —
224/// the act path (`load_execution_capabilities`) and the lifecycle firing
225/// points (`execute_reason_activity` for `user_prompt_submit`, turn completion
226/// for `turn_end`, and the server session paths) — applies identical
227/// `finalize_hook_specs` semantics: `{capability_id}:` namespace stamping,
228/// stable default ids, and `disabled_contributions` muting (TM-HOOK-004).
229fn finalize_specs_from_configs(
230    resolved_capability_configs: &[everruns_core::capability_types::AgentCapabilityConfig],
231    capability_registry: &CapabilityRegistry,
232) -> Vec<everruns_core::user_hook_types::UserHookSpec> {
233    let mut hook_contributions: Vec<(String, Vec<everruns_core::user_hook_types::UserHookSpec>)> =
234        Vec::new();
235    let mut disabled_contributions: Vec<String> = Vec::new();
236    for config in resolved_capability_configs {
237        let Some(capability) = capability_registry.get(config.capability_id()) else {
238            continue;
239        };
240        let specs = capability.user_hooks_with_config(&config.config);
241        if !specs.is_empty() {
242            hook_contributions.push((config.capability_id().to_string(), specs));
243        }
244        if config.capability_id() == "user_hooks" {
245            disabled_contributions.extend(
246                everruns_core::capabilities::user_hooks::disabled_contributions(&config.config),
247            );
248        }
249    }
250    everruns_core::hook_adapter::finalize_hook_specs(hook_contributions, &disabled_contributions)
251}
252
253/// Resolve a session's capability configs and collect finalized hook specs.
254/// Used by the lifecycle firing points, which need specs outside the act path.
255/// Returns `(specs, dispatcher)`; `specs` is empty when the session has no
256/// hook-contributing capabilities.
257async fn collect_lifecycle_hook_specs<A: RuntimeHostAdapter>(
258    adapter: &A,
259    org_id: i64,
260    session_id: SessionId,
261    harness_id: HarnessId,
262    agent_id: Option<AgentId>,
263) -> everruns_core::error::Result<(
264    Vec<everruns_core::user_hook_types::UserHookSpec>,
265    Arc<dyn everruns_core::hook_executor::BashHookDispatcher>,
266)> {
267    let capability_registry = adapter.capability_registry();
268    let harness_chain = adapter
269        .harness_store(org_id)
270        .get_harness_chain(harness_id)
271        .await?;
272    if harness_chain.is_empty() {
273        return Err(everruns_core::error::AgentLoopError::harness_not_found(
274            harness_id,
275        ));
276    }
277    let session = adapter
278        .session_store(org_id)
279        .get_session(session_id)
280        .await?
281        .ok_or_else(|| everruns_core::error::AgentLoopError::session_not_found(session_id))?;
282    let agent = match agent_id {
283        Some(agent_id) => adapter.agent_store(org_id).get_agent(agent_id).await?,
284        None => None,
285    };
286    let resolved = resolve_runtime_capabilities(
287        &harness_chain,
288        agent.as_ref(),
289        &session,
290        &capability_registry,
291    );
292    let specs =
293        finalize_specs_from_configs(&resolved.resolved_capability_configs, &capability_registry);
294    let dispatcher: Arc<dyn everruns_core::hook_executor::BashHookDispatcher> = Arc::new(
295        everruns_core::hook_dispatch::VirtualBashHookDispatcher::new(adapter.file_store()),
296    );
297    Ok((specs, dispatcher))
298}
299
300async fn load_execution_capabilities<A: RuntimeHostAdapter>(
301    adapter: &A,
302    org_id: i64,
303    session_id: SessionId,
304    harness_id: HarnessId,
305    agent_id: Option<AgentId>,
306    locale: Option<String>,
307    blueprint_id: Option<&str>,
308) -> everruns_core::error::Result<RuntimeExecutionCapabilities> {
309    let capability_registry = adapter.capability_registry();
310    if let Some(blueprint_id) = blueprint_id {
311        let mut registry = ToolRegistry::with_defaults();
312        let blueprint = capability_registry.blueprint(blueprint_id).ok_or_else(|| {
313            everruns_core::error::AgentLoopError::config(format!(
314                "Blueprint \"{blueprint_id}\" not found in registry"
315            ))
316        })?;
317        for tool in blueprint.tools {
318            registry.register_boxed(tool);
319        }
320        return Ok(RuntimeExecutionCapabilities {
321            tool_registry: registry,
322            post_tool_hooks: Vec::new(),
323            pre_tool_hooks: Vec::new(),
324            tool_call_hooks: Vec::new(),
325        });
326    }
327
328    let harness_chain = adapter
329        .harness_store(org_id)
330        .get_harness_chain(harness_id)
331        .await?;
332    if harness_chain.is_empty() {
333        return Err(everruns_core::error::AgentLoopError::harness_not_found(
334            harness_id,
335        ));
336    }
337
338    let session = adapter
339        .session_store(org_id)
340        .get_session(session_id)
341        .await?
342        .ok_or_else(|| everruns_core::error::AgentLoopError::session_not_found(session_id))?;
343
344    let agent_store = adapter.agent_store(org_id);
345    let agent = match agent_id {
346        Some(agent_id) => Some(
347            agent_store
348                .get_agent(agent_id)
349                .await?
350                .ok_or_else(|| everruns_core::error::AgentLoopError::agent_not_found(agent_id))?,
351        ),
352        None => None,
353    };
354
355    let resolved = resolve_runtime_capabilities(
356        &harness_chain,
357        agent.as_ref(),
358        &session,
359        &capability_registry,
360    );
361    // Executor (act) path: this builds the worker-side tool registry, not the
362    // model-visible tool list. The model is left unset, so a model-adaptive
363    // capability like `auto_tool_search` resolves to its provider-agnostic
364    // client-side mechanism here. That registers the `tool_search` tool in the
365    // executor, which is a harmless superset: on native models the reason path
366    // never shows that tool to the model, so it is simply never called.
367    let prompt_ctx = SystemPromptContext {
368        session_id,
369        locale: locale.or(session.locale.clone()),
370        file_store: Some(adapter.file_store()),
371        model: None,
372    };
373    let collected = collect_capabilities_with_configs(
374        &resolved.resolved_capability_configs,
375        &capability_registry,
376        &prompt_ctx,
377    )
378    .await;
379
380    let mut registry = ToolRegistry::with_defaults();
381    for tool in collected.tools {
382        registry.register_boxed(tool);
383    }
384
385    // Only `Available` capabilities contribute hooks, matching
386    // `collect_capabilities_with_configs` (which skips non-available
387    // capabilities). This keeps a `ComingSoon`/unavailable capability from
388    // affecting execution via any of its hook seams.
389    let mut post_tool_hooks: Vec<Arc<dyn everruns_core::PostToolExecHook>> = resolved
390        .resolved_capability_configs
391        .iter()
392        .flat_map(|config| {
393            capability_registry
394                .get(config.capability_id())
395                .filter(|capability| capability.status() == CapabilityStatus::Available)
396                .map(|capability| capability.post_tool_exec_hooks())
397                .unwrap_or_default()
398        })
399        .collect();
400
401    // User-hook contributions (see `specs/user-hooks.md`). `finalize_specs_from_configs`
402    // gathers specs across every resolved capability — both the user-facing
403    // `user_hooks` capability and any capability that bundles hooks — and applies
404    // `finalize_hook_specs` (namespace stamping, stable ids, `disabled_contributions`
405    // muting; TM-HOOK-004). The same helper backs the lifecycle firing points so
406    // every event finalizes specs identically.
407    let user_hook_specs =
408        finalize_specs_from_configs(&resolved.resolved_capability_configs, &capability_registry);
409    // Capability-contributed pre-tool hooks run first (e.g. approval gating),
410    // then user-hook (`PreToolUse`) specs. The first hook to block wins.
411    let mut pre_tool_hooks: Vec<Arc<dyn everruns_core::atoms::PreToolUseHook>> = resolved
412        .resolved_capability_configs
413        .iter()
414        .flat_map(|config| {
415            capability_registry
416                .get(config.capability_id())
417                .filter(|capability| capability.status() == CapabilityStatus::Available)
418                .map(|capability| capability.pre_tool_use_hooks())
419                .unwrap_or_default()
420        })
421        .collect();
422    if !user_hook_specs.is_empty() {
423        let dispatcher: Arc<dyn everruns_core::hook_executor::BashHookDispatcher> = Arc::new(
424            everruns_core::hook_dispatch::VirtualBashHookDispatcher::new(adapter.file_store()),
425        );
426        post_tool_hooks.extend(everruns_core::hook_adapter::build_post_tool_use_hooks(
427            &user_hook_specs,
428            dispatcher.clone(),
429        ));
430        pre_tool_hooks.extend(everruns_core::hook_adapter::build_pre_tool_use_hooks(
431            &user_hook_specs,
432            dispatcher,
433        ));
434    }
435
436    let tool_call_hooks = resolved
437        .resolved_capability_configs
438        .iter()
439        .flat_map(|config| {
440            capability_registry
441                .get(config.capability_id())
442                .filter(|capability| capability.status() == CapabilityStatus::Available)
443                .map(|capability| capability.tool_call_hooks())
444                .unwrap_or_default()
445        })
446        .collect();
447
448    Ok(RuntimeExecutionCapabilities {
449        tool_registry: registry,
450        post_tool_hooks,
451        pre_tool_hooks,
452        tool_call_hooks,
453    })
454}
455
456/// Shared lifecycle helper for runtime-backed hosts.
457pub struct RuntimeSessionLifecycle<A: RuntimeHostAdapter> {
458    adapter: A,
459    org_id: i64,
460    session_id: SessionId,
461}
462
463impl<A: RuntimeHostAdapter> RuntimeSessionLifecycle<A> {
464    pub fn new(adapter: A, org_id: i64, session_id: SessionId) -> Self {
465        Self {
466            adapter,
467            org_id,
468            session_id,
469        }
470    }
471
472    async fn set_session_status(&self, status: SessionStatus, action: &'static str) {
473        if let Err(error) = self
474            .adapter
475            .set_session_status(self.org_id, self.session_id, status)
476            .await
477        {
478            warn!(
479                session_id = %self.session_id,
480                org_id = self.org_id,
481                action,
482                %error,
483                "runtime host lifecycle status update failed"
484            );
485        }
486    }
487
488    async fn emit_event(&self, request: EventRequest) {
489        let event_type = request.event_type.clone();
490        if let Err(error) = self.adapter.event_emitter().emit(request).await {
491            warn!(
492                session_id = %self.session_id,
493                org_id = self.org_id,
494                event_type,
495                %error,
496                "runtime host lifecycle event emission failed"
497            );
498        }
499    }
500
501    pub async fn turn_started(&self, turn_id: TurnId, input_message_id: MessageId) {
502        let input_content = self
503            .adapter
504            .message_store()
505            .get(self.session_id, input_message_id)
506            .await
507            .ok()
508            .flatten()
509            .map(|message| message.content_to_llm_string());
510
511        self.set_session_status(SessionStatus::Active, "turn_started")
512            .await;
513
514        self.emit_event(EventRequest::new(
515            self.session_id,
516            EventContext::turn(turn_id, input_message_id),
517            SessionActivatedData {
518                turn_id,
519                input_message_id,
520            },
521        ))
522        .await;
523
524        self.emit_event(EventRequest::new(
525            self.session_id,
526            EventContext::turn(turn_id, input_message_id),
527            TurnStartedData {
528                turn_id,
529                input_message_id,
530                input_content,
531            },
532        ))
533        .await;
534    }
535
536    pub async fn emit_turn_completed(&self, input_message_id: MessageId, data: TurnCompletedData) {
537        let turn_id = data.turn_id;
538        self.emit_event(EventRequest::new(
539            self.session_id,
540            EventContext::turn(turn_id, input_message_id),
541            data,
542        ))
543        .await;
544    }
545
546    pub async fn emit_session_idled(
547        &self,
548        turn_id: TurnId,
549        input_message_id: MessageId,
550        iterations: Option<u32>,
551        usage: Option<TokenUsage>,
552    ) {
553        self.set_session_status(SessionStatus::Idle, "emit_session_idled")
554            .await;
555
556        self.emit_event(EventRequest::new(
557            self.session_id,
558            EventContext::turn(turn_id, input_message_id),
559            SessionIdledData {
560                turn_id,
561                iterations,
562                usage,
563            },
564        ))
565        .await;
566    }
567
568    pub async fn turn_completed(
569        &self,
570        turn_id: TurnId,
571        input_message_id: MessageId,
572        iterations: u32,
573        usage: Option<TokenUsage>,
574        input_content: Option<String>,
575    ) {
576        self.emit_turn_completed(
577            input_message_id,
578            TurnCompletedData {
579                turn_id,
580                iterations,
581                duration_ms: None,
582                usage: usage.clone(),
583                input_content,
584                final_message_id: None,
585                final_answer_preview: None,
586                time_to_first_token_ms: None,
587                tool_call_count: None,
588                llm_call_count: None,
589                status: Some("completed".to_string()),
590            },
591        )
592        .await;
593        self.emit_session_idled(turn_id, input_message_id, Some(iterations), usage)
594            .await;
595    }
596
597    /// Fire `turn_end` lifecycle hooks (advisory). Collects the session's hook
598    /// specs and runs every `turn_end` hook; failures are logged, never fatal.
599    /// `harness_id`/`agent_id` are required to resolve the capability chain.
600    pub async fn fire_turn_end_hooks(
601        &self,
602        harness_id: HarnessId,
603        agent_id: Option<AgentId>,
604        turn_id: TurnId,
605        success: bool,
606    ) {
607        let (specs, dispatcher) = match collect_lifecycle_hook_specs(
608            &self.adapter,
609            self.org_id,
610            self.session_id,
611            harness_id,
612            agent_id,
613        )
614        .await
615        {
616            Ok(pair) => pair,
617            Err(error) => {
618                warn!(
619                    session_id = %self.session_id,
620                    %error,
621                    "failed to collect turn_end hook specs; skipping"
622                );
623                return;
624            }
625        };
626        let hooks = everruns_core::lifecycle_hooks::build_turn_lifecycle_hooks(
627            &specs,
628            everruns_core::user_hook_types::HookEvent::TurnEnd,
629            dispatcher,
630        );
631        if hooks.is_empty() {
632            return;
633        }
634        let ctx = everruns_core::lifecycle_hooks::TurnHookContext {
635            session_id: self.session_id,
636            turn_id: Some(turn_id),
637            org_id: org_public_id_from_internal(self.org_id).parse().ok(),
638            agent_id: agent_id.map(|a| a.to_string()),
639        };
640        everruns_core::lifecycle_hooks::run_turn_end_hooks(
641            &hooks,
642            &ctx,
643            serde_json::json!({ "success": success }),
644        )
645        .await;
646    }
647
648    /// Abort a turn because a `user_prompt_submit` hook returned `Block`.
649    /// Reuses the dependency-blocked failure shape: emit a user-facing message
650    /// carrying the hook's `user_message` (or `reason`), then mark the turn
651    /// failed and idle the session.
652    pub async fn user_prompt_blocked(
653        &self,
654        turn_id: TurnId,
655        input_message_id: MessageId,
656        reason: &str,
657        user_message: Option<&str>,
658    ) {
659        let user_error =
660            UserFacingError::new(everruns_core::user_facing_error_codes::BLOCKED_BY_HOOK);
661        let shown = user_message.unwrap_or(reason);
662        let mut error_message = Message::assistant(shown);
663        let mut metadata = std::collections::HashMap::new();
664        user_error.apply_to_message_metadata(&mut metadata);
665        error_message.metadata = Some(metadata);
666
667        self.emit_event(EventRequest::new(
668            self.session_id,
669            EventContext::turn(turn_id, input_message_id),
670            OutputMessageCompletedData::new(error_message).with_user_facing_error(&user_error),
671        ))
672        .await;
673
674        self.turn_failed(turn_id, input_message_id, reason, Some(&user_error))
675            .await;
676    }
677
678    pub async fn turn_failed(
679        &self,
680        turn_id: TurnId,
681        input_message_id: MessageId,
682        error: &str,
683        user_error: Option<&UserFacingError>,
684    ) {
685        self.set_session_status(SessionStatus::Idle, "turn_failed")
686            .await;
687
688        self.emit_event(EventRequest::new(
689            self.session_id,
690            EventContext::turn(turn_id, input_message_id),
691            {
692                let mut data = TurnFailedData {
693                    turn_id,
694                    error: error.to_string(),
695                    error_code: None,
696                    error_fields: None,
697                };
698                if let Some(user_error) = user_error {
699                    user_error.apply_to_event_fields(&mut data.error_code, &mut data.error_fields);
700                }
701                data
702            },
703        ))
704        .await;
705
706        self.emit_event(EventRequest::new(
707            self.session_id,
708            EventContext::turn(turn_id, input_message_id),
709            SessionIdledData {
710                turn_id,
711                iterations: None,
712                usage: None,
713            },
714        ))
715        .await;
716    }
717
718    pub async fn waiting_for_tool_results(&self) {
719        self.set_session_status(
720            SessionStatus::WaitingForToolResults,
721            "waiting_for_tool_results",
722        )
723        .await;
724    }
725
726    pub async fn dependency_blocked(
727        &self,
728        turn_id: TurnId,
729        input_message_id: MessageId,
730        blocker: DependencyBlocker,
731    ) {
732        let user_error = UserFacingError::new(blocker.error_code())
733            .with_field(
734                "dependency",
735                match blocker {
736                    DependencyBlocker::HarnessArchived | DependencyBlocker::HarnessDeleted => {
737                        "harness"
738                    }
739                    DependencyBlocker::AgentArchived | DependencyBlocker::AgentDeleted => "agent",
740                },
741            )
742            .with_field(
743                "state",
744                match blocker {
745                    DependencyBlocker::HarnessArchived | DependencyBlocker::AgentArchived => {
746                        "archived"
747                    }
748                    DependencyBlocker::HarnessDeleted | DependencyBlocker::AgentDeleted => {
749                        "deleted"
750                    }
751                },
752            );
753        let mut error_message = Message::assistant(blocker.message());
754        let mut metadata = std::collections::HashMap::new();
755        user_error.apply_to_message_metadata(&mut metadata);
756        error_message.metadata = Some(metadata);
757
758        self.emit_event(EventRequest::new(
759            self.session_id,
760            EventContext::turn(turn_id, input_message_id),
761            OutputMessageCompletedData::new(error_message).with_user_facing_error(&user_error),
762        ))
763        .await;
764
765        self.turn_failed(
766            turn_id,
767            input_message_id,
768            blocker.message(),
769            Some(&user_error),
770        )
771        .await;
772    }
773}
774
775pub async fn detect_dependency_blocker<A: RuntimeHostAdapter>(
776    adapter: &A,
777    org_id: i64,
778    harness_id: HarnessId,
779    agent_id: Option<AgentId>,
780) -> everruns_core::error::Result<Option<DependencyBlocker>> {
781    let harness_store = adapter.harness_store(org_id);
782    let agent_store = adapter.agent_store(org_id);
783    everruns_core::detect_dependency_blocker(
784        harness_store.as_ref(),
785        agent_store.as_ref(),
786        harness_id,
787        agent_id,
788    )
789    .await
790}
791
792pub async fn execute_input_activity<A: RuntimeHostAdapter>(
793    adapter: &A,
794    org_id: i64,
795    input: InputAtomInput,
796) -> everruns_core::error::Result<InputAtomResult> {
797    RuntimeSessionLifecycle::new(adapter.clone(), org_id, input.context.session_id)
798        .turn_started(input.context.turn_id, input.context.input_message_id)
799        .await;
800
801    let atom = InputAtom::new(adapter.message_store());
802    atom.execute(input).await
803}
804
805/// Collect `user_prompt_submit` hooks for this turn and run them against the
806/// inbound user message text. Returns `None` when the session has no such
807/// hooks (the common case — no overhead beyond the spec collection, which is
808/// skipped early). Errors loading specs are logged and treated as "no hooks"
809/// so a hook-collection failure never blocks a turn that wasn't asking to be
810/// hooked.
811struct UserPromptHookResult {
812    decision: everruns_core::lifecycle_hooks::UserPromptDecision,
813    original_message: String,
814}
815
816async fn run_user_prompt_submit_for_turn<A: RuntimeHostAdapter>(
817    adapter: &A,
818    org_id: i64,
819    input: &ReasonInput,
820) -> everruns_core::error::Result<Option<UserPromptHookResult>> {
821    let (specs, dispatcher) = match collect_lifecycle_hook_specs(
822        adapter,
823        org_id,
824        input.context.session_id,
825        input.harness_id,
826        input.agent_id,
827    )
828    .await
829    {
830        Ok(pair) => pair,
831        Err(error) => {
832            warn!(
833                session_id = %input.context.session_id,
834                %error,
835                "failed to collect user_prompt_submit hook specs; continuing without them"
836            );
837            return Ok(None);
838        }
839    };
840    let hooks = everruns_core::lifecycle_hooks::build_turn_lifecycle_hooks(
841        &specs,
842        everruns_core::user_hook_types::HookEvent::UserPromptSubmit,
843        dispatcher,
844    );
845    if hooks.is_empty() {
846        return Ok(None);
847    }
848
849    let message_text = adapter
850        .message_store()
851        .get(input.context.session_id, input.context.input_message_id)
852        .await
853        .ok()
854        .flatten()
855        .map(|m| m.content_to_llm_string())
856        .unwrap_or_default();
857
858    let ctx = everruns_core::lifecycle_hooks::TurnHookContext {
859        session_id: input.context.session_id,
860        turn_id: Some(input.context.turn_id),
861        org_id: org_public_id_from_internal(org_id).parse().ok(),
862        agent_id: input.agent_id.map(|a| a.to_string()),
863    };
864    let original_message = message_text.clone();
865    let decision =
866        everruns_core::lifecycle_hooks::run_user_prompt_submit_hooks(&hooks, &ctx, message_text)
867            .await;
868    Ok(Some(UserPromptHookResult {
869        decision,
870        original_message,
871    }))
872}
873
874pub async fn execute_reason_activity<A: RuntimeHostAdapter>(
875    adapter: &A,
876    org_id: i64,
877    input: ReasonInput,
878) -> everruns_core::error::Result<ReasonResult> {
879    if let Some(blocker) =
880        detect_dependency_blocker(adapter, org_id, input.harness_id, input.agent_id).await?
881    {
882        RuntimeSessionLifecycle::new(adapter.clone(), org_id, input.context.session_id)
883            .dependency_blocked(
884                input.context.turn_id,
885                input.context.input_message_id,
886                blocker,
887            )
888            .await;
889        return Ok(ReasonResult {
890            success: false,
891            text: blocker.message().to_string(),
892            tool_calls: vec![],
893            has_tool_calls: false,
894            tool_definitions: vec![],
895            max_iterations: everruns_core::runtime_agent::default_max_iterations(),
896            error: Some("dependency_unavailable".to_string()),
897            usage: None,
898            output_message_id: None,
899            time_to_first_token_ms: None,
900            response_id: None,
901            locale: None,
902            network_access: None,
903        });
904    }
905
906    // user_prompt_submit hook (see `specs/user-hooks.md`). Fires once per turn,
907    // on the first reason iteration, before the LLM is consulted — the closest
908    // choke point to "inbound user message accepted, before reason" that both
909    // the in-process loop and the durable worker share. A `Block` aborts the
910    // turn by reusing the same failure path as `dependency_blocked`: emit a
911    // user-facing message + turn.failed, idle the session, and return a
912    // non-success `ReasonResult` so no LLM/act work runs.
913    let mut user_prompt_message_override = None;
914    if input.iteration <= 1
915        && let Some(hook_result) = run_user_prompt_submit_for_turn(adapter, org_id, &input).await?
916    {
917        match hook_result.decision {
918            everruns_core::lifecycle_hooks::UserPromptDecision::Block {
919                reason,
920                user_message,
921            } => {
922                RuntimeSessionLifecycle::new(adapter.clone(), org_id, input.context.session_id)
923                    .user_prompt_blocked(
924                        input.context.turn_id,
925                        input.context.input_message_id,
926                        &reason,
927                        user_message.as_deref(),
928                    )
929                    .await;
930                return Ok(ReasonResult {
931                    success: false,
932                    text: user_message.unwrap_or_else(|| reason.clone()),
933                    tool_calls: vec![],
934                    has_tool_calls: false,
935                    tool_definitions: vec![],
936                    max_iterations: everruns_core::runtime_agent::default_max_iterations(),
937                    error: Some("blocked_by_user_prompt_hook".to_string()),
938                    usage: None,
939                    output_message_id: None,
940                    time_to_first_token_ms: None,
941                    response_id: None,
942                    locale: None,
943                    network_access: None,
944                });
945            }
946            everruns_core::lifecycle_hooks::UserPromptDecision::Continue { message } => {
947                if message != hook_result.original_message {
948                    user_prompt_message_override = Some(message);
949                }
950            }
951        }
952    }
953
954    let turn_context = adapter
955        .load_turn_context(org_id, input.context.session_id)
956        .await?;
957
958    let mut atom = ReasonAtom::new(
959        adapter.harness_store(org_id),
960        adapter.agent_store(org_id),
961        adapter.session_store(org_id),
962        adapter.message_store(),
963        adapter.provider_store(org_id),
964        adapter.capability_registry(),
965        adapter.driver_registry(),
966        adapter.event_emitter(),
967    )
968    .with_file_store(adapter.file_store());
969    if let Some(image_resolver) = adapter.image_resolver(org_id) {
970        atom = atom.with_image_resolver(image_resolver);
971    }
972    if let Some(hb) = adapter.stream_heartbeater() {
973        atom = atom.with_stream_heartbeater(hb);
974    }
975    if let Some(timeout) = adapter.provider_stall_timeout() {
976        atom = atom.with_provider_stall_timeout(timeout);
977    }
978
979    let input = ReasonInput {
980        mcp_tool_definitions: turn_context.mcp_tool_definitions,
981        ..input
982    };
983
984    if let Some(message_override) = user_prompt_message_override {
985        let mut assembled = assemble_turn_context(
986            adapter.harness_store(org_id).as_ref(),
987            adapter.agent_store(org_id).as_ref(),
988            adapter.session_store(org_id).as_ref(),
989            adapter.message_store().as_ref(),
990            adapter.provider_store(org_id).as_ref(),
991            &adapter.capability_registry(),
992            input.context.session_id,
993            input.harness_id,
994            input.agent_id,
995            &input.mcp_tool_definitions,
996            Some(adapter.file_store()),
997        )
998        .await?;
999
1000        let message = assembled
1001            .messages
1002            .iter_mut()
1003            .find(|message| message.id == input.context.input_message_id)
1004            .ok_or_else(|| {
1005                everruns_core::error::AgentLoopError::config(
1006                    "user_prompt_submit mutation: input message not found in assembled context",
1007                )
1008            })?;
1009
1010        // user_prompt_submit mutations are enforcement controls for the
1011        // provider-bound prompt. Apply them to the assembled context only
1012        // so persisted user history remains an audit record of the input.
1013        // Preserve non-text parts (images, files); replace only text parts.
1014        message
1015            .content
1016            .retain(|part| !matches!(part, ContentPart::Text(_)));
1017        message
1018            .content
1019            .insert(0, ContentPart::text(message_override));
1020
1021        return atom.execute_with_assembled_context(input, assembled).await;
1022    }
1023
1024    atom.execute(input).await
1025}
1026
1027pub async fn execute_act_activity<A: RuntimeHostAdapter>(
1028    adapter: &A,
1029    input: ActInput,
1030) -> everruns_core::error::Result<ActResult> {
1031    let org_id = input.org_id.ok_or_else(|| {
1032        everruns_core::error::AgentLoopError::config(
1033            "ActInput.org_id must be set for runtime host execution",
1034        )
1035    })?;
1036
1037    if let Some(blocker) =
1038        detect_dependency_blocker(adapter, org_id, input.harness_id, input.agent_id).await?
1039    {
1040        RuntimeSessionLifecycle::new(adapter.clone(), org_id, input.context.session_id)
1041            .dependency_blocked(
1042                input.context.turn_id,
1043                input.context.input_message_id,
1044                blocker,
1045            )
1046            .await;
1047        return Ok(ActResult {
1048            results: vec![],
1049            completed: true,
1050            success_count: 0,
1051            error_count: 1,
1052            waiting_for_tool_results: false,
1053            blocked: true,
1054            client_tool_calls: vec![],
1055            client_tool_definitions: vec![],
1056        });
1057    }
1058
1059    let execution_capabilities = load_execution_capabilities(
1060        adapter,
1061        org_id,
1062        input.context.session_id,
1063        input.harness_id,
1064        input.agent_id,
1065        input.locale.clone(),
1066        input.blueprint_id.as_deref(),
1067    )
1068    .await?;
1069    let mut tool_registry = execution_capabilities.tool_registry;
1070
1071    // Register the session's MCP tools as first-class registry tools, so they
1072    // execute through the regular `ToolExecutor` path and are visible to
1073    // everything that introspects the registry (spawn_background, tool_search,
1074    // openai_tool_search namespaces, ...). The turn's tool definitions already
1075    // include the discovered MCP tools, so no re-discovery is needed; the host's
1076    // MCP executor supplies execution (specs/runtime-mcp.md D5).
1077    if let Some(mcp) = adapter.mcp_executor(org_id, input.context.session_id).await {
1078        let invoker: Arc<dyn everruns_core::McpToolInvoker> = mcp;
1079        for tool in everruns_core::build_mcp_proxy_tools(&input.tool_definitions, invoker) {
1080            tool_registry.register_boxed(tool);
1081        }
1082    }
1083
1084    let builtin_tool_registry = Arc::new(tool_registry.clone());
1085    let executor: Arc<dyn everruns_core::traits::ToolExecutor> = Arc::new(tool_registry);
1086
1087    let mut atom =
1088        ActAtom::with_file_store(executor, adapter.event_emitter(), adapter.file_store())
1089            .with_session_store(adapter.session_store(org_id))
1090            .with_session_mutator(adapter.session_mutator(org_id))
1091            .with_agent_store(adapter.agent_store(org_id))
1092            .with_tool_registry(builtin_tool_registry)
1093            .with_org_id(
1094                org_public_id_from_internal(org_id)
1095                    .parse()
1096                    .expect("internal org id converts to valid public org id"),
1097            )
1098            .with_capability_registry(adapter.capability_registry())
1099            .with_post_tool_hooks(execution_capabilities.post_tool_hooks)
1100            .with_pre_tool_hooks(execution_capabilities.pre_tool_hooks)
1101            .with_tool_call_hooks(execution_capabilities.tool_call_hooks);
1102
1103    if let Some(storage_store) = adapter.storage_store() {
1104        atom = atom.with_storage_store(storage_store);
1105    }
1106    if let Some(image_store) = adapter.image_artifact_store(org_id) {
1107        atom = atom.with_image_store(image_store);
1108    }
1109    if let Some(provider_credential_store) = adapter.provider_credential_store(org_id) {
1110        atom = atom.with_provider_credential_store(provider_credential_store);
1111    }
1112    if let Some(utility_llm_service) = adapter.utility_llm_service() {
1113        atom = atom.with_utility_llm_service(utility_llm_service);
1114    }
1115    if let Some(egress_service) = adapter.egress_service() {
1116        atom = atom.with_egress_service(egress_service);
1117    }
1118    if let Some(memory_store) = adapter.memory_store(org_id) {
1119        atom = atom.with_memory_store(memory_store);
1120    }
1121    if let Some(connection_resolver) = adapter.connection_resolver() {
1122        atom = atom.with_connection_resolver(connection_resolver);
1123    }
1124    if let Some(sqldb_store) = adapter.sqldb_store() {
1125        atom = atom.with_sqldb_store(sqldb_store);
1126    }
1127    if let Some(leased_resource_store) = adapter.leased_resource_store() {
1128        atom = atom.with_leased_resource_store(leased_resource_store);
1129    }
1130    if let Some(registry) = adapter.session_resource_registry() {
1131        atom = atom.with_session_resource_registry(registry);
1132    }
1133    if let Some(schedule_store) = adapter.schedule_store(org_id) {
1134        atom = atom.with_schedule_store(schedule_store);
1135    }
1136    if let Some(platform_store) = adapter.platform_store(org_id, input.context.session_id) {
1137        atom = atom.with_platform_store(platform_store);
1138    }
1139    if let Some(budget_checker) = adapter.budget_checker(org_id, input.agent_id) {
1140        atom = atom.with_budget_checker(budget_checker);
1141    }
1142    if let Some(payment_authority) = adapter.payment_authority(org_id, input.agent_id) {
1143        atom = atom.with_payment_authority(payment_authority);
1144    }
1145    if let Some(limiter) = adapter.outbound_tool_rate_limiter(org_id) {
1146        atom = atom.with_outbound_tool_rate_limiter(limiter);
1147    }
1148    if let Some(store) = adapter.durable_tool_result_store() {
1149        atom = atom.with_durable_tool_result_store(store);
1150    }
1151
1152    atom.execute(input).await
1153}