1use async_trait::async_trait;
6use everruns_core::atoms::{
7 ActAtom, ActInput, ActResult, Atom, InputAtom, InputAtomInput, InputAtomResult, ReasonAtom,
8 ReasonInput, ReasonResult,
9};
10use everruns_core::capabilities::{SystemPromptContext, collect_capabilities_with_configs};
11use everruns_core::events::{
12 EventContext, EventRequest, OutputMessageCompletedData, SessionActivatedData, SessionIdledData,
13 TurnCompletedData, TurnFailedData, TurnStartedData,
14};
15use everruns_core::message::Message;
16use everruns_core::message_retriever::MessageRetriever;
17use everruns_core::platform_store::PlatformStore;
18use everruns_core::session::SessionStatus;
19use everruns_core::traits::{
20 AgentStore, BudgetChecker, EventEmitter, HarnessStore, ImageArtifactStore, ImageResolver,
21 LeasedResourceStore, LlmProviderStore, ModelWithProvider, PaymentAuthority,
22 ProviderCredentialStore, SessionFileSystem, SessionMutator, SessionResourceRegistry,
23 SessionScheduleStore, SessionSqlDbStoreRef, SessionStorageStore, SessionStore,
24 UserConnectionResolver,
25};
26use everruns_core::typed_id::{AgentId, HarnessId, MessageId, SessionId, TurnId};
27use everruns_core::{
28 Agent, CapabilityRegistry, DependencyBlocker, DriverRegistry, EgressService, Harness, Session,
29 TokenUsage, ToolDefinition, ToolRegistry, UserFacingError, UtilityLlmService,
30 org_public_id_from_internal, resolve_runtime_capabilities,
31};
32use std::sync::Arc;
33use tracing::warn;
34
35#[derive(Debug, Clone)]
37pub struct RuntimeHostTurnContext {
38 pub agent: Option<Agent>,
39 pub session: Session,
40 pub messages: Vec<Message>,
41 pub model: Option<ModelWithProvider>,
42 pub mcp_tool_definitions: Vec<ToolDefinition>,
43}
44
45#[async_trait]
56pub trait RuntimeHostAdapter: Send + Sync + Clone + 'static {
57 async fn get_agent(
58 &self,
59 org_id: i64,
60 agent_id: AgentId,
61 ) -> everruns_core::error::Result<Option<Agent>>;
62
63 async fn get_harness(
64 &self,
65 org_id: i64,
66 harness_id: HarnessId,
67 ) -> everruns_core::error::Result<Option<Harness>>;
68
69 async fn set_session_status(
70 &self,
71 org_id: i64,
72 session_id: SessionId,
73 status: SessionStatus,
74 ) -> everruns_core::error::Result<Session>;
75
76 async fn load_turn_context(
77 &self,
78 org_id: i64,
79 session_id: SessionId,
80 ) -> everruns_core::error::Result<RuntimeHostTurnContext>;
81
82 fn capability_registry(&self) -> CapabilityRegistry;
83
84 fn driver_registry(&self) -> DriverRegistry;
85
86 fn harness_store(&self, org_id: i64) -> Arc<dyn HarnessStore>;
87
88 fn agent_store(&self, org_id: i64) -> Arc<dyn AgentStore>;
89
90 fn session_store(&self, org_id: i64) -> Arc<dyn SessionStore>;
91
92 fn session_mutator(&self, org_id: i64) -> Arc<dyn SessionMutator>;
93
94 fn provider_store(&self, org_id: i64) -> Arc<dyn LlmProviderStore>;
95
96 fn message_store(&self) -> Arc<dyn MessageRetriever>;
97
98 fn event_emitter(&self) -> Arc<dyn EventEmitter>;
99
100 fn file_store(&self) -> Arc<dyn SessionFileSystem>;
101
102 fn image_resolver(&self, _org_id: i64) -> Option<Arc<dyn ImageResolver>> {
103 None
104 }
105
106 fn image_artifact_store(&self, _org_id: i64) -> Option<Arc<dyn ImageArtifactStore>> {
107 None
108 }
109
110 fn provider_credential_store(&self, _org_id: i64) -> Option<Arc<dyn ProviderCredentialStore>> {
111 None
112 }
113
114 fn utility_llm_service(&self) -> Option<Arc<dyn UtilityLlmService>> {
115 None
116 }
117
118 fn egress_service(&self) -> Option<Arc<dyn EgressService>> {
119 None
120 }
121
122 fn storage_store(&self) -> Option<Arc<dyn SessionStorageStore>> {
123 None
124 }
125
126 fn memory_store(&self, _org_id: i64) -> Option<Arc<dyn everruns_core::MemoryStoreBackend>> {
127 None
128 }
129
130 fn connection_resolver(&self) -> Option<Arc<dyn UserConnectionResolver>> {
131 None
132 }
133
134 fn sqldb_store(&self) -> Option<SessionSqlDbStoreRef> {
135 None
136 }
137
138 fn leased_resource_store(&self) -> Option<Arc<dyn LeasedResourceStore>> {
139 None
140 }
141
142 fn session_resource_registry(&self) -> Option<Arc<dyn SessionResourceRegistry>> {
143 None
144 }
145
146 fn schedule_store(&self, _org_id: i64) -> Option<Arc<dyn SessionScheduleStore>> {
147 None
148 }
149
150 fn platform_store(
151 &self,
152 _org_id: i64,
153 _session_id: SessionId,
154 ) -> Option<Arc<dyn PlatformStore>> {
155 None
156 }
157
158 fn budget_checker(
159 &self,
160 _org_id: i64,
161 _agent_id: Option<AgentId>,
162 ) -> Option<Arc<dyn BudgetChecker>> {
163 None
164 }
165
166 fn payment_authority(
167 &self,
168 _org_id: i64,
169 _agent_id: Option<AgentId>,
170 ) -> Option<Arc<dyn PaymentAuthority>> {
171 None
172 }
173}
174
175struct RuntimeExecutionCapabilities {
176 tool_registry: ToolRegistry,
177 post_tool_hooks: Vec<Arc<dyn everruns_core::PostToolExecHook>>,
178 tool_call_hooks: Vec<Arc<dyn everruns_core::ToolCallHook>>,
179}
180
181async fn load_execution_capabilities<A: RuntimeHostAdapter>(
182 adapter: &A,
183 org_id: i64,
184 session_id: SessionId,
185 harness_id: HarnessId,
186 agent_id: Option<AgentId>,
187 locale: Option<String>,
188 blueprint_id: Option<&str>,
189) -> everruns_core::error::Result<RuntimeExecutionCapabilities> {
190 let capability_registry = adapter.capability_registry();
191 if let Some(blueprint_id) = blueprint_id {
192 let mut registry = ToolRegistry::with_defaults();
193 let blueprint = capability_registry.blueprint(blueprint_id).ok_or_else(|| {
194 everruns_core::error::AgentLoopError::config(format!(
195 "Blueprint \"{blueprint_id}\" not found in registry"
196 ))
197 })?;
198 for tool in blueprint.tools {
199 registry.register_boxed(tool);
200 }
201 return Ok(RuntimeExecutionCapabilities {
202 tool_registry: registry,
203 post_tool_hooks: Vec::new(),
204 tool_call_hooks: Vec::new(),
205 });
206 }
207
208 let harness_chain = adapter
209 .harness_store(org_id)
210 .get_harness_chain(harness_id)
211 .await?;
212 if harness_chain.is_empty() {
213 return Err(everruns_core::error::AgentLoopError::harness_not_found(
214 harness_id,
215 ));
216 }
217
218 let session = adapter
219 .session_store(org_id)
220 .get_session(session_id)
221 .await?
222 .ok_or_else(|| everruns_core::error::AgentLoopError::session_not_found(session_id))?;
223
224 let agent_store = adapter.agent_store(org_id);
225 let agent = match agent_id {
226 Some(agent_id) => Some(
227 agent_store
228 .get_agent(agent_id)
229 .await?
230 .ok_or_else(|| everruns_core::error::AgentLoopError::agent_not_found(agent_id))?,
231 ),
232 None => None,
233 };
234
235 let resolved = resolve_runtime_capabilities(
236 &harness_chain,
237 agent.as_ref(),
238 &session,
239 &capability_registry,
240 );
241 let prompt_ctx = SystemPromptContext {
242 session_id,
243 locale: locale.or(session.locale.clone()),
244 file_store: Some(adapter.file_store()),
245 };
246 let collected = collect_capabilities_with_configs(
247 &resolved.resolved_capability_configs,
248 &capability_registry,
249 &prompt_ctx,
250 )
251 .await;
252
253 let mut registry = ToolRegistry::with_defaults();
254 for tool in collected.tools {
255 registry.register_boxed(tool);
256 }
257
258 let post_tool_hooks = resolved
259 .resolved_capability_configs
260 .iter()
261 .flat_map(|config| {
262 capability_registry
263 .get(config.capability_id())
264 .map(|capability| capability.post_tool_exec_hooks())
265 .unwrap_or_default()
266 })
267 .collect();
268 let tool_call_hooks = resolved
269 .resolved_capability_configs
270 .iter()
271 .flat_map(|config| {
272 capability_registry
273 .get(config.capability_id())
274 .map(|capability| capability.tool_call_hooks())
275 .unwrap_or_default()
276 })
277 .collect();
278
279 Ok(RuntimeExecutionCapabilities {
280 tool_registry: registry,
281 post_tool_hooks,
282 tool_call_hooks,
283 })
284}
285
286pub struct RuntimeSessionLifecycle<A: RuntimeHostAdapter> {
288 adapter: A,
289 org_id: i64,
290 session_id: SessionId,
291}
292
293impl<A: RuntimeHostAdapter> RuntimeSessionLifecycle<A> {
294 pub fn new(adapter: A, org_id: i64, session_id: SessionId) -> Self {
295 Self {
296 adapter,
297 org_id,
298 session_id,
299 }
300 }
301
302 async fn set_session_status(&self, status: SessionStatus, action: &'static str) {
303 if let Err(error) = self
304 .adapter
305 .set_session_status(self.org_id, self.session_id, status)
306 .await
307 {
308 warn!(
309 session_id = %self.session_id,
310 org_id = self.org_id,
311 action,
312 %error,
313 "runtime host lifecycle status update failed"
314 );
315 }
316 }
317
318 async fn emit_event(&self, request: EventRequest) {
319 let event_type = request.event_type.clone();
320 if let Err(error) = self.adapter.event_emitter().emit(request).await {
321 warn!(
322 session_id = %self.session_id,
323 org_id = self.org_id,
324 event_type,
325 %error,
326 "runtime host lifecycle event emission failed"
327 );
328 }
329 }
330
331 pub async fn turn_started(&self, turn_id: TurnId, input_message_id: MessageId) {
332 let input_content = self
333 .adapter
334 .message_store()
335 .get(self.session_id, input_message_id)
336 .await
337 .ok()
338 .flatten()
339 .map(|message| message.content_to_llm_string());
340
341 self.set_session_status(SessionStatus::Active, "turn_started")
342 .await;
343
344 self.emit_event(EventRequest::new(
345 self.session_id,
346 EventContext::turn(turn_id, input_message_id),
347 SessionActivatedData {
348 turn_id,
349 input_message_id,
350 },
351 ))
352 .await;
353
354 self.emit_event(EventRequest::new(
355 self.session_id,
356 EventContext::turn(turn_id, input_message_id),
357 TurnStartedData {
358 turn_id,
359 input_message_id,
360 input_content,
361 },
362 ))
363 .await;
364 }
365
366 pub async fn emit_turn_completed(&self, input_message_id: MessageId, data: TurnCompletedData) {
367 let turn_id = data.turn_id;
368 self.emit_event(EventRequest::new(
369 self.session_id,
370 EventContext::turn(turn_id, input_message_id),
371 data,
372 ))
373 .await;
374 }
375
376 pub async fn emit_session_idled(
377 &self,
378 turn_id: TurnId,
379 input_message_id: MessageId,
380 iterations: Option<u32>,
381 usage: Option<TokenUsage>,
382 ) {
383 self.set_session_status(SessionStatus::Idle, "emit_session_idled")
384 .await;
385
386 self.emit_event(EventRequest::new(
387 self.session_id,
388 EventContext::turn(turn_id, input_message_id),
389 SessionIdledData {
390 turn_id,
391 iterations,
392 usage,
393 },
394 ))
395 .await;
396 }
397
398 pub async fn turn_completed(
399 &self,
400 turn_id: TurnId,
401 input_message_id: MessageId,
402 iterations: u32,
403 usage: Option<TokenUsage>,
404 input_content: Option<String>,
405 ) {
406 self.emit_turn_completed(
407 input_message_id,
408 TurnCompletedData {
409 turn_id,
410 iterations,
411 duration_ms: None,
412 usage: usage.clone(),
413 input_content,
414 final_message_id: None,
415 final_answer_preview: None,
416 time_to_first_token_ms: None,
417 tool_call_count: None,
418 llm_call_count: None,
419 status: Some("completed".to_string()),
420 },
421 )
422 .await;
423 self.emit_session_idled(turn_id, input_message_id, Some(iterations), usage)
424 .await;
425 }
426
427 pub async fn turn_failed(
428 &self,
429 turn_id: TurnId,
430 input_message_id: MessageId,
431 error: &str,
432 user_error: Option<&UserFacingError>,
433 ) {
434 self.set_session_status(SessionStatus::Idle, "turn_failed")
435 .await;
436
437 self.emit_event(EventRequest::new(
438 self.session_id,
439 EventContext::turn(turn_id, input_message_id),
440 {
441 let mut data = TurnFailedData {
442 turn_id,
443 error: error.to_string(),
444 error_code: None,
445 error_fields: None,
446 };
447 if let Some(user_error) = user_error {
448 user_error.apply_to_event_fields(&mut data.error_code, &mut data.error_fields);
449 }
450 data
451 },
452 ))
453 .await;
454
455 self.emit_event(EventRequest::new(
456 self.session_id,
457 EventContext::turn(turn_id, input_message_id),
458 SessionIdledData {
459 turn_id,
460 iterations: None,
461 usage: None,
462 },
463 ))
464 .await;
465 }
466
467 pub async fn waiting_for_tool_results(&self) {
468 self.set_session_status(
469 SessionStatus::WaitingForToolResults,
470 "waiting_for_tool_results",
471 )
472 .await;
473 }
474
475 pub async fn dependency_blocked(
476 &self,
477 turn_id: TurnId,
478 input_message_id: MessageId,
479 blocker: DependencyBlocker,
480 ) {
481 let user_error = UserFacingError::new(blocker.error_code())
482 .with_field(
483 "dependency",
484 match blocker {
485 DependencyBlocker::HarnessArchived | DependencyBlocker::HarnessDeleted => {
486 "harness"
487 }
488 DependencyBlocker::AgentArchived | DependencyBlocker::AgentDeleted => "agent",
489 },
490 )
491 .with_field(
492 "state",
493 match blocker {
494 DependencyBlocker::HarnessArchived | DependencyBlocker::AgentArchived => {
495 "archived"
496 }
497 DependencyBlocker::HarnessDeleted | DependencyBlocker::AgentDeleted => {
498 "deleted"
499 }
500 },
501 );
502 let mut error_message = Message::assistant(blocker.message());
503 let mut metadata = std::collections::HashMap::new();
504 user_error.apply_to_message_metadata(&mut metadata);
505 error_message.metadata = Some(metadata);
506
507 self.emit_event(EventRequest::new(
508 self.session_id,
509 EventContext::turn(turn_id, input_message_id),
510 OutputMessageCompletedData::new(error_message).with_user_facing_error(&user_error),
511 ))
512 .await;
513
514 self.turn_failed(
515 turn_id,
516 input_message_id,
517 blocker.message(),
518 Some(&user_error),
519 )
520 .await;
521 }
522}
523
524pub async fn detect_dependency_blocker<A: RuntimeHostAdapter>(
525 adapter: &A,
526 org_id: i64,
527 harness_id: HarnessId,
528 agent_id: Option<AgentId>,
529) -> everruns_core::error::Result<Option<DependencyBlocker>> {
530 let harness_store = adapter.harness_store(org_id);
531 let agent_store = adapter.agent_store(org_id);
532 everruns_core::detect_dependency_blocker(
533 harness_store.as_ref(),
534 agent_store.as_ref(),
535 harness_id,
536 agent_id,
537 )
538 .await
539}
540
541pub async fn execute_input_activity<A: RuntimeHostAdapter>(
542 adapter: &A,
543 org_id: i64,
544 input: InputAtomInput,
545) -> everruns_core::error::Result<InputAtomResult> {
546 RuntimeSessionLifecycle::new(adapter.clone(), org_id, input.context.session_id)
547 .turn_started(input.context.turn_id, input.context.input_message_id)
548 .await;
549
550 let atom = InputAtom::new(adapter.message_store());
551 atom.execute(input).await
552}
553
554pub async fn execute_reason_activity<A: RuntimeHostAdapter>(
555 adapter: &A,
556 org_id: i64,
557 input: ReasonInput,
558) -> everruns_core::error::Result<ReasonResult> {
559 if let Some(blocker) =
560 detect_dependency_blocker(adapter, org_id, input.harness_id, input.agent_id).await?
561 {
562 RuntimeSessionLifecycle::new(adapter.clone(), org_id, input.context.session_id)
563 .dependency_blocked(
564 input.context.turn_id,
565 input.context.input_message_id,
566 blocker,
567 )
568 .await;
569 return Ok(ReasonResult {
570 success: false,
571 text: blocker.message().to_string(),
572 tool_calls: vec![],
573 has_tool_calls: false,
574 tool_definitions: vec![],
575 max_iterations: everruns_core::runtime_agent::default_max_iterations(),
576 error: Some("dependency_unavailable".to_string()),
577 usage: None,
578 output_message_id: None,
579 time_to_first_token_ms: None,
580 response_id: None,
581 locale: None,
582 network_access: None,
583 });
584 }
585
586 let turn_context = adapter
587 .load_turn_context(org_id, input.context.session_id)
588 .await?;
589
590 let mut atom = ReasonAtom::new(
591 adapter.harness_store(org_id),
592 adapter.agent_store(org_id),
593 adapter.session_store(org_id),
594 adapter.message_store(),
595 adapter.provider_store(org_id),
596 adapter.capability_registry(),
597 adapter.driver_registry(),
598 adapter.event_emitter(),
599 )
600 .with_file_store(adapter.file_store());
601 if let Some(image_resolver) = adapter.image_resolver(org_id) {
602 atom = atom.with_image_resolver(image_resolver);
603 }
604
605 atom.execute(ReasonInput {
606 mcp_tool_definitions: turn_context.mcp_tool_definitions,
607 ..input
608 })
609 .await
610}
611
612pub async fn execute_act_activity<A: RuntimeHostAdapter>(
613 adapter: &A,
614 input: ActInput,
615) -> everruns_core::error::Result<ActResult> {
616 let org_id = input.org_id.ok_or_else(|| {
617 everruns_core::error::AgentLoopError::config(
618 "ActInput.org_id must be set for runtime host execution",
619 )
620 })?;
621
622 if let Some(blocker) =
623 detect_dependency_blocker(adapter, org_id, input.harness_id, input.agent_id).await?
624 {
625 RuntimeSessionLifecycle::new(adapter.clone(), org_id, input.context.session_id)
626 .dependency_blocked(
627 input.context.turn_id,
628 input.context.input_message_id,
629 blocker,
630 )
631 .await;
632 return Ok(ActResult {
633 results: vec![],
634 completed: true,
635 success_count: 0,
636 error_count: 1,
637 waiting_for_tool_results: false,
638 blocked: true,
639 client_tool_calls: vec![],
640 client_tool_definitions: vec![],
641 });
642 }
643
644 let execution_capabilities = load_execution_capabilities(
645 adapter,
646 org_id,
647 input.context.session_id,
648 input.harness_id,
649 input.agent_id,
650 input.locale.clone(),
651 input.blueprint_id.as_deref(),
652 )
653 .await?;
654 let tool_registry = execution_capabilities.tool_registry;
655 let builtin_tool_registry = Arc::new(tool_registry.clone());
656
657 let mut atom =
658 ActAtom::with_file_store(tool_registry, adapter.event_emitter(), adapter.file_store())
659 .with_session_store(adapter.session_store(org_id))
660 .with_session_mutator(adapter.session_mutator(org_id))
661 .with_agent_store(adapter.agent_store(org_id))
662 .with_tool_registry(builtin_tool_registry)
663 .with_org_id(
664 org_public_id_from_internal(org_id)
665 .parse()
666 .expect("internal org id converts to valid public org id"),
667 )
668 .with_capability_registry(adapter.capability_registry())
669 .with_post_tool_hooks(execution_capabilities.post_tool_hooks)
670 .with_tool_call_hooks(execution_capabilities.tool_call_hooks);
671
672 if let Some(storage_store) = adapter.storage_store() {
673 atom = atom.with_storage_store(storage_store);
674 }
675 if let Some(image_store) = adapter.image_artifact_store(org_id) {
676 atom = atom.with_image_store(image_store);
677 }
678 if let Some(provider_credential_store) = adapter.provider_credential_store(org_id) {
679 atom = atom.with_provider_credential_store(provider_credential_store);
680 }
681 if let Some(utility_llm_service) = adapter.utility_llm_service() {
682 atom = atom.with_utility_llm_service(utility_llm_service);
683 }
684 if let Some(egress_service) = adapter.egress_service() {
685 atom = atom.with_egress_service(egress_service);
686 }
687 if let Some(memory_store) = adapter.memory_store(org_id) {
688 atom = atom.with_memory_store(memory_store);
689 }
690 if let Some(connection_resolver) = adapter.connection_resolver() {
691 atom = atom.with_connection_resolver(connection_resolver);
692 }
693 if let Some(sqldb_store) = adapter.sqldb_store() {
694 atom = atom.with_sqldb_store(sqldb_store);
695 }
696 if let Some(leased_resource_store) = adapter.leased_resource_store() {
697 atom = atom.with_leased_resource_store(leased_resource_store);
698 }
699 if let Some(registry) = adapter.session_resource_registry() {
700 atom = atom.with_session_resource_registry(registry);
701 }
702 if let Some(schedule_store) = adapter.schedule_store(org_id) {
703 atom = atom.with_schedule_store(schedule_store);
704 }
705 if let Some(platform_store) = adapter.platform_store(org_id, input.context.session_id) {
706 atom = atom.with_platform_store(platform_store);
707 }
708 if let Some(budget_checker) = adapter.budget_checker(org_id, input.agent_id) {
709 atom = atom.with_budget_checker(budget_checker);
710 }
711 if let Some(payment_authority) = adapter.payment_authority(org_id, input.agent_id) {
712 atom = atom.with_payment_authority(payment_authority);
713 }
714
715 atom.execute(input).await
716}