1use crate::backends::{
6 EventBus, RuntimeAgentStore, RuntimeBackends, RuntimeHarnessStore, RuntimeMessageStore,
7 RuntimeProviderStore, RuntimeSessionStore,
8};
9use crate::builders::SingleSessionBuilder;
10use crate::host::{
11 RuntimeHostAdapter, RuntimeHostTurnContext, RuntimeSessionLifecycle, execute_act_activity,
12 execute_input_activity, execute_reason_activity,
13};
14use crate::in_memory::{InMemorySessionFileStore, InMemorySessionFileSystemFactory};
15use async_trait::async_trait;
16use everruns_core::agent::Agent;
17use everruns_core::atoms::{ActInput, AtomContext, InputAtomInput, ReasonInput};
18use everruns_core::capabilities::{Capability, CapabilityRegistry};
19use everruns_core::config_layer::AgentConfigOverlay;
20use everruns_core::error::{AgentLoopError, Result};
21use everruns_core::events::{
22 Event, EventContext, EventData, EventRequest, InputMessageData, OutputMessageCompletedData,
23 ToolCompletedData,
24};
25use everruns_core::harness::Harness;
26use everruns_core::llm_driver_registry::{DriverRegistry, ProviderType};
27use everruns_core::llm_models::LlmProviderType;
28use everruns_core::llmsim_driver::{LlmSimConfig, LlmSimDriver};
29use everruns_core::message::{ContentPart, Message};
30use everruns_core::platform_definition::PlatformDefinition;
31use everruns_core::runtime_context::{AssembledTurnContext, inspect_turn_context};
32use everruns_core::session::{Session, SessionStatus};
33use everruns_core::session_file::{InitialFile, SessionFile};
34use everruns_core::tools::ToolResultImage;
35use everruns_core::traits::{
36 AgentStore, EventEmitter, HarnessStore, LlmProviderStore, ModelWithProvider, SessionMutator,
37 SessionStorageStore, SessionStore,
38};
39use everruns_core::turn::{TurnAction, TurnContext, TurnOutcome, TurnStateMachine};
40use everruns_core::typed_id::{AgentId, HarnessId, SessionId};
41use everruns_core::{
42 InputMessage, MemoryStoreBackend, MessageRetriever, SessionFileSystem,
43 SessionFileSystemFactoryContext,
44};
45use std::sync::Arc;
46
47const IN_PROCESS_ORG_ID: i64 = everruns_core::DEFAULT_ORG_ID;
53
54#[derive(Debug, Clone)]
55pub struct TurnResult {
56 pub response: String,
58 pub iterations: usize,
60 pub tool_calls_count: usize,
62 pub success: bool,
64 pub error: Option<String>,
66 pub turn_id: everruns_core::typed_id::TurnId,
68}
69
70impl TurnResult {
71 fn from_outcome(outcome: TurnOutcome, turn_id: everruns_core::typed_id::TurnId) -> Self {
72 match outcome {
73 TurnOutcome::Success {
74 response,
75 iterations,
76 tool_calls_count,
77 } => Self {
78 response,
79 iterations,
80 tool_calls_count,
81 success: true,
82 error: None,
83 turn_id,
84 },
85 TurnOutcome::Failed { error, iterations } => Self {
86 response: String::new(),
87 iterations,
88 tool_calls_count: 0,
89 success: false,
90 error: Some(error),
91 turn_id,
92 },
93 TurnOutcome::MaxIterationsReached {
94 response,
95 iterations,
96 tool_calls_count,
97 } => Self {
98 response,
99 iterations,
100 tool_calls_count,
101 success: true,
102 error: None,
103 turn_id,
104 },
105 }
106 }
107}
108
109pub struct InProcessRuntimeBuilder {
119 platform_definition: PlatformDefinition,
120 llm_sim_config: Option<LlmSimConfig>,
121 default_model: Option<ModelWithProvider>,
122 backends: Option<RuntimeBackends>,
123 session_file_system_factory_context: SessionFileSystemFactoryContext,
124 harnesses: Vec<Harness>,
125 agents: Vec<Agent>,
126 sessions: Vec<Session>,
127 default_session_id: Option<SessionId>,
128 seeded_files: Vec<(SessionId, InitialFile)>,
129}
130
131impl Default for InProcessRuntimeBuilder {
132 fn default() -> Self {
133 Self::new()
134 }
135}
136
137impl InProcessRuntimeBuilder {
138 pub fn new() -> Self {
145 Self {
146 platform_definition: PlatformDefinition::builder()
147 .capability_registry(CapabilityRegistry::with_builtins())
148 .driver_registry(DriverRegistry::new())
149 .session_file_system_factory(Arc::new(InMemorySessionFileSystemFactory))
150 .build(),
151 llm_sim_config: None,
152 default_model: None,
153 backends: None,
154 session_file_system_factory_context: SessionFileSystemFactoryContext::new(),
155 harnesses: Vec::new(),
156 agents: Vec::new(),
157 sessions: Vec::new(),
158 default_session_id: None,
159 seeded_files: Vec::new(),
160 }
161 }
162
163 pub fn platform_definition(mut self, platform_definition: PlatformDefinition) -> Self {
165 self.platform_definition = platform_definition;
166 self
167 }
168
169 pub fn capability<C: Capability + 'static>(mut self, capability: C) -> Self {
171 self.platform_definition
172 .capability_registry_mut()
173 .register(capability);
174 self
175 }
176
177 pub fn driver_registry(mut self, driver_registry: DriverRegistry) -> Self {
179 *self.platform_definition.driver_registry_mut() = driver_registry;
180 self
181 }
182
183 pub fn llm_sim(mut self, config: LlmSimConfig) -> Self {
185 self.llm_sim_config = Some(config);
186 self
187 }
188
189 pub fn default_model(mut self, model: ModelWithProvider) -> Self {
191 self.default_model = Some(model);
192 self
193 }
194
195 pub fn backends(mut self, backends: RuntimeBackends) -> Self {
197 self.backends = Some(backends);
198 self
199 }
200
201 pub fn session_file_system_factory_context(
203 mut self,
204 context: SessionFileSystemFactoryContext,
205 ) -> Self {
206 self.session_file_system_factory_context = context;
207 self
208 }
209
210 pub fn harness(mut self, harness: Harness) -> Self {
212 self.harnesses.push(harness);
213 self
214 }
215
216 pub fn agent(mut self, agent: Agent) -> Self {
218 self.agents.push(agent);
219 self
220 }
221
222 pub fn session(mut self, session: Session) -> Self {
224 self.sessions.push(session);
225 self
226 }
227
228 pub fn single_session<F>(mut self, configure: F) -> Self
233 where
234 F: FnOnce(SingleSessionBuilder) -> SingleSessionBuilder,
235 {
236 let (harness, agent, session, session_id) =
237 configure(SingleSessionBuilder::default()).build();
238 self.harnesses.push(harness);
239 self.agents.push(agent);
240 self.sessions.push(session);
241 self.default_session_id = Some(session_id);
242 self
243 }
244
245 pub fn seed_text_file(
249 mut self,
250 session_id: SessionId,
251 path: impl Into<String>,
252 content: impl Into<String>,
253 ) -> Self {
254 self.seeded_files.push((
255 session_id,
256 InitialFile {
257 path: path.into(),
258 content: content.into(),
259 encoding: "text".to_string(),
260 is_readonly: false,
261 },
262 ));
263 self
264 }
265
266 pub async fn build(mut self) -> Result<InProcessRuntime> {
271 let backends = match self.backends.take() {
272 Some(backends) => backends,
273 None => RuntimeBackends::in_memory(),
274 };
275 let file_store = resolve_session_file_system(
276 &self.platform_definition,
277 self.session_file_system_factory_context.clone(),
278 )
279 .await?;
280
281 if let Some(config) = self.llm_sim_config.take() {
282 let driver = LlmSimDriver::new(config);
283 self.platform_definition
284 .driver_registry_mut()
285 .register(ProviderType::LlmSim, move |_api_key, _base_url| {
286 Box::new(driver.clone())
287 });
288
289 if self.default_model.is_none() {
290 self.default_model = Some(ModelWithProvider {
291 model: "llmsim-model".to_string(),
292 provider_type: LlmProviderType::LlmSim,
293 api_key: Some("fake-key".to_string()),
294 base_url: None,
295 });
296 }
297 }
298
299 let default_model = self.default_model.ok_or_else(|| {
300 AgentLoopError::config(
301 "in-process runtime requires a default model; call \
302 InProcessRuntimeBuilder::default_model(...) or \
303 InProcessRuntimeBuilder::llm_sim(...)",
304 )
305 })?;
306
307 backends
308 .provider_store
309 .set_default_model(default_model)
310 .await?;
311
312 for harness in &self.harnesses {
313 backends.harness_store.add_harness(harness.clone()).await?;
314 }
315 for agent in &self.agents {
316 backends.agent_store.add_agent(agent.clone()).await?;
317 }
318 for session in &self.sessions {
319 backends.session_store.add_session(session.clone()).await?;
320 }
321
322 for session in &self.sessions {
323 seed_runtime_initial_files(
324 backends.harness_store.as_ref(),
325 backends.agent_store.as_ref(),
326 file_store.as_ref(),
327 session,
328 )
329 .await?;
330 }
331
332 for (session_id, file) in &self.seeded_files {
333 file_store.seed_initial_file(*session_id, file).await?;
334 }
335
336 let persisting_emitter =
337 PersistingEventEmitter::new(backends.event_bus.clone(), backends.message_store.clone());
338
339 Ok(InProcessRuntime {
340 platform_definition: Arc::new(self.platform_definition),
341 harness_store: backends.harness_store,
342 agent_store: backends.agent_store,
343 session_store: backends.session_store,
344 default_session_id: self.default_session_id,
345 message_store: backends.message_store,
346 provider_store: backends.provider_store,
347 event_bus: backends.event_bus,
348 persisting_emitter,
349 file_store,
350 storage_store: backends.storage_store,
351 memory_store: backends.memory_store,
352 })
353 }
354}
355
356async fn resolve_session_file_system(
357 platform_definition: &PlatformDefinition,
358 file_system_factory_context: SessionFileSystemFactoryContext,
359) -> Result<Arc<dyn SessionFileSystem>> {
360 let file_system_factory = platform_definition.session_file_system_factory();
361 if file_system_factory.is_disabled() {
362 Ok(Arc::new(InMemorySessionFileStore::new()))
363 } else {
364 Ok(file_system_factory
365 .create_session_file_system(file_system_factory_context)
366 .await?)
367 }
368}
369
370#[derive(Clone)]
371pub struct InProcessRuntime {
377 platform_definition: Arc<PlatformDefinition>,
378 harness_store: Arc<dyn RuntimeHarnessStore>,
379 agent_store: Arc<dyn RuntimeAgentStore>,
380 session_store: Arc<dyn RuntimeSessionStore>,
381 default_session_id: Option<SessionId>,
382 message_store: Arc<dyn RuntimeMessageStore>,
383 provider_store: Arc<dyn RuntimeProviderStore>,
384 event_bus: Arc<dyn EventBus>,
385 persisting_emitter: PersistingEventEmitter,
386 file_store: Arc<dyn SessionFileSystem>,
387 storage_store: Arc<dyn SessionStorageStore>,
388 memory_store: Arc<dyn MemoryStoreBackend>,
389}
390
391impl InProcessRuntime {
392 pub fn builder() -> InProcessRuntimeBuilder {
394 InProcessRuntimeBuilder::new()
395 }
396
397 pub fn default_session_id(&self) -> Option<SessionId> {
400 self.default_session_id
401 }
402
403 pub async fn run_turn(
409 &self,
410 session_id: SessionId,
411 input: impl Into<InputMessage>,
412 ) -> Result<TurnResult> {
413 let session = self
414 .session_store
415 .get_session(session_id)
416 .await?
417 .ok_or_else(|| AgentLoopError::store(format!("session not found: {session_id}")))?;
418
419 let input_message = self
424 .message_store
425 .add_input_message(session_id, input.into())
426 .await?;
427 self.event_bus
428 .emit(EventRequest::new(
429 session_id,
430 EventContext::empty(),
431 InputMessageData::new(input_message.clone()),
432 ))
433 .await?;
434
435 let assembled = self
436 .inspect_context_with_ids(session_id, session.harness_id, session.agent_id)
437 .await?;
438 let synthetic_agent_id = session
439 .agent_id
440 .unwrap_or_else(|| AgentId::from_uuid(session.id.uuid()));
441 let org_id: i64 = IN_PROCESS_ORG_ID;
442 let mut state_machine = TurnStateMachine::new(
443 TurnContext::new(session_id, input_message.id, synthetic_agent_id, org_id),
444 assembled.runtime_agent.max_iterations,
445 );
446
447 let mut previous_response_id: Option<String> = None;
448 let mut last_reason_result: Option<everruns_core::ReasonResult> = None;
449
450 loop {
451 match state_machine.next_action() {
452 TurnAction::ExecuteInput => {
453 let ctx = state_machine.context();
454 let base_context =
455 AtomContext::new(ctx.session_id, ctx.turn_id, ctx.input_message_id);
456 execute_input_activity(
457 self,
458 org_id,
459 InputAtomInput {
460 context: base_context,
461 },
462 )
463 .await?;
464 state_machine.on_input_completed();
465 }
466 TurnAction::ExecuteReason => {
467 let ctx = state_machine.context();
468 let base_context =
469 AtomContext::new(ctx.session_id, ctx.turn_id, ctx.input_message_id);
470 let reason_result = execute_reason_activity(
471 self,
472 org_id,
473 ReasonInput {
474 context: base_context.next_exec(),
475 harness_id: session.harness_id,
476 agent_id: session.agent_id,
477 org_id,
478 mcp_tool_definitions: vec![],
479 previous_response_id: previous_response_id.take(),
480 iteration: state_machine.current_iteration() as u32 + 1,
481 },
482 )
483 .await?;
484 previous_response_id = reason_result.response_id.clone();
485 state_machine.on_reason_completed(
486 reason_result.text.clone(),
487 reason_result.has_tool_calls,
488 reason_result.tool_calls.len(),
489 reason_result.success,
490 reason_result.error.clone(),
491 false,
492 );
493 if reason_result.has_tool_calls {
494 last_reason_result = Some(reason_result);
495 }
496 }
497 TurnAction::ExecuteAct => {
498 let reason_result = last_reason_result
499 .take()
500 .expect("ExecuteAct requires a prior ReasonResult");
501 let ctx = state_machine.context();
502 let base_context =
503 AtomContext::new(ctx.session_id, ctx.turn_id, ctx.input_message_id);
504 execute_act_activity(
505 self,
506 ActInput {
507 org_id: Some(org_id),
508 context: base_context.next_exec(),
509 harness_id: session.harness_id,
510 agent_id: session.agent_id,
511 tool_calls: reason_result.tool_calls,
512 tool_definitions: reason_result.tool_definitions,
513 locale: reason_result.locale,
514 blueprint_id: None,
515 network_access: reason_result.network_access,
516 },
517 )
518 .await?;
519 state_machine.on_act_completed();
520 }
521 TurnAction::Complete(outcome) => {
522 let ctx = state_machine.context();
523 let lifecycle =
524 RuntimeSessionLifecycle::new(self.clone(), org_id, ctx.session_id);
525 match &outcome {
526 TurnOutcome::Success { iterations, .. }
527 | TurnOutcome::MaxIterationsReached { iterations, .. } => {
528 lifecycle
529 .turn_completed(
530 ctx.turn_id,
531 ctx.input_message_id,
532 *iterations as u32,
533 None,
534 None,
535 )
536 .await;
537 }
538 TurnOutcome::Failed { error, .. } => {
539 lifecycle
540 .turn_failed(ctx.turn_id, ctx.input_message_id, error, None)
541 .await;
542 }
543 }
544 return Ok(TurnResult::from_outcome(outcome, ctx.turn_id));
545 }
546 }
547 }
548 }
549
550 pub async fn run_text_turn(
551 &self,
552 session_id: SessionId,
553 text: impl Into<String>,
554 ) -> Result<TurnResult> {
555 self.run_turn(session_id, InputMessage::user(text)).await
556 }
557
558 pub async fn messages(&self, session_id: SessionId) -> Result<Vec<Message>> {
560 self.message_store.load(session_id).await
561 }
562
563 pub async fn read_file(
565 &self,
566 session_id: SessionId,
567 path: &str,
568 ) -> Result<Option<SessionFile>> {
569 self.file_store.read_file(session_id, path).await
570 }
571
572 pub async fn load_context(&self, session_id: SessionId) -> Result<AssembledTurnContext> {
574 let session = self
575 .session_store
576 .get_session(session_id)
577 .await?
578 .ok_or_else(|| AgentLoopError::store(format!("session not found: {session_id}")))?;
579 self.inspect_context_with_ids(session_id, session.harness_id, session.agent_id)
580 .await
581 }
582
583 pub async fn events(&self) -> Result<Vec<Event>> {
588 Ok(self.event_bus.collected_events().await)
589 }
590
591 async fn inspect_context_with_ids(
592 &self,
593 session_id: SessionId,
594 harness_id: everruns_core::HarnessId,
595 agent_id: Option<AgentId>,
596 ) -> Result<AssembledTurnContext> {
597 inspect_turn_context(
598 self.harness_store.as_ref(),
599 self.agent_store.as_ref(),
600 self.session_store.as_ref(),
601 self.message_store.as_ref(),
602 self.provider_store.as_ref(),
603 self.platform_definition.capability_registry(),
604 session_id,
605 harness_id,
606 agent_id,
607 &[],
608 Some(self.file_store.clone()),
609 )
610 .await
611 }
612}
613
614#[async_trait]
615impl RuntimeHostAdapter for InProcessRuntime {
616 async fn get_agent(&self, _org_id: i64, agent_id: AgentId) -> Result<Option<Agent>> {
617 self.agent_store.get_agent(agent_id).await
618 }
619
620 async fn get_harness(&self, _org_id: i64, harness_id: HarnessId) -> Result<Option<Harness>> {
621 let chain = self.harness_store.get_harness_chain(harness_id).await?;
622 Ok(chain.into_iter().last())
623 }
624
625 async fn set_session_status(
626 &self,
627 _org_id: i64,
628 session_id: SessionId,
629 _status: SessionStatus,
630 ) -> Result<Session> {
631 self.session_store
635 .get_session(session_id)
636 .await?
637 .ok_or_else(|| AgentLoopError::store(format!("session not found: {session_id}")))
638 }
639
640 async fn load_turn_context(
641 &self,
642 _org_id: i64,
643 session_id: SessionId,
644 ) -> Result<RuntimeHostTurnContext> {
645 let session = self
646 .session_store
647 .get_session(session_id)
648 .await?
649 .ok_or_else(|| AgentLoopError::store(format!("session not found: {session_id}")))?;
650 let agent = match session.agent_id {
651 Some(agent_id) => self.agent_store.get_agent(agent_id).await?,
652 None => None,
653 };
654 let messages = self.message_store.load(session_id).await?;
655 let model = self.provider_store.get_default_model().await?;
656 Ok(RuntimeHostTurnContext {
657 agent,
658 session,
659 messages,
660 model,
661 mcp_tool_definitions: vec![],
662 })
663 }
664
665 fn capability_registry(&self) -> CapabilityRegistry {
666 self.platform_definition.capability_registry().clone()
667 }
668
669 fn driver_registry(&self) -> DriverRegistry {
670 self.platform_definition.driver_registry().clone()
671 }
672
673 fn harness_store(&self, _org_id: i64) -> Arc<dyn HarnessStore> {
674 self.harness_store.clone()
675 }
676
677 fn agent_store(&self, _org_id: i64) -> Arc<dyn AgentStore> {
678 self.agent_store.clone()
679 }
680
681 fn session_store(&self, _org_id: i64) -> Arc<dyn SessionStore> {
682 self.session_store.clone()
683 }
684
685 fn session_mutator(&self, _org_id: i64) -> Arc<dyn SessionMutator> {
686 self.session_store.clone()
687 }
688
689 fn provider_store(&self, _org_id: i64) -> Arc<dyn LlmProviderStore> {
690 self.provider_store.clone()
691 }
692
693 fn message_store(&self) -> Arc<dyn MessageRetriever> {
694 self.message_store.clone()
695 }
696
697 fn event_emitter(&self) -> Arc<dyn EventEmitter> {
698 Arc::new(self.persisting_emitter.clone())
699 }
700
701 fn file_store(&self) -> Arc<dyn SessionFileSystem> {
702 self.file_store.clone()
703 }
704
705 fn storage_store(&self) -> Option<Arc<dyn SessionStorageStore>> {
706 Some(self.storage_store.clone())
707 }
708
709 fn memory_store(&self, _org_id: i64) -> Option<Arc<dyn MemoryStoreBackend>> {
710 Some(self.memory_store.clone())
711 }
712}
713
714#[derive(Clone)]
715struct PersistingEventEmitter {
716 inner: Arc<dyn EventBus>,
717 message_store: Arc<dyn RuntimeMessageStore>,
718}
719
720impl PersistingEventEmitter {
721 fn new(inner: Arc<dyn EventBus>, message_store: Arc<dyn RuntimeMessageStore>) -> Self {
722 Self {
723 inner,
724 message_store,
725 }
726 }
727}
728
729#[async_trait]
730impl EventEmitter for PersistingEventEmitter {
731 async fn emit(&self, request: EventRequest) -> Result<Event> {
732 let event = self.inner.emit(request.clone()).await?;
733 if let Some(message) = message_from_event(&event.data) {
734 self.message_store
735 .store_message(request.session_id, message)
736 .await?;
737 }
738 Ok(event)
739 }
740}
741
742fn effective_overlay(
743 harness_chain: &[Harness],
744 agent: Option<&Agent>,
745 session: &Session,
746) -> AgentConfigOverlay {
747 let harness_layers = harness_chain.iter().map(AgentConfigOverlay::from);
748 let agent_layers = agent.into_iter().map(AgentConfigOverlay::from);
749 AgentConfigOverlay::fold(
750 harness_layers
751 .chain(agent_layers)
752 .chain([AgentConfigOverlay::from(session)]),
753 )
754}
755
756async fn seed_runtime_initial_files(
757 harness_store: &dyn RuntimeHarnessStore,
758 agent_store: &dyn RuntimeAgentStore,
759 file_store: &dyn SessionFileSystem,
760 session: &Session,
761) -> Result<()> {
762 let harness_chain = harness_store.get_harness_chain(session.harness_id).await?;
763 if harness_chain.is_empty() {
764 return Err(AgentLoopError::store(format!(
765 "harness not found while seeding files: {}",
766 session.harness_id
767 )));
768 }
769 let agent = match session.agent_id {
770 Some(agent_id) => Some(
771 agent_store
772 .get_agent(agent_id)
773 .await?
774 .ok_or_else(|| AgentLoopError::store(format!("agent not found: {agent_id}")))?,
775 ),
776 None => None,
777 };
778 let overlay = effective_overlay(&harness_chain, agent.as_ref(), session);
779 for file in &overlay.initial_files {
780 file_store.seed_initial_file(session.id, file).await?;
781 }
782 Ok(())
783}
784
785fn message_from_event(data: &EventData) -> Option<Message> {
786 match data {
787 EventData::InputMessage(data) => Some(data.message.clone()),
788 EventData::OutputMessageCompleted(OutputMessageCompletedData { message, .. }) => {
789 Some(message.clone())
790 }
791 EventData::ToolCompleted(data) => Some(tool_completed_to_message(data.clone())),
792 _ => None,
793 }
794}
795
796fn tool_completed_to_message(data: ToolCompletedData) -> Message {
797 let mut images: Vec<ToolResultImage> = Vec::new();
798 let result = data.result.map(|parts| {
799 for part in &parts {
800 if let ContentPart::Image(img) = part
801 && let (Some(base64), Some(media_type)) = (&img.base64, &img.media_type)
802 {
803 images.push(ToolResultImage {
804 base64: base64.clone(),
805 media_type: media_type.clone(),
806 });
807 }
808 }
809
810 let text_parts: Vec<&ContentPart> = parts
811 .iter()
812 .filter(|part| matches!(part, ContentPart::Text(_)))
813 .collect();
814 if text_parts.len() == 1
815 && let ContentPart::Text(text) = text_parts[0]
816 {
817 return serde_json::Value::String(text.text.clone());
818 }
819 if !text_parts.is_empty() {
820 serde_json::to_value(&text_parts).unwrap_or_default()
821 } else {
822 serde_json::Value::Null
823 }
824 });
825
826 if images.is_empty() {
827 Message::tool_result(&data.tool_call_id, result, data.error)
828 } else {
829 Message::tool_result_with_images(&data.tool_call_id, result, images)
830 }
831}