1use crate::backends::{
6 EventBus, RuntimeAgentStore, RuntimeBackends, RuntimeHarnessStore, RuntimeMessageStore,
7 RuntimeProviderStore, RuntimeSessionStore,
8};
9use crate::builders::SingleSessionBuilder;
10use crate::host::{
11 RuntimeHostAdapter, RuntimeHostTurnContext, RuntimeSessionLifecycle, execute_act_activity,
12 execute_input_activity, execute_reason_activity,
13};
14use crate::in_memory::{InMemorySessionFileStore, InMemorySessionFileSystemFactory};
15use async_trait::async_trait;
16use everruns_core::agent::Agent;
17use everruns_core::atoms::{ActInput, AtomContext, InputAtomInput, ReasonInput};
18use everruns_core::capabilities::{Capability, CapabilityRegistry};
19use everruns_core::config_layer::AgentConfigOverlay;
20use everruns_core::error::{AgentLoopError, Result};
21use everruns_core::events::{
22 Event, EventContext, EventData, EventRequest, InputMessageData, OutputMessageCompletedData,
23 ToolCompletedData,
24};
25use everruns_core::harness::Harness;
26use everruns_core::llm_driver_registry::{DriverId, DriverRegistry};
27use everruns_core::llmsim_driver::{LlmSimConfig, LlmSimDriver};
28use everruns_core::message::{ContentPart, Message};
29use everruns_core::platform_definition::PlatformDefinition;
30use everruns_core::plugins::{PluginFileSet, compile_plugin};
31use everruns_core::runtime_context::{AssembledTurnContext, inspect_turn_context};
32use everruns_core::session::{Session, SessionStatus};
33use everruns_core::session_file::{InitialFile, SessionFile};
34use everruns_core::tools::ToolResultImage;
35use everruns_core::traits::{
36 AgentStore, EventEmitter, HarnessStore, ProviderStore, ResolvedModel, SessionMutator,
37 SessionStorageStore, SessionStore, UserConnectionResolver,
38};
39use everruns_core::turn::{TurnAction, TurnContext, TurnOutcome, TurnStateMachine};
40use everruns_core::typed_id::{AgentId, HarnessId, OrgId, SessionId};
41use everruns_core::{
42 AgentCapabilityConfig, InputMessage, MessageRetriever, SessionFileSystem,
43 SessionFileSystemFactoryContext, plugin_capability_id,
44};
45use sha2::{Digest, Sha256};
46use std::path::Path;
47use std::sync::Arc;
48
/// Upper bound, in bytes, on how much of a public org id is fed to the hash in
/// `hash_public_org_id`; longer ids are cut down to this prefix before hashing.
const HASH_INPUT_CAP_BYTES: usize = 128;
55
56fn in_process_internal_org_id(public_org_id: &str) -> i64 {
66 if public_org_id == everruns_core::DEFAULT_ORG_PUBLIC_ID {
67 return everruns_core::DEFAULT_ORG_ID;
68 }
69
70 let Ok(parsed) = public_org_id.parse::<OrgId>() else {
71 return hash_public_org_id(public_org_id);
72 };
73 let raw: u128 = parsed.uuid().as_u128();
74 if raw == 0 {
75 return hash_public_org_id(public_org_id);
76 }
77
78 if raw <= i64::MAX as u128 {
81 return raw as i64;
82 }
83
84 hash_public_org_id(public_org_id)
85}
86
87fn hash_public_org_id(public_org_id: &str) -> i64 {
92 let bytes = public_org_id.as_bytes();
93 let bounded = &bytes[..bytes.len().min(HASH_INPUT_CAP_BYTES)];
94 let digest = Sha256::digest(bounded);
95 let mut buf = [0u8; 8];
96 buf.copy_from_slice(&digest[..8]);
97 let raw = u64::from_be_bytes(buf);
98 ((raw % ((i64::MAX - 1) as u64)) as i64) + 2
99}
100
/// Flattened summary of a finished turn, built from a `TurnOutcome`.
#[derive(Debug, Clone)]
pub struct TurnResult {
    /// Final response text; empty when the turn failed.
    pub response: String,
    /// Number of iterations reported by the turn outcome.
    pub iterations: usize,
    /// Number of tool calls reported by the outcome; `0` when the turn failed.
    pub tool_calls_count: usize,
    /// `true` for both a successful turn and one that hit the iteration limit.
    pub success: bool,
    /// Error text, set only when the turn failed.
    pub error: Option<String>,
    /// Identifier of the turn this result belongs to.
    pub turn_id: everruns_core::typed_id::TurnId,
}
116
117impl TurnResult {
118 fn from_outcome(outcome: TurnOutcome, turn_id: everruns_core::typed_id::TurnId) -> Self {
119 match outcome {
120 TurnOutcome::Success {
121 response,
122 iterations,
123 tool_calls_count,
124 } => Self {
125 response,
126 iterations,
127 tool_calls_count,
128 success: true,
129 error: None,
130 turn_id,
131 },
132 TurnOutcome::Failed { error, iterations } => Self {
133 response: String::new(),
134 iterations,
135 tool_calls_count: 0,
136 success: false,
137 error: Some(error),
138 turn_id,
139 },
140 TurnOutcome::MaxIterationsReached {
141 response,
142 iterations,
143 tool_calls_count,
144 } => Self {
145 response,
146 iterations,
147 tool_calls_count,
148 success: true,
149 error: None,
150 turn_id,
151 },
152 }
153 }
154}
155
/// Collects configuration, seed data and plugins, then assembles an
/// `InProcessRuntime` in `build`.
pub struct InProcessRuntimeBuilder {
    /// Capability registry, driver registry and session file system factory.
    platform_definition: PlatformDefinition,
    /// When set, `build` registers an `LlmSimDriver` created from this config.
    llm_sim_config: Option<LlmSimConfig>,
    /// Model stored as the provider store's default; required unless
    /// `llm_sim_config` is set (which supplies a placeholder model).
    default_model: Option<ResolvedModel>,
    /// Store/bus bundle; `build` falls back to `RuntimeBackends::in_memory()`.
    backends: Option<RuntimeBackends>,
    /// Context handed to the session file system factory in `build`.
    session_file_system_factory_context: SessionFileSystemFactoryContext,
    /// Harnesses added to the harness store in `build`.
    harnesses: Vec<Harness>,
    /// Agents added to the agent store in `build`.
    agents: Vec<Agent>,
    /// Sessions added to the session store in `build`.
    sessions: Vec<Session>,
    /// Session id recorded by `single_session`, exposed by the runtime.
    default_session_id: Option<SessionId>,
    /// Extra files seeded into the file store after overlay-derived files.
    seeded_files: Vec<(SessionId, InitialFile)>,
    /// MCP auth provider; `build` falls back to `everruns_mcp::NoAuthProvider`.
    mcp_auth_provider: Option<Arc<dyn everruns_mcp::McpAuthProvider>>,
    /// Capability configs produced by `with_plugin_dir`, used to hydrate
    /// bare plugin references on harnesses, agents and sessions.
    plugin_capability_configs: Vec<AgentCapabilityConfig>,
    /// Warnings accumulated while compiling plugins.
    plugin_warnings: Vec<String>,
}
186
187impl Default for InProcessRuntimeBuilder {
188 fn default() -> Self {
189 Self::new()
190 }
191}
192
193impl InProcessRuntimeBuilder {
194 pub fn new() -> Self {
201 Self {
202 platform_definition: PlatformDefinition::builder()
203 .capability_registry(CapabilityRegistry::with_builtins())
204 .driver_registry(DriverRegistry::new())
205 .session_file_system_factory(Arc::new(InMemorySessionFileSystemFactory))
206 .build(),
207 llm_sim_config: None,
208 default_model: None,
209 backends: None,
210 session_file_system_factory_context: SessionFileSystemFactoryContext::new(),
211 harnesses: Vec::new(),
212 agents: Vec::new(),
213 sessions: Vec::new(),
214 default_session_id: None,
215 seeded_files: Vec::new(),
216 mcp_auth_provider: None,
217 plugin_capability_configs: Vec::new(),
218 plugin_warnings: Vec::new(),
219 }
220 }
221
222 pub fn mcp_auth_provider(mut self, provider: Arc<dyn everruns_mcp::McpAuthProvider>) -> Self {
226 self.mcp_auth_provider = Some(provider);
227 self
228 }
229
230 pub fn platform_definition(mut self, platform_definition: PlatformDefinition) -> Self {
232 self.platform_definition = platform_definition;
233 self
234 }
235
236 pub fn capability<C: Capability + 'static>(mut self, capability: C) -> Self {
238 self.platform_definition
239 .capability_registry_mut()
240 .register(capability);
241 self
242 }
243
244 pub fn driver_registry(mut self, driver_registry: DriverRegistry) -> Self {
246 *self.platform_definition.driver_registry_mut() = driver_registry;
247 self
248 }
249
250 pub fn llm_sim(mut self, config: LlmSimConfig) -> Self {
252 self.llm_sim_config = Some(config);
253 self
254 }
255
256 pub fn default_model(mut self, model: ResolvedModel) -> Self {
258 self.default_model = Some(model);
259 self
260 }
261
262 pub fn backends(mut self, backends: RuntimeBackends) -> Self {
264 self.backends = Some(backends);
265 self
266 }
267
268 pub fn session_file_system_factory_context(
270 mut self,
271 context: SessionFileSystemFactoryContext,
272 ) -> Self {
273 self.session_file_system_factory_context = context;
274 self
275 }
276
277 pub fn harness(mut self, harness: Harness) -> Self {
279 self.harnesses.push(harness);
280 self
281 }
282
283 pub fn agent(mut self, agent: Agent) -> Self {
285 self.agents.push(agent);
286 self
287 }
288
289 pub fn session(mut self, session: Session) -> Self {
291 self.sessions.push(session);
292 self
293 }
294
295 pub fn single_session<F>(mut self, configure: F) -> Self
300 where
301 F: FnOnce(SingleSessionBuilder) -> SingleSessionBuilder,
302 {
303 let (harness, agent, session, session_id) =
304 configure(SingleSessionBuilder::default()).build();
305 self.harnesses.push(harness);
306 self.agents.push(agent);
307 self.sessions.push(session);
308 self.default_session_id = Some(session_id);
309 self
310 }
311
312 pub fn seed_text_file(
316 mut self,
317 session_id: SessionId,
318 path: impl Into<String>,
319 content: impl Into<String>,
320 ) -> Self {
321 self.seeded_files.push((
322 session_id,
323 InitialFile {
324 path: path.into(),
325 content: content.into(),
326 encoding: "text".to_string(),
327 is_readonly: false,
328 },
329 ));
330 self
331 }
332
333 pub fn with_plugin_dir(mut self, path: &Path) -> Result<Self> {
354 let file_set = PluginFileSet::from_dir(path)
355 .map_err(|e| AgentLoopError::config(format!("plugin directory load failed: {e}")))?;
356 let compiled = compile_plugin(&file_set)
357 .map_err(|e| AgentLoopError::config(format!("plugin compilation failed: {e}")))?;
358
359 for warning in &compiled.warnings {
360 tracing::warn!(plugin = %compiled.definition.name, warning = %warning, "plugin compile warning");
361 }
362 self.plugin_warnings.extend(compiled.warnings);
363
364 let cap_id = plugin_capability_id(&compiled.definition.name);
365 let hydrated_config = serde_json::to_value(&compiled.definition)
366 .unwrap_or(serde_json::Value::Object(serde_json::Map::new()));
367 self.plugin_capability_configs
368 .push(AgentCapabilityConfig::with_config(cap_id, hydrated_config));
369
370 Ok(self)
371 }
372
373 pub fn plugin_capability(&self, name: &str) -> Option<AgentCapabilityConfig> {
379 let cap_id = plugin_capability_id(name);
380 self.plugin_capability_configs
381 .iter()
382 .find(|c| c.capability_id() == cap_id)
383 .cloned()
384 }
385
386 pub async fn build(mut self) -> Result<InProcessRuntime> {
391 let backends = match self.backends.take() {
392 Some(backends) => backends,
393 None => RuntimeBackends::in_memory(),
394 };
395 let file_store = resolve_session_file_system(
396 &self.platform_definition,
397 self.session_file_system_factory_context.clone(),
398 )
399 .await?;
400
401 if let Some(config) = self.llm_sim_config.take() {
402 let driver = LlmSimDriver::new(config);
403 self.platform_definition
406 .driver_registry_mut()
407 .register_or_replace(DriverId::LlmSim, move |_config| Box::new(driver.clone()));
408
409 if self.default_model.is_none() {
410 self.default_model = Some(ResolvedModel {
411 model: "llmsim-model".to_string(),
412 provider_type: DriverId::LlmSim,
413 api_key: Some("fake-key".to_string()),
414 base_url: None,
415 provider_metadata: None,
416 });
417 }
418 }
419
420 let default_model = self.default_model.ok_or_else(|| {
421 AgentLoopError::config(
422 "in-process runtime requires a default model; call \
423 InProcessRuntimeBuilder::default_model(...) or \
424 InProcessRuntimeBuilder::llm_sim(...)",
425 )
426 })?;
427
428 backends
429 .provider_store
430 .set_default_model(default_model)
431 .await?;
432
433 for harness in &mut self.harnesses {
437 hydrate_plugin_refs(&mut harness.capabilities, &self.plugin_capability_configs);
438 }
439 for agent in &mut self.agents {
440 hydrate_plugin_refs(&mut agent.capabilities, &self.plugin_capability_configs);
441 }
442 for session in &mut self.sessions {
443 hydrate_plugin_refs(&mut session.capabilities, &self.plugin_capability_configs);
444 }
445
446 for harness in &self.harnesses {
447 backends.harness_store.add_harness(harness.clone()).await?;
448 }
449 for agent in &self.agents {
450 backends.agent_store.add_agent(agent.clone()).await?;
451 }
452 for session in &self.sessions {
453 backends.session_store.add_session(session.clone()).await?;
454 }
455
456 for session in &self.sessions {
457 seed_runtime_initial_files(
458 backends.harness_store.as_ref(),
459 backends.agent_store.as_ref(),
460 file_store.as_ref(),
461 session,
462 )
463 .await?;
464 }
465
466 for (session_id, file) in &self.seeded_files {
467 file_store.seed_initial_file(*session_id, file).await?;
468 }
469
470 let persisting_emitter =
471 PersistingEventEmitter::new(backends.event_bus.clone(), backends.message_store.clone());
472
473 Ok(InProcessRuntime {
474 platform_definition: Arc::new(self.platform_definition),
475 harness_store: backends.harness_store,
476 agent_store: backends.agent_store,
477 session_store: backends.session_store,
478 default_session_id: self.default_session_id,
479 message_store: backends.message_store,
480 provider_store: backends.provider_store,
481 event_bus: backends.event_bus,
482 persisting_emitter,
483 file_store,
484 storage_store: backends.storage_store,
485 connection_resolver: backends.connection_resolver,
486 mcp_auth_provider: self
487 .mcp_auth_provider
488 .unwrap_or_else(|| Arc::new(everruns_mcp::NoAuthProvider)),
489 mcp_discovery_cache: Arc::new(crate::mcp_cache::McpDiscoveryCache::new()),
490 plugin_warnings: self.plugin_warnings,
491 })
492 }
493}
494
495async fn resolve_session_file_system(
496 platform_definition: &PlatformDefinition,
497 file_system_factory_context: SessionFileSystemFactoryContext,
498) -> Result<Arc<dyn SessionFileSystem>> {
499 let file_system_factory = platform_definition.session_file_system_factory();
500 if file_system_factory.is_disabled() {
501 Ok(Arc::new(InMemorySessionFileStore::new()))
502 } else {
503 Ok(file_system_factory
504 .create_session_file_system(file_system_factory_context)
505 .await?)
506 }
507}
508
/// Runtime that executes agent turns inside the current process, backed by
/// the stores and event bus chosen in `InProcessRuntimeBuilder`.
///
/// Cloning copies the `Arc` handles (and the plugin warning list), so clones
/// share the same underlying stores.
#[derive(Clone)]
pub struct InProcessRuntime {
    /// Capability/driver registries and platform services.
    platform_definition: Arc<PlatformDefinition>,
    /// Harness lookup, including harness chains.
    harness_store: Arc<dyn RuntimeHarnessStore>,
    /// Agent lookup.
    agent_store: Arc<dyn RuntimeAgentStore>,
    /// Session lookup; also handed out as the session mutator.
    session_store: Arc<dyn RuntimeSessionStore>,
    /// Session id recorded by `single_session`, if any.
    default_session_id: Option<SessionId>,
    /// Conversation history per session.
    message_store: Arc<dyn RuntimeMessageStore>,
    /// Holds the default model.
    provider_store: Arc<dyn RuntimeProviderStore>,
    /// Event sink; also the source for `events()`.
    event_bus: Arc<dyn EventBus>,
    /// Emitter that forwards to `event_bus` and stores message-bearing events.
    persisting_emitter: PersistingEventEmitter,
    /// Session file system.
    file_store: Arc<dyn SessionFileSystem>,
    /// Session storage backend taken from the runtime backends.
    storage_store: Arc<dyn SessionStorageStore>,
    /// Optional resolver taken from the runtime backends.
    connection_resolver: Option<Arc<dyn UserConnectionResolver>>,
    /// Auth provider used when constructing MCP clients.
    mcp_auth_provider: Arc<dyn everruns_mcp::McpAuthProvider>,
    /// Cache passed to MCP tool discovery.
    mcp_discovery_cache: Arc<crate::mcp_cache::McpDiscoveryCache>,
    /// Warnings collected while compiling plugins on the builder.
    plugin_warnings: Vec<String>,
}
534
535impl InProcessRuntime {
536 fn mcp_client(&self) -> Arc<everruns_mcp::McpClient> {
539 Arc::new(everruns_mcp::McpClient::new(
540 self.platform_definition.egress_service(),
541 self.mcp_auth_provider.clone(),
542 ))
543 }
544
545 async fn session_mcp_servers(
547 &self,
548 session: &Session,
549 agent: Option<&Agent>,
550 ) -> everruns_core::ScopedMcpServers {
551 let harness_chain = self
552 .harness_store
553 .get_harness_chain(session.harness_id)
554 .await
555 .unwrap_or_default();
556 crate::mcp::merge_session_scoped_servers(&harness_chain, agent, session)
557 }
558 pub fn builder() -> InProcessRuntimeBuilder {
560 InProcessRuntimeBuilder::new()
561 }
562
    /// Returns the session id recorded by the builder's `single_session`
    /// call, or `None` when no default session was configured.
    pub fn default_session_id(&self) -> Option<SessionId> {
        self.default_session_id
    }
568
569 pub fn plugin_warnings(&self) -> &[String] {
574 &self.plugin_warnings
575 }
576
    /// Runs one full turn for `session_id` with `input` as the user message.
    ///
    /// The input is stored and announced on the event bus, then a
    /// `TurnStateMachine` drives the input / reason / act activities until it
    /// yields `TurnAction::Complete`. Lifecycle notifications and turn-end
    /// hooks fire before the outcome is returned as a `TurnResult`.
    ///
    /// # Errors
    ///
    /// Returns a store error when the session does not exist and propagates
    /// any error from the stores, the event bus or the activities.
    ///
    /// # Panics
    ///
    /// Panics if the state machine requests `ExecuteAct` without a preceding
    /// reason step that produced tool calls.
    pub async fn run_turn(
        &self,
        session_id: SessionId,
        input: impl Into<InputMessage>,
    ) -> Result<TurnResult> {
        let session = self
            .session_store
            .get_session(session_id)
            .await?
            .ok_or_else(|| AgentLoopError::store(format!("session not found: {session_id}")))?;

        // Store the input first, then announce it directly on the event bus.
        // NOTE(review): presumably the raw bus is used (not the persisting
        // emitter) so the message is not stored a second time — confirm.
        let input_message = self
            .message_store
            .add_input_message(session_id, input.into())
            .await?;
        self.event_bus
            .emit(EventRequest::new(
                session_id,
                EventContext::empty(),
                InputMessageData::new(input_message.clone()),
            ))
            .await?;

        let assembled = self
            .inspect_context_with_ids(session_id, session.harness_id, session.agent_id)
            .await?;
        // Sessions without an agent get an agent id derived from the session's UUID.
        let synthetic_agent_id = session
            .agent_id
            .unwrap_or_else(|| AgentId::from_uuid(session.id.uuid()));
        let org_id = in_process_internal_org_id(&session.organization_id);
        let mut state_machine = TurnStateMachine::new(
            TurnContext::new(session_id, input_message.id, synthetic_agent_id, org_id),
            assembled.runtime_agent.max_iterations,
        );

        // Response id carried from one reason step into the next.
        let mut previous_response_id: Option<String> = None;
        // Most recent reason result that contained tool calls; consumed by the act step.
        let mut last_reason_result: Option<everruns_core::ReasonResult> = None;

        loop {
            match state_machine.next_action() {
                TurnAction::ExecuteInput => {
                    let ctx = state_machine.context();
                    let base_context =
                        AtomContext::new(ctx.session_id, ctx.turn_id, ctx.input_message_id)
                            .with_workspace_id(session.workspace_id);
                    execute_input_activity(
                        self,
                        org_id,
                        InputAtomInput {
                            context: base_context,
                        },
                    )
                    .await?;
                    state_machine.on_input_completed();
                }
                TurnAction::ExecuteReason => {
                    let ctx = state_machine.context();
                    let base_context =
                        AtomContext::new(ctx.session_id, ctx.turn_id, ctx.input_message_id)
                            .with_workspace_id(session.workspace_id);
                    let reason_result = execute_reason_activity(
                        self,
                        org_id,
                        ReasonInput {
                            context: base_context.next_exec(),
                            harness_id: session.harness_id,
                            agent_id: session.agent_id,
                            org_id,
                            mcp_tool_definitions: vec![],
                            previous_response_id: previous_response_id.take(),
                            // Reported iteration is one-based.
                            iteration: state_machine.current_iteration() as u32 + 1,
                        },
                    )
                    .await?;
                    previous_response_id = reason_result.response_id.clone();
                    state_machine.on_reason_completed(
                        reason_result.text.clone(),
                        reason_result.has_tool_calls,
                        reason_result.tool_calls.len(),
                        reason_result.success,
                        reason_result.error.clone(),
                        false,
                    );
                    // Only keep the result when an act step will need its tool calls.
                    if reason_result.has_tool_calls {
                        last_reason_result = Some(reason_result);
                    }
                }
                TurnAction::ExecuteAct => {
                    let reason_result = last_reason_result
                        .take()
                        .expect("ExecuteAct requires a prior ReasonResult");
                    let ctx = state_machine.context();
                    let base_context =
                        AtomContext::new(ctx.session_id, ctx.turn_id, ctx.input_message_id)
                            .with_workspace_id(session.workspace_id);
                    execute_act_activity(
                        self,
                        ActInput {
                            org_id: Some(org_id),
                            context: base_context.next_exec(),
                            harness_id: session.harness_id,
                            agent_id: session.agent_id,
                            tool_calls: reason_result.tool_calls,
                            tool_definitions: reason_result.tool_definitions,
                            locale: reason_result.locale,
                            blueprint_id: None,
                            network_access: reason_result.network_access,
                            parallel_tool_calls: None,
                        },
                    )
                    .await?;
                    state_machine.on_act_completed();
                }
                TurnAction::Complete(outcome) => {
                    let ctx = state_machine.context();
                    let lifecycle =
                        RuntimeSessionLifecycle::new(self.clone(), org_id, ctx.session_id);
                    // Hitting the iteration limit counts as a completed turn.
                    let turn_succeeded = matches!(
                        &outcome,
                        TurnOutcome::Success { .. } | TurnOutcome::MaxIterationsReached { .. }
                    );
                    match &outcome {
                        TurnOutcome::Success { iterations, .. }
                        | TurnOutcome::MaxIterationsReached { iterations, .. } => {
                            lifecycle
                                .turn_completed(
                                    ctx.turn_id,
                                    ctx.input_message_id,
                                    *iterations as u32,
                                    None,
                                    None,
                                )
                                .await;
                        }
                        TurnOutcome::Failed { error, .. } => {
                            lifecycle
                                .turn_failed(ctx.turn_id, ctx.input_message_id, error, None)
                                .await;
                        }
                    }
                    // Turn-end hooks run for every outcome, after the lifecycle notification.
                    lifecycle
                        .fire_turn_end_hooks(
                            session.harness_id,
                            session.agent_id,
                            ctx.turn_id,
                            turn_succeeded,
                        )
                        .await;
                    return Ok(TurnResult::from_outcome(outcome, ctx.turn_id));
                }
            }
        }
    }
744
745 pub async fn run_text_turn(
746 &self,
747 session_id: SessionId,
748 text: impl Into<String>,
749 ) -> Result<TurnResult> {
750 self.run_turn(session_id, InputMessage::user(text)).await
751 }
752
753 pub async fn messages(&self, session_id: SessionId) -> Result<Vec<Message>> {
755 self.message_store.load(session_id).await
756 }
757
758 pub async fn read_file(
760 &self,
761 session_id: SessionId,
762 path: &str,
763 ) -> Result<Option<SessionFile>> {
764 self.file_store.read_file(session_id, path).await
765 }
766
767 pub async fn load_context(&self, session_id: SessionId) -> Result<AssembledTurnContext> {
769 let session = self
770 .session_store
771 .get_session(session_id)
772 .await?
773 .ok_or_else(|| AgentLoopError::store(format!("session not found: {session_id}")))?;
774 self.inspect_context_with_ids(session_id, session.harness_id, session.agent_id)
775 .await
776 }
777
778 pub async fn events(&self) -> Result<Vec<Event>> {
783 Ok(self.event_bus.collected_events().await)
784 }
785
786 pub async fn execute_command(
796 &self,
797 session_id: SessionId,
798 request: everruns_core::command::ExecuteCommandRequest,
799 ) -> Result<everruns_core::command::CommandResult> {
800 let ctx = self.load_context(session_id).await?;
801 let registry = self.platform_definition.capability_registry();
802 let host = everruns_core::command_host::StoreCommandHost::new(
806 session_id,
807 self.harness_store.clone(),
808 self.agent_store.clone(),
809 self.session_store.clone(),
810 self.message_store.clone(),
811 self.provider_store.clone(),
812 registry.clone(),
813 self.platform_definition.driver_registry().clone(),
814 )
815 .with_file_store(self.file_store.clone())
816 .with_assembled_context(ctx.clone());
817 let exec_ctx =
818 everruns_core::command::CommandExecutionContext::new(session_id, Arc::new(host));
819 for config in &ctx.resolved_capability_configs {
820 let Some(capability) = registry.get(config.capability_id()) else {
821 continue;
822 };
823 if capability.commands().iter().any(|c| c.name == request.name) {
824 return capability.execute_command(&request, &exec_ctx).await;
825 }
826 }
827 Err(AgentLoopError::config(format!(
828 "no capability declares command /{}",
829 request.name
830 )))
831 }
832
833 pub async fn list_commands(
843 &self,
844 session_id: SessionId,
845 ) -> Result<Vec<everruns_core::command::CommandDescriptor>> {
846 let ctx = self.load_context(session_id).await?;
847 let registry = self.platform_definition.capability_registry();
848 let mut seen = std::collections::HashSet::new();
849 let mut commands = Vec::new();
850 for config in &ctx.resolved_capability_configs {
851 let Some(capability) = registry.get(config.capability_id()) else {
852 continue;
853 };
854 for command in capability.commands() {
855 if seen.insert(command.name.clone()) {
856 commands.push(command);
857 }
858 }
859 }
860 Ok(commands)
861 }
862
863 async fn inspect_context_with_ids(
864 &self,
865 session_id: SessionId,
866 harness_id: everruns_core::HarnessId,
867 agent_id: Option<AgentId>,
868 ) -> Result<AssembledTurnContext> {
869 inspect_turn_context(
870 self.harness_store.as_ref(),
871 self.agent_store.as_ref(),
872 self.session_store.as_ref(),
873 self.message_store.as_ref(),
874 self.provider_store.as_ref(),
875 self.platform_definition.capability_registry(),
876 session_id,
877 harness_id,
878 agent_id,
879 &[],
880 Some(self.file_store.clone()),
881 )
882 .await
883 }
884}
885
886#[async_trait]
887impl RuntimeHostAdapter for InProcessRuntime {
888 async fn get_agent(&self, _org_id: i64, agent_id: AgentId) -> Result<Option<Agent>> {
889 self.agent_store.get_agent(agent_id).await
890 }
891
892 async fn get_harness(&self, _org_id: i64, harness_id: HarnessId) -> Result<Option<Harness>> {
893 let chain = self.harness_store.get_harness_chain(harness_id).await?;
894 Ok(chain.into_iter().last())
895 }
896
897 async fn set_session_status(
898 &self,
899 _org_id: i64,
900 session_id: SessionId,
901 _status: SessionStatus,
902 ) -> Result<Session> {
903 self.session_store
907 .get_session(session_id)
908 .await?
909 .ok_or_else(|| AgentLoopError::store(format!("session not found: {session_id}")))
910 }
911
912 async fn load_turn_context(
913 &self,
914 _org_id: i64,
915 session_id: SessionId,
916 ) -> Result<RuntimeHostTurnContext> {
917 let session = self
918 .session_store
919 .get_session(session_id)
920 .await?
921 .ok_or_else(|| AgentLoopError::store(format!("session not found: {session_id}")))?;
922 let agent = match session.agent_id {
923 Some(agent_id) => self.agent_store.get_agent(agent_id).await?,
924 None => None,
925 };
926 let messages = self.message_store.load(session_id).await?;
927 let model = self.provider_store.get_default_model().await?;
928
929 let scoped_servers = self.session_mcp_servers(&session, agent.as_ref()).await;
932 let mcp_tool_definitions = if scoped_servers.is_empty() {
933 vec![]
934 } else {
935 crate::mcp::discover_tool_definitions(
936 &self.mcp_discovery_cache,
937 self.mcp_client(),
938 session_id.uuid(),
939 &scoped_servers,
940 )
941 .await
942 };
943
944 Ok(RuntimeHostTurnContext {
945 agent,
946 session,
947 messages,
948 model,
949 mcp_tool_definitions,
950 })
951 }
952
953 async fn mcp_executor(
954 &self,
955 _org_id: i64,
956 session_id: SessionId,
957 ) -> Option<Arc<everruns_mcp::McpExecutor>> {
958 let session = self.session_store.get_session(session_id).await.ok()??;
959 let agent = match session.agent_id {
960 Some(agent_id) => self.agent_store.get_agent(agent_id).await.ok().flatten(),
961 None => None,
962 };
963 let scoped_servers = self.session_mcp_servers(&session, agent.as_ref()).await;
964 crate::mcp::build_executor(self.mcp_client(), &scoped_servers)
965 }
966
967 fn capability_registry(&self) -> CapabilityRegistry {
968 self.platform_definition.capability_registry().clone()
969 }
970
971 fn driver_registry(&self) -> DriverRegistry {
972 self.platform_definition.driver_registry().clone()
973 }
974
975 fn harness_store(&self, _org_id: i64) -> Arc<dyn HarnessStore> {
976 self.harness_store.clone()
977 }
978
979 fn agent_store(&self, _org_id: i64) -> Arc<dyn AgentStore> {
980 self.agent_store.clone()
981 }
982
983 fn session_store(&self, _org_id: i64) -> Arc<dyn SessionStore> {
984 self.session_store.clone()
985 }
986
987 fn session_mutator(&self, _org_id: i64) -> Arc<dyn SessionMutator> {
988 self.session_store.clone()
989 }
990
991 fn provider_store(&self, _org_id: i64) -> Arc<dyn ProviderStore> {
992 self.provider_store.clone()
993 }
994
995 fn message_store(&self) -> Arc<dyn MessageRetriever> {
996 self.message_store.clone()
997 }
998
999 fn event_emitter(&self) -> Arc<dyn EventEmitter> {
1000 Arc::new(self.persisting_emitter.clone())
1001 }
1002
1003 fn file_store(&self) -> Arc<dyn SessionFileSystem> {
1004 self.file_store.clone()
1005 }
1006
1007 fn storage_store(&self) -> Option<Arc<dyn SessionStorageStore>> {
1008 Some(self.storage_store.clone())
1009 }
1010
1011 fn connection_resolver(&self) -> Option<Arc<dyn UserConnectionResolver>> {
1012 self.connection_resolver.clone()
1013 }
1014
1015 fn utility_llm_service(&self) -> Option<Arc<dyn everruns_core::UtilityLlmService>> {
1016 Some(self.platform_definition.utility_llm_service())
1017 }
1018
1019 fn egress_service(&self) -> Option<Arc<dyn everruns_core::EgressService>> {
1020 Some(self.platform_definition.egress_service())
1021 }
1022}
1023
/// Event emitter that forwards every request to an event bus and, for events
/// that carry a message, also writes that message to the message store.
#[derive(Clone)]
struct PersistingEventEmitter {
    /// Bus that actually emits the event.
    inner: Arc<dyn EventBus>,
    /// Destination for messages extracted from emitted events.
    message_store: Arc<dyn RuntimeMessageStore>,
}
1029
1030impl PersistingEventEmitter {
1031 fn new(inner: Arc<dyn EventBus>, message_store: Arc<dyn RuntimeMessageStore>) -> Self {
1032 Self {
1033 inner,
1034 message_store,
1035 }
1036 }
1037}
1038
1039#[async_trait]
1040impl EventEmitter for PersistingEventEmitter {
1041 async fn emit(&self, request: EventRequest) -> Result<Event> {
1042 let event = self.inner.emit(request.clone()).await?;
1043 if let Some(message) = message_from_event(&event.data) {
1044 self.message_store
1045 .store_message(request.session_id, message)
1046 .await?;
1047 }
1048 Ok(event)
1049 }
1050}
1051
1052fn effective_overlay(
1053 harness_chain: &[Harness],
1054 agent: Option<&Agent>,
1055 session: &Session,
1056) -> AgentConfigOverlay {
1057 let harness_layers = harness_chain.iter().map(AgentConfigOverlay::from);
1058 let agent_layers = agent.into_iter().map(AgentConfigOverlay::from);
1059 AgentConfigOverlay::fold(
1060 harness_layers
1061 .chain(agent_layers)
1062 .chain([AgentConfigOverlay::from(session)]),
1063 )
1064}
1065
1066fn hydrate_plugin_refs(
1073 capabilities: &mut [AgentCapabilityConfig],
1074 plugin_configs: &[AgentCapabilityConfig],
1075) {
1076 for cap in capabilities.iter_mut() {
1077 let cap_id = cap.capability_id();
1078 if !everruns_core::is_plugin_capability(cap_id) {
1079 continue;
1080 }
1081 let is_bare = cap.config.is_null()
1083 || cap
1084 .config
1085 .as_object()
1086 .map(|o| o.is_empty())
1087 .unwrap_or(false);
1088 if !is_bare {
1089 continue;
1090 }
1091 if let Some(hydrated) = plugin_configs.iter().find(|c| c.capability_id() == cap_id) {
1092 cap.config = hydrated.config.clone();
1093 }
1094 }
1095}
1096
1097async fn seed_runtime_initial_files(
1098 harness_store: &dyn RuntimeHarnessStore,
1099 agent_store: &dyn RuntimeAgentStore,
1100 file_store: &dyn SessionFileSystem,
1101 session: &Session,
1102) -> Result<()> {
1103 let harness_chain = harness_store.get_harness_chain(session.harness_id).await?;
1104 if harness_chain.is_empty() {
1105 return Err(AgentLoopError::store(format!(
1106 "harness not found while seeding files: {}",
1107 session.harness_id
1108 )));
1109 }
1110 let agent = match session.agent_id {
1111 Some(agent_id) => Some(
1112 agent_store
1113 .get_agent(agent_id)
1114 .await?
1115 .ok_or_else(|| AgentLoopError::store(format!("agent not found: {agent_id}")))?,
1116 ),
1117 None => None,
1118 };
1119 let overlay = effective_overlay(&harness_chain, agent.as_ref(), session);
1120 let seed_key = SessionId::from_uuid(session.workspace_id.uuid());
1123 for file in &overlay.initial_files {
1124 file_store.seed_initial_file(seed_key, file).await?;
1125 }
1126 Ok(())
1127}
1128
1129fn message_from_event(data: &EventData) -> Option<Message> {
1130 match data {
1131 EventData::InputMessage(data) => Some(data.message.clone()),
1132 EventData::OutputMessageCompleted(OutputMessageCompletedData { message, .. }) => {
1133 Some(message.clone())
1134 }
1135 EventData::ToolCompleted(data) => Some(tool_completed_to_message(data.clone())),
1136 _ => None,
1137 }
1138}
1139
1140fn tool_completed_to_message(data: ToolCompletedData) -> Message {
1141 let mut images: Vec<ToolResultImage> = Vec::new();
1142 let metadata = tool_result_metadata(&data);
1143 let result = data.result.map(|parts| {
1144 for part in &parts {
1145 if let ContentPart::Image(img) = part
1146 && let (Some(base64), Some(media_type)) = (&img.base64, &img.media_type)
1147 {
1148 images.push(ToolResultImage {
1149 base64: base64.clone(),
1150 media_type: media_type.clone(),
1151 });
1152 }
1153 }
1154
1155 let text_parts: Vec<&ContentPart> = parts
1156 .iter()
1157 .filter(|part| matches!(part, ContentPart::Text(_)))
1158 .collect();
1159 if text_parts.len() == 1
1160 && let ContentPart::Text(text) = text_parts[0]
1161 {
1162 return parse_structured_tool_result_text(&text.text);
1163 }
1164 if !text_parts.is_empty() {
1165 serde_json::to_value(&text_parts).unwrap_or_default()
1166 } else {
1167 serde_json::Value::Null
1168 }
1169 });
1170
1171 let mut message = if images.is_empty() {
1172 Message::tool_result(&data.tool_call_id, result, data.error)
1173 } else {
1174 Message::tool_result_with_images(&data.tool_call_id, result, images)
1175 };
1176 message.metadata = metadata;
1177 message
1178}
1179
1180fn tool_result_metadata(
1181 data: &ToolCompletedData,
1182) -> Option<std::collections::HashMap<String, serde_json::Value>> {
1183 let mut metadata = std::collections::HashMap::new();
1184 metadata.insert("tool_name".to_string(), serde_json::json!(data.tool_name));
1185 if let Some(fingerprint) = &data.tool_call_fingerprint {
1186 metadata.insert(
1187 "tool_call_fingerprint".to_string(),
1188 serde_json::json!(fingerprint),
1189 );
1190 }
1191 if let Some(fingerprint) = &data.tool_result_fingerprint {
1192 metadata.insert(
1193 "tool_result_fingerprint".to_string(),
1194 serde_json::json!(fingerprint),
1195 );
1196 }
1197 (!metadata.is_empty()).then_some(metadata)
1198}
1199
1200fn parse_structured_tool_result_text(text: &str) -> serde_json::Value {
1201 let trimmed = text.trim_start();
1202 if !trimmed.starts_with('{') && !trimmed.starts_with('[') {
1203 return serde_json::Value::String(text.to_string());
1204 }
1205
1206 match serde_json::from_str(text) {
1207 Ok(value @ (serde_json::Value::Object(_) | serde_json::Value::Array(_))) => value,
1208 _ => serde_json::Value::String(text.to_string()),
1209 }
1210}
1211
#[cfg(test)]
mod tool_completed_replay_tests {
    use super::*;

