1use crate::backends::{
6 EventBus, RuntimeAgentStore, RuntimeBackends, RuntimeHarnessStore, RuntimeMessageStore,
7 RuntimeProviderStore, RuntimeSessionStore,
8};
9use crate::builders::SingleSessionBuilder;
10use crate::host::{
11 RuntimeHostAdapter, RuntimeHostTurnContext, RuntimeSessionLifecycle, execute_act_activity,
12 execute_input_activity, execute_reason_activity,
13};
14use crate::in_memory::{InMemorySessionFileStore, InMemorySessionFileSystemFactory};
15use async_trait::async_trait;
16use everruns_core::agent::Agent;
17use everruns_core::atoms::{ActInput, AtomContext, InputAtomInput, ReasonInput};
18use everruns_core::capabilities::{Capability, CapabilityRegistry};
19use everruns_core::config_layer::AgentConfigOverlay;
20use everruns_core::driver_registry::{DriverId, DriverRegistry};
21use everruns_core::error::{AgentLoopError, Result};
22use everruns_core::events::{
23 Event, EventContext, EventData, EventRequest, InputMessageData, OutputMessageCompletedData,
24 ToolCompletedData,
25};
26use everruns_core::harness::Harness;
27use everruns_core::llmsim_driver::{LlmSimConfig, LlmSimDriver};
28use everruns_core::message::{ContentPart, Message};
29use everruns_core::platform_definition::PlatformDefinition;
30use everruns_core::plugins::{PluginFileSet, compile_plugin};
31use everruns_core::runtime_context::{AssembledTurnContext, inspect_turn_context};
32use everruns_core::session::{Session, SessionStatus};
33use everruns_core::session_file::{InitialFile, SessionFile};
34use everruns_core::tools::ToolResultImage;
35use everruns_core::traits::{
36 AgentStore, EventEmitter, HarnessStore, ProviderStore, ResolvedModel, SessionMutator,
37 SessionStorageStore, SessionStore, UserConnectionResolver,
38};
39use everruns_core::turn::{TurnAction, TurnContext, TurnOutcome, TurnStateMachine};
40use everruns_core::typed_id::{AgentId, HarnessId, OrgId, SessionId};
41use everruns_core::{
42 AgentCapabilityConfig, InputMessage, MessageRetriever, SessionFileSystem,
43 SessionFileSystemFactoryContext, plugin_capability_id,
44};
45use sha2::{Digest, Sha256};
46use std::path::Path;
47use std::sync::Arc;
48
/// Maximum number of leading bytes of a public org id that are fed into the
/// SHA-256 fallback hash; any bytes beyond this cap are ignored.
const HASH_INPUT_CAP_BYTES: usize = 128;
55
56pub fn in_process_internal_org_id(public_org_id: &str) -> i64 {
69 if public_org_id == everruns_core::DEFAULT_ORG_PUBLIC_ID {
70 return everruns_core::DEFAULT_ORG_ID;
71 }
72
73 let Ok(parsed) = public_org_id.parse::<OrgId>() else {
74 return hash_public_org_id(public_org_id);
75 };
76 let raw: u128 = parsed.uuid().as_u128();
77 if raw == 0 {
78 return hash_public_org_id(public_org_id);
79 }
80
81 if raw <= i64::MAX as u128 {
84 return raw as i64;
85 }
86
87 hash_public_org_id(public_org_id)
88}
89
90fn hash_public_org_id(public_org_id: &str) -> i64 {
95 let bytes = public_org_id.as_bytes();
96 let bounded = &bytes[..bytes.len().min(HASH_INPUT_CAP_BYTES)];
97 let digest = Sha256::digest(bounded);
98 let mut buf = [0u8; 8];
99 buf.copy_from_slice(&digest[..8]);
100 let raw = u64::from_be_bytes(buf);
101 ((raw % ((i64::MAX - 1) as u64)) as i64) + 2
102}
103
104#[derive(Debug, Clone)]
105pub struct TurnResult {
106 pub response: String,
108 pub iterations: usize,
110 pub tool_calls_count: usize,
112 pub success: bool,
114 pub error: Option<String>,
116 pub turn_id: everruns_core::typed_id::TurnId,
118}
119
120impl TurnResult {
121 fn from_outcome(outcome: TurnOutcome, turn_id: everruns_core::typed_id::TurnId) -> Self {
122 match outcome {
123 TurnOutcome::Success {
124 response,
125 iterations,
126 tool_calls_count,
127 } => Self {
128 response,
129 iterations,
130 tool_calls_count,
131 success: true,
132 error: None,
133 turn_id,
134 },
135 TurnOutcome::Failed { error, iterations } => Self {
136 response: String::new(),
137 iterations,
138 tool_calls_count: 0,
139 success: false,
140 error: Some(error),
141 turn_id,
142 },
143 TurnOutcome::MaxIterationsReached {
144 response,
145 iterations,
146 tool_calls_count,
147 } => Self {
148 response,
149 iterations,
150 tool_calls_count,
151 success: true,
152 error: None,
153 turn_id,
154 },
155 TurnOutcome::Sealed {
158 reason,
159 response,
160 iterations,
161 tool_calls_count,
162 } => Self {
163 response,
164 iterations,
165 tool_calls_count,
166 success: false,
167 error: Some(format!("turn sealed: {reason}")),
168 turn_id,
169 },
170 }
171 }
172}
173
/// Builder that collects configuration and seed data for an
/// [`InProcessRuntime`].
pub struct InProcessRuntimeBuilder {
    /// Platform definition (capability registry, driver registry, session
    /// file system factory) the runtime is built on.
    platform_definition: PlatformDefinition,
    /// When set, `build` registers an `LlmSimDriver` created from this config.
    llm_sim_config: Option<LlmSimConfig>,
    /// Model written to the provider store as the default during `build`.
    default_model: Option<ResolvedModel>,
    /// Storage backends; `build` uses `RuntimeBackends::in_memory()` when unset.
    backends: Option<RuntimeBackends>,
    /// Context passed to the session file system factory during `build`.
    session_file_system_factory_context: SessionFileSystemFactoryContext,
    /// Harnesses added to the harness store during `build`.
    harnesses: Vec<Harness>,
    /// Agents added to the agent store during `build`.
    agents: Vec<Agent>,
    /// Sessions added to the session store during `build`.
    sessions: Vec<Session>,
    /// Session id recorded by `single_session`, exposed by the runtime.
    default_session_id: Option<SessionId>,
    /// Files seeded into the file store, keyed by the given session id.
    seeded_files: Vec<(SessionId, InitialFile)>,
    /// MCP auth provider; `build` uses `everruns_mcp::NoAuthProvider` when unset.
    mcp_auth_provider: Option<Arc<dyn everruns_mcp::McpAuthProvider>>,
    /// Capability configs produced by `with_plugin_dir`, one per loaded plugin.
    plugin_capability_configs: Vec<AgentCapabilityConfig>,
    /// Warnings accumulated while compiling plugins.
    plugin_warnings: Vec<String>,
}
204
impl Default for InProcessRuntimeBuilder {
    /// Equivalent to [`InProcessRuntimeBuilder::new`].
    fn default() -> Self {
        Self::new()
    }
}
210
211impl InProcessRuntimeBuilder {
212 pub fn new() -> Self {
219 Self {
220 platform_definition: PlatformDefinition::builder()
221 .capability_registry(CapabilityRegistry::with_builtins())
222 .driver_registry(DriverRegistry::new())
223 .session_file_system_factory(Arc::new(InMemorySessionFileSystemFactory))
224 .build(),
225 llm_sim_config: None,
226 default_model: None,
227 backends: None,
228 session_file_system_factory_context: SessionFileSystemFactoryContext::new(),
229 harnesses: Vec::new(),
230 agents: Vec::new(),
231 sessions: Vec::new(),
232 default_session_id: None,
233 seeded_files: Vec::new(),
234 mcp_auth_provider: None,
235 plugin_capability_configs: Vec::new(),
236 plugin_warnings: Vec::new(),
237 }
238 }
239
240 pub fn mcp_auth_provider(mut self, provider: Arc<dyn everruns_mcp::McpAuthProvider>) -> Self {
244 self.mcp_auth_provider = Some(provider);
245 self
246 }
247
248 pub fn platform_definition(mut self, platform_definition: PlatformDefinition) -> Self {
250 self.platform_definition = platform_definition;
251 self
252 }
253
254 pub fn capability<C: Capability + 'static>(mut self, capability: C) -> Self {
256 self.platform_definition
257 .capability_registry_mut()
258 .register(capability);
259 self
260 }
261
262 pub fn driver_registry(mut self, driver_registry: DriverRegistry) -> Self {
264 *self.platform_definition.driver_registry_mut() = driver_registry;
265 self
266 }
267
268 pub fn llm_sim(mut self, config: LlmSimConfig) -> Self {
270 self.llm_sim_config = Some(config);
271 self
272 }
273
274 pub fn default_model(mut self, model: ResolvedModel) -> Self {
276 self.default_model = Some(model);
277 self
278 }
279
280 pub fn backends(mut self, backends: RuntimeBackends) -> Self {
282 self.backends = Some(backends);
283 self
284 }
285
286 pub fn with_session_task_registry(
290 mut self,
291 registry: Arc<dyn everruns_core::session_task::SessionTaskRegistry>,
292 ) -> Self {
293 let backends = self
294 .backends
295 .take()
296 .unwrap_or_else(RuntimeBackends::in_memory);
297 self.backends = Some(backends.with_session_task_registry(registry));
298 self
299 }
300
301 pub fn with_schedule_store_factory(
303 mut self,
304 factory: crate::backends::ScheduleStoreFactory,
305 ) -> Self {
306 let backends = self
307 .backends
308 .take()
309 .unwrap_or_else(RuntimeBackends::in_memory);
310 self.backends = Some(backends.with_schedule_store_factory(factory));
311 self
312 }
313
314 pub fn with_platform_store_factory(
316 mut self,
317 factory: crate::backends::PlatformStoreFactory,
318 ) -> Self {
319 let backends = self
320 .backends
321 .take()
322 .unwrap_or_else(RuntimeBackends::in_memory);
323 self.backends = Some(backends.with_platform_store_factory(factory));
324 self
325 }
326
327 pub fn session_file_system_factory_context(
329 mut self,
330 context: SessionFileSystemFactoryContext,
331 ) -> Self {
332 self.session_file_system_factory_context = context;
333 self
334 }
335
336 pub fn harness(mut self, harness: Harness) -> Self {
338 self.harnesses.push(harness);
339 self
340 }
341
342 pub fn agent(mut self, agent: Agent) -> Self {
344 self.agents.push(agent);
345 self
346 }
347
348 pub fn session(mut self, session: Session) -> Self {
350 self.sessions.push(session);
351 self
352 }
353
354 pub fn single_session<F>(mut self, configure: F) -> Self
359 where
360 F: FnOnce(SingleSessionBuilder) -> SingleSessionBuilder,
361 {
362 let (harness, agent, session, session_id) =
363 configure(SingleSessionBuilder::default()).build();
364 self.harnesses.push(harness);
365 self.agents.push(agent);
366 self.sessions.push(session);
367 self.default_session_id = Some(session_id);
368 self
369 }
370
371 pub fn seed_text_file(
375 mut self,
376 session_id: SessionId,
377 path: impl Into<String>,
378 content: impl Into<String>,
379 ) -> Self {
380 self.seeded_files.push((
381 session_id,
382 InitialFile {
383 path: path.into(),
384 content: content.into(),
385 encoding: "text".to_string(),
386 is_readonly: false,
387 },
388 ));
389 self
390 }
391
392 pub fn with_plugin_dir(mut self, path: &Path) -> Result<Self> {
413 let file_set = PluginFileSet::from_dir(path)
414 .map_err(|e| AgentLoopError::config(format!("plugin directory load failed: {e}")))?;
415 let compiled = compile_plugin(&file_set)
416 .map_err(|e| AgentLoopError::config(format!("plugin compilation failed: {e}")))?;
417
418 for warning in &compiled.warnings {
419 tracing::warn!(plugin = %compiled.definition.name, warning = %warning, "plugin compile warning");
420 }
421 self.plugin_warnings.extend(compiled.warnings);
422
423 let cap_id = plugin_capability_id(&compiled.definition.name);
424 let hydrated_config = serde_json::to_value(&compiled.definition)
425 .unwrap_or(serde_json::Value::Object(serde_json::Map::new()));
426 self.plugin_capability_configs
427 .push(AgentCapabilityConfig::with_config(cap_id, hydrated_config));
428
429 Ok(self)
430 }
431
432 pub fn plugin_capability(&self, name: &str) -> Option<AgentCapabilityConfig> {
438 let cap_id = plugin_capability_id(name);
439 self.plugin_capability_configs
440 .iter()
441 .find(|c| c.capability_id() == cap_id)
442 .cloned()
443 }
444
445 pub async fn build(mut self) -> Result<InProcessRuntime> {
450 let backends = match self.backends.take() {
451 Some(backends) => backends,
452 None => RuntimeBackends::in_memory(),
453 };
454 let file_store = resolve_session_file_system(
455 &self.platform_definition,
456 self.session_file_system_factory_context.clone(),
457 )
458 .await?;
459
460 if let Some(config) = self.llm_sim_config.take() {
461 let driver = LlmSimDriver::new(config);
462 self.platform_definition
465 .driver_registry_mut()
466 .register_or_replace(DriverId::LlmSim, move |_config| Box::new(driver.clone()));
467
468 if self.default_model.is_none() {
469 self.default_model = Some(ResolvedModel {
470 model: "llmsim-model".to_string(),
471 provider_type: DriverId::LlmSim,
472 api_key: Some("fake-key".to_string()),
473 base_url: None,
474 provider_metadata: None,
475 });
476 }
477 }
478
479 let default_model = self.default_model.ok_or_else(|| {
480 AgentLoopError::config(
481 "in-process runtime requires a default model; call \
482 InProcessRuntimeBuilder::default_model(...) or \
483 InProcessRuntimeBuilder::llm_sim(...)",
484 )
485 })?;
486
487 backends
488 .provider_store
489 .set_default_model(default_model)
490 .await?;
491
492 for harness in &mut self.harnesses {
496 hydrate_plugin_refs(&mut harness.capabilities, &self.plugin_capability_configs);
497 }
498 for agent in &mut self.agents {
499 hydrate_plugin_refs(&mut agent.capabilities, &self.plugin_capability_configs);
500 }
501 for session in &mut self.sessions {
502 hydrate_plugin_refs(&mut session.capabilities, &self.plugin_capability_configs);
503 }
504
505 for harness in &self.harnesses {
506 backends.harness_store.add_harness(harness.clone()).await?;
507 }
508 for agent in &self.agents {
509 backends.agent_store.add_agent(agent.clone()).await?;
510 }
511 for session in &self.sessions {
512 backends.session_store.add_session(session.clone()).await?;
513 }
514
515 for session in &self.sessions {
516 seed_runtime_initial_files(
517 backends.harness_store.as_ref(),
518 backends.agent_store.as_ref(),
519 file_store.as_ref(),
520 session,
521 )
522 .await?;
523 }
524
525 for (session_id, file) in &self.seeded_files {
526 file_store.seed_initial_file(*session_id, file).await?;
527 }
528
529 let persisting_emitter =
530 PersistingEventEmitter::new(backends.event_bus.clone(), backends.message_store.clone());
531
532 Ok(InProcessRuntime {
533 platform_definition: Arc::new(self.platform_definition),
534 harness_store: backends.harness_store,
535 agent_store: backends.agent_store,
536 session_store: backends.session_store,
537 default_session_id: self.default_session_id,
538 message_store: backends.message_store,
539 provider_store: backends.provider_store,
540 event_bus: backends.event_bus,
541 persisting_emitter,
542 file_store,
543 storage_store: backends.storage_store,
544 connection_resolver: backends.connection_resolver,
545 session_task_registry: backends.session_task_registry,
546 schedule_store_factory: backends.schedule_store_factory,
547 platform_store_factory: backends.platform_store_factory,
548 mcp_auth_provider: self
549 .mcp_auth_provider
550 .unwrap_or_else(|| Arc::new(everruns_mcp::NoAuthProvider)),
551 mcp_discovery_cache: Arc::new(crate::mcp_cache::McpDiscoveryCache::new()),
552 plugin_warnings: self.plugin_warnings,
553 })
554 }
555}
556
557async fn resolve_session_file_system(
558 platform_definition: &PlatformDefinition,
559 file_system_factory_context: SessionFileSystemFactoryContext,
560) -> Result<Arc<dyn SessionFileSystem>> {
561 let file_system_factory = platform_definition.session_file_system_factory();
562 if file_system_factory.is_disabled() {
563 Ok(Arc::new(InMemorySessionFileStore::new()))
564 } else {
565 Ok(file_system_factory
566 .create_session_file_system(file_system_factory_context)
567 .await?)
568 }
569}
570
/// Runtime that executes agent turns inside the current process against the
/// stores it was built with. Cloning shares the `Arc`-held stores and services.
#[derive(Clone)]
pub struct InProcessRuntime {
    /// Capability registry, driver registry, and platform services.
    platform_definition: Arc<PlatformDefinition>,
    /// Store used to resolve harnesses and harness chains.
    harness_store: Arc<dyn RuntimeHarnessStore>,
    /// Store used to look up agents.
    agent_store: Arc<dyn RuntimeAgentStore>,
    /// Store used to look up sessions.
    session_store: Arc<dyn RuntimeSessionStore>,
    /// Session id recorded by the builder's `single_session`, if any.
    default_session_id: Option<SessionId>,
    /// Store holding each session's messages.
    message_store: Arc<dyn RuntimeMessageStore>,
    /// Store providing the default model.
    provider_store: Arc<dyn RuntimeProviderStore>,
    /// Bus that events are emitted to and collected from.
    event_bus: Arc<dyn EventBus>,
    /// Emitter that forwards to `event_bus` and stores message-bearing events
    /// in `message_store`.
    persisting_emitter: PersistingEventEmitter,
    /// Session file system used for seeded and runtime files.
    file_store: Arc<dyn SessionFileSystem>,
    /// Session storage store taken from the backends.
    storage_store: Arc<dyn SessionStorageStore>,
    /// Optional user connection resolver taken from the backends.
    connection_resolver: Option<Arc<dyn UserConnectionResolver>>,
    /// Optional session task registry taken from the backends.
    session_task_registry: Option<Arc<dyn everruns_core::session_task::SessionTaskRegistry>>,
    /// Optional factory producing a schedule store for an org id.
    schedule_store_factory: Option<crate::backends::ScheduleStoreFactory>,
    /// Optional factory producing a platform store for an org id and session id.
    platform_store_factory: Option<crate::backends::PlatformStoreFactory>,
    /// Auth provider handed to MCP clients.
    mcp_auth_provider: Arc<dyn everruns_mcp::McpAuthProvider>,
    /// Cache passed to MCP tool discovery.
    mcp_discovery_cache: Arc<crate::mcp_cache::McpDiscoveryCache>,
    /// Plugin compile warnings carried over from the builder.
    plugin_warnings: Vec<String>,
}
599
600impl InProcessRuntime {
601 fn mcp_client(&self) -> Arc<everruns_mcp::McpClient> {
604 Arc::new(everruns_mcp::McpClient::new(
605 self.platform_definition.egress_service(),
606 self.mcp_auth_provider.clone(),
607 ))
608 }
609
610 async fn session_mcp_servers(
612 &self,
613 session: &Session,
614 agent: Option<&Agent>,
615 ) -> everruns_core::ScopedMcpServers {
616 let harness_chain = self
617 .harness_store
618 .get_harness_chain(session.harness_id)
619 .await
620 .unwrap_or_default();
621 crate::mcp::merge_session_scoped_servers(&harness_chain, agent, session)
622 }
623 pub fn builder() -> InProcessRuntimeBuilder {
625 InProcessRuntimeBuilder::new()
626 }
627
628 pub fn default_session_id(&self) -> Option<SessionId> {
631 self.default_session_id
632 }
633
634 pub fn plugin_warnings(&self) -> &[String] {
639 &self.plugin_warnings
640 }
641
642 pub async fn run_turn(
648 &self,
649 session_id: SessionId,
650 input: impl Into<InputMessage>,
651 ) -> Result<TurnResult> {
652 let session = self
653 .session_store
654 .get_session(session_id)
655 .await?
656 .ok_or_else(|| AgentLoopError::store(format!("session not found: {session_id}")))?;
657
658 let input_message = self
663 .message_store
664 .add_input_message(session_id, input.into())
665 .await?;
666 self.event_bus
667 .emit(EventRequest::new(
668 session_id,
669 EventContext::empty(),
670 InputMessageData::new(input_message.clone()),
671 ))
672 .await?;
673
674 let assembled = self
675 .inspect_context_with_ids(session_id, session.harness_id, session.agent_id)
676 .await?;
677 let synthetic_agent_id = session
678 .agent_id
679 .unwrap_or_else(|| AgentId::from_uuid(session.id.uuid()));
680 let org_id = in_process_internal_org_id(&session.organization_id);
681 let mut state_machine = TurnStateMachine::new(
682 TurnContext::new(session_id, input_message.id, synthetic_agent_id, org_id),
683 assembled.runtime_agent.max_iterations,
684 );
685
686 let mut previous_response_id: Option<String> = None;
687 let mut last_reason_result: Option<everruns_core::ReasonResult> = None;
688
689 loop {
690 match state_machine.next_action() {
691 TurnAction::ExecuteInput => {
692 let ctx = state_machine.context();
693 let base_context =
694 AtomContext::new(ctx.session_id, ctx.turn_id, ctx.input_message_id)
695 .with_workspace_id(session.workspace_id);
696 execute_input_activity(
697 self,
698 org_id,
699 InputAtomInput {
700 context: base_context,
701 },
702 )
703 .await?;
704 state_machine.on_input_completed();
705 }
706 TurnAction::ExecuteReason => {
707 let ctx = state_machine.context();
708 let base_context =
709 AtomContext::new(ctx.session_id, ctx.turn_id, ctx.input_message_id)
710 .with_workspace_id(session.workspace_id);
711 let reason_result = execute_reason_activity(
712 self,
713 org_id,
714 ReasonInput {
715 context: base_context.next_exec(),
716 harness_id: session.harness_id,
717 agent_id: session.agent_id,
718 org_id,
719 mcp_tool_definitions: vec![],
720 previous_response_id: previous_response_id.take(),
721 iteration: state_machine.current_iteration() as u32 + 1,
722 },
723 )
724 .await?;
725 previous_response_id = reason_result.response_id.clone();
726 state_machine.on_reason_completed(
727 reason_result.text.clone(),
728 reason_result.has_tool_calls,
729 reason_result.tool_calls.len(),
730 reason_result.success,
731 reason_result.error.clone(),
732 false,
733 );
734 if reason_result.has_tool_calls {
735 last_reason_result = Some(reason_result);
736 }
737 }
738 TurnAction::ExecuteAct => {
739 let reason_result = last_reason_result
740 .take()
741 .expect("ExecuteAct requires a prior ReasonResult");
742 let ctx = state_machine.context();
743 let base_context =
744 AtomContext::new(ctx.session_id, ctx.turn_id, ctx.input_message_id)
745 .with_workspace_id(session.workspace_id);
746 execute_act_activity(
747 self,
748 ActInput {
749 org_id: Some(org_id),
750 context: base_context.next_exec(),
751 harness_id: session.harness_id,
752 agent_id: session.agent_id,
753 tool_calls: reason_result.tool_calls,
754 tool_definitions: reason_result.tool_definitions,
755 locale: reason_result.locale,
756 blueprint_id: None,
757 network_access: reason_result.network_access,
758 parallel_tool_calls: reason_result.parallel_tool_calls,
761 },
762 )
763 .await?;
764 state_machine.on_act_completed();
765 }
766 TurnAction::Complete(outcome) => {
767 let ctx = state_machine.context();
768 let lifecycle =
769 RuntimeSessionLifecycle::new(self.clone(), org_id, ctx.session_id);
770 let turn_succeeded = matches!(
771 &outcome,
772 TurnOutcome::Success { .. } | TurnOutcome::MaxIterationsReached { .. }
773 );
774 match &outcome {
775 TurnOutcome::Success { iterations, .. }
776 | TurnOutcome::MaxIterationsReached { iterations, .. } => {
777 lifecycle
778 .turn_completed(
779 ctx.turn_id,
780 ctx.input_message_id,
781 *iterations as u32,
782 None,
783 None,
784 )
785 .await;
786 }
787 TurnOutcome::Failed { error, .. } => {
788 lifecycle
789 .turn_failed(ctx.turn_id, ctx.input_message_id, error, None)
790 .await;
791 }
792 TurnOutcome::Sealed {
793 reason, iterations, ..
794 } => {
795 lifecycle
796 .turn_sealed(
797 ctx.turn_id,
798 ctx.input_message_id,
799 reason.as_str(),
800 *iterations as u32,
801 None,
802 )
803 .await;
804 }
805 }
806 lifecycle
809 .fire_turn_end_hooks(
810 session.harness_id,
811 session.agent_id,
812 ctx.turn_id,
813 turn_succeeded,
814 )
815 .await;
816 return Ok(TurnResult::from_outcome(outcome, ctx.turn_id));
817 }
818 }
819 }
820 }
821
822 pub async fn run_text_turn(
823 &self,
824 session_id: SessionId,
825 text: impl Into<String>,
826 ) -> Result<TurnResult> {
827 self.run_turn(session_id, InputMessage::user(text)).await
828 }
829
830 pub async fn messages(&self, session_id: SessionId) -> Result<Vec<Message>> {
832 self.message_store.load(session_id).await
833 }
834
835 pub async fn read_file(
837 &self,
838 session_id: SessionId,
839 path: &str,
840 ) -> Result<Option<SessionFile>> {
841 self.file_store.read_file(session_id, path).await
842 }
843
844 pub async fn load_context(&self, session_id: SessionId) -> Result<AssembledTurnContext> {
846 let session = self
847 .session_store
848 .get_session(session_id)
849 .await?
850 .ok_or_else(|| AgentLoopError::store(format!("session not found: {session_id}")))?;
851 self.inspect_context_with_ids(session_id, session.harness_id, session.agent_id)
852 .await
853 }
854
855 pub async fn events(&self) -> Result<Vec<Event>> {
860 Ok(self.event_bus.collected_events().await)
861 }
862
863 pub async fn execute_command(
873 &self,
874 session_id: SessionId,
875 request: everruns_core::command::ExecuteCommandRequest,
876 ) -> Result<everruns_core::command::CommandResult> {
877 let ctx = self.load_context(session_id).await?;
878 let registry = self.platform_definition.capability_registry();
879 let host = everruns_core::command_host::StoreCommandHost::new(
883 session_id,
884 self.harness_store.clone(),
885 self.agent_store.clone(),
886 self.session_store.clone(),
887 self.message_store.clone(),
888 self.provider_store.clone(),
889 registry.clone(),
890 self.platform_definition.driver_registry().clone(),
891 )
892 .with_file_store(self.file_store.clone())
893 .with_assembled_context(ctx.clone());
894 let exec_ctx =
895 everruns_core::command::CommandExecutionContext::new(session_id, Arc::new(host));
896 for config in &ctx.resolved_capability_configs {
897 let Some(capability) = registry.get(config.capability_id()) else {
898 continue;
899 };
900 if capability.commands().iter().any(|c| c.name == request.name) {
901 return capability.execute_command(&request, &exec_ctx).await;
902 }
903 }
904 Err(AgentLoopError::config(format!(
905 "no capability declares command /{}",
906 request.name
907 )))
908 }
909
910 pub async fn list_commands(
920 &self,
921 session_id: SessionId,
922 ) -> Result<Vec<everruns_core::command::CommandDescriptor>> {
923 let ctx = self.load_context(session_id).await?;
924 let registry = self.platform_definition.capability_registry();
925 let mut seen = std::collections::HashSet::new();
926 let mut commands = Vec::new();
927 for config in &ctx.resolved_capability_configs {
928 let Some(capability) = registry.get(config.capability_id()) else {
929 continue;
930 };
931 for command in capability.commands() {
932 if seen.insert(command.name.clone()) {
933 commands.push(command);
934 }
935 }
936 }
937 Ok(commands)
938 }
939
940 async fn inspect_context_with_ids(
941 &self,
942 session_id: SessionId,
943 harness_id: everruns_core::HarnessId,
944 agent_id: Option<AgentId>,
945 ) -> Result<AssembledTurnContext> {
946 inspect_turn_context(
947 self.harness_store.as_ref(),
948 self.agent_store.as_ref(),
949 self.session_store.as_ref(),
950 self.message_store.as_ref(),
951 self.provider_store.as_ref(),
952 self.platform_definition.capability_registry(),
953 session_id,
954 harness_id,
955 agent_id,
956 &[],
957 Some(self.file_store.clone()),
958 )
959 .await
960 }
961}
962
963#[async_trait]
964impl RuntimeHostAdapter for InProcessRuntime {
965 async fn get_agent(&self, _org_id: i64, agent_id: AgentId) -> Result<Option<Agent>> {
966 self.agent_store.get_agent(agent_id).await
967 }
968
969 async fn get_harness(&self, _org_id: i64, harness_id: HarnessId) -> Result<Option<Harness>> {
970 let chain = self.harness_store.get_harness_chain(harness_id).await?;
971 Ok(chain.into_iter().last())
972 }
973
974 async fn set_session_status(
975 &self,
976 _org_id: i64,
977 session_id: SessionId,
978 _status: SessionStatus,
979 ) -> Result<Session> {
980 self.session_store
984 .get_session(session_id)
985 .await?
986 .ok_or_else(|| AgentLoopError::store(format!("session not found: {session_id}")))
987 }
988
989 async fn load_turn_context(
990 &self,
991 _org_id: i64,
992 session_id: SessionId,
993 ) -> Result<RuntimeHostTurnContext> {
994 let mut session = self
995 .session_store
996 .get_session(session_id)
997 .await?
998 .ok_or_else(|| AgentLoopError::store(format!("session not found: {session_id}")))?;
999 everruns_core::ard_attachment::apply_session_attachments(
1002 self.storage_store.as_ref(),
1003 &mut session,
1004 )
1005 .await;
1006 let agent = match session.agent_id {
1007 Some(agent_id) => self.agent_store.get_agent(agent_id).await?,
1008 None => None,
1009 };
1010 let messages = self.message_store.load(session_id).await?;
1011 let model = self.provider_store.get_default_model().await?;
1012
1013 let scoped_servers = self.session_mcp_servers(&session, agent.as_ref()).await;
1016 let mcp_tool_definitions = if scoped_servers.is_empty() {
1017 vec![]
1018 } else {
1019 crate::mcp::discover_tool_definitions(
1020 &self.mcp_discovery_cache,
1021 self.mcp_client(),
1022 session_id.uuid(),
1023 &scoped_servers,
1024 )
1025 .await
1026 };
1027
1028 Ok(RuntimeHostTurnContext {
1029 agent,
1030 session,
1031 messages,
1032 model,
1033 mcp_tool_definitions,
1034 })
1035 }
1036
1037 async fn mcp_executor(
1038 &self,
1039 _org_id: i64,
1040 session_id: SessionId,
1041 ) -> Option<Arc<everruns_mcp::McpExecutor>> {
1042 let session = self.session_store.get_session(session_id).await.ok()??;
1043 let agent = match session.agent_id {
1044 Some(agent_id) => self.agent_store.get_agent(agent_id).await.ok().flatten(),
1045 None => None,
1046 };
1047 let scoped_servers = self.session_mcp_servers(&session, agent.as_ref()).await;
1048 crate::mcp::build_executor(self.mcp_client(), &scoped_servers)
1049 }
1050
1051 fn capability_registry(&self) -> CapabilityRegistry {
1052 self.platform_definition.capability_registry().clone()
1053 }
1054
1055 fn driver_registry(&self) -> DriverRegistry {
1056 self.platform_definition.driver_registry().clone()
1057 }
1058
1059 fn harness_store(&self, _org_id: i64) -> Arc<dyn HarnessStore> {
1060 self.harness_store.clone()
1061 }
1062
1063 fn agent_store(&self, _org_id: i64) -> Arc<dyn AgentStore> {
1064 self.agent_store.clone()
1065 }
1066
1067 fn session_store(&self, _org_id: i64) -> Arc<dyn SessionStore> {
1068 self.session_store.clone()
1069 }
1070
1071 fn session_mutator(&self, _org_id: i64) -> Arc<dyn SessionMutator> {
1072 self.session_store.clone()
1073 }
1074
1075 fn provider_store(&self, _org_id: i64) -> Arc<dyn ProviderStore> {
1076 self.provider_store.clone()
1077 }
1078
1079 fn message_store(&self) -> Arc<dyn MessageRetriever> {
1080 self.message_store.clone()
1081 }
1082
1083 fn event_emitter(&self) -> Arc<dyn EventEmitter> {
1084 Arc::new(self.persisting_emitter.clone())
1085 }
1086
1087 fn file_store(&self) -> Arc<dyn SessionFileSystem> {
1088 self.file_store.clone()
1089 }
1090
1091 fn storage_store(&self) -> Option<Arc<dyn SessionStorageStore>> {
1092 Some(self.storage_store.clone())
1093 }
1094
1095 fn connection_resolver(&self) -> Option<Arc<dyn UserConnectionResolver>> {
1096 self.connection_resolver.clone()
1097 }
1098
1099 fn session_task_registry(
1100 &self,
1101 ) -> Option<Arc<dyn everruns_core::session_task::SessionTaskRegistry>> {
1102 self.session_task_registry.clone()
1103 }
1104
1105 fn schedule_store(
1106 &self,
1107 org_id: i64,
1108 ) -> Option<Arc<dyn everruns_core::traits::SessionScheduleStore>> {
1109 self.schedule_store_factory
1110 .as_ref()
1111 .map(|factory| factory(org_id))
1112 }
1113
1114 fn platform_store(
1115 &self,
1116 org_id: i64,
1117 session_id: SessionId,
1118 ) -> Option<Arc<dyn everruns_core::platform_store::PlatformStore>> {
1119 self.platform_store_factory
1120 .as_ref()
1121 .map(|factory| factory(org_id, session_id))
1122 }
1123
1124 fn utility_llm_service(&self) -> Option<Arc<dyn everruns_core::UtilityLlmService>> {
1125 Some(self.platform_definition.utility_llm_service())
1126 }
1127
1128 fn egress_service(&self) -> Option<Arc<dyn everruns_core::EgressService>> {
1129 Some(self.platform_definition.egress_service())
1130 }
1131}
1132
1133#[derive(Clone)]
1134struct PersistingEventEmitter {
1135 inner: Arc<dyn EventBus>,
1136 message_store: Arc<dyn RuntimeMessageStore>,
1137}
1138
1139impl PersistingEventEmitter {
1140 fn new(inner: Arc<dyn EventBus>, message_store: Arc<dyn RuntimeMessageStore>) -> Self {
1141 Self {
1142 inner,
1143 message_store,
1144 }
1145 }
1146}
1147
1148#[async_trait]
1149impl EventEmitter for PersistingEventEmitter {
1150 async fn emit(&self, request: EventRequest) -> Result<Event> {
1151 let event = self.inner.emit(request.clone()).await?;
1152 if let Some(message) = message_from_event(&event.data) {
1153 self.message_store
1154 .store_message(request.session_id, message)
1155 .await?;
1156 }
1157 Ok(event)
1158 }
1159}
1160
1161fn effective_overlay(
1162 harness_chain: &[Harness],
1163 agent: Option<&Agent>,
1164 session: &Session,
1165) -> AgentConfigOverlay {
1166 let harness_layers = harness_chain.iter().map(AgentConfigOverlay::from);
1167 let agent_layers = agent.into_iter().map(AgentConfigOverlay::from);
1168 AgentConfigOverlay::fold(
1169 harness_layers
1170 .chain(agent_layers)
1171 .chain([AgentConfigOverlay::from(session)]),
1172 )
1173}
1174
1175fn hydrate_plugin_refs(
1182 capabilities: &mut [AgentCapabilityConfig],
1183 plugin_configs: &[AgentCapabilityConfig],
1184) {
1185 for cap in capabilities.iter_mut() {
1186 let cap_id = cap.capability_id();
1187 if !everruns_core::is_plugin_capability(cap_id) {
1188 continue;
1189 }
1190 let is_bare = cap.config.is_null()
1192 || cap
1193 .config
1194 .as_object()
1195 .map(|o| o.is_empty())
1196 .unwrap_or(false);
1197 if !is_bare {
1198 continue;
1199 }
1200 if let Some(hydrated) = plugin_configs.iter().find(|c| c.capability_id() == cap_id) {
1201 cap.config = hydrated.config.clone();
1202 }
1203 }
1204}
1205
1206async fn seed_runtime_initial_files(
1207 harness_store: &dyn RuntimeHarnessStore,
1208 agent_store: &dyn RuntimeAgentStore,
1209 file_store: &dyn SessionFileSystem,
1210 session: &Session,
1211) -> Result<()> {
1212 let harness_chain = harness_store.get_harness_chain(session.harness_id).await?;
1213 if harness_chain.is_empty() {
1214 return Err(AgentLoopError::store(format!(
1215 "harness not found while seeding files: {}",
1216 session.harness_id
1217 )));
1218 }
1219 let agent = match session.agent_id {
1220 Some(agent_id) => Some(
1221 agent_store
1222 .get_agent(agent_id)
1223 .await?
1224 .ok_or_else(|| AgentLoopError::store(format!("agent not found: {agent_id}")))?,
1225 ),
1226 None => None,
1227 };
1228 let overlay = effective_overlay(&harness_chain, agent.as_ref(), session);
1229 let seed_key = SessionId::from_uuid(session.workspace_id.uuid());
1232 for file in &overlay.initial_files {
1233 file_store.seed_initial_file(seed_key, file).await?;
1234 }
1235 Ok(())
1236}
1237
1238fn message_from_event(data: &EventData) -> Option<Message> {
1239 match data {
1240 EventData::InputMessage(data) => Some(data.message.clone()),
1241 EventData::OutputMessageCompleted(OutputMessageCompletedData { message, .. }) => {
1242 Some(message.clone())
1243 }
1244 EventData::ToolCompleted(data) => Some(tool_completed_to_message(data.clone())),
1245 _ => None,
1246 }
1247}
1248
1249fn tool_completed_to_message(data: ToolCompletedData) -> Message {
1250 let mut images: Vec<ToolResultImage> = Vec::new();
1251 let metadata = tool_result_metadata(&data);
1252 let result = data.result.map(|parts| {
1253 for part in &parts {
1254 if let ContentPart::Image(img) = part
1255 && let (Some(base64), Some(media_type)) = (&img.base64, &img.media_type)
1256 {
1257 images.push(ToolResultImage {
1258 base64: base64.clone(),
1259 media_type: media_type.clone(),
1260 });
1261 }
1262 }
1263
1264 let text_parts: Vec<&ContentPart> = parts
1265 .iter()
1266 .filter(|part| matches!(part, ContentPart::Text(_)))
1267 .collect();
1268 if text_parts.len() == 1
1269 && let ContentPart::Text(text) = text_parts[0]
1270 {
1271 return parse_structured_tool_result_text(&text.text);
1272 }
1273 if !text_parts.is_empty() {
1274 serde_json::to_value(&text_parts).unwrap_or_default()
1275 } else {
1276 serde_json::Value::Null
1277 }
1278 });
1279
1280 let mut message = if images.is_empty() {
1281 Message::tool_result(&data.tool_call_id, result, data.error)
1282 } else {
1283 Message::tool_result_with_images(&data.tool_call_id, result, images)
1284 };
1285 message.metadata = metadata;
1286 message
1287}
1288
1289fn tool_result_metadata(
1290 data: &ToolCompletedData,
1291) -> Option<std::collections::HashMap<String, serde_json::Value>> {
1292 let mut metadata = std::collections::HashMap::new();
1293 metadata.insert("tool_name".to_string(), serde_json::json!(data.tool_name));
1294 if let Some(fingerprint) = &data.tool_call_fingerprint {
1295 metadata.insert(
1296 "tool_call_fingerprint".to_string(),
1297 serde_json::json!(fingerprint),
1298 );
1299 }
1300 if let Some(fingerprint) = &data.tool_result_fingerprint {
1301 metadata.insert(
1302 "tool_result_fingerprint".to_string(),
1303 serde_json::json!(fingerprint),
1304 );
1305 }
1306 (!metadata.is_empty()).then_some(metadata)
1307}
1308
1309fn parse_structured_tool_result_text(text: &str) -> serde_json::Value {
1310 let trimmed = text.trim_start();
1311 if !trimmed.starts_with('{') && !trimmed.starts_with('[') {
1312 return serde_json::Value::String(text.to_string());
1313 }
1314
1315 match serde_json::from_str(text) {
1316 Ok(value @ (serde_json::Value::Object(_) | serde_json::Value::Array(_))) => value,
1317 _ => serde_json::Value::String(text.to_string()),
1318 }
1319}
1320
/// Tests for replaying `ToolCompletedData` events as tool-result messages.
#[cfg(test)]
mod tool_completed_replay_tests {
    use super::*;

