1use async_trait::async_trait;
6use everruns_core::atoms::{
7 ActAtom, ActInput, ActResult, Atom, InputAtom, InputAtomInput, InputAtomResult, ReasonAtom,
8 ReasonInput, ReasonResult,
9};
10use everruns_core::capabilities::{SystemPromptContext, collect_capabilities_with_configs};
11use everruns_core::events::{
12 EventContext, EventRequest, OutputMessageCompletedData, SessionActivatedData, SessionIdledData,
13 TurnCompletedData, TurnFailedData, TurnStartedData,
14};
15use everruns_core::message::{ContentPart, Message};
16use everruns_core::message_retriever::MessageRetriever;
17use everruns_core::platform_store::PlatformStore;
18use everruns_core::session::SessionStatus;
19use everruns_core::traits::{
20 AgentStore, BudgetChecker, EventEmitter, HarnessStore, ImageArtifactStore, ImageResolver,
21 LeasedResourceStore, PaymentAuthority, ProviderCredentialStore, ProviderStore, ResolvedModel,
22 SessionFileSystem, SessionMutator, SessionResourceRegistry, SessionScheduleStore,
23 SessionSqlDbStoreRef, SessionStorageStore, SessionStore, UserConnectionResolver,
24};
25use everruns_core::typed_id::{AgentId, HarnessId, MessageId, SessionId, TurnId};
26use everruns_core::vector_store::KnowledgeIndexSearch;
27use everruns_core::{
28 Agent, CapabilityRegistry, CapabilityStatus, DependencyBlocker, DriverRegistry, EgressService,
29 ErrorDisclosure, Harness, Session, TokenUsage, ToolDefinition, ToolRegistry, UserFacingError,
30 UtilityLlmService, assemble_turn_context, org_public_id_from_internal,
31 resolve_runtime_capabilities,
32};
33use std::sync::Arc;
34use tracing::warn;
35
/// Per-turn data returned by [`RuntimeHostAdapter::load_turn_context`].
///
/// Within this file only `mcp_tool_definitions` is read (by
/// `execute_reason_activity`); the remaining fields are presumably consumed by
/// host implementations — confirm against callers outside this file.
#[derive(Debug, Clone)]
pub struct RuntimeHostTurnContext {
    /// Agent associated with the turn, when there is one.
    pub agent: Option<Agent>,
    /// Session the turn belongs to.
    pub session: Session,
    /// Messages loaded for the turn.
    pub messages: Vec<Message>,
    /// Model resolved for the turn, when there is one.
    pub model: Option<ResolvedModel>,
    /// MCP tool definitions; `execute_reason_activity` copies these into
    /// `ReasonInput::mcp_tool_definitions` before running the reason atom.
    pub mcp_tool_definitions: Vec<ToolDefinition>,
}
45
/// Host-provided access to the stores, registries and optional services that
/// the runtime activities in this file need.
///
/// The methods without a default body are required. Every method with a
/// default body returns `None`, so a host only overrides the optional
/// services it actually provides; the activities in this file wire a service
/// into the corresponding atom only when the accessor returns `Some`.
#[async_trait]
pub trait RuntimeHostAdapter: Send + Sync + Clone + 'static {
    /// Fetches the agent `agent_id` for organization `org_id`, if any.
    async fn get_agent(
        &self,
        org_id: i64,
        agent_id: AgentId,
    ) -> everruns_core::error::Result<Option<Agent>>;

    /// Fetches the harness `harness_id` for organization `org_id`, if any.
    async fn get_harness(
        &self,
        org_id: i64,
        harness_id: HarnessId,
    ) -> everruns_core::error::Result<Option<Harness>>;

    /// Sets the status of a session and returns the resulting session.
    ///
    /// Called by `RuntimeSessionLifecycle`, which only logs a failure.
    async fn set_session_status(
        &self,
        org_id: i64,
        session_id: SessionId,
        status: SessionStatus,
    ) -> everruns_core::error::Result<Session>;

    /// Loads the per-turn context for a session.
    ///
    /// Called by `execute_reason_activity` before the reason atom runs.
    async fn load_turn_context(
        &self,
        org_id: i64,
        session_id: SessionId,
    ) -> everruns_core::error::Result<RuntimeHostTurnContext>;

    /// Capability registry used to resolve capabilities, blueprints and hooks.
    fn capability_registry(&self) -> CapabilityRegistry;

    /// Driver registry handed to the reason atom.
    fn driver_registry(&self) -> DriverRegistry;

    /// Harness store scoped to `org_id`.
    fn harness_store(&self, org_id: i64) -> Arc<dyn HarnessStore>;

    /// Agent store scoped to `org_id`.
    fn agent_store(&self, org_id: i64) -> Arc<dyn AgentStore>;

    /// Session store scoped to `org_id`.
    fn session_store(&self, org_id: i64) -> Arc<dyn SessionStore>;

    /// Session mutator scoped to `org_id`; handed to the act atom.
    fn session_mutator(&self, org_id: i64) -> Arc<dyn SessionMutator>;

    /// Provider store scoped to `org_id`; handed to the reason atom.
    fn provider_store(&self, org_id: i64) -> Arc<dyn ProviderStore>;

    /// Message retriever used to read session messages.
    fn message_store(&self) -> Arc<dyn MessageRetriever>;

    /// Emitter used for all lifecycle and atom events.
    fn event_emitter(&self) -> Arc<dyn EventEmitter>;

    /// Session file system; also backs the bash hook dispatcher built in this file.
    fn file_store(&self) -> Arc<dyn SessionFileSystem>;

    /// Optional image resolver for the reason atom. Defaults to `None`.
    fn image_resolver(&self, _org_id: i64) -> Option<Arc<dyn ImageResolver>> {
        None
    }

    /// Optional image artifact store for the act atom. Defaults to `None`.
    fn image_artifact_store(&self, _org_id: i64) -> Option<Arc<dyn ImageArtifactStore>> {
        None
    }

    /// Optional provider credential store for the act atom. Defaults to `None`.
    fn provider_credential_store(&self, _org_id: i64) -> Option<Arc<dyn ProviderCredentialStore>> {
        None
    }

    /// Optional utility LLM service, wired into both the reason and act atoms.
    /// Defaults to `None`.
    fn utility_llm_service(&self) -> Option<Arc<dyn UtilityLlmService>> {
        None
    }

    /// Optional egress service for the act atom. Defaults to `None`.
    fn egress_service(&self) -> Option<Arc<dyn EgressService>> {
        None
    }

    /// Optional session storage store for the act atom. Defaults to `None`.
    fn storage_store(&self) -> Option<Arc<dyn SessionStorageStore>> {
        None
    }

    /// Optional knowledge store for the act atom. Defaults to `None`.
    fn knowledge_store(&self) -> Option<Arc<dyn everruns_core::traits::KnowledgeStore>> {
        None
    }

    /// Optional user connection resolver for the act atom. Defaults to `None`.
    fn connection_resolver(&self) -> Option<Arc<dyn UserConnectionResolver>> {
        None
    }

    /// Optional session SQL database store for the act atom. Defaults to `None`.
    fn sqldb_store(&self) -> Option<SessionSqlDbStoreRef> {
        None
    }

    /// Optional leased resource store for the act atom. Defaults to `None`.
    fn leased_resource_store(&self) -> Option<Arc<dyn LeasedResourceStore>> {
        None
    }

    /// Optional session resource registry for the act atom. Defaults to `None`.
    fn session_resource_registry(&self) -> Option<Arc<dyn SessionResourceRegistry>> {
        None
    }

    /// Optional session task registry for the act atom. Defaults to `None`.
    fn session_task_registry(
        &self,
    ) -> Option<Arc<dyn everruns_core::session_task::SessionTaskRegistry>> {
        None
    }

    /// Optional session schedule store for the act atom. Defaults to `None`.
    fn schedule_store(&self, _org_id: i64) -> Option<Arc<dyn SessionScheduleStore>> {
        None
    }

    /// Optional platform store for the given session, used by the act atom.
    /// Defaults to `None`.
    fn platform_store(
        &self,
        _org_id: i64,
        _session_id: SessionId,
    ) -> Option<Arc<dyn PlatformStore>> {
        None
    }

    /// Optional knowledge index search for the act atom. Defaults to `None`.
    fn knowledge_index_search(&self, _org_id: i64) -> Option<Arc<dyn KnowledgeIndexSearch>> {
        None
    }

    /// Optional budget checker for the act atom. Defaults to `None`.
    fn budget_checker(
        &self,
        _org_id: i64,
        _agent_id: Option<AgentId>,
    ) -> Option<Arc<dyn BudgetChecker>> {
        None
    }

    /// Optional payment authority for the act atom. Defaults to `None`.
    fn payment_authority(
        &self,
        _org_id: i64,
        _agent_id: Option<AgentId>,
    ) -> Option<Arc<dyn PaymentAuthority>> {
        None
    }

    /// Optional outbound tool rate limiter for the act atom. Defaults to `None`.
    fn outbound_tool_rate_limiter(
        &self,
        _org_id: i64,
    ) -> Option<Arc<dyn everruns_core::OutboundToolRateLimiter>> {
        None
    }

    /// Optional durable tool result store, wired into both the reason and act
    /// atoms. Defaults to `None`.
    fn durable_tool_result_store(&self) -> Option<Arc<dyn everruns_core::DurableToolResultStore>> {
        None
    }

    /// Optional subagent spawn store for the act atom. Defaults to `None`.
    fn subagent_spawn_store(&self) -> Option<Arc<dyn everruns_core::SubagentSpawnStore>> {
        None
    }

    /// Optional stream heartbeater for the reason atom. Defaults to `None`.
    fn stream_heartbeater(&self) -> Option<Arc<dyn everruns_core::StreamHeartbeater>> {
        None
    }

    /// Optional partial stream store for the reason atom. Defaults to `None`.
    fn partial_stream_store(&self) -> Option<Arc<dyn everruns_core::PartialStreamStore>> {
        None
    }

    /// Optional reasoning effort handle for the given session, wired into both
    /// the reason and act atoms. Defaults to `None`.
    fn reasoning_effort_handle(
        &self,
        _session_id: SessionId,
    ) -> Option<everruns_core::ReasoningEffortHandle> {
        None
    }

    /// Optional provider stall timeout for the reason atom. Defaults to `None`.
    fn provider_stall_timeout(&self) -> Option<std::time::Duration> {
        None
    }

    /// Optional MCP executor for the given session. When present,
    /// `execute_act_activity` registers MCP proxy tools and a scoped MCP
    /// invoker built from it. Defaults to `None`.
    async fn mcp_executor(
        &self,
        _org_id: i64,
        _session_id: SessionId,
    ) -> Option<Arc<everruns_mcp::McpExecutor>> {
        None
    }
}
254
/// Tool registry and hook lists produced by `load_execution_capabilities` and
/// consumed by `execute_act_activity` when it configures the act atom.
struct RuntimeExecutionCapabilities {
    /// Registry holding the tools available to the act atom.
    tool_registry: ToolRegistry,
    /// Hooks passed to `ActAtom::with_post_tool_hooks`.
    post_tool_hooks: Vec<Arc<dyn everruns_core::PostToolExecHook>>,
    /// Hooks passed to `ActAtom::with_pre_tool_hooks`.
    pre_tool_hooks: Vec<Arc<dyn everruns_core::atoms::PreToolUseHook>>,
    /// Hooks passed to `ActAtom::with_tool_call_hooks`.
    tool_call_hooks: Vec<Arc<dyn everruns_core::ToolCallHook>>,
}
261
262fn finalize_specs_from_configs(
272 resolved_capability_configs: &[everruns_core::capability_types::AgentCapabilityConfig],
273 capability_registry: &CapabilityRegistry,
274) -> Vec<everruns_core::user_hook_types::UserHookSpec> {
275 let mut hook_contributions: Vec<(String, Vec<everruns_core::user_hook_types::UserHookSpec>)> =
276 Vec::new();
277 let mut disabled_contributions: Vec<String> = Vec::new();
278 for config in resolved_capability_configs {
279 let Some(capability) = capability_registry.get(config.capability_id()) else {
280 continue;
281 };
282 let specs = capability.user_hooks_with_config(&config.config);
283 if !specs.is_empty() {
284 hook_contributions.push((config.capability_id().to_string(), specs));
285 }
286 if config.capability_id() == "user_hooks" {
287 disabled_contributions.extend(
288 everruns_core::capabilities::user_hooks::disabled_contributions(&config.config),
289 );
290 }
291 }
292 everruns_core::hook_adapter::finalize_hook_specs(hook_contributions, &disabled_contributions)
293}
294
295async fn collect_lifecycle_hook_specs<A: RuntimeHostAdapter>(
300 adapter: &A,
301 org_id: i64,
302 session_id: SessionId,
303 harness_id: HarnessId,
304 agent_id: Option<AgentId>,
305) -> everruns_core::error::Result<(
306 Vec<everruns_core::user_hook_types::UserHookSpec>,
307 Arc<dyn everruns_core::hook_executor::BashHookDispatcher>,
308)> {
309 let capability_registry = adapter.capability_registry();
310 let harness_chain = adapter
311 .harness_store(org_id)
312 .get_harness_chain(harness_id)
313 .await?;
314 if harness_chain.is_empty() {
315 return Err(everruns_core::error::AgentLoopError::harness_not_found(
316 harness_id,
317 ));
318 }
319 let session = adapter
320 .session_store(org_id)
321 .get_session(session_id)
322 .await?
323 .ok_or_else(|| everruns_core::error::AgentLoopError::session_not_found(session_id))?;
324 let agent = match agent_id {
325 Some(agent_id) => adapter.agent_store(org_id).get_agent(agent_id).await?,
326 None => None,
327 };
328 let resolved = resolve_runtime_capabilities(
329 &harness_chain,
330 agent.as_ref(),
331 &session,
332 &capability_registry,
333 );
334 let specs =
335 finalize_specs_from_configs(&resolved.resolved_capability_configs, &capability_registry);
336 let dispatcher: Arc<dyn everruns_core::hook_executor::BashHookDispatcher> = Arc::new(
337 everruns_core::hook_dispatch::BashkitShellHookDispatcher::new(adapter.file_store()),
338 );
339 Ok((specs, dispatcher))
340}
341
342async fn load_execution_capabilities<A: RuntimeHostAdapter>(
343 adapter: &A,
344 org_id: i64,
345 session_id: SessionId,
346 harness_id: HarnessId,
347 agent_id: Option<AgentId>,
348 locale: Option<String>,
349 blueprint_id: Option<&str>,
350) -> everruns_core::error::Result<RuntimeExecutionCapabilities> {
351 let capability_registry = adapter.capability_registry();
352 if let Some(blueprint_id) = blueprint_id {
353 let mut registry = ToolRegistry::with_defaults();
354 let blueprint = capability_registry.blueprint(blueprint_id).ok_or_else(|| {
355 everruns_core::error::AgentLoopError::config(format!(
356 "Blueprint \"{blueprint_id}\" not found in registry"
357 ))
358 })?;
359 for tool in blueprint.tools {
360 registry.register_boxed(tool);
361 }
362 return Ok(RuntimeExecutionCapabilities {
363 tool_registry: registry,
364 post_tool_hooks: Vec::new(),
365 pre_tool_hooks: Vec::new(),
366 tool_call_hooks: Vec::new(),
367 });
368 }
369
370 let harness_chain = adapter
371 .harness_store(org_id)
372 .get_harness_chain(harness_id)
373 .await?;
374 if harness_chain.is_empty() {
375 return Err(everruns_core::error::AgentLoopError::harness_not_found(
376 harness_id,
377 ));
378 }
379
380 let session = adapter
381 .session_store(org_id)
382 .get_session(session_id)
383 .await?
384 .ok_or_else(|| everruns_core::error::AgentLoopError::session_not_found(session_id))?;
385
386 let agent_store = adapter.agent_store(org_id);
387 let agent = match agent_id {
388 Some(agent_id) => Some(
389 agent_store
390 .get_agent(agent_id)
391 .await?
392 .ok_or_else(|| everruns_core::error::AgentLoopError::agent_not_found(agent_id))?,
393 ),
394 None => None,
395 };
396
397 let resolved = resolve_runtime_capabilities(
398 &harness_chain,
399 agent.as_ref(),
400 &session,
401 &capability_registry,
402 );
403 let prompt_ctx = SystemPromptContext {
410 session_id,
411 locale: locale.or(session.locale.clone()),
412 file_store: Some(everruns_core::MountFs::wrap(
416 everruns_core::WorkspaceScopedFileSystem::wrap(
417 adapter.file_store(),
418 session.workspace_id,
419 ),
420 )),
421 model: None,
422 };
423 let collected = collect_capabilities_with_configs(
424 &resolved.resolved_capability_configs,
425 &capability_registry,
426 &prompt_ctx,
427 )
428 .await;
429
430 let mut registry = ToolRegistry::with_defaults();
431 for tool in collected.tools {
432 registry.register_boxed(tool);
433 }
434
435 let mut post_tool_hooks: Vec<Arc<dyn everruns_core::PostToolExecHook>> = resolved
440 .resolved_capability_configs
441 .iter()
442 .flat_map(|config| {
443 capability_registry
444 .get(config.capability_id())
445 .filter(|capability| capability.status() == CapabilityStatus::Available)
446 .map(|capability| capability.post_tool_exec_hooks_with_config(&config.config))
447 .unwrap_or_default()
448 })
449 .collect();
450 post_tool_hooks.sort_by_key(|hook| hook.priority());
453
454 let user_hook_specs =
461 finalize_specs_from_configs(&resolved.resolved_capability_configs, &capability_registry);
462 let mut pre_tool_hooks: Vec<Arc<dyn everruns_core::atoms::PreToolUseHook>> = resolved
465 .resolved_capability_configs
466 .iter()
467 .flat_map(|config| {
468 capability_registry
469 .get(config.capability_id())
470 .filter(|capability| capability.status() == CapabilityStatus::Available)
471 .map(|capability| capability.pre_tool_use_hooks_with_config(&config.config))
472 .unwrap_or_default()
473 })
474 .collect();
475 if !user_hook_specs.is_empty() {
476 let dispatcher: Arc<dyn everruns_core::hook_executor::BashHookDispatcher> = Arc::new(
477 everruns_core::hook_dispatch::BashkitShellHookDispatcher::new(adapter.file_store()),
478 );
479 post_tool_hooks.extend(everruns_core::hook_adapter::build_post_tool_use_hooks(
480 &user_hook_specs,
481 dispatcher.clone(),
482 ));
483 pre_tool_hooks.extend(everruns_core::hook_adapter::build_pre_tool_use_hooks(
484 &user_hook_specs,
485 dispatcher,
486 ));
487 }
488
489 let tool_call_hooks = collected.tool_call_hooks;
500
501 Ok(RuntimeExecutionCapabilities {
502 tool_registry: registry,
503 post_tool_hooks,
504 pre_tool_hooks,
505 tool_call_hooks,
506 })
507}
508
/// Drives session status changes and lifecycle event emission for one
/// session, using the stores and emitter provided by a [`RuntimeHostAdapter`].
pub struct RuntimeSessionLifecycle<A: RuntimeHostAdapter> {
    /// Host adapter used for status updates, message lookups and event emission.
    adapter: A,
    /// Internal organization id the session belongs to.
    org_id: i64,
    /// Session this lifecycle helper operates on.
    session_id: SessionId,
}
515
516impl<A: RuntimeHostAdapter> RuntimeSessionLifecycle<A> {
517 pub fn new(adapter: A, org_id: i64, session_id: SessionId) -> Self {
518 Self {
519 adapter,
520 org_id,
521 session_id,
522 }
523 }
524
525 async fn set_session_status(&self, status: SessionStatus, action: &'static str) {
526 if let Err(error) = self
527 .adapter
528 .set_session_status(self.org_id, self.session_id, status)
529 .await
530 {
531 warn!(
532 session_id = %self.session_id,
533 org_id = self.org_id,
534 action,
535 %error,
536 "runtime host lifecycle status update failed"
537 );
538 }
539 }
540
541 async fn emit_event(&self, request: EventRequest) {
542 let event_type = request.event_type.clone();
543 if let Err(error) = self.adapter.event_emitter().emit(request).await {
544 warn!(
545 session_id = %self.session_id,
546 org_id = self.org_id,
547 event_type,
548 %error,
549 "runtime host lifecycle event emission failed"
550 );
551 }
552 }
553
554 pub async fn turn_started(&self, turn_id: TurnId, input_message_id: MessageId) {
555 let input_content = self
556 .adapter
557 .message_store()
558 .get(self.session_id, input_message_id)
559 .await
560 .ok()
561 .flatten()
562 .map(|message| message.content_to_llm_string());
563
564 self.set_session_status(SessionStatus::Active, "turn_started")
565 .await;
566
567 self.emit_event(EventRequest::new(
568 self.session_id,
569 EventContext::turn(turn_id, input_message_id),
570 SessionActivatedData {
571 turn_id,
572 input_message_id,
573 },
574 ))
575 .await;
576
577 self.emit_event(EventRequest::new(
578 self.session_id,
579 EventContext::turn(turn_id, input_message_id),
580 TurnStartedData {
581 turn_id,
582 input_message_id,
583 input_content,
584 },
585 ))
586 .await;
587 }
588
589 pub async fn emit_turn_completed(&self, input_message_id: MessageId, data: TurnCompletedData) {
590 let turn_id = data.turn_id;
591 self.emit_event(EventRequest::new(
592 self.session_id,
593 EventContext::turn(turn_id, input_message_id),
594 data,
595 ))
596 .await;
597 }
598
599 pub async fn emit_session_idled(
600 &self,
601 turn_id: TurnId,
602 input_message_id: MessageId,
603 iterations: Option<u32>,
604 usage: Option<TokenUsage>,
605 ) {
606 self.set_session_status(SessionStatus::Idle, "emit_session_idled")
607 .await;
608
609 self.emit_event(EventRequest::new(
610 self.session_id,
611 EventContext::turn(turn_id, input_message_id),
612 SessionIdledData {
613 turn_id,
614 iterations,
615 usage,
616 },
617 ))
618 .await;
619 }
620
621 pub async fn turn_completed(
622 &self,
623 turn_id: TurnId,
624 input_message_id: MessageId,
625 iterations: u32,
626 usage: Option<TokenUsage>,
627 input_content: Option<String>,
628 ) {
629 self.emit_turn_completed(
630 input_message_id,
631 TurnCompletedData {
632 turn_id,
633 iterations,
634 duration_ms: None,
635 usage: usage.clone(),
636 input_content,
637 final_message_id: None,
638 final_answer_preview: None,
639 time_to_first_token_ms: None,
640 tool_call_count: None,
641 llm_call_count: None,
642 status: Some("completed".to_string()),
643 },
644 )
645 .await;
646 self.emit_session_idled(turn_id, input_message_id, Some(iterations), usage)
647 .await;
648 }
649
650 pub async fn turn_sealed(
657 &self,
658 turn_id: TurnId,
659 input_message_id: MessageId,
660 reason: &str,
661 iterations: u32,
662 usage: Option<TokenUsage>,
663 ) {
664 let context = EventContext::turn(turn_id, input_message_id);
665
666 self.emit_event(EventRequest::new(
667 self.session_id,
668 context.clone(),
669 everruns_core::events::TurnSealedData {
670 turn_id,
671 reason: reason.to_string(),
672 detail: None,
673 iterations: Some(iterations),
674 usage: usage.clone(),
675 },
676 ))
677 .await;
678
679 self.emit_session_idled(turn_id, input_message_id, Some(iterations), usage)
680 .await;
681 }
682
683 pub async fn fire_turn_end_hooks(
687 &self,
688 harness_id: HarnessId,
689 agent_id: Option<AgentId>,
690 turn_id: TurnId,
691 success: bool,
692 ) {
693 let (specs, dispatcher) = match collect_lifecycle_hook_specs(
694 &self.adapter,
695 self.org_id,
696 self.session_id,
697 harness_id,
698 agent_id,
699 )
700 .await
701 {
702 Ok(pair) => pair,
703 Err(error) => {
704 warn!(
705 session_id = %self.session_id,
706 %error,
707 "failed to collect turn_end hook specs; skipping"
708 );
709 return;
710 }
711 };
712 let hooks = everruns_core::lifecycle_hooks::build_turn_lifecycle_hooks(
713 &specs,
714 everruns_core::user_hook_types::HookEvent::TurnEnd,
715 dispatcher,
716 );
717 if hooks.is_empty() {
718 return;
719 }
720 let ctx = everruns_core::lifecycle_hooks::TurnHookContext {
721 session_id: self.session_id,
722 turn_id: Some(turn_id),
723 org_id: org_public_id_from_internal(self.org_id).parse().ok(),
724 agent_id: agent_id.map(|a| a.to_string()),
725 };
726 everruns_core::lifecycle_hooks::run_turn_end_hooks(
727 &hooks,
728 &ctx,
729 serde_json::json!({ "success": success }),
730 )
731 .await;
732 }
733
734 pub async fn user_prompt_blocked(
739 &self,
740 turn_id: TurnId,
741 input_message_id: MessageId,
742 reason: &str,
743 user_message: Option<&str>,
744 ) {
745 let user_error =
746 UserFacingError::new(everruns_core::user_facing_error_codes::BLOCKED_BY_HOOK);
747 let shown = user_message.unwrap_or(reason);
748 let mut error_message = Message::assistant(shown);
749 let mut metadata = std::collections::HashMap::new();
750 user_error.apply_to_message_metadata(&mut metadata);
751 error_message.metadata = Some(metadata);
752
753 self.emit_event(EventRequest::new(
754 self.session_id,
755 EventContext::turn(turn_id, input_message_id),
756 OutputMessageCompletedData::new(error_message).with_user_facing_error(&user_error),
757 ))
758 .await;
759
760 self.turn_failed(turn_id, input_message_id, reason, Some(&user_error))
761 .await;
762 }
763
764 pub async fn turn_failed(
765 &self,
766 turn_id: TurnId,
767 input_message_id: MessageId,
768 error: &str,
769 user_error: Option<&UserFacingError>,
770 ) {
771 self.turn_failed_with_disclosure(turn_id, input_message_id, error, user_error, None)
772 .await;
773 }
774
775 pub async fn turn_failed_with_disclosure(
779 &self,
780 turn_id: TurnId,
781 input_message_id: MessageId,
782 error: &str,
783 user_error: Option<&UserFacingError>,
784 disclosure: Option<ErrorDisclosure>,
785 ) {
786 self.set_session_status(SessionStatus::Idle, "turn_failed")
787 .await;
788
789 self.emit_event(EventRequest::new(
790 self.session_id,
791 EventContext::turn(turn_id, input_message_id),
792 {
793 let mut data = TurnFailedData {
794 turn_id,
795 error: error.to_string(),
796 error_code: None,
797 error_fields: None,
798 error_disclosure: disclosure.map(|mode| mode.as_str().to_string()),
799 };
800 if let Some(user_error) = user_error {
801 user_error.apply_to_event_fields(&mut data.error_code, &mut data.error_fields);
802 }
803 data
804 },
805 ))
806 .await;
807
808 self.emit_event(EventRequest::new(
809 self.session_id,
810 EventContext::turn(turn_id, input_message_id),
811 SessionIdledData {
812 turn_id,
813 iterations: None,
814 usage: None,
815 },
816 ))
817 .await;
818 }
819
820 pub async fn waiting_for_tool_results(&self) {
821 self.set_session_status(
822 SessionStatus::WaitingForToolResults,
823 "waiting_for_tool_results",
824 )
825 .await;
826 }
827
828 pub async fn dependency_blocked(
829 &self,
830 turn_id: TurnId,
831 input_message_id: MessageId,
832 blocker: DependencyBlocker,
833 ) {
834 let user_error = UserFacingError::new(blocker.error_code())
835 .with_field(
836 "dependency",
837 match blocker {
838 DependencyBlocker::HarnessArchived | DependencyBlocker::HarnessDeleted => {
839 "harness"
840 }
841 DependencyBlocker::AgentArchived | DependencyBlocker::AgentDeleted => "agent",
842 },
843 )
844 .with_field(
845 "state",
846 match blocker {
847 DependencyBlocker::HarnessArchived | DependencyBlocker::AgentArchived => {
848 "archived"
849 }
850 DependencyBlocker::HarnessDeleted | DependencyBlocker::AgentDeleted => {
851 "deleted"
852 }
853 },
854 );
855 let mut error_message = Message::assistant(blocker.message());
856 let mut metadata = std::collections::HashMap::new();
857 user_error.apply_to_message_metadata(&mut metadata);
858 error_message.metadata = Some(metadata);
859
860 self.emit_event(EventRequest::new(
861 self.session_id,
862 EventContext::turn(turn_id, input_message_id),
863 OutputMessageCompletedData::new(error_message).with_user_facing_error(&user_error),
864 ))
865 .await;
866
867 self.turn_failed(
868 turn_id,
869 input_message_id,
870 blocker.message(),
871 Some(&user_error),
872 )
873 .await;
874 }
875}
876
877pub async fn detect_dependency_blocker<A: RuntimeHostAdapter>(
878 adapter: &A,
879 org_id: i64,
880 harness_id: HarnessId,
881 agent_id: Option<AgentId>,
882) -> everruns_core::error::Result<Option<DependencyBlocker>> {
883 let harness_store = adapter.harness_store(org_id);
884 let agent_store = adapter.agent_store(org_id);
885 everruns_core::detect_dependency_blocker(
886 harness_store.as_ref(),
887 agent_store.as_ref(),
888 harness_id,
889 agent_id,
890 )
891 .await
892}
893
894pub async fn execute_input_activity<A: RuntimeHostAdapter>(
895 adapter: &A,
896 org_id: i64,
897 input: InputAtomInput,
898) -> everruns_core::error::Result<InputAtomResult> {
899 RuntimeSessionLifecycle::new(adapter.clone(), org_id, input.context.session_id)
900 .turn_started(input.context.turn_id, input.context.input_message_id)
901 .await;
902
903 let atom = InputAtom::new(adapter.message_store());
904 atom.execute(input).await
905}
906
/// Outcome of running the user-prompt-submit hooks for a turn.
struct UserPromptHookResult {
    /// Decision returned by `run_user_prompt_submit_hooks`.
    decision: everruns_core::lifecycle_hooks::UserPromptDecision,
    /// Message text that was handed to the hooks; `execute_reason_activity`
    /// compares it with a `Continue` message to detect a rewrite.
    original_message: String,
}
917
918async fn run_user_prompt_submit_for_turn<A: RuntimeHostAdapter>(
919 adapter: &A,
920 org_id: i64,
921 input: &ReasonInput,
922) -> everruns_core::error::Result<Option<UserPromptHookResult>> {
923 let (specs, dispatcher) = match collect_lifecycle_hook_specs(
924 adapter,
925 org_id,
926 input.context.session_id,
927 input.harness_id,
928 input.agent_id,
929 )
930 .await
931 {
932 Ok(pair) => pair,
933 Err(error) => {
934 warn!(
935 session_id = %input.context.session_id,
936 %error,
937 "failed to collect user_prompt_submit hook specs; continuing without them"
938 );
939 return Ok(None);
940 }
941 };
942 let hooks = everruns_core::lifecycle_hooks::build_turn_lifecycle_hooks(
943 &specs,
944 everruns_core::user_hook_types::HookEvent::UserPromptSubmit,
945 dispatcher,
946 );
947 if hooks.is_empty() {
948 return Ok(None);
949 }
950
951 let message_text = adapter
952 .message_store()
953 .get(input.context.session_id, input.context.input_message_id)
954 .await
955 .ok()
956 .flatten()
957 .map(|m| m.content_to_llm_string())
958 .unwrap_or_default();
959
960 let ctx = everruns_core::lifecycle_hooks::TurnHookContext {
961 session_id: input.context.session_id,
962 turn_id: Some(input.context.turn_id),
963 org_id: org_public_id_from_internal(org_id).parse().ok(),
964 agent_id: input.agent_id.map(|a| a.to_string()),
965 };
966 let original_message = message_text.clone();
967 let decision =
968 everruns_core::lifecycle_hooks::run_user_prompt_submit_hooks(&hooks, &ctx, message_text)
969 .await;
970 Ok(Some(UserPromptHookResult {
971 decision,
972 original_message,
973 }))
974}
975
976pub async fn execute_reason_activity<A: RuntimeHostAdapter>(
977 adapter: &A,
978 org_id: i64,
979 input: ReasonInput,
980) -> everruns_core::error::Result<ReasonResult> {
981 if let Some(blocker) =
982 detect_dependency_blocker(adapter, org_id, input.harness_id, input.agent_id).await?
983 {
984 RuntimeSessionLifecycle::new(adapter.clone(), org_id, input.context.session_id)
985 .dependency_blocked(
986 input.context.turn_id,
987 input.context.input_message_id,
988 blocker,
989 )
990 .await;
991 return Ok(ReasonResult {
992 success: false,
993 text: blocker.message().to_string(),
994 tool_calls: vec![],
995 has_tool_calls: false,
996 tool_definitions: vec![],
997 max_iterations: everruns_core::runtime_agent::default_max_iterations(),
998 error: Some("dependency_unavailable".to_string()),
999 user_facing_error: None,
1000 error_disclosure: None,
1001 usage: None,
1002 output_message_id: None,
1003 time_to_first_token_ms: None,
1004 response_id: None,
1005 locale: None,
1006 network_access: None,
1007 parallel_tool_calls: None,
1008 });
1009 }
1010
1011 let mut user_prompt_message_override = None;
1019 if input.iteration <= 1
1020 && let Some(hook_result) = run_user_prompt_submit_for_turn(adapter, org_id, &input).await?
1021 {
1022 match hook_result.decision {
1023 everruns_core::lifecycle_hooks::UserPromptDecision::Block {
1024 reason,
1025 user_message,
1026 } => {
1027 RuntimeSessionLifecycle::new(adapter.clone(), org_id, input.context.session_id)
1028 .user_prompt_blocked(
1029 input.context.turn_id,
1030 input.context.input_message_id,
1031 &reason,
1032 user_message.as_deref(),
1033 )
1034 .await;
1035 return Ok(ReasonResult {
1036 success: false,
1037 text: user_message.unwrap_or_else(|| reason.clone()),
1038 tool_calls: vec![],
1039 has_tool_calls: false,
1040 tool_definitions: vec![],
1041 max_iterations: everruns_core::runtime_agent::default_max_iterations(),
1042 error: Some("blocked_by_user_prompt_hook".to_string()),
1043 user_facing_error: None,
1044 error_disclosure: None,
1045 usage: None,
1046 output_message_id: None,
1047 time_to_first_token_ms: None,
1048 response_id: None,
1049 locale: None,
1050 network_access: None,
1051 parallel_tool_calls: None,
1052 });
1053 }
1054 everruns_core::lifecycle_hooks::UserPromptDecision::Continue { message } => {
1055 if message != hook_result.original_message {
1056 user_prompt_message_override = Some(message);
1057 }
1058 }
1059 }
1060 }
1061
1062 let turn_context = adapter
1063 .load_turn_context(org_id, input.context.session_id)
1064 .await?;
1065
1066 let mut atom = ReasonAtom::new(
1067 adapter.harness_store(org_id),
1068 adapter.agent_store(org_id),
1069 adapter.session_store(org_id),
1070 adapter.message_store(),
1071 adapter.provider_store(org_id),
1072 adapter.capability_registry(),
1073 adapter.driver_registry(),
1074 adapter.event_emitter(),
1075 )
1076 .with_file_store(adapter.file_store());
1077 if let Some(image_resolver) = adapter.image_resolver(org_id) {
1078 atom = atom.with_image_resolver(image_resolver);
1079 }
1080 if let Some(hb) = adapter.stream_heartbeater() {
1081 atom = atom.with_stream_heartbeater(hb);
1082 }
1083 if let Some(timeout) = adapter.provider_stall_timeout() {
1084 atom = atom.with_provider_stall_timeout(timeout);
1085 }
1086 if let Some(store) = adapter.partial_stream_store() {
1087 atom = atom.with_partial_stream_store(store);
1088 }
1089 if let Some(store) = adapter.durable_tool_result_store() {
1090 atom = atom.with_durable_tool_result_store(store);
1091 }
1092 if let Some(handle) = adapter.reasoning_effort_handle(input.context.session_id) {
1093 atom = atom.with_reasoning_effort_handle(handle);
1094 }
1095 if let Some(utility_llm_service) = adapter.utility_llm_service() {
1096 atom = atom.with_utility_llm_service(utility_llm_service);
1097 }
1098
1099 let input = ReasonInput {
1100 mcp_tool_definitions: turn_context.mcp_tool_definitions,
1101 ..input
1102 };
1103
1104 if let Some(message_override) = user_prompt_message_override {
1105 let mut assembled = assemble_turn_context(
1106 adapter.harness_store(org_id).as_ref(),
1107 adapter.agent_store(org_id).as_ref(),
1108 adapter.session_store(org_id).as_ref(),
1109 adapter.message_store().as_ref(),
1110 adapter.provider_store(org_id).as_ref(),
1111 &adapter.capability_registry(),
1112 input.context.session_id,
1113 input.harness_id,
1114 input.agent_id,
1115 &input.mcp_tool_definitions,
1116 Some(adapter.file_store()),
1117 )
1118 .await?;
1119
1120 let message = assembled
1121 .messages
1122 .iter_mut()
1123 .find(|message| message.id == input.context.input_message_id)
1124 .ok_or_else(|| {
1125 everruns_core::error::AgentLoopError::config(
1126 "user_prompt_submit mutation: input message not found in assembled context",
1127 )
1128 })?;
1129
1130 message
1135 .content
1136 .retain(|part| !matches!(part, ContentPart::Text(_)));
1137 message
1138 .content
1139 .insert(0, ContentPart::text(message_override));
1140
1141 return atom.execute_with_assembled_context(input, assembled).await;
1142 }
1143
1144 atom.execute(input).await
1145}
1146
1147pub async fn execute_act_activity<A: RuntimeHostAdapter>(
1148 adapter: &A,
1149 input: ActInput,
1150) -> everruns_core::error::Result<ActResult> {
1151 let org_id = input.org_id.ok_or_else(|| {
1152 everruns_core::error::AgentLoopError::config(
1153 "ActInput.org_id must be set for runtime host execution",
1154 )
1155 })?;
1156
1157 if let Some(blocker) =
1158 detect_dependency_blocker(adapter, org_id, input.harness_id, input.agent_id).await?
1159 {
1160 RuntimeSessionLifecycle::new(adapter.clone(), org_id, input.context.session_id)
1161 .dependency_blocked(
1162 input.context.turn_id,
1163 input.context.input_message_id,
1164 blocker,
1165 )
1166 .await;
1167 return Ok(ActResult {
1168 results: vec![],
1169 completed: true,
1170 success_count: 0,
1171 error_count: 1,
1172 waiting_for_tool_results: false,
1173 blocked: true,
1174 client_tool_calls: vec![],
1175 client_tool_definitions: vec![],
1176 });
1177 }
1178
1179 let execution_capabilities = load_execution_capabilities(
1180 adapter,
1181 org_id,
1182 input.context.session_id,
1183 input.harness_id,
1184 input.agent_id,
1185 input.locale.clone(),
1186 input.blueprint_id.as_deref(),
1187 )
1188 .await?;
1189 let mut tool_registry = execution_capabilities.tool_registry;
1190
1191 let mut mcp_invoker: Option<Arc<dyn everruns_core::McpToolInvoker>> = None;
1201 if let Some(mcp) = adapter.mcp_executor(org_id, input.context.session_id).await {
1202 let invoker: Arc<dyn everruns_core::McpToolInvoker> = mcp;
1203 for tool in everruns_core::build_mcp_proxy_tools(&input.tool_definitions, invoker.clone()) {
1204 tool_registry.register_boxed(tool);
1205 }
1206 mcp_invoker = Some(Arc::new(everruns_core::ScopedMcpToolInvoker::new(
1207 &input.tool_definitions,
1208 invoker,
1209 )));
1210 }
1211
1212 let builtin_tool_registry = Arc::new(tool_registry.clone());
1213 let executor: Arc<dyn everruns_core::traits::ToolExecutor> = Arc::new(tool_registry);
1214
1215 let mut atom =
1216 ActAtom::with_file_store(executor, adapter.event_emitter(), adapter.file_store())
1217 .with_session_store(adapter.session_store(org_id))
1218 .with_session_mutator(adapter.session_mutator(org_id))
1219 .with_agent_store(adapter.agent_store(org_id))
1220 .with_tool_registry(builtin_tool_registry)
1221 .with_org_id(
1222 org_public_id_from_internal(org_id)
1223 .parse()
1224 .expect("internal org id converts to valid public org id"),
1225 )
1226 .with_capability_registry(adapter.capability_registry())
1227 .with_post_tool_hooks(execution_capabilities.post_tool_hooks)
1228 .with_pre_tool_hooks(execution_capabilities.pre_tool_hooks)
1229 .with_tool_call_hooks(execution_capabilities.tool_call_hooks);
1230
1231 if let Some(storage_store) = adapter.storage_store() {
1232 atom = atom.with_storage_store(storage_store);
1233 }
1234 if let Some(knowledge_store) = adapter.knowledge_store() {
1235 atom = atom.with_knowledge_store(knowledge_store);
1236 }
1237 if let Some(image_store) = adapter.image_artifact_store(org_id) {
1238 atom = atom.with_image_store(image_store);
1239 }
1240 if let Some(provider_credential_store) = adapter.provider_credential_store(org_id) {
1241 atom = atom.with_provider_credential_store(provider_credential_store);
1242 }
1243 if let Some(utility_llm_service) = adapter.utility_llm_service() {
1244 atom = atom.with_utility_llm_service(utility_llm_service);
1245 }
1246 if let Some(invoker) = mcp_invoker {
1247 atom = atom.with_mcp_invoker(invoker);
1248 }
1249 if let Some(egress_service) = adapter.egress_service() {
1250 atom = atom.with_egress_service(egress_service);
1251 }
1252 if let Some(connection_resolver) = adapter.connection_resolver() {
1253 atom = atom.with_connection_resolver(connection_resolver);
1254 }
1255 if let Some(sqldb_store) = adapter.sqldb_store() {
1256 atom = atom.with_sqldb_store(sqldb_store);
1257 }
1258 if let Some(leased_resource_store) = adapter.leased_resource_store() {
1259 atom = atom.with_leased_resource_store(leased_resource_store);
1260 }
1261 if let Some(registry) = adapter.session_resource_registry() {
1262 atom = atom.with_session_resource_registry(registry);
1263 }
1264 if let Some(registry) = adapter.session_task_registry() {
1265 atom = atom.with_session_task_registry(registry);
1266 }
1267 if let Some(schedule_store) = adapter.schedule_store(org_id) {
1268 atom = atom.with_schedule_store(schedule_store);
1269 }
1270 if let Some(platform_store) = adapter.platform_store(org_id, input.context.session_id) {
1271 atom = atom.with_platform_store(platform_store);
1272 }
1273 if let Some(knowledge_index_search) = adapter.knowledge_index_search(org_id) {
1274 atom = atom.with_knowledge_index_search(knowledge_index_search);
1275 }
1276 if let Some(budget_checker) = adapter.budget_checker(org_id, input.agent_id) {
1277 atom = atom.with_budget_checker(budget_checker);
1278 }
1279 if let Some(payment_authority) = adapter.payment_authority(org_id, input.agent_id) {
1280 atom = atom.with_payment_authority(payment_authority);
1281 }
1282 if let Some(limiter) = adapter.outbound_tool_rate_limiter(org_id) {
1283 atom = atom.with_outbound_tool_rate_limiter(limiter);
1284 }
1285 if let Some(store) = adapter.durable_tool_result_store() {
1286 atom = atom.with_durable_tool_result_store(store);
1287 }
1288 if let Some(store) = adapter.subagent_spawn_store() {
1289 atom = atom.with_subagent_spawn_store(store);
1290 }
1291 if let Some(handle) = adapter.reasoning_effort_handle(input.context.session_id) {
1292 atom = atom.with_reasoning_effort_handle(handle);
1293 }
1294
1295 atom.execute(input).await
1296}