1use async_trait::async_trait;
6use everruns_core::atoms::{
7 ActAtom, ActInput, ActResult, Atom, InputAtom, InputAtomInput, InputAtomResult, ReasonAtom,
8 ReasonInput, ReasonResult,
9};
10use everruns_core::capabilities::{SystemPromptContext, collect_capabilities_with_configs};
11use everruns_core::events::{
12 EventContext, EventRequest, OutputMessageCompletedData, SessionActivatedData, SessionIdledData,
13 TurnCompletedData, TurnFailedData, TurnStartedData,
14};
15use everruns_core::message::{ContentPart, Message};
16use everruns_core::message_retriever::MessageRetriever;
17use everruns_core::platform_store::PlatformStore;
18use everruns_core::session::SessionStatus;
19use everruns_core::traits::{
20 AgentStore, BudgetChecker, EventEmitter, HarnessStore, ImageArtifactStore, ImageResolver,
21 LeasedResourceStore, LlmProviderStore, ModelWithProvider, PaymentAuthority,
22 ProviderCredentialStore, SessionFileSystem, SessionMutator, SessionResourceRegistry,
23 SessionScheduleStore, SessionSqlDbStoreRef, SessionStorageStore, SessionStore,
24 UserConnectionResolver,
25};
26use everruns_core::typed_id::{AgentId, HarnessId, MessageId, SessionId, TurnId};
27use everruns_core::{
28 Agent, CapabilityRegistry, CapabilityStatus, DependencyBlocker, DriverRegistry, EgressService,
29 Harness, Session, TokenUsage, ToolDefinition, ToolRegistry, UserFacingError, UtilityLlmService,
30 assemble_turn_context, org_public_id_from_internal, resolve_runtime_capabilities,
31};
32use std::sync::Arc;
33use tracing::warn;
34
/// Data a host returns from [`RuntimeHostAdapter::load_turn_context`].
///
/// Within this file only `mcp_tool_definitions` is read (it is copied into the
/// `ReasonInput` by `execute_reason_activity`); the other fields are presumably
/// consumed by host implementations — confirm against callers.
#[derive(Debug, Clone)]
pub struct RuntimeHostTurnContext {
    /// Agent loaded for the turn; `None` presumably means the session has no agent.
    pub agent: Option<Agent>,
    /// Session the context was loaded for.
    pub session: Session,
    /// Messages loaded alongside the session.
    pub messages: Vec<Message>,
    /// Model (with its provider) loaded for the turn, if any.
    pub model: Option<ModelWithProvider>,
    /// MCP tool definitions forwarded to the reason step.
    pub mcp_tool_definitions: Vec<ToolDefinition>,
}
44
/// Host-side integration surface used by the activity functions in this file.
///
/// The required methods supply the stores and registries that the input,
/// reason and act steps always need. Every method with a default body returns
/// `None`; hosts override the ones they support and the corresponding
/// feature is wired into the atom only when `Some` is returned.
#[async_trait]
pub trait RuntimeHostAdapter: Send + Sync + Clone + 'static {
    /// Fetches agent `agent_id` for organization `org_id`.
    async fn get_agent(
        &self,
        org_id: i64,
        agent_id: AgentId,
    ) -> everruns_core::error::Result<Option<Agent>>;

    /// Fetches harness `harness_id` for organization `org_id`.
    async fn get_harness(
        &self,
        org_id: i64,
        harness_id: HarnessId,
    ) -> everruns_core::error::Result<Option<Harness>>;

    /// Sets the status of a session and returns the resulting [`Session`].
    ///
    /// Called by `RuntimeSessionLifecycle`, which logs (and otherwise ignores) errors.
    async fn set_session_status(
        &self,
        org_id: i64,
        session_id: SessionId,
        status: SessionStatus,
    ) -> everruns_core::error::Result<Session>;

    /// Loads the per-turn context for a session; used by `execute_reason_activity`.
    async fn load_turn_context(
        &self,
        org_id: i64,
        session_id: SessionId,
    ) -> everruns_core::error::Result<RuntimeHostTurnContext>;

    /// Capability registry used to resolve capabilities, tools and hooks.
    fn capability_registry(&self) -> CapabilityRegistry;

    /// Driver registry handed to the reason atom.
    fn driver_registry(&self) -> DriverRegistry;

    /// Harness store scoped to `org_id`.
    fn harness_store(&self, org_id: i64) -> Arc<dyn HarnessStore>;

    /// Agent store scoped to `org_id`.
    fn agent_store(&self, org_id: i64) -> Arc<dyn AgentStore>;

    /// Session store scoped to `org_id`.
    fn session_store(&self, org_id: i64) -> Arc<dyn SessionStore>;

    /// Session mutator scoped to `org_id`; handed to the act atom.
    fn session_mutator(&self, org_id: i64) -> Arc<dyn SessionMutator>;

    /// LLM provider store scoped to `org_id`; handed to the reason atom.
    fn provider_store(&self, org_id: i64) -> Arc<dyn LlmProviderStore>;

    /// Message retriever used to read session messages.
    fn message_store(&self) -> Arc<dyn MessageRetriever>;

    /// Emitter used for all lifecycle and atom events.
    fn event_emitter(&self) -> Arc<dyn EventEmitter>;

    /// Session file system; also backs the virtual bash hook dispatcher.
    fn file_store(&self) -> Arc<dyn SessionFileSystem>;

    /// Optional image resolver for the reason atom. Defaults to `None`.
    fn image_resolver(&self, _org_id: i64) -> Option<Arc<dyn ImageResolver>> {
        None
    }

    /// Optional image artifact store for the act atom. Defaults to `None`.
    fn image_artifact_store(&self, _org_id: i64) -> Option<Arc<dyn ImageArtifactStore>> {
        None
    }

    /// Optional provider credential store for the act atom. Defaults to `None`.
    fn provider_credential_store(&self, _org_id: i64) -> Option<Arc<dyn ProviderCredentialStore>> {
        None
    }

    /// Optional utility LLM service for the act atom. Defaults to `None`.
    fn utility_llm_service(&self) -> Option<Arc<dyn UtilityLlmService>> {
        None
    }

    /// Optional egress service for the act atom. Defaults to `None`.
    fn egress_service(&self) -> Option<Arc<dyn EgressService>> {
        None
    }

    /// Optional session storage store for the act atom. Defaults to `None`.
    fn storage_store(&self) -> Option<Arc<dyn SessionStorageStore>> {
        None
    }

    /// Optional memory store backend for the act atom. Defaults to `None`.
    fn memory_store(&self, _org_id: i64) -> Option<Arc<dyn everruns_core::MemoryStoreBackend>> {
        None
    }

    /// Optional user connection resolver for the act atom. Defaults to `None`.
    fn connection_resolver(&self) -> Option<Arc<dyn UserConnectionResolver>> {
        None
    }

    /// Optional session SQL database store for the act atom. Defaults to `None`.
    fn sqldb_store(&self) -> Option<SessionSqlDbStoreRef> {
        None
    }

    /// Optional leased resource store for the act atom. Defaults to `None`.
    fn leased_resource_store(&self) -> Option<Arc<dyn LeasedResourceStore>> {
        None
    }

    /// Optional session resource registry for the act atom. Defaults to `None`.
    fn session_resource_registry(&self) -> Option<Arc<dyn SessionResourceRegistry>> {
        None
    }

    /// Optional session schedule store for the act atom. Defaults to `None`.
    fn schedule_store(&self, _org_id: i64) -> Option<Arc<dyn SessionScheduleStore>> {
        None
    }

    /// Optional platform store for a specific session; used by the act atom.
    /// Defaults to `None`.
    fn platform_store(
        &self,
        _org_id: i64,
        _session_id: SessionId,
    ) -> Option<Arc<dyn PlatformStore>> {
        None
    }

    /// Optional budget checker for the organization/agent pair; used by the
    /// act atom. Defaults to `None`.
    fn budget_checker(
        &self,
        _org_id: i64,
        _agent_id: Option<AgentId>,
    ) -> Option<Arc<dyn BudgetChecker>> {
        None
    }

    /// Optional payment authority for the organization/agent pair; used by the
    /// act atom. Defaults to `None`.
    fn payment_authority(
        &self,
        _org_id: i64,
        _agent_id: Option<AgentId>,
    ) -> Option<Arc<dyn PaymentAuthority>> {
        None
    }

    /// Optional outbound tool rate limiter for the act atom. Defaults to `None`.
    fn outbound_tool_rate_limiter(
        &self,
        _org_id: i64,
    ) -> Option<Arc<dyn everruns_core::OutboundToolRateLimiter>> {
        None
    }

    /// Optional durable tool result store for the act atom. Defaults to `None`.
    fn durable_tool_result_store(&self) -> Option<Arc<dyn everruns_core::DurableToolResultStore>> {
        None
    }

    /// Optional stream heartbeater for the reason atom. Defaults to `None`.
    fn stream_heartbeater(&self) -> Option<Arc<dyn everruns_core::StreamHeartbeater>> {
        None
    }

    /// Optional provider stall timeout for the reason atom. Defaults to `None`.
    fn provider_stall_timeout(&self) -> Option<std::time::Duration> {
        None
    }

    /// Optional MCP executor for a session. When present, `execute_act_activity`
    /// registers MCP proxy tools that invoke through it. Defaults to `None`.
    async fn mcp_executor(
        &self,
        _org_id: i64,
        _session_id: SessionId,
    ) -> Option<Arc<everruns_mcp::McpExecutor>> {
        None
    }
}
212
/// Tools and tool hooks resolved for one act step.
///
/// Built by `load_execution_capabilities` and consumed by `execute_act_activity`.
struct RuntimeExecutionCapabilities {
    /// Registry holding the default tools plus capability (or blueprint) tools.
    tool_registry: ToolRegistry,
    /// Capability-provided hooks followed by user-defined post-tool hooks.
    post_tool_hooks: Vec<Arc<dyn everruns_core::PostToolExecHook>>,
    /// Capability-provided hooks followed by user-defined pre-tool hooks.
    pre_tool_hooks: Vec<Arc<dyn everruns_core::atoms::PreToolUseHook>>,
    /// Tool-call hooks contributed by available capabilities.
    tool_call_hooks: Vec<Arc<dyn everruns_core::ToolCallHook>>,
}
219
220fn finalize_specs_from_configs(
230 resolved_capability_configs: &[everruns_core::capability_types::AgentCapabilityConfig],
231 capability_registry: &CapabilityRegistry,
232) -> Vec<everruns_core::user_hook_types::UserHookSpec> {
233 let mut hook_contributions: Vec<(String, Vec<everruns_core::user_hook_types::UserHookSpec>)> =
234 Vec::new();
235 let mut disabled_contributions: Vec<String> = Vec::new();
236 for config in resolved_capability_configs {
237 let Some(capability) = capability_registry.get(config.capability_id()) else {
238 continue;
239 };
240 let specs = capability.user_hooks_with_config(&config.config);
241 if !specs.is_empty() {
242 hook_contributions.push((config.capability_id().to_string(), specs));
243 }
244 if config.capability_id() == "user_hooks" {
245 disabled_contributions.extend(
246 everruns_core::capabilities::user_hooks::disabled_contributions(&config.config),
247 );
248 }
249 }
250 everruns_core::hook_adapter::finalize_hook_specs(hook_contributions, &disabled_contributions)
251}
252
253async fn collect_lifecycle_hook_specs<A: RuntimeHostAdapter>(
258 adapter: &A,
259 org_id: i64,
260 session_id: SessionId,
261 harness_id: HarnessId,
262 agent_id: Option<AgentId>,
263) -> everruns_core::error::Result<(
264 Vec<everruns_core::user_hook_types::UserHookSpec>,
265 Arc<dyn everruns_core::hook_executor::BashHookDispatcher>,
266)> {
267 let capability_registry = adapter.capability_registry();
268 let harness_chain = adapter
269 .harness_store(org_id)
270 .get_harness_chain(harness_id)
271 .await?;
272 if harness_chain.is_empty() {
273 return Err(everruns_core::error::AgentLoopError::harness_not_found(
274 harness_id,
275 ));
276 }
277 let session = adapter
278 .session_store(org_id)
279 .get_session(session_id)
280 .await?
281 .ok_or_else(|| everruns_core::error::AgentLoopError::session_not_found(session_id))?;
282 let agent = match agent_id {
283 Some(agent_id) => adapter.agent_store(org_id).get_agent(agent_id).await?,
284 None => None,
285 };
286 let resolved = resolve_runtime_capabilities(
287 &harness_chain,
288 agent.as_ref(),
289 &session,
290 &capability_registry,
291 );
292 let specs =
293 finalize_specs_from_configs(&resolved.resolved_capability_configs, &capability_registry);
294 let dispatcher: Arc<dyn everruns_core::hook_executor::BashHookDispatcher> = Arc::new(
295 everruns_core::hook_dispatch::VirtualBashHookDispatcher::new(adapter.file_store()),
296 );
297 Ok((specs, dispatcher))
298}
299
300async fn load_execution_capabilities<A: RuntimeHostAdapter>(
301 adapter: &A,
302 org_id: i64,
303 session_id: SessionId,
304 harness_id: HarnessId,
305 agent_id: Option<AgentId>,
306 locale: Option<String>,
307 blueprint_id: Option<&str>,
308) -> everruns_core::error::Result<RuntimeExecutionCapabilities> {
309 let capability_registry = adapter.capability_registry();
310 if let Some(blueprint_id) = blueprint_id {
311 let mut registry = ToolRegistry::with_defaults();
312 let blueprint = capability_registry.blueprint(blueprint_id).ok_or_else(|| {
313 everruns_core::error::AgentLoopError::config(format!(
314 "Blueprint \"{blueprint_id}\" not found in registry"
315 ))
316 })?;
317 for tool in blueprint.tools {
318 registry.register_boxed(tool);
319 }
320 return Ok(RuntimeExecutionCapabilities {
321 tool_registry: registry,
322 post_tool_hooks: Vec::new(),
323 pre_tool_hooks: Vec::new(),
324 tool_call_hooks: Vec::new(),
325 });
326 }
327
328 let harness_chain = adapter
329 .harness_store(org_id)
330 .get_harness_chain(harness_id)
331 .await?;
332 if harness_chain.is_empty() {
333 return Err(everruns_core::error::AgentLoopError::harness_not_found(
334 harness_id,
335 ));
336 }
337
338 let session = adapter
339 .session_store(org_id)
340 .get_session(session_id)
341 .await?
342 .ok_or_else(|| everruns_core::error::AgentLoopError::session_not_found(session_id))?;
343
344 let agent_store = adapter.agent_store(org_id);
345 let agent = match agent_id {
346 Some(agent_id) => Some(
347 agent_store
348 .get_agent(agent_id)
349 .await?
350 .ok_or_else(|| everruns_core::error::AgentLoopError::agent_not_found(agent_id))?,
351 ),
352 None => None,
353 };
354
355 let resolved = resolve_runtime_capabilities(
356 &harness_chain,
357 agent.as_ref(),
358 &session,
359 &capability_registry,
360 );
361 let prompt_ctx = SystemPromptContext {
368 session_id,
369 locale: locale.or(session.locale.clone()),
370 file_store: Some(adapter.file_store()),
371 model: None,
372 };
373 let collected = collect_capabilities_with_configs(
374 &resolved.resolved_capability_configs,
375 &capability_registry,
376 &prompt_ctx,
377 )
378 .await;
379
380 let mut registry = ToolRegistry::with_defaults();
381 for tool in collected.tools {
382 registry.register_boxed(tool);
383 }
384
385 let mut post_tool_hooks: Vec<Arc<dyn everruns_core::PostToolExecHook>> = resolved
390 .resolved_capability_configs
391 .iter()
392 .flat_map(|config| {
393 capability_registry
394 .get(config.capability_id())
395 .filter(|capability| capability.status() == CapabilityStatus::Available)
396 .map(|capability| capability.post_tool_exec_hooks())
397 .unwrap_or_default()
398 })
399 .collect();
400
401 let user_hook_specs =
408 finalize_specs_from_configs(&resolved.resolved_capability_configs, &capability_registry);
409 let mut pre_tool_hooks: Vec<Arc<dyn everruns_core::atoms::PreToolUseHook>> = resolved
412 .resolved_capability_configs
413 .iter()
414 .flat_map(|config| {
415 capability_registry
416 .get(config.capability_id())
417 .filter(|capability| capability.status() == CapabilityStatus::Available)
418 .map(|capability| capability.pre_tool_use_hooks())
419 .unwrap_or_default()
420 })
421 .collect();
422 if !user_hook_specs.is_empty() {
423 let dispatcher: Arc<dyn everruns_core::hook_executor::BashHookDispatcher> = Arc::new(
424 everruns_core::hook_dispatch::VirtualBashHookDispatcher::new(adapter.file_store()),
425 );
426 post_tool_hooks.extend(everruns_core::hook_adapter::build_post_tool_use_hooks(
427 &user_hook_specs,
428 dispatcher.clone(),
429 ));
430 pre_tool_hooks.extend(everruns_core::hook_adapter::build_pre_tool_use_hooks(
431 &user_hook_specs,
432 dispatcher,
433 ));
434 }
435
436 let tool_call_hooks = resolved
437 .resolved_capability_configs
438 .iter()
439 .flat_map(|config| {
440 capability_registry
441 .get(config.capability_id())
442 .filter(|capability| capability.status() == CapabilityStatus::Available)
443 .map(|capability| capability.tool_call_hooks())
444 .unwrap_or_default()
445 })
446 .collect();
447
448 Ok(RuntimeExecutionCapabilities {
449 tool_registry: registry,
450 post_tool_hooks,
451 pre_tool_hooks,
452 tool_call_hooks,
453 })
454}
455
/// Emits lifecycle events and session status updates for one session.
///
/// Status updates and event emissions made through this type are best-effort:
/// failures are logged with `warn!` and not returned to the caller.
pub struct RuntimeSessionLifecycle<A: RuntimeHostAdapter> {
    /// Host adapter used for status updates, events, messages and hook lookup.
    adapter: A,
    /// Internal (numeric) organization id the session belongs to.
    org_id: i64,
    /// Session all events and status updates are issued for.
    session_id: SessionId,
}
462
463impl<A: RuntimeHostAdapter> RuntimeSessionLifecycle<A> {
464 pub fn new(adapter: A, org_id: i64, session_id: SessionId) -> Self {
465 Self {
466 adapter,
467 org_id,
468 session_id,
469 }
470 }
471
472 async fn set_session_status(&self, status: SessionStatus, action: &'static str) {
473 if let Err(error) = self
474 .adapter
475 .set_session_status(self.org_id, self.session_id, status)
476 .await
477 {
478 warn!(
479 session_id = %self.session_id,
480 org_id = self.org_id,
481 action,
482 %error,
483 "runtime host lifecycle status update failed"
484 );
485 }
486 }
487
488 async fn emit_event(&self, request: EventRequest) {
489 let event_type = request.event_type.clone();
490 if let Err(error) = self.adapter.event_emitter().emit(request).await {
491 warn!(
492 session_id = %self.session_id,
493 org_id = self.org_id,
494 event_type,
495 %error,
496 "runtime host lifecycle event emission failed"
497 );
498 }
499 }
500
501 pub async fn turn_started(&self, turn_id: TurnId, input_message_id: MessageId) {
502 let input_content = self
503 .adapter
504 .message_store()
505 .get(self.session_id, input_message_id)
506 .await
507 .ok()
508 .flatten()
509 .map(|message| message.content_to_llm_string());
510
511 self.set_session_status(SessionStatus::Active, "turn_started")
512 .await;
513
514 self.emit_event(EventRequest::new(
515 self.session_id,
516 EventContext::turn(turn_id, input_message_id),
517 SessionActivatedData {
518 turn_id,
519 input_message_id,
520 },
521 ))
522 .await;
523
524 self.emit_event(EventRequest::new(
525 self.session_id,
526 EventContext::turn(turn_id, input_message_id),
527 TurnStartedData {
528 turn_id,
529 input_message_id,
530 input_content,
531 },
532 ))
533 .await;
534 }
535
536 pub async fn emit_turn_completed(&self, input_message_id: MessageId, data: TurnCompletedData) {
537 let turn_id = data.turn_id;
538 self.emit_event(EventRequest::new(
539 self.session_id,
540 EventContext::turn(turn_id, input_message_id),
541 data,
542 ))
543 .await;
544 }
545
546 pub async fn emit_session_idled(
547 &self,
548 turn_id: TurnId,
549 input_message_id: MessageId,
550 iterations: Option<u32>,
551 usage: Option<TokenUsage>,
552 ) {
553 self.set_session_status(SessionStatus::Idle, "emit_session_idled")
554 .await;
555
556 self.emit_event(EventRequest::new(
557 self.session_id,
558 EventContext::turn(turn_id, input_message_id),
559 SessionIdledData {
560 turn_id,
561 iterations,
562 usage,
563 },
564 ))
565 .await;
566 }
567
568 pub async fn turn_completed(
569 &self,
570 turn_id: TurnId,
571 input_message_id: MessageId,
572 iterations: u32,
573 usage: Option<TokenUsage>,
574 input_content: Option<String>,
575 ) {
576 self.emit_turn_completed(
577 input_message_id,
578 TurnCompletedData {
579 turn_id,
580 iterations,
581 duration_ms: None,
582 usage: usage.clone(),
583 input_content,
584 final_message_id: None,
585 final_answer_preview: None,
586 time_to_first_token_ms: None,
587 tool_call_count: None,
588 llm_call_count: None,
589 status: Some("completed".to_string()),
590 },
591 )
592 .await;
593 self.emit_session_idled(turn_id, input_message_id, Some(iterations), usage)
594 .await;
595 }
596
597 pub async fn fire_turn_end_hooks(
601 &self,
602 harness_id: HarnessId,
603 agent_id: Option<AgentId>,
604 turn_id: TurnId,
605 success: bool,
606 ) {
607 let (specs, dispatcher) = match collect_lifecycle_hook_specs(
608 &self.adapter,
609 self.org_id,
610 self.session_id,
611 harness_id,
612 agent_id,
613 )
614 .await
615 {
616 Ok(pair) => pair,
617 Err(error) => {
618 warn!(
619 session_id = %self.session_id,
620 %error,
621 "failed to collect turn_end hook specs; skipping"
622 );
623 return;
624 }
625 };
626 let hooks = everruns_core::lifecycle_hooks::build_turn_lifecycle_hooks(
627 &specs,
628 everruns_core::user_hook_types::HookEvent::TurnEnd,
629 dispatcher,
630 );
631 if hooks.is_empty() {
632 return;
633 }
634 let ctx = everruns_core::lifecycle_hooks::TurnHookContext {
635 session_id: self.session_id,
636 turn_id: Some(turn_id),
637 org_id: org_public_id_from_internal(self.org_id).parse().ok(),
638 agent_id: agent_id.map(|a| a.to_string()),
639 };
640 everruns_core::lifecycle_hooks::run_turn_end_hooks(
641 &hooks,
642 &ctx,
643 serde_json::json!({ "success": success }),
644 )
645 .await;
646 }
647
648 pub async fn user_prompt_blocked(
653 &self,
654 turn_id: TurnId,
655 input_message_id: MessageId,
656 reason: &str,
657 user_message: Option<&str>,
658 ) {
659 let user_error =
660 UserFacingError::new(everruns_core::user_facing_error_codes::BLOCKED_BY_HOOK);
661 let shown = user_message.unwrap_or(reason);
662 let mut error_message = Message::assistant(shown);
663 let mut metadata = std::collections::HashMap::new();
664 user_error.apply_to_message_metadata(&mut metadata);
665 error_message.metadata = Some(metadata);
666
667 self.emit_event(EventRequest::new(
668 self.session_id,
669 EventContext::turn(turn_id, input_message_id),
670 OutputMessageCompletedData::new(error_message).with_user_facing_error(&user_error),
671 ))
672 .await;
673
674 self.turn_failed(turn_id, input_message_id, reason, Some(&user_error))
675 .await;
676 }
677
678 pub async fn turn_failed(
679 &self,
680 turn_id: TurnId,
681 input_message_id: MessageId,
682 error: &str,
683 user_error: Option<&UserFacingError>,
684 ) {
685 self.set_session_status(SessionStatus::Idle, "turn_failed")
686 .await;
687
688 self.emit_event(EventRequest::new(
689 self.session_id,
690 EventContext::turn(turn_id, input_message_id),
691 {
692 let mut data = TurnFailedData {
693 turn_id,
694 error: error.to_string(),
695 error_code: None,
696 error_fields: None,
697 };
698 if let Some(user_error) = user_error {
699 user_error.apply_to_event_fields(&mut data.error_code, &mut data.error_fields);
700 }
701 data
702 },
703 ))
704 .await;
705
706 self.emit_event(EventRequest::new(
707 self.session_id,
708 EventContext::turn(turn_id, input_message_id),
709 SessionIdledData {
710 turn_id,
711 iterations: None,
712 usage: None,
713 },
714 ))
715 .await;
716 }
717
718 pub async fn waiting_for_tool_results(&self) {
719 self.set_session_status(
720 SessionStatus::WaitingForToolResults,
721 "waiting_for_tool_results",
722 )
723 .await;
724 }
725
726 pub async fn dependency_blocked(
727 &self,
728 turn_id: TurnId,
729 input_message_id: MessageId,
730 blocker: DependencyBlocker,
731 ) {
732 let user_error = UserFacingError::new(blocker.error_code())
733 .with_field(
734 "dependency",
735 match blocker {
736 DependencyBlocker::HarnessArchived | DependencyBlocker::HarnessDeleted => {
737 "harness"
738 }
739 DependencyBlocker::AgentArchived | DependencyBlocker::AgentDeleted => "agent",
740 },
741 )
742 .with_field(
743 "state",
744 match blocker {
745 DependencyBlocker::HarnessArchived | DependencyBlocker::AgentArchived => {
746 "archived"
747 }
748 DependencyBlocker::HarnessDeleted | DependencyBlocker::AgentDeleted => {
749 "deleted"
750 }
751 },
752 );
753 let mut error_message = Message::assistant(blocker.message());
754 let mut metadata = std::collections::HashMap::new();
755 user_error.apply_to_message_metadata(&mut metadata);
756 error_message.metadata = Some(metadata);
757
758 self.emit_event(EventRequest::new(
759 self.session_id,
760 EventContext::turn(turn_id, input_message_id),
761 OutputMessageCompletedData::new(error_message).with_user_facing_error(&user_error),
762 ))
763 .await;
764
765 self.turn_failed(
766 turn_id,
767 input_message_id,
768 blocker.message(),
769 Some(&user_error),
770 )
771 .await;
772 }
773}
774
775pub async fn detect_dependency_blocker<A: RuntimeHostAdapter>(
776 adapter: &A,
777 org_id: i64,
778 harness_id: HarnessId,
779 agent_id: Option<AgentId>,
780) -> everruns_core::error::Result<Option<DependencyBlocker>> {
781 let harness_store = adapter.harness_store(org_id);
782 let agent_store = adapter.agent_store(org_id);
783 everruns_core::detect_dependency_blocker(
784 harness_store.as_ref(),
785 agent_store.as_ref(),
786 harness_id,
787 agent_id,
788 )
789 .await
790}
791
792pub async fn execute_input_activity<A: RuntimeHostAdapter>(
793 adapter: &A,
794 org_id: i64,
795 input: InputAtomInput,
796) -> everruns_core::error::Result<InputAtomResult> {
797 RuntimeSessionLifecycle::new(adapter.clone(), org_id, input.context.session_id)
798 .turn_started(input.context.turn_id, input.context.input_message_id)
799 .await;
800
801 let atom = InputAtom::new(adapter.message_store());
802 atom.execute(input).await
803}
804
/// Outcome of running the `UserPromptSubmit` hooks for a turn.
struct UserPromptHookResult {
    /// Decision returned by the hooks (block, or continue with a message).
    decision: everruns_core::lifecycle_hooks::UserPromptDecision,
    /// Prompt text as it was before the hooks ran; compared against the
    /// `Continue` message to detect a rewrite.
    original_message: String,
}
815
816async fn run_user_prompt_submit_for_turn<A: RuntimeHostAdapter>(
817 adapter: &A,
818 org_id: i64,
819 input: &ReasonInput,
820) -> everruns_core::error::Result<Option<UserPromptHookResult>> {
821 let (specs, dispatcher) = match collect_lifecycle_hook_specs(
822 adapter,
823 org_id,
824 input.context.session_id,
825 input.harness_id,
826 input.agent_id,
827 )
828 .await
829 {
830 Ok(pair) => pair,
831 Err(error) => {
832 warn!(
833 session_id = %input.context.session_id,
834 %error,
835 "failed to collect user_prompt_submit hook specs; continuing without them"
836 );
837 return Ok(None);
838 }
839 };
840 let hooks = everruns_core::lifecycle_hooks::build_turn_lifecycle_hooks(
841 &specs,
842 everruns_core::user_hook_types::HookEvent::UserPromptSubmit,
843 dispatcher,
844 );
845 if hooks.is_empty() {
846 return Ok(None);
847 }
848
849 let message_text = adapter
850 .message_store()
851 .get(input.context.session_id, input.context.input_message_id)
852 .await
853 .ok()
854 .flatten()
855 .map(|m| m.content_to_llm_string())
856 .unwrap_or_default();
857
858 let ctx = everruns_core::lifecycle_hooks::TurnHookContext {
859 session_id: input.context.session_id,
860 turn_id: Some(input.context.turn_id),
861 org_id: org_public_id_from_internal(org_id).parse().ok(),
862 agent_id: input.agent_id.map(|a| a.to_string()),
863 };
864 let original_message = message_text.clone();
865 let decision =
866 everruns_core::lifecycle_hooks::run_user_prompt_submit_hooks(&hooks, &ctx, message_text)
867 .await;
868 Ok(Some(UserPromptHookResult {
869 decision,
870 original_message,
871 }))
872}
873
874pub async fn execute_reason_activity<A: RuntimeHostAdapter>(
875 adapter: &A,
876 org_id: i64,
877 input: ReasonInput,
878) -> everruns_core::error::Result<ReasonResult> {
879 if let Some(blocker) =
880 detect_dependency_blocker(adapter, org_id, input.harness_id, input.agent_id).await?
881 {
882 RuntimeSessionLifecycle::new(adapter.clone(), org_id, input.context.session_id)
883 .dependency_blocked(
884 input.context.turn_id,
885 input.context.input_message_id,
886 blocker,
887 )
888 .await;
889 return Ok(ReasonResult {
890 success: false,
891 text: blocker.message().to_string(),
892 tool_calls: vec![],
893 has_tool_calls: false,
894 tool_definitions: vec![],
895 max_iterations: everruns_core::runtime_agent::default_max_iterations(),
896 error: Some("dependency_unavailable".to_string()),
897 usage: None,
898 output_message_id: None,
899 time_to_first_token_ms: None,
900 response_id: None,
901 locale: None,
902 network_access: None,
903 });
904 }
905
906 let mut user_prompt_message_override = None;
914 if input.iteration <= 1
915 && let Some(hook_result) = run_user_prompt_submit_for_turn(adapter, org_id, &input).await?
916 {
917 match hook_result.decision {
918 everruns_core::lifecycle_hooks::UserPromptDecision::Block {
919 reason,
920 user_message,
921 } => {
922 RuntimeSessionLifecycle::new(adapter.clone(), org_id, input.context.session_id)
923 .user_prompt_blocked(
924 input.context.turn_id,
925 input.context.input_message_id,
926 &reason,
927 user_message.as_deref(),
928 )
929 .await;
930 return Ok(ReasonResult {
931 success: false,
932 text: user_message.unwrap_or_else(|| reason.clone()),
933 tool_calls: vec![],
934 has_tool_calls: false,
935 tool_definitions: vec![],
936 max_iterations: everruns_core::runtime_agent::default_max_iterations(),
937 error: Some("blocked_by_user_prompt_hook".to_string()),
938 usage: None,
939 output_message_id: None,
940 time_to_first_token_ms: None,
941 response_id: None,
942 locale: None,
943 network_access: None,
944 });
945 }
946 everruns_core::lifecycle_hooks::UserPromptDecision::Continue { message } => {
947 if message != hook_result.original_message {
948 user_prompt_message_override = Some(message);
949 }
950 }
951 }
952 }
953
954 let turn_context = adapter
955 .load_turn_context(org_id, input.context.session_id)
956 .await?;
957
958 let mut atom = ReasonAtom::new(
959 adapter.harness_store(org_id),
960 adapter.agent_store(org_id),
961 adapter.session_store(org_id),
962 adapter.message_store(),
963 adapter.provider_store(org_id),
964 adapter.capability_registry(),
965 adapter.driver_registry(),
966 adapter.event_emitter(),
967 )
968 .with_file_store(adapter.file_store());
969 if let Some(image_resolver) = adapter.image_resolver(org_id) {
970 atom = atom.with_image_resolver(image_resolver);
971 }
972 if let Some(hb) = adapter.stream_heartbeater() {
973 atom = atom.with_stream_heartbeater(hb);
974 }
975 if let Some(timeout) = adapter.provider_stall_timeout() {
976 atom = atom.with_provider_stall_timeout(timeout);
977 }
978
979 let input = ReasonInput {
980 mcp_tool_definitions: turn_context.mcp_tool_definitions,
981 ..input
982 };
983
984 if let Some(message_override) = user_prompt_message_override {
985 let mut assembled = assemble_turn_context(
986 adapter.harness_store(org_id).as_ref(),
987 adapter.agent_store(org_id).as_ref(),
988 adapter.session_store(org_id).as_ref(),
989 adapter.message_store().as_ref(),
990 adapter.provider_store(org_id).as_ref(),
991 &adapter.capability_registry(),
992 input.context.session_id,
993 input.harness_id,
994 input.agent_id,
995 &input.mcp_tool_definitions,
996 Some(adapter.file_store()),
997 )
998 .await?;
999
1000 let message = assembled
1001 .messages
1002 .iter_mut()
1003 .find(|message| message.id == input.context.input_message_id)
1004 .ok_or_else(|| {
1005 everruns_core::error::AgentLoopError::config(
1006 "user_prompt_submit mutation: input message not found in assembled context",
1007 )
1008 })?;
1009
1010 message
1015 .content
1016 .retain(|part| !matches!(part, ContentPart::Text(_)));
1017 message
1018 .content
1019 .insert(0, ContentPart::text(message_override));
1020
1021 return atom.execute_with_assembled_context(input, assembled).await;
1022 }
1023
1024 atom.execute(input).await
1025}
1026
1027pub async fn execute_act_activity<A: RuntimeHostAdapter>(
1028 adapter: &A,
1029 input: ActInput,
1030) -> everruns_core::error::Result<ActResult> {
1031 let org_id = input.org_id.ok_or_else(|| {
1032 everruns_core::error::AgentLoopError::config(
1033 "ActInput.org_id must be set for runtime host execution",
1034 )
1035 })?;
1036
1037 if let Some(blocker) =
1038 detect_dependency_blocker(adapter, org_id, input.harness_id, input.agent_id).await?
1039 {
1040 RuntimeSessionLifecycle::new(adapter.clone(), org_id, input.context.session_id)
1041 .dependency_blocked(
1042 input.context.turn_id,
1043 input.context.input_message_id,
1044 blocker,
1045 )
1046 .await;
1047 return Ok(ActResult {
1048 results: vec![],
1049 completed: true,
1050 success_count: 0,
1051 error_count: 1,
1052 waiting_for_tool_results: false,
1053 blocked: true,
1054 client_tool_calls: vec![],
1055 client_tool_definitions: vec![],
1056 });
1057 }
1058
1059 let execution_capabilities = load_execution_capabilities(
1060 adapter,
1061 org_id,
1062 input.context.session_id,
1063 input.harness_id,
1064 input.agent_id,
1065 input.locale.clone(),
1066 input.blueprint_id.as_deref(),
1067 )
1068 .await?;
1069 let mut tool_registry = execution_capabilities.tool_registry;
1070
1071 if let Some(mcp) = adapter.mcp_executor(org_id, input.context.session_id).await {
1078 let invoker: Arc<dyn everruns_core::McpToolInvoker> = mcp;
1079 for tool in everruns_core::build_mcp_proxy_tools(&input.tool_definitions, invoker) {
1080 tool_registry.register_boxed(tool);
1081 }
1082 }
1083
1084 let builtin_tool_registry = Arc::new(tool_registry.clone());
1085 let executor: Arc<dyn everruns_core::traits::ToolExecutor> = Arc::new(tool_registry);
1086
1087 let mut atom =
1088 ActAtom::with_file_store(executor, adapter.event_emitter(), adapter.file_store())
1089 .with_session_store(adapter.session_store(org_id))
1090 .with_session_mutator(adapter.session_mutator(org_id))
1091 .with_agent_store(adapter.agent_store(org_id))
1092 .with_tool_registry(builtin_tool_registry)
1093 .with_org_id(
1094 org_public_id_from_internal(org_id)
1095 .parse()
1096 .expect("internal org id converts to valid public org id"),
1097 )
1098 .with_capability_registry(adapter.capability_registry())
1099 .with_post_tool_hooks(execution_capabilities.post_tool_hooks)
1100 .with_pre_tool_hooks(execution_capabilities.pre_tool_hooks)
1101 .with_tool_call_hooks(execution_capabilities.tool_call_hooks);
1102
1103 if let Some(storage_store) = adapter.storage_store() {
1104 atom = atom.with_storage_store(storage_store);
1105 }
1106 if let Some(image_store) = adapter.image_artifact_store(org_id) {
1107 atom = atom.with_image_store(image_store);
1108 }
1109 if let Some(provider_credential_store) = adapter.provider_credential_store(org_id) {
1110 atom = atom.with_provider_credential_store(provider_credential_store);
1111 }
1112 if let Some(utility_llm_service) = adapter.utility_llm_service() {
1113 atom = atom.with_utility_llm_service(utility_llm_service);
1114 }
1115 if let Some(egress_service) = adapter.egress_service() {
1116 atom = atom.with_egress_service(egress_service);
1117 }
1118 if let Some(memory_store) = adapter.memory_store(org_id) {
1119 atom = atom.with_memory_store(memory_store);
1120 }
1121 if let Some(connection_resolver) = adapter.connection_resolver() {
1122 atom = atom.with_connection_resolver(connection_resolver);
1123 }
1124 if let Some(sqldb_store) = adapter.sqldb_store() {
1125 atom = atom.with_sqldb_store(sqldb_store);
1126 }
1127 if let Some(leased_resource_store) = adapter.leased_resource_store() {
1128 atom = atom.with_leased_resource_store(leased_resource_store);
1129 }
1130 if let Some(registry) = adapter.session_resource_registry() {
1131 atom = atom.with_session_resource_registry(registry);
1132 }
1133 if let Some(schedule_store) = adapter.schedule_store(org_id) {
1134 atom = atom.with_schedule_store(schedule_store);
1135 }
1136 if let Some(platform_store) = adapter.platform_store(org_id, input.context.session_id) {
1137 atom = atom.with_platform_store(platform_store);
1138 }
1139 if let Some(budget_checker) = adapter.budget_checker(org_id, input.agent_id) {
1140 atom = atom.with_budget_checker(budget_checker);
1141 }
1142 if let Some(payment_authority) = adapter.payment_authority(org_id, input.agent_id) {
1143 atom = atom.with_payment_authority(payment_authority);
1144 }
1145 if let Some(limiter) = adapter.outbound_tool_rate_limiter(org_id) {
1146 atom = atom.with_outbound_tool_rate_limiter(limiter);
1147 }
1148 if let Some(store) = adapter.durable_tool_result_store() {
1149 atom = atom.with_durable_tool_result_store(store);
1150 }
1151
1152 atom.execute(input).await
1153}