1use crate::backends::{
6 EventBus, RuntimeAgentStore, RuntimeBackends, RuntimeHarnessStore, RuntimeMessageStore,
7 RuntimeProviderStore, RuntimeSessionStore,
8};
9use crate::builders::SingleSessionBuilder;
10use crate::host::{
11 RuntimeHostAdapter, RuntimeHostTurnContext, RuntimeSessionLifecycle, execute_act_activity,
12 execute_input_activity, execute_reason_activity,
13};
14use crate::in_memory::{InMemorySessionFileStore, InMemorySessionFileSystemFactory};
15use async_trait::async_trait;
16use everruns_core::agent::Agent;
17use everruns_core::atoms::{ActInput, AtomContext, InputAtomInput, ReasonInput};
18use everruns_core::capabilities::{Capability, CapabilityRegistry};
19use everruns_core::config_layer::AgentConfigOverlay;
20use everruns_core::error::{AgentLoopError, Result};
21use everruns_core::events::{
22 Event, EventContext, EventData, EventRequest, InputMessageData, OutputMessageCompletedData,
23 ToolCompletedData,
24};
25use everruns_core::harness::Harness;
26use everruns_core::llm_driver_registry::{DriverRegistry, ProviderType};
27use everruns_core::llm_models::LlmProviderType;
28use everruns_core::llmsim_driver::{LlmSimConfig, LlmSimDriver};
29use everruns_core::message::{ContentPart, Message};
30use everruns_core::platform_definition::PlatformDefinition;
31use everruns_core::runtime_context::{AssembledTurnContext, inspect_turn_context};
32use everruns_core::session::{Session, SessionStatus};
33use everruns_core::session_file::{InitialFile, SessionFile};
34use everruns_core::tools::ToolResultImage;
35use everruns_core::traits::{
36 AgentStore, EventEmitter, HarnessStore, LlmProviderStore, ModelWithProvider, SessionMutator,
37 SessionStorageStore, SessionStore, UserConnectionResolver,
38};
39use everruns_core::turn::{TurnAction, TurnContext, TurnOutcome, TurnStateMachine};
40use everruns_core::typed_id::{AgentId, HarnessId, OrgId, SessionId};
41use everruns_core::{
42 InputMessage, MessageRetriever, SessionFileSystem, SessionFileSystemFactoryContext,
43};
44use sha2::{Digest, Sha256};
45use std::sync::Arc;
46
/// Upper bound on how many leading bytes of a public org id are fed to
/// SHA-256 by `hash_public_org_id`; longer inputs are truncated to this length
/// before hashing.
const HASH_INPUT_CAP_BYTES: usize = 128;
53
54fn in_process_internal_org_id(public_org_id: &str) -> i64 {
64 if public_org_id == everruns_core::DEFAULT_ORG_PUBLIC_ID {
65 return everruns_core::DEFAULT_ORG_ID;
66 }
67
68 let Ok(parsed) = public_org_id.parse::<OrgId>() else {
69 return hash_public_org_id(public_org_id);
70 };
71 let raw: u128 = parsed.uuid().as_u128();
72 if raw == 0 {
73 return hash_public_org_id(public_org_id);
74 }
75
76 if raw <= i64::MAX as u128 {
79 return raw as i64;
80 }
81
82 hash_public_org_id(public_org_id)
83}
84
85fn hash_public_org_id(public_org_id: &str) -> i64 {
90 let bytes = public_org_id.as_bytes();
91 let bounded = &bytes[..bytes.len().min(HASH_INPUT_CAP_BYTES)];
92 let digest = Sha256::digest(bounded);
93 let mut buf = [0u8; 8];
94 buf.copy_from_slice(&digest[..8]);
95 let raw = u64::from_be_bytes(buf);
96 ((raw % ((i64::MAX - 1) as u64)) as i64) + 2
97}
98
/// Summary of one completed turn, as returned by `InProcessRuntime::run_turn`.
#[derive(Debug, Clone)]
pub struct TurnResult {
    /// Response text reported by the turn outcome; empty when the turn failed.
    pub response: String,
    /// Number of iterations reported by the turn outcome.
    pub iterations: usize,
    /// Number of tool calls reported by the turn outcome; 0 when the turn failed.
    pub tool_calls_count: usize,
    /// `true` unless the outcome was `TurnOutcome::Failed`
    /// (reaching the iteration limit still counts as success).
    pub success: bool,
    /// Error message carried by a failed outcome; `None` otherwise.
    pub error: Option<String>,
    /// Id of the turn this result belongs to.
    pub turn_id: everruns_core::typed_id::TurnId,
}
114
115impl TurnResult {
116 fn from_outcome(outcome: TurnOutcome, turn_id: everruns_core::typed_id::TurnId) -> Self {
117 match outcome {
118 TurnOutcome::Success {
119 response,
120 iterations,
121 tool_calls_count,
122 } => Self {
123 response,
124 iterations,
125 tool_calls_count,
126 success: true,
127 error: None,
128 turn_id,
129 },
130 TurnOutcome::Failed { error, iterations } => Self {
131 response: String::new(),
132 iterations,
133 tool_calls_count: 0,
134 success: false,
135 error: Some(error),
136 turn_id,
137 },
138 TurnOutcome::MaxIterationsReached {
139 response,
140 iterations,
141 tool_calls_count,
142 } => Self {
143 response,
144 iterations,
145 tool_calls_count,
146 success: true,
147 error: None,
148 turn_id,
149 },
150 }
151 }
152}
153
/// Builder that collects configuration and seed data for an `InProcessRuntime`.
pub struct InProcessRuntimeBuilder {
    /// Platform definition (capability registry, driver registry, session
    /// file-system factory) the runtime will use.
    platform_definition: PlatformDefinition,
    /// When set, `build` registers an `LlmSimDriver` created from this config.
    llm_sim_config: Option<LlmSimConfig>,
    /// Default model stored in the provider store by `build`.
    default_model: Option<ModelWithProvider>,
    /// Explicit backends; `build` falls back to `RuntimeBackends::in_memory()` when unset.
    backends: Option<RuntimeBackends>,
    /// Context passed to the session file-system factory in `build`.
    session_file_system_factory_context: SessionFileSystemFactoryContext,
    /// Harnesses added to the harness store by `build`.
    harnesses: Vec<Harness>,
    /// Agents added to the agent store by `build`.
    agents: Vec<Agent>,
    /// Sessions added to the session store by `build`.
    sessions: Vec<Session>,
    /// Session id recorded by `single_session`; exposed by the built runtime.
    default_session_id: Option<SessionId>,
    /// Files queued by `seed_text_file`, written to the file store by `build`.
    seeded_files: Vec<(SessionId, InitialFile)>,
    /// MCP auth provider; `build` uses `everruns_mcp::NoAuthProvider` when unset.
    mcp_auth_provider: Option<Arc<dyn everruns_mcp::McpAuthProvider>>,
}
176
177impl Default for InProcessRuntimeBuilder {
178 fn default() -> Self {
179 Self::new()
180 }
181}
182
183impl InProcessRuntimeBuilder {
184 pub fn new() -> Self {
191 Self {
192 platform_definition: PlatformDefinition::builder()
193 .capability_registry(CapabilityRegistry::with_builtins())
194 .driver_registry(DriverRegistry::new())
195 .session_file_system_factory(Arc::new(InMemorySessionFileSystemFactory))
196 .build(),
197 llm_sim_config: None,
198 default_model: None,
199 backends: None,
200 session_file_system_factory_context: SessionFileSystemFactoryContext::new(),
201 harnesses: Vec::new(),
202 agents: Vec::new(),
203 sessions: Vec::new(),
204 default_session_id: None,
205 seeded_files: Vec::new(),
206 mcp_auth_provider: None,
207 }
208 }
209
210 pub fn mcp_auth_provider(mut self, provider: Arc<dyn everruns_mcp::McpAuthProvider>) -> Self {
214 self.mcp_auth_provider = Some(provider);
215 self
216 }
217
218 pub fn platform_definition(mut self, platform_definition: PlatformDefinition) -> Self {
220 self.platform_definition = platform_definition;
221 self
222 }
223
224 pub fn capability<C: Capability + 'static>(mut self, capability: C) -> Self {
226 self.platform_definition
227 .capability_registry_mut()
228 .register(capability);
229 self
230 }
231
232 pub fn driver_registry(mut self, driver_registry: DriverRegistry) -> Self {
234 *self.platform_definition.driver_registry_mut() = driver_registry;
235 self
236 }
237
238 pub fn llm_sim(mut self, config: LlmSimConfig) -> Self {
240 self.llm_sim_config = Some(config);
241 self
242 }
243
244 pub fn default_model(mut self, model: ModelWithProvider) -> Self {
246 self.default_model = Some(model);
247 self
248 }
249
250 pub fn backends(mut self, backends: RuntimeBackends) -> Self {
252 self.backends = Some(backends);
253 self
254 }
255
256 pub fn session_file_system_factory_context(
258 mut self,
259 context: SessionFileSystemFactoryContext,
260 ) -> Self {
261 self.session_file_system_factory_context = context;
262 self
263 }
264
265 pub fn harness(mut self, harness: Harness) -> Self {
267 self.harnesses.push(harness);
268 self
269 }
270
271 pub fn agent(mut self, agent: Agent) -> Self {
273 self.agents.push(agent);
274 self
275 }
276
277 pub fn session(mut self, session: Session) -> Self {
279 self.sessions.push(session);
280 self
281 }
282
283 pub fn single_session<F>(mut self, configure: F) -> Self
288 where
289 F: FnOnce(SingleSessionBuilder) -> SingleSessionBuilder,
290 {
291 let (harness, agent, session, session_id) =
292 configure(SingleSessionBuilder::default()).build();
293 self.harnesses.push(harness);
294 self.agents.push(agent);
295 self.sessions.push(session);
296 self.default_session_id = Some(session_id);
297 self
298 }
299
300 pub fn seed_text_file(
304 mut self,
305 session_id: SessionId,
306 path: impl Into<String>,
307 content: impl Into<String>,
308 ) -> Self {
309 self.seeded_files.push((
310 session_id,
311 InitialFile {
312 path: path.into(),
313 content: content.into(),
314 encoding: "text".to_string(),
315 is_readonly: false,
316 },
317 ));
318 self
319 }
320
321 pub async fn build(mut self) -> Result<InProcessRuntime> {
326 let backends = match self.backends.take() {
327 Some(backends) => backends,
328 None => RuntimeBackends::in_memory(),
329 };
330 let file_store = resolve_session_file_system(
331 &self.platform_definition,
332 self.session_file_system_factory_context.clone(),
333 )
334 .await?;
335
336 if let Some(config) = self.llm_sim_config.take() {
337 let driver = LlmSimDriver::new(config);
338 self.platform_definition
339 .driver_registry_mut()
340 .register(ProviderType::LlmSim, move |_api_key, _base_url| {
341 Box::new(driver.clone())
342 });
343
344 if self.default_model.is_none() {
345 self.default_model = Some(ModelWithProvider {
346 model: "llmsim-model".to_string(),
347 provider_type: LlmProviderType::LlmSim,
348 api_key: Some("fake-key".to_string()),
349 base_url: None,
350 });
351 }
352 }
353
354 let default_model = self.default_model.ok_or_else(|| {
355 AgentLoopError::config(
356 "in-process runtime requires a default model; call \
357 InProcessRuntimeBuilder::default_model(...) or \
358 InProcessRuntimeBuilder::llm_sim(...)",
359 )
360 })?;
361
362 backends
363 .provider_store
364 .set_default_model(default_model)
365 .await?;
366
367 for harness in &self.harnesses {
368 backends.harness_store.add_harness(harness.clone()).await?;
369 }
370 for agent in &self.agents {
371 backends.agent_store.add_agent(agent.clone()).await?;
372 }
373 for session in &self.sessions {
374 backends.session_store.add_session(session.clone()).await?;
375 }
376
377 for session in &self.sessions {
378 seed_runtime_initial_files(
379 backends.harness_store.as_ref(),
380 backends.agent_store.as_ref(),
381 file_store.as_ref(),
382 session,
383 )
384 .await?;
385 }
386
387 for (session_id, file) in &self.seeded_files {
388 file_store.seed_initial_file(*session_id, file).await?;
389 }
390
391 let persisting_emitter =
392 PersistingEventEmitter::new(backends.event_bus.clone(), backends.message_store.clone());
393
394 Ok(InProcessRuntime {
395 platform_definition: Arc::new(self.platform_definition),
396 harness_store: backends.harness_store,
397 agent_store: backends.agent_store,
398 session_store: backends.session_store,
399 default_session_id: self.default_session_id,
400 message_store: backends.message_store,
401 provider_store: backends.provider_store,
402 event_bus: backends.event_bus,
403 persisting_emitter,
404 file_store,
405 storage_store: backends.storage_store,
406 connection_resolver: backends.connection_resolver,
407 mcp_auth_provider: self
408 .mcp_auth_provider
409 .unwrap_or_else(|| Arc::new(everruns_mcp::NoAuthProvider)),
410 mcp_discovery_cache: Arc::new(crate::mcp_cache::McpDiscoveryCache::new()),
411 })
412 }
413}
414
415async fn resolve_session_file_system(
416 platform_definition: &PlatformDefinition,
417 file_system_factory_context: SessionFileSystemFactoryContext,
418) -> Result<Arc<dyn SessionFileSystem>> {
419 let file_system_factory = platform_definition.session_file_system_factory();
420 if file_system_factory.is_disabled() {
421 Ok(Arc::new(InMemorySessionFileStore::new()))
422 } else {
423 Ok(file_system_factory
424 .create_session_file_system(file_system_factory_context)
425 .await?)
426 }
427}
428
/// Runtime that executes turns inside the current process against the
/// configured stores. Cloning copies the handles (every store is an `Arc`).
#[derive(Clone)]
pub struct InProcessRuntime {
    /// Capability registry, driver registry and platform services.
    platform_definition: Arc<PlatformDefinition>,
    /// Store used to resolve harness chains.
    harness_store: Arc<dyn RuntimeHarnessStore>,
    /// Store used to look up agents.
    agent_store: Arc<dyn RuntimeAgentStore>,
    /// Store used to look up sessions.
    session_store: Arc<dyn RuntimeSessionStore>,
    /// Session id recorded by the builder's `single_session`, if any.
    default_session_id: Option<SessionId>,
    /// Store holding each session's messages.
    message_store: Arc<dyn RuntimeMessageStore>,
    /// Store providing the default model.
    provider_store: Arc<dyn RuntimeProviderStore>,
    /// Bus that receives emitted events and exposes the collected ones.
    event_bus: Arc<dyn EventBus>,
    /// Emitter wrapping `event_bus` that also writes message-bearing events
    /// to `message_store`.
    persisting_emitter: PersistingEventEmitter,
    /// Per-session file system.
    file_store: Arc<dyn SessionFileSystem>,
    /// Session storage store exposed through the host adapter.
    storage_store: Arc<dyn SessionStorageStore>,
    /// Optional user-connection resolver exposed through the host adapter.
    connection_resolver: Option<Arc<dyn UserConnectionResolver>>,
    /// Auth provider handed to every MCP client this runtime creates.
    mcp_auth_provider: Arc<dyn everruns_mcp::McpAuthProvider>,
    /// Cache passed to MCP tool-definition discovery.
    mcp_discovery_cache: Arc<crate::mcp_cache::McpDiscoveryCache>,
}
451
impl InProcessRuntime {
    /// Creates a new MCP client from the platform's egress service and the
    /// configured auth provider. A fresh client is built on every call.
    fn mcp_client(&self) -> Arc<everruns_mcp::McpClient> {
        Arc::new(everruns_mcp::McpClient::new(
            self.platform_definition.egress_service(),
            self.mcp_auth_provider.clone(),
        ))
    }

