1use async_trait::async_trait;
6use everruns_core::atoms::{
7 ActAtom, ActInput, ActResult, Atom, InputAtom, InputAtomInput, InputAtomResult, ReasonAtom,
8 ReasonInput, ReasonResult,
9};
10use everruns_core::capabilities::{SystemPromptContext, collect_capabilities_with_configs};
11use everruns_core::events::{
12 EventContext, EventRequest, OutputMessageCompletedData, SessionActivatedData, SessionIdledData,
13 TurnCompletedData, TurnFailedData, TurnStartedData,
14};
15use everruns_core::message::Message;
16use everruns_core::message_retriever::MessageRetriever;
17use everruns_core::platform_store::PlatformStore;
18use everruns_core::session::SessionStatus;
19use everruns_core::traits::{
20 AgentStore, BudgetChecker, EventEmitter, HarnessStore, ImageArtifactStore, ImageResolver,
21 LeasedResourceStore, LlmProviderStore, ModelWithProvider, PaymentAuthority,
22 ProviderCredentialStore, SessionFileSystem, SessionMutator, SessionResourceRegistry,
23 SessionScheduleStore, SessionSqlDbStoreRef, SessionStorageStore, SessionStore,
24 UserConnectionResolver,
25};
26use everruns_core::typed_id::{AgentId, HarnessId, MessageId, SessionId, TurnId};
27use everruns_core::{
28 Agent, CapabilityRegistry, CapabilityStatus, DependencyBlocker, DriverRegistry, EgressService,
29 Harness, Session, TokenUsage, ToolDefinition, ToolRegistry, UserFacingError, UtilityLlmService,
30 org_public_id_from_internal, resolve_runtime_capabilities,
31};
32use std::sync::Arc;
33use tracing::warn;
34
/// Per-turn data bundle returned by [`RuntimeHostAdapter::load_turn_context`].
///
/// Within this file only `mcp_tool_definitions` is consumed (it is forwarded
/// into the `ReasonInput` by `execute_reason_activity`); the remaining fields
/// are available to other callers.
#[derive(Debug, Clone)]
pub struct RuntimeHostTurnContext {
    /// Agent for the turn, if one was loaded.
    pub agent: Option<Agent>,
    /// Session the turn belongs to.
    pub session: Session,
    /// Messages loaded for the turn.
    pub messages: Vec<Message>,
    /// Model (with its provider) for the turn, if one was loaded.
    pub model: Option<ModelWithProvider>,
    /// MCP tool definitions to expose to the reason step.
    pub mcp_tool_definitions: Vec<ToolDefinition>,
}
44
/// Host-side adapter that supplies the stores and services the runtime
/// activities in this file are wired with.
///
/// The lookup/mutation methods and the core store accessors are required.
/// Every `Option`-returning accessor has a default implementation returning
/// `None`, so a host only overrides the integrations it actually provides.
#[async_trait]
pub trait RuntimeHostAdapter: Send + Sync + Clone + 'static {
    /// Looks up an agent by id for the given organization.
    async fn get_agent(
        &self,
        org_id: i64,
        agent_id: AgentId,
    ) -> everruns_core::error::Result<Option<Agent>>;

    /// Looks up a harness by id for the given organization.
    async fn get_harness(
        &self,
        org_id: i64,
        harness_id: HarnessId,
    ) -> everruns_core::error::Result<Option<Harness>>;

    /// Sets the status of a session and returns the resulting session.
    async fn set_session_status(
        &self,
        org_id: i64,
        session_id: SessionId,
        status: SessionStatus,
    ) -> everruns_core::error::Result<Session>;

    /// Loads the [`RuntimeHostTurnContext`] for a session.
    async fn load_turn_context(
        &self,
        org_id: i64,
        session_id: SessionId,
    ) -> everruns_core::error::Result<RuntimeHostTurnContext>;

    /// Returns the capability registry used to resolve capabilities and hooks.
    fn capability_registry(&self) -> CapabilityRegistry;

    /// Returns the driver registry handed to the reason atom.
    fn driver_registry(&self) -> DriverRegistry;

    /// Returns a harness store handle for `org_id`.
    fn harness_store(&self, org_id: i64) -> Arc<dyn HarnessStore>;

    /// Returns an agent store handle for `org_id`.
    fn agent_store(&self, org_id: i64) -> Arc<dyn AgentStore>;

    /// Returns a session store handle for `org_id`.
    fn session_store(&self, org_id: i64) -> Arc<dyn SessionStore>;

    /// Returns a session mutator handle for `org_id`.
    fn session_mutator(&self, org_id: i64) -> Arc<dyn SessionMutator>;

    /// Returns an LLM provider store handle for `org_id`.
    fn provider_store(&self, org_id: i64) -> Arc<dyn LlmProviderStore>;

    /// Returns the message retriever used to read session messages.
    fn message_store(&self) -> Arc<dyn MessageRetriever>;

    /// Returns the emitter used for all lifecycle and atom events.
    fn event_emitter(&self) -> Arc<dyn EventEmitter>;

    /// Returns the session file system handle.
    fn file_store(&self) -> Arc<dyn SessionFileSystem>;

    /// Optional image resolver; attached to the reason atom when present.
    /// Defaults to `None`.
    fn image_resolver(&self, _org_id: i64) -> Option<Arc<dyn ImageResolver>> {
        None
    }

    /// Optional image artifact store; attached to the act atom when present.
    /// Defaults to `None`.
    fn image_artifact_store(&self, _org_id: i64) -> Option<Arc<dyn ImageArtifactStore>> {
        None
    }

    /// Optional provider credential store. Defaults to `None`.
    fn provider_credential_store(&self, _org_id: i64) -> Option<Arc<dyn ProviderCredentialStore>> {
        None
    }

    /// Optional utility LLM service. Defaults to `None`.
    fn utility_llm_service(&self) -> Option<Arc<dyn UtilityLlmService>> {
        None
    }

    /// Optional egress service. Defaults to `None`.
    fn egress_service(&self) -> Option<Arc<dyn EgressService>> {
        None
    }

    /// Optional session storage store. Defaults to `None`.
    fn storage_store(&self) -> Option<Arc<dyn SessionStorageStore>> {
        None
    }

    /// Optional memory store backend. Defaults to `None`.
    fn memory_store(&self, _org_id: i64) -> Option<Arc<dyn everruns_core::MemoryStoreBackend>> {
        None
    }

    /// Optional user connection resolver. Defaults to `None`.
    fn connection_resolver(&self) -> Option<Arc<dyn UserConnectionResolver>> {
        None
    }

    /// Optional session SQL database store. Defaults to `None`.
    fn sqldb_store(&self) -> Option<SessionSqlDbStoreRef> {
        None
    }

    /// Optional leased resource store. Defaults to `None`.
    fn leased_resource_store(&self) -> Option<Arc<dyn LeasedResourceStore>> {
        None
    }

    /// Optional session resource registry. Defaults to `None`.
    fn session_resource_registry(&self) -> Option<Arc<dyn SessionResourceRegistry>> {
        None
    }

    /// Optional session schedule store. Defaults to `None`.
    fn schedule_store(&self, _org_id: i64) -> Option<Arc<dyn SessionScheduleStore>> {
        None
    }

    /// Optional platform store for a specific session. Defaults to `None`.
    fn platform_store(
        &self,
        _org_id: i64,
        _session_id: SessionId,
    ) -> Option<Arc<dyn PlatformStore>> {
        None
    }

    /// Optional budget checker for an organization and (optionally) an agent.
    /// Defaults to `None`.
    fn budget_checker(
        &self,
        _org_id: i64,
        _agent_id: Option<AgentId>,
    ) -> Option<Arc<dyn BudgetChecker>> {
        None
    }

    /// Optional payment authority for an organization and (optionally) an
    /// agent. Defaults to `None`.
    fn payment_authority(
        &self,
        _org_id: i64,
        _agent_id: Option<AgentId>,
    ) -> Option<Arc<dyn PaymentAuthority>> {
        None
    }

    /// Optional rate limiter for outbound tool calls. Defaults to `None`.
    fn outbound_tool_rate_limiter(
        &self,
        _org_id: i64,
    ) -> Option<Arc<dyn everruns_core::OutboundToolRateLimiter>> {
        None
    }

    /// Optional MCP executor for a session. When present,
    /// `execute_act_activity` registers MCP proxy tools that invoke through
    /// it. Defaults to `None`.
    async fn mcp_executor(
        &self,
        _org_id: i64,
        _session_id: SessionId,
    ) -> Option<Arc<everruns_mcp::McpExecutor>> {
        None
    }
}
194
/// Tool registry and tool hooks resolved for a single act step.
///
/// Built by `load_execution_capabilities` and consumed by
/// `execute_act_activity`.
struct RuntimeExecutionCapabilities {
    /// Registry of tools available to the act step.
    tool_registry: ToolRegistry,
    /// Hooks passed to `ActAtom::with_post_tool_hooks`.
    post_tool_hooks: Vec<Arc<dyn everruns_core::PostToolExecHook>>,
    /// Hooks passed to `ActAtom::with_pre_tool_hooks`.
    pre_tool_hooks: Vec<Arc<dyn everruns_core::atoms::PreToolUseHook>>,
    /// Hooks passed to `ActAtom::with_tool_call_hooks`.
    tool_call_hooks: Vec<Arc<dyn everruns_core::ToolCallHook>>,
}
201
202fn finalize_specs_from_configs(
212 resolved_capability_configs: &[everruns_core::capability_types::AgentCapabilityConfig],
213 capability_registry: &CapabilityRegistry,
214) -> Vec<everruns_core::user_hook_types::UserHookSpec> {
215 let mut hook_contributions: Vec<(String, Vec<everruns_core::user_hook_types::UserHookSpec>)> =
216 Vec::new();
217 let mut disabled_contributions: Vec<String> = Vec::new();
218 for config in resolved_capability_configs {
219 let Some(capability) = capability_registry.get(config.capability_id()) else {
220 continue;
221 };
222 let specs = capability.user_hooks_with_config(&config.config);
223 if !specs.is_empty() {
224 hook_contributions.push((config.capability_id().to_string(), specs));
225 }
226 if config.capability_id() == "user_hooks" {
227 disabled_contributions.extend(
228 everruns_core::capabilities::user_hooks::disabled_contributions(&config.config),
229 );
230 }
231 }
232 everruns_core::hook_adapter::finalize_hook_specs(hook_contributions, &disabled_contributions)
233}
234
235async fn collect_lifecycle_hook_specs<A: RuntimeHostAdapter>(
240 adapter: &A,
241 org_id: i64,
242 session_id: SessionId,
243 harness_id: HarnessId,
244 agent_id: Option<AgentId>,
245) -> everruns_core::error::Result<(
246 Vec<everruns_core::user_hook_types::UserHookSpec>,
247 Arc<dyn everruns_core::hook_executor::BashHookDispatcher>,
248)> {
249 let capability_registry = adapter.capability_registry();
250 let harness_chain = adapter
251 .harness_store(org_id)
252 .get_harness_chain(harness_id)
253 .await?;
254 if harness_chain.is_empty() {
255 return Err(everruns_core::error::AgentLoopError::harness_not_found(
256 harness_id,
257 ));
258 }
259 let session = adapter
260 .session_store(org_id)
261 .get_session(session_id)
262 .await?
263 .ok_or_else(|| everruns_core::error::AgentLoopError::session_not_found(session_id))?;
264 let agent = match agent_id {
265 Some(agent_id) => adapter.agent_store(org_id).get_agent(agent_id).await?,
266 None => None,
267 };
268 let resolved = resolve_runtime_capabilities(
269 &harness_chain,
270 agent.as_ref(),
271 &session,
272 &capability_registry,
273 );
274 let specs =
275 finalize_specs_from_configs(&resolved.resolved_capability_configs, &capability_registry);
276 let dispatcher: Arc<dyn everruns_core::hook_executor::BashHookDispatcher> = Arc::new(
277 everruns_core::hook_dispatch::VirtualBashHookDispatcher::new(adapter.file_store()),
278 );
279 Ok((specs, dispatcher))
280}
281
282async fn load_execution_capabilities<A: RuntimeHostAdapter>(
283 adapter: &A,
284 org_id: i64,
285 session_id: SessionId,
286 harness_id: HarnessId,
287 agent_id: Option<AgentId>,
288 locale: Option<String>,
289 blueprint_id: Option<&str>,
290) -> everruns_core::error::Result<RuntimeExecutionCapabilities> {
291 let capability_registry = adapter.capability_registry();
292 if let Some(blueprint_id) = blueprint_id {
293 let mut registry = ToolRegistry::with_defaults();
294 let blueprint = capability_registry.blueprint(blueprint_id).ok_or_else(|| {
295 everruns_core::error::AgentLoopError::config(format!(
296 "Blueprint \"{blueprint_id}\" not found in registry"
297 ))
298 })?;
299 for tool in blueprint.tools {
300 registry.register_boxed(tool);
301 }
302 return Ok(RuntimeExecutionCapabilities {
303 tool_registry: registry,
304 post_tool_hooks: Vec::new(),
305 pre_tool_hooks: Vec::new(),
306 tool_call_hooks: Vec::new(),
307 });
308 }
309
310 let harness_chain = adapter
311 .harness_store(org_id)
312 .get_harness_chain(harness_id)
313 .await?;
314 if harness_chain.is_empty() {
315 return Err(everruns_core::error::AgentLoopError::harness_not_found(
316 harness_id,
317 ));
318 }
319
320 let session = adapter
321 .session_store(org_id)
322 .get_session(session_id)
323 .await?
324 .ok_or_else(|| everruns_core::error::AgentLoopError::session_not_found(session_id))?;
325
326 let agent_store = adapter.agent_store(org_id);
327 let agent = match agent_id {
328 Some(agent_id) => Some(
329 agent_store
330 .get_agent(agent_id)
331 .await?
332 .ok_or_else(|| everruns_core::error::AgentLoopError::agent_not_found(agent_id))?,
333 ),
334 None => None,
335 };
336
337 let resolved = resolve_runtime_capabilities(
338 &harness_chain,
339 agent.as_ref(),
340 &session,
341 &capability_registry,
342 );
343 let prompt_ctx = SystemPromptContext {
350 session_id,
351 locale: locale.or(session.locale.clone()),
352 file_store: Some(adapter.file_store()),
353 model: None,
354 };
355 let collected = collect_capabilities_with_configs(
356 &resolved.resolved_capability_configs,
357 &capability_registry,
358 &prompt_ctx,
359 )
360 .await;
361
362 let mut registry = ToolRegistry::with_defaults();
363 for tool in collected.tools {
364 registry.register_boxed(tool);
365 }
366
367 let mut post_tool_hooks: Vec<Arc<dyn everruns_core::PostToolExecHook>> = resolved
372 .resolved_capability_configs
373 .iter()
374 .flat_map(|config| {
375 capability_registry
376 .get(config.capability_id())
377 .filter(|capability| capability.status() == CapabilityStatus::Available)
378 .map(|capability| capability.post_tool_exec_hooks())
379 .unwrap_or_default()
380 })
381 .collect();
382
383 let user_hook_specs =
390 finalize_specs_from_configs(&resolved.resolved_capability_configs, &capability_registry);
391 let mut pre_tool_hooks: Vec<Arc<dyn everruns_core::atoms::PreToolUseHook>> = resolved
394 .resolved_capability_configs
395 .iter()
396 .flat_map(|config| {
397 capability_registry
398 .get(config.capability_id())
399 .filter(|capability| capability.status() == CapabilityStatus::Available)
400 .map(|capability| capability.pre_tool_use_hooks())
401 .unwrap_or_default()
402 })
403 .collect();
404 if !user_hook_specs.is_empty() {
405 let dispatcher: Arc<dyn everruns_core::hook_executor::BashHookDispatcher> = Arc::new(
406 everruns_core::hook_dispatch::VirtualBashHookDispatcher::new(adapter.file_store()),
407 );
408 post_tool_hooks.extend(everruns_core::hook_adapter::build_post_tool_use_hooks(
409 &user_hook_specs,
410 dispatcher.clone(),
411 ));
412 pre_tool_hooks.extend(everruns_core::hook_adapter::build_pre_tool_use_hooks(
413 &user_hook_specs,
414 dispatcher,
415 ));
416 }
417
418 let tool_call_hooks = resolved
419 .resolved_capability_configs
420 .iter()
421 .flat_map(|config| {
422 capability_registry
423 .get(config.capability_id())
424 .filter(|capability| capability.status() == CapabilityStatus::Available)
425 .map(|capability| capability.tool_call_hooks())
426 .unwrap_or_default()
427 })
428 .collect();
429
430 Ok(RuntimeExecutionCapabilities {
431 tool_registry: registry,
432 post_tool_hooks,
433 pre_tool_hooks,
434 tool_call_hooks,
435 })
436}
437
/// Drives session status updates and lifecycle event emission for one session
/// of one organization, using the host adapter for all side effects.
pub struct RuntimeSessionLifecycle<A: RuntimeHostAdapter> {
    /// Adapter used for status updates, message reads and event emission.
    adapter: A,
    /// Internal organization id passed to adapter calls.
    org_id: i64,
    /// Session whose lifecycle is being driven.
    session_id: SessionId,
}
444
445impl<A: RuntimeHostAdapter> RuntimeSessionLifecycle<A> {
446 pub fn new(adapter: A, org_id: i64, session_id: SessionId) -> Self {
447 Self {
448 adapter,
449 org_id,
450 session_id,
451 }
452 }
453
454 async fn set_session_status(&self, status: SessionStatus, action: &'static str) {
455 if let Err(error) = self
456 .adapter
457 .set_session_status(self.org_id, self.session_id, status)
458 .await
459 {
460 warn!(
461 session_id = %self.session_id,
462 org_id = self.org_id,
463 action,
464 %error,
465 "runtime host lifecycle status update failed"
466 );
467 }
468 }
469
470 async fn emit_event(&self, request: EventRequest) {
471 let event_type = request.event_type.clone();
472 if let Err(error) = self.adapter.event_emitter().emit(request).await {
473 warn!(
474 session_id = %self.session_id,
475 org_id = self.org_id,
476 event_type,
477 %error,
478 "runtime host lifecycle event emission failed"
479 );
480 }
481 }
482
483 pub async fn turn_started(&self, turn_id: TurnId, input_message_id: MessageId) {
484 let input_content = self
485 .adapter
486 .message_store()
487 .get(self.session_id, input_message_id)
488 .await
489 .ok()
490 .flatten()
491 .map(|message| message.content_to_llm_string());
492
493 self.set_session_status(SessionStatus::Active, "turn_started")
494 .await;
495
496 self.emit_event(EventRequest::new(
497 self.session_id,
498 EventContext::turn(turn_id, input_message_id),
499 SessionActivatedData {
500 turn_id,
501 input_message_id,
502 },
503 ))
504 .await;
505
506 self.emit_event(EventRequest::new(
507 self.session_id,
508 EventContext::turn(turn_id, input_message_id),
509 TurnStartedData {
510 turn_id,
511 input_message_id,
512 input_content,
513 },
514 ))
515 .await;
516 }
517
518 pub async fn emit_turn_completed(&self, input_message_id: MessageId, data: TurnCompletedData) {
519 let turn_id = data.turn_id;
520 self.emit_event(EventRequest::new(
521 self.session_id,
522 EventContext::turn(turn_id, input_message_id),
523 data,
524 ))
525 .await;
526 }
527
528 pub async fn emit_session_idled(
529 &self,
530 turn_id: TurnId,
531 input_message_id: MessageId,
532 iterations: Option<u32>,
533 usage: Option<TokenUsage>,
534 ) {
535 self.set_session_status(SessionStatus::Idle, "emit_session_idled")
536 .await;
537
538 self.emit_event(EventRequest::new(
539 self.session_id,
540 EventContext::turn(turn_id, input_message_id),
541 SessionIdledData {
542 turn_id,
543 iterations,
544 usage,
545 },
546 ))
547 .await;
548 }
549
550 pub async fn turn_completed(
551 &self,
552 turn_id: TurnId,
553 input_message_id: MessageId,
554 iterations: u32,
555 usage: Option<TokenUsage>,
556 input_content: Option<String>,
557 ) {
558 self.emit_turn_completed(
559 input_message_id,
560 TurnCompletedData {
561 turn_id,
562 iterations,
563 duration_ms: None,
564 usage: usage.clone(),
565 input_content,
566 final_message_id: None,
567 final_answer_preview: None,
568 time_to_first_token_ms: None,
569 tool_call_count: None,
570 llm_call_count: None,
571 status: Some("completed".to_string()),
572 },
573 )
574 .await;
575 self.emit_session_idled(turn_id, input_message_id, Some(iterations), usage)
576 .await;
577 }
578
579 pub async fn fire_turn_end_hooks(
583 &self,
584 harness_id: HarnessId,
585 agent_id: Option<AgentId>,
586 turn_id: TurnId,
587 success: bool,
588 ) {
589 let (specs, dispatcher) = match collect_lifecycle_hook_specs(
590 &self.adapter,
591 self.org_id,
592 self.session_id,
593 harness_id,
594 agent_id,
595 )
596 .await
597 {
598 Ok(pair) => pair,
599 Err(error) => {
600 warn!(
601 session_id = %self.session_id,
602 %error,
603 "failed to collect turn_end hook specs; skipping"
604 );
605 return;
606 }
607 };
608 let hooks = everruns_core::lifecycle_hooks::build_turn_lifecycle_hooks(
609 &specs,
610 everruns_core::user_hook_types::HookEvent::TurnEnd,
611 dispatcher,
612 );
613 if hooks.is_empty() {
614 return;
615 }
616 let ctx = everruns_core::lifecycle_hooks::TurnHookContext {
617 session_id: self.session_id,
618 turn_id: Some(turn_id),
619 org_id: org_public_id_from_internal(self.org_id).parse().ok(),
620 agent_id: agent_id.map(|a| a.to_string()),
621 };
622 everruns_core::lifecycle_hooks::run_turn_end_hooks(
623 &hooks,
624 &ctx,
625 serde_json::json!({ "success": success }),
626 )
627 .await;
628 }
629
630 pub async fn user_prompt_blocked(
635 &self,
636 turn_id: TurnId,
637 input_message_id: MessageId,
638 reason: &str,
639 user_message: Option<&str>,
640 ) {
641 let user_error =
642 UserFacingError::new(everruns_core::user_facing_error_codes::BLOCKED_BY_HOOK);
643 let shown = user_message.unwrap_or(reason);
644 let mut error_message = Message::assistant(shown);
645 let mut metadata = std::collections::HashMap::new();
646 user_error.apply_to_message_metadata(&mut metadata);
647 error_message.metadata = Some(metadata);
648
649 self.emit_event(EventRequest::new(
650 self.session_id,
651 EventContext::turn(turn_id, input_message_id),
652 OutputMessageCompletedData::new(error_message).with_user_facing_error(&user_error),
653 ))
654 .await;
655
656 self.turn_failed(turn_id, input_message_id, reason, Some(&user_error))
657 .await;
658 }
659
660 pub async fn turn_failed(
661 &self,
662 turn_id: TurnId,
663 input_message_id: MessageId,
664 error: &str,
665 user_error: Option<&UserFacingError>,
666 ) {
667 self.set_session_status(SessionStatus::Idle, "turn_failed")
668 .await;
669
670 self.emit_event(EventRequest::new(
671 self.session_id,
672 EventContext::turn(turn_id, input_message_id),
673 {
674 let mut data = TurnFailedData {
675 turn_id,
676 error: error.to_string(),
677 error_code: None,
678 error_fields: None,
679 };
680 if let Some(user_error) = user_error {
681 user_error.apply_to_event_fields(&mut data.error_code, &mut data.error_fields);
682 }
683 data
684 },
685 ))
686 .await;
687
688 self.emit_event(EventRequest::new(
689 self.session_id,
690 EventContext::turn(turn_id, input_message_id),
691 SessionIdledData {
692 turn_id,
693 iterations: None,
694 usage: None,
695 },
696 ))
697 .await;
698 }
699
700 pub async fn waiting_for_tool_results(&self) {
701 self.set_session_status(
702 SessionStatus::WaitingForToolResults,
703 "waiting_for_tool_results",
704 )
705 .await;
706 }
707
708 pub async fn dependency_blocked(
709 &self,
710 turn_id: TurnId,
711 input_message_id: MessageId,
712 blocker: DependencyBlocker,
713 ) {
714 let user_error = UserFacingError::new(blocker.error_code())
715 .with_field(
716 "dependency",
717 match blocker {
718 DependencyBlocker::HarnessArchived | DependencyBlocker::HarnessDeleted => {
719 "harness"
720 }
721 DependencyBlocker::AgentArchived | DependencyBlocker::AgentDeleted => "agent",
722 },
723 )
724 .with_field(
725 "state",
726 match blocker {
727 DependencyBlocker::HarnessArchived | DependencyBlocker::AgentArchived => {
728 "archived"
729 }
730 DependencyBlocker::HarnessDeleted | DependencyBlocker::AgentDeleted => {
731 "deleted"
732 }
733 },
734 );
735 let mut error_message = Message::assistant(blocker.message());
736 let mut metadata = std::collections::HashMap::new();
737 user_error.apply_to_message_metadata(&mut metadata);
738 error_message.metadata = Some(metadata);
739
740 self.emit_event(EventRequest::new(
741 self.session_id,
742 EventContext::turn(turn_id, input_message_id),
743 OutputMessageCompletedData::new(error_message).with_user_facing_error(&user_error),
744 ))
745 .await;
746
747 self.turn_failed(
748 turn_id,
749 input_message_id,
750 blocker.message(),
751 Some(&user_error),
752 )
753 .await;
754 }
755}
756
757pub async fn detect_dependency_blocker<A: RuntimeHostAdapter>(
758 adapter: &A,
759 org_id: i64,
760 harness_id: HarnessId,
761 agent_id: Option<AgentId>,
762) -> everruns_core::error::Result<Option<DependencyBlocker>> {
763 let harness_store = adapter.harness_store(org_id);
764 let agent_store = adapter.agent_store(org_id);
765 everruns_core::detect_dependency_blocker(
766 harness_store.as_ref(),
767 agent_store.as_ref(),
768 harness_id,
769 agent_id,
770 )
771 .await
772}
773
774pub async fn execute_input_activity<A: RuntimeHostAdapter>(
775 adapter: &A,
776 org_id: i64,
777 input: InputAtomInput,
778) -> everruns_core::error::Result<InputAtomResult> {
779 RuntimeSessionLifecycle::new(adapter.clone(), org_id, input.context.session_id)
780 .turn_started(input.context.turn_id, input.context.input_message_id)
781 .await;
782
783 let atom = InputAtom::new(adapter.message_store());
784 atom.execute(input).await
785}
786
787async fn run_user_prompt_submit_for_turn<A: RuntimeHostAdapter>(
794 adapter: &A,
795 org_id: i64,
796 input: &ReasonInput,
797) -> everruns_core::error::Result<Option<everruns_core::lifecycle_hooks::UserPromptDecision>> {
798 let (specs, dispatcher) = match collect_lifecycle_hook_specs(
799 adapter,
800 org_id,
801 input.context.session_id,
802 input.harness_id,
803 input.agent_id,
804 )
805 .await
806 {
807 Ok(pair) => pair,
808 Err(error) => {
809 warn!(
810 session_id = %input.context.session_id,
811 %error,
812 "failed to collect user_prompt_submit hook specs; continuing without them"
813 );
814 return Ok(None);
815 }
816 };
817 let hooks = everruns_core::lifecycle_hooks::build_turn_lifecycle_hooks(
818 &specs,
819 everruns_core::user_hook_types::HookEvent::UserPromptSubmit,
820 dispatcher,
821 );
822 if hooks.is_empty() {
823 return Ok(None);
824 }
825
826 let message_text = adapter
827 .message_store()
828 .get(input.context.session_id, input.context.input_message_id)
829 .await
830 .ok()
831 .flatten()
832 .map(|m| m.content_to_llm_string())
833 .unwrap_or_default();
834
835 let ctx = everruns_core::lifecycle_hooks::TurnHookContext {
836 session_id: input.context.session_id,
837 turn_id: Some(input.context.turn_id),
838 org_id: org_public_id_from_internal(org_id).parse().ok(),
839 agent_id: input.agent_id.map(|a| a.to_string()),
840 };
841 Ok(Some(
842 everruns_core::lifecycle_hooks::run_user_prompt_submit_hooks(&hooks, &ctx, message_text)
843 .await,
844 ))
845}
846
847pub async fn execute_reason_activity<A: RuntimeHostAdapter>(
848 adapter: &A,
849 org_id: i64,
850 input: ReasonInput,
851) -> everruns_core::error::Result<ReasonResult> {
852 if let Some(blocker) =
853 detect_dependency_blocker(adapter, org_id, input.harness_id, input.agent_id).await?
854 {
855 RuntimeSessionLifecycle::new(adapter.clone(), org_id, input.context.session_id)
856 .dependency_blocked(
857 input.context.turn_id,
858 input.context.input_message_id,
859 blocker,
860 )
861 .await;
862 return Ok(ReasonResult {
863 success: false,
864 text: blocker.message().to_string(),
865 tool_calls: vec![],
866 has_tool_calls: false,
867 tool_definitions: vec![],
868 max_iterations: everruns_core::runtime_agent::default_max_iterations(),
869 error: Some("dependency_unavailable".to_string()),
870 usage: None,
871 output_message_id: None,
872 time_to_first_token_ms: None,
873 response_id: None,
874 locale: None,
875 network_access: None,
876 });
877 }
878
879 if input.iteration <= 1
887 && let Some(everruns_core::lifecycle_hooks::UserPromptDecision::Block {
888 reason,
889 user_message,
890 }) = run_user_prompt_submit_for_turn(adapter, org_id, &input).await?
891 {
892 RuntimeSessionLifecycle::new(adapter.clone(), org_id, input.context.session_id)
893 .user_prompt_blocked(
894 input.context.turn_id,
895 input.context.input_message_id,
896 &reason,
897 user_message.as_deref(),
898 )
899 .await;
900 return Ok(ReasonResult {
901 success: false,
902 text: user_message.unwrap_or_else(|| reason.clone()),
903 tool_calls: vec![],
904 has_tool_calls: false,
905 tool_definitions: vec![],
906 max_iterations: everruns_core::runtime_agent::default_max_iterations(),
907 error: Some("blocked_by_user_prompt_hook".to_string()),
908 usage: None,
909 output_message_id: None,
910 time_to_first_token_ms: None,
911 response_id: None,
912 locale: None,
913 network_access: None,
914 });
915 }
916
917 let turn_context = adapter
918 .load_turn_context(org_id, input.context.session_id)
919 .await?;
920
921 let mut atom = ReasonAtom::new(
922 adapter.harness_store(org_id),
923 adapter.agent_store(org_id),
924 adapter.session_store(org_id),
925 adapter.message_store(),
926 adapter.provider_store(org_id),
927 adapter.capability_registry(),
928 adapter.driver_registry(),
929 adapter.event_emitter(),
930 )
931 .with_file_store(adapter.file_store());
932 if let Some(image_resolver) = adapter.image_resolver(org_id) {
933 atom = atom.with_image_resolver(image_resolver);
934 }
935
936 atom.execute(ReasonInput {
937 mcp_tool_definitions: turn_context.mcp_tool_definitions,
938 ..input
939 })
940 .await
941}
942
943pub async fn execute_act_activity<A: RuntimeHostAdapter>(
944 adapter: &A,
945 input: ActInput,
946) -> everruns_core::error::Result<ActResult> {
947 let org_id = input.org_id.ok_or_else(|| {
948 everruns_core::error::AgentLoopError::config(
949 "ActInput.org_id must be set for runtime host execution",
950 )
951 })?;
952
953 if let Some(blocker) =
954 detect_dependency_blocker(adapter, org_id, input.harness_id, input.agent_id).await?
955 {
956 RuntimeSessionLifecycle::new(adapter.clone(), org_id, input.context.session_id)
957 .dependency_blocked(
958 input.context.turn_id,
959 input.context.input_message_id,
960 blocker,
961 )
962 .await;
963 return Ok(ActResult {
964 results: vec![],
965 completed: true,
966 success_count: 0,
967 error_count: 1,
968 waiting_for_tool_results: false,
969 blocked: true,
970 client_tool_calls: vec![],
971 client_tool_definitions: vec![],
972 });
973 }
974
975 let execution_capabilities = load_execution_capabilities(
976 adapter,
977 org_id,
978 input.context.session_id,
979 input.harness_id,
980 input.agent_id,
981 input.locale.clone(),
982 input.blueprint_id.as_deref(),
983 )
984 .await?;
985 let mut tool_registry = execution_capabilities.tool_registry;
986
987 if let Some(mcp) = adapter.mcp_executor(org_id, input.context.session_id).await {
994 let invoker: Arc<dyn everruns_core::McpToolInvoker> = mcp;
995 for tool in everruns_core::build_mcp_proxy_tools(&input.tool_definitions, invoker) {
996 tool_registry.register_boxed(tool);
997 }
998 }
999
1000 let builtin_tool_registry = Arc::new(tool_registry.clone());
1001 let executor: Arc<dyn everruns_core::traits::ToolExecutor> = Arc::new(tool_registry);
1002
1003 let mut atom =
1004 ActAtom::with_file_store(executor, adapter.event_emitter(), adapter.file_store())
1005 .with_session_store(adapter.session_store(org_id))
1006 .with_session_mutator(adapter.session_mutator(org_id))
1007 .with_agent_store(adapter.agent_store(org_id))
1008 .with_tool_registry(builtin_tool_registry)
1009 .with_org_id(
1010 org_public_id_from_internal(org_id)
1011 .parse()
1012 .expect("internal org id converts to valid public org id"),
1013 )
1014 .with_capability_registry(adapter.capability_registry())
1015 .with_post_tool_hooks(execution_capabilities.post_tool_hooks)
1016 .with_pre_tool_hooks(execution_capabilities.pre_tool_hooks)
1017 .with_tool_call_hooks(execution_capabilities.tool_call_hooks);
1018
1019 if let Some(storage_store) = adapter.storage_store() {
1020 atom = atom.with_storage_store(storage_store);
1021 }
1022 if let Some(image_store) = adapter.image_artifact_store(org_id) {
1023 atom = atom.with_image_store(image_store);
1024 }
1025 if let Some(provider_credential_store) = adapter.provider_credential_store(org_id) {
1026 atom = atom.with_provider_credential_store(provider_credential_store);
1027 }
1028 if let Some(utility_llm_service) = adapter.utility_llm_service() {
1029 atom = atom.with_utility_llm_service(utility_llm_service);
1030 }
1031 if let Some(egress_service) = adapter.egress_service() {
1032 atom = atom.with_egress_service(egress_service);
1033 }
1034 if let Some(memory_store) = adapter.memory_store(org_id) {
1035 atom = atom.with_memory_store(memory_store);
1036 }
1037 if let Some(connection_resolver) = adapter.connection_resolver() {
1038 atom = atom.with_connection_resolver(connection_resolver);
1039 }
1040 if let Some(sqldb_store) = adapter.sqldb_store() {
1041 atom = atom.with_sqldb_store(sqldb_store);
1042 }
1043 if let Some(leased_resource_store) = adapter.leased_resource_store() {
1044 atom = atom.with_leased_resource_store(leased_resource_store);
1045 }
1046 if let Some(registry) = adapter.session_resource_registry() {
1047 atom = atom.with_session_resource_registry(registry);
1048 }
1049 if let Some(schedule_store) = adapter.schedule_store(org_id) {
1050 atom = atom.with_schedule_store(schedule_store);
1051 }
1052 if let Some(platform_store) = adapter.platform_store(org_id, input.context.session_id) {
1053 atom = atom.with_platform_store(platform_store);
1054 }
1055 if let Some(budget_checker) = adapter.budget_checker(org_id, input.agent_id) {
1056 atom = atom.with_budget_checker(budget_checker);
1057 }
1058 if let Some(payment_authority) = adapter.payment_authority(org_id, input.agent_id) {
1059 atom = atom.with_payment_authority(payment_authority);
1060 }
1061 if let Some(limiter) = adapter.outbound_tool_rate_limiter(org_id) {
1062 atom = atom.with_outbound_tool_rate_limiter(limiter);
1063 }
1064
1065 atom.execute(input).await
1066}