    /// Builds a successful tool completion whose result is a single text part.
    fn single_text_success(call_id: &str, tool_name: &str, text: String) -> ToolCompletedData {
        ToolCompletedData::success(
            call_id.to_string(),
            tool_name.to_string(),
            vec![ContentPart::text(text)],
            Some(1),
        )
    }

    /// Replays `data` and returns an owned copy of the message's tool result.
    fn replayed_result(data: ToolCompletedData) -> serde_json::Value {
        let message = tool_completed_to_message(data);
        let result = message
            .tool_result_content()
            .and_then(|content| content.result.clone())
            .expect("tool result should be present");
        result
    }

    #[test]
    fn tool_completed_replay_preserves_json_object_shape() {
        let payload = serde_json::json!({
            "path": "/workspace/src/lib.rs",
            "content": "1|fn main() {}"
        });
        let data = single_text_success("call_read", "read_file", payload.to_string());

        let result = replayed_result(data);

        // The serialized object must come back as an object, not a string.
        assert_eq!(result["path"], "/workspace/src/lib.rs");
        assert_eq!(result["content"], "1|fn main() {}");
    }

    #[test]
    fn tool_completed_replay_keeps_scalar_json_as_text() {
        let data = single_text_success("call_scalar", "custom_tool", "123".to_string());

        let result = replayed_result(data);

        // Scalar JSON is valid JSON but must stay a string on replay.
        assert_eq!(result, serde_json::Value::String("123".to_string()));
    }