    /// Merges the MCP servers scoped to `session` from its harness chain,
    /// the optional agent and the session itself.
    ///
    /// A failure to load the harness chain is not reported: the chain is
    /// treated as empty.
    async fn session_mcp_servers(
        &self,
        session: &Session,
        agent: Option<&Agent>,
    ) -> everruns_core::ScopedMcpServers {
        let harness_chain = self
            .harness_store
            .get_harness_chain(session.harness_id)
            .await
            .unwrap_or_default();
        crate::mcp::merge_session_scoped_servers(&harness_chain, agent, session)
    }

    /// Returns a new [`InProcessRuntimeBuilder`].
    pub fn builder() -> InProcessRuntimeBuilder {
        InProcessRuntimeBuilder::new()
    }

    /// Returns the session id recorded by the builder's `single_session`, if any.
    pub fn default_session_id(&self) -> Option<SessionId> {
        self.default_session_id
    }

    /// Runs one complete turn for `session_id` with the given input and
    /// returns its [`TurnResult`].
    ///
    /// The input message is stored and announced on the event bus, then a
    /// [`TurnStateMachine`] drives the input, reason and act activities until
    /// it yields `TurnAction::Complete`. Lifecycle notifications and turn-end
    /// hooks are fired before returning.
    ///
    /// # Errors
    ///
    /// Returns a store error when the session does not exist and propagates
    /// errors from the stores, the event bus and the activities. An error
    /// propagated from inside the loop returns immediately, without the
    /// lifecycle notifications.
    ///
    /// # Panics
    ///
    /// Panics if the state machine requests `ExecuteAct` without a preceding
    /// reason step that produced tool calls.
    pub async fn run_turn(
        &self,
        session_id: SessionId,
        input: impl Into<InputMessage>,
    ) -> Result<TurnResult> {
        let session = self
            .session_store
            .get_session(session_id)
            .await?
            .ok_or_else(|| AgentLoopError::store(format!("session not found: {session_id}")))?;

        // Store the input first, then announce it on the raw event bus. The
        // persisting emitter is not used here; it would store the message
        // again when it sees the InputMessage event.
        let input_message = self
            .message_store
            .add_input_message(session_id, input.into())
            .await?;
        self.event_bus
            .emit(EventRequest::new(
                session_id,
                EventContext::empty(),
                InputMessageData::new(input_message.clone()),
            ))
            .await?;

        let assembled = self
            .inspect_context_with_ids(session_id, session.harness_id, session.agent_id)
            .await?;
        // Sessions without an agent get an agent id derived from the session's UUID.
        let synthetic_agent_id = session
            .agent_id
            .unwrap_or_else(|| AgentId::from_uuid(session.id.uuid()));
        let org_id = in_process_internal_org_id(&session.organization_id);
        let mut state_machine = TurnStateMachine::new(
            TurnContext::new(session_id, input_message.id, synthetic_agent_id, org_id),
            assembled.runtime_agent.max_iterations,
        );

        // Response id of the latest reason step, handed to the next reason step.
        let mut previous_response_id: Option<String> = None;
        // Latest reason result that requested tool calls; consumed by the act step.
        let mut last_reason_result: Option<everruns_core::ReasonResult> = None;

        loop {
            match state_machine.next_action() {
                TurnAction::ExecuteInput => {
                    let ctx = state_machine.context();
                    let base_context =
                        AtomContext::new(ctx.session_id, ctx.turn_id, ctx.input_message_id);
                    execute_input_activity(
                        self,
                        org_id,
                        InputAtomInput {
                            context: base_context,
                        },
                    )
                    .await?;
                    state_machine.on_input_completed();
                }
                TurnAction::ExecuteReason => {
                    let ctx = state_machine.context();
                    let base_context =
                        AtomContext::new(ctx.session_id, ctx.turn_id, ctx.input_message_id);
                    let reason_result = execute_reason_activity(
                        self,
                        org_id,
                        ReasonInput {
                            context: base_context.next_exec(),
                            harness_id: session.harness_id,
                            agent_id: session.agent_id,
                            org_id,
                            mcp_tool_definitions: vec![],
                            previous_response_id: previous_response_id.take(),
                            // The reason input carries a 1-based iteration number.
                            iteration: state_machine.current_iteration() as u32 + 1,
                        },
                    )
                    .await?;
                    previous_response_id = reason_result.response_id.clone();
                    state_machine.on_reason_completed(
                        reason_result.text.clone(),
                        reason_result.has_tool_calls,
                        reason_result.tool_calls.len(),
                        reason_result.success,
                        reason_result.error.clone(),
                        false,
                    );
                    // Only keep the result when an act step will need its tool calls.
                    if reason_result.has_tool_calls {
                        last_reason_result = Some(reason_result);
                    }
                }
                TurnAction::ExecuteAct => {
                    let reason_result = last_reason_result
                        .take()
                        .expect("ExecuteAct requires a prior ReasonResult");
                    let ctx = state_machine.context();
                    let base_context =
                        AtomContext::new(ctx.session_id, ctx.turn_id, ctx.input_message_id);
                    execute_act_activity(
                        self,
                        ActInput {
                            org_id: Some(org_id),
                            context: base_context.next_exec(),
                            harness_id: session.harness_id,
                            agent_id: session.agent_id,
                            tool_calls: reason_result.tool_calls,
                            tool_definitions: reason_result.tool_definitions,
                            locale: reason_result.locale,
                            blueprint_id: None,
                            network_access: reason_result.network_access,
                            parallel_tool_calls: None,
                        },
                    )
                    .await?;
                    state_machine.on_act_completed();
                }
                TurnAction::Complete(outcome) => {
                    let ctx = state_machine.context();
                    let lifecycle =
                        RuntimeSessionLifecycle::new(self.clone(), org_id, ctx.session_id);
                    // Reaching the iteration limit is reported as a successful turn.
                    let turn_succeeded = matches!(
                        &outcome,
                        TurnOutcome::Success { .. } | TurnOutcome::MaxIterationsReached { .. }
                    );
                    match &outcome {
                        TurnOutcome::Success { iterations, .. }
                        | TurnOutcome::MaxIterationsReached { iterations, .. } => {
                            lifecycle
                                .turn_completed(
                                    ctx.turn_id,
                                    ctx.input_message_id,
                                    *iterations as u32,
                                    None,
                                    None,
                                )
                                .await;
                        }
                        TurnOutcome::Failed { error, .. } => {
                            lifecycle
                                .turn_failed(ctx.turn_id, ctx.input_message_id, error, None)
                                .await;
                        }
                    }
                    // Turn-end hooks fire for both successful and failed turns.
                    lifecycle
                        .fire_turn_end_hooks(
                            session.harness_id,
                            session.agent_id,
                            ctx.turn_id,
                            turn_succeeded,
                        )
                        .await;
                    return Ok(TurnResult::from_outcome(outcome, ctx.turn_id));
                }
            }
        }
    }

