Skip to main content

everruns_runtime/
runtime.rs

// In-process runtime builder and runner.
// Decision: the public runtime runs entirely in-process (in-memory stores by
// default, or an embedder-supplied backend bundle), but uses the same core atoms
// and capability resolution path as the durable worker so behavior stays close.
4
5use crate::backends::{
6    EventBus, RuntimeAgentStore, RuntimeBackends, RuntimeHarnessStore, RuntimeMessageStore,
7    RuntimeProviderStore, RuntimeSessionStore,
8};
9use crate::builders::SingleSessionBuilder;
10use crate::host::{
11    RuntimeHostAdapter, RuntimeHostTurnContext, RuntimeSessionLifecycle, execute_act_activity,
12    execute_input_activity, execute_reason_activity,
13};
14use crate::in_memory::{InMemorySessionFileStore, InMemorySessionFileSystemFactory};
15use async_trait::async_trait;
16use everruns_core::agent::Agent;
17use everruns_core::atoms::{ActInput, AtomContext, InputAtomInput, ReasonInput};
18use everruns_core::capabilities::{Capability, CapabilityRegistry};
19use everruns_core::config_layer::AgentConfigOverlay;
20use everruns_core::error::{AgentLoopError, Result};
21use everruns_core::events::{
22    Event, EventContext, EventData, EventRequest, InputMessageData, OutputMessageCompletedData,
23    ToolCompletedData,
24};
25use everruns_core::harness::Harness;
26use everruns_core::llm_driver_registry::{DriverRegistry, ProviderType};
27use everruns_core::llm_models::LlmProviderType;
28use everruns_core::llmsim_driver::{LlmSimConfig, LlmSimDriver};
29use everruns_core::message::{ContentPart, Message};
30use everruns_core::platform_definition::PlatformDefinition;
31use everruns_core::runtime_context::{AssembledTurnContext, inspect_turn_context};
32use everruns_core::session::{Session, SessionStatus};
33use everruns_core::session_file::{InitialFile, SessionFile};
34use everruns_core::tools::ToolResultImage;
35use everruns_core::traits::{
36    AgentStore, EventEmitter, HarnessStore, LlmProviderStore, ModelWithProvider, SessionMutator,
37    SessionStorageStore, SessionStore, UserConnectionResolver,
38};
39use everruns_core::turn::{TurnAction, TurnContext, TurnOutcome, TurnStateMachine};
40use everruns_core::typed_id::{AgentId, HarnessId, OrgId, SessionId};
41use everruns_core::{
42    InputMessage, MessageRetriever, SessionFileSystem, SessionFileSystemFactoryContext,
43};
44use sha2::{Digest, Sha256};
45use std::sync::Arc;
46
/// Maximum number of leading bytes of a public org id that
/// [`hash_public_org_id`] feeds into SHA-256.
///
/// Well-formed public ids (`org_` followed by 32 hex digits) are only 36
/// bytes long, so this cap never truncates a legitimate id. It exists so an
/// oversize, attacker-controlled org string carried on a session costs a
/// bounded, predictable amount of hashing work.
const HASH_INPUT_CAP_BYTES: usize = 128;
53
54/// Derive an internal `i64` org id from the public `org_<32hex>` form on a
55/// [`Session`].
56///
57/// Round-trip with [`everruns_core::org_public_id_from_internal`]: when the
58/// public id was produced by that helper (i.e. the upper bits are zero and
59/// the value fits in a positive `i64`), this returns the original internal
60/// id unchanged. Other values are mapped into `[2, i64::MAX]` by hashing the
61/// original public id string so runtime namespaces do not fail open to the
62/// shared default org and avoid arithmetic collision gadgets.
63fn in_process_internal_org_id(public_org_id: &str) -> i64 {
64    if public_org_id == everruns_core::DEFAULT_ORG_PUBLIC_ID {
65        return everruns_core::DEFAULT_ORG_ID;
66    }
67
68    let Ok(parsed) = public_org_id.parse::<OrgId>() else {
69        return hash_public_org_id(public_org_id);
70    };
71    let raw: u128 = parsed.uuid().as_u128();
72    if raw == 0 {
73        return hash_public_org_id(public_org_id);
74    }
75
76    // Synthetic ids from `org_public_id_from_internal(i64)` always fit here,
77    // so the in-process runtime sees the same `org_id` the server used.
78    if raw <= i64::MAX as u128 {
79        return raw as i64;
80    }
81
82    hash_public_org_id(public_org_id)
83}
84
85// Use SHA-256 with a fixed truncation scheme so the mapping is stable across
86// Rust/binary upgrades and predictable for any embedder. Input is bounded to
87// `HASH_INPUT_CAP_BYTES` so attacker-controlled oversize org strings cannot
88// drive unbounded hashing work.
89fn hash_public_org_id(public_org_id: &str) -> i64 {
90    let bytes = public_org_id.as_bytes();
91    let bounded = &bytes[..bytes.len().min(HASH_INPUT_CAP_BYTES)];
92    let digest = Sha256::digest(bounded);
93    let mut buf = [0u8; 8];
94    buf.copy_from_slice(&digest[..8]);
95    let raw = u64::from_be_bytes(buf);
96    ((raw % ((i64::MAX - 1) as u64)) as i64) + 2
97}
98
/// Summary of one turn executed by the in-process runtime.
///
/// Produced from the core `TurnOutcome` once a turn's state machine completes.
/// A `MaxIterationsReached` outcome is reported as a success. A `Failed`
/// outcome carries an empty `response`, a `tool_calls_count` of 0, and the
/// failure message in `error`.
#[derive(Debug, Clone)]
pub struct TurnResult {
    /// Final text response produced by the turn (empty when the turn failed).
    pub response: String,
    /// Number of reason iterations executed.
    pub iterations: usize,
    /// Total number of tool calls executed during the turn.
    ///
    /// Always 0 for failed turns, because the core failure outcome does not
    /// carry a tool-call count.
    pub tool_calls_count: usize,
    /// Whether the turn completed without an unrecoverable failure.
    ///
    /// Also `true` when the turn stopped because it reached the iteration limit.
    pub success: bool,
    /// Failure message when `success` is false.
    pub error: Option<String>,
    /// Turn identifier used to correlate emitted events.
    pub turn_id: everruns_core::typed_id::TurnId,
}
114
115impl TurnResult {
116    fn from_outcome(outcome: TurnOutcome, turn_id: everruns_core::typed_id::TurnId) -> Self {
117        match outcome {
118            TurnOutcome::Success {
119                response,
120                iterations,
121                tool_calls_count,
122            } => Self {
123                response,
124                iterations,
125                tool_calls_count,
126                success: true,
127                error: None,
128                turn_id,
129            },
130            TurnOutcome::Failed { error, iterations } => Self {
131                response: String::new(),
132                iterations,
133                tool_calls_count: 0,
134                success: false,
135                error: Some(error),
136                turn_id,
137            },
138            TurnOutcome::MaxIterationsReached {
139                response,
140                iterations,
141                tool_calls_count,
142            } => Self {
143                response,
144                iterations,
145                tool_calls_count,
146                success: true,
147                error: None,
148                turn_id,
149            },
150        }
151    }
152}
153
/// Builder for the public in-process runtime.
///
/// The builder owns a standalone runtime bundle:
/// - `PlatformDefinition` for capabilities and drivers
/// - stores for sessions, files, storage, memory, and messages (in-memory by
///   default, or a custom bundle supplied via
///   [`InProcessRuntimeBuilder::backends`])
/// - seeded harness/agent/session entities
///
/// `build()` returns an [`InProcessRuntime`] that can execute turns in-process
/// without the durable engine or the control-plane server.
pub struct InProcessRuntimeBuilder {
    // Capabilities, drivers, and session filesystem factory used by the runtime.
    platform_definition: PlatformDefinition,
    // When set, `build()` registers the `llmsim` driver and, if no default model
    // was configured, defaults to the `llmsim-model` model.
    llm_sim_config: Option<LlmSimConfig>,
    // Model installed on the provider store at build time; required by `build()`.
    default_model: Option<ModelWithProvider>,
    // Custom backend bundle; `None` falls back to `RuntimeBackends::in_memory()`.
    backends: Option<RuntimeBackends>,
    // Handed to the platform's session filesystem factory at build time.
    session_file_system_factory_context: SessionFileSystemFactoryContext,
    // Entities seeded into their stores by `build()`, in insertion order.
    harnesses: Vec<Harness>,
    agents: Vec<Agent>,
    sessions: Vec<Session>,
    // Session id generated by the most recent `single_session` call, if any.
    default_session_id: Option<SessionId>,
    // Extra files written after the merged `initial_files` of each session.
    seeded_files: Vec<(SessionId, InitialFile)>,
    // MCP credential source; `None` falls back to `NoAuthProvider` at build time.
    mcp_auth_provider: Option<Arc<dyn everruns_mcp::McpAuthProvider>>,
}
176
177impl Default for InProcessRuntimeBuilder {
178    fn default() -> Self {
179        Self::new()
180    }
181}
182
183impl InProcessRuntimeBuilder {
184    /// Create a builder with built-in capabilities and no implicit LLM driver.
185    ///
186    /// Embedders must either:
187    /// - call [`Self::llm_sim`] for deterministic local examples/tests, or
188    /// - register their own driver(s) on the platform definition and set a
189    ///   default model via [`Self::default_model`].
190    pub fn new() -> Self {
191        Self {
192            platform_definition: PlatformDefinition::builder()
193                .capability_registry(CapabilityRegistry::with_builtins())
194                .driver_registry(DriverRegistry::new())
195                .session_file_system_factory(Arc::new(InMemorySessionFileSystemFactory))
196                .build(),
197            llm_sim_config: None,
198            default_model: None,
199            backends: None,
200            session_file_system_factory_context: SessionFileSystemFactoryContext::new(),
201            harnesses: Vec::new(),
202            agents: Vec::new(),
203            sessions: Vec::new(),
204            default_session_id: None,
205            seeded_files: Vec::new(),
206            mcp_auth_provider: None,
207        }
208    }
209
210    /// Set the auth provider used to acquire credentials for scoped MCP
211    /// servers (specs/runtime-mcp.md D3). Defaults to no credentials, suitable
212    /// for unauthenticated servers or servers carrying literal auth headers.
213    pub fn mcp_auth_provider(mut self, provider: Arc<dyn everruns_mcp::McpAuthProvider>) -> Self {
214        self.mcp_auth_provider = Some(provider);
215        self
216    }
217
218    /// Replace the platform definition used by the runtime.
219    pub fn platform_definition(mut self, platform_definition: PlatformDefinition) -> Self {
220        self.platform_definition = platform_definition;
221        self
222    }
223
224    /// Register an additional capability on the runtime platform.
225    pub fn capability<C: Capability + 'static>(mut self, capability: C) -> Self {
226        self.platform_definition
227            .capability_registry_mut()
228            .register(capability);
229        self
230    }
231
232    /// Replace the platform driver registry.
233    pub fn driver_registry(mut self, driver_registry: DriverRegistry) -> Self {
234        *self.platform_definition.driver_registry_mut() = driver_registry;
235        self
236    }
237
238    /// Register the built-in `llmsim` driver for deterministic local execution.
239    pub fn llm_sim(mut self, config: LlmSimConfig) -> Self {
240        self.llm_sim_config = Some(config);
241        self
242    }
243
244    /// Set the runtime default model used when sessions/agents do not override it.
245    pub fn default_model(mut self, model: ModelWithProvider) -> Self {
246        self.default_model = Some(model);
247        self
248    }
249
250    /// Supply a custom backend bundle instead of the built-in in-memory stores.
251    pub fn backends(mut self, backends: RuntimeBackends) -> Self {
252        self.backends = Some(backends);
253        self
254    }
255
256    /// Supply host dependencies needed by the platform session filesystem factory.
257    pub fn session_file_system_factory_context(
258        mut self,
259        context: SessionFileSystemFactoryContext,
260    ) -> Self {
261        self.session_file_system_factory_context = context;
262        self
263    }
264
265    /// Seed a harness into the runtime store.
266    pub fn harness(mut self, harness: Harness) -> Self {
267        self.harnesses.push(harness);
268        self
269    }
270
271    /// Seed an agent into the runtime store.
272    pub fn agent(mut self, agent: Agent) -> Self {
273        self.agents.push(agent);
274        self
275    }
276
277    /// Seed a session into the runtime store.
278    pub fn session(mut self, session: Session) -> Self {
279        self.sessions.push(session);
280        self
281    }
282
283    /// Seed one harness, one agent, and one session with a compact sub-builder.
284    ///
285    /// The generated session id is exposed from the built runtime via
286    /// [`InProcessRuntime::default_session_id`].
287    pub fn single_session<F>(mut self, configure: F) -> Self
288    where
289        F: FnOnce(SingleSessionBuilder) -> SingleSessionBuilder,
290    {
291        let (harness, agent, session, session_id) =
292            configure(SingleSessionBuilder::default()).build();
293        self.harnesses.push(harness);
294        self.agents.push(agent);
295        self.sessions.push(session);
296        self.default_session_id = Some(session_id);
297        self
298    }
299
300    /// Seed an additional text file directly into a session workspace.
301    ///
302    /// This is applied after harness/agent/session `initial_files` are merged.
303    pub fn seed_text_file(
304        mut self,
305        session_id: SessionId,
306        path: impl Into<String>,
307        content: impl Into<String>,
308    ) -> Self {
309        self.seeded_files.push((
310            session_id,
311            InitialFile {
312                path: path.into(),
313                content: content.into(),
314                encoding: "text".to_string(),
315                is_readonly: false,
316            },
317        ));
318        self
319    }
320
321    /// Build the in-process runtime.
322    ///
323    /// Returns a configuration error when no default model is available after
324    /// applying explicit configuration and any requested `llmsim` setup.
325    pub async fn build(mut self) -> Result<InProcessRuntime> {
326        let backends = match self.backends.take() {
327            Some(backends) => backends,
328            None => RuntimeBackends::in_memory(),
329        };
330        let file_store = resolve_session_file_system(
331            &self.platform_definition,
332            self.session_file_system_factory_context.clone(),
333        )
334        .await?;
335
336        if let Some(config) = self.llm_sim_config.take() {
337            let driver = LlmSimDriver::new(config);
338            self.platform_definition
339                .driver_registry_mut()
340                .register(ProviderType::LlmSim, move |_api_key, _base_url| {
341                    Box::new(driver.clone())
342                });
343
344            if self.default_model.is_none() {
345                self.default_model = Some(ModelWithProvider {
346                    model: "llmsim-model".to_string(),
347                    provider_type: LlmProviderType::LlmSim,
348                    api_key: Some("fake-key".to_string()),
349                    base_url: None,
350                });
351            }
352        }
353
354        let default_model = self.default_model.ok_or_else(|| {
355            AgentLoopError::config(
356                "in-process runtime requires a default model; call \
357                 InProcessRuntimeBuilder::default_model(...) or \
358                 InProcessRuntimeBuilder::llm_sim(...)",
359            )
360        })?;
361
362        backends
363            .provider_store
364            .set_default_model(default_model)
365            .await?;
366
367        for harness in &self.harnesses {
368            backends.harness_store.add_harness(harness.clone()).await?;
369        }
370        for agent in &self.agents {
371            backends.agent_store.add_agent(agent.clone()).await?;
372        }
373        for session in &self.sessions {
374            backends.session_store.add_session(session.clone()).await?;
375        }
376
377        for session in &self.sessions {
378            seed_runtime_initial_files(
379                backends.harness_store.as_ref(),
380                backends.agent_store.as_ref(),
381                file_store.as_ref(),
382                session,
383            )
384            .await?;
385        }
386
387        for (session_id, file) in &self.seeded_files {
388            file_store.seed_initial_file(*session_id, file).await?;
389        }
390
391        let persisting_emitter =
392            PersistingEventEmitter::new(backends.event_bus.clone(), backends.message_store.clone());
393
394        Ok(InProcessRuntime {
395            platform_definition: Arc::new(self.platform_definition),
396            harness_store: backends.harness_store,
397            agent_store: backends.agent_store,
398            session_store: backends.session_store,
399            default_session_id: self.default_session_id,
400            message_store: backends.message_store,
401            provider_store: backends.provider_store,
402            event_bus: backends.event_bus,
403            persisting_emitter,
404            file_store,
405            storage_store: backends.storage_store,
406            connection_resolver: backends.connection_resolver,
407            mcp_auth_provider: self
408                .mcp_auth_provider
409                .unwrap_or_else(|| Arc::new(everruns_mcp::NoAuthProvider)),
410            mcp_discovery_cache: Arc::new(crate::mcp_cache::McpDiscoveryCache::new()),
411        })
412    }
413}
414
415async fn resolve_session_file_system(
416    platform_definition: &PlatformDefinition,
417    file_system_factory_context: SessionFileSystemFactoryContext,
418) -> Result<Arc<dyn SessionFileSystem>> {
419    let file_system_factory = platform_definition.session_file_system_factory();
420    if file_system_factory.is_disabled() {
421        Ok(Arc::new(InMemorySessionFileStore::new()))
422    } else {
423        Ok(file_system_factory
424            .create_session_file_system(file_system_factory_context)
425            .await?)
426    }
427}
428
429#[derive(Clone)]
430/// Public in-process runtime backed by either in-memory or custom stores.
431///
432/// This runtime is intended for embedders who want to execute Everruns
433/// harnesses inside their own process while controlling capabilities,
434/// harness definitions, and driver registrations directly in Rust.
435pub struct InProcessRuntime {
436    platform_definition: Arc<PlatformDefinition>,
437    harness_store: Arc<dyn RuntimeHarnessStore>,
438    agent_store: Arc<dyn RuntimeAgentStore>,
439    session_store: Arc<dyn RuntimeSessionStore>,
440    default_session_id: Option<SessionId>,
441    message_store: Arc<dyn RuntimeMessageStore>,
442    provider_store: Arc<dyn RuntimeProviderStore>,
443    event_bus: Arc<dyn EventBus>,
444    persisting_emitter: PersistingEventEmitter,
445    file_store: Arc<dyn SessionFileSystem>,
446    storage_store: Arc<dyn SessionStorageStore>,
447    connection_resolver: Option<Arc<dyn UserConnectionResolver>>,
448    mcp_auth_provider: Arc<dyn everruns_mcp::McpAuthProvider>,
449    mcp_discovery_cache: Arc<crate::mcp_cache::McpDiscoveryCache>,
450}
451
impl InProcessRuntime {
    /// Build the shared MCP client over the platform egress boundary and the
    /// configured auth provider.
    ///
    /// A new `McpClient` value is allocated on every call.
    fn mcp_client(&self) -> Arc<everruns_mcp::McpClient> {
        Arc::new(everruns_mcp::McpClient::new(
            self.platform_definition.egress_service(),
            self.mcp_auth_provider.clone(),
        ))
    }