    #[test]
    fn tool_completed_replay_preserves_fingerprints_as_metadata() {
        let data = single_text_success("call_read", "read_file", "{}".to_string())
            .with_fingerprints("sha256:call".to_string(), "sha256:result".to_string());

        let metadata = tool_completed_to_message(data)
            .metadata
            .expect("metadata should be present");

        assert_eq!(metadata["tool_name"], "read_file");
        assert_eq!(metadata["tool_call_fingerprint"], "sha256:call");
        assert_eq!(metadata["tool_result_fingerprint"], "sha256:result");
    }
}
1386
/// Tests for mapping public org ids to in-process internal org ids.
#[cfg(test)]
mod org_id_mapping_tests {
    use super::*;
    use everruns_core::{DEFAULT_ORG_ID, DEFAULT_ORG_PUBLIC_ID, org_public_id_from_internal};

    /// Public id used by the hashing tests below.
    // NOTE(review): presumably chosen because its hex payload exceeds `i64::MAX`
    // and so cannot round-trip as a synthetic id — confirm.
    const HIGH_ENTROPY_ID: &str = "org_80000000000000000000000000000000";

    #[test]
    fn default_public_id_maps_to_default_org() {
        let mapped = in_process_internal_org_id(DEFAULT_ORG_PUBLIC_ID);
        assert_eq!(mapped, DEFAULT_ORG_ID);
    }

