use crate::backends::{
EventBus, RuntimeAgentStore, RuntimeBackends, RuntimeHarnessStore, RuntimeMessageStore,
RuntimeProviderStore, RuntimeSessionStore,
};
use crate::builders::SingleSessionBuilder;
use crate::host::{
RuntimeHostAdapter, RuntimeHostTurnContext, RuntimeSessionLifecycle, execute_act_activity,
execute_input_activity, execute_reason_activity,
};
use crate::in_memory::{InMemorySessionFileStore, InMemorySessionFileSystemFactory};
use async_trait::async_trait;
use everruns_core::agent::Agent;
use everruns_core::atoms::{ActInput, AtomContext, InputAtomInput, ReasonInput};
use everruns_core::capabilities::{Capability, CapabilityRegistry};
use everruns_core::config_layer::AgentConfigOverlay;
use everruns_core::error::{AgentLoopError, Result};
use everruns_core::events::{
Event, EventContext, EventData, EventRequest, InputMessageData, OutputMessageCompletedData,
ToolCompletedData,
};
use everruns_core::harness::Harness;
use everruns_core::llm_driver_registry::{DriverRegistry, ProviderType};
use everruns_core::llm_models::LlmProviderType;
use everruns_core::llmsim_driver::{LlmSimConfig, LlmSimDriver};
use everruns_core::message::{ContentPart, Message};
use everruns_core::platform_definition::PlatformDefinition;
use everruns_core::runtime_context::{AssembledTurnContext, inspect_turn_context};
use everruns_core::session::{Session, SessionStatus};
use everruns_core::session_file::{InitialFile, SessionFile};
use everruns_core::tools::ToolResultImage;
use everruns_core::traits::{
AgentStore, EventEmitter, HarnessStore, LlmProviderStore, ModelWithProvider, SessionMutator,
SessionStorageStore, SessionStore,
};
use everruns_core::turn::{TurnAction, TurnContext, TurnOutcome, TurnStateMachine};
use everruns_core::typed_id::{AgentId, HarnessId, SessionId};
use everruns_core::{
InputMessage, MemoryStoreBackend, MessageRetriever, SessionFileSystem,
SessionFileSystemFactoryContext,
};
use std::sync::Arc;
/// Organization id that `InProcessRuntime::run_turn` hands to the turn state machine and
/// to the host activities; it is simply the core crate's `DEFAULT_ORG_ID`.
const IN_PROCESS_ORG_ID: i64 = everruns_core::DEFAULT_ORG_ID;
/// Flattened summary of a finished turn, produced by `TurnResult::from_outcome` from a
/// `TurnOutcome` and the id of the turn it belongs to.
#[derive(Debug, Clone)]
pub struct TurnResult {
/// Response text of the turn; an empty string when the turn failed.
pub response: String,
/// Iteration count reported by the outcome.
pub iterations: usize,
/// Tool-call count reported by the outcome; always 0 for a failed turn.
pub tool_calls_count: usize,
/// `true` for `TurnOutcome::Success` and `TurnOutcome::MaxIterationsReached`,
/// `false` for `TurnOutcome::Failed`.
pub success: bool,
/// Error message; `Some` only for a failed turn.
pub error: Option<String>,
/// Id of the turn this result describes.
pub turn_id: everruns_core::typed_id::TurnId,
}
impl TurnResult {
fn from_outcome(outcome: TurnOutcome, turn_id: everruns_core::typed_id::TurnId) -> Self {
match outcome {
TurnOutcome::Success {
response,
iterations,
tool_calls_count,
} => Self {
response,
iterations,
tool_calls_count,
success: true,
error: None,
turn_id,
},
TurnOutcome::Failed { error, iterations } => Self {
response: String::new(),
iterations,
tool_calls_count: 0,
success: false,
error: Some(error),
turn_id,
},
TurnOutcome::MaxIterationsReached {
response,
iterations,
tool_calls_count,
} => Self {
response,
iterations,
tool_calls_count,
success: true,
error: None,
turn_id,
},
}
}
}
/// Builder that collects everything `build` needs to assemble an `InProcessRuntime`.
pub struct InProcessRuntimeBuilder {
/// Platform definition (capability registry, driver registry, session file-system
/// factory) that ends up shared by the built runtime.
platform_definition: PlatformDefinition,
/// When set, `build` registers an `LlmSimDriver` for `ProviderType::LlmSim` and, if no
/// default model was given, installs a simulated default model.
llm_sim_config: Option<LlmSimConfig>,
/// Default model written to the provider store in `build`; `build` fails without one
/// unless `llm_sim_config` supplies the fallback.
default_model: Option<ModelWithProvider>,
/// Caller-supplied backends; `build` falls back to `RuntimeBackends::in_memory()`.
backends: Option<RuntimeBackends>,
/// Context passed to the session file-system factory when it creates the file system.
session_file_system_factory_context: SessionFileSystemFactoryContext,
/// Harnesses added to the harness store during `build`.
harnesses: Vec<Harness>,
/// Agents added to the agent store during `build`.
agents: Vec<Agent>,
/// Sessions added to the session store during `build`.
sessions: Vec<Session>,
/// Session id recorded by the most recent `single_session` call, if any.
default_session_id: Option<SessionId>,
/// Extra files seeded in `build` after each session's overlay-derived initial files.
seeded_files: Vec<(SessionId, InitialFile)>,
}
impl Default for InProcessRuntimeBuilder {
fn default() -> Self {
Self::new()
}
}
impl InProcessRuntimeBuilder {
    /// Creates a builder whose platform definition has the built-in capabilities, an empty
    /// driver registry and the in-memory session file-system factory; everything else
    /// starts out empty or unset.
    pub fn new() -> Self {
        let platform_definition = PlatformDefinition::builder()
            .capability_registry(CapabilityRegistry::with_builtins())
            .driver_registry(DriverRegistry::new())
            .session_file_system_factory(Arc::new(InMemorySessionFileSystemFactory))
            .build();

        Self {
            platform_definition,
            llm_sim_config: None,
            default_model: None,
            backends: None,
            session_file_system_factory_context: SessionFileSystemFactoryContext::new(),
            harnesses: Vec::new(),
            agents: Vec::new(),
            sessions: Vec::new(),
            default_session_id: None,
            seeded_files: Vec::new(),
        }
    }