    /// Resolve the effective scoped MCP servers for a session.
    ///
    /// Passes the session's harness chain, the optional agent, and the session
    /// to `crate::mcp::merge_session_scoped_servers`. When the harness-chain
    /// lookup does not succeed, an empty (default) chain is used instead.
    async fn session_mcp_servers(
        &self,
        session: &Session,
        agent: Option<&Agent>,
    ) -> everruns_core::ScopedMcpServers {
        // NOTE(review): `unwrap_or_default` discards lookup failures, so harness-level
        // MCP servers silently drop out; confirm this best-effort behavior is intended.
        let harness_chain = self
            .harness_store
            .get_harness_chain(session.harness_id)
            .await
            .unwrap_or_default();
        crate::mcp::merge_session_scoped_servers(&harness_chain, agent, session)
    }

    /// Create a builder for the in-process runtime.
    pub fn builder() -> InProcessRuntimeBuilder {
        InProcessRuntimeBuilder::new()
    }

    /// Return the default session id seeded by
    /// [`InProcessRuntimeBuilder::single_session`], if one was configured.
    pub fn default_session_id(&self) -> Option<SessionId> {
        self.default_session_id
    }

    /// Execute one turn for an existing session.
    ///
    /// The input message is stored in the runtime history, an `input.message`
    /// event is emitted, and the turn then executes the shared core
    /// `input -> reason -> act` state machine. When the state machine
    /// completes, a `turn_completed` or `turn_failed` lifecycle event is
    /// recorded, `turn_end` hooks are fired, and the outcome is returned as a
    /// [`TurnResult`].
    ///
    /// # Errors
    ///
    /// Returns an error in these cases:
    /// - the session does not exist;
    /// - storing or emitting the input message fails;
    /// - the turn context cannot be assembled;
    /// - an input, reason, or act activity returns an error. This error is
    ///   propagated immediately with `?`, so no terminal lifecycle event is
    ///   recorded and `turn_end` hooks are not fired for that turn.
    ///
    /// # Panics
    ///
    /// Panics if the state machine requests `ExecuteAct` without a preceding
    /// reason result that reported tool calls (an internal invariant).
    pub async fn run_turn(
        &self,
        session_id: SessionId,
        input: impl Into<InputMessage>,
    ) -> Result<TurnResult> {
        let session = self
            .session_store
            .get_session(session_id)
            .await?
            .ok_or_else(|| AgentLoopError::store(format!("session not found: {session_id}")))?;

        // Input message is recorded directly (and emitted via the raw bus so
        // that PersistingEventEmitter does not double-store it). All
        // subsequent activity-emitted events flow through the persisting
        // emitter the adapter hands out.
        let input_message = self
            .message_store
            .add_input_message(session_id, input.into())
            .await?;
        self.event_bus
            .emit(EventRequest::new(
                session_id,
                EventContext::empty(),
                InputMessageData::new(input_message.clone()),
            ))
            .await?;

        let assembled = self
            .inspect_context_with_ids(session_id, session.harness_id, session.agent_id)
            .await?;
        // Sessions without an agent still need an agent id for the turn
        // context; derive a deterministic one from the session's UUID.
        let synthetic_agent_id = session
            .agent_id
            .unwrap_or_else(|| AgentId::from_uuid(session.id.uuid()));
        let org_id = in_process_internal_org_id(&session.organization_id);
        let mut state_machine = TurnStateMachine::new(
            TurnContext::new(session_id, input_message.id, synthetic_agent_id, org_id),
            assembled.runtime_agent.max_iterations,
        );

        // Response id returned by the previous reason call, forwarded to the next one.
        let mut previous_response_id: Option<String> = None;
        // Reason result whose tool calls the next `ExecuteAct` step executes.
        let mut last_reason_result: Option<everruns_core::ReasonResult> = None;

        loop {
            match state_machine.next_action() {
                TurnAction::ExecuteInput => {
                    let ctx = state_machine.context();
                    let base_context =
                        AtomContext::new(ctx.session_id, ctx.turn_id, ctx.input_message_id);
                    execute_input_activity(
                        self,
                        org_id,
                        InputAtomInput {
                            context: base_context,
                        },
                    )
                    .await?;
                    state_machine.on_input_completed();
                }
                TurnAction::ExecuteReason => {
                    let ctx = state_machine.context();
                    let base_context =
                        AtomContext::new(ctx.session_id, ctx.turn_id, ctx.input_message_id);
                    let reason_result = execute_reason_activity(
                        self,
                        org_id,
                        ReasonInput {
                            context: base_context.next_exec(),
                            harness_id: session.harness_id,
                            agent_id: session.agent_id,
                            org_id,
                            mcp_tool_definitions: vec![],
                            previous_response_id: previous_response_id.take(),
                            iteration: state_machine.current_iteration() as u32 + 1,
                        },
                    )
                    .await?;
                    previous_response_id = reason_result.response_id.clone();
                    state_machine.on_reason_completed(
                        reason_result.text.clone(),
                        reason_result.has_tool_calls,
                        reason_result.tool_calls.len(),
                        reason_result.success,
                        reason_result.error.clone(),
                        false,
                    );
                    // Only results with tool calls are kept; `ExecuteAct` consumes them.
                    if reason_result.has_tool_calls {
                        last_reason_result = Some(reason_result);
                    }
                }
                TurnAction::ExecuteAct => {
                    let reason_result = last_reason_result
                        .take()
                        .expect("ExecuteAct requires a prior ReasonResult");
                    let ctx = state_machine.context();
                    let base_context =
                        AtomContext::new(ctx.session_id, ctx.turn_id, ctx.input_message_id);
                    execute_act_activity(
                        self,
                        ActInput {
                            org_id: Some(org_id),
                            context: base_context.next_exec(),
                            harness_id: session.harness_id,
                            agent_id: session.agent_id,
                            tool_calls: reason_result.tool_calls,
                            tool_definitions: reason_result.tool_definitions,
                            locale: reason_result.locale,
                            blueprint_id: None,
                            network_access: reason_result.network_access,
                            // Request-level `parallel_tool_calls` is not yet
                            // plumbed into the reason path; the act scheduler
                            // defaults to its class-aware concurrent schedule.
                            parallel_tool_calls: None,
                        },
                    )
                    .await?;
                    state_machine.on_act_completed();
                }
                TurnAction::Complete(outcome) => {
                    let ctx = state_machine.context();
                    let lifecycle =
                        RuntimeSessionLifecycle::new(self.clone(), org_id, ctx.session_id);
                    // Reaching the iteration limit counts as a successful turn.
                    let turn_succeeded = matches!(
                        &outcome,
                        TurnOutcome::Success { .. } | TurnOutcome::MaxIterationsReached { .. }
                    );
                    match &outcome {
                        TurnOutcome::Success { iterations, .. }
                        | TurnOutcome::MaxIterationsReached { iterations, .. } => {
                            lifecycle
                                .turn_completed(
                                    ctx.turn_id,
                                    ctx.input_message_id,
                                    *iterations as u32,
                                    None,
                                    None,
                                )
                                .await;
                        }
                        TurnOutcome::Failed { error, .. } => {
                            lifecycle
                                .turn_failed(ctx.turn_id, ctx.input_message_id, error, None)
                                .await;
                        }
                    }
                    // turn_end lifecycle hooks (advisory). Fired after the
                    // terminal turn event for both success and failure.
                    lifecycle
                        .fire_turn_end_hooks(
                            session.harness_id,
                            session.agent_id,
                            ctx.turn_id,
                            turn_succeeded,
                        )
                        .await;
                    return Ok(TurnResult::from_outcome(outcome, ctx.turn_id));
                }
            }
        }
    }