    #[test]
    fn invalid_public_id_does_not_fall_back_to_default() {
        // Empty, unprefixed, too short, non-hex, and wrong-case prefix inputs.
        let invalid_inputs = [
            "",
            "not-an-org",
            "org_short",
            "org_ZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZ",
            "ORG_00000000000000000000000000000001",
        ];
        for invalid in invalid_inputs {
            let mapped = in_process_internal_org_id(invalid);
            assert_ne!(mapped, DEFAULT_ORG_ID);
            assert!(
                mapped >= 2,
                "invalid input {invalid:?} should not map to default"
            );
        }
    }

    #[test]
    fn zero_public_id_does_not_fall_back_to_default() {
        let all_zero = "org_00000000000000000000000000000000";
        let mapped = in_process_internal_org_id(all_zero);
        assert_ne!(mapped, DEFAULT_ORG_ID);
        assert!(mapped >= 2, "all-zero id should not map to default");
    }

    #[test]
    fn synthetic_public_id_round_trips_with_internal_helper() {
        // Covers small ids, a mid-range id, and both ends of the top of the range.
        let internals = [1_i64, 2, 42, 1_000_000, i64::MAX - 1, i64::MAX];
        for internal in internals {
            let public = org_public_id_from_internal(internal);
            let mapped_back = in_process_internal_org_id(&public);
            assert_eq!(
                mapped_back, internal,
                "round-trip failed for internal={internal}"
            );
        }
    }