    /// Convenience wrapper around [`Self::run_turn`] that sends `text` as a user message.
    pub async fn run_text_turn(
        &self,
        session_id: SessionId,
        text: impl Into<String>,
    ) -> Result<TurnResult> {
        self.run_turn(session_id, InputMessage::user(text)).await
    }

    /// Loads the messages stored for `session_id`.
    pub async fn messages(&self, session_id: SessionId) -> Result<Vec<Message>> {
        self.message_store.load(session_id).await
    }

    /// Reads the file at `path` from the session's file system.
    pub async fn read_file(
        &self,
        session_id: SessionId,
        path: &str,
    ) -> Result<Option<SessionFile>> {
        self.file_store.read_file(session_id, path).await
    }

    /// Assembles the turn context for `session_id` using the harness and
    /// agent ids stored on the session.
    ///
    /// # Errors
    ///
    /// Returns a store error when the session does not exist.
    pub async fn load_context(&self, session_id: SessionId) -> Result<AssembledTurnContext> {
        let session = self
            .session_store
            .get_session(session_id)
            .await?
            .ok_or_else(|| AgentLoopError::store(format!("session not found: {session_id}")))?;
        self.inspect_context_with_ids(session_id, session.harness_id, session.agent_id)
            .await
    }

    /// Returns the events collected by the event bus. The result is not
    /// filtered by session.
    pub async fn events(&self) -> Result<Vec<Event>> {
        Ok(self.event_bus.collected_events().await)
    }