    /// Execute one turn with `text` sent as a user message.
    ///
    /// Convenience wrapper around [`Self::run_turn`] using `InputMessage::user`;
    /// errors and panics are the same as for `run_turn`.
    pub async fn run_text_turn(
        &self,
        session_id: SessionId,
        text: impl Into<String>,
    ) -> Result<TurnResult> {
        self.run_turn(session_id, InputMessage::user(text)).await
    }

    /// Load the current message history for a session.
    pub async fn messages(&self, session_id: SessionId) -> Result<Vec<Message>> {
        self.message_store.load(session_id).await
    }

    /// Read a file from the runtime's session filesystem.
    ///
    /// The filesystem is whichever one the runtime was built with: the
    /// built-in in-memory store, or one created by the platform's session
    /// filesystem factory. `Ok(None)` presumably means the path does not
    /// exist; that is defined by the `SessionFileSystem` implementation.
    pub async fn read_file(
        &self,
        session_id: SessionId,
        path: &str,
    ) -> Result<Option<SessionFile>> {
        self.file_store.read_file(session_id, path).await
    }

    /// Assemble the current runtime context for a session without executing a turn.
    ///
    /// # Errors
    ///
    /// Returns a store error when the session does not exist, or any error
    /// raised while assembling the context.
    pub async fn load_context(&self, session_id: SessionId) -> Result<AssembledTurnContext> {
        let session = self
            .session_store
            .get_session(session_id)
            .await?
            .ok_or_else(|| AgentLoopError::store(format!("session not found: {session_id}")))?;
        self.inspect_context_with_ids(session_id, session.harness_id, session.agent_id)
            .await
    }