    #[test]
    fn distinct_synthetic_ids_map_to_distinct_internal_ids() {
        let first = org_public_id_from_internal(7);
        let second = org_public_id_from_internal(8);
        assert_ne!(first, second);

        let first_internal = in_process_internal_org_id(&first);
        let second_internal = in_process_internal_org_id(&second);
        assert_ne!(first_internal, second_internal);
    }

    #[test]
    fn high_entropy_uuid_style_id_hashes_into_reserved_range() {
        let mapped = in_process_internal_org_id(HIGH_ENTROPY_ID);
        assert!(mapped >= 2, "mapped id {mapped} must be >= 2");
        assert_ne!(mapped, DEFAULT_ORG_ID);

        // Mapping the same input again must give the same internal id.
        let mapped_again = in_process_internal_org_id(HIGH_ENTROPY_ID);
        assert_eq!(mapped, mapped_again);
    }

    #[test]
    fn high_entropy_ids_are_isolated_from_each_other() {
        let first = in_process_internal_org_id("org_80000000000000000000000000000001");
        let second = in_process_internal_org_id("org_80000000000000000000000000000002");
        assert_ne!(first, second);
        assert_ne!(first, DEFAULT_ORG_ID);
        assert_ne!(second, DEFAULT_ORG_ID);
    }

    #[test]
    fn hash_uses_stable_sha256_truncation() {
        // Expected value: the first 8 bytes of SHA-256 over the public id, read
        // big-endian, reduced modulo `i64::MAX - 1`, then shifted up by 2.
        let digest = sha2::Sha256::digest(HIGH_ENTROPY_ID.as_bytes());
        let mut prefix = [0u8; 8];
        prefix.copy_from_slice(&digest[..8]);
        let raw = u64::from_be_bytes(prefix);
        let expected = ((raw % ((i64::MAX - 1) as u64)) as i64) + 2;

        let mapped = in_process_internal_org_id(HIGH_ENTROPY_ID);
        assert_eq!(mapped, expected);
    }

    #[test]
    fn oversize_input_is_bounded_and_does_not_collide_silently() {
        // Input four times longer than the hash input cap.
        // NOTE(review): only checks the result is outside the default id; no
        // collision check against another oversize input is made here.
        let oversize = "x".repeat(super::HASH_INPUT_CAP_BYTES * 4);
        let mapped = in_process_internal_org_id(&oversize);
        assert!(mapped >= 2);
        assert_ne!(mapped, DEFAULT_ORG_ID);
    }
}