1use async_trait::async_trait;
6use everruns_core::atoms::{
7 ActAtom, ActInput, ActResult, Atom, InputAtom, InputAtomInput, InputAtomResult, ReasonAtom,
8 ReasonInput, ReasonResult,
9};
10use everruns_core::capabilities::{SystemPromptContext, collect_capabilities_with_configs};
11use everruns_core::events::{
12 EventContext, EventRequest, OutputMessageCompletedData, SessionActivatedData, SessionIdledData,
13 TurnCompletedData, TurnFailedData, TurnStartedData,
14};
15use everruns_core::message::{ContentPart, Message};
16use everruns_core::message_retriever::MessageRetriever;
17use everruns_core::platform_store::PlatformStore;
18use everruns_core::session::SessionStatus;
19use everruns_core::traits::{
20 AgentStore, BudgetChecker, EventEmitter, HarnessStore, ImageArtifactStore, ImageResolver,
21 LeasedResourceStore, PaymentAuthority, ProviderCredentialStore, ProviderStore, ResolvedModel,
22 SessionFileSystem, SessionMutator, SessionResourceRegistry, SessionScheduleStore,
23 SessionSqlDbStoreRef, SessionStorageStore, SessionStore, UserConnectionResolver,
24};
25use everruns_core::typed_id::{AgentId, HarnessId, MessageId, SessionId, TurnId};
26use everruns_core::{
27 Agent, CapabilityRegistry, CapabilityStatus, DependencyBlocker, DriverRegistry, EgressService,
28 ErrorDisclosure, Harness, Session, TokenUsage, ToolDefinition, ToolRegistry, UserFacingError,
29 UtilityLlmService, assemble_turn_context, org_public_id_from_internal,
30 resolve_runtime_capabilities,
31};
32use std::sync::Arc;
33use tracing::warn;
34
/// Data a runtime host loads for a single turn.
///
/// Produced by [`RuntimeHostAdapter::load_turn_context`].
#[derive(Debug, Clone)]
pub struct RuntimeHostTurnContext {
    /// Agent for the turn, if any.
    pub agent: Option<Agent>,
    /// Session the turn runs in.
    pub session: Session,
    /// Messages loaded for the turn.
    pub messages: Vec<Message>,
    /// Resolved model for the turn, if any.
    pub model: Option<ResolvedModel>,
    /// MCP tool definitions; `execute_reason_activity` copies these into the
    /// `ReasonInput` it hands to the reason atom.
    pub mcp_tool_definitions: Vec<ToolDefinition>,
}
44
/// Host-side integration surface used to run agent turns.
///
/// An adapter supplies the stores, registries and optional services consumed
/// by the activity functions in this file (`execute_input_activity`,
/// `execute_reason_activity`, `execute_act_activity`) and by
/// [`RuntimeSessionLifecycle`].
///
/// Every optional accessor has a default implementation returning `None`; the
/// activities only wire a dependency into an atom when the adapter returns
/// `Some`.
///
/// The `Clone` bound exists because the activities clone the adapter into a
/// [`RuntimeSessionLifecycle`].
#[async_trait]
pub trait RuntimeHostAdapter: Send + Sync + Clone + 'static {
    /// Fetches an agent by id for the given organization.
    ///
    /// NOTE(review): `Ok(None)` presumably means "no such agent" — confirm
    /// against implementors.
    async fn get_agent(
        &self,
        org_id: i64,
        agent_id: AgentId,
    ) -> everruns_core::error::Result<Option<Agent>>;

    /// Fetches a harness by id for the given organization.
    ///
    /// NOTE(review): `Ok(None)` presumably means "no such harness" — confirm
    /// against implementors.
    async fn get_harness(
        &self,
        org_id: i64,
        harness_id: HarnessId,
    ) -> everruns_core::error::Result<Option<Harness>>;

    /// Updates the status of a session and returns the resulting session.
    ///
    /// Called by [`RuntimeSessionLifecycle`], which logs (and otherwise
    /// ignores) any error returned here.
    async fn set_session_status(
        &self,
        org_id: i64,
        session_id: SessionId,
        status: SessionStatus,
    ) -> everruns_core::error::Result<Session>;

    /// Loads the per-turn context for a session.
    ///
    /// `execute_reason_activity` calls this once per invocation and uses the
    /// returned `mcp_tool_definitions`.
    async fn load_turn_context(
        &self,
        org_id: i64,
        session_id: SessionId,
    ) -> everruns_core::error::Result<RuntimeHostTurnContext>;

    /// Capability registry used to resolve capabilities, blueprints and hooks.
    fn capability_registry(&self) -> CapabilityRegistry;

    /// Driver registry handed to the reason atom.
    fn driver_registry(&self) -> DriverRegistry;

    /// Harness store for the organization (harness chain lookup, dependency checks).
    fn harness_store(&self, org_id: i64) -> Arc<dyn HarnessStore>;

    /// Agent store for the organization.
    fn agent_store(&self, org_id: i64) -> Arc<dyn AgentStore>;

    /// Session store for the organization.
    fn session_store(&self, org_id: i64) -> Arc<dyn SessionStore>;

    /// Session mutator for the organization; handed to the act atom.
    fn session_mutator(&self, org_id: i64) -> Arc<dyn SessionMutator>;

    /// Provider store for the organization; handed to the reason atom.
    fn provider_store(&self, org_id: i64) -> Arc<dyn ProviderStore>;

    /// Message retriever used to read session messages (takes no org id).
    fn message_store(&self) -> Arc<dyn MessageRetriever>;

    /// Emitter used for all lifecycle and atom events.
    fn event_emitter(&self) -> Arc<dyn EventEmitter>;

    /// Session file system; also backs the bash hook dispatcher built in this file.
    fn file_store(&self) -> Arc<dyn SessionFileSystem>;

    /// Optional image resolver; attached to the reason atom when present.
    fn image_resolver(&self, _org_id: i64) -> Option<Arc<dyn ImageResolver>> {
        None
    }

    /// Optional image artifact store; attached to the act atom when present.
    fn image_artifact_store(&self, _org_id: i64) -> Option<Arc<dyn ImageArtifactStore>> {
        None
    }

    /// Optional provider credential store; attached to the act atom when present.
    fn provider_credential_store(&self, _org_id: i64) -> Option<Arc<dyn ProviderCredentialStore>> {
        None
    }

    /// Optional utility LLM service; attached to the act atom when present.
    fn utility_llm_service(&self) -> Option<Arc<dyn UtilityLlmService>> {
        None
    }

    /// Optional egress service; attached to the act atom when present.
    fn egress_service(&self) -> Option<Arc<dyn EgressService>> {
        None
    }

    /// Optional session storage store; attached to the act atom when present.
    fn storage_store(&self) -> Option<Arc<dyn SessionStorageStore>> {
        None
    }

    /// Optional user connection resolver; attached to the act atom when present.
    fn connection_resolver(&self) -> Option<Arc<dyn UserConnectionResolver>> {
        None
    }

    /// Optional session SQL database store; attached to the act atom when present.
    fn sqldb_store(&self) -> Option<SessionSqlDbStoreRef> {
        None
    }

    /// Optional leased resource store; attached to the act atom when present.
    fn leased_resource_store(&self) -> Option<Arc<dyn LeasedResourceStore>> {
        None
    }

    /// Optional session resource registry; attached to the act atom when present.
    fn session_resource_registry(&self) -> Option<Arc<dyn SessionResourceRegistry>> {
        None
    }

    /// Optional session task registry; attached to the act atom when present.
    fn session_task_registry(
        &self,
    ) -> Option<Arc<dyn everruns_core::session_task::SessionTaskRegistry>> {
        None
    }

    /// Optional schedule store for the organization; attached to the act atom when present.
    fn schedule_store(&self, _org_id: i64) -> Option<Arc<dyn SessionScheduleStore>> {
        None
    }

    /// Optional platform store for a session; attached to the act atom when present.
    fn platform_store(
        &self,
        _org_id: i64,
        _session_id: SessionId,
    ) -> Option<Arc<dyn PlatformStore>> {
        None
    }

    /// Optional budget checker for an organization/agent; attached to the act atom when present.
    fn budget_checker(
        &self,
        _org_id: i64,
        _agent_id: Option<AgentId>,
    ) -> Option<Arc<dyn BudgetChecker>> {
        None
    }

    /// Optional payment authority for an organization/agent; attached to the act atom when present.
    fn payment_authority(
        &self,
        _org_id: i64,
        _agent_id: Option<AgentId>,
    ) -> Option<Arc<dyn PaymentAuthority>> {
        None
    }

    /// Optional outbound tool rate limiter; attached to the act atom when present.
    fn outbound_tool_rate_limiter(
        &self,
        _org_id: i64,
    ) -> Option<Arc<dyn everruns_core::OutboundToolRateLimiter>> {
        None
    }

    /// Optional durable tool result store; attached to both the reason and act atoms when present.
    fn durable_tool_result_store(&self) -> Option<Arc<dyn everruns_core::DurableToolResultStore>> {
        None
    }

    /// Optional subagent spawn store; attached to the act atom when present.
    fn subagent_spawn_store(&self) -> Option<Arc<dyn everruns_core::SubagentSpawnStore>> {
        None
    }

    /// Optional stream heartbeater; attached to the reason atom when present.
    fn stream_heartbeater(&self) -> Option<Arc<dyn everruns_core::StreamHeartbeater>> {
        None
    }

    /// Optional partial stream store; attached to the reason atom when present.
    fn partial_stream_store(&self) -> Option<Arc<dyn everruns_core::PartialStreamStore>> {
        None
    }

    /// Optional provider stall timeout; applied to the reason atom when present.
    fn provider_stall_timeout(&self) -> Option<std::time::Duration> {
        None
    }

    /// Optional MCP executor for a session.
    ///
    /// When `Some`, `execute_act_activity` registers MCP proxy tools built
    /// from the act input's tool definitions on top of the resolved registry.
    async fn mcp_executor(
        &self,
        _org_id: i64,
        _session_id: SessionId,
    ) -> Option<Arc<everruns_mcp::McpExecutor>> {
        None
    }
}
226
/// Tooling resolved for one act execution by `load_execution_capabilities`.
struct RuntimeExecutionCapabilities {
    /// Registry holding the default tools plus the tools collected for the run.
    tool_registry: ToolRegistry,
    /// Hooks handed to the act atom via `with_post_tool_hooks`.
    post_tool_hooks: Vec<Arc<dyn everruns_core::PostToolExecHook>>,
    /// Hooks handed to the act atom via `with_pre_tool_hooks`.
    pre_tool_hooks: Vec<Arc<dyn everruns_core::atoms::PreToolUseHook>>,
    /// Hooks handed to the act atom via `with_tool_call_hooks`.
    tool_call_hooks: Vec<Arc<dyn everruns_core::ToolCallHook>>,
}
233
234fn finalize_specs_from_configs(
244 resolved_capability_configs: &[everruns_core::capability_types::AgentCapabilityConfig],
245 capability_registry: &CapabilityRegistry,
246) -> Vec<everruns_core::user_hook_types::UserHookSpec> {
247 let mut hook_contributions: Vec<(String, Vec<everruns_core::user_hook_types::UserHookSpec>)> =
248 Vec::new();
249 let mut disabled_contributions: Vec<String> = Vec::new();
250 for config in resolved_capability_configs {
251 let Some(capability) = capability_registry.get(config.capability_id()) else {
252 continue;
253 };
254 let specs = capability.user_hooks_with_config(&config.config);
255 if !specs.is_empty() {
256 hook_contributions.push((config.capability_id().to_string(), specs));
257 }
258 if config.capability_id() == "user_hooks" {
259 disabled_contributions.extend(
260 everruns_core::capabilities::user_hooks::disabled_contributions(&config.config),
261 );
262 }
263 }
264 everruns_core::hook_adapter::finalize_hook_specs(hook_contributions, &disabled_contributions)
265}
266
267async fn collect_lifecycle_hook_specs<A: RuntimeHostAdapter>(
272 adapter: &A,
273 org_id: i64,
274 session_id: SessionId,
275 harness_id: HarnessId,
276 agent_id: Option<AgentId>,
277) -> everruns_core::error::Result<(
278 Vec<everruns_core::user_hook_types::UserHookSpec>,
279 Arc<dyn everruns_core::hook_executor::BashHookDispatcher>,
280)> {
281 let capability_registry = adapter.capability_registry();
282 let harness_chain = adapter
283 .harness_store(org_id)
284 .get_harness_chain(harness_id)
285 .await?;
286 if harness_chain.is_empty() {
287 return Err(everruns_core::error::AgentLoopError::harness_not_found(
288 harness_id,
289 ));
290 }
291 let session = adapter
292 .session_store(org_id)
293 .get_session(session_id)
294 .await?
295 .ok_or_else(|| everruns_core::error::AgentLoopError::session_not_found(session_id))?;
296 let agent = match agent_id {
297 Some(agent_id) => adapter.agent_store(org_id).get_agent(agent_id).await?,
298 None => None,
299 };
300 let resolved = resolve_runtime_capabilities(
301 &harness_chain,
302 agent.as_ref(),
303 &session,
304 &capability_registry,
305 );
306 let specs =
307 finalize_specs_from_configs(&resolved.resolved_capability_configs, &capability_registry);
308 let dispatcher: Arc<dyn everruns_core::hook_executor::BashHookDispatcher> = Arc::new(
309 everruns_core::hook_dispatch::BashkitShellHookDispatcher::new(adapter.file_store()),
310 );
311 Ok((specs, dispatcher))
312}
313
314async fn load_execution_capabilities<A: RuntimeHostAdapter>(
315 adapter: &A,
316 org_id: i64,
317 session_id: SessionId,
318 harness_id: HarnessId,
319 agent_id: Option<AgentId>,
320 locale: Option<String>,
321 blueprint_id: Option<&str>,
322) -> everruns_core::error::Result<RuntimeExecutionCapabilities> {
323 let capability_registry = adapter.capability_registry();
324 if let Some(blueprint_id) = blueprint_id {
325 let mut registry = ToolRegistry::with_defaults();
326 let blueprint = capability_registry.blueprint(blueprint_id).ok_or_else(|| {
327 everruns_core::error::AgentLoopError::config(format!(
328 "Blueprint \"{blueprint_id}\" not found in registry"
329 ))
330 })?;
331 for tool in blueprint.tools {
332 registry.register_boxed(tool);
333 }
334 return Ok(RuntimeExecutionCapabilities {
335 tool_registry: registry,
336 post_tool_hooks: Vec::new(),
337 pre_tool_hooks: Vec::new(),
338 tool_call_hooks: Vec::new(),
339 });
340 }
341
342 let harness_chain = adapter
343 .harness_store(org_id)
344 .get_harness_chain(harness_id)
345 .await?;
346 if harness_chain.is_empty() {
347 return Err(everruns_core::error::AgentLoopError::harness_not_found(
348 harness_id,
349 ));
350 }
351
352 let session = adapter
353 .session_store(org_id)
354 .get_session(session_id)
355 .await?
356 .ok_or_else(|| everruns_core::error::AgentLoopError::session_not_found(session_id))?;
357
358 let agent_store = adapter.agent_store(org_id);
359 let agent = match agent_id {
360 Some(agent_id) => Some(
361 agent_store
362 .get_agent(agent_id)
363 .await?
364 .ok_or_else(|| everruns_core::error::AgentLoopError::agent_not_found(agent_id))?,
365 ),
366 None => None,
367 };
368
369 let resolved = resolve_runtime_capabilities(
370 &harness_chain,
371 agent.as_ref(),
372 &session,
373 &capability_registry,
374 );
375 let prompt_ctx = SystemPromptContext {
382 session_id,
383 locale: locale.or(session.locale.clone()),
384 file_store: Some(everruns_core::WorkspaceScopedFileSystem::wrap(
387 adapter.file_store(),
388 session.workspace_id,
389 )),
390 model: None,
391 };
392 let collected = collect_capabilities_with_configs(
393 &resolved.resolved_capability_configs,
394 &capability_registry,
395 &prompt_ctx,
396 )
397 .await;
398
399 let mut registry = ToolRegistry::with_defaults();
400 for tool in collected.tools {
401 registry.register_boxed(tool);
402 }
403
404 let mut post_tool_hooks: Vec<Arc<dyn everruns_core::PostToolExecHook>> = resolved
409 .resolved_capability_configs
410 .iter()
411 .flat_map(|config| {
412 capability_registry
413 .get(config.capability_id())
414 .filter(|capability| capability.status() == CapabilityStatus::Available)
415 .map(|capability| capability.post_tool_exec_hooks_with_config(&config.config))
416 .unwrap_or_default()
417 })
418 .collect();
419
420 let user_hook_specs =
427 finalize_specs_from_configs(&resolved.resolved_capability_configs, &capability_registry);
428 let mut pre_tool_hooks: Vec<Arc<dyn everruns_core::atoms::PreToolUseHook>> = resolved
431 .resolved_capability_configs
432 .iter()
433 .flat_map(|config| {
434 capability_registry
435 .get(config.capability_id())
436 .filter(|capability| capability.status() == CapabilityStatus::Available)
437 .map(|capability| capability.pre_tool_use_hooks_with_config(&config.config))
438 .unwrap_or_default()
439 })
440 .collect();
441 if !user_hook_specs.is_empty() {
442 let dispatcher: Arc<dyn everruns_core::hook_executor::BashHookDispatcher> = Arc::new(
443 everruns_core::hook_dispatch::BashkitShellHookDispatcher::new(adapter.file_store()),
444 );
445 post_tool_hooks.extend(everruns_core::hook_adapter::build_post_tool_use_hooks(
446 &user_hook_specs,
447 dispatcher.clone(),
448 ));
449 pre_tool_hooks.extend(everruns_core::hook_adapter::build_pre_tool_use_hooks(
450 &user_hook_specs,
451 dispatcher,
452 ));
453 }
454
455 let tool_call_hooks = resolved
456 .resolved_capability_configs
457 .iter()
458 .flat_map(|config| {
459 capability_registry
460 .get(config.capability_id())
461 .filter(|capability| capability.status() == CapabilityStatus::Available)
462 .map(|capability| capability.tool_call_hooks())
463 .unwrap_or_default()
464 })
465 .collect();
466
467 Ok(RuntimeExecutionCapabilities {
468 tool_registry: registry,
469 post_tool_hooks,
470 pre_tool_hooks,
471 tool_call_hooks,
472 })
473}
474
/// Emits lifecycle events and session status updates for one session.
///
/// Status updates and event emissions performed through this type are best
/// effort: failures are logged with `warn!` and not returned to the caller.
pub struct RuntimeSessionLifecycle<A: RuntimeHostAdapter> {
    /// Host adapter used for status updates, event emission and store access.
    adapter: A,
    /// Internal organization id the session belongs to.
    org_id: i64,
    /// Session all events and status updates are issued for.
    session_id: SessionId,
}
481
482impl<A: RuntimeHostAdapter> RuntimeSessionLifecycle<A> {
483 pub fn new(adapter: A, org_id: i64, session_id: SessionId) -> Self {
484 Self {
485 adapter,
486 org_id,
487 session_id,
488 }
489 }
490
491 async fn set_session_status(&self, status: SessionStatus, action: &'static str) {
492 if let Err(error) = self
493 .adapter
494 .set_session_status(self.org_id, self.session_id, status)
495 .await
496 {
497 warn!(
498 session_id = %self.session_id,
499 org_id = self.org_id,
500 action,
501 %error,
502 "runtime host lifecycle status update failed"
503 );
504 }
505 }
506
507 async fn emit_event(&self, request: EventRequest) {
508 let event_type = request.event_type.clone();
509 if let Err(error) = self.adapter.event_emitter().emit(request).await {
510 warn!(
511 session_id = %self.session_id,
512 org_id = self.org_id,
513 event_type,
514 %error,
515 "runtime host lifecycle event emission failed"
516 );
517 }
518 }
519
520 pub async fn turn_started(&self, turn_id: TurnId, input_message_id: MessageId) {
521 let input_content = self
522 .adapter
523 .message_store()
524 .get(self.session_id, input_message_id)
525 .await
526 .ok()
527 .flatten()
528 .map(|message| message.content_to_llm_string());
529
530 self.set_session_status(SessionStatus::Active, "turn_started")
531 .await;
532
533 self.emit_event(EventRequest::new(
534 self.session_id,
535 EventContext::turn(turn_id, input_message_id),
536 SessionActivatedData {
537 turn_id,
538 input_message_id,
539 },
540 ))
541 .await;
542
543 self.emit_event(EventRequest::new(
544 self.session_id,
545 EventContext::turn(turn_id, input_message_id),
546 TurnStartedData {
547 turn_id,
548 input_message_id,
549 input_content,
550 },
551 ))
552 .await;
553 }
554
555 pub async fn emit_turn_completed(&self, input_message_id: MessageId, data: TurnCompletedData) {
556 let turn_id = data.turn_id;
557 self.emit_event(EventRequest::new(
558 self.session_id,
559 EventContext::turn(turn_id, input_message_id),
560 data,
561 ))
562 .await;
563 }
564
565 pub async fn emit_session_idled(
566 &self,
567 turn_id: TurnId,
568 input_message_id: MessageId,
569 iterations: Option<u32>,
570 usage: Option<TokenUsage>,
571 ) {
572 self.set_session_status(SessionStatus::Idle, "emit_session_idled")
573 .await;
574
575 self.emit_event(EventRequest::new(
576 self.session_id,
577 EventContext::turn(turn_id, input_message_id),
578 SessionIdledData {
579 turn_id,
580 iterations,
581 usage,
582 },
583 ))
584 .await;
585 }
586
587 pub async fn turn_completed(
588 &self,
589 turn_id: TurnId,
590 input_message_id: MessageId,
591 iterations: u32,
592 usage: Option<TokenUsage>,
593 input_content: Option<String>,
594 ) {
595 self.emit_turn_completed(
596 input_message_id,
597 TurnCompletedData {
598 turn_id,
599 iterations,
600 duration_ms: None,
601 usage: usage.clone(),
602 input_content,
603 final_message_id: None,
604 final_answer_preview: None,
605 time_to_first_token_ms: None,
606 tool_call_count: None,
607 llm_call_count: None,
608 status: Some("completed".to_string()),
609 },
610 )
611 .await;
612 self.emit_session_idled(turn_id, input_message_id, Some(iterations), usage)
613 .await;
614 }
615
616 pub async fn fire_turn_end_hooks(
620 &self,
621 harness_id: HarnessId,
622 agent_id: Option<AgentId>,
623 turn_id: TurnId,
624 success: bool,
625 ) {
626 let (specs, dispatcher) = match collect_lifecycle_hook_specs(
627 &self.adapter,
628 self.org_id,
629 self.session_id,
630 harness_id,
631 agent_id,
632 )
633 .await
634 {
635 Ok(pair) => pair,
636 Err(error) => {
637 warn!(
638 session_id = %self.session_id,
639 %error,
640 "failed to collect turn_end hook specs; skipping"
641 );
642 return;
643 }
644 };
645 let hooks = everruns_core::lifecycle_hooks::build_turn_lifecycle_hooks(
646 &specs,
647 everruns_core::user_hook_types::HookEvent::TurnEnd,
648 dispatcher,
649 );
650 if hooks.is_empty() {
651 return;
652 }
653 let ctx = everruns_core::lifecycle_hooks::TurnHookContext {
654 session_id: self.session_id,
655 turn_id: Some(turn_id),
656 org_id: org_public_id_from_internal(self.org_id).parse().ok(),
657 agent_id: agent_id.map(|a| a.to_string()),
658 };
659 everruns_core::lifecycle_hooks::run_turn_end_hooks(
660 &hooks,
661 &ctx,
662 serde_json::json!({ "success": success }),
663 )
664 .await;
665 }
666
667 pub async fn user_prompt_blocked(
672 &self,
673 turn_id: TurnId,
674 input_message_id: MessageId,
675 reason: &str,
676 user_message: Option<&str>,
677 ) {
678 let user_error =
679 UserFacingError::new(everruns_core::user_facing_error_codes::BLOCKED_BY_HOOK);
680 let shown = user_message.unwrap_or(reason);
681 let mut error_message = Message::assistant(shown);
682 let mut metadata = std::collections::HashMap::new();
683 user_error.apply_to_message_metadata(&mut metadata);
684 error_message.metadata = Some(metadata);
685
686 self.emit_event(EventRequest::new(
687 self.session_id,
688 EventContext::turn(turn_id, input_message_id),
689 OutputMessageCompletedData::new(error_message).with_user_facing_error(&user_error),
690 ))
691 .await;
692
693 self.turn_failed(turn_id, input_message_id, reason, Some(&user_error))
694 .await;
695 }
696
697 pub async fn turn_failed(
698 &self,
699 turn_id: TurnId,
700 input_message_id: MessageId,
701 error: &str,
702 user_error: Option<&UserFacingError>,
703 ) {
704 self.turn_failed_with_disclosure(turn_id, input_message_id, error, user_error, None)
705 .await;
706 }
707
708 pub async fn turn_failed_with_disclosure(
712 &self,
713 turn_id: TurnId,
714 input_message_id: MessageId,
715 error: &str,
716 user_error: Option<&UserFacingError>,
717 disclosure: Option<ErrorDisclosure>,
718 ) {
719 self.set_session_status(SessionStatus::Idle, "turn_failed")
720 .await;
721
722 self.emit_event(EventRequest::new(
723 self.session_id,
724 EventContext::turn(turn_id, input_message_id),
725 {
726 let mut data = TurnFailedData {
727 turn_id,
728 error: error.to_string(),
729 error_code: None,
730 error_fields: None,
731 error_disclosure: disclosure.map(|mode| mode.as_str().to_string()),
732 };
733 if let Some(user_error) = user_error {
734 user_error.apply_to_event_fields(&mut data.error_code, &mut data.error_fields);
735 }
736 data
737 },
738 ))
739 .await;
740
741 self.emit_event(EventRequest::new(
742 self.session_id,
743 EventContext::turn(turn_id, input_message_id),
744 SessionIdledData {
745 turn_id,
746 iterations: None,
747 usage: None,
748 },
749 ))
750 .await;
751 }
752
753 pub async fn waiting_for_tool_results(&self) {
754 self.set_session_status(
755 SessionStatus::WaitingForToolResults,
756 "waiting_for_tool_results",
757 )
758 .await;
759 }
760
761 pub async fn dependency_blocked(
762 &self,
763 turn_id: TurnId,
764 input_message_id: MessageId,
765 blocker: DependencyBlocker,
766 ) {
767 let user_error = UserFacingError::new(blocker.error_code())
768 .with_field(
769 "dependency",
770 match blocker {
771 DependencyBlocker::HarnessArchived | DependencyBlocker::HarnessDeleted => {
772 "harness"
773 }
774 DependencyBlocker::AgentArchived | DependencyBlocker::AgentDeleted => "agent",
775 },
776 )
777 .with_field(
778 "state",
779 match blocker {
780 DependencyBlocker::HarnessArchived | DependencyBlocker::AgentArchived => {
781 "archived"
782 }
783 DependencyBlocker::HarnessDeleted | DependencyBlocker::AgentDeleted => {
784 "deleted"
785 }
786 },
787 );
788 let mut error_message = Message::assistant(blocker.message());
789 let mut metadata = std::collections::HashMap::new();
790 user_error.apply_to_message_metadata(&mut metadata);
791 error_message.metadata = Some(metadata);
792
793 self.emit_event(EventRequest::new(
794 self.session_id,
795 EventContext::turn(turn_id, input_message_id),
796 OutputMessageCompletedData::new(error_message).with_user_facing_error(&user_error),
797 ))
798 .await;
799
800 self.turn_failed(
801 turn_id,
802 input_message_id,
803 blocker.message(),
804 Some(&user_error),
805 )
806 .await;
807 }
808}
809
810pub async fn detect_dependency_blocker<A: RuntimeHostAdapter>(
811 adapter: &A,
812 org_id: i64,
813 harness_id: HarnessId,
814 agent_id: Option<AgentId>,
815) -> everruns_core::error::Result<Option<DependencyBlocker>> {
816 let harness_store = adapter.harness_store(org_id);
817 let agent_store = adapter.agent_store(org_id);
818 everruns_core::detect_dependency_blocker(
819 harness_store.as_ref(),
820 agent_store.as_ref(),
821 harness_id,
822 agent_id,
823 )
824 .await
825}
826
827pub async fn execute_input_activity<A: RuntimeHostAdapter>(
828 adapter: &A,
829 org_id: i64,
830 input: InputAtomInput,
831) -> everruns_core::error::Result<InputAtomResult> {
832 RuntimeSessionLifecycle::new(adapter.clone(), org_id, input.context.session_id)
833 .turn_started(input.context.turn_id, input.context.input_message_id)
834 .await;
835
836 let atom = InputAtom::new(adapter.message_store());
837 atom.execute(input).await
838}
839
/// Outcome of running the `UserPromptSubmit` hooks for a turn.
struct UserPromptHookResult {
    /// Decision returned by the hooks (block, or continue with a message).
    decision: everruns_core::lifecycle_hooks::UserPromptDecision,
    /// Input message text as it was before the hooks ran; the caller compares
    /// it with the `Continue` message to detect a rewrite.
    original_message: String,
}
850
851async fn run_user_prompt_submit_for_turn<A: RuntimeHostAdapter>(
852 adapter: &A,
853 org_id: i64,
854 input: &ReasonInput,
855) -> everruns_core::error::Result<Option<UserPromptHookResult>> {
856 let (specs, dispatcher) = match collect_lifecycle_hook_specs(
857 adapter,
858 org_id,
859 input.context.session_id,
860 input.harness_id,
861 input.agent_id,
862 )
863 .await
864 {
865 Ok(pair) => pair,
866 Err(error) => {
867 warn!(
868 session_id = %input.context.session_id,
869 %error,
870 "failed to collect user_prompt_submit hook specs; continuing without them"
871 );
872 return Ok(None);
873 }
874 };
875 let hooks = everruns_core::lifecycle_hooks::build_turn_lifecycle_hooks(
876 &specs,
877 everruns_core::user_hook_types::HookEvent::UserPromptSubmit,
878 dispatcher,
879 );
880 if hooks.is_empty() {
881 return Ok(None);
882 }
883
884 let message_text = adapter
885 .message_store()
886 .get(input.context.session_id, input.context.input_message_id)
887 .await
888 .ok()
889 .flatten()
890 .map(|m| m.content_to_llm_string())
891 .unwrap_or_default();
892
893 let ctx = everruns_core::lifecycle_hooks::TurnHookContext {
894 session_id: input.context.session_id,
895 turn_id: Some(input.context.turn_id),
896 org_id: org_public_id_from_internal(org_id).parse().ok(),
897 agent_id: input.agent_id.map(|a| a.to_string()),
898 };
899 let original_message = message_text.clone();
900 let decision =
901 everruns_core::lifecycle_hooks::run_user_prompt_submit_hooks(&hooks, &ctx, message_text)
902 .await;
903 Ok(Some(UserPromptHookResult {
904 decision,
905 original_message,
906 }))
907}
908
909pub async fn execute_reason_activity<A: RuntimeHostAdapter>(
910 adapter: &A,
911 org_id: i64,
912 input: ReasonInput,
913) -> everruns_core::error::Result<ReasonResult> {
914 if let Some(blocker) =
915 detect_dependency_blocker(adapter, org_id, input.harness_id, input.agent_id).await?
916 {
917 RuntimeSessionLifecycle::new(adapter.clone(), org_id, input.context.session_id)
918 .dependency_blocked(
919 input.context.turn_id,
920 input.context.input_message_id,
921 blocker,
922 )
923 .await;
924 return Ok(ReasonResult {
925 success: false,
926 text: blocker.message().to_string(),
927 tool_calls: vec![],
928 has_tool_calls: false,
929 tool_definitions: vec![],
930 max_iterations: everruns_core::runtime_agent::default_max_iterations(),
931 error: Some("dependency_unavailable".to_string()),
932 user_facing_error: None,
933 error_disclosure: None,
934 usage: None,
935 output_message_id: None,
936 time_to_first_token_ms: None,
937 response_id: None,
938 locale: None,
939 network_access: None,
940 });
941 }
942
943 let mut user_prompt_message_override = None;
951 if input.iteration <= 1
952 && let Some(hook_result) = run_user_prompt_submit_for_turn(adapter, org_id, &input).await?
953 {
954 match hook_result.decision {
955 everruns_core::lifecycle_hooks::UserPromptDecision::Block {
956 reason,
957 user_message,
958 } => {
959 RuntimeSessionLifecycle::new(adapter.clone(), org_id, input.context.session_id)
960 .user_prompt_blocked(
961 input.context.turn_id,
962 input.context.input_message_id,
963 &reason,
964 user_message.as_deref(),
965 )
966 .await;
967 return Ok(ReasonResult {
968 success: false,
969 text: user_message.unwrap_or_else(|| reason.clone()),
970 tool_calls: vec![],
971 has_tool_calls: false,
972 tool_definitions: vec![],
973 max_iterations: everruns_core::runtime_agent::default_max_iterations(),
974 error: Some("blocked_by_user_prompt_hook".to_string()),
975 user_facing_error: None,
976 error_disclosure: None,
977 usage: None,
978 output_message_id: None,
979 time_to_first_token_ms: None,
980 response_id: None,
981 locale: None,
982 network_access: None,
983 });
984 }
985 everruns_core::lifecycle_hooks::UserPromptDecision::Continue { message } => {
986 if message != hook_result.original_message {
987 user_prompt_message_override = Some(message);
988 }
989 }
990 }
991 }
992
993 let turn_context = adapter
994 .load_turn_context(org_id, input.context.session_id)
995 .await?;
996
997 let mut atom = ReasonAtom::new(
998 adapter.harness_store(org_id),
999 adapter.agent_store(org_id),
1000 adapter.session_store(org_id),
1001 adapter.message_store(),
1002 adapter.provider_store(org_id),
1003 adapter.capability_registry(),
1004 adapter.driver_registry(),
1005 adapter.event_emitter(),
1006 )
1007 .with_file_store(adapter.file_store());
1008 if let Some(image_resolver) = adapter.image_resolver(org_id) {
1009 atom = atom.with_image_resolver(image_resolver);
1010 }
1011 if let Some(hb) = adapter.stream_heartbeater() {
1012 atom = atom.with_stream_heartbeater(hb);
1013 }
1014 if let Some(timeout) = adapter.provider_stall_timeout() {
1015 atom = atom.with_provider_stall_timeout(timeout);
1016 }
1017 if let Some(store) = adapter.partial_stream_store() {
1018 atom = atom.with_partial_stream_store(store);
1019 }
1020 if let Some(store) = adapter.durable_tool_result_store() {
1021 atom = atom.with_durable_tool_result_store(store);
1022 }
1023
1024 let input = ReasonInput {
1025 mcp_tool_definitions: turn_context.mcp_tool_definitions,
1026 ..input
1027 };
1028
1029 if let Some(message_override) = user_prompt_message_override {
1030 let mut assembled = assemble_turn_context(
1031 adapter.harness_store(org_id).as_ref(),
1032 adapter.agent_store(org_id).as_ref(),
1033 adapter.session_store(org_id).as_ref(),
1034 adapter.message_store().as_ref(),
1035 adapter.provider_store(org_id).as_ref(),
1036 &adapter.capability_registry(),
1037 input.context.session_id,
1038 input.harness_id,
1039 input.agent_id,
1040 &input.mcp_tool_definitions,
1041 Some(adapter.file_store()),
1042 )
1043 .await?;
1044
1045 let message = assembled
1046 .messages
1047 .iter_mut()
1048 .find(|message| message.id == input.context.input_message_id)
1049 .ok_or_else(|| {
1050 everruns_core::error::AgentLoopError::config(
1051 "user_prompt_submit mutation: input message not found in assembled context",
1052 )
1053 })?;
1054
1055 message
1060 .content
1061 .retain(|part| !matches!(part, ContentPart::Text(_)));
1062 message
1063 .content
1064 .insert(0, ContentPart::text(message_override));
1065
1066 return atom.execute_with_assembled_context(input, assembled).await;
1067 }
1068
1069 atom.execute(input).await
1070}
1071
1072pub async fn execute_act_activity<A: RuntimeHostAdapter>(
1073 adapter: &A,
1074 input: ActInput,
1075) -> everruns_core::error::Result<ActResult> {
1076 let org_id = input.org_id.ok_or_else(|| {
1077 everruns_core::error::AgentLoopError::config(
1078 "ActInput.org_id must be set for runtime host execution",
1079 )
1080 })?;
1081
1082 if let Some(blocker) =
1083 detect_dependency_blocker(adapter, org_id, input.harness_id, input.agent_id).await?
1084 {
1085 RuntimeSessionLifecycle::new(adapter.clone(), org_id, input.context.session_id)
1086 .dependency_blocked(
1087 input.context.turn_id,
1088 input.context.input_message_id,
1089 blocker,
1090 )
1091 .await;
1092 return Ok(ActResult {
1093 results: vec![],
1094 completed: true,
1095 success_count: 0,
1096 error_count: 1,
1097 waiting_for_tool_results: false,
1098 blocked: true,
1099 client_tool_calls: vec![],
1100 client_tool_definitions: vec![],
1101 });
1102 }
1103
1104 let execution_capabilities = load_execution_capabilities(
1105 adapter,
1106 org_id,
1107 input.context.session_id,
1108 input.harness_id,
1109 input.agent_id,
1110 input.locale.clone(),
1111 input.blueprint_id.as_deref(),
1112 )
1113 .await?;
1114 let mut tool_registry = execution_capabilities.tool_registry;
1115
1116 if let Some(mcp) = adapter.mcp_executor(org_id, input.context.session_id).await {
1123 let invoker: Arc<dyn everruns_core::McpToolInvoker> = mcp;
1124 for tool in everruns_core::build_mcp_proxy_tools(&input.tool_definitions, invoker) {
1125 tool_registry.register_boxed(tool);
1126 }
1127 }
1128
1129 let builtin_tool_registry = Arc::new(tool_registry.clone());
1130 let executor: Arc<dyn everruns_core::traits::ToolExecutor> = Arc::new(tool_registry);
1131
1132 let mut atom =
1133 ActAtom::with_file_store(executor, adapter.event_emitter(), adapter.file_store())
1134 .with_session_store(adapter.session_store(org_id))
1135 .with_session_mutator(adapter.session_mutator(org_id))
1136 .with_agent_store(adapter.agent_store(org_id))
1137 .with_tool_registry(builtin_tool_registry)
1138 .with_org_id(
1139 org_public_id_from_internal(org_id)
1140 .parse()
1141 .expect("internal org id converts to valid public org id"),
1142 )
1143 .with_capability_registry(adapter.capability_registry())
1144 .with_post_tool_hooks(execution_capabilities.post_tool_hooks)
1145 .with_pre_tool_hooks(execution_capabilities.pre_tool_hooks)
1146 .with_tool_call_hooks(execution_capabilities.tool_call_hooks);
1147
1148 if let Some(storage_store) = adapter.storage_store() {
1149 atom = atom.with_storage_store(storage_store);
1150 }
1151 if let Some(image_store) = adapter.image_artifact_store(org_id) {
1152 atom = atom.with_image_store(image_store);
1153 }
1154 if let Some(provider_credential_store) = adapter.provider_credential_store(org_id) {
1155 atom = atom.with_provider_credential_store(provider_credential_store);
1156 }
1157 if let Some(utility_llm_service) = adapter.utility_llm_service() {
1158 atom = atom.with_utility_llm_service(utility_llm_service);
1159 }
1160 if let Some(egress_service) = adapter.egress_service() {
1161 atom = atom.with_egress_service(egress_service);
1162 }
1163 if let Some(connection_resolver) = adapter.connection_resolver() {
1164 atom = atom.with_connection_resolver(connection_resolver);
1165 }
1166 if let Some(sqldb_store) = adapter.sqldb_store() {
1167 atom = atom.with_sqldb_store(sqldb_store);
1168 }
1169 if let Some(leased_resource_store) = adapter.leased_resource_store() {
1170 atom = atom.with_leased_resource_store(leased_resource_store);
1171 }
1172 if let Some(registry) = adapter.session_resource_registry() {
1173 atom = atom.with_session_resource_registry(registry);
1174 }
1175 if let Some(registry) = adapter.session_task_registry() {
1176 atom = atom.with_session_task_registry(registry);
1177 }
1178 if let Some(schedule_store) = adapter.schedule_store(org_id) {
1179 atom = atom.with_schedule_store(schedule_store);
1180 }
1181 if let Some(platform_store) = adapter.platform_store(org_id, input.context.session_id) {
1182 atom = atom.with_platform_store(platform_store);
1183 }
1184 if let Some(budget_checker) = adapter.budget_checker(org_id, input.agent_id) {
1185 atom = atom.with_budget_checker(budget_checker);
1186 }
1187 if let Some(payment_authority) = adapter.payment_authority(org_id, input.agent_id) {
1188 atom = atom.with_payment_authority(payment_authority);
1189 }
1190 if let Some(limiter) = adapter.outbound_tool_rate_limiter(org_id) {
1191 atom = atom.with_outbound_tool_rate_limiter(limiter);
1192 }
1193 if let Some(store) = adapter.durable_tool_result_store() {
1194 atom = atom.with_durable_tool_result_store(store);
1195 }
1196 if let Some(store) = adapter.subagent_spawn_store() {
1197 atom = atom.with_subagent_spawn_store(store);
1198 }
1199
1200 atom.execute(input).await
1201}