    /// Replaces the whole platform definition, including any capabilities or driver
    /// registry configured on the previous one.
    pub fn platform_definition(mut self, platform_definition: PlatformDefinition) -> Self {
        self.platform_definition = platform_definition;
        self
    }

    /// Registers `capability` in the current platform definition's capability registry.
    pub fn capability<C: Capability + 'static>(mut self, capability: C) -> Self {
        let registry = self.platform_definition.capability_registry_mut();
        registry.register(capability);
        self
    }

    /// Overwrites the current platform definition's driver registry.
    pub fn driver_registry(mut self, driver_registry: DriverRegistry) -> Self {
        let slot = self.platform_definition.driver_registry_mut();
        *slot = driver_registry;
        self
    }

    /// Requests a simulated LLM driver; see [`Self::build`] for how it is wired in.
    pub fn llm_sim(mut self, config: LlmSimConfig) -> Self {
        self.llm_sim_config = Some(config);
        self
    }

    /// Sets the default model that `build` stores in the provider store.
    pub fn default_model(mut self, model: ModelWithProvider) -> Self {
        self.default_model = Some(model);
        self
    }

    /// Uses `backends` instead of the in-memory backends.
    pub fn backends(mut self, backends: RuntimeBackends) -> Self {
        self.backends = Some(backends);
        self
    }

    /// Sets the context handed to the session file-system factory.
    pub fn session_file_system_factory_context(
        mut self,
        context: SessionFileSystemFactoryContext,
    ) -> Self {
        self.session_file_system_factory_context = context;
        self
    }

    /// Queues a harness to be added to the harness store.
    pub fn harness(mut self, harness: Harness) -> Self {
        self.harnesses.push(harness);
        self
    }

    /// Queues an agent to be added to the agent store.
    pub fn agent(mut self, agent: Agent) -> Self {
        self.agents.push(agent);
        self
    }

    /// Queues a session to be added to the session store.
    pub fn session(mut self, session: Session) -> Self {
        self.sessions.push(session);
        self
    }

    /// Builds a harness/agent/session trio through a `SingleSessionBuilder`, queues all
    /// three and records the session id as the default session id (overwriting any
    /// previously recorded one).
    pub fn single_session<F>(mut self, configure: F) -> Self
    where
        F: FnOnce(SingleSessionBuilder) -> SingleSessionBuilder,
    {
        let configured = configure(SingleSessionBuilder::default());
        let (harness, agent, session, session_id) = configured.build();

        self.harnesses.push(harness);
        self.agents.push(agent);
        self.sessions.push(session);
        self.default_session_id = Some(session_id);
        self
    }

    /// Queues a writable file with `"text"` encoding to be seeded for `session_id`.
    pub fn seed_text_file(
        mut self,
        session_id: SessionId,
        path: impl Into<String>,
        content: impl Into<String>,
    ) -> Self {
        let file = InitialFile {
            path: path.into(),
            content: content.into(),
            encoding: "text".to_string(),
            is_readonly: false,
        };
        self.seeded_files.push((session_id, file));
        self
    }

    /// Assembles the runtime.
    ///
    /// In order: picks the backends (in-memory unless supplied), resolves the session file
    /// system, wires the optional simulated LLM driver, stores the default model, registers
    /// harnesses, agents and sessions, seeds each session's overlay-derived initial files,
    /// and finally seeds the explicitly queued files.
    ///
    /// # Errors
    /// Returns a config error when no default model is available (neither `default_model`
    /// nor `llm_sim` was called), and propagates any error from the file-system factory,
    /// the stores or file seeding.
    pub async fn build(mut self) -> Result<InProcessRuntime> {
        let backends = self
            .backends
            .take()
            .unwrap_or_else(RuntimeBackends::in_memory);

        let file_store = resolve_session_file_system(
            &self.platform_definition,
            self.session_file_system_factory_context.clone(),
        )
        .await?;

        if let Some(sim_config) = self.llm_sim_config.take() {
            let sim_driver = LlmSimDriver::new(sim_config);
            let drivers = self.platform_definition.driver_registry_mut();
            drivers.register(ProviderType::LlmSim, move |_api_key, _base_url| {
                Box::new(sim_driver.clone())
            });

            // An explicitly configured default model wins over the simulated fallback.
            if self.default_model.is_none() {
                self.default_model = Some(ModelWithProvider {
                    model: "llmsim-model".to_string(),
                    provider_type: LlmProviderType::LlmSim,
                    api_key: Some("fake-key".to_string()),
                    base_url: None,
                });
            }
        }

        let default_model = self.default_model.ok_or_else(|| {
            AgentLoopError::config(
                "in-process runtime requires a default model; call \
                 InProcessRuntimeBuilder::default_model(...) or \
                 InProcessRuntimeBuilder::llm_sim(...)",
            )
        })?;
        backends
            .provider_store
            .set_default_model(default_model)
            .await?;

        for harness in self.harnesses.iter().cloned() {
            backends.harness_store.add_harness(harness).await?;
        }
        for agent in self.agents.iter().cloned() {
            backends.agent_store.add_agent(agent).await?;
        }
        for session in self.sessions.iter().cloned() {
            backends.session_store.add_session(session).await?;
        }

        // Overlay-derived files are seeded only after every session has been registered.
        for session in &self.sessions {
            seed_runtime_initial_files(
                backends.harness_store.as_ref(),
                backends.agent_store.as_ref(),
                file_store.as_ref(),
                session,
            )
            .await?;
        }

        // Explicitly queued files go last.
        for (session_id, file) in &self.seeded_files {
            file_store.seed_initial_file(*session_id, file).await?;
        }

        let persisting_emitter =
            PersistingEventEmitter::new(backends.event_bus.clone(), backends.message_store.clone());

        Ok(InProcessRuntime {
            platform_definition: Arc::new(self.platform_definition),
            harness_store: backends.harness_store,
            agent_store: backends.agent_store,
            session_store: backends.session_store,
            default_session_id: self.default_session_id,
            message_store: backends.message_store,
            provider_store: backends.provider_store,
            event_bus: backends.event_bus,
            persisting_emitter,
            file_store,
            storage_store: backends.storage_store,
            memory_store: backends.memory_store,
        })
    }
}
/// Produces the session file system for a runtime.
///
/// When the platform definition's factory reports itself as disabled, a fresh
/// `InMemorySessionFileStore` is returned and `file_system_factory_context` is not used.
/// Otherwise the factory creates the file system from that context, and any error it
/// reports is propagated.
async fn resolve_session_file_system(
    platform_definition: &PlatformDefinition,
    file_system_factory_context: SessionFileSystemFactoryContext,
) -> Result<Arc<dyn SessionFileSystem>> {
    let factory = platform_definition.session_file_system_factory();

    if !factory.is_disabled() {
        let file_system = factory
            .create_session_file_system(file_system_factory_context)
            .await?;
        return Ok(file_system);
    }

    Ok(Arc::new(InMemorySessionFileStore::new()))
}
/// Runtime that executes turns in the current process against the stores assembled by
/// `InProcessRuntimeBuilder::build`. Every field is an `Arc`, an `Option` of an id, or a
/// cloneable emitter, so the derived `Clone` does not copy the backing stores.
#[derive(Clone)]
pub struct InProcessRuntime {
/// Shared platform definition (capability registry and driver registry are read from it).
platform_definition: Arc<PlatformDefinition>,
/// Store used to resolve harness chains.
harness_store: Arc<dyn RuntimeHarnessStore>,
/// Store used to look agents up by id.
agent_store: Arc<dyn RuntimeAgentStore>,
/// Store used to look sessions up by id; also handed out as the session mutator.
session_store: Arc<dyn RuntimeSessionStore>,
/// Session id recorded by the builder's `single_session`, if it was used.
default_session_id: Option<SessionId>,
/// Store holding the messages of each session.
message_store: Arc<dyn RuntimeMessageStore>,
/// Store that provides the default model.
provider_store: Arc<dyn RuntimeProviderStore>,
/// Bus that `run_turn` emits the input event on and that `events` reads from.
event_bus: Arc<dyn EventBus>,
/// Emitter given to host activities; it wraps the event bus and the message store.
persisting_emitter: PersistingEventEmitter,
/// Session file system used for seeding and reading files.
file_store: Arc<dyn SessionFileSystem>,
/// Session storage backend exposed through the host adapter.
storage_store: Arc<dyn SessionStorageStore>,
/// Memory backend exposed through the host adapter.
memory_store: Arc<dyn MemoryStoreBackend>,
}
impl InProcessRuntime {
/// Returns a fresh `InProcessRuntimeBuilder`.
pub fn builder() -> InProcessRuntimeBuilder {
InProcessRuntimeBuilder::new()
}
/// Session id recorded by the builder's `single_session`, or `None` if it was not used.
pub fn default_session_id(&self) -> Option<SessionId> {
self.default_session_id
}
/// Runs one complete turn for `session_id` with `input` and returns its `TurnResult`.
///
/// Steps, in order: load the session, store the input message and emit an
/// `InputMessageData` event on the event bus, assemble the turn context (its
/// `max_iterations` bounds the state machine), then drive a `TurnStateMachine` through
/// input / reason / act activities until it yields `TurnAction::Complete`.
///
/// # Errors
/// Returns a store error when the session does not exist and propagates any error from
/// the stores, the event bus or the activities. On those early returns neither
/// `turn_completed` nor `turn_failed` is invoked on the session lifecycle.
///
/// # Panics
/// Panics if the state machine requests `ExecuteAct` while no reason result with tool
/// calls is pending.
pub async fn run_turn(
&self,
session_id: SessionId,
input: impl Into<InputMessage>,
) -> Result<TurnResult> {
let session = self
.session_store
.get_session(session_id)
.await?
.ok_or_else(|| AgentLoopError::store(format!("session not found: {session_id}")))?;
// The input is written to the message store directly ...
let input_message = self
.message_store
.add_input_message(session_id, input.into())
.await?;
// ... so its event goes to the raw event bus rather than through
// `persisting_emitter`, which would store the same message again.
self.event_bus
.emit(EventRequest::new(
session_id,
EventContext::empty(),
InputMessageData::new(input_message.clone()),
))
.await?;
let assembled = self
.inspect_context_with_ids(session_id, session.harness_id, session.agent_id)
.await?;
// Sessions without an agent get an agent id derived from the session's own UUID.
let synthetic_agent_id = session
.agent_id
.unwrap_or_else(|| AgentId::from_uuid(session.id.uuid()));
let org_id: i64 = IN_PROCESS_ORG_ID;
let mut state_machine = TurnStateMachine::new(
TurnContext::new(session_id, input_message.id, synthetic_agent_id, org_id),
assembled.runtime_agent.max_iterations,
);
// Response id of the latest reason step, forwarded to the next reason step.
let mut previous_response_id: Option<String> = None;
// Latest reason result that requested tool calls; consumed by the next act step.
let mut last_reason_result: Option<everruns_core::ReasonResult> = None;
loop {
match state_machine.next_action() {
TurnAction::ExecuteInput => {
let ctx = state_machine.context();
let base_context =
AtomContext::new(ctx.session_id, ctx.turn_id, ctx.input_message_id);
execute_input_activity(
self,
org_id,
InputAtomInput {
context: base_context,
},
)
.await?;
state_machine.on_input_completed();
}
TurnAction::ExecuteReason => {
let ctx = state_machine.context();
let base_context =
AtomContext::new(ctx.session_id, ctx.turn_id, ctx.input_message_id);
let reason_result = execute_reason_activity(
self,
org_id,
ReasonInput {
context: base_context.next_exec(),
harness_id: session.harness_id,
agent_id: session.agent_id,
org_id,
mcp_tool_definitions: vec![],
previous_response_id: previous_response_id.take(),
// The state machine's iteration counter plus one is sent as the iteration number.
iteration: state_machine.current_iteration() as u32 + 1,
},
)
.await?;
previous_response_id = reason_result.response_id.clone();
state_machine.on_reason_completed(
reason_result.text.clone(),
reason_result.has_tool_calls,
reason_result.tool_calls.len(),
reason_result.success,
reason_result.error.clone(),
// NOTE(review): the meaning of this trailing flag is not visible in this file —
// confirm against `TurnStateMachine::on_reason_completed`.
false,
);
// Only a result that carries tool calls is kept for the act step.
if reason_result.has_tool_calls {
last_reason_result = Some(reason_result);
}
}
TurnAction::ExecuteAct => {
// Invariant: the state machine only asks to act after a reason step with tool calls.
let reason_result = last_reason_result
.take()
.expect("ExecuteAct requires a prior ReasonResult");
let ctx = state_machine.context();
let base_context =
AtomContext::new(ctx.session_id, ctx.turn_id, ctx.input_message_id);
execute_act_activity(
self,
ActInput {
org_id: Some(org_id),
context: base_context.next_exec(),
harness_id: session.harness_id,
agent_id: session.agent_id,
tool_calls: reason_result.tool_calls,
tool_definitions: reason_result.tool_definitions,
locale: reason_result.locale,
blueprint_id: None,
network_access: reason_result.network_access,
},
)
.await?;
state_machine.on_act_completed();
}
TurnAction::Complete(outcome) => {
let ctx = state_machine.context();
let lifecycle =
RuntimeSessionLifecycle::new(self.clone(), org_id, ctx.session_id);
// Reaching the iteration cap is reported through the same hook as success.
match &outcome {
TurnOutcome::Success { iterations, .. }
| TurnOutcome::MaxIterationsReached { iterations, .. } => {
lifecycle
.turn_completed(
ctx.turn_id,
ctx.input_message_id,
*iterations as u32,
None,
None,
)
.await;
}
TurnOutcome::Failed { error, .. } => {
lifecycle
.turn_failed(ctx.turn_id, ctx.input_message_id, error, None)
.await;
}
}
return Ok(TurnResult::from_outcome(outcome, ctx.turn_id));
}
}
}
}
/// Convenience wrapper around `run_turn` that wraps `text` in `InputMessage::user`.
pub async fn run_text_turn(
&self,
session_id: SessionId,
text: impl Into<String>,
) -> Result<TurnResult> {
self.run_turn(session_id, InputMessage::user(text)).await
}
/// Loads the messages of `session_id` from the message store.
pub async fn messages(&self, session_id: SessionId) -> Result<Vec<Message>> {
self.message_store.load(session_id).await
}
/// Reads `path` for `session_id` from the session file system and returns whatever the
/// file system reports (`Option<SessionFile>`).
pub async fn read_file(
&self,
session_id: SessionId,
path: &str,
) -> Result<Option<SessionFile>> {
self.file_store.read_file(session_id, path).await
}
/// Assembles the turn context for `session_id` without running a turn.
///
/// # Errors
/// Returns a store error when the session does not exist and propagates errors from
/// context assembly.
pub async fn load_context(&self, session_id: SessionId) -> Result<AssembledTurnContext> {
let session = self
.session_store
.get_session(session_id)
.await?
.ok_or_else(|| AgentLoopError::store(format!("session not found: {session_id}")))?;
self.inspect_context_with_ids(session_id, session.harness_id, session.agent_id)
.await
}
/// Returns the events the event bus has collected; this call itself never fails.
pub async fn events(&self) -> Result<Vec<Event>> {
Ok(self.event_bus.collected_events().await)
}
/// Calls `inspect_turn_context` with this runtime's stores, capability registry and
/// file store for the given session, harness and optional agent.
async fn inspect_context_with_ids(
&self,
session_id: SessionId,
harness_id: everruns_core::HarnessId,
agent_id: Option<AgentId>,
) -> Result<AssembledTurnContext> {
inspect_turn_context(
self.harness_store.as_ref(),
self.agent_store.as_ref(),
self.session_store.as_ref(),
self.message_store.as_ref(),
self.provider_store.as_ref(),
self.platform_definition.capability_registry(),
session_id,
harness_id,
agent_id,
// NOTE(review): always an empty slice — presumably extra (MCP) tool definitions;
// confirm against `inspect_turn_context`.
&[],
Some(self.file_store.clone()),
)
.await
}
}
/// Host adapter backed by the runtime's own stores. Every `_org_id` argument is ignored:
/// the same stores are used whatever organization is asked for.
#[async_trait]
impl RuntimeHostAdapter for InProcessRuntime {
    /// Looks the agent up in the agent store.
    async fn get_agent(&self, _org_id: i64, agent_id: AgentId) -> Result<Option<Agent>> {
        self.agent_store.get_agent(agent_id).await
    }

