1use crate::backends::{
6 EventBus, RuntimeAgentStore, RuntimeBackends, RuntimeHarnessStore, RuntimeMessageStore,
7 RuntimeProviderStore, RuntimeSessionStore,
8};
9use crate::builders::SingleSessionBuilder;
10use crate::host::{
11 RuntimeHostAdapter, RuntimeHostTurnContext, RuntimeSessionLifecycle, execute_act_activity,
12 execute_input_activity, execute_reason_activity,
13};
14use crate::in_memory::{InMemorySessionFileStore, InMemorySessionFileSystemFactory};
15use async_trait::async_trait;
16use everruns_core::agent::Agent;
17use everruns_core::atoms::{ActInput, AtomContext, InputAtomInput, ReasonInput};
18use everruns_core::capabilities::{Capability, CapabilityRegistry};
19use everruns_core::config_layer::AgentConfigOverlay;
20use everruns_core::error::{AgentLoopError, Result};
21use everruns_core::events::{
22 Event, EventContext, EventData, EventRequest, InputMessageData, OutputMessageCompletedData,
23 ToolCompletedData,
24};
25use everruns_core::harness::Harness;
26use everruns_core::llm_driver_registry::{DriverRegistry, ProviderType};
27use everruns_core::llm_models::LlmProviderType;
28use everruns_core::llmsim_driver::{LlmSimConfig, LlmSimDriver};
29use everruns_core::message::{ContentPart, Message};
30use everruns_core::platform_definition::PlatformDefinition;
31use everruns_core::runtime_context::{AssembledTurnContext, inspect_turn_context};
32use everruns_core::session::{Session, SessionStatus};
33use everruns_core::session_file::{InitialFile, SessionFile};
34use everruns_core::tools::ToolResultImage;
35use everruns_core::traits::{
36 AgentStore, EventEmitter, HarnessStore, LlmProviderStore, ModelWithProvider, SessionMutator,
37 SessionStorageStore, SessionStore,
38};
39use everruns_core::turn::{TurnAction, TurnContext, TurnOutcome, TurnStateMachine};
40use everruns_core::typed_id::{AgentId, HarnessId, OrgId, SessionId};
41use everruns_core::{
42 InputMessage, MemoryStoreBackend, MessageRetriever, SessionFileSystem,
43 SessionFileSystemFactoryContext,
44};
45use sha2::{Digest, Sha256};
46use std::sync::Arc;
47
48const HASH_INPUT_CAP_BYTES: usize = 128;
54
55fn in_process_internal_org_id(public_org_id: &str) -> i64 {
65 if public_org_id == everruns_core::DEFAULT_ORG_PUBLIC_ID {
66 return everruns_core::DEFAULT_ORG_ID;
67 }
68
69 let Ok(parsed) = public_org_id.parse::<OrgId>() else {
70 return hash_public_org_id(public_org_id);
71 };
72 let raw: u128 = parsed.uuid().as_u128();
73 if raw == 0 {
74 return hash_public_org_id(public_org_id);
75 }
76
77 if raw <= i64::MAX as u128 {
80 return raw as i64;
81 }
82
83 hash_public_org_id(public_org_id)
84}
85
86fn hash_public_org_id(public_org_id: &str) -> i64 {
91 let bytes = public_org_id.as_bytes();
92 let bounded = &bytes[..bytes.len().min(HASH_INPUT_CAP_BYTES)];
93 let digest = Sha256::digest(bounded);
94 let mut buf = [0u8; 8];
95 buf.copy_from_slice(&digest[..8]);
96 let raw = u64::from_be_bytes(buf);
97 ((raw % ((i64::MAX - 1) as u64)) as i64) + 2
98}
99
100#[derive(Debug, Clone)]
101pub struct TurnResult {
102 pub response: String,
104 pub iterations: usize,
106 pub tool_calls_count: usize,
108 pub success: bool,
110 pub error: Option<String>,
112 pub turn_id: everruns_core::typed_id::TurnId,
114}
115
116impl TurnResult {
117 fn from_outcome(outcome: TurnOutcome, turn_id: everruns_core::typed_id::TurnId) -> Self {
118 match outcome {
119 TurnOutcome::Success {
120 response,
121 iterations,
122 tool_calls_count,
123 } => Self {
124 response,
125 iterations,
126 tool_calls_count,
127 success: true,
128 error: None,
129 turn_id,
130 },
131 TurnOutcome::Failed { error, iterations } => Self {
132 response: String::new(),
133 iterations,
134 tool_calls_count: 0,
135 success: false,
136 error: Some(error),
137 turn_id,
138 },
139 TurnOutcome::MaxIterationsReached {
140 response,
141 iterations,
142 tool_calls_count,
143 } => Self {
144 response,
145 iterations,
146 tool_calls_count,
147 success: true,
148 error: None,
149 turn_id,
150 },
151 }
152 }
153}
154
155pub struct InProcessRuntimeBuilder {
165 platform_definition: PlatformDefinition,
166 llm_sim_config: Option<LlmSimConfig>,
167 default_model: Option<ModelWithProvider>,
168 backends: Option<RuntimeBackends>,
169 session_file_system_factory_context: SessionFileSystemFactoryContext,
170 harnesses: Vec<Harness>,
171 agents: Vec<Agent>,
172 sessions: Vec<Session>,
173 default_session_id: Option<SessionId>,
174 seeded_files: Vec<(SessionId, InitialFile)>,
175}
176
177impl Default for InProcessRuntimeBuilder {
178 fn default() -> Self {
179 Self::new()
180 }
181}
182
183impl InProcessRuntimeBuilder {
184 pub fn new() -> Self {
191 Self {
192 platform_definition: PlatformDefinition::builder()
193 .capability_registry(CapabilityRegistry::with_builtins())
194 .driver_registry(DriverRegistry::new())
195 .session_file_system_factory(Arc::new(InMemorySessionFileSystemFactory))
196 .build(),
197 llm_sim_config: None,
198 default_model: None,
199 backends: None,
200 session_file_system_factory_context: SessionFileSystemFactoryContext::new(),
201 harnesses: Vec::new(),
202 agents: Vec::new(),
203 sessions: Vec::new(),
204 default_session_id: None,
205 seeded_files: Vec::new(),
206 }
207 }
208
209 pub fn platform_definition(mut self, platform_definition: PlatformDefinition) -> Self {
211 self.platform_definition = platform_definition;
212 self
213 }
214
215 pub fn capability<C: Capability + 'static>(mut self, capability: C) -> Self {
217 self.platform_definition
218 .capability_registry_mut()
219 .register(capability);
220 self
221 }
222
223 pub fn driver_registry(mut self, driver_registry: DriverRegistry) -> Self {
225 *self.platform_definition.driver_registry_mut() = driver_registry;
226 self
227 }
228
229 pub fn llm_sim(mut self, config: LlmSimConfig) -> Self {
231 self.llm_sim_config = Some(config);
232 self
233 }
234
235 pub fn default_model(mut self, model: ModelWithProvider) -> Self {
237 self.default_model = Some(model);
238 self
239 }
240
241 pub fn backends(mut self, backends: RuntimeBackends) -> Self {
243 self.backends = Some(backends);
244 self
245 }
246
247 pub fn session_file_system_factory_context(
249 mut self,
250 context: SessionFileSystemFactoryContext,
251 ) -> Self {
252 self.session_file_system_factory_context = context;
253 self
254 }
255
256 pub fn harness(mut self, harness: Harness) -> Self {
258 self.harnesses.push(harness);
259 self
260 }
261
262 pub fn agent(mut self, agent: Agent) -> Self {
264 self.agents.push(agent);
265 self
266 }
267
268 pub fn session(mut self, session: Session) -> Self {
270 self.sessions.push(session);
271 self
272 }
273
274 pub fn single_session<F>(mut self, configure: F) -> Self
279 where
280 F: FnOnce(SingleSessionBuilder) -> SingleSessionBuilder,
281 {
282 let (harness, agent, session, session_id) =
283 configure(SingleSessionBuilder::default()).build();
284 self.harnesses.push(harness);
285 self.agents.push(agent);
286 self.sessions.push(session);
287 self.default_session_id = Some(session_id);
288 self
289 }
290
291 pub fn seed_text_file(
295 mut self,
296 session_id: SessionId,
297 path: impl Into<String>,
298 content: impl Into<String>,
299 ) -> Self {
300 self.seeded_files.push((
301 session_id,
302 InitialFile {
303 path: path.into(),
304 content: content.into(),
305 encoding: "text".to_string(),
306 is_readonly: false,
307 },
308 ));
309 self
310 }
311
312 pub async fn build(mut self) -> Result<InProcessRuntime> {
317 let backends = match self.backends.take() {
318 Some(backends) => backends,
319 None => RuntimeBackends::in_memory(),
320 };
321 let file_store = resolve_session_file_system(
322 &self.platform_definition,
323 self.session_file_system_factory_context.clone(),
324 )
325 .await?;
326
327 if let Some(config) = self.llm_sim_config.take() {
328 let driver = LlmSimDriver::new(config);
329 self.platform_definition
330 .driver_registry_mut()
331 .register(ProviderType::LlmSim, move |_api_key, _base_url| {
332 Box::new(driver.clone())
333 });
334
335 if self.default_model.is_none() {
336 self.default_model = Some(ModelWithProvider {
337 model: "llmsim-model".to_string(),
338 provider_type: LlmProviderType::LlmSim,
339 api_key: Some("fake-key".to_string()),
340 base_url: None,
341 });
342 }
343 }
344
345 let default_model = self.default_model.ok_or_else(|| {
346 AgentLoopError::config(
347 "in-process runtime requires a default model; call \
348 InProcessRuntimeBuilder::default_model(...) or \
349 InProcessRuntimeBuilder::llm_sim(...)",
350 )
351 })?;
352
353 backends
354 .provider_store
355 .set_default_model(default_model)
356 .await?;
357
358 for harness in &self.harnesses {
359 backends.harness_store.add_harness(harness.clone()).await?;
360 }
361 for agent in &self.agents {
362 backends.agent_store.add_agent(agent.clone()).await?;
363 }
364 for session in &self.sessions {
365 backends.session_store.add_session(session.clone()).await?;
366 }
367
368 for session in &self.sessions {
369 seed_runtime_initial_files(
370 backends.harness_store.as_ref(),
371 backends.agent_store.as_ref(),
372 file_store.as_ref(),
373 session,
374 )
375 .await?;
376 }
377
378 for (session_id, file) in &self.seeded_files {
379 file_store.seed_initial_file(*session_id, file).await?;
380 }
381
382 let persisting_emitter =
383 PersistingEventEmitter::new(backends.event_bus.clone(), backends.message_store.clone());
384
385 Ok(InProcessRuntime {
386 platform_definition: Arc::new(self.platform_definition),
387 harness_store: backends.harness_store,
388 agent_store: backends.agent_store,
389 session_store: backends.session_store,
390 default_session_id: self.default_session_id,
391 message_store: backends.message_store,
392 provider_store: backends.provider_store,
393 event_bus: backends.event_bus,
394 persisting_emitter,
395 file_store,
396 storage_store: backends.storage_store,
397 memory_store: backends.memory_store,
398 })
399 }
400}
401
402async fn resolve_session_file_system(
403 platform_definition: &PlatformDefinition,
404 file_system_factory_context: SessionFileSystemFactoryContext,
405) -> Result<Arc<dyn SessionFileSystem>> {
406 let file_system_factory = platform_definition.session_file_system_factory();
407 if file_system_factory.is_disabled() {
408 Ok(Arc::new(InMemorySessionFileStore::new()))
409 } else {
410 Ok(file_system_factory
411 .create_session_file_system(file_system_factory_context)
412 .await?)
413 }
414}
415
416#[derive(Clone)]
417pub struct InProcessRuntime {
423 platform_definition: Arc<PlatformDefinition>,
424 harness_store: Arc<dyn RuntimeHarnessStore>,
425 agent_store: Arc<dyn RuntimeAgentStore>,
426 session_store: Arc<dyn RuntimeSessionStore>,
427 default_session_id: Option<SessionId>,
428 message_store: Arc<dyn RuntimeMessageStore>,
429 provider_store: Arc<dyn RuntimeProviderStore>,
430 event_bus: Arc<dyn EventBus>,
431 persisting_emitter: PersistingEventEmitter,
432 file_store: Arc<dyn SessionFileSystem>,
433 storage_store: Arc<dyn SessionStorageStore>,
434 memory_store: Arc<dyn MemoryStoreBackend>,
435}
436
437impl InProcessRuntime {
438 pub fn builder() -> InProcessRuntimeBuilder {
440 InProcessRuntimeBuilder::new()
441 }
442
443 pub fn default_session_id(&self) -> Option<SessionId> {
446 self.default_session_id
447 }
448
449 pub async fn run_turn(
455 &self,
456 session_id: SessionId,
457 input: impl Into<InputMessage>,
458 ) -> Result<TurnResult> {
459 let session = self
460 .session_store
461 .get_session(session_id)
462 .await?
463 .ok_or_else(|| AgentLoopError::store(format!("session not found: {session_id}")))?;
464
465 let input_message = self
470 .message_store
471 .add_input_message(session_id, input.into())
472 .await?;
473 self.event_bus
474 .emit(EventRequest::new(
475 session_id,
476 EventContext::empty(),
477 InputMessageData::new(input_message.clone()),
478 ))
479 .await?;
480
481 let assembled = self
482 .inspect_context_with_ids(session_id, session.harness_id, session.agent_id)
483 .await?;
484 let synthetic_agent_id = session
485 .agent_id
486 .unwrap_or_else(|| AgentId::from_uuid(session.id.uuid()));
487 let org_id = in_process_internal_org_id(&session.organization_id);
488 let mut state_machine = TurnStateMachine::new(
489 TurnContext::new(session_id, input_message.id, synthetic_agent_id, org_id),
490 assembled.runtime_agent.max_iterations,
491 );
492
493 let mut previous_response_id: Option<String> = None;
494 let mut last_reason_result: Option<everruns_core::ReasonResult> = None;
495
496 loop {
497 match state_machine.next_action() {
498 TurnAction::ExecuteInput => {
499 let ctx = state_machine.context();
500 let base_context =
501 AtomContext::new(ctx.session_id, ctx.turn_id, ctx.input_message_id);
502 execute_input_activity(
503 self,
504 org_id,
505 InputAtomInput {
506 context: base_context,
507 },
508 )
509 .await?;
510 state_machine.on_input_completed();
511 }
512 TurnAction::ExecuteReason => {
513 let ctx = state_machine.context();
514 let base_context =
515 AtomContext::new(ctx.session_id, ctx.turn_id, ctx.input_message_id);
516 let reason_result = execute_reason_activity(
517 self,
518 org_id,
519 ReasonInput {
520 context: base_context.next_exec(),
521 harness_id: session.harness_id,
522 agent_id: session.agent_id,
523 org_id,
524 mcp_tool_definitions: vec![],
525 previous_response_id: previous_response_id.take(),
526 iteration: state_machine.current_iteration() as u32 + 1,
527 },
528 )
529 .await?;
530 previous_response_id = reason_result.response_id.clone();
531 state_machine.on_reason_completed(
532 reason_result.text.clone(),
533 reason_result.has_tool_calls,
534 reason_result.tool_calls.len(),
535 reason_result.success,
536 reason_result.error.clone(),
537 false,
538 );
539 if reason_result.has_tool_calls {
540 last_reason_result = Some(reason_result);
541 }
542 }
543 TurnAction::ExecuteAct => {
544 let reason_result = last_reason_result
545 .take()
546 .expect("ExecuteAct requires a prior ReasonResult");
547 let ctx = state_machine.context();
548 let base_context =
549 AtomContext::new(ctx.session_id, ctx.turn_id, ctx.input_message_id);
550 execute_act_activity(
551 self,
552 ActInput {
553 org_id: Some(org_id),
554 context: base_context.next_exec(),
555 harness_id: session.harness_id,
556 agent_id: session.agent_id,
557 tool_calls: reason_result.tool_calls,
558 tool_definitions: reason_result.tool_definitions,
559 locale: reason_result.locale,
560 blueprint_id: None,
561 network_access: reason_result.network_access,
562 },
563 )
564 .await?;
565 state_machine.on_act_completed();
566 }
567 TurnAction::Complete(outcome) => {
568 let ctx = state_machine.context();
569 let lifecycle =
570 RuntimeSessionLifecycle::new(self.clone(), org_id, ctx.session_id);
571 match &outcome {
572 TurnOutcome::Success { iterations, .. }
573 | TurnOutcome::MaxIterationsReached { iterations, .. } => {
574 lifecycle
575 .turn_completed(
576 ctx.turn_id,
577 ctx.input_message_id,
578 *iterations as u32,
579 None,
580 None,
581 )
582 .await;
583 }
584 TurnOutcome::Failed { error, .. } => {
585 lifecycle
586 .turn_failed(ctx.turn_id, ctx.input_message_id, error, None)
587 .await;
588 }
589 }
590 return Ok(TurnResult::from_outcome(outcome, ctx.turn_id));
591 }
592 }
593 }
594 }
595
596 pub async fn run_text_turn(
597 &self,
598 session_id: SessionId,
599 text: impl Into<String>,
600 ) -> Result<TurnResult> {
601 self.run_turn(session_id, InputMessage::user(text)).await
602 }
603
604 pub async fn messages(&self, session_id: SessionId) -> Result<Vec<Message>> {
606 self.message_store.load(session_id).await
607 }
608
609 pub async fn read_file(
611 &self,
612 session_id: SessionId,
613 path: &str,
614 ) -> Result<Option<SessionFile>> {
615 self.file_store.read_file(session_id, path).await
616 }
617
618 pub async fn load_context(&self, session_id: SessionId) -> Result<AssembledTurnContext> {
620 let session = self
621 .session_store
622 .get_session(session_id)
623 .await?
624 .ok_or_else(|| AgentLoopError::store(format!("session not found: {session_id}")))?;
625 self.inspect_context_with_ids(session_id, session.harness_id, session.agent_id)
626 .await
627 }
628
629 pub async fn events(&self) -> Result<Vec<Event>> {
634 Ok(self.event_bus.collected_events().await)
635 }
636
637 pub async fn execute_command(
647 &self,
648 session_id: SessionId,
649 request: everruns_core::command::ExecuteCommandRequest,
650 ) -> Result<everruns_core::command::CommandResult> {
651 let ctx = self.load_context(session_id).await?;
652 let registry = self.platform_definition.capability_registry();
653 let exec_ctx = everruns_core::command::CommandExecutionContext { session_id };
654 for config in &ctx.resolved_capability_configs {
655 let Some(capability) = registry.get(config.capability_id()) else {
656 continue;
657 };
658 if capability.commands().iter().any(|c| c.name == request.name) {
659 return capability.execute_command(&request, &exec_ctx).await;
660 }
661 }
662 Err(AgentLoopError::config(format!(
663 "no capability declares command /{}",
664 request.name
665 )))
666 }
667
668 pub async fn list_commands(
678 &self,
679 session_id: SessionId,
680 ) -> Result<Vec<everruns_core::command::CommandDescriptor>> {
681 let ctx = self.load_context(session_id).await?;
682 let registry = self.platform_definition.capability_registry();
683 let mut seen = std::collections::HashSet::new();
684 let mut commands = Vec::new();
685 for config in &ctx.resolved_capability_configs {
686 let Some(capability) = registry.get(config.capability_id()) else {
687 continue;
688 };
689 for command in capability.commands() {
690 if seen.insert(command.name.clone()) {
691 commands.push(command);
692 }
693 }
694 }
695 Ok(commands)
696 }
697
698 async fn inspect_context_with_ids(
699 &self,
700 session_id: SessionId,
701 harness_id: everruns_core::HarnessId,
702 agent_id: Option<AgentId>,
703 ) -> Result<AssembledTurnContext> {
704 inspect_turn_context(
705 self.harness_store.as_ref(),
706 self.agent_store.as_ref(),
707 self.session_store.as_ref(),
708 self.message_store.as_ref(),
709 self.provider_store.as_ref(),
710 self.platform_definition.capability_registry(),
711 session_id,
712 harness_id,
713 agent_id,
714 &[],
715 Some(self.file_store.clone()),
716 )
717 .await
718 }
719}
720
721#[async_trait]
722impl RuntimeHostAdapter for InProcessRuntime {
723 async fn get_agent(&self, _org_id: i64, agent_id: AgentId) -> Result<Option<Agent>> {
724 self.agent_store.get_agent(agent_id).await
725 }
726
727 async fn get_harness(&self, _org_id: i64, harness_id: HarnessId) -> Result<Option<Harness>> {
728 let chain = self.harness_store.get_harness_chain(harness_id).await?;
729 Ok(chain.into_iter().last())
730 }
731
732 async fn set_session_status(
733 &self,
734 _org_id: i64,
735 session_id: SessionId,
736 _status: SessionStatus,
737 ) -> Result<Session> {
738 self.session_store
742 .get_session(session_id)
743 .await?
744 .ok_or_else(|| AgentLoopError::store(format!("session not found: {session_id}")))
745 }
746
747 async fn load_turn_context(
748 &self,
749 _org_id: i64,
750 session_id: SessionId,
751 ) -> Result<RuntimeHostTurnContext> {
752 let session = self
753 .session_store
754 .get_session(session_id)
755 .await?
756 .ok_or_else(|| AgentLoopError::store(format!("session not found: {session_id}")))?;
757 let agent = match session.agent_id {
758 Some(agent_id) => self.agent_store.get_agent(agent_id).await?,
759 None => None,
760 };
761 let messages = self.message_store.load(session_id).await?;
762 let model = self.provider_store.get_default_model().await?;
763 Ok(RuntimeHostTurnContext {
764 agent,
765 session,
766 messages,
767 model,
768 mcp_tool_definitions: vec![],
769 })
770 }
771
772 fn capability_registry(&self) -> CapabilityRegistry {
773 self.platform_definition.capability_registry().clone()
774 }
775
776 fn driver_registry(&self) -> DriverRegistry {
777 self.platform_definition.driver_registry().clone()
778 }
779
780 fn harness_store(&self, _org_id: i64) -> Arc<dyn HarnessStore> {
781 self.harness_store.clone()
782 }
783
784 fn agent_store(&self, _org_id: i64) -> Arc<dyn AgentStore> {
785 self.agent_store.clone()
786 }
787
788 fn session_store(&self, _org_id: i64) -> Arc<dyn SessionStore> {
789 self.session_store.clone()
790 }
791
792 fn session_mutator(&self, _org_id: i64) -> Arc<dyn SessionMutator> {
793 self.session_store.clone()
794 }
795
796 fn provider_store(&self, _org_id: i64) -> Arc<dyn LlmProviderStore> {
797 self.provider_store.clone()
798 }
799
800 fn message_store(&self) -> Arc<dyn MessageRetriever> {
801 self.message_store.clone()
802 }
803
804 fn event_emitter(&self) -> Arc<dyn EventEmitter> {
805 Arc::new(self.persisting_emitter.clone())
806 }
807
808 fn file_store(&self) -> Arc<dyn SessionFileSystem> {
809 self.file_store.clone()
810 }
811
812 fn storage_store(&self) -> Option<Arc<dyn SessionStorageStore>> {
813 Some(self.storage_store.clone())
814 }
815
816 fn utility_llm_service(&self) -> Option<Arc<dyn everruns_core::UtilityLlmService>> {
817 Some(self.platform_definition.utility_llm_service())
818 }
819
820 fn egress_service(&self) -> Option<Arc<dyn everruns_core::EgressService>> {
821 Some(self.platform_definition.egress_service())
822 }
823
824 fn memory_store(&self, _org_id: i64) -> Option<Arc<dyn MemoryStoreBackend>> {
825 Some(self.memory_store.clone())
826 }
827}
828
829#[derive(Clone)]
830struct PersistingEventEmitter {
831 inner: Arc<dyn EventBus>,
832 message_store: Arc<dyn RuntimeMessageStore>,
833}
834
835impl PersistingEventEmitter {
836 fn new(inner: Arc<dyn EventBus>, message_store: Arc<dyn RuntimeMessageStore>) -> Self {
837 Self {
838 inner,
839 message_store,
840 }
841 }
842}
843
844#[async_trait]
845impl EventEmitter for PersistingEventEmitter {
846 async fn emit(&self, request: EventRequest) -> Result<Event> {
847 let event = self.inner.emit(request.clone()).await?;
848 if let Some(message) = message_from_event(&event.data) {
849 self.message_store
850 .store_message(request.session_id, message)
851 .await?;
852 }
853 Ok(event)
854 }
855}
856
857fn effective_overlay(
858 harness_chain: &[Harness],
859 agent: Option<&Agent>,
860 session: &Session,
861) -> AgentConfigOverlay {
862 let harness_layers = harness_chain.iter().map(AgentConfigOverlay::from);
863 let agent_layers = agent.into_iter().map(AgentConfigOverlay::from);
864 AgentConfigOverlay::fold(
865 harness_layers
866 .chain(agent_layers)
867 .chain([AgentConfigOverlay::from(session)]),
868 )
869}
870
871async fn seed_runtime_initial_files(
872 harness_store: &dyn RuntimeHarnessStore,
873 agent_store: &dyn RuntimeAgentStore,
874 file_store: &dyn SessionFileSystem,
875 session: &Session,
876) -> Result<()> {
877 let harness_chain = harness_store.get_harness_chain(session.harness_id).await?;
878 if harness_chain.is_empty() {
879 return Err(AgentLoopError::store(format!(
880 "harness not found while seeding files: {}",
881 session.harness_id
882 )));
883 }
884 let agent = match session.agent_id {
885 Some(agent_id) => Some(
886 agent_store
887 .get_agent(agent_id)
888 .await?
889 .ok_or_else(|| AgentLoopError::store(format!("agent not found: {agent_id}")))?,
890 ),
891 None => None,
892 };
893 let overlay = effective_overlay(&harness_chain, agent.as_ref(), session);
894 for file in &overlay.initial_files {
895 file_store.seed_initial_file(session.id, file).await?;
896 }
897 Ok(())
898}
899
900fn message_from_event(data: &EventData) -> Option<Message> {
901 match data {
902 EventData::InputMessage(data) => Some(data.message.clone()),
903 EventData::OutputMessageCompleted(OutputMessageCompletedData { message, .. }) => {
904 Some(message.clone())
905 }
906 EventData::ToolCompleted(data) => Some(tool_completed_to_message(data.clone())),
907 _ => None,
908 }
909}
910
911fn tool_completed_to_message(data: ToolCompletedData) -> Message {
912 let mut images: Vec<ToolResultImage> = Vec::new();
913 let metadata = tool_result_metadata(&data);
914 let result = data.result.map(|parts| {
915 for part in &parts {
916 if let ContentPart::Image(img) = part
917 && let (Some(base64), Some(media_type)) = (&img.base64, &img.media_type)
918 {
919 images.push(ToolResultImage {
920 base64: base64.clone(),
921 media_type: media_type.clone(),
922 });
923 }
924 }
925
926 let text_parts: Vec<&ContentPart> = parts
927 .iter()
928 .filter(|part| matches!(part, ContentPart::Text(_)))
929 .collect();
930 if text_parts.len() == 1
931 && let ContentPart::Text(text) = text_parts[0]
932 {
933 return parse_structured_tool_result_text(&text.text);
934 }
935 if !text_parts.is_empty() {
936 serde_json::to_value(&text_parts).unwrap_or_default()
937 } else {
938 serde_json::Value::Null
939 }
940 });
941
942 let mut message = if images.is_empty() {
943 Message::tool_result(&data.tool_call_id, result, data.error)
944 } else {
945 Message::tool_result_with_images(&data.tool_call_id, result, images)
946 };
947 message.metadata = metadata;
948 message
949}
950
951fn tool_result_metadata(
952 data: &ToolCompletedData,
953) -> Option<std::collections::HashMap<String, serde_json::Value>> {
954 let mut metadata = std::collections::HashMap::new();
955 metadata.insert("tool_name".to_string(), serde_json::json!(data.tool_name));
956 if let Some(fingerprint) = &data.tool_call_fingerprint {
957 metadata.insert(
958 "tool_call_fingerprint".to_string(),
959 serde_json::json!(fingerprint),
960 );
961 }
962 if let Some(fingerprint) = &data.tool_result_fingerprint {
963 metadata.insert(
964 "tool_result_fingerprint".to_string(),
965 serde_json::json!(fingerprint),
966 );
967 }
968 (!metadata.is_empty()).then_some(metadata)
969}
970
971fn parse_structured_tool_result_text(text: &str) -> serde_json::Value {
972 let trimmed = text.trim_start();
973 if !trimmed.starts_with('{') && !trimmed.starts_with('[') {
974 return serde_json::Value::String(text.to_string());
975 }
976
977 match serde_json::from_str(text) {
978 Ok(value @ (serde_json::Value::Object(_) | serde_json::Value::Array(_))) => value,
979 _ => serde_json::Value::String(text.to_string()),
980 }
981}
982
983#[cfg(test)]
984mod tool_completed_replay_tests {
985 use super::*;
986
987 #[test]
988 fn tool_completed_replay_preserves_json_object_shape() {
989 let data = ToolCompletedData::success(
990 "call_read".to_string(),
991 "read_file".to_string(),
992 vec![ContentPart::text(
993 serde_json::json!({
994 "path": "/workspace/src/lib.rs",
995 "content": "1|fn main() {}"
996 })
997 .to_string(),
998 )],
999 Some(1),
1000 );
1001
1002 let message = tool_completed_to_message(data);
1003 let result = message
1004 .tool_result_content()
1005 .and_then(|content| content.result.as_ref())
1006 .expect("tool result should be present");
1007
1008 assert_eq!(result["path"], "/workspace/src/lib.rs");
1009 assert_eq!(result["content"], "1|fn main() {}");
1010 }
1011
1012 #[test]
1013 fn tool_completed_replay_keeps_scalar_json_as_text() {
1014 let data = ToolCompletedData::success(
1015 "call_scalar".to_string(),
1016 "custom_tool".to_string(),
1017 vec![ContentPart::text("123")],
1018 Some(1),
1019 );
1020
1021 let message = tool_completed_to_message(data);
1022 let result = message
1023 .tool_result_content()
1024 .and_then(|content| content.result.as_ref())
1025 .expect("tool result should be present");
1026
1027 assert_eq!(result, &serde_json::Value::String("123".to_string()));
1028 }
1029
1030 #[test]
1031 fn tool_completed_replay_preserves_fingerprints_as_metadata() {
1032 let data = ToolCompletedData::success(
1033 "call_read".to_string(),
1034 "read_file".to_string(),
1035 vec![ContentPart::text("{}")],
1036 Some(1),
1037 )
1038 .with_fingerprints("sha256:call".to_string(), "sha256:result".to_string());
1039
1040 let message = tool_completed_to_message(data);
1041 let metadata = message.metadata.expect("metadata should be present");
1042
1043 assert_eq!(metadata["tool_name"], "read_file");
1044 assert_eq!(metadata["tool_call_fingerprint"], "sha256:call");
1045 assert_eq!(metadata["tool_result_fingerprint"], "sha256:result");
1046 }
1047}
1048
1049#[cfg(test)]
1050mod org_id_mapping_tests {
1051 use super::*;
1052 use everruns_core::{DEFAULT_ORG_ID, DEFAULT_ORG_PUBLIC_ID, org_public_id_from_internal};
1053
1054 #[test]
1055 fn default_public_id_maps_to_default_org() {
1056 assert_eq!(
1057 in_process_internal_org_id(DEFAULT_ORG_PUBLIC_ID),
1058 DEFAULT_ORG_ID
1059 );
1060 }
1061
1062 #[test]
1063 fn invalid_public_id_does_not_fall_back_to_default() {
1064 for invalid in [
1065 "",
1066 "not-an-org",
1067 "org_short",
1068 "org_ZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZ",
1069 "ORG_00000000000000000000000000000001",
1070 ] {
1071 let mapped = in_process_internal_org_id(invalid);
1072 assert_ne!(mapped, everruns_core::DEFAULT_ORG_ID);
1073 assert!(
1074 mapped >= 2,
1075 "invalid input {invalid:?} should not map to default"
1076 );
1077 }
1078 }
1079
1080 #[test]
1081 fn zero_public_id_does_not_fall_back_to_default() {
1082 let mapped = in_process_internal_org_id("org_00000000000000000000000000000000");
1085 assert_ne!(mapped, everruns_core::DEFAULT_ORG_ID);
1086 assert!(mapped >= 2, "all-zero id should not map to default");
1087 }
1088
1089 #[test]
1090 fn synthetic_public_id_round_trips_with_internal_helper() {
1091 for internal in [1_i64, 2, 42, 1_000_000, i64::MAX - 1, i64::MAX] {
1092 let public = org_public_id_from_internal(internal);
1093 assert_eq!(
1094 in_process_internal_org_id(&public),
1095 internal,
1096 "round-trip failed for internal={internal}"
1097 );
1098 }
1099 }
1100
1101 #[test]
1102 fn distinct_synthetic_ids_map_to_distinct_internal_ids() {
1103 let a = org_public_id_from_internal(7);
1104 let b = org_public_id_from_internal(8);
1105 assert_ne!(a, b);
1106 assert_ne!(
1107 in_process_internal_org_id(&a),
1108 in_process_internal_org_id(&b)
1109 );
1110 }
1111
1112 #[test]
1113 fn high_entropy_uuid_style_id_hashes_into_reserved_range() {
1114 let high = "org_80000000000000000000000000000000";
1118 let mapped = in_process_internal_org_id(high);
1119 assert!(mapped >= 2, "mapped id {mapped} must be >= 2");
1120 assert_ne!(mapped, DEFAULT_ORG_ID);
1121
1122 assert_eq!(mapped, in_process_internal_org_id(high));
1124 }
1125
1126 #[test]
1127 fn high_entropy_ids_are_isolated_from_each_other() {
1128 let a = in_process_internal_org_id("org_80000000000000000000000000000001");
1129 let b = in_process_internal_org_id("org_80000000000000000000000000000002");
1130 assert_ne!(a, b);
1131 assert_ne!(a, DEFAULT_ORG_ID);
1132 assert_ne!(b, DEFAULT_ORG_ID);
1133 }
1134
1135 #[test]
1136 fn hash_uses_stable_sha256_truncation() {
1137 let mapped = in_process_internal_org_id("org_80000000000000000000000000000000");
1141 let expected = {
1142 let digest = sha2::Sha256::digest(b"org_80000000000000000000000000000000");
1143 let mut buf = [0u8; 8];
1144 buf.copy_from_slice(&digest[..8]);
1145 let raw = u64::from_be_bytes(buf);
1146 ((raw % ((i64::MAX - 1) as u64)) as i64) + 2
1147 };
1148 assert_eq!(mapped, expected);
1149 }
1150
1151 #[test]
1152 fn oversize_input_is_bounded_and_does_not_collide_silently() {
1153 let oversize = "x".repeat(super::HASH_INPUT_CAP_BYTES * 4);
1159 let mapped = in_process_internal_org_id(&oversize);
1160 assert!(mapped >= 2);
1161 assert_ne!(mapped, DEFAULT_ORG_ID);
1162 }
1163}