    /// Return all collected events from the runtime event bus.
    ///
    /// Event buses that do not retain events return an empty `Vec` (see
    /// [`EventBus::collected_events`]).
    pub async fn events(&self) -> Result<Vec<Event>> {
        Ok(self.event_bus.collected_events().await)
    }

    /// Execute a system command declared by a registered capability.
    ///
    /// Looks up the first capability whose `commands()` includes the named
    /// command (in capability-resolution order) and delegates to its
    /// `execute_command`. Returns an error if no capability declares the
    /// requested name. The coding-CLI example uses this for `/model`
    /// (provided by `ModelSwitcherCapability`) so the dispatch path stays
    /// inside the capability instead of the TUI's local `handle_command`
    /// branches.
    ///
    /// # Errors
    ///
    /// Returns any error from loading the session context, a configuration
    /// error when no resolved capability declares the command, or the error
    /// returned by the capability itself.
    pub async fn execute_command(
        &self,
        session_id: SessionId,
        request: everruns_core::command::ExecuteCommandRequest,
    ) -> Result<everruns_core::command::CommandResult> {
        let ctx = self.load_context(session_id).await?;
        let registry = self.platform_definition.capability_registry();
        // Context-aware commands (e.g. /btw) get the same store-backed host
        // facilities the server provides; the already-assembled context seeds
        // the host so dispatch and execution assemble it once.
        let host = everruns_core::command_host::StoreCommandHost::new(
            session_id,
            self.harness_store.clone(),
            self.agent_store.clone(),
            self.session_store.clone(),
            self.message_store.clone(),
            self.provider_store.clone(),
            registry.clone(),
            self.platform_definition.driver_registry().clone(),
        )
        .with_file_store(self.file_store.clone())
        .with_assembled_context(ctx.clone());
        let exec_ctx =
            everruns_core::command::CommandExecutionContext::new(session_id, Arc::new(host));
        // Resolved capabilities missing from the registry are skipped.
        for config in &ctx.resolved_capability_configs {
            let Some(capability) = registry.get(config.capability_id()) else {
                continue;
            };
            if capability.commands().iter().any(|c| c.name == request.name) {
                return capability.execute_command(&request, &exec_ctx).await;
            }
        }
        Err(AgentLoopError::config(format!(
            "no capability declares command /{}",
            request.name
        )))
    }

