1use crate::backends::{
6 EventBus, RuntimeAgentStore, RuntimeBackends, RuntimeHarnessStore, RuntimeMessageStore,
7 RuntimeProviderStore, RuntimeSessionStore,
8};
9use crate::builders::SingleSessionBuilder;
10use crate::host::{
11 RuntimeHostAdapter, RuntimeHostTurnContext, RuntimeSessionLifecycle, execute_act_activity,
12 execute_input_activity, execute_reason_activity,
13};
14use crate::in_memory::{InMemorySessionFileStore, InMemorySessionFileSystemFactory};
15use async_trait::async_trait;
16use everruns_core::agent::Agent;
17use everruns_core::atoms::{ActInput, AtomContext, InputAtomInput, ReasonInput};
18use everruns_core::capabilities::{Capability, CapabilityRegistry};
19use everruns_core::config_layer::AgentConfigOverlay;
20use everruns_core::error::{AgentLoopError, Result};
21use everruns_core::events::{
22 Event, EventContext, EventData, EventRequest, InputMessageData, OutputMessageCompletedData,
23 ToolCompletedData,
24};
25use everruns_core::harness::Harness;
26use everruns_core::llm_driver_registry::{DriverRegistry, ProviderType};
27use everruns_core::llm_models::LlmProviderType;
28use everruns_core::llmsim_driver::{LlmSimConfig, LlmSimDriver};
29use everruns_core::message::{ContentPart, Message};
30use everruns_core::platform_definition::PlatformDefinition;
31use everruns_core::runtime_context::{AssembledTurnContext, inspect_turn_context};
32use everruns_core::session::{Session, SessionStatus};
33use everruns_core::session_file::{InitialFile, SessionFile};
34use everruns_core::tools::ToolResultImage;
35use everruns_core::traits::{
36 AgentStore, EventEmitter, HarnessStore, LlmProviderStore, ModelWithProvider, SessionMutator,
37 SessionStorageStore, SessionStore,
38};
39use everruns_core::turn::{TurnAction, TurnContext, TurnOutcome, TurnStateMachine};
40use everruns_core::typed_id::{AgentId, HarnessId, OrgId, SessionId};
41use everruns_core::{
42 InputMessage, MemoryStoreBackend, MessageRetriever, SessionFileSystem,
43 SessionFileSystemFactoryContext,
44};
45use sha2::{Digest, Sha256};
46use std::sync::Arc;
47
48const HASH_INPUT_CAP_BYTES: usize = 128;
54
55fn in_process_internal_org_id(public_org_id: &str) -> i64 {
65 if public_org_id == everruns_core::DEFAULT_ORG_PUBLIC_ID {
66 return everruns_core::DEFAULT_ORG_ID;
67 }
68
69 let Ok(parsed) = public_org_id.parse::<OrgId>() else {
70 return hash_public_org_id(public_org_id);
71 };
72 let raw: u128 = parsed.uuid().as_u128();
73 if raw == 0 {
74 return hash_public_org_id(public_org_id);
75 }
76
77 if raw <= i64::MAX as u128 {
80 return raw as i64;
81 }
82
83 hash_public_org_id(public_org_id)
84}
85
86fn hash_public_org_id(public_org_id: &str) -> i64 {
91 let bytes = public_org_id.as_bytes();
92 let bounded = &bytes[..bytes.len().min(HASH_INPUT_CAP_BYTES)];
93 let digest = Sha256::digest(bounded);
94 let mut buf = [0u8; 8];
95 buf.copy_from_slice(&digest[..8]);
96 let raw = u64::from_be_bytes(buf);
97 ((raw % ((i64::MAX - 1) as u64)) as i64) + 2
98}
99
100#[derive(Debug, Clone)]
101pub struct TurnResult {
102 pub response: String,
104 pub iterations: usize,
106 pub tool_calls_count: usize,
108 pub success: bool,
110 pub error: Option<String>,
112 pub turn_id: everruns_core::typed_id::TurnId,
114}
115
116impl TurnResult {
117 fn from_outcome(outcome: TurnOutcome, turn_id: everruns_core::typed_id::TurnId) -> Self {
118 match outcome {
119 TurnOutcome::Success {
120 response,
121 iterations,
122 tool_calls_count,
123 } => Self {
124 response,
125 iterations,
126 tool_calls_count,
127 success: true,
128 error: None,
129 turn_id,
130 },
131 TurnOutcome::Failed { error, iterations } => Self {
132 response: String::new(),
133 iterations,
134 tool_calls_count: 0,
135 success: false,
136 error: Some(error),
137 turn_id,
138 },
139 TurnOutcome::MaxIterationsReached {
140 response,
141 iterations,
142 tool_calls_count,
143 } => Self {
144 response,
145 iterations,
146 tool_calls_count,
147 success: true,
148 error: None,
149 turn_id,
150 },
151 }
152 }
153}
154
155pub struct InProcessRuntimeBuilder {
165 platform_definition: PlatformDefinition,
166 llm_sim_config: Option<LlmSimConfig>,
167 default_model: Option<ModelWithProvider>,
168 backends: Option<RuntimeBackends>,
169 session_file_system_factory_context: SessionFileSystemFactoryContext,
170 harnesses: Vec<Harness>,
171 agents: Vec<Agent>,
172 sessions: Vec<Session>,
173 default_session_id: Option<SessionId>,
174 seeded_files: Vec<(SessionId, InitialFile)>,
175 mcp_auth_provider: Option<Arc<dyn everruns_mcp::McpAuthProvider>>,
176}
177
178impl Default for InProcessRuntimeBuilder {
179 fn default() -> Self {
180 Self::new()
181 }
182}
183
184impl InProcessRuntimeBuilder {
185 pub fn new() -> Self {
192 Self {
193 platform_definition: PlatformDefinition::builder()
194 .capability_registry(CapabilityRegistry::with_builtins())
195 .driver_registry(DriverRegistry::new())
196 .session_file_system_factory(Arc::new(InMemorySessionFileSystemFactory))
197 .build(),
198 llm_sim_config: None,
199 default_model: None,
200 backends: None,
201 session_file_system_factory_context: SessionFileSystemFactoryContext::new(),
202 harnesses: Vec::new(),
203 agents: Vec::new(),
204 sessions: Vec::new(),
205 default_session_id: None,
206 seeded_files: Vec::new(),
207 mcp_auth_provider: None,
208 }
209 }
210
211 pub fn mcp_auth_provider(mut self, provider: Arc<dyn everruns_mcp::McpAuthProvider>) -> Self {
215 self.mcp_auth_provider = Some(provider);
216 self
217 }
218
219 pub fn platform_definition(mut self, platform_definition: PlatformDefinition) -> Self {
221 self.platform_definition = platform_definition;
222 self
223 }
224
225 pub fn capability<C: Capability + 'static>(mut self, capability: C) -> Self {
227 self.platform_definition
228 .capability_registry_mut()
229 .register(capability);
230 self
231 }
232
233 pub fn driver_registry(mut self, driver_registry: DriverRegistry) -> Self {
235 *self.platform_definition.driver_registry_mut() = driver_registry;
236 self
237 }
238
239 pub fn llm_sim(mut self, config: LlmSimConfig) -> Self {
241 self.llm_sim_config = Some(config);
242 self
243 }
244
245 pub fn default_model(mut self, model: ModelWithProvider) -> Self {
247 self.default_model = Some(model);
248 self
249 }
250
251 pub fn backends(mut self, backends: RuntimeBackends) -> Self {
253 self.backends = Some(backends);
254 self
255 }
256
257 pub fn session_file_system_factory_context(
259 mut self,
260 context: SessionFileSystemFactoryContext,
261 ) -> Self {
262 self.session_file_system_factory_context = context;
263 self
264 }
265
266 pub fn harness(mut self, harness: Harness) -> Self {
268 self.harnesses.push(harness);
269 self
270 }
271
272 pub fn agent(mut self, agent: Agent) -> Self {
274 self.agents.push(agent);
275 self
276 }
277
278 pub fn session(mut self, session: Session) -> Self {
280 self.sessions.push(session);
281 self
282 }
283
284 pub fn single_session<F>(mut self, configure: F) -> Self
289 where
290 F: FnOnce(SingleSessionBuilder) -> SingleSessionBuilder,
291 {
292 let (harness, agent, session, session_id) =
293 configure(SingleSessionBuilder::default()).build();
294 self.harnesses.push(harness);
295 self.agents.push(agent);
296 self.sessions.push(session);
297 self.default_session_id = Some(session_id);
298 self
299 }
300
301 pub fn seed_text_file(
305 mut self,
306 session_id: SessionId,
307 path: impl Into<String>,
308 content: impl Into<String>,
309 ) -> Self {
310 self.seeded_files.push((
311 session_id,
312 InitialFile {
313 path: path.into(),
314 content: content.into(),
315 encoding: "text".to_string(),
316 is_readonly: false,
317 },
318 ));
319 self
320 }
321
322 pub async fn build(mut self) -> Result<InProcessRuntime> {
327 let backends = match self.backends.take() {
328 Some(backends) => backends,
329 None => RuntimeBackends::in_memory(),
330 };
331 let file_store = resolve_session_file_system(
332 &self.platform_definition,
333 self.session_file_system_factory_context.clone(),
334 )
335 .await?;
336
337 if let Some(config) = self.llm_sim_config.take() {
338 let driver = LlmSimDriver::new(config);
339 self.platform_definition
340 .driver_registry_mut()
341 .register(ProviderType::LlmSim, move |_api_key, _base_url| {
342 Box::new(driver.clone())
343 });
344
345 if self.default_model.is_none() {
346 self.default_model = Some(ModelWithProvider {
347 model: "llmsim-model".to_string(),
348 provider_type: LlmProviderType::LlmSim,
349 api_key: Some("fake-key".to_string()),
350 base_url: None,
351 });
352 }
353 }
354
355 let default_model = self.default_model.ok_or_else(|| {
356 AgentLoopError::config(
357 "in-process runtime requires a default model; call \
358 InProcessRuntimeBuilder::default_model(...) or \
359 InProcessRuntimeBuilder::llm_sim(...)",
360 )
361 })?;
362
363 backends
364 .provider_store
365 .set_default_model(default_model)
366 .await?;
367
368 for harness in &self.harnesses {
369 backends.harness_store.add_harness(harness.clone()).await?;
370 }
371 for agent in &self.agents {
372 backends.agent_store.add_agent(agent.clone()).await?;
373 }
374 for session in &self.sessions {
375 backends.session_store.add_session(session.clone()).await?;
376 }
377
378 for session in &self.sessions {
379 seed_runtime_initial_files(
380 backends.harness_store.as_ref(),
381 backends.agent_store.as_ref(),
382 file_store.as_ref(),
383 session,
384 )
385 .await?;
386 }
387
388 for (session_id, file) in &self.seeded_files {
389 file_store.seed_initial_file(*session_id, file).await?;
390 }
391
392 let persisting_emitter =
393 PersistingEventEmitter::new(backends.event_bus.clone(), backends.message_store.clone());
394
395 Ok(InProcessRuntime {
396 platform_definition: Arc::new(self.platform_definition),
397 harness_store: backends.harness_store,
398 agent_store: backends.agent_store,
399 session_store: backends.session_store,
400 default_session_id: self.default_session_id,
401 message_store: backends.message_store,
402 provider_store: backends.provider_store,
403 event_bus: backends.event_bus,
404 persisting_emitter,
405 file_store,
406 storage_store: backends.storage_store,
407 memory_store: backends.memory_store,
408 mcp_auth_provider: self
409 .mcp_auth_provider
410 .unwrap_or_else(|| Arc::new(everruns_mcp::NoAuthProvider)),
411 })
412 }
413}
414
415async fn resolve_session_file_system(
416 platform_definition: &PlatformDefinition,
417 file_system_factory_context: SessionFileSystemFactoryContext,
418) -> Result<Arc<dyn SessionFileSystem>> {
419 let file_system_factory = platform_definition.session_file_system_factory();
420 if file_system_factory.is_disabled() {
421 Ok(Arc::new(InMemorySessionFileStore::new()))
422 } else {
423 Ok(file_system_factory
424 .create_session_file_system(file_system_factory_context)
425 .await?)
426 }
427}
428
429#[derive(Clone)]
430pub struct InProcessRuntime {
436 platform_definition: Arc<PlatformDefinition>,
437 harness_store: Arc<dyn RuntimeHarnessStore>,
438 agent_store: Arc<dyn RuntimeAgentStore>,
439 session_store: Arc<dyn RuntimeSessionStore>,
440 default_session_id: Option<SessionId>,
441 message_store: Arc<dyn RuntimeMessageStore>,
442 provider_store: Arc<dyn RuntimeProviderStore>,
443 event_bus: Arc<dyn EventBus>,
444 persisting_emitter: PersistingEventEmitter,
445 file_store: Arc<dyn SessionFileSystem>,
446 storage_store: Arc<dyn SessionStorageStore>,
447 memory_store: Arc<dyn MemoryStoreBackend>,
448 mcp_auth_provider: Arc<dyn everruns_mcp::McpAuthProvider>,
449}
450
451impl InProcessRuntime {
452 fn mcp_client(&self) -> Arc<everruns_mcp::McpClient> {
455 Arc::new(everruns_mcp::McpClient::new(
456 self.platform_definition.egress_service(),
457 self.mcp_auth_provider.clone(),
458 ))
459 }
460
461 async fn session_mcp_servers(
463 &self,
464 session: &Session,
465 agent: Option<&Agent>,
466 ) -> everruns_core::ScopedMcpServers {
467 let harness_chain = self
468 .harness_store
469 .get_harness_chain(session.harness_id)
470 .await
471 .unwrap_or_default();
472 crate::mcp::merge_session_scoped_servers(&harness_chain, agent, session)
473 }
474 pub fn builder() -> InProcessRuntimeBuilder {
476 InProcessRuntimeBuilder::new()
477 }
478
479 pub fn default_session_id(&self) -> Option<SessionId> {
482 self.default_session_id
483 }
484
485 pub async fn run_turn(
491 &self,
492 session_id: SessionId,
493 input: impl Into<InputMessage>,
494 ) -> Result<TurnResult> {
495 let session = self
496 .session_store
497 .get_session(session_id)
498 .await?
499 .ok_or_else(|| AgentLoopError::store(format!("session not found: {session_id}")))?;
500
501 let input_message = self
506 .message_store
507 .add_input_message(session_id, input.into())
508 .await?;
509 self.event_bus
510 .emit(EventRequest::new(
511 session_id,
512 EventContext::empty(),
513 InputMessageData::new(input_message.clone()),
514 ))
515 .await?;
516
517 let assembled = self
518 .inspect_context_with_ids(session_id, session.harness_id, session.agent_id)
519 .await?;
520 let synthetic_agent_id = session
521 .agent_id
522 .unwrap_or_else(|| AgentId::from_uuid(session.id.uuid()));
523 let org_id = in_process_internal_org_id(&session.organization_id);
524 let mut state_machine = TurnStateMachine::new(
525 TurnContext::new(session_id, input_message.id, synthetic_agent_id, org_id),
526 assembled.runtime_agent.max_iterations,
527 );
528
529 let mut previous_response_id: Option<String> = None;
530 let mut last_reason_result: Option<everruns_core::ReasonResult> = None;
531
532 loop {
533 match state_machine.next_action() {
534 TurnAction::ExecuteInput => {
535 let ctx = state_machine.context();
536 let base_context =
537 AtomContext::new(ctx.session_id, ctx.turn_id, ctx.input_message_id);
538 execute_input_activity(
539 self,
540 org_id,
541 InputAtomInput {
542 context: base_context,
543 },
544 )
545 .await?;
546 state_machine.on_input_completed();
547 }
548 TurnAction::ExecuteReason => {
549 let ctx = state_machine.context();
550 let base_context =
551 AtomContext::new(ctx.session_id, ctx.turn_id, ctx.input_message_id);
552 let reason_result = execute_reason_activity(
553 self,
554 org_id,
555 ReasonInput {
556 context: base_context.next_exec(),
557 harness_id: session.harness_id,
558 agent_id: session.agent_id,
559 org_id,
560 mcp_tool_definitions: vec![],
561 previous_response_id: previous_response_id.take(),
562 iteration: state_machine.current_iteration() as u32 + 1,
563 },
564 )
565 .await?;
566 previous_response_id = reason_result.response_id.clone();
567 state_machine.on_reason_completed(
568 reason_result.text.clone(),
569 reason_result.has_tool_calls,
570 reason_result.tool_calls.len(),
571 reason_result.success,
572 reason_result.error.clone(),
573 false,
574 );
575 if reason_result.has_tool_calls {
576 last_reason_result = Some(reason_result);
577 }
578 }
579 TurnAction::ExecuteAct => {
580 let reason_result = last_reason_result
581 .take()
582 .expect("ExecuteAct requires a prior ReasonResult");
583 let ctx = state_machine.context();
584 let base_context =
585 AtomContext::new(ctx.session_id, ctx.turn_id, ctx.input_message_id);
586 execute_act_activity(
587 self,
588 ActInput {
589 org_id: Some(org_id),
590 context: base_context.next_exec(),
591 harness_id: session.harness_id,
592 agent_id: session.agent_id,
593 tool_calls: reason_result.tool_calls,
594 tool_definitions: reason_result.tool_definitions,
595 locale: reason_result.locale,
596 blueprint_id: None,
597 network_access: reason_result.network_access,
598 parallel_tool_calls: None,
602 },
603 )
604 .await?;
605 state_machine.on_act_completed();
606 }
607 TurnAction::Complete(outcome) => {
608 let ctx = state_machine.context();
609 let lifecycle =
610 RuntimeSessionLifecycle::new(self.clone(), org_id, ctx.session_id);
611 let turn_succeeded = matches!(
612 &outcome,
613 TurnOutcome::Success { .. } | TurnOutcome::MaxIterationsReached { .. }
614 );
615 match &outcome {
616 TurnOutcome::Success { iterations, .. }
617 | TurnOutcome::MaxIterationsReached { iterations, .. } => {
618 lifecycle
619 .turn_completed(
620 ctx.turn_id,
621 ctx.input_message_id,
622 *iterations as u32,
623 None,
624 None,
625 )
626 .await;
627 }
628 TurnOutcome::Failed { error, .. } => {
629 lifecycle
630 .turn_failed(ctx.turn_id, ctx.input_message_id, error, None)
631 .await;
632 }
633 }
634 lifecycle
637 .fire_turn_end_hooks(
638 session.harness_id,
639 session.agent_id,
640 ctx.turn_id,
641 turn_succeeded,
642 )
643 .await;
644 return Ok(TurnResult::from_outcome(outcome, ctx.turn_id));
645 }
646 }
647 }
648 }
649
650 pub async fn run_text_turn(
651 &self,
652 session_id: SessionId,
653 text: impl Into<String>,
654 ) -> Result<TurnResult> {
655 self.run_turn(session_id, InputMessage::user(text)).await
656 }
657
658 pub async fn messages(&self, session_id: SessionId) -> Result<Vec<Message>> {
660 self.message_store.load(session_id).await
661 }
662
663 pub async fn read_file(
665 &self,
666 session_id: SessionId,
667 path: &str,
668 ) -> Result<Option<SessionFile>> {
669 self.file_store.read_file(session_id, path).await
670 }
671
672 pub async fn load_context(&self, session_id: SessionId) -> Result<AssembledTurnContext> {
674 let session = self
675 .session_store
676 .get_session(session_id)
677 .await?
678 .ok_or_else(|| AgentLoopError::store(format!("session not found: {session_id}")))?;
679 self.inspect_context_with_ids(session_id, session.harness_id, session.agent_id)
680 .await
681 }
682
683 pub async fn events(&self) -> Result<Vec<Event>> {
688 Ok(self.event_bus.collected_events().await)
689 }
690
691 pub async fn execute_command(
701 &self,
702 session_id: SessionId,
703 request: everruns_core::command::ExecuteCommandRequest,
704 ) -> Result<everruns_core::command::CommandResult> {
705 let ctx = self.load_context(session_id).await?;
706 let registry = self.platform_definition.capability_registry();
707 let exec_ctx = everruns_core::command::CommandExecutionContext { session_id };
708 for config in &ctx.resolved_capability_configs {
709 let Some(capability) = registry.get(config.capability_id()) else {
710 continue;
711 };
712 if capability.commands().iter().any(|c| c.name == request.name) {
713 return capability.execute_command(&request, &exec_ctx).await;
714 }
715 }
716 Err(AgentLoopError::config(format!(
717 "no capability declares command /{}",
718 request.name
719 )))
720 }
721
722 pub async fn list_commands(
732 &self,
733 session_id: SessionId,
734 ) -> Result<Vec<everruns_core::command::CommandDescriptor>> {
735 let ctx = self.load_context(session_id).await?;
736 let registry = self.platform_definition.capability_registry();
737 let mut seen = std::collections::HashSet::new();
738 let mut commands = Vec::new();
739 for config in &ctx.resolved_capability_configs {
740 let Some(capability) = registry.get(config.capability_id()) else {
741 continue;
742 };
743 for command in capability.commands() {
744 if seen.insert(command.name.clone()) {
745 commands.push(command);
746 }
747 }
748 }
749 Ok(commands)
750 }
751
752 async fn inspect_context_with_ids(
753 &self,
754 session_id: SessionId,
755 harness_id: everruns_core::HarnessId,
756 agent_id: Option<AgentId>,
757 ) -> Result<AssembledTurnContext> {
758 inspect_turn_context(
759 self.harness_store.as_ref(),
760 self.agent_store.as_ref(),
761 self.session_store.as_ref(),
762 self.message_store.as_ref(),
763 self.provider_store.as_ref(),
764 self.platform_definition.capability_registry(),
765 session_id,
766 harness_id,
767 agent_id,
768 &[],
769 Some(self.file_store.clone()),
770 )
771 .await
772 }
773}
774
775#[async_trait]
776impl RuntimeHostAdapter for InProcessRuntime {
777 async fn get_agent(&self, _org_id: i64, agent_id: AgentId) -> Result<Option<Agent>> {
778 self.agent_store.get_agent(agent_id).await
779 }
780
781 async fn get_harness(&self, _org_id: i64, harness_id: HarnessId) -> Result<Option<Harness>> {
782 let chain = self.harness_store.get_harness_chain(harness_id).await?;
783 Ok(chain.into_iter().last())
784 }
785
786 async fn set_session_status(
787 &self,
788 _org_id: i64,
789 session_id: SessionId,
790 _status: SessionStatus,
791 ) -> Result<Session> {
792 self.session_store
796 .get_session(session_id)
797 .await?
798 .ok_or_else(|| AgentLoopError::store(format!("session not found: {session_id}")))
799 }
800
801 async fn load_turn_context(
802 &self,
803 _org_id: i64,
804 session_id: SessionId,
805 ) -> Result<RuntimeHostTurnContext> {
806 let session = self
807 .session_store
808 .get_session(session_id)
809 .await?
810 .ok_or_else(|| AgentLoopError::store(format!("session not found: {session_id}")))?;
811 let agent = match session.agent_id {
812 Some(agent_id) => self.agent_store.get_agent(agent_id).await?,
813 None => None,
814 };
815 let messages = self.message_store.load(session_id).await?;
816 let model = self.provider_store.get_default_model().await?;
817
818 let scoped_servers = self.session_mcp_servers(&session, agent.as_ref()).await;
821 let mcp_tool_definitions = if scoped_servers.is_empty() {
822 vec![]
823 } else {
824 crate::mcp::discover_tool_definitions(
825 &self.mcp_client(),
826 session_id.uuid(),
827 &scoped_servers,
828 )
829 .await
830 };
831
832 Ok(RuntimeHostTurnContext {
833 agent,
834 session,
835 messages,
836 model,
837 mcp_tool_definitions,
838 })
839 }
840
841 async fn mcp_executor(
842 &self,
843 _org_id: i64,
844 session_id: SessionId,
845 ) -> Option<Arc<everruns_mcp::McpExecutor>> {
846 let session = self.session_store.get_session(session_id).await.ok()??;
847 let agent = match session.agent_id {
848 Some(agent_id) => self.agent_store.get_agent(agent_id).await.ok().flatten(),
849 None => None,
850 };
851 let scoped_servers = self.session_mcp_servers(&session, agent.as_ref()).await;
852 crate::mcp::build_executor(self.mcp_client(), &scoped_servers)
853 }
854
855 fn capability_registry(&self) -> CapabilityRegistry {
856 self.platform_definition.capability_registry().clone()
857 }
858
859 fn driver_registry(&self) -> DriverRegistry {
860 self.platform_definition.driver_registry().clone()
861 }
862
863 fn harness_store(&self, _org_id: i64) -> Arc<dyn HarnessStore> {
864 self.harness_store.clone()
865 }
866
867 fn agent_store(&self, _org_id: i64) -> Arc<dyn AgentStore> {
868 self.agent_store.clone()
869 }
870
871 fn session_store(&self, _org_id: i64) -> Arc<dyn SessionStore> {
872 self.session_store.clone()
873 }
874
875 fn session_mutator(&self, _org_id: i64) -> Arc<dyn SessionMutator> {
876 self.session_store.clone()
877 }
878
879 fn provider_store(&self, _org_id: i64) -> Arc<dyn LlmProviderStore> {
880 self.provider_store.clone()
881 }
882
883 fn message_store(&self) -> Arc<dyn MessageRetriever> {
884 self.message_store.clone()
885 }
886
887 fn event_emitter(&self) -> Arc<dyn EventEmitter> {
888 Arc::new(self.persisting_emitter.clone())
889 }
890
891 fn file_store(&self) -> Arc<dyn SessionFileSystem> {
892 self.file_store.clone()
893 }
894
895 fn storage_store(&self) -> Option<Arc<dyn SessionStorageStore>> {
896 Some(self.storage_store.clone())
897 }
898
899 fn utility_llm_service(&self) -> Option<Arc<dyn everruns_core::UtilityLlmService>> {
900 Some(self.platform_definition.utility_llm_service())
901 }
902
903 fn egress_service(&self) -> Option<Arc<dyn everruns_core::EgressService>> {
904 Some(self.platform_definition.egress_service())
905 }
906
907 fn memory_store(&self, _org_id: i64) -> Option<Arc<dyn MemoryStoreBackend>> {
908 Some(self.memory_store.clone())
909 }
910}
911
912#[derive(Clone)]
913struct PersistingEventEmitter {
914 inner: Arc<dyn EventBus>,
915 message_store: Arc<dyn RuntimeMessageStore>,
916}
917
918impl PersistingEventEmitter {
919 fn new(inner: Arc<dyn EventBus>, message_store: Arc<dyn RuntimeMessageStore>) -> Self {
920 Self {
921 inner,
922 message_store,
923 }
924 }
925}
926
927#[async_trait]
928impl EventEmitter for PersistingEventEmitter {
929 async fn emit(&self, request: EventRequest) -> Result<Event> {
930 let event = self.inner.emit(request.clone()).await?;
931 if let Some(message) = message_from_event(&event.data) {
932 self.message_store
933 .store_message(request.session_id, message)
934 .await?;
935 }
936 Ok(event)
937 }
938}
939
940fn effective_overlay(
941 harness_chain: &[Harness],
942 agent: Option<&Agent>,
943 session: &Session,
944) -> AgentConfigOverlay {
945 let harness_layers = harness_chain.iter().map(AgentConfigOverlay::from);
946 let agent_layers = agent.into_iter().map(AgentConfigOverlay::from);
947 AgentConfigOverlay::fold(
948 harness_layers
949 .chain(agent_layers)
950 .chain([AgentConfigOverlay::from(session)]),
951 )
952}
953
954async fn seed_runtime_initial_files(
955 harness_store: &dyn RuntimeHarnessStore,
956 agent_store: &dyn RuntimeAgentStore,
957 file_store: &dyn SessionFileSystem,
958 session: &Session,
959) -> Result<()> {
960 let harness_chain = harness_store.get_harness_chain(session.harness_id).await?;
961 if harness_chain.is_empty() {
962 return Err(AgentLoopError::store(format!(
963 "harness not found while seeding files: {}",
964 session.harness_id
965 )));
966 }
967 let agent = match session.agent_id {
968 Some(agent_id) => Some(
969 agent_store
970 .get_agent(agent_id)
971 .await?
972 .ok_or_else(|| AgentLoopError::store(format!("agent not found: {agent_id}")))?,
973 ),
974 None => None,
975 };
976 let overlay = effective_overlay(&harness_chain, agent.as_ref(), session);
977 for file in &overlay.initial_files {
978 file_store.seed_initial_file(session.id, file).await?;
979 }
980 Ok(())
981}
982
983fn message_from_event(data: &EventData) -> Option<Message> {
984 match data {
985 EventData::InputMessage(data) => Some(data.message.clone()),
986 EventData::OutputMessageCompleted(OutputMessageCompletedData { message, .. }) => {
987 Some(message.clone())
988 }
989 EventData::ToolCompleted(data) => Some(tool_completed_to_message(data.clone())),
990 _ => None,
991 }
992}
993
994fn tool_completed_to_message(data: ToolCompletedData) -> Message {
995 let mut images: Vec<ToolResultImage> = Vec::new();
996 let metadata = tool_result_metadata(&data);
997 let result = data.result.map(|parts| {
998 for part in &parts {
999 if let ContentPart::Image(img) = part
1000 && let (Some(base64), Some(media_type)) = (&img.base64, &img.media_type)
1001 {
1002 images.push(ToolResultImage {
1003 base64: base64.clone(),
1004 media_type: media_type.clone(),
1005 });
1006 }
1007 }
1008
1009 let text_parts: Vec<&ContentPart> = parts
1010 .iter()
1011 .filter(|part| matches!(part, ContentPart::Text(_)))
1012 .collect();
1013 if text_parts.len() == 1
1014 && let ContentPart::Text(text) = text_parts[0]
1015 {
1016 return parse_structured_tool_result_text(&text.text);
1017 }
1018 if !text_parts.is_empty() {
1019 serde_json::to_value(&text_parts).unwrap_or_default()
1020 } else {
1021 serde_json::Value::Null
1022 }
1023 });
1024
1025 let mut message = if images.is_empty() {
1026 Message::tool_result(&data.tool_call_id, result, data.error)
1027 } else {
1028 Message::tool_result_with_images(&data.tool_call_id, result, images)
1029 };
1030 message.metadata = metadata;
1031 message
1032}
1033
1034fn tool_result_metadata(
1035 data: &ToolCompletedData,
1036) -> Option<std::collections::HashMap<String, serde_json::Value>> {
1037 let mut metadata = std::collections::HashMap::new();
1038 metadata.insert("tool_name".to_string(), serde_json::json!(data.tool_name));
1039 if let Some(fingerprint) = &data.tool_call_fingerprint {
1040 metadata.insert(
1041 "tool_call_fingerprint".to_string(),
1042 serde_json::json!(fingerprint),
1043 );
1044 }
1045 if let Some(fingerprint) = &data.tool_result_fingerprint {
1046 metadata.insert(
1047 "tool_result_fingerprint".to_string(),
1048 serde_json::json!(fingerprint),
1049 );
1050 }
1051 (!metadata.is_empty()).then_some(metadata)
1052}
1053
1054fn parse_structured_tool_result_text(text: &str) -> serde_json::Value {
1055 let trimmed = text.trim_start();
1056 if !trimmed.starts_with('{') && !trimmed.starts_with('[') {
1057 return serde_json::Value::String(text.to_string());
1058 }
1059
1060 match serde_json::from_str(text) {
1061 Ok(value @ (serde_json::Value::Object(_) | serde_json::Value::Array(_))) => value,
1062 _ => serde_json::Value::String(text.to_string()),
1063 }
1064}
1065
1066#[cfg(test)]
1067mod tool_completed_replay_tests {
1068 use super::*;
1069
1070 #[test]
1071 fn tool_completed_replay_preserves_json_object_shape() {
1072 let data = ToolCompletedData::success(
1073 "call_read".to_string(),
1074 "read_file".to_string(),
1075 vec![ContentPart::text(
1076 serde_json::json!({
1077 "path": "/workspace/src/lib.rs",
1078 "content": "1|fn main() {}"
1079 })
1080 .to_string(),
1081 )],
1082 Some(1),
1083 );
1084
1085 let message = tool_completed_to_message(data);
1086 let result = message
1087 .tool_result_content()
1088 .and_then(|content| content.result.as_ref())
1089 .expect("tool result should be present");
1090
1091 assert_eq!(result["path"], "/workspace/src/lib.rs");
1092 assert_eq!(result["content"], "1|fn main() {}");
1093 }
1094
1095 #[test]
1096 fn tool_completed_replay_keeps_scalar_json_as_text() {
1097 let data = ToolCompletedData::success(
1098 "call_scalar".to_string(),
1099 "custom_tool".to_string(),
1100 vec![ContentPart::text("123")],
1101 Some(1),
1102 );
1103
1104 let message = tool_completed_to_message(data);
1105 let result = message
1106 .tool_result_content()
1107 .and_then(|content| content.result.as_ref())
1108 .expect("tool result should be present");
1109
1110 assert_eq!(result, &serde_json::Value::String("123".to_string()));
1111 }
1112
1113 #[test]
1114 fn tool_completed_replay_preserves_fingerprints_as_metadata() {
1115 let data = ToolCompletedData::success(
1116 "call_read".to_string(),
1117 "read_file".to_string(),
1118 vec![ContentPart::text("{}")],
1119 Some(1),
1120 )
1121 .with_fingerprints("sha256:call".to_string(), "sha256:result".to_string());
1122
1123 let message = tool_completed_to_message(data);
1124 let metadata = message.metadata.expect("metadata should be present");
1125
1126 assert_eq!(metadata["tool_name"], "read_file");
1127 assert_eq!(metadata["tool_call_fingerprint"], "sha256:call");
1128 assert_eq!(metadata["tool_result_fingerprint"], "sha256:result");
1129 }
1130}
1131
1132#[cfg(test)]
1133mod org_id_mapping_tests {
1134 use super::*;
1135 use everruns_core::{DEFAULT_ORG_ID, DEFAULT_ORG_PUBLIC_ID, org_public_id_from_internal};
1136
1137 #[test]
1138 fn default_public_id_maps_to_default_org() {
1139 assert_eq!(
1140 in_process_internal_org_id(DEFAULT_ORG_PUBLIC_ID),
1141 DEFAULT_ORG_ID
1142 );
1143 }
1144
1145 #[test]
1146 fn invalid_public_id_does_not_fall_back_to_default() {
1147 for invalid in [
1148 "",
1149 "not-an-org",
1150 "org_short",
1151 "org_ZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZ",
1152 "ORG_00000000000000000000000000000001",
1153 ] {
1154 let mapped = in_process_internal_org_id(invalid);
1155 assert_ne!(mapped, everruns_core::DEFAULT_ORG_ID);
1156 assert!(
1157 mapped >= 2,
1158 "invalid input {invalid:?} should not map to default"
1159 );
1160 }
1161 }
1162
1163 #[test]
1164 fn zero_public_id_does_not_fall_back_to_default() {
1165 let mapped = in_process_internal_org_id("org_00000000000000000000000000000000");
1168 assert_ne!(mapped, everruns_core::DEFAULT_ORG_ID);
1169 assert!(mapped >= 2, "all-zero id should not map to default");
1170 }
1171
1172 #[test]
1173 fn synthetic_public_id_round_trips_with_internal_helper() {
1174 for internal in [1_i64, 2, 42, 1_000_000, i64::MAX - 1, i64::MAX] {
1175 let public = org_public_id_from_internal(internal);
1176 assert_eq!(
1177 in_process_internal_org_id(&public),
1178 internal,
1179 "round-trip failed for internal={internal}"
1180 );
1181 }
1182 }
1183
1184 #[test]
1185 fn distinct_synthetic_ids_map_to_distinct_internal_ids() {
1186 let a = org_public_id_from_internal(7);
1187 let b = org_public_id_from_internal(8);
1188 assert_ne!(a, b);
1189 assert_ne!(
1190 in_process_internal_org_id(&a),
1191 in_process_internal_org_id(&b)
1192 );
1193 }
1194
1195 #[test]
1196 fn high_entropy_uuid_style_id_hashes_into_reserved_range() {
1197 let high = "org_80000000000000000000000000000000";
1201 let mapped = in_process_internal_org_id(high);
1202 assert!(mapped >= 2, "mapped id {mapped} must be >= 2");
1203 assert_ne!(mapped, DEFAULT_ORG_ID);
1204
1205 assert_eq!(mapped, in_process_internal_org_id(high));
1207 }
1208
1209 #[test]
1210 fn high_entropy_ids_are_isolated_from_each_other() {
1211 let a = in_process_internal_org_id("org_80000000000000000000000000000001");
1212 let b = in_process_internal_org_id("org_80000000000000000000000000000002");
1213 assert_ne!(a, b);
1214 assert_ne!(a, DEFAULT_ORG_ID);
1215 assert_ne!(b, DEFAULT_ORG_ID);
1216 }
1217
1218 #[test]
1219 fn hash_uses_stable_sha256_truncation() {
1220 let mapped = in_process_internal_org_id("org_80000000000000000000000000000000");
1224 let expected = {
1225 let digest = sha2::Sha256::digest(b"org_80000000000000000000000000000000");
1226 let mut buf = [0u8; 8];
1227 buf.copy_from_slice(&digest[..8]);
1228 let raw = u64::from_be_bytes(buf);
1229 ((raw % ((i64::MAX - 1) as u64)) as i64) + 2
1230 };
1231 assert_eq!(mapped, expected);
1232 }
1233
1234 #[test]
1235 fn oversize_input_is_bounded_and_does_not_collide_silently() {
1236 let oversize = "x".repeat(super::HASH_INPUT_CAP_BYTES * 4);
1242 let mapped = in_process_internal_org_id(&oversize);
1243 assert!(mapped >= 2);
1244 assert_ne!(mapped, DEFAULT_ORG_ID);
1245 }
1246}