1use async_trait::async_trait;
6use everruns_core::atoms::{
7 ActAtom, ActInput, ActResult, Atom, InputAtom, InputAtomInput, InputAtomResult, ReasonAtom,
8 ReasonInput, ReasonResult,
9};
10use everruns_core::capabilities::{SystemPromptContext, collect_capabilities_with_configs};
11use everruns_core::events::{
12 EventContext, EventRequest, OutputMessageCompletedData, SessionActivatedData, SessionIdledData,
13 TurnCompletedData, TurnFailedData, TurnStartedData,
14};
15use everruns_core::message::{ContentPart, Message};
16use everruns_core::message_retriever::MessageRetriever;
17use everruns_core::platform_store::PlatformStore;
18use everruns_core::session::SessionStatus;
19use everruns_core::traits::{
20 AgentStore, BudgetChecker, EventEmitter, HarnessStore, ImageArtifactStore, ImageResolver,
21 LeasedResourceStore, LlmProviderStore, ModelWithProvider, PaymentAuthority,
22 ProviderCredentialStore, SessionFileSystem, SessionMutator, SessionResourceRegistry,
23 SessionScheduleStore, SessionSqlDbStoreRef, SessionStorageStore, SessionStore,
24 UserConnectionResolver,
25};
26use everruns_core::typed_id::{AgentId, HarnessId, MessageId, SessionId, TurnId};
27use everruns_core::{
28 Agent, CapabilityRegistry, CapabilityStatus, DependencyBlocker, DriverRegistry, EgressService,
29 Harness, Session, TokenUsage, ToolDefinition, ToolRegistry, UserFacingError, UtilityLlmService,
30 assemble_turn_context, org_public_id_from_internal, resolve_runtime_capabilities,
31};
32use std::sync::Arc;
33use tracing::warn;
34
35#[derive(Debug, Clone)]
37pub struct RuntimeHostTurnContext {
38 pub agent: Option<Agent>,
39 pub session: Session,
40 pub messages: Vec<Message>,
41 pub model: Option<ModelWithProvider>,
42 pub mcp_tool_definitions: Vec<ToolDefinition>,
43}
44
45#[async_trait]
56pub trait RuntimeHostAdapter: Send + Sync + Clone + 'static {
57 async fn get_agent(
58 &self,
59 org_id: i64,
60 agent_id: AgentId,
61 ) -> everruns_core::error::Result<Option<Agent>>;
62
63 async fn get_harness(
64 &self,
65 org_id: i64,
66 harness_id: HarnessId,
67 ) -> everruns_core::error::Result<Option<Harness>>;
68
69 async fn set_session_status(
70 &self,
71 org_id: i64,
72 session_id: SessionId,
73 status: SessionStatus,
74 ) -> everruns_core::error::Result<Session>;
75
76 async fn load_turn_context(
77 &self,
78 org_id: i64,
79 session_id: SessionId,
80 ) -> everruns_core::error::Result<RuntimeHostTurnContext>;
81
82 fn capability_registry(&self) -> CapabilityRegistry;
83
84 fn driver_registry(&self) -> DriverRegistry;
85
86 fn harness_store(&self, org_id: i64) -> Arc<dyn HarnessStore>;
87
88 fn agent_store(&self, org_id: i64) -> Arc<dyn AgentStore>;
89
90 fn session_store(&self, org_id: i64) -> Arc<dyn SessionStore>;
91
92 fn session_mutator(&self, org_id: i64) -> Arc<dyn SessionMutator>;
93
94 fn provider_store(&self, org_id: i64) -> Arc<dyn LlmProviderStore>;
95
96 fn message_store(&self) -> Arc<dyn MessageRetriever>;
97
98 fn event_emitter(&self) -> Arc<dyn EventEmitter>;
99
100 fn file_store(&self) -> Arc<dyn SessionFileSystem>;
101
102 fn image_resolver(&self, _org_id: i64) -> Option<Arc<dyn ImageResolver>> {
103 None
104 }
105
106 fn image_artifact_store(&self, _org_id: i64) -> Option<Arc<dyn ImageArtifactStore>> {
107 None
108 }
109
110 fn provider_credential_store(&self, _org_id: i64) -> Option<Arc<dyn ProviderCredentialStore>> {
111 None
112 }
113
114 fn utility_llm_service(&self) -> Option<Arc<dyn UtilityLlmService>> {
115 None
116 }
117
118 fn egress_service(&self) -> Option<Arc<dyn EgressService>> {
119 None
120 }
121
122 fn storage_store(&self) -> Option<Arc<dyn SessionStorageStore>> {
123 None
124 }
125
126 fn connection_resolver(&self) -> Option<Arc<dyn UserConnectionResolver>> {
127 None
128 }
129
130 fn sqldb_store(&self) -> Option<SessionSqlDbStoreRef> {
131 None
132 }
133
134 fn leased_resource_store(&self) -> Option<Arc<dyn LeasedResourceStore>> {
135 None
136 }
137
138 fn session_resource_registry(&self) -> Option<Arc<dyn SessionResourceRegistry>> {
139 None
140 }
141
142 fn session_task_registry(
143 &self,
144 ) -> Option<Arc<dyn everruns_core::session_task::SessionTaskRegistry>> {
145 None
146 }
147
148 fn schedule_store(&self, _org_id: i64) -> Option<Arc<dyn SessionScheduleStore>> {
149 None
150 }
151
152 fn platform_store(
153 &self,
154 _org_id: i64,
155 _session_id: SessionId,
156 ) -> Option<Arc<dyn PlatformStore>> {
157 None
158 }
159
160 fn budget_checker(
161 &self,
162 _org_id: i64,
163 _agent_id: Option<AgentId>,
164 ) -> Option<Arc<dyn BudgetChecker>> {
165 None
166 }
167
168 fn payment_authority(
169 &self,
170 _org_id: i64,
171 _agent_id: Option<AgentId>,
172 ) -> Option<Arc<dyn PaymentAuthority>> {
173 None
174 }
175
176 fn outbound_tool_rate_limiter(
179 &self,
180 _org_id: i64,
181 ) -> Option<Arc<dyn everruns_core::OutboundToolRateLimiter>> {
182 None
183 }
184
185 fn durable_tool_result_store(&self) -> Option<Arc<dyn everruns_core::DurableToolResultStore>> {
188 None
189 }
190
191 fn subagent_spawn_store(&self) -> Option<Arc<dyn everruns_core::SubagentSpawnStore>> {
194 None
195 }
196
197 fn stream_heartbeater(&self) -> Option<Arc<dyn everruns_core::StreamHeartbeater>> {
200 None
201 }
202
203 fn partial_stream_store(&self) -> Option<Arc<dyn everruns_core::PartialStreamStore>> {
206 None
207 }
208
209 fn provider_stall_timeout(&self) -> Option<std::time::Duration> {
212 None
213 }
214
215 async fn mcp_executor(
219 &self,
220 _org_id: i64,
221 _session_id: SessionId,
222 ) -> Option<Arc<everruns_mcp::McpExecutor>> {
223 None
224 }
225}
226
227struct RuntimeExecutionCapabilities {
228 tool_registry: ToolRegistry,
229 post_tool_hooks: Vec<Arc<dyn everruns_core::PostToolExecHook>>,
230 pre_tool_hooks: Vec<Arc<dyn everruns_core::atoms::PreToolUseHook>>,
231 tool_call_hooks: Vec<Arc<dyn everruns_core::ToolCallHook>>,
232}
233
234fn finalize_specs_from_configs(
244 resolved_capability_configs: &[everruns_core::capability_types::AgentCapabilityConfig],
245 capability_registry: &CapabilityRegistry,
246) -> Vec<everruns_core::user_hook_types::UserHookSpec> {
247 let mut hook_contributions: Vec<(String, Vec<everruns_core::user_hook_types::UserHookSpec>)> =
248 Vec::new();
249 let mut disabled_contributions: Vec<String> = Vec::new();
250 for config in resolved_capability_configs {
251 let Some(capability) = capability_registry.get(config.capability_id()) else {
252 continue;
253 };
254 let specs = capability.user_hooks_with_config(&config.config);
255 if !specs.is_empty() {
256 hook_contributions.push((config.capability_id().to_string(), specs));
257 }
258 if config.capability_id() == "user_hooks" {
259 disabled_contributions.extend(
260 everruns_core::capabilities::user_hooks::disabled_contributions(&config.config),
261 );
262 }
263 }
264 everruns_core::hook_adapter::finalize_hook_specs(hook_contributions, &disabled_contributions)
265}
266
267async fn collect_lifecycle_hook_specs<A: RuntimeHostAdapter>(
272 adapter: &A,
273 org_id: i64,
274 session_id: SessionId,
275 harness_id: HarnessId,
276 agent_id: Option<AgentId>,
277) -> everruns_core::error::Result<(
278 Vec<everruns_core::user_hook_types::UserHookSpec>,
279 Arc<dyn everruns_core::hook_executor::BashHookDispatcher>,
280)> {
281 let capability_registry = adapter.capability_registry();
282 let harness_chain = adapter
283 .harness_store(org_id)
284 .get_harness_chain(harness_id)
285 .await?;
286 if harness_chain.is_empty() {
287 return Err(everruns_core::error::AgentLoopError::harness_not_found(
288 harness_id,
289 ));
290 }
291 let session = adapter
292 .session_store(org_id)
293 .get_session(session_id)
294 .await?
295 .ok_or_else(|| everruns_core::error::AgentLoopError::session_not_found(session_id))?;
296 let agent = match agent_id {
297 Some(agent_id) => adapter.agent_store(org_id).get_agent(agent_id).await?,
298 None => None,
299 };
300 let resolved = resolve_runtime_capabilities(
301 &harness_chain,
302 agent.as_ref(),
303 &session,
304 &capability_registry,
305 );
306 let specs =
307 finalize_specs_from_configs(&resolved.resolved_capability_configs, &capability_registry);
308 let dispatcher: Arc<dyn everruns_core::hook_executor::BashHookDispatcher> = Arc::new(
309 everruns_core::hook_dispatch::BashkitShellHookDispatcher::new(adapter.file_store()),
310 );
311 Ok((specs, dispatcher))
312}
313
314async fn load_execution_capabilities<A: RuntimeHostAdapter>(
315 adapter: &A,
316 org_id: i64,
317 session_id: SessionId,
318 harness_id: HarnessId,
319 agent_id: Option<AgentId>,
320 locale: Option<String>,
321 blueprint_id: Option<&str>,
322) -> everruns_core::error::Result<RuntimeExecutionCapabilities> {
323 let capability_registry = adapter.capability_registry();
324 if let Some(blueprint_id) = blueprint_id {
325 let mut registry = ToolRegistry::with_defaults();
326 let blueprint = capability_registry.blueprint(blueprint_id).ok_or_else(|| {
327 everruns_core::error::AgentLoopError::config(format!(
328 "Blueprint \"{blueprint_id}\" not found in registry"
329 ))
330 })?;
331 for tool in blueprint.tools {
332 registry.register_boxed(tool);
333 }
334 return Ok(RuntimeExecutionCapabilities {
335 tool_registry: registry,
336 post_tool_hooks: Vec::new(),
337 pre_tool_hooks: Vec::new(),
338 tool_call_hooks: Vec::new(),
339 });
340 }
341
342 let harness_chain = adapter
343 .harness_store(org_id)
344 .get_harness_chain(harness_id)
345 .await?;
346 if harness_chain.is_empty() {
347 return Err(everruns_core::error::AgentLoopError::harness_not_found(
348 harness_id,
349 ));
350 }
351
352 let session = adapter
353 .session_store(org_id)
354 .get_session(session_id)
355 .await?
356 .ok_or_else(|| everruns_core::error::AgentLoopError::session_not_found(session_id))?;
357
358 let agent_store = adapter.agent_store(org_id);
359 let agent = match agent_id {
360 Some(agent_id) => Some(
361 agent_store
362 .get_agent(agent_id)
363 .await?
364 .ok_or_else(|| everruns_core::error::AgentLoopError::agent_not_found(agent_id))?,
365 ),
366 None => None,
367 };
368
369 let resolved = resolve_runtime_capabilities(
370 &harness_chain,
371 agent.as_ref(),
372 &session,
373 &capability_registry,
374 );
375 let prompt_ctx = SystemPromptContext {
382 session_id,
383 locale: locale.or(session.locale.clone()),
384 file_store: Some(adapter.file_store()),
385 model: None,
386 };
387 let collected = collect_capabilities_with_configs(
388 &resolved.resolved_capability_configs,
389 &capability_registry,
390 &prompt_ctx,
391 )
392 .await;
393
394 let mut registry = ToolRegistry::with_defaults();
395 for tool in collected.tools {
396 registry.register_boxed(tool);
397 }
398
399 let mut post_tool_hooks: Vec<Arc<dyn everruns_core::PostToolExecHook>> = resolved
404 .resolved_capability_configs
405 .iter()
406 .flat_map(|config| {
407 capability_registry
408 .get(config.capability_id())
409 .filter(|capability| capability.status() == CapabilityStatus::Available)
410 .map(|capability| capability.post_tool_exec_hooks())
411 .unwrap_or_default()
412 })
413 .collect();
414
415 let user_hook_specs =
422 finalize_specs_from_configs(&resolved.resolved_capability_configs, &capability_registry);
423 let mut pre_tool_hooks: Vec<Arc<dyn everruns_core::atoms::PreToolUseHook>> = resolved
426 .resolved_capability_configs
427 .iter()
428 .flat_map(|config| {
429 capability_registry
430 .get(config.capability_id())
431 .filter(|capability| capability.status() == CapabilityStatus::Available)
432 .map(|capability| capability.pre_tool_use_hooks())
433 .unwrap_or_default()
434 })
435 .collect();
436 if !user_hook_specs.is_empty() {
437 let dispatcher: Arc<dyn everruns_core::hook_executor::BashHookDispatcher> = Arc::new(
438 everruns_core::hook_dispatch::BashkitShellHookDispatcher::new(adapter.file_store()),
439 );
440 post_tool_hooks.extend(everruns_core::hook_adapter::build_post_tool_use_hooks(
441 &user_hook_specs,
442 dispatcher.clone(),
443 ));
444 pre_tool_hooks.extend(everruns_core::hook_adapter::build_pre_tool_use_hooks(
445 &user_hook_specs,
446 dispatcher,
447 ));
448 }
449
450 let tool_call_hooks = resolved
451 .resolved_capability_configs
452 .iter()
453 .flat_map(|config| {
454 capability_registry
455 .get(config.capability_id())
456 .filter(|capability| capability.status() == CapabilityStatus::Available)
457 .map(|capability| capability.tool_call_hooks())
458 .unwrap_or_default()
459 })
460 .collect();
461
462 Ok(RuntimeExecutionCapabilities {
463 tool_registry: registry,
464 post_tool_hooks,
465 pre_tool_hooks,
466 tool_call_hooks,
467 })
468}
469
470pub struct RuntimeSessionLifecycle<A: RuntimeHostAdapter> {
472 adapter: A,
473 org_id: i64,
474 session_id: SessionId,
475}
476
477impl<A: RuntimeHostAdapter> RuntimeSessionLifecycle<A> {
478 pub fn new(adapter: A, org_id: i64, session_id: SessionId) -> Self {
479 Self {
480 adapter,
481 org_id,
482 session_id,
483 }
484 }
485
486 async fn set_session_status(&self, status: SessionStatus, action: &'static str) {
487 if let Err(error) = self
488 .adapter
489 .set_session_status(self.org_id, self.session_id, status)
490 .await
491 {
492 warn!(
493 session_id = %self.session_id,
494 org_id = self.org_id,
495 action,
496 %error,
497 "runtime host lifecycle status update failed"
498 );
499 }
500 }
501
502 async fn emit_event(&self, request: EventRequest) {
503 let event_type = request.event_type.clone();
504 if let Err(error) = self.adapter.event_emitter().emit(request).await {
505 warn!(
506 session_id = %self.session_id,
507 org_id = self.org_id,
508 event_type,
509 %error,
510 "runtime host lifecycle event emission failed"
511 );
512 }
513 }
514
515 pub async fn turn_started(&self, turn_id: TurnId, input_message_id: MessageId) {
516 let input_content = self
517 .adapter
518 .message_store()
519 .get(self.session_id, input_message_id)
520 .await
521 .ok()
522 .flatten()
523 .map(|message| message.content_to_llm_string());
524
525 self.set_session_status(SessionStatus::Active, "turn_started")
526 .await;
527
528 self.emit_event(EventRequest::new(
529 self.session_id,
530 EventContext::turn(turn_id, input_message_id),
531 SessionActivatedData {
532 turn_id,
533 input_message_id,
534 },
535 ))
536 .await;
537
538 self.emit_event(EventRequest::new(
539 self.session_id,
540 EventContext::turn(turn_id, input_message_id),
541 TurnStartedData {
542 turn_id,
543 input_message_id,
544 input_content,
545 },
546 ))
547 .await;
548 }
549
550 pub async fn emit_turn_completed(&self, input_message_id: MessageId, data: TurnCompletedData) {
551 let turn_id = data.turn_id;
552 self.emit_event(EventRequest::new(
553 self.session_id,
554 EventContext::turn(turn_id, input_message_id),
555 data,
556 ))
557 .await;
558 }
559
560 pub async fn emit_session_idled(
561 &self,
562 turn_id: TurnId,
563 input_message_id: MessageId,
564 iterations: Option<u32>,
565 usage: Option<TokenUsage>,
566 ) {
567 self.set_session_status(SessionStatus::Idle, "emit_session_idled")
568 .await;
569
570 self.emit_event(EventRequest::new(
571 self.session_id,
572 EventContext::turn(turn_id, input_message_id),
573 SessionIdledData {
574 turn_id,
575 iterations,
576 usage,
577 },
578 ))
579 .await;
580 }
581
582 pub async fn turn_completed(
583 &self,
584 turn_id: TurnId,
585 input_message_id: MessageId,
586 iterations: u32,
587 usage: Option<TokenUsage>,
588 input_content: Option<String>,
589 ) {
590 self.emit_turn_completed(
591 input_message_id,
592 TurnCompletedData {
593 turn_id,
594 iterations,
595 duration_ms: None,
596 usage: usage.clone(),
597 input_content,
598 final_message_id: None,
599 final_answer_preview: None,
600 time_to_first_token_ms: None,
601 tool_call_count: None,
602 llm_call_count: None,
603 status: Some("completed".to_string()),
604 },
605 )
606 .await;
607 self.emit_session_idled(turn_id, input_message_id, Some(iterations), usage)
608 .await;
609 }
610
611 pub async fn fire_turn_end_hooks(
615 &self,
616 harness_id: HarnessId,
617 agent_id: Option<AgentId>,
618 turn_id: TurnId,
619 success: bool,
620 ) {
621 let (specs, dispatcher) = match collect_lifecycle_hook_specs(
622 &self.adapter,
623 self.org_id,
624 self.session_id,
625 harness_id,
626 agent_id,
627 )
628 .await
629 {
630 Ok(pair) => pair,
631 Err(error) => {
632 warn!(
633 session_id = %self.session_id,
634 %error,
635 "failed to collect turn_end hook specs; skipping"
636 );
637 return;
638 }
639 };
640 let hooks = everruns_core::lifecycle_hooks::build_turn_lifecycle_hooks(
641 &specs,
642 everruns_core::user_hook_types::HookEvent::TurnEnd,
643 dispatcher,
644 );
645 if hooks.is_empty() {
646 return;
647 }
648 let ctx = everruns_core::lifecycle_hooks::TurnHookContext {
649 session_id: self.session_id,
650 turn_id: Some(turn_id),
651 org_id: org_public_id_from_internal(self.org_id).parse().ok(),
652 agent_id: agent_id.map(|a| a.to_string()),
653 };
654 everruns_core::lifecycle_hooks::run_turn_end_hooks(
655 &hooks,
656 &ctx,
657 serde_json::json!({ "success": success }),
658 )
659 .await;
660 }
661
662 pub async fn user_prompt_blocked(
667 &self,
668 turn_id: TurnId,
669 input_message_id: MessageId,
670 reason: &str,
671 user_message: Option<&str>,
672 ) {
673 let user_error =
674 UserFacingError::new(everruns_core::user_facing_error_codes::BLOCKED_BY_HOOK);
675 let shown = user_message.unwrap_or(reason);
676 let mut error_message = Message::assistant(shown);
677 let mut metadata = std::collections::HashMap::new();
678 user_error.apply_to_message_metadata(&mut metadata);
679 error_message.metadata = Some(metadata);
680
681 self.emit_event(EventRequest::new(
682 self.session_id,
683 EventContext::turn(turn_id, input_message_id),
684 OutputMessageCompletedData::new(error_message).with_user_facing_error(&user_error),
685 ))
686 .await;
687
688 self.turn_failed(turn_id, input_message_id, reason, Some(&user_error))
689 .await;
690 }
691
692 pub async fn turn_failed(
693 &self,
694 turn_id: TurnId,
695 input_message_id: MessageId,
696 error: &str,
697 user_error: Option<&UserFacingError>,
698 ) {
699 self.set_session_status(SessionStatus::Idle, "turn_failed")
700 .await;
701
702 self.emit_event(EventRequest::new(
703 self.session_id,
704 EventContext::turn(turn_id, input_message_id),
705 {
706 let mut data = TurnFailedData {
707 turn_id,
708 error: error.to_string(),
709 error_code: None,
710 error_fields: None,
711 };
712 if let Some(user_error) = user_error {
713 user_error.apply_to_event_fields(&mut data.error_code, &mut data.error_fields);
714 }
715 data
716 },
717 ))
718 .await;
719
720 self.emit_event(EventRequest::new(
721 self.session_id,
722 EventContext::turn(turn_id, input_message_id),
723 SessionIdledData {
724 turn_id,
725 iterations: None,
726 usage: None,
727 },
728 ))
729 .await;
730 }
731
732 pub async fn waiting_for_tool_results(&self) {
733 self.set_session_status(
734 SessionStatus::WaitingForToolResults,
735 "waiting_for_tool_results",
736 )
737 .await;
738 }
739
740 pub async fn dependency_blocked(
741 &self,
742 turn_id: TurnId,
743 input_message_id: MessageId,
744 blocker: DependencyBlocker,
745 ) {
746 let user_error = UserFacingError::new(blocker.error_code())
747 .with_field(
748 "dependency",
749 match blocker {
750 DependencyBlocker::HarnessArchived | DependencyBlocker::HarnessDeleted => {
751 "harness"
752 }
753 DependencyBlocker::AgentArchived | DependencyBlocker::AgentDeleted => "agent",
754 },
755 )
756 .with_field(
757 "state",
758 match blocker {
759 DependencyBlocker::HarnessArchived | DependencyBlocker::AgentArchived => {
760 "archived"
761 }
762 DependencyBlocker::HarnessDeleted | DependencyBlocker::AgentDeleted => {
763 "deleted"
764 }
765 },
766 );
767 let mut error_message = Message::assistant(blocker.message());
768 let mut metadata = std::collections::HashMap::new();
769 user_error.apply_to_message_metadata(&mut metadata);
770 error_message.metadata = Some(metadata);
771
772 self.emit_event(EventRequest::new(
773 self.session_id,
774 EventContext::turn(turn_id, input_message_id),
775 OutputMessageCompletedData::new(error_message).with_user_facing_error(&user_error),
776 ))
777 .await;
778
779 self.turn_failed(
780 turn_id,
781 input_message_id,
782 blocker.message(),
783 Some(&user_error),
784 )
785 .await;
786 }
787}
788
789pub async fn detect_dependency_blocker<A: RuntimeHostAdapter>(
790 adapter: &A,
791 org_id: i64,
792 harness_id: HarnessId,
793 agent_id: Option<AgentId>,
794) -> everruns_core::error::Result<Option<DependencyBlocker>> {
795 let harness_store = adapter.harness_store(org_id);
796 let agent_store = adapter.agent_store(org_id);
797 everruns_core::detect_dependency_blocker(
798 harness_store.as_ref(),
799 agent_store.as_ref(),
800 harness_id,
801 agent_id,
802 )
803 .await
804}
805
806pub async fn execute_input_activity<A: RuntimeHostAdapter>(
807 adapter: &A,
808 org_id: i64,
809 input: InputAtomInput,
810) -> everruns_core::error::Result<InputAtomResult> {
811 RuntimeSessionLifecycle::new(adapter.clone(), org_id, input.context.session_id)
812 .turn_started(input.context.turn_id, input.context.input_message_id)
813 .await;
814
815 let atom = InputAtom::new(adapter.message_store());
816 atom.execute(input).await
817}
818
819struct UserPromptHookResult {
826 decision: everruns_core::lifecycle_hooks::UserPromptDecision,
827 original_message: String,
828}
829
830async fn run_user_prompt_submit_for_turn<A: RuntimeHostAdapter>(
831 adapter: &A,
832 org_id: i64,
833 input: &ReasonInput,
834) -> everruns_core::error::Result<Option<UserPromptHookResult>> {
835 let (specs, dispatcher) = match collect_lifecycle_hook_specs(
836 adapter,
837 org_id,
838 input.context.session_id,
839 input.harness_id,
840 input.agent_id,
841 )
842 .await
843 {
844 Ok(pair) => pair,
845 Err(error) => {
846 warn!(
847 session_id = %input.context.session_id,
848 %error,
849 "failed to collect user_prompt_submit hook specs; continuing without them"
850 );
851 return Ok(None);
852 }
853 };
854 let hooks = everruns_core::lifecycle_hooks::build_turn_lifecycle_hooks(
855 &specs,
856 everruns_core::user_hook_types::HookEvent::UserPromptSubmit,
857 dispatcher,
858 );
859 if hooks.is_empty() {
860 return Ok(None);
861 }
862
863 let message_text = adapter
864 .message_store()
865 .get(input.context.session_id, input.context.input_message_id)
866 .await
867 .ok()
868 .flatten()
869 .map(|m| m.content_to_llm_string())
870 .unwrap_or_default();
871
872 let ctx = everruns_core::lifecycle_hooks::TurnHookContext {
873 session_id: input.context.session_id,
874 turn_id: Some(input.context.turn_id),
875 org_id: org_public_id_from_internal(org_id).parse().ok(),
876 agent_id: input.agent_id.map(|a| a.to_string()),
877 };
878 let original_message = message_text.clone();
879 let decision =
880 everruns_core::lifecycle_hooks::run_user_prompt_submit_hooks(&hooks, &ctx, message_text)
881 .await;
882 Ok(Some(UserPromptHookResult {
883 decision,
884 original_message,
885 }))
886}
887
888pub async fn execute_reason_activity<A: RuntimeHostAdapter>(
889 adapter: &A,
890 org_id: i64,
891 input: ReasonInput,
892) -> everruns_core::error::Result<ReasonResult> {
893 if let Some(blocker) =
894 detect_dependency_blocker(adapter, org_id, input.harness_id, input.agent_id).await?
895 {
896 RuntimeSessionLifecycle::new(adapter.clone(), org_id, input.context.session_id)
897 .dependency_blocked(
898 input.context.turn_id,
899 input.context.input_message_id,
900 blocker,
901 )
902 .await;
903 return Ok(ReasonResult {
904 success: false,
905 text: blocker.message().to_string(),
906 tool_calls: vec![],
907 has_tool_calls: false,
908 tool_definitions: vec![],
909 max_iterations: everruns_core::runtime_agent::default_max_iterations(),
910 error: Some("dependency_unavailable".to_string()),
911 usage: None,
912 output_message_id: None,
913 time_to_first_token_ms: None,
914 response_id: None,
915 locale: None,
916 network_access: None,
917 });
918 }
919
920 let mut user_prompt_message_override = None;
928 if input.iteration <= 1
929 && let Some(hook_result) = run_user_prompt_submit_for_turn(adapter, org_id, &input).await?
930 {
931 match hook_result.decision {
932 everruns_core::lifecycle_hooks::UserPromptDecision::Block {
933 reason,
934 user_message,
935 } => {
936 RuntimeSessionLifecycle::new(adapter.clone(), org_id, input.context.session_id)
937 .user_prompt_blocked(
938 input.context.turn_id,
939 input.context.input_message_id,
940 &reason,
941 user_message.as_deref(),
942 )
943 .await;
944 return Ok(ReasonResult {
945 success: false,
946 text: user_message.unwrap_or_else(|| reason.clone()),
947 tool_calls: vec![],
948 has_tool_calls: false,
949 tool_definitions: vec![],
950 max_iterations: everruns_core::runtime_agent::default_max_iterations(),
951 error: Some("blocked_by_user_prompt_hook".to_string()),
952 usage: None,
953 output_message_id: None,
954 time_to_first_token_ms: None,
955 response_id: None,
956 locale: None,
957 network_access: None,
958 });
959 }
960 everruns_core::lifecycle_hooks::UserPromptDecision::Continue { message } => {
961 if message != hook_result.original_message {
962 user_prompt_message_override = Some(message);
963 }
964 }
965 }
966 }
967
968 let turn_context = adapter
969 .load_turn_context(org_id, input.context.session_id)
970 .await?;
971
972 let mut atom = ReasonAtom::new(
973 adapter.harness_store(org_id),
974 adapter.agent_store(org_id),
975 adapter.session_store(org_id),
976 adapter.message_store(),
977 adapter.provider_store(org_id),
978 adapter.capability_registry(),
979 adapter.driver_registry(),
980 adapter.event_emitter(),
981 )
982 .with_file_store(adapter.file_store());
983 if let Some(image_resolver) = adapter.image_resolver(org_id) {
984 atom = atom.with_image_resolver(image_resolver);
985 }
986 if let Some(hb) = adapter.stream_heartbeater() {
987 atom = atom.with_stream_heartbeater(hb);
988 }
989 if let Some(timeout) = adapter.provider_stall_timeout() {
990 atom = atom.with_provider_stall_timeout(timeout);
991 }
992 if let Some(store) = adapter.partial_stream_store() {
993 atom = atom.with_partial_stream_store(store);
994 }
995 if let Some(store) = adapter.durable_tool_result_store() {
996 atom = atom.with_durable_tool_result_store(store);
997 }
998
999 let input = ReasonInput {
1000 mcp_tool_definitions: turn_context.mcp_tool_definitions,
1001 ..input
1002 };
1003
1004 if let Some(message_override) = user_prompt_message_override {
1005 let mut assembled = assemble_turn_context(
1006 adapter.harness_store(org_id).as_ref(),
1007 adapter.agent_store(org_id).as_ref(),
1008 adapter.session_store(org_id).as_ref(),
1009 adapter.message_store().as_ref(),
1010 adapter.provider_store(org_id).as_ref(),
1011 &adapter.capability_registry(),
1012 input.context.session_id,
1013 input.harness_id,
1014 input.agent_id,
1015 &input.mcp_tool_definitions,
1016 Some(adapter.file_store()),
1017 )
1018 .await?;
1019
1020 let message = assembled
1021 .messages
1022 .iter_mut()
1023 .find(|message| message.id == input.context.input_message_id)
1024 .ok_or_else(|| {
1025 everruns_core::error::AgentLoopError::config(
1026 "user_prompt_submit mutation: input message not found in assembled context",
1027 )
1028 })?;
1029
1030 message
1035 .content
1036 .retain(|part| !matches!(part, ContentPart::Text(_)));
1037 message
1038 .content
1039 .insert(0, ContentPart::text(message_override));
1040
1041 return atom.execute_with_assembled_context(input, assembled).await;
1042 }
1043
1044 atom.execute(input).await
1045}
1046
1047pub async fn execute_act_activity<A: RuntimeHostAdapter>(
1048 adapter: &A,
1049 input: ActInput,
1050) -> everruns_core::error::Result<ActResult> {
1051 let org_id = input.org_id.ok_or_else(|| {
1052 everruns_core::error::AgentLoopError::config(
1053 "ActInput.org_id must be set for runtime host execution",
1054 )
1055 })?;
1056
1057 if let Some(blocker) =
1058 detect_dependency_blocker(adapter, org_id, input.harness_id, input.agent_id).await?
1059 {
1060 RuntimeSessionLifecycle::new(adapter.clone(), org_id, input.context.session_id)
1061 .dependency_blocked(
1062 input.context.turn_id,
1063 input.context.input_message_id,
1064 blocker,
1065 )
1066 .await;
1067 return Ok(ActResult {
1068 results: vec![],
1069 completed: true,
1070 success_count: 0,
1071 error_count: 1,
1072 waiting_for_tool_results: false,
1073 blocked: true,
1074 client_tool_calls: vec![],
1075 client_tool_definitions: vec![],
1076 });
1077 }
1078
1079 let execution_capabilities = load_execution_capabilities(
1080 adapter,
1081 org_id,
1082 input.context.session_id,
1083 input.harness_id,
1084 input.agent_id,
1085 input.locale.clone(),
1086 input.blueprint_id.as_deref(),
1087 )
1088 .await?;
1089 let mut tool_registry = execution_capabilities.tool_registry;
1090
1091 if let Some(mcp) = adapter.mcp_executor(org_id, input.context.session_id).await {
1098 let invoker: Arc<dyn everruns_core::McpToolInvoker> = mcp;
1099 for tool in everruns_core::build_mcp_proxy_tools(&input.tool_definitions, invoker) {
1100 tool_registry.register_boxed(tool);
1101 }
1102 }
1103
1104 let builtin_tool_registry = Arc::new(tool_registry.clone());
1105 let executor: Arc<dyn everruns_core::traits::ToolExecutor> = Arc::new(tool_registry);
1106
1107 let mut atom =
1108 ActAtom::with_file_store(executor, adapter.event_emitter(), adapter.file_store())
1109 .with_session_store(adapter.session_store(org_id))
1110 .with_session_mutator(adapter.session_mutator(org_id))
1111 .with_agent_store(adapter.agent_store(org_id))
1112 .with_tool_registry(builtin_tool_registry)
1113 .with_org_id(
1114 org_public_id_from_internal(org_id)
1115 .parse()
1116 .expect("internal org id converts to valid public org id"),
1117 )
1118 .with_capability_registry(adapter.capability_registry())
1119 .with_post_tool_hooks(execution_capabilities.post_tool_hooks)
1120 .with_pre_tool_hooks(execution_capabilities.pre_tool_hooks)
1121 .with_tool_call_hooks(execution_capabilities.tool_call_hooks);
1122
1123 if let Some(storage_store) = adapter.storage_store() {
1124 atom = atom.with_storage_store(storage_store);
1125 }
1126 if let Some(image_store) = adapter.image_artifact_store(org_id) {
1127 atom = atom.with_image_store(image_store);
1128 }
1129 if let Some(provider_credential_store) = adapter.provider_credential_store(org_id) {
1130 atom = atom.with_provider_credential_store(provider_credential_store);
1131 }
1132 if let Some(utility_llm_service) = adapter.utility_llm_service() {
1133 atom = atom.with_utility_llm_service(utility_llm_service);
1134 }
1135 if let Some(egress_service) = adapter.egress_service() {
1136 atom = atom.with_egress_service(egress_service);
1137 }
1138 if let Some(connection_resolver) = adapter.connection_resolver() {
1139 atom = atom.with_connection_resolver(connection_resolver);
1140 }
1141 if let Some(sqldb_store) = adapter.sqldb_store() {
1142 atom = atom.with_sqldb_store(sqldb_store);
1143 }
1144 if let Some(leased_resource_store) = adapter.leased_resource_store() {
1145 atom = atom.with_leased_resource_store(leased_resource_store);
1146 }
1147 if let Some(registry) = adapter.session_resource_registry() {
1148 atom = atom.with_session_resource_registry(registry);
1149 }
1150 if let Some(registry) = adapter.session_task_registry() {
1151 atom = atom.with_session_task_registry(registry);
1152 }
1153 if let Some(schedule_store) = adapter.schedule_store(org_id) {
1154 atom = atom.with_schedule_store(schedule_store);
1155 }
1156 if let Some(platform_store) = adapter.platform_store(org_id, input.context.session_id) {
1157 atom = atom.with_platform_store(platform_store);
1158 }
1159 if let Some(budget_checker) = adapter.budget_checker(org_id, input.agent_id) {
1160 atom = atom.with_budget_checker(budget_checker);
1161 }
1162 if let Some(payment_authority) = adapter.payment_authority(org_id, input.agent_id) {
1163 atom = atom.with_payment_authority(payment_authority);
1164 }
1165 if let Some(limiter) = adapter.outbound_tool_rate_limiter(org_id) {
1166 atom = atom.with_outbound_tool_rate_limiter(limiter);
1167 }
1168 if let Some(store) = adapter.durable_tool_result_store() {
1169 atom = atom.with_durable_tool_result_store(store);
1170 }
1171 if let Some(store) = adapter.subagent_spawn_store() {
1172 atom = atom.with_subagent_spawn_store(store);
1173 }
1174
1175 atom.execute(input).await
1176}