1use async_trait::async_trait;
6use everruns_core::atoms::{
7 ActAtom, ActInput, ActResult, Atom, InputAtom, InputAtomInput, InputAtomResult, ReasonAtom,
8 ReasonInput, ReasonResult,
9};
10use everruns_core::capabilities::{SystemPromptContext, collect_capabilities_with_configs};
11use everruns_core::events::{
12 EventContext, EventRequest, OutputMessageCompletedData, SessionActivatedData, SessionIdledData,
13 TurnCompletedData, TurnFailedData, TurnStartedData,
14};
15use everruns_core::message::Message;
16use everruns_core::message_retriever::MessageRetriever;
17use everruns_core::platform_store::PlatformStore;
18use everruns_core::session::SessionStatus;
19use everruns_core::traits::{
20 AgentStore, BudgetChecker, EventEmitter, HarnessStore, ImageArtifactStore, ImageResolver,
21 LeasedResourceStore, LlmProviderStore, ModelWithProvider, PaymentAuthority,
22 ProviderCredentialStore, SessionFileSystem, SessionMutator, SessionResourceRegistry,
23 SessionScheduleStore, SessionSqlDbStoreRef, SessionStorageStore, SessionStore,
24 UserConnectionResolver,
25};
26use everruns_core::typed_id::{AgentId, HarnessId, MessageId, SessionId, TurnId};
27use everruns_core::{
28 Agent, CapabilityRegistry, DependencyBlocker, DriverRegistry, Harness, Session, TokenUsage,
29 ToolDefinition, ToolRegistry, UserFacingError, org_public_id_from_internal,
30 resolve_runtime_capabilities,
31};
32use std::sync::Arc;
33use tracing::warn;
34
35#[derive(Debug, Clone)]
37pub struct RuntimeHostTurnContext {
38 pub agent: Option<Agent>,
39 pub session: Session,
40 pub messages: Vec<Message>,
41 pub model: Option<ModelWithProvider>,
42 pub mcp_tool_definitions: Vec<ToolDefinition>,
43}
44
45#[async_trait]
56pub trait RuntimeHostAdapter: Send + Sync + Clone + 'static {
57 async fn get_agent(
58 &self,
59 org_id: i64,
60 agent_id: AgentId,
61 ) -> everruns_core::error::Result<Option<Agent>>;
62
63 async fn get_harness(
64 &self,
65 org_id: i64,
66 harness_id: HarnessId,
67 ) -> everruns_core::error::Result<Option<Harness>>;
68
69 async fn set_session_status(
70 &self,
71 org_id: i64,
72 session_id: SessionId,
73 status: SessionStatus,
74 ) -> everruns_core::error::Result<Session>;
75
76 async fn load_turn_context(
77 &self,
78 org_id: i64,
79 session_id: SessionId,
80 ) -> everruns_core::error::Result<RuntimeHostTurnContext>;
81
82 fn capability_registry(&self) -> CapabilityRegistry;
83
84 fn driver_registry(&self) -> DriverRegistry;
85
86 fn harness_store(&self, org_id: i64) -> Arc<dyn HarnessStore>;
87
88 fn agent_store(&self, org_id: i64) -> Arc<dyn AgentStore>;
89
90 fn session_store(&self, org_id: i64) -> Arc<dyn SessionStore>;
91
92 fn session_mutator(&self, org_id: i64) -> Arc<dyn SessionMutator>;
93
94 fn provider_store(&self, org_id: i64) -> Arc<dyn LlmProviderStore>;
95
96 fn message_store(&self) -> Arc<dyn MessageRetriever>;
97
98 fn event_emitter(&self) -> Arc<dyn EventEmitter>;
99
100 fn file_store(&self) -> Arc<dyn SessionFileSystem>;
101
102 fn image_resolver(&self, _org_id: i64) -> Option<Arc<dyn ImageResolver>> {
103 None
104 }
105
106 fn image_artifact_store(&self, _org_id: i64) -> Option<Arc<dyn ImageArtifactStore>> {
107 None
108 }
109
110 fn provider_credential_store(&self, _org_id: i64) -> Option<Arc<dyn ProviderCredentialStore>> {
111 None
112 }
113
114 fn storage_store(&self) -> Option<Arc<dyn SessionStorageStore>> {
115 None
116 }
117
118 fn memory_store(&self, _org_id: i64) -> Option<Arc<dyn everruns_core::MemoryStoreBackend>> {
119 None
120 }
121
122 fn connection_resolver(&self) -> Option<Arc<dyn UserConnectionResolver>> {
123 None
124 }
125
126 fn sqldb_store(&self) -> Option<SessionSqlDbStoreRef> {
127 None
128 }
129
130 fn leased_resource_store(&self) -> Option<Arc<dyn LeasedResourceStore>> {
131 None
132 }
133
134 fn session_resource_registry(&self) -> Option<Arc<dyn SessionResourceRegistry>> {
135 None
136 }
137
138 fn schedule_store(&self, _org_id: i64) -> Option<Arc<dyn SessionScheduleStore>> {
139 None
140 }
141
142 fn platform_store(
143 &self,
144 _org_id: i64,
145 _session_id: SessionId,
146 ) -> Option<Arc<dyn PlatformStore>> {
147 None
148 }
149
150 fn budget_checker(
151 &self,
152 _org_id: i64,
153 _agent_id: Option<AgentId>,
154 ) -> Option<Arc<dyn BudgetChecker>> {
155 None
156 }
157
158 fn payment_authority(
159 &self,
160 _org_id: i64,
161 _agent_id: Option<AgentId>,
162 ) -> Option<Arc<dyn PaymentAuthority>> {
163 None
164 }
165}
166
167struct RuntimeExecutionCapabilities {
168 tool_registry: ToolRegistry,
169 post_tool_hooks: Vec<Arc<dyn everruns_core::PostToolExecHook>>,
170 tool_call_hooks: Vec<Arc<dyn everruns_core::ToolCallHook>>,
171}
172
173async fn load_execution_capabilities<A: RuntimeHostAdapter>(
174 adapter: &A,
175 org_id: i64,
176 session_id: SessionId,
177 harness_id: HarnessId,
178 agent_id: Option<AgentId>,
179 locale: Option<String>,
180 blueprint_id: Option<&str>,
181) -> everruns_core::error::Result<RuntimeExecutionCapabilities> {
182 let capability_registry = adapter.capability_registry();
183 if let Some(blueprint_id) = blueprint_id {
184 let mut registry = ToolRegistry::with_defaults();
185 let blueprint = capability_registry.blueprint(blueprint_id).ok_or_else(|| {
186 everruns_core::error::AgentLoopError::config(format!(
187 "Blueprint \"{blueprint_id}\" not found in registry"
188 ))
189 })?;
190 for tool in blueprint.tools {
191 registry.register_boxed(tool);
192 }
193 return Ok(RuntimeExecutionCapabilities {
194 tool_registry: registry,
195 post_tool_hooks: Vec::new(),
196 tool_call_hooks: Vec::new(),
197 });
198 }
199
200 let harness_chain = adapter
201 .harness_store(org_id)
202 .get_harness_chain(harness_id)
203 .await?;
204 if harness_chain.is_empty() {
205 return Err(everruns_core::error::AgentLoopError::harness_not_found(
206 harness_id,
207 ));
208 }
209
210 let session = adapter
211 .session_store(org_id)
212 .get_session(session_id)
213 .await?
214 .ok_or_else(|| everruns_core::error::AgentLoopError::session_not_found(session_id))?;
215
216 let agent_store = adapter.agent_store(org_id);
217 let agent = match agent_id {
218 Some(agent_id) => Some(
219 agent_store
220 .get_agent(agent_id)
221 .await?
222 .ok_or_else(|| everruns_core::error::AgentLoopError::agent_not_found(agent_id))?,
223 ),
224 None => None,
225 };
226
227 let resolved = resolve_runtime_capabilities(
228 &harness_chain,
229 agent.as_ref(),
230 &session,
231 &capability_registry,
232 );
233 let prompt_ctx = SystemPromptContext {
234 session_id,
235 locale: locale.or(session.locale.clone()),
236 file_store: Some(adapter.file_store()),
237 };
238 let collected = collect_capabilities_with_configs(
239 &resolved.resolved_capability_configs,
240 &capability_registry,
241 &prompt_ctx,
242 )
243 .await;
244
245 let mut registry = ToolRegistry::with_defaults();
246 for tool in collected.tools {
247 registry.register_boxed(tool);
248 }
249
250 let post_tool_hooks = resolved
251 .resolved_capability_configs
252 .iter()
253 .flat_map(|config| {
254 capability_registry
255 .get(config.capability_id())
256 .map(|capability| capability.post_tool_exec_hooks())
257 .unwrap_or_default()
258 })
259 .collect();
260 let tool_call_hooks = resolved
261 .resolved_capability_configs
262 .iter()
263 .flat_map(|config| {
264 capability_registry
265 .get(config.capability_id())
266 .map(|capability| capability.tool_call_hooks())
267 .unwrap_or_default()
268 })
269 .collect();
270
271 Ok(RuntimeExecutionCapabilities {
272 tool_registry: registry,
273 post_tool_hooks,
274 tool_call_hooks,
275 })
276}
277
278pub struct RuntimeSessionLifecycle<A: RuntimeHostAdapter> {
280 adapter: A,
281 org_id: i64,
282 session_id: SessionId,
283}
284
285impl<A: RuntimeHostAdapter> RuntimeSessionLifecycle<A> {
286 pub fn new(adapter: A, org_id: i64, session_id: SessionId) -> Self {
287 Self {
288 adapter,
289 org_id,
290 session_id,
291 }
292 }
293
294 async fn set_session_status(&self, status: SessionStatus, action: &'static str) {
295 if let Err(error) = self
296 .adapter
297 .set_session_status(self.org_id, self.session_id, status)
298 .await
299 {
300 warn!(
301 session_id = %self.session_id,
302 org_id = self.org_id,
303 action,
304 %error,
305 "runtime host lifecycle status update failed"
306 );
307 }
308 }
309
310 async fn emit_event(&self, request: EventRequest) {
311 let event_type = request.event_type.clone();
312 if let Err(error) = self.adapter.event_emitter().emit(request).await {
313 warn!(
314 session_id = %self.session_id,
315 org_id = self.org_id,
316 event_type,
317 %error,
318 "runtime host lifecycle event emission failed"
319 );
320 }
321 }
322
323 pub async fn turn_started(&self, turn_id: TurnId, input_message_id: MessageId) {
324 let input_content = self
325 .adapter
326 .message_store()
327 .get(self.session_id, input_message_id)
328 .await
329 .ok()
330 .flatten()
331 .map(|message| message.content_to_llm_string());
332
333 self.set_session_status(SessionStatus::Active, "turn_started")
334 .await;
335
336 self.emit_event(EventRequest::new(
337 self.session_id,
338 EventContext::turn(turn_id, input_message_id),
339 SessionActivatedData {
340 turn_id,
341 input_message_id,
342 },
343 ))
344 .await;
345
346 self.emit_event(EventRequest::new(
347 self.session_id,
348 EventContext::turn(turn_id, input_message_id),
349 TurnStartedData {
350 turn_id,
351 input_message_id,
352 input_content,
353 },
354 ))
355 .await;
356 }
357
358 pub async fn emit_turn_completed(
359 &self,
360 turn_id: TurnId,
361 input_message_id: MessageId,
362 iterations: u32,
363 usage: Option<TokenUsage>,
364 input_content: Option<String>,
365 ) {
366 self.emit_event(EventRequest::new(
367 self.session_id,
368 EventContext::turn(turn_id, input_message_id),
369 TurnCompletedData {
370 turn_id,
371 iterations,
372 duration_ms: None,
373 usage,
374 input_content,
375 },
376 ))
377 .await;
378 }
379
380 pub async fn emit_session_idled(
381 &self,
382 turn_id: TurnId,
383 input_message_id: MessageId,
384 iterations: Option<u32>,
385 usage: Option<TokenUsage>,
386 ) {
387 self.set_session_status(SessionStatus::Idle, "emit_session_idled")
388 .await;
389
390 self.emit_event(EventRequest::new(
391 self.session_id,
392 EventContext::turn(turn_id, input_message_id),
393 SessionIdledData {
394 turn_id,
395 iterations,
396 usage,
397 },
398 ))
399 .await;
400 }
401
402 pub async fn turn_completed(
403 &self,
404 turn_id: TurnId,
405 input_message_id: MessageId,
406 iterations: u32,
407 usage: Option<TokenUsage>,
408 input_content: Option<String>,
409 ) {
410 self.emit_turn_completed(
411 turn_id,
412 input_message_id,
413 iterations,
414 usage.clone(),
415 input_content,
416 )
417 .await;
418 self.emit_session_idled(turn_id, input_message_id, Some(iterations), usage)
419 .await;
420 }
421
422 pub async fn turn_failed(
423 &self,
424 turn_id: TurnId,
425 input_message_id: MessageId,
426 error: &str,
427 user_error: Option<&UserFacingError>,
428 ) {
429 self.set_session_status(SessionStatus::Idle, "turn_failed")
430 .await;
431
432 self.emit_event(EventRequest::new(
433 self.session_id,
434 EventContext::turn(turn_id, input_message_id),
435 {
436 let mut data = TurnFailedData {
437 turn_id,
438 error: error.to_string(),
439 error_code: None,
440 error_fields: None,
441 };
442 if let Some(user_error) = user_error {
443 user_error.apply_to_event_fields(&mut data.error_code, &mut data.error_fields);
444 }
445 data
446 },
447 ))
448 .await;
449
450 self.emit_event(EventRequest::new(
451 self.session_id,
452 EventContext::turn(turn_id, input_message_id),
453 SessionIdledData {
454 turn_id,
455 iterations: None,
456 usage: None,
457 },
458 ))
459 .await;
460 }
461
462 pub async fn waiting_for_tool_results(&self) {
463 self.set_session_status(
464 SessionStatus::WaitingForToolResults,
465 "waiting_for_tool_results",
466 )
467 .await;
468 }
469
470 pub async fn dependency_blocked(
471 &self,
472 turn_id: TurnId,
473 input_message_id: MessageId,
474 blocker: DependencyBlocker,
475 ) {
476 let user_error = UserFacingError::new(blocker.error_code())
477 .with_field(
478 "dependency",
479 match blocker {
480 DependencyBlocker::HarnessArchived | DependencyBlocker::HarnessDeleted => {
481 "harness"
482 }
483 DependencyBlocker::AgentArchived | DependencyBlocker::AgentDeleted => "agent",
484 },
485 )
486 .with_field(
487 "state",
488 match blocker {
489 DependencyBlocker::HarnessArchived | DependencyBlocker::AgentArchived => {
490 "archived"
491 }
492 DependencyBlocker::HarnessDeleted | DependencyBlocker::AgentDeleted => {
493 "deleted"
494 }
495 },
496 );
497 let mut error_message = Message::assistant(blocker.message());
498 let mut metadata = std::collections::HashMap::new();
499 user_error.apply_to_message_metadata(&mut metadata);
500 error_message.metadata = Some(metadata);
501
502 self.emit_event(EventRequest::new(
503 self.session_id,
504 EventContext::turn(turn_id, input_message_id),
505 OutputMessageCompletedData::new(error_message).with_user_facing_error(&user_error),
506 ))
507 .await;
508
509 self.turn_failed(
510 turn_id,
511 input_message_id,
512 blocker.message(),
513 Some(&user_error),
514 )
515 .await;
516 }
517}
518
519pub async fn detect_dependency_blocker<A: RuntimeHostAdapter>(
520 adapter: &A,
521 org_id: i64,
522 harness_id: HarnessId,
523 agent_id: Option<AgentId>,
524) -> everruns_core::error::Result<Option<DependencyBlocker>> {
525 let harness_store = adapter.harness_store(org_id);
526 let agent_store = adapter.agent_store(org_id);
527 everruns_core::detect_dependency_blocker(
528 harness_store.as_ref(),
529 agent_store.as_ref(),
530 harness_id,
531 agent_id,
532 )
533 .await
534}
535
536pub async fn execute_input_activity<A: RuntimeHostAdapter>(
537 adapter: &A,
538 org_id: i64,
539 input: InputAtomInput,
540) -> everruns_core::error::Result<InputAtomResult> {
541 RuntimeSessionLifecycle::new(adapter.clone(), org_id, input.context.session_id)
542 .turn_started(input.context.turn_id, input.context.input_message_id)
543 .await;
544
545 let atom = InputAtom::new(adapter.message_store());
546 atom.execute(input).await
547}
548
549pub async fn execute_reason_activity<A: RuntimeHostAdapter>(
550 adapter: &A,
551 org_id: i64,
552 input: ReasonInput,
553) -> everruns_core::error::Result<ReasonResult> {
554 if let Some(blocker) =
555 detect_dependency_blocker(adapter, org_id, input.harness_id, input.agent_id).await?
556 {
557 RuntimeSessionLifecycle::new(adapter.clone(), org_id, input.context.session_id)
558 .dependency_blocked(
559 input.context.turn_id,
560 input.context.input_message_id,
561 blocker,
562 )
563 .await;
564 return Ok(ReasonResult {
565 success: false,
566 text: blocker.message().to_string(),
567 tool_calls: vec![],
568 has_tool_calls: false,
569 tool_definitions: vec![],
570 max_iterations: everruns_core::runtime_agent::default_max_iterations(),
571 error: Some("dependency_unavailable".to_string()),
572 usage: None,
573 response_id: None,
574 locale: None,
575 network_access: None,
576 });
577 }
578
579 let turn_context = adapter
580 .load_turn_context(org_id, input.context.session_id)
581 .await?;
582
583 let mut atom = ReasonAtom::new(
584 adapter.harness_store(org_id),
585 adapter.agent_store(org_id),
586 adapter.session_store(org_id),
587 adapter.message_store(),
588 adapter.provider_store(org_id),
589 adapter.capability_registry(),
590 adapter.driver_registry(),
591 adapter.event_emitter(),
592 )
593 .with_file_store(adapter.file_store());
594 if let Some(image_resolver) = adapter.image_resolver(org_id) {
595 atom = atom.with_image_resolver(image_resolver);
596 }
597
598 atom.execute(ReasonInput {
599 mcp_tool_definitions: turn_context.mcp_tool_definitions,
600 ..input
601 })
602 .await
603}
604
605pub async fn execute_act_activity<A: RuntimeHostAdapter>(
606 adapter: &A,
607 input: ActInput,
608) -> everruns_core::error::Result<ActResult> {
609 let org_id = input.org_id.ok_or_else(|| {
610 everruns_core::error::AgentLoopError::config(
611 "ActInput.org_id must be set for runtime host execution",
612 )
613 })?;
614
615 if let Some(blocker) =
616 detect_dependency_blocker(adapter, org_id, input.harness_id, input.agent_id).await?
617 {
618 RuntimeSessionLifecycle::new(adapter.clone(), org_id, input.context.session_id)
619 .dependency_blocked(
620 input.context.turn_id,
621 input.context.input_message_id,
622 blocker,
623 )
624 .await;
625 return Ok(ActResult {
626 results: vec![],
627 completed: true,
628 success_count: 0,
629 error_count: 1,
630 waiting_for_tool_results: false,
631 blocked: true,
632 client_tool_calls: vec![],
633 client_tool_definitions: vec![],
634 });
635 }
636
637 let execution_capabilities = load_execution_capabilities(
638 adapter,
639 org_id,
640 input.context.session_id,
641 input.harness_id,
642 input.agent_id,
643 input.locale.clone(),
644 input.blueprint_id.as_deref(),
645 )
646 .await?;
647 let tool_registry = execution_capabilities.tool_registry;
648 let builtin_tool_registry = Arc::new(tool_registry.clone());
649
650 let mut atom =
651 ActAtom::with_file_store(tool_registry, adapter.event_emitter(), adapter.file_store())
652 .with_session_store(adapter.session_store(org_id))
653 .with_session_mutator(adapter.session_mutator(org_id))
654 .with_agent_store(adapter.agent_store(org_id))
655 .with_tool_registry(builtin_tool_registry)
656 .with_org_id(
657 org_public_id_from_internal(org_id)
658 .parse()
659 .expect("internal org id converts to valid public org id"),
660 )
661 .with_capability_registry(adapter.capability_registry())
662 .with_post_tool_hooks(execution_capabilities.post_tool_hooks)
663 .with_tool_call_hooks(execution_capabilities.tool_call_hooks);
664
665 if let Some(storage_store) = adapter.storage_store() {
666 atom = atom.with_storage_store(storage_store);
667 }
668 if let Some(image_store) = adapter.image_artifact_store(org_id) {
669 atom = atom.with_image_store(image_store);
670 }
671 if let Some(provider_credential_store) = adapter.provider_credential_store(org_id) {
672 atom = atom.with_provider_credential_store(provider_credential_store);
673 }
674 if let Some(memory_store) = adapter.memory_store(org_id) {
675 atom = atom.with_memory_store(memory_store);
676 }
677 if let Some(connection_resolver) = adapter.connection_resolver() {
678 atom = atom.with_connection_resolver(connection_resolver);
679 }
680 if let Some(sqldb_store) = adapter.sqldb_store() {
681 atom = atom.with_sqldb_store(sqldb_store);
682 }
683 if let Some(leased_resource_store) = adapter.leased_resource_store() {
684 atom = atom.with_leased_resource_store(leased_resource_store);
685 }
686 if let Some(registry) = adapter.session_resource_registry() {
687 atom = atom.with_session_resource_registry(registry);
688 }
689 if let Some(schedule_store) = adapter.schedule_store(org_id) {
690 atom = atom.with_schedule_store(schedule_store);
691 }
692 if let Some(platform_store) = adapter.platform_store(org_id, input.context.session_id) {
693 atom = atom.with_platform_store(platform_store);
694 }
695 if let Some(budget_checker) = adapter.budget_checker(org_id, input.agent_id) {
696 atom = atom.with_budget_checker(budget_checker);
697 }
698 if let Some(payment_authority) = adapter.payment_authority(org_id, input.agent_id) {
699 atom = atom.with_payment_authority(payment_authority);
700 }
701
702 atom.execute(input).await
703}