    /// Executes a command in the context of `session_id`.
    ///
    /// The session's resolved capability configs are walked in order and the
    /// first capability that declares a command named `request.name` runs it.
    ///
    /// # Errors
    ///
    /// Returns a config error when no resolved capability declares the
    /// command, and propagates context-loading and command errors.
    pub async fn execute_command(
        &self,
        session_id: SessionId,
        request: everruns_core::command::ExecuteCommandRequest,
    ) -> Result<everruns_core::command::CommandResult> {
        let ctx = self.load_context(session_id).await?;
        let registry = self.platform_definition.capability_registry();
        // The command host gets this runtime's stores and the context assembled above.
        let host = everruns_core::command_host::StoreCommandHost::new(
            session_id,
            self.harness_store.clone(),
            self.agent_store.clone(),
            self.session_store.clone(),
            self.message_store.clone(),
            self.provider_store.clone(),
            registry.clone(),
            self.platform_definition.driver_registry().clone(),
        )
        .with_file_store(self.file_store.clone())
        .with_assembled_context(ctx.clone());
        let exec_ctx =
            everruns_core::command::CommandExecutionContext::new(session_id, Arc::new(host));
        for config in &ctx.resolved_capability_configs {
            // Configs whose capability is not in the registry are skipped.
            let Some(capability) = registry.get(config.capability_id()) else {
                continue;
            };
            if capability.commands().iter().any(|c| c.name == request.name) {
                return capability.execute_command(&request, &exec_ctx).await;
            }
        }
        Err(AgentLoopError::config(format!(
            "no capability declares command /{}",
            request.name
        )))
    }