    /// List slash commands available for a session.
    ///
    /// Resolves the session's harness/agent capability chain and aggregates
    /// commands declared via [`Capability::commands`], deduplicated by name
    /// (first occurrence wins, matching the order of resolved capabilities).
    /// This is the embedded equivalent of the server's
    /// `GET /v1/sessions/{id}/commands` system-commands list — skill
    /// commands are not included here because skills are discovered via the
    /// platform filesystem rather than the capability registry.
    pub async fn list_commands(
        &self,
        session_id: SessionId,
    ) -> Result<Vec<everruns_core::command::CommandDescriptor>> {
        let ctx = self.load_context(session_id).await?;
        let registry = self.platform_definition.capability_registry();
        let mut seen = std::collections::HashSet::new();
        let mut commands = Vec::new();
        for config in &ctx.resolved_capability_configs {
            let Some(capability) = registry.get(config.capability_id()) else {
                continue;
            };
            for command in capability.commands() {
                if seen.insert(command.name.clone()) {
                    commands.push(command);
                }
            }
        }
        Ok(commands)
    }

    /// Assemble the turn context for `session_id` with the given harness and
    /// optional agent, using this runtime's stores, capability registry, and
    /// session filesystem.
    async fn inspect_context_with_ids(
        &self,
        session_id: SessionId,
        harness_id: everruns_core::HarnessId,
        agent_id: Option<AgentId>,
    ) -> Result<AssembledTurnContext> {
        // NOTE(review): the empty slice is an extra input to `inspect_turn_context`
        // whose meaning is defined there; confirm that "none" is the intent here.
        inspect_turn_context(
            self.harness_store.as_ref(),
            self.agent_store.as_ref(),
            self.session_store.as_ref(),
            self.message_store.as_ref(),
            self.provider_store.as_ref(),
            self.platform_definition.capability_registry(),
            session_id,
            harness_id,
            agent_id,
            &[],
            Some(self.file_store.clone()),
        )
        .await
    }
}
791
792#[async_trait]
793impl RuntimeHostAdapter for InProcessRuntime {
794    async fn get_agent(&self, _org_id: i64, agent_id: AgentId) -> Result<Option<Agent>> {
795        self.agent_store.get_agent(agent_id).await
796    }
797
798    async fn get_harness(&self, _org_id: i64, harness_id: HarnessId) -> Result<Option<Harness>> {
799        let chain = self.harness_store.get_harness_chain(harness_id).await?;
800        Ok(chain.into_iter().last())
801    }
802
803    async fn set_session_status(
804        &self,
805        _org_id: i64,
806        session_id: SessionId,
807        _status: SessionStatus,
808    ) -> Result<Session> {
809        // The in-process runtime does not persist status. Lifecycle callers
810        // still emit their events; downstream consumers in-process don't
811        // observe session.status.
812        self.session_store
813            .get_session(session_id)
814            .await?
815            .ok_or_else(|| AgentLoopError::store(format!("session not found: {session_id}")))
816    }
817
818    async fn load_turn_context(
819        &self,
820        _org_id: i64,
821        session_id: SessionId,
822    ) -> Result<RuntimeHostTurnContext> {
823        let session = self
824            .session_store
825            .get_session(session_id)
826            .await?
827            .ok_or_else(|| AgentLoopError::store(format!("session not found: {session_id}")))?;
828        let agent = match session.agent_id {
829            Some(agent_id) => self.agent_store.get_agent(agent_id).await?,
830            None => None,
831        };
832        let messages = self.message_store.load(session_id).await?;
833        let model = self.provider_store.get_default_model().await?;
834
835        // Discover tools from the session's scoped MCP servers so they appear
836        // to the LLM alongside built-in tools (specs/runtime-mcp.md D4).
837        let scoped_servers = self.session_mcp_servers(&session, agent.as_ref()).await;
838        let mcp_tool_definitions = if scoped_servers.is_empty() {
839            vec![]
840        } else {
841            crate::mcp::discover_tool_definitions(
842                &self.mcp_discovery_cache,
843                self.mcp_client(),
844                session_id.uuid(),
845                &scoped_servers,
846            )
847            .await
848        };
849
850        Ok(RuntimeHostTurnContext {
851            agent,
852            session,
853            messages,
854            model,
855            mcp_tool_definitions,
856        })
857    }
858
859    async fn mcp_executor(
860        &self,
861        _org_id: i64,
862        session_id: SessionId,
863    ) -> Option<Arc<everruns_mcp::McpExecutor>> {
864        let session = self.session_store.get_session(session_id).await.ok()??;
865        let agent = match session.agent_id {
866            Some(agent_id) => self.agent_store.get_agent(agent_id).await.ok().flatten(),
867            None => None,
868        };
869        let scoped_servers = self.session_mcp_servers(&session, agent.as_ref()).await;
870        crate::mcp::build_executor(self.mcp_client(), &scoped_servers)
871    }
872
873    fn capability_registry(&self) -> CapabilityRegistry {
874        self.platform_definition.capability_registry().clone()
875    }
876
877    fn driver_registry(&self) -> DriverRegistry {
878        self.platform_definition.driver_registry().clone()
879    }
880
881    fn harness_store(&self, _org_id: i64) -> Arc<dyn HarnessStore> {
882        self.harness_store.clone()
883    }
884
885    fn agent_store(&self, _org_id: i64) -> Arc<dyn AgentStore> {
886        self.agent_store.clone()
887    }
888
889    fn session_store(&self, _org_id: i64) -> Arc<dyn SessionStore> {
890        self.session_store.clone()
891    }
892
893    fn session_mutator(&self, _org_id: i64) -> Arc<dyn SessionMutator> {
894        self.session_store.clone()
895    }
896
897    fn provider_store(&self, _org_id: i64) -> Arc<dyn LlmProviderStore> {
898        self.provider_store.clone()
899    }
900
901    fn message_store(&self) -> Arc<dyn MessageRetriever> {
902        self.message_store.clone()
903    }
904
905    fn event_emitter(&self) -> Arc<dyn EventEmitter> {
906        Arc::new(self.persisting_emitter.clone())
907    }
908
909    fn file_store(&self) -> Arc<dyn SessionFileSystem> {
910        self.file_store.clone()
911    }
912
913    fn storage_store(&self) -> Option<Arc<dyn SessionStorageStore>> {
914        Some(self.storage_store.clone())
915    }
916
917    fn connection_resolver(&self) -> Option<Arc<dyn UserConnectionResolver>> {
918        self.connection_resolver.clone()
919    }
920
921    fn utility_llm_service(&self) -> Option<Arc<dyn everruns_core::UtilityLlmService>> {
922        Some(self.platform_definition.utility_llm_service())
923    }
924
925    fn egress_service(&self) -> Option<Arc<dyn everruns_core::EgressService>> {
926        Some(self.platform_definition.egress_service())
927    }
928}
929
/// Event emitter that publishes every request on `inner` and additionally
/// stores the message carried by message-bearing events in `message_store`.
#[derive(Clone)]
struct PersistingEventEmitter {
    /// Bus each event is published on first.
    inner: Arc<dyn EventBus>,
    /// Store that receives the messages extracted from emitted events.
    message_store: Arc<dyn RuntimeMessageStore>,
}
935
936impl PersistingEventEmitter {
937    fn new(inner: Arc<dyn EventBus>, message_store: Arc<dyn RuntimeMessageStore>) -> Self {
938        Self {
939            inner,
940            message_store,
941        }
942    }
943}
944
945#[async_trait]
946impl EventEmitter for PersistingEventEmitter {
947    async fn emit(&self, request: EventRequest) -> Result<Event> {
948        let event = self.inner.emit(request.clone()).await?;
949        if let Some(message) = message_from_event(&event.data) {
950            self.message_store
951                .store_message(request.session_id, message)
952                .await?;
953        }
954        Ok(event)
955    }
956}
957
958fn effective_overlay(
959    harness_chain: &[Harness],
960    agent: Option<&Agent>,
961    session: &Session,
962) -> AgentConfigOverlay {
963    let harness_layers = harness_chain.iter().map(AgentConfigOverlay::from);
964    let agent_layers = agent.into_iter().map(AgentConfigOverlay::from);
965    AgentConfigOverlay::fold(
966        harness_layers
967            .chain(agent_layers)
968            .chain([AgentConfigOverlay::from(session)]),
969    )
970}
971
972async fn seed_runtime_initial_files(
973    harness_store: &dyn RuntimeHarnessStore,
974    agent_store: &dyn RuntimeAgentStore,
975    file_store: &dyn SessionFileSystem,
976    session: &Session,
977) -> Result<()> {
978    let harness_chain = harness_store.get_harness_chain(session.harness_id).await?;
979    if harness_chain.is_empty() {
980        return Err(AgentLoopError::store(format!(
981            "harness not found while seeding files: {}",
982            session.harness_id
983        )));
984    }
985    let agent = match session.agent_id {
986        Some(agent_id) => Some(
987            agent_store
988                .get_agent(agent_id)
989                .await?
990                .ok_or_else(|| AgentLoopError::store(format!("agent not found: {agent_id}")))?,
991        ),
992        None => None,
993    };
994    let overlay = effective_overlay(&harness_chain, agent.as_ref(), session);
995    for file in &overlay.initial_files {
996        file_store.seed_initial_file(session.id, file).await?;
997    }
998    Ok(())
999}
1000
1001fn message_from_event(data: &EventData) -> Option<Message> {
1002    match data {
1003        EventData::InputMessage(data) => Some(data.message.clone()),
1004        EventData::OutputMessageCompleted(OutputMessageCompletedData { message, .. }) => {
1005            Some(message.clone())
1006        }
1007        EventData::ToolCompleted(data) => Some(tool_completed_to_message(data.clone())),
1008        _ => None,
1009    }
1010}
1011
1012fn tool_completed_to_message(data: ToolCompletedData) -> Message {
1013    let mut images: Vec<ToolResultImage> = Vec::new();
1014    let metadata = tool_result_metadata(&data);
1015    let result = data.result.map(|parts| {
1016        for part in &parts {
1017            if let ContentPart::Image(img) = part
1018                && let (Some(base64), Some(media_type)) = (&img.base64, &img.media_type)
1019            {
1020                images.push(ToolResultImage {
1021                    base64: base64.clone(),
1022                    media_type: media_type.clone(),
1023                });
1024            }
1025        }
1026
1027        let text_parts: Vec<&ContentPart> = parts
1028            .iter()
1029            .filter(|part| matches!(part, ContentPart::Text(_)))
1030            .collect();
1031        if text_parts.len() == 1
1032            && let ContentPart::Text(text) = text_parts[0]
1033        {
1034            return parse_structured_tool_result_text(&text.text);
1035        }
1036        if !text_parts.is_empty() {
1037            serde_json::to_value(&text_parts).unwrap_or_default()
1038        } else {
1039            serde_json::Value::Null
1040        }
1041    });
1042
1043    let mut message = if images.is_empty() {
1044        Message::tool_result(&data.tool_call_id, result, data.error)
1045    } else {
1046        Message::tool_result_with_images(&data.tool_call_id, result, images)
1047    };
1048    message.metadata = metadata;
1049    message
1050}
1051
1052fn tool_result_metadata(
1053    data: &ToolCompletedData,
1054) -> Option<std::collections::HashMap<String, serde_json::Value>> {
1055    let mut metadata = std::collections::HashMap::new();
1056    metadata.insert("tool_name".to_string(), serde_json::json!(data.tool_name));
1057    if let Some(fingerprint) = &data.tool_call_fingerprint {
1058        metadata.insert(
1059            "tool_call_fingerprint".to_string(),
1060            serde_json::json!(fingerprint),
1061        );
1062    }
1063    if let Some(fingerprint) = &data.tool_result_fingerprint {
1064        metadata.insert(
1065            "tool_result_fingerprint".to_string(),
1066            serde_json::json!(fingerprint),
1067        );
1068    }
1069    (!metadata.is_empty()).then_some(metadata)
1070}
1071
1072fn parse_structured_tool_result_text(text: &str) -> serde_json::Value {
1073    let trimmed = text.trim_start();
1074    if !trimmed.starts_with('{') && !trimmed.starts_with('[') {
1075        return serde_json::Value::String(text.to_string());
1076    }
1077
1078    match serde_json::from_str(text) {
1079        Ok(value @ (serde_json::Value::Object(_) | serde_json::Value::Array(_))) => value,
1080        _ => serde_json::Value::String(text.to_string()),
1081    }
1082}
1083
#[cfg(test)]
mod tool_completed_replay_tests {
    // Replaying `ToolCompleted` events into stored tool-result messages.
    use super::*;