    /// Resolves the harness chain for `harness_id` and returns its last element, or `None`
    /// when the chain is empty.
    async fn get_harness(&self, _org_id: i64, harness_id: HarnessId) -> Result<Option<Harness>> {
        let chain = self.harness_store.get_harness_chain(harness_id).await?;
        let last_in_chain = chain.into_iter().last();
        Ok(last_in_chain)
    }

    /// Returns the stored session as it is. The requested status is not applied: the
    /// `_status` argument is unused.
    ///
    /// # Errors
    /// Returns a store error when the session does not exist.
    async fn set_session_status(
        &self,
        _org_id: i64,
        session_id: SessionId,
        _status: SessionStatus,
    ) -> Result<Session> {
        match self.session_store.get_session(session_id).await? {
            Some(session) => Ok(session),
            None => Err(AgentLoopError::store(format!(
                "session not found: {session_id}"
            ))),
        }
    }

    /// Gathers the session, its agent (when it has one), its messages and the default
    /// model. No MCP tool definitions are supplied.
    ///
    /// # Errors
    /// Returns a store error when the session does not exist and propagates store errors.
    async fn load_turn_context(
        &self,
        _org_id: i64,
        session_id: SessionId,
    ) -> Result<RuntimeHostTurnContext> {
        let Some(session) = self.session_store.get_session(session_id).await? else {
            return Err(AgentLoopError::store(format!(
                "session not found: {session_id}"
            )));
        };

        let agent = if let Some(agent_id) = session.agent_id {
            self.agent_store.get_agent(agent_id).await?
        } else {
            None
        };

        let messages = self.message_store.load(session_id).await?;
        let model = self.provider_store.get_default_model().await?;

        Ok(RuntimeHostTurnContext {
            agent,
            session,
            messages,
            model,
            mcp_tool_definitions: vec![],
        })
    }

