1use async_trait::async_trait;
6use everruns_core::atoms::{
7 ActAtom, ActInput, ActResult, Atom, InputAtom, InputAtomInput, InputAtomResult, ReasonAtom,
8 ReasonInput, ReasonResult,
9};
10use everruns_core::capabilities::{SystemPromptContext, collect_capabilities_with_configs};
11use everruns_core::events::{
12 EventContext, EventRequest, OutputMessageCompletedData, SessionActivatedData, SessionIdledData,
13 TurnCompletedData, TurnFailedData, TurnStartedData,
14};
15use everruns_core::message::Message;
16use everruns_core::message_retriever::MessageRetriever;
17use everruns_core::platform_store::PlatformStore;
18use everruns_core::session::SessionStatus;
19use everruns_core::traits::{
20 AgentStore, BudgetChecker, EventEmitter, HarnessStore, ImageArtifactStore, ImageResolver,
21 LeasedResourceStore, LlmProviderStore, ModelWithProvider, PaymentAuthority,
22 ProviderCredentialStore, SessionFileSystem, SessionMutator, SessionResourceRegistry,
23 SessionScheduleStore, SessionSqlDbStoreRef, SessionStorageStore, SessionStore,
24 UserConnectionResolver,
25};
26use everruns_core::typed_id::{AgentId, HarnessId, MessageId, SessionId, TurnId};
27use everruns_core::{
28 Agent, CapabilityRegistry, DependencyBlocker, DriverRegistry, Harness, Session, TokenUsage,
29 ToolDefinition, ToolRegistry, UserFacingError, UtilityLlmService, org_public_id_from_internal,
30 resolve_runtime_capabilities,
31};
32use std::sync::Arc;
33use tracing::warn;
34
35#[derive(Debug, Clone)]
37pub struct RuntimeHostTurnContext {
38 pub agent: Option<Agent>,
39 pub session: Session,
40 pub messages: Vec<Message>,
41 pub model: Option<ModelWithProvider>,
42 pub mcp_tool_definitions: Vec<ToolDefinition>,
43}
44
45#[async_trait]
56pub trait RuntimeHostAdapter: Send + Sync + Clone + 'static {
57 async fn get_agent(
58 &self,
59 org_id: i64,
60 agent_id: AgentId,
61 ) -> everruns_core::error::Result<Option<Agent>>;
62
63 async fn get_harness(
64 &self,
65 org_id: i64,
66 harness_id: HarnessId,
67 ) -> everruns_core::error::Result<Option<Harness>>;
68
69 async fn set_session_status(
70 &self,
71 org_id: i64,
72 session_id: SessionId,
73 status: SessionStatus,
74 ) -> everruns_core::error::Result<Session>;
75
76 async fn load_turn_context(
77 &self,
78 org_id: i64,
79 session_id: SessionId,
80 ) -> everruns_core::error::Result<RuntimeHostTurnContext>;
81
82 fn capability_registry(&self) -> CapabilityRegistry;
83
84 fn driver_registry(&self) -> DriverRegistry;
85
86 fn harness_store(&self, org_id: i64) -> Arc<dyn HarnessStore>;
87
88 fn agent_store(&self, org_id: i64) -> Arc<dyn AgentStore>;
89
90 fn session_store(&self, org_id: i64) -> Arc<dyn SessionStore>;
91
92 fn session_mutator(&self, org_id: i64) -> Arc<dyn SessionMutator>;
93
94 fn provider_store(&self, org_id: i64) -> Arc<dyn LlmProviderStore>;
95
96 fn message_store(&self) -> Arc<dyn MessageRetriever>;
97
98 fn event_emitter(&self) -> Arc<dyn EventEmitter>;
99
100 fn file_store(&self) -> Arc<dyn SessionFileSystem>;
101
102 fn image_resolver(&self, _org_id: i64) -> Option<Arc<dyn ImageResolver>> {
103 None
104 }
105
106 fn image_artifact_store(&self, _org_id: i64) -> Option<Arc<dyn ImageArtifactStore>> {
107 None
108 }
109
110 fn provider_credential_store(&self, _org_id: i64) -> Option<Arc<dyn ProviderCredentialStore>> {
111 None
112 }
113
114 fn utility_llm_service(&self) -> Option<Arc<dyn UtilityLlmService>> {
115 None
116 }
117
118 fn storage_store(&self) -> Option<Arc<dyn SessionStorageStore>> {
119 None
120 }
121
122 fn memory_store(&self, _org_id: i64) -> Option<Arc<dyn everruns_core::MemoryStoreBackend>> {
123 None
124 }
125
126 fn connection_resolver(&self) -> Option<Arc<dyn UserConnectionResolver>> {
127 None
128 }
129
130 fn sqldb_store(&self) -> Option<SessionSqlDbStoreRef> {
131 None
132 }
133
134 fn leased_resource_store(&self) -> Option<Arc<dyn LeasedResourceStore>> {
135 None
136 }
137
138 fn session_resource_registry(&self) -> Option<Arc<dyn SessionResourceRegistry>> {
139 None
140 }
141
142 fn schedule_store(&self, _org_id: i64) -> Option<Arc<dyn SessionScheduleStore>> {
143 None
144 }
145
146 fn platform_store(
147 &self,
148 _org_id: i64,
149 _session_id: SessionId,
150 ) -> Option<Arc<dyn PlatformStore>> {
151 None
152 }
153
154 fn budget_checker(
155 &self,
156 _org_id: i64,
157 _agent_id: Option<AgentId>,
158 ) -> Option<Arc<dyn BudgetChecker>> {
159 None
160 }
161
162 fn payment_authority(
163 &self,
164 _org_id: i64,
165 _agent_id: Option<AgentId>,
166 ) -> Option<Arc<dyn PaymentAuthority>> {
167 None
168 }
169}
170
171struct RuntimeExecutionCapabilities {
172 tool_registry: ToolRegistry,
173 post_tool_hooks: Vec<Arc<dyn everruns_core::PostToolExecHook>>,
174 tool_call_hooks: Vec<Arc<dyn everruns_core::ToolCallHook>>,
175}
176
177async fn load_execution_capabilities<A: RuntimeHostAdapter>(
178 adapter: &A,
179 org_id: i64,
180 session_id: SessionId,
181 harness_id: HarnessId,
182 agent_id: Option<AgentId>,
183 locale: Option<String>,
184 blueprint_id: Option<&str>,
185) -> everruns_core::error::Result<RuntimeExecutionCapabilities> {
186 let capability_registry = adapter.capability_registry();
187 if let Some(blueprint_id) = blueprint_id {
188 let mut registry = ToolRegistry::with_defaults();
189 let blueprint = capability_registry.blueprint(blueprint_id).ok_or_else(|| {
190 everruns_core::error::AgentLoopError::config(format!(
191 "Blueprint \"{blueprint_id}\" not found in registry"
192 ))
193 })?;
194 for tool in blueprint.tools {
195 registry.register_boxed(tool);
196 }
197 return Ok(RuntimeExecutionCapabilities {
198 tool_registry: registry,
199 post_tool_hooks: Vec::new(),
200 tool_call_hooks: Vec::new(),
201 });
202 }
203
204 let harness_chain = adapter
205 .harness_store(org_id)
206 .get_harness_chain(harness_id)
207 .await?;
208 if harness_chain.is_empty() {
209 return Err(everruns_core::error::AgentLoopError::harness_not_found(
210 harness_id,
211 ));
212 }
213
214 let session = adapter
215 .session_store(org_id)
216 .get_session(session_id)
217 .await?
218 .ok_or_else(|| everruns_core::error::AgentLoopError::session_not_found(session_id))?;
219
220 let agent_store = adapter.agent_store(org_id);
221 let agent = match agent_id {
222 Some(agent_id) => Some(
223 agent_store
224 .get_agent(agent_id)
225 .await?
226 .ok_or_else(|| everruns_core::error::AgentLoopError::agent_not_found(agent_id))?,
227 ),
228 None => None,
229 };
230
231 let resolved = resolve_runtime_capabilities(
232 &harness_chain,
233 agent.as_ref(),
234 &session,
235 &capability_registry,
236 );
237 let prompt_ctx = SystemPromptContext {
238 session_id,
239 locale: locale.or(session.locale.clone()),
240 file_store: Some(adapter.file_store()),
241 };
242 let collected = collect_capabilities_with_configs(
243 &resolved.resolved_capability_configs,
244 &capability_registry,
245 &prompt_ctx,
246 )
247 .await;
248
249 let mut registry = ToolRegistry::with_defaults();
250 for tool in collected.tools {
251 registry.register_boxed(tool);
252 }
253
254 let post_tool_hooks = resolved
255 .resolved_capability_configs
256 .iter()
257 .flat_map(|config| {
258 capability_registry
259 .get(config.capability_id())
260 .map(|capability| capability.post_tool_exec_hooks())
261 .unwrap_or_default()
262 })
263 .collect();
264 let tool_call_hooks = resolved
265 .resolved_capability_configs
266 .iter()
267 .flat_map(|config| {
268 capability_registry
269 .get(config.capability_id())
270 .map(|capability| capability.tool_call_hooks())
271 .unwrap_or_default()
272 })
273 .collect();
274
275 Ok(RuntimeExecutionCapabilities {
276 tool_registry: registry,
277 post_tool_hooks,
278 tool_call_hooks,
279 })
280}
281
282pub struct RuntimeSessionLifecycle<A: RuntimeHostAdapter> {
284 adapter: A,
285 org_id: i64,
286 session_id: SessionId,
287}
288
289impl<A: RuntimeHostAdapter> RuntimeSessionLifecycle<A> {
290 pub fn new(adapter: A, org_id: i64, session_id: SessionId) -> Self {
291 Self {
292 adapter,
293 org_id,
294 session_id,
295 }
296 }
297
298 async fn set_session_status(&self, status: SessionStatus, action: &'static str) {
299 if let Err(error) = self
300 .adapter
301 .set_session_status(self.org_id, self.session_id, status)
302 .await
303 {
304 warn!(
305 session_id = %self.session_id,
306 org_id = self.org_id,
307 action,
308 %error,
309 "runtime host lifecycle status update failed"
310 );
311 }
312 }
313
314 async fn emit_event(&self, request: EventRequest) {
315 let event_type = request.event_type.clone();
316 if let Err(error) = self.adapter.event_emitter().emit(request).await {
317 warn!(
318 session_id = %self.session_id,
319 org_id = self.org_id,
320 event_type,
321 %error,
322 "runtime host lifecycle event emission failed"
323 );
324 }
325 }
326
327 pub async fn turn_started(&self, turn_id: TurnId, input_message_id: MessageId) {
328 let input_content = self
329 .adapter
330 .message_store()
331 .get(self.session_id, input_message_id)
332 .await
333 .ok()
334 .flatten()
335 .map(|message| message.content_to_llm_string());
336
337 self.set_session_status(SessionStatus::Active, "turn_started")
338 .await;
339
340 self.emit_event(EventRequest::new(
341 self.session_id,
342 EventContext::turn(turn_id, input_message_id),
343 SessionActivatedData {
344 turn_id,
345 input_message_id,
346 },
347 ))
348 .await;
349
350 self.emit_event(EventRequest::new(
351 self.session_id,
352 EventContext::turn(turn_id, input_message_id),
353 TurnStartedData {
354 turn_id,
355 input_message_id,
356 input_content,
357 },
358 ))
359 .await;
360 }
361
362 pub async fn emit_turn_completed(&self, input_message_id: MessageId, data: TurnCompletedData) {
363 let turn_id = data.turn_id;
364 self.emit_event(EventRequest::new(
365 self.session_id,
366 EventContext::turn(turn_id, input_message_id),
367 data,
368 ))
369 .await;
370 }
371
372 pub async fn emit_session_idled(
373 &self,
374 turn_id: TurnId,
375 input_message_id: MessageId,
376 iterations: Option<u32>,
377 usage: Option<TokenUsage>,
378 ) {
379 self.set_session_status(SessionStatus::Idle, "emit_session_idled")
380 .await;
381
382 self.emit_event(EventRequest::new(
383 self.session_id,
384 EventContext::turn(turn_id, input_message_id),
385 SessionIdledData {
386 turn_id,
387 iterations,
388 usage,
389 },
390 ))
391 .await;
392 }
393
394 pub async fn turn_completed(
395 &self,
396 turn_id: TurnId,
397 input_message_id: MessageId,
398 iterations: u32,
399 usage: Option<TokenUsage>,
400 input_content: Option<String>,
401 ) {
402 self.emit_turn_completed(
403 input_message_id,
404 TurnCompletedData {
405 turn_id,
406 iterations,
407 duration_ms: None,
408 usage: usage.clone(),
409 input_content,
410 final_message_id: None,
411 final_answer_preview: None,
412 time_to_first_token_ms: None,
413 tool_call_count: None,
414 llm_call_count: None,
415 status: Some("completed".to_string()),
416 },
417 )
418 .await;
419 self.emit_session_idled(turn_id, input_message_id, Some(iterations), usage)
420 .await;
421 }
422
423 pub async fn turn_failed(
424 &self,
425 turn_id: TurnId,
426 input_message_id: MessageId,
427 error: &str,
428 user_error: Option<&UserFacingError>,
429 ) {
430 self.set_session_status(SessionStatus::Idle, "turn_failed")
431 .await;
432
433 self.emit_event(EventRequest::new(
434 self.session_id,
435 EventContext::turn(turn_id, input_message_id),
436 {
437 let mut data = TurnFailedData {
438 turn_id,
439 error: error.to_string(),
440 error_code: None,
441 error_fields: None,
442 };
443 if let Some(user_error) = user_error {
444 user_error.apply_to_event_fields(&mut data.error_code, &mut data.error_fields);
445 }
446 data
447 },
448 ))
449 .await;
450
451 self.emit_event(EventRequest::new(
452 self.session_id,
453 EventContext::turn(turn_id, input_message_id),
454 SessionIdledData {
455 turn_id,
456 iterations: None,
457 usage: None,
458 },
459 ))
460 .await;
461 }
462
463 pub async fn waiting_for_tool_results(&self) {
464 self.set_session_status(
465 SessionStatus::WaitingForToolResults,
466 "waiting_for_tool_results",
467 )
468 .await;
469 }
470
471 pub async fn dependency_blocked(
472 &self,
473 turn_id: TurnId,
474 input_message_id: MessageId,
475 blocker: DependencyBlocker,
476 ) {
477 let user_error = UserFacingError::new(blocker.error_code())
478 .with_field(
479 "dependency",
480 match blocker {
481 DependencyBlocker::HarnessArchived | DependencyBlocker::HarnessDeleted => {
482 "harness"
483 }
484 DependencyBlocker::AgentArchived | DependencyBlocker::AgentDeleted => "agent",
485 },
486 )
487 .with_field(
488 "state",
489 match blocker {
490 DependencyBlocker::HarnessArchived | DependencyBlocker::AgentArchived => {
491 "archived"
492 }
493 DependencyBlocker::HarnessDeleted | DependencyBlocker::AgentDeleted => {
494 "deleted"
495 }
496 },
497 );
498 let mut error_message = Message::assistant(blocker.message());
499 let mut metadata = std::collections::HashMap::new();
500 user_error.apply_to_message_metadata(&mut metadata);
501 error_message.metadata = Some(metadata);
502
503 self.emit_event(EventRequest::new(
504 self.session_id,
505 EventContext::turn(turn_id, input_message_id),
506 OutputMessageCompletedData::new(error_message).with_user_facing_error(&user_error),
507 ))
508 .await;
509
510 self.turn_failed(
511 turn_id,
512 input_message_id,
513 blocker.message(),
514 Some(&user_error),
515 )
516 .await;
517 }
518}
519
520pub async fn detect_dependency_blocker<A: RuntimeHostAdapter>(
521 adapter: &A,
522 org_id: i64,
523 harness_id: HarnessId,
524 agent_id: Option<AgentId>,
525) -> everruns_core::error::Result<Option<DependencyBlocker>> {
526 let harness_store = adapter.harness_store(org_id);
527 let agent_store = adapter.agent_store(org_id);
528 everruns_core::detect_dependency_blocker(
529 harness_store.as_ref(),
530 agent_store.as_ref(),
531 harness_id,
532 agent_id,
533 )
534 .await
535}
536
537pub async fn execute_input_activity<A: RuntimeHostAdapter>(
538 adapter: &A,
539 org_id: i64,
540 input: InputAtomInput,
541) -> everruns_core::error::Result<InputAtomResult> {
542 RuntimeSessionLifecycle::new(adapter.clone(), org_id, input.context.session_id)
543 .turn_started(input.context.turn_id, input.context.input_message_id)
544 .await;
545
546 let atom = InputAtom::new(adapter.message_store());
547 atom.execute(input).await
548}
549
550pub async fn execute_reason_activity<A: RuntimeHostAdapter>(
551 adapter: &A,
552 org_id: i64,
553 input: ReasonInput,
554) -> everruns_core::error::Result<ReasonResult> {
555 if let Some(blocker) =
556 detect_dependency_blocker(adapter, org_id, input.harness_id, input.agent_id).await?
557 {
558 RuntimeSessionLifecycle::new(adapter.clone(), org_id, input.context.session_id)
559 .dependency_blocked(
560 input.context.turn_id,
561 input.context.input_message_id,
562 blocker,
563 )
564 .await;
565 return Ok(ReasonResult {
566 success: false,
567 text: blocker.message().to_string(),
568 tool_calls: vec![],
569 has_tool_calls: false,
570 tool_definitions: vec![],
571 max_iterations: everruns_core::runtime_agent::default_max_iterations(),
572 error: Some("dependency_unavailable".to_string()),
573 usage: None,
574 output_message_id: None,
575 time_to_first_token_ms: None,
576 response_id: None,
577 locale: None,
578 network_access: None,
579 });
580 }
581
582 let turn_context = adapter
583 .load_turn_context(org_id, input.context.session_id)
584 .await?;
585
586 let mut atom = ReasonAtom::new(
587 adapter.harness_store(org_id),
588 adapter.agent_store(org_id),
589 adapter.session_store(org_id),
590 adapter.message_store(),
591 adapter.provider_store(org_id),
592 adapter.capability_registry(),
593 adapter.driver_registry(),
594 adapter.event_emitter(),
595 )
596 .with_file_store(adapter.file_store());
597 if let Some(image_resolver) = adapter.image_resolver(org_id) {
598 atom = atom.with_image_resolver(image_resolver);
599 }
600
601 atom.execute(ReasonInput {
602 mcp_tool_definitions: turn_context.mcp_tool_definitions,
603 ..input
604 })
605 .await
606}
607
608pub async fn execute_act_activity<A: RuntimeHostAdapter>(
609 adapter: &A,
610 input: ActInput,
611) -> everruns_core::error::Result<ActResult> {
612 let org_id = input.org_id.ok_or_else(|| {
613 everruns_core::error::AgentLoopError::config(
614 "ActInput.org_id must be set for runtime host execution",
615 )
616 })?;
617
618 if let Some(blocker) =
619 detect_dependency_blocker(adapter, org_id, input.harness_id, input.agent_id).await?
620 {
621 RuntimeSessionLifecycle::new(adapter.clone(), org_id, input.context.session_id)
622 .dependency_blocked(
623 input.context.turn_id,
624 input.context.input_message_id,
625 blocker,
626 )
627 .await;
628 return Ok(ActResult {
629 results: vec![],
630 completed: true,
631 success_count: 0,
632 error_count: 1,
633 waiting_for_tool_results: false,
634 blocked: true,
635 client_tool_calls: vec![],
636 client_tool_definitions: vec![],
637 });
638 }
639
640 let execution_capabilities = load_execution_capabilities(
641 adapter,
642 org_id,
643 input.context.session_id,
644 input.harness_id,
645 input.agent_id,
646 input.locale.clone(),
647 input.blueprint_id.as_deref(),
648 )
649 .await?;
650 let tool_registry = execution_capabilities.tool_registry;
651 let builtin_tool_registry = Arc::new(tool_registry.clone());
652
653 let mut atom =
654 ActAtom::with_file_store(tool_registry, adapter.event_emitter(), adapter.file_store())
655 .with_session_store(adapter.session_store(org_id))
656 .with_session_mutator(adapter.session_mutator(org_id))
657 .with_agent_store(adapter.agent_store(org_id))
658 .with_tool_registry(builtin_tool_registry)
659 .with_org_id(
660 org_public_id_from_internal(org_id)
661 .parse()
662 .expect("internal org id converts to valid public org id"),
663 )
664 .with_capability_registry(adapter.capability_registry())
665 .with_post_tool_hooks(execution_capabilities.post_tool_hooks)
666 .with_tool_call_hooks(execution_capabilities.tool_call_hooks);
667
668 if let Some(storage_store) = adapter.storage_store() {
669 atom = atom.with_storage_store(storage_store);
670 }
671 if let Some(image_store) = adapter.image_artifact_store(org_id) {
672 atom = atom.with_image_store(image_store);
673 }
674 if let Some(provider_credential_store) = adapter.provider_credential_store(org_id) {
675 atom = atom.with_provider_credential_store(provider_credential_store);
676 }
677 if let Some(utility_llm_service) = adapter.utility_llm_service() {
678 atom = atom.with_utility_llm_service(utility_llm_service);
679 }
680 if let Some(memory_store) = adapter.memory_store(org_id) {
681 atom = atom.with_memory_store(memory_store);
682 }
683 if let Some(connection_resolver) = adapter.connection_resolver() {
684 atom = atom.with_connection_resolver(connection_resolver);
685 }
686 if let Some(sqldb_store) = adapter.sqldb_store() {
687 atom = atom.with_sqldb_store(sqldb_store);
688 }
689 if let Some(leased_resource_store) = adapter.leased_resource_store() {
690 atom = atom.with_leased_resource_store(leased_resource_store);
691 }
692 if let Some(registry) = adapter.session_resource_registry() {
693 atom = atom.with_session_resource_registry(registry);
694 }
695 if let Some(schedule_store) = adapter.schedule_store(org_id) {
696 atom = atom.with_schedule_store(schedule_store);
697 }
698 if let Some(platform_store) = adapter.platform_store(org_id, input.context.session_id) {
699 atom = atom.with_platform_store(platform_store);
700 }
701 if let Some(budget_checker) = adapter.budget_checker(org_id, input.agent_id) {
702 atom = atom.with_budget_checker(budget_checker);
703 }
704 if let Some(payment_authority) = adapter.payment_authority(org_id, input.agent_id) {
705 atom = atom.with_payment_authority(payment_authority);
706 }
707
708 atom.execute(input).await
709}