    /// Replays `data` into a message and returns a copy of its tool result.
    fn replayed_result(data: ToolCompletedData) -> serde_json::Value {
        tool_completed_to_message(data)
            .tool_result_content()
            .and_then(|content| content.result.as_ref())
            .cloned()
            .expect("tool result should be present")
    }

    /// A single text part holding a JSON object is replayed as that object.
    #[test]
    fn tool_completed_replay_preserves_json_object_shape() {
        let payload = serde_json::json!({
            "path": "/workspace/src/lib.rs",
            "content": "1|fn main() {}"
        });
        let data = ToolCompletedData::success(
            "call_read".to_string(),
            "read_file".to_string(),
            vec![ContentPart::text(payload.to_string())],
            Some(1),
        );

        let result = replayed_result(data);

        assert_eq!(result["path"], "/workspace/src/lib.rs");
        assert_eq!(result["content"], "1|fn main() {}");
    }

    /// Scalar JSON text is kept as a string rather than parsed into a number.
    #[test]
    fn tool_completed_replay_keeps_scalar_json_as_text() {
        let data = ToolCompletedData::success(
            "call_scalar".to_string(),
            "custom_tool".to_string(),
            vec![ContentPart::text("123")],
            Some(1),
        );

        let result = replayed_result(data);

        assert_eq!(result, serde_json::Value::String("123".to_string()));
    }

    /// Tool name and both fingerprints end up in the message metadata.
    #[test]
    fn tool_completed_replay_preserves_fingerprints_as_metadata() {
        let data = ToolCompletedData::success(
            "call_read".to_string(),
            "read_file".to_string(),
            vec![ContentPart::text("{}")],
            Some(1),
        )
        .with_fingerprints("sha256:call".to_string(), "sha256:result".to_string());

        let metadata = tool_completed_to_message(data)
            .metadata
            .expect("metadata should be present");

        assert_eq!(metadata["tool_name"], "read_file");
        assert_eq!(metadata["tool_call_fingerprint"], "sha256:call");
        assert_eq!(metadata["tool_result_fingerprint"], "sha256:result");
    }
}
1277
#[cfg(test)]
mod org_id_mapping_tests {
    use super::*;
    use everruns_core::{DEFAULT_ORG_ID, DEFAULT_ORG_PUBLIC_ID, org_public_id_from_internal};

    /// Public id with only the top hex digit set, shared by the hashing tests.
    const HIGH_ENTROPY_ID: &str = "org_80000000000000000000000000000000";

    /// The default public id resolves to the default internal org id.
    #[test]
    fn default_public_id_maps_to_default_org() {
        let mapped = in_process_internal_org_id(DEFAULT_ORG_PUBLIC_ID);
        assert_eq!(mapped, DEFAULT_ORG_ID);
    }

    /// Inputs this test treats as invalid (empty, wrong prefix, too short,
    /// non-hex digits, upper-case prefix) must never resolve to the default org
    /// and must land at an internal id of 2 or above.
    #[test]
    fn invalid_public_id_does_not_fall_back_to_default() {
        let malformed_ids = [
            "",
            "not-an-org",
            "org_short",
            "org_ZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZ",
            "ORG_00000000000000000000000000000001",
        ];

        for invalid in malformed_ids {
            let mapped = in_process_internal_org_id(invalid);
            assert_ne!(mapped, DEFAULT_ORG_ID);
            assert!(
                mapped >= 2,
                "invalid input {invalid:?} should not map to default"
            );
        }
    }

    /// The all-zero public id must not resolve to the default org.
    #[test]
    fn zero_public_id_does_not_fall_back_to_default() {
        let all_zero = "org_00000000000000000000000000000000";
        let mapped = in_process_internal_org_id(all_zero);

        assert_ne!(mapped, DEFAULT_ORG_ID);
        assert!(mapped >= 2, "all-zero id should not map to default");
    }

    /// Public ids produced by `org_public_id_from_internal` map back to the
    /// internal id they were generated from, including the `i64::MAX` edge.
    #[test]
    fn synthetic_public_id_round_trips_with_internal_helper() {
        let samples = [1_i64, 2, 42, 1_000_000, i64::MAX - 1, i64::MAX];

        for internal in samples {
            let public = org_public_id_from_internal(internal);
            let mapped_back = in_process_internal_org_id(&public);
            assert_eq!(
                mapped_back, internal,
                "round-trip failed for internal={internal}"
            );
        }
    }

    /// Two different synthetic public ids stay different after mapping.
    #[test]
    fn distinct_synthetic_ids_map_to_distinct_internal_ids() {
        let first = org_public_id_from_internal(7);
        let second = org_public_id_from_internal(8);
        assert_ne!(first, second);

        let first_internal = in_process_internal_org_id(&first);
        let second_internal = in_process_internal_org_id(&second);
        assert_ne!(first_internal, second_internal);
    }

    /// A high-entropy id maps to a value of 2 or above, never to the default
    /// org, and gives the same result when mapped a second time.
    #[test]
    fn high_entropy_uuid_style_id_hashes_into_reserved_range() {
        let mapped = in_process_internal_org_id(HIGH_ENTROPY_ID);
        assert!(mapped >= 2, "mapped id {mapped} must be >= 2");
        assert_ne!(mapped, DEFAULT_ORG_ID);

        let mapped_again = in_process_internal_org_id(HIGH_ENTROPY_ID);
        assert_eq!(mapped, mapped_again);
    }

    /// Neighbouring high-entropy ids do not collide with each other or with
    /// the default org.
    #[test]
    fn high_entropy_ids_are_isolated_from_each_other() {
        let first = in_process_internal_org_id("org_80000000000000000000000000000001");
        let second = in_process_internal_org_id("org_80000000000000000000000000000002");

        assert_ne!(first, second);
        assert_ne!(first, DEFAULT_ORG_ID);
        assert_ne!(second, DEFAULT_ORG_ID);
    }

    /// Pins the mapping of a high-entropy id to an independently computed
    /// value: the first 8 bytes of the SHA-256 digest of the id, read
    /// big-endian, reduced modulo `i64::MAX - 1`, then offset by 2.
    #[test]
    fn hash_uses_stable_sha256_truncation() {
        let digest = sha2::Sha256::digest(HIGH_ENTROPY_ID.as_bytes());
        // Big-endian accumulation of the leading 8 digest bytes.
        let raw = digest
            .iter()
            .take(8)
            .fold(0_u64, |acc, &byte| (acc << 8) | u64::from(byte));
        let modulus = (i64::MAX - 1) as u64;
        let expected = ((raw % modulus) as i64) + 2;

        assert_eq!(in_process_internal_org_id(HIGH_ENTROPY_ID), expected);
    }

    /// An input four times longer than `HASH_INPUT_CAP_BYTES` still maps to a
    /// value of 2 or above that is not the default org.
    #[test]
    fn oversize_input_is_bounded_and_does_not_collide_silently() {
        let oversize = "x".repeat(super::HASH_INPUT_CAP_BYTES * 4);
        let mapped = in_process_internal_org_id(&oversize);

        assert!(mapped >= 2);
        assert_ne!(mapped, DEFAULT_ORG_ID);
    }
}