    /// Lists the commands declared by the session's resolved capabilities.
    ///
    /// Commands are returned in capability order; when several capabilities
    /// declare the same command name, only the first one is kept.
    pub async fn list_commands(
        &self,
        session_id: SessionId,
    ) -> Result<Vec<everruns_core::command::CommandDescriptor>> {
        let ctx = self.load_context(session_id).await?;
        let registry = self.platform_definition.capability_registry();
        let mut seen = std::collections::HashSet::new();
        let mut commands = Vec::new();
        for config in &ctx.resolved_capability_configs {
            let Some(capability) = registry.get(config.capability_id()) else {
                continue;
            };
            for command in capability.commands() {
                // `insert` returns false for a name that was already recorded.
                if seen.insert(command.name.clone()) {
                    commands.push(command);
                }
            }
        }
        Ok(commands)
    }

    /// Calls `inspect_turn_context` with this runtime's stores, capability
    /// registry and file store for the given session, harness and agent ids.
    async fn inspect_context_with_ids(
        &self,
        session_id: SessionId,
        harness_id: everruns_core::HarnessId,
        agent_id: Option<AgentId>,
    ) -> Result<AssembledTurnContext> {
        inspect_turn_context(
            self.harness_store.as_ref(),
            self.agent_store.as_ref(),
            self.session_store.as_ref(),
            self.message_store.as_ref(),
            self.provider_store.as_ref(),
            self.platform_definition.capability_registry(),
            session_id,
            harness_id,
            agent_id,
            &[],
            Some(self.file_store.clone()),
        )
        .await
    }
}
791
792#[async_trait]
793impl RuntimeHostAdapter for InProcessRuntime {
794 async fn get_agent(&self, _org_id: i64, agent_id: AgentId) -> Result<Option<Agent>> {
795 self.agent_store.get_agent(agent_id).await
796 }
797
798 async fn get_harness(&self, _org_id: i64, harness_id: HarnessId) -> Result<Option<Harness>> {
799 let chain = self.harness_store.get_harness_chain(harness_id).await?;
800 Ok(chain.into_iter().last())
801 }
802
803 async fn set_session_status(
804 &self,
805 _org_id: i64,
806 session_id: SessionId,
807 _status: SessionStatus,
808 ) -> Result<Session> {
809 self.session_store
813 .get_session(session_id)
814 .await?
815 .ok_or_else(|| AgentLoopError::store(format!("session not found: {session_id}")))
816 }
817
818 async fn load_turn_context(
819 &self,
820 _org_id: i64,
821 session_id: SessionId,
822 ) -> Result<RuntimeHostTurnContext> {
823 let session = self
824 .session_store
825 .get_session(session_id)
826 .await?
827 .ok_or_else(|| AgentLoopError::store(format!("session not found: {session_id}")))?;
828 let agent = match session.agent_id {
829 Some(agent_id) => self.agent_store.get_agent(agent_id).await?,
830 None => None,
831 };
832 let messages = self.message_store.load(session_id).await?;
833 let model = self.provider_store.get_default_model().await?;
834
835 let scoped_servers = self.session_mcp_servers(&session, agent.as_ref()).await;
838 let mcp_tool_definitions = if scoped_servers.is_empty() {
839 vec![]
840 } else {
841 crate::mcp::discover_tool_definitions(
842 &self.mcp_discovery_cache,
843 self.mcp_client(),
844 session_id.uuid(),
845 &scoped_servers,
846 )
847 .await
848 };
849
850 Ok(RuntimeHostTurnContext {
851 agent,
852 session,
853 messages,
854 model,
855 mcp_tool_definitions,
856 })
857 }
858
859 async fn mcp_executor(
860 &self,
861 _org_id: i64,
862 session_id: SessionId,
863 ) -> Option<Arc<everruns_mcp::McpExecutor>> {
864 let session = self.session_store.get_session(session_id).await.ok()??;
865 let agent = match session.agent_id {
866 Some(agent_id) => self.agent_store.get_agent(agent_id).await.ok().flatten(),
867 None => None,
868 };
869 let scoped_servers = self.session_mcp_servers(&session, agent.as_ref()).await;
870 crate::mcp::build_executor(self.mcp_client(), &scoped_servers)
871 }
872
873 fn capability_registry(&self) -> CapabilityRegistry {
874 self.platform_definition.capability_registry().clone()
875 }
876
877 fn driver_registry(&self) -> DriverRegistry {
878 self.platform_definition.driver_registry().clone()
879 }
880
881 fn harness_store(&self, _org_id: i64) -> Arc<dyn HarnessStore> {
882 self.harness_store.clone()
883 }
884
885 fn agent_store(&self, _org_id: i64) -> Arc<dyn AgentStore> {
886 self.agent_store.clone()
887 }
888
889 fn session_store(&self, _org_id: i64) -> Arc<dyn SessionStore> {
890 self.session_store.clone()
891 }
892
893 fn session_mutator(&self, _org_id: i64) -> Arc<dyn SessionMutator> {
894 self.session_store.clone()
895 }
896
897 fn provider_store(&self, _org_id: i64) -> Arc<dyn LlmProviderStore> {
898 self.provider_store.clone()
899 }
900
901 fn message_store(&self) -> Arc<dyn MessageRetriever> {
902 self.message_store.clone()
903 }
904
905 fn event_emitter(&self) -> Arc<dyn EventEmitter> {
906 Arc::new(self.persisting_emitter.clone())
907 }
908
909 fn file_store(&self) -> Arc<dyn SessionFileSystem> {
910 self.file_store.clone()
911 }
912
913 fn storage_store(&self) -> Option<Arc<dyn SessionStorageStore>> {
914 Some(self.storage_store.clone())
915 }
916
917 fn connection_resolver(&self) -> Option<Arc<dyn UserConnectionResolver>> {
918 self.connection_resolver.clone()
919 }
920
921 fn utility_llm_service(&self) -> Option<Arc<dyn everruns_core::UtilityLlmService>> {
922 Some(self.platform_definition.utility_llm_service())
923 }
924
925 fn egress_service(&self) -> Option<Arc<dyn everruns_core::EgressService>> {
926 Some(self.platform_definition.egress_service())
927 }
928}
929
/// Event emitter that forwards every event to an inner bus and additionally
/// writes message-bearing events to the message store.
#[derive(Clone)]
struct PersistingEventEmitter {
    /// Bus that receives every emitted event.
    inner: Arc<dyn EventBus>,
    /// Store that receives the message derived from an event, when there is one.
    message_store: Arc<dyn RuntimeMessageStore>,
}
935
936impl PersistingEventEmitter {
937 fn new(inner: Arc<dyn EventBus>, message_store: Arc<dyn RuntimeMessageStore>) -> Self {
938 Self {
939 inner,
940 message_store,
941 }
942 }
943}
944
945#[async_trait]
946impl EventEmitter for PersistingEventEmitter {
947 async fn emit(&self, request: EventRequest) -> Result<Event> {
948 let event = self.inner.emit(request.clone()).await?;
949 if let Some(message) = message_from_event(&event.data) {
950 self.message_store
951 .store_message(request.session_id, message)
952 .await?;
953 }
954 Ok(event)
955 }
956}
957
958fn effective_overlay(
959 harness_chain: &[Harness],
960 agent: Option<&Agent>,
961 session: &Session,
962) -> AgentConfigOverlay {
963 let harness_layers = harness_chain.iter().map(AgentConfigOverlay::from);
964 let agent_layers = agent.into_iter().map(AgentConfigOverlay::from);
965 AgentConfigOverlay::fold(
966 harness_layers
967 .chain(agent_layers)
968 .chain([AgentConfigOverlay::from(session)]),
969 )
970}
971
972async fn seed_runtime_initial_files(
973 harness_store: &dyn RuntimeHarnessStore,
974 agent_store: &dyn RuntimeAgentStore,
975 file_store: &dyn SessionFileSystem,
976 session: &Session,
977) -> Result<()> {
978 let harness_chain = harness_store.get_harness_chain(session.harness_id).await?;
979 if harness_chain.is_empty() {
980 return Err(AgentLoopError::store(format!(
981 "harness not found while seeding files: {}",
982 session.harness_id
983 )));
984 }
985 let agent = match session.agent_id {
986 Some(agent_id) => Some(
987 agent_store
988 .get_agent(agent_id)
989 .await?
990 .ok_or_else(|| AgentLoopError::store(format!("agent not found: {agent_id}")))?,
991 ),
992 None => None,
993 };
994 let overlay = effective_overlay(&harness_chain, agent.as_ref(), session);
995 for file in &overlay.initial_files {
996 file_store.seed_initial_file(session.id, file).await?;
997 }
998 Ok(())
999}
1000
1001fn message_from_event(data: &EventData) -> Option<Message> {
1002 match data {
1003 EventData::InputMessage(data) => Some(data.message.clone()),
1004 EventData::OutputMessageCompleted(OutputMessageCompletedData { message, .. }) => {
1005 Some(message.clone())
1006 }
1007 EventData::ToolCompleted(data) => Some(tool_completed_to_message(data.clone())),
1008 _ => None,
1009 }
1010}
1011
1012fn tool_completed_to_message(data: ToolCompletedData) -> Message {
1013 let mut images: Vec<ToolResultImage> = Vec::new();
1014 let metadata = tool_result_metadata(&data);
1015 let result = data.result.map(|parts| {
1016 for part in &parts {
1017 if let ContentPart::Image(img) = part
1018 && let (Some(base64), Some(media_type)) = (&img.base64, &img.media_type)
1019 {
1020 images.push(ToolResultImage {
1021 base64: base64.clone(),
1022 media_type: media_type.clone(),
1023 });
1024 }
1025 }
1026
1027 let text_parts: Vec<&ContentPart> = parts
1028 .iter()
1029 .filter(|part| matches!(part, ContentPart::Text(_)))
1030 .collect();
1031 if text_parts.len() == 1
1032 && let ContentPart::Text(text) = text_parts[0]
1033 {
1034 return parse_structured_tool_result_text(&text.text);
1035 }
1036 if !text_parts.is_empty() {
1037 serde_json::to_value(&text_parts).unwrap_or_default()
1038 } else {
1039 serde_json::Value::Null
1040 }
1041 });
1042
1043 let mut message = if images.is_empty() {
1044 Message::tool_result(&data.tool_call_id, result, data.error)
1045 } else {
1046 Message::tool_result_with_images(&data.tool_call_id, result, images)
1047 };
1048 message.metadata = metadata;
1049 message
1050}
1051
1052fn tool_result_metadata(
1053 data: &ToolCompletedData,
1054) -> Option<std::collections::HashMap<String, serde_json::Value>> {
1055 let mut metadata = std::collections::HashMap::new();
1056 metadata.insert("tool_name".to_string(), serde_json::json!(data.tool_name));
1057 if let Some(fingerprint) = &data.tool_call_fingerprint {
1058 metadata.insert(
1059 "tool_call_fingerprint".to_string(),
1060 serde_json::json!(fingerprint),
1061 );
1062 }
1063 if let Some(fingerprint) = &data.tool_result_fingerprint {
1064 metadata.insert(
1065 "tool_result_fingerprint".to_string(),
1066 serde_json::json!(fingerprint),
1067 );
1068 }
1069 (!metadata.is_empty()).then_some(metadata)
1070}
1071
1072fn parse_structured_tool_result_text(text: &str) -> serde_json::Value {
1073 let trimmed = text.trim_start();
1074 if !trimmed.starts_with('{') && !trimmed.starts_with('[') {
1075 return serde_json::Value::String(text.to_string());
1076 }
1077
1078 match serde_json::from_str(text) {
1079 Ok(value @ (serde_json::Value::Object(_) | serde_json::Value::Array(_))) => value,
1080 _ => serde_json::Value::String(text.to_string()),
1081 }
1082}
1083
#[cfg(test)]
mod tool_completed_replay_tests {
    use super::*;