    /// Replays `text` as the only content part of a successful tool call and
    /// returns the stored result value.
    fn replay_text_result(text: &str) -> serde_json::Value {
        let data = ToolCompletedData::success(
            "call_text".to_string(),
            "custom_tool".to_string(),
            vec![ContentPart::text(text)],
            Some(1),
        );
        let message = tool_completed_to_message(data);
        message
            .tool_result_content()
            .and_then(|content| content.result.clone())
            .expect("tool result should be present")
    }

    #[test]
    fn tool_completed_replay_preserves_json_object_shape() {
        let data = ToolCompletedData::success(
            "call_read".to_string(),
            "read_file".to_string(),
            vec![ContentPart::text(
                serde_json::json!({
                    "path": "/workspace/src/lib.rs",
                    "content": "1|fn main() {}"
                })
                .to_string(),
            )],
            Some(1),
        );

        let message = tool_completed_to_message(data);
        let result = message
            .tool_result_content()
            .and_then(|content| content.result.as_ref())
            .expect("tool result should be present");

        assert_eq!(result["path"], "/workspace/src/lib.rs");
        assert_eq!(result["content"], "1|fn main() {}");
    }

    #[test]
    fn tool_completed_replay_keeps_scalar_json_as_text() {
        let data = ToolCompletedData::success(
            "call_scalar".to_string(),
            "custom_tool".to_string(),
            vec![ContentPart::text("123")],
            Some(1),
        );

        let message = tool_completed_to_message(data);
        let result = message
            .tool_result_content()
            .and_then(|content| content.result.as_ref())
            .expect("tool result should be present");

        assert_eq!(result, &serde_json::Value::String("123".to_string()));
    }