    /// Clone of the platform definition's capability registry.
    fn capability_registry(&self) -> CapabilityRegistry {
        let registry = self.platform_definition.capability_registry();
        registry.clone()
    }

    /// Clone of the platform definition's driver registry.
    fn driver_registry(&self) -> DriverRegistry {
        let registry = self.platform_definition.driver_registry();
        registry.clone()
    }

    /// Shared handle to the harness store.
    fn harness_store(&self, _org_id: i64) -> Arc<dyn HarnessStore> {
        self.harness_store.clone()
    }

    /// Shared handle to the agent store.
    fn agent_store(&self, _org_id: i64) -> Arc<dyn AgentStore> {
        self.agent_store.clone()
    }

    /// Shared handle to the session store.
    fn session_store(&self, _org_id: i64) -> Arc<dyn SessionStore> {
        self.session_store.clone()
    }

    /// The session store doubles as the session mutator.
    fn session_mutator(&self, _org_id: i64) -> Arc<dyn SessionMutator> {
        self.session_store.clone()
    }

    /// Shared handle to the provider store.
    fn provider_store(&self, _org_id: i64) -> Arc<dyn LlmProviderStore> {
        self.provider_store.clone()
    }

    /// The message store, exposed as a message retriever.
    fn message_store(&self) -> Arc<dyn MessageRetriever> {
        self.message_store.clone()
    }

    /// A new `Arc` around a clone of the persisting emitter.
    fn event_emitter(&self) -> Arc<dyn EventEmitter> {
        let emitter = self.persisting_emitter.clone();
        Arc::new(emitter)
    }

    /// Shared handle to the session file system.
    fn file_store(&self) -> Arc<dyn SessionFileSystem> {
        self.file_store.clone()
    }

    /// Always `Some`: the runtime always has a storage store.
    fn storage_store(&self) -> Option<Arc<dyn SessionStorageStore>> {
        Some(self.storage_store.clone())
    }

    /// Always `Some`: the runtime always has a memory store.
    fn memory_store(&self, _org_id: i64) -> Option<Arc<dyn MemoryStoreBackend>> {
        Some(self.memory_store.clone())
    }
}
/// Event emitter that forwards every request to an event bus and additionally writes
/// message-carrying events to the message store (see its `EventEmitter` impl).
#[derive(Clone)]
struct PersistingEventEmitter {
/// Event bus that actually emits the event.
inner: Arc<dyn EventBus>,
/// Store that receives the message derived from an emitted event, if there is one.
message_store: Arc<dyn RuntimeMessageStore>,
}
impl PersistingEventEmitter {
fn new(inner: Arc<dyn EventBus>, message_store: Arc<dyn RuntimeMessageStore>) -> Self {
Self {
inner,
message_store,
}
}
}
#[async_trait]
impl EventEmitter for PersistingEventEmitter {
    /// Emits `request` on the inner event bus and, when the resulting event's data maps to
    /// a message (see `message_from_event`), stores that message under the request's
    /// session id.
    ///
    /// # Errors
    /// Propagates errors from the event bus and from the message store. If storing fails,
    /// the event has already been emitted on the bus.
    async fn emit(&self, request: EventRequest) -> Result<Event> {
        // The session id is the only part of the request needed after emission, and
        // `SessionId` is `Copy`. Copying it first lets the request be moved into the bus
        // instead of cloning the whole request, payload included.
        let session_id = request.session_id;
        let event = self.inner.emit(request).await?;
        if let Some(message) = message_from_event(&event.data) {
            self.message_store
                .store_message(session_id, message)
                .await?;
        }
        Ok(event)
    }
}
/// Folds the configuration layers that apply to `session` into one overlay.
///
/// Layers are supplied to `AgentConfigOverlay::fold` in this order: each harness in
/// `harness_chain` (in slice order), then the agent when there is one, then the session.
fn effective_overlay(
    harness_chain: &[Harness],
    agent: Option<&Agent>,
    session: &Session,
) -> AgentConfigOverlay {
    let session_layer = std::iter::once(AgentConfigOverlay::from(session));
    let layers = harness_chain
        .iter()
        .map(AgentConfigOverlay::from)
        .chain(agent.into_iter().map(AgentConfigOverlay::from))
        .chain(session_layer);
    AgentConfigOverlay::fold(layers)
}
/// Seeds the session's file system with the initial files of its effective overlay.
///
/// The overlay is folded from the session's harness chain, its agent (when it has one)
/// and the session itself; every file in `overlay.initial_files` is then seeded for
/// `session.id`.
///
/// # Errors
/// Returns a store error when the harness chain is empty or the session's agent cannot be
/// found, and propagates errors from the stores and from file seeding.
async fn seed_runtime_initial_files(
    harness_store: &dyn RuntimeHarnessStore,
    agent_store: &dyn RuntimeAgentStore,
    file_store: &dyn SessionFileSystem,
    session: &Session,
) -> Result<()> {
    let chain = harness_store.get_harness_chain(session.harness_id).await?;
    if chain.is_empty() {
        return Err(AgentLoopError::store(format!(
            "harness not found while seeding files: {}",
            session.harness_id
        )));
    }

    // A session may have no agent at all; a referenced agent, however, must exist.
    let agent = if let Some(agent_id) = session.agent_id {
        let found = agent_store.get_agent(agent_id).await?;
        Some(found.ok_or_else(|| AgentLoopError::store(format!("agent not found: {agent_id}")))?)
    } else {
        None
    };

    let overlay = effective_overlay(&chain, agent.as_ref(), session);
    for initial_file in &overlay.initial_files {
        file_store
            .seed_initial_file(session.id, initial_file)
            .await?;
    }
    Ok(())
}
/// Extracts the message an event should persist, if any.
///
/// Input-message and completed-output-message events yield a clone of their message, a
/// completed-tool event is converted with `tool_completed_to_message`, and every other
/// event yields `None`.
fn message_from_event(data: &EventData) -> Option<Message> {
    let message = match data {
        EventData::InputMessage(input) => input.message.clone(),
        EventData::OutputMessageCompleted(OutputMessageCompletedData { message, .. }) => {
            message.clone()
        }
        EventData::ToolCompleted(tool) => tool_completed_to_message(tool.clone()),
        _ => return None,
    };
    Some(message)
}
/// Converts a completed-tool event payload into a tool-result `Message`.
///
/// Image parts that carry both base64 data and a media type are collected as
/// `ToolResultImage`s. The text parts determine the JSON result: exactly one text part
/// becomes a JSON string, several are serialized as an array (falling back to the default
/// JSON value if serialization fails), and none becomes JSON `null`. A payload without a
/// result keeps `None`.
///
/// With at least one image the message is built with `Message::tool_result_with_images`,
/// which is not given `data.error`; otherwise `Message::tool_result` receives the error.
fn tool_completed_to_message(data: ToolCompletedData) -> Message {
    let mut images: Vec<ToolResultImage> = Vec::new();

    let result = match data.result {
        None => None,
        Some(parts) => {
            images.extend(parts.iter().filter_map(|part| match part {
                ContentPart::Image(img) => match (&img.base64, &img.media_type) {
                    (Some(base64), Some(media_type)) => Some(ToolResultImage {
                        base64: base64.clone(),
                        media_type: media_type.clone(),
                    }),
                    _ => None,
                },
                _ => None,
            }));

            let text_parts: Vec<&ContentPart> = parts
                .iter()
                .filter(|part| matches!(part, ContentPart::Text(_)))
                .collect();

            let value = match text_parts.as_slice() {
                [] => serde_json::Value::Null,
                [ContentPart::Text(text)] => serde_json::Value::String(text.text.clone()),
                _ => serde_json::to_value(&text_parts).unwrap_or_default(),
            };
            Some(value)
        }
    };

    if !images.is_empty() {
        return Message::tool_result_with_images(&data.tool_call_id, result, images);
    }
    Message::tool_result(&data.tool_call_id, result, data.error)
}