    /// Builds a successful tool-completed payload and replays it as a message.
    fn replay_success(call_id: &str, tool_name: &str, parts: Vec<ContentPart>) -> Message {
        let completed =
            ToolCompletedData::success(call_id.to_string(), tool_name.to_string(), parts, Some(1));
        tool_completed_to_message(completed)
    }

    /// A single text part holding a JSON object is replayed as that object.
    #[test]
    fn tool_completed_replay_preserves_json_object_shape() {
        let payload = serde_json::json!({
            "path": "/workspace/src/lib.rs",
            "content": "1|fn main() {}"
        })
        .to_string();

        let replayed = replay_success("call_read", "read_file", vec![ContentPart::text(payload)]);
        let result = replayed
            .tool_result_content()
            .and_then(|content| content.result.as_ref())
            .expect("tool result should be present");

        assert_eq!(result["path"], "/workspace/src/lib.rs");
        assert_eq!(result["content"], "1|fn main() {}");
    }

    /// Scalar JSON text stays a string instead of becoming a number.
    #[test]
    fn tool_completed_replay_keeps_scalar_json_as_text() {
        let replayed = replay_success("call_scalar", "custom_tool", vec![ContentPart::text("123")]);
        let result = replayed
            .tool_result_content()
            .and_then(|content| content.result.as_ref())
            .expect("tool result should be present");

        let expected = serde_json::Value::String("123".to_string());
        assert_eq!(result, &expected);
    }