    #[test]
    fn tool_completed_replay_keeps_plain_text_as_string() {
        assert_eq!(
            replay_text_result("file written"),
            serde_json::Value::String("file written".to_string())
        );
    }

    #[test]
    fn tool_completed_replay_parses_json_after_leading_whitespace() {
        // The `{`/`[` sniff ignores leading whitespace, and the untrimmed text
        // is still valid JSON for the parser.
        assert_eq!(
            replay_text_result("  \n{\"ok\": true}"),
            serde_json::json!({ "ok": true })
        );
    }

    #[test]
    fn tool_completed_replay_keeps_malformed_json_as_text() {
        let text = "{not json";
        assert_eq!(
            replay_text_result(text),
            serde_json::Value::String(text.to_string())
        );
    }

    #[test]
    fn tool_completed_replay_preserves_fingerprints_as_metadata() {
        let data = ToolCompletedData::success(
            "call_read".to_string(),
            "read_file".to_string(),
            vec![ContentPart::text("{}")],
            Some(1),
        )
        .with_fingerprints("sha256:call".to_string(), "sha256:result".to_string());

        let message = tool_completed_to_message(data);
        let metadata = message.metadata.expect("metadata should be present");

        assert_eq!(metadata["tool_name"], "read_file");
        assert_eq!(metadata["tool_call_fingerprint"], "sha256:call");
        assert_eq!(metadata["tool_result_fingerprint"], "sha256:result");
    }
}
1149
1150#[cfg(test)]
1151mod org_id_mapping_tests {
1152    use super::*;
1153    use everruns_core::{DEFAULT_ORG_ID, DEFAULT_ORG_PUBLIC_ID, org_public_id_from_internal};
1154
1155    #[test]
1156    fn default_public_id_maps_to_default_org() {
1157        assert_eq!(
1158            in_process_internal_org_id(DEFAULT_ORG_PUBLIC_ID),
1159            DEFAULT_ORG_ID
1160        );
1161    }
1162
1163    #[test]
1164    fn invalid_public_id_does_not_fall_back_to_default() {
1165        for invalid in [
1166            "",
1167            "not-an-org",
1168            "org_short",
1169            "org_ZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZ",
1170            "ORG_00000000000000000000000000000001",
1171        ] {
1172            let mapped = in_process_internal_org_id(invalid);
1173            assert_ne!(mapped, everruns_core::DEFAULT_ORG_ID);
1174            assert!(
1175                mapped >= 2,
1176                "invalid input {invalid:?} should not map to default"
1177            );
1178        }
1179    }
1180
1181    #[test]
1182    fn zero_public_id_does_not_fall_back_to_default() {
1183        // org_public_id_from_internal never produces this; a hand-crafted
1184        // all-zeros id is treated as invalid (raw == 0).
1185        let mapped = in_process_internal_org_id("org_00000000000000000000000000000000");
1186        assert_ne!(mapped, everruns_core::DEFAULT_ORG_ID);
1187        assert!(mapped >= 2, "all-zero id should not map to default");
1188    }
1189
1190    #[test]
1191    fn synthetic_public_id_round_trips_with_internal_helper() {
1192        for internal in [1_i64, 2, 42, 1_000_000, i64::MAX - 1, i64::MAX] {
1193            let public = org_public_id_from_internal(internal);
1194            assert_eq!(
1195                in_process_internal_org_id(&public),
1196                internal,
1197                "round-trip failed for internal={internal}"
1198            );
1199        }
1200    }
1201
1202    #[test]
1203    fn distinct_synthetic_ids_map_to_distinct_internal_ids() {
1204        let a = org_public_id_from_internal(7);
1205        let b = org_public_id_from_internal(8);
1206        assert_ne!(a, b);
1207        assert_ne!(
1208            in_process_internal_org_id(&a),
1209            in_process_internal_org_id(&b)
1210        );
1211    }
1212
1213    #[test]
1214    fn high_entropy_uuid_style_id_hashes_into_reserved_range() {
1215        // First valid UUID-style id whose raw u128 exceeds i64::MAX
1216        // (top bit of the u128 set). It must hash to a positive i64 that
1217        // is neither 0 nor DEFAULT_ORG_ID.
1218        let high = "org_80000000000000000000000000000000";
1219        let mapped = in_process_internal_org_id(high);
1220        assert!(mapped >= 2, "mapped id {mapped} must be >= 2");
1221        assert_ne!(mapped, DEFAULT_ORG_ID);
1222
1223        // Mapping is deterministic.
1224        assert_eq!(mapped, in_process_internal_org_id(high));
1225    }
1226
1227    #[test]
1228    fn high_entropy_ids_are_isolated_from_each_other() {
1229        let a = in_process_internal_org_id("org_80000000000000000000000000000001");
1230        let b = in_process_internal_org_id("org_80000000000000000000000000000002");
1231        assert_ne!(a, b);
1232        assert_ne!(a, DEFAULT_ORG_ID);
1233        assert_ne!(b, DEFAULT_ORG_ID);
1234    }
1235
1236    #[test]
1237    fn hash_uses_stable_sha256_truncation() {
1238        // SHA-256 with fixed big-endian first-8-byte truncation gives a value
1239        // we can pin. If this assertion ever breaks, callers depending on
1240        // build-stable mapping must be re-audited.
1241        let mapped = in_process_internal_org_id("org_80000000000000000000000000000000");
1242        let expected = {
1243            let digest = sha2::Sha256::digest(b"org_80000000000000000000000000000000");
1244            let mut buf = [0u8; 8];
1245            buf.copy_from_slice(&digest[..8]);
1246            let raw = u64::from_be_bytes(buf);
1247            ((raw % ((i64::MAX - 1) as u64)) as i64) + 2
1248        };
1249        assert_eq!(mapped, expected);
1250    }
1251
1252    #[test]
1253    fn oversize_input_is_bounded_and_does_not_collide_silently() {
1254        // Inputs past HASH_INPUT_CAP_BYTES are truncated before hashing, so
1255        // two oversize strings that agree on the first cap bytes map to the
1256        // same internal id. We only assert the result stays in the safe
1257        // [2, i64::MAX] range and is not DEFAULT_ORG_ID — the cap exists to
1258        // bound work, not to widen the input space.
1259        let oversize = "x".repeat(super::HASH_INPUT_CAP_BYTES * 4);
1260        let mapped = in_process_internal_org_id(&oversize);
1261        assert!(mapped >= 2);
1262        assert_ne!(mapped, DEFAULT_ORG_ID);
1263    }
1264}