    /// Tool name and both fingerprints end up in the message metadata.
    #[test]
    fn tool_completed_replay_preserves_fingerprints_as_metadata() {
        let completed = ToolCompletedData::success(
            "call_read".to_string(),
            "read_file".to_string(),
            vec![ContentPart::text("{}")],
            Some(1),
        )
        .with_fingerprints("sha256:call".to_string(), "sha256:result".to_string());

        let replayed = tool_completed_to_message(completed);
        let metadata = replayed.metadata.expect("metadata should be present");

        assert_eq!(metadata["tool_name"], "read_file");
        assert_eq!(metadata["tool_call_fingerprint"], "sha256:call");
        assert_eq!(metadata["tool_result_fingerprint"], "sha256:result");
    }
}
1149
#[cfg(test)]
mod org_id_mapping_tests {
    use super::*;
    use everruns_core::{DEFAULT_ORG_ID, DEFAULT_ORG_PUBLIC_ID, org_public_id_from_internal};

    /// The default public id maps to the default internal org id.
    #[test]
    fn default_public_id_maps_to_default_org() {
        let mapped = in_process_internal_org_id(DEFAULT_ORG_PUBLIC_ID);
        assert_eq!(mapped, DEFAULT_ORG_ID);
    }

    /// Unparseable ids are hashed and never land on the default org.
    #[test]
    fn invalid_public_id_does_not_fall_back_to_default() {
        let invalid_inputs = [
            "",
            "not-an-org",
            "org_short",
            "org_ZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZ",
            "ORG_00000000000000000000000000000001",
        ];
        for invalid in invalid_inputs {
            let mapped = in_process_internal_org_id(invalid);
            assert_ne!(mapped, DEFAULT_ORG_ID);
            assert!(
                mapped >= 2,
                "invalid input {invalid:?} should not map to default"
            );
        }
    }

    /// The all-zero id is hashed instead of being used as a raw value.
    #[test]
    fn zero_public_id_does_not_fall_back_to_default() {
        let all_zero = "org_00000000000000000000000000000000";
        let mapped = in_process_internal_org_id(all_zero);
        assert_ne!(mapped, DEFAULT_ORG_ID);
        assert!(mapped >= 2, "all-zero id should not map to default");
    }

    /// Public ids generated from an internal id map back to that id.
    #[test]
    fn synthetic_public_id_round_trips_with_internal_helper() {
        let samples = [1_i64, 2, 42, 1_000_000, i64::MAX - 1, i64::MAX];
        for internal in samples {
            let public = org_public_id_from_internal(internal);
            let mapped = in_process_internal_org_id(&public);
            assert_eq!(
                mapped, internal,
                "round-trip failed for internal={internal}"
            );
        }
    }

    /// Different synthetic ids stay different after mapping.
    #[test]
    fn distinct_synthetic_ids_map_to_distinct_internal_ids() {
        let first = org_public_id_from_internal(7);
        let second = org_public_id_from_internal(8);
        assert_ne!(first, second);

        let first_mapped = in_process_internal_org_id(&first);
        let second_mapped = in_process_internal_org_id(&second);
        assert_ne!(first_mapped, second_mapped);
    }

    /// An id above `i64::MAX` is hashed into `>= 2`, deterministically.
    #[test]
    fn high_entropy_uuid_style_id_hashes_into_reserved_range() {
        let high = "org_80000000000000000000000000000000";
        let mapped = in_process_internal_org_id(high);
        assert!(mapped >= 2, "mapped id {mapped} must be >= 2");
        assert_ne!(mapped, DEFAULT_ORG_ID);

        // Mapping the same input again must give the same id.
        let mapped_again = in_process_internal_org_id(high);
        assert_eq!(mapped, mapped_again);
    }

    /// Two distinct hashed ids do not collide with each other or the default.
    #[test]
    fn high_entropy_ids_are_isolated_from_each_other() {
        let first = in_process_internal_org_id("org_80000000000000000000000000000001");
        let second = in_process_internal_org_id("org_80000000000000000000000000000002");
        assert_ne!(first, second);
        assert_ne!(first, DEFAULT_ORG_ID);
        assert_ne!(second, DEFAULT_ORG_ID);
    }

    /// Pins the hash formula: first 8 digest bytes, big-endian, reduced, plus 2.
    #[test]
    fn hash_uses_stable_sha256_truncation() {
        let input = "org_80000000000000000000000000000000";
        let mapped = in_process_internal_org_id(input);

        let digest = sha2::Sha256::digest(b"org_80000000000000000000000000000000");
        let mut prefix = [0u8; 8];
        prefix.copy_from_slice(&digest[..8]);
        let reduced = u64::from_be_bytes(prefix) % ((i64::MAX - 1) as u64);
        let expected = (reduced as i64) + 2;

        assert_eq!(mapped, expected);
    }

    /// Input longer than the hash cap still maps to a non-default id `>= 2`.
    /// Collision behaviour is not asserted here.
    #[test]
    fn oversize_input_is_bounded_and_does_not_collide_silently() {
        let oversize_len = super::HASH_INPUT_CAP_BYTES * 4;
        let oversize = "x".repeat(oversize_len);
        let mapped = in_process_internal_org_id(&oversize);
        assert!(mapped >= 2);
        assert_ne!(mapped, DEFAULT_ORG_ID);
    }
}