Skip to main content

everruns_runtime/
runtime.rs

1// In-process runtime builder and runner.
2// Decision: the public runtime is in-memory today, but uses the same core atoms
3// and capability resolution path as the durable worker so behavior stays close.
4
5use crate::backends::{
6    EventBus, RuntimeAgentStore, RuntimeBackends, RuntimeHarnessStore, RuntimeMessageStore,
7    RuntimeProviderStore, RuntimeSessionStore,
8};
9use crate::builders::SingleSessionBuilder;
10use crate::host::{
11    RuntimeHostAdapter, RuntimeHostTurnContext, RuntimeSessionLifecycle, execute_act_activity,
12    execute_input_activity, execute_reason_activity,
13};
14use crate::in_memory::{InMemorySessionFileStore, InMemorySessionFileSystemFactory};
15use async_trait::async_trait;
16use everruns_core::agent::Agent;
17use everruns_core::atoms::{ActInput, AtomContext, InputAtomInput, ReasonInput};
18use everruns_core::capabilities::{Capability, CapabilityRegistry};
19use everruns_core::config_layer::AgentConfigOverlay;
20use everruns_core::error::{AgentLoopError, Result};
21use everruns_core::events::{
22    Event, EventContext, EventData, EventRequest, InputMessageData, OutputMessageCompletedData,
23    ToolCompletedData,
24};
25use everruns_core::harness::Harness;
26use everruns_core::llm_driver_registry::{DriverRegistry, ProviderType};
27use everruns_core::llm_models::LlmProviderType;
28use everruns_core::llmsim_driver::{LlmSimConfig, LlmSimDriver};
29use everruns_core::message::{ContentPart, Message};
30use everruns_core::platform_definition::PlatformDefinition;
31use everruns_core::runtime_context::{AssembledTurnContext, inspect_turn_context};
32use everruns_core::session::{Session, SessionStatus};
33use everruns_core::session_file::{InitialFile, SessionFile};
34use everruns_core::tools::ToolResultImage;
35use everruns_core::traits::{
36    AgentStore, EventEmitter, HarnessStore, LlmProviderStore, ModelWithProvider, SessionMutator,
37    SessionStorageStore, SessionStore,
38};
39use everruns_core::turn::{TurnAction, TurnContext, TurnOutcome, TurnStateMachine};
40use everruns_core::typed_id::{AgentId, HarnessId, OrgId, SessionId};
41use everruns_core::{
42    InputMessage, MemoryStoreBackend, MessageRetriever, SessionFileSystem,
43    SessionFileSystemFactoryContext,
44};
45use sha2::{Digest, Sha256};
46use std::sync::Arc;
47
/// Cap on the input length hashed by [`hash_public_org_id`].
///
/// Legitimate org public ids are `org_<32hex>` (36 bytes). Bounding the
/// hashed prefix keeps worst-case cost predictable when an attacker-controlled
/// session carries an oversize string. Consequence: any two inputs that share
/// their first `HASH_INPUT_CAP_BYTES` bytes hash to the same internal id.
const HASH_INPUT_CAP_BYTES: usize = 128;
54
55/// Derive an internal `i64` org id from the public `org_<32hex>` form on a
56/// [`Session`].
57///
58/// Round-trip with [`everruns_core::org_public_id_from_internal`]: when the
59/// public id was produced by that helper (i.e. the upper bits are zero and
60/// the value fits in a positive `i64`), this returns the original internal
61/// id unchanged. Other values are mapped into `[2, i64::MAX]` by hashing the
62/// original public id string so runtime namespaces do not fail open to the
63/// shared default org and avoid arithmetic collision gadgets.
64fn in_process_internal_org_id(public_org_id: &str) -> i64 {
65    if public_org_id == everruns_core::DEFAULT_ORG_PUBLIC_ID {
66        return everruns_core::DEFAULT_ORG_ID;
67    }
68
69    let Ok(parsed) = public_org_id.parse::<OrgId>() else {
70        return hash_public_org_id(public_org_id);
71    };
72    let raw: u128 = parsed.uuid().as_u128();
73    if raw == 0 {
74        return hash_public_org_id(public_org_id);
75    }
76
77    // Synthetic ids from `org_public_id_from_internal(i64)` always fit here,
78    // so the in-process runtime sees the same `org_id` the server used.
79    if raw <= i64::MAX as u128 {
80        return raw as i64;
81    }
82
83    hash_public_org_id(public_org_id)
84}
85
86// Use SHA-256 with a fixed truncation scheme so the mapping is stable across
87// Rust/binary upgrades and predictable for any embedder. Input is bounded to
88// `HASH_INPUT_CAP_BYTES` so attacker-controlled oversize org strings cannot
89// drive unbounded hashing work.
90fn hash_public_org_id(public_org_id: &str) -> i64 {
91    let bytes = public_org_id.as_bytes();
92    let bounded = &bytes[..bytes.len().min(HASH_INPUT_CAP_BYTES)];
93    let digest = Sha256::digest(bounded);
94    let mut buf = [0u8; 8];
95    buf.copy_from_slice(&digest[..8]);
96    let raw = u64::from_be_bytes(buf);
97    ((raw % ((i64::MAX - 1) as u64)) as i64) + 2
98}
99
/// Result of a single [`InProcessRuntime::run_turn`] call.
///
/// Built from the core turn state machine's terminal outcome. A turn that
/// stops because it reached the iteration limit is still reported with
/// `success == true` and carries whatever response it produced.
#[derive(Debug, Clone)]
pub struct TurnResult {
    /// Final text response produced by the turn (empty when the turn failed).
    pub response: String,
    /// Number of reason iterations executed.
    pub iterations: usize,
    /// Total number of tool calls executed during the turn. Always `0` for a
    /// failed turn, because the failed outcome carries no tool-call count.
    pub tool_calls_count: usize,
    /// Whether the turn completed without an unrecoverable failure.
    pub success: bool,
    /// Failure message when `success` is false.
    pub error: Option<String>,
    /// Turn identifier used to correlate emitted events.
    pub turn_id: everruns_core::typed_id::TurnId,
}
115
116impl TurnResult {
117    fn from_outcome(outcome: TurnOutcome, turn_id: everruns_core::typed_id::TurnId) -> Self {
118        match outcome {
119            TurnOutcome::Success {
120                response,
121                iterations,
122                tool_calls_count,
123            } => Self {
124                response,
125                iterations,
126                tool_calls_count,
127                success: true,
128                error: None,
129                turn_id,
130            },
131            TurnOutcome::Failed { error, iterations } => Self {
132                response: String::new(),
133                iterations,
134                tool_calls_count: 0,
135                success: false,
136                error: Some(error),
137                turn_id,
138            },
139            TurnOutcome::MaxIterationsReached {
140                response,
141                iterations,
142                tool_calls_count,
143            } => Self {
144                response,
145                iterations,
146                tool_calls_count,
147                success: true,
148                error: None,
149                turn_id,
150            },
151        }
152    }
153}
154
/// Builder for the public in-process runtime.
///
/// The builder owns a standalone runtime bundle:
/// - `PlatformDefinition` for capabilities and drivers
/// - in-memory stores for sessions, files, storage, memory, and messages
/// - seeded harness/agent/session entities
///
/// `build()` returns an [`InProcessRuntime`] that can execute turns in-process
/// without the durable engine or the control-plane server.
pub struct InProcessRuntimeBuilder {
    // Capabilities, LLM drivers, and the session filesystem factory.
    platform_definition: PlatformDefinition,
    // Pending `llmsim` configuration; the driver is registered in `build()`.
    llm_sim_config: Option<LlmSimConfig>,
    // Explicit default model. When unset and `llm_sim` was configured,
    // `build()` substitutes the llmsim model.
    default_model: Option<ModelWithProvider>,
    // Custom backend bundle; `None` means `RuntimeBackends::in_memory()`.
    backends: Option<RuntimeBackends>,
    // Host dependencies handed to the session filesystem factory.
    session_file_system_factory_context: SessionFileSystemFactoryContext,
    // Entities written into the backend stores during `build()`.
    harnesses: Vec<Harness>,
    agents: Vec<Agent>,
    sessions: Vec<Session>,
    // Session id from the most recent `single_session` call, if any.
    default_session_id: Option<SessionId>,
    // Extra files applied after harness/agent/session `initial_files`.
    seeded_files: Vec<(SessionId, InitialFile)>,
    // MCP credential provider; `build()` uses `NoAuthProvider` when unset.
    mcp_auth_provider: Option<Arc<dyn everruns_mcp::McpAuthProvider>>,
}
177
178impl Default for InProcessRuntimeBuilder {
179    fn default() -> Self {
180        Self::new()
181    }
182}
183
184impl InProcessRuntimeBuilder {
185    /// Create a builder with built-in capabilities and no implicit LLM driver.
186    ///
187    /// Embedders must either:
188    /// - call [`Self::llm_sim`] for deterministic local examples/tests, or
189    /// - register their own driver(s) on the platform definition and set a
190    ///   default model via [`Self::default_model`].
191    pub fn new() -> Self {
192        Self {
193            platform_definition: PlatformDefinition::builder()
194                .capability_registry(CapabilityRegistry::with_builtins())
195                .driver_registry(DriverRegistry::new())
196                .session_file_system_factory(Arc::new(InMemorySessionFileSystemFactory))
197                .build(),
198            llm_sim_config: None,
199            default_model: None,
200            backends: None,
201            session_file_system_factory_context: SessionFileSystemFactoryContext::new(),
202            harnesses: Vec::new(),
203            agents: Vec::new(),
204            sessions: Vec::new(),
205            default_session_id: None,
206            seeded_files: Vec::new(),
207            mcp_auth_provider: None,
208        }
209    }
210
211    /// Set the auth provider used to acquire credentials for scoped MCP
212    /// servers (specs/runtime-mcp.md D3). Defaults to no credentials, suitable
213    /// for unauthenticated servers or servers carrying literal auth headers.
214    pub fn mcp_auth_provider(mut self, provider: Arc<dyn everruns_mcp::McpAuthProvider>) -> Self {
215        self.mcp_auth_provider = Some(provider);
216        self
217    }
218
219    /// Replace the platform definition used by the runtime.
220    pub fn platform_definition(mut self, platform_definition: PlatformDefinition) -> Self {
221        self.platform_definition = platform_definition;
222        self
223    }
224
225    /// Register an additional capability on the runtime platform.
226    pub fn capability<C: Capability + 'static>(mut self, capability: C) -> Self {
227        self.platform_definition
228            .capability_registry_mut()
229            .register(capability);
230        self
231    }
232
233    /// Replace the platform driver registry.
234    pub fn driver_registry(mut self, driver_registry: DriverRegistry) -> Self {
235        *self.platform_definition.driver_registry_mut() = driver_registry;
236        self
237    }
238
239    /// Register the built-in `llmsim` driver for deterministic local execution.
240    pub fn llm_sim(mut self, config: LlmSimConfig) -> Self {
241        self.llm_sim_config = Some(config);
242        self
243    }
244
245    /// Set the runtime default model used when sessions/agents do not override it.
246    pub fn default_model(mut self, model: ModelWithProvider) -> Self {
247        self.default_model = Some(model);
248        self
249    }
250
251    /// Supply a custom backend bundle instead of the built-in in-memory stores.
252    pub fn backends(mut self, backends: RuntimeBackends) -> Self {
253        self.backends = Some(backends);
254        self
255    }
256
257    /// Supply host dependencies needed by the platform session filesystem factory.
258    pub fn session_file_system_factory_context(
259        mut self,
260        context: SessionFileSystemFactoryContext,
261    ) -> Self {
262        self.session_file_system_factory_context = context;
263        self
264    }
265
266    /// Seed a harness into the runtime store.
267    pub fn harness(mut self, harness: Harness) -> Self {
268        self.harnesses.push(harness);
269        self
270    }
271
272    /// Seed an agent into the runtime store.
273    pub fn agent(mut self, agent: Agent) -> Self {
274        self.agents.push(agent);
275        self
276    }
277
278    /// Seed a session into the runtime store.
279    pub fn session(mut self, session: Session) -> Self {
280        self.sessions.push(session);
281        self
282    }
283
284    /// Seed one harness, one agent, and one session with a compact sub-builder.
285    ///
286    /// The generated session id is exposed from the built runtime via
287    /// [`InProcessRuntime::default_session_id`].
288    pub fn single_session<F>(mut self, configure: F) -> Self
289    where
290        F: FnOnce(SingleSessionBuilder) -> SingleSessionBuilder,
291    {
292        let (harness, agent, session, session_id) =
293            configure(SingleSessionBuilder::default()).build();
294        self.harnesses.push(harness);
295        self.agents.push(agent);
296        self.sessions.push(session);
297        self.default_session_id = Some(session_id);
298        self
299    }
300
301    /// Seed an additional text file directly into a session workspace.
302    ///
303    /// This is applied after harness/agent/session `initial_files` are merged.
304    pub fn seed_text_file(
305        mut self,
306        session_id: SessionId,
307        path: impl Into<String>,
308        content: impl Into<String>,
309    ) -> Self {
310        self.seeded_files.push((
311            session_id,
312            InitialFile {
313                path: path.into(),
314                content: content.into(),
315                encoding: "text".to_string(),
316                is_readonly: false,
317            },
318        ));
319        self
320    }
321
322    /// Build the in-process runtime.
323    ///
324    /// Returns a configuration error when no default model is available after
325    /// applying explicit configuration and any requested `llmsim` setup.
326    pub async fn build(mut self) -> Result<InProcessRuntime> {
327        let backends = match self.backends.take() {
328            Some(backends) => backends,
329            None => RuntimeBackends::in_memory(),
330        };
331        let file_store = resolve_session_file_system(
332            &self.platform_definition,
333            self.session_file_system_factory_context.clone(),
334        )
335        .await?;
336
337        if let Some(config) = self.llm_sim_config.take() {
338            let driver = LlmSimDriver::new(config);
339            self.platform_definition
340                .driver_registry_mut()
341                .register(ProviderType::LlmSim, move |_api_key, _base_url| {
342                    Box::new(driver.clone())
343                });
344
345            if self.default_model.is_none() {
346                self.default_model = Some(ModelWithProvider {
347                    model: "llmsim-model".to_string(),
348                    provider_type: LlmProviderType::LlmSim,
349                    api_key: Some("fake-key".to_string()),
350                    base_url: None,
351                });
352            }
353        }
354
355        let default_model = self.default_model.ok_or_else(|| {
356            AgentLoopError::config(
357                "in-process runtime requires a default model; call \
358                 InProcessRuntimeBuilder::default_model(...) or \
359                 InProcessRuntimeBuilder::llm_sim(...)",
360            )
361        })?;
362
363        backends
364            .provider_store
365            .set_default_model(default_model)
366            .await?;
367
368        for harness in &self.harnesses {
369            backends.harness_store.add_harness(harness.clone()).await?;
370        }
371        for agent in &self.agents {
372            backends.agent_store.add_agent(agent.clone()).await?;
373        }
374        for session in &self.sessions {
375            backends.session_store.add_session(session.clone()).await?;
376        }
377
378        for session in &self.sessions {
379            seed_runtime_initial_files(
380                backends.harness_store.as_ref(),
381                backends.agent_store.as_ref(),
382                file_store.as_ref(),
383                session,
384            )
385            .await?;
386        }
387
388        for (session_id, file) in &self.seeded_files {
389            file_store.seed_initial_file(*session_id, file).await?;
390        }
391
392        let persisting_emitter =
393            PersistingEventEmitter::new(backends.event_bus.clone(), backends.message_store.clone());
394
395        Ok(InProcessRuntime {
396            platform_definition: Arc::new(self.platform_definition),
397            harness_store: backends.harness_store,
398            agent_store: backends.agent_store,
399            session_store: backends.session_store,
400            default_session_id: self.default_session_id,
401            message_store: backends.message_store,
402            provider_store: backends.provider_store,
403            event_bus: backends.event_bus,
404            persisting_emitter,
405            file_store,
406            storage_store: backends.storage_store,
407            memory_store: backends.memory_store,
408            mcp_auth_provider: self
409                .mcp_auth_provider
410                .unwrap_or_else(|| Arc::new(everruns_mcp::NoAuthProvider)),
411        })
412    }
413}
414
415async fn resolve_session_file_system(
416    platform_definition: &PlatformDefinition,
417    file_system_factory_context: SessionFileSystemFactoryContext,
418) -> Result<Arc<dyn SessionFileSystem>> {
419    let file_system_factory = platform_definition.session_file_system_factory();
420    if file_system_factory.is_disabled() {
421        Ok(Arc::new(InMemorySessionFileStore::new()))
422    } else {
423        Ok(file_system_factory
424            .create_session_file_system(file_system_factory_context)
425            .await?)
426    }
427}
428
429#[derive(Clone)]
430/// Public in-process runtime backed by either in-memory or custom stores.
431///
432/// This runtime is intended for embedders who want to execute Everruns
433/// harnesses inside their own process while controlling capabilities,
434/// harness definitions, and driver registrations directly in Rust.
435pub struct InProcessRuntime {
436    platform_definition: Arc<PlatformDefinition>,
437    harness_store: Arc<dyn RuntimeHarnessStore>,
438    agent_store: Arc<dyn RuntimeAgentStore>,
439    session_store: Arc<dyn RuntimeSessionStore>,
440    default_session_id: Option<SessionId>,
441    message_store: Arc<dyn RuntimeMessageStore>,
442    provider_store: Arc<dyn RuntimeProviderStore>,
443    event_bus: Arc<dyn EventBus>,
444    persisting_emitter: PersistingEventEmitter,
445    file_store: Arc<dyn SessionFileSystem>,
446    storage_store: Arc<dyn SessionStorageStore>,
447    memory_store: Arc<dyn MemoryStoreBackend>,
448    mcp_auth_provider: Arc<dyn everruns_mcp::McpAuthProvider>,
449}
450
451impl InProcessRuntime {
452    /// Build the shared MCP client over the platform egress boundary and the
453    /// configured auth provider.
454    fn mcp_client(&self) -> Arc<everruns_mcp::McpClient> {
455        Arc::new(everruns_mcp::McpClient::new(
456            self.platform_definition.egress_service(),
457            self.mcp_auth_provider.clone(),
458        ))
459    }
460
461    /// Resolve the effective scoped MCP servers for a session.
462    async fn session_mcp_servers(
463        &self,
464        session: &Session,
465        agent: Option<&Agent>,
466    ) -> everruns_core::ScopedMcpServers {
467        let harness_chain = self
468            .harness_store
469            .get_harness_chain(session.harness_id)
470            .await
471            .unwrap_or_default();
472        crate::mcp::merge_session_scoped_servers(&harness_chain, agent, session)
473    }
474    /// Create a builder for the in-process runtime.
475    pub fn builder() -> InProcessRuntimeBuilder {
476        InProcessRuntimeBuilder::new()
477    }
478
479    /// Return the default session id seeded by
480    /// [`InProcessRuntimeBuilder::single_session`], if one was configured.
481    pub fn default_session_id(&self) -> Option<SessionId> {
482        self.default_session_id
483    }
484
485    /// Execute one turn for an existing session.
486    ///
487    /// The input message is stored in the runtime history, an `input.message`
488    /// event is emitted, and the turn then executes the shared core
489    /// `input -> reason -> act` state machine.
490    pub async fn run_turn(
491        &self,
492        session_id: SessionId,
493        input: impl Into<InputMessage>,
494    ) -> Result<TurnResult> {
495        let session = self
496            .session_store
497            .get_session(session_id)
498            .await?
499            .ok_or_else(|| AgentLoopError::store(format!("session not found: {session_id}")))?;
500
501        // Input message is recorded directly (and emitted via the raw bus so
502        // that PersistingEventEmitter does not double-store it). All
503        // subsequent activity-emitted events flow through the persisting
504        // emitter the adapter hands out.
505        let input_message = self
506            .message_store
507            .add_input_message(session_id, input.into())
508            .await?;
509        self.event_bus
510            .emit(EventRequest::new(
511                session_id,
512                EventContext::empty(),
513                InputMessageData::new(input_message.clone()),
514            ))
515            .await?;
516
517        let assembled = self
518            .inspect_context_with_ids(session_id, session.harness_id, session.agent_id)
519            .await?;
520        let synthetic_agent_id = session
521            .agent_id
522            .unwrap_or_else(|| AgentId::from_uuid(session.id.uuid()));
523        let org_id = in_process_internal_org_id(&session.organization_id);
524        let mut state_machine = TurnStateMachine::new(
525            TurnContext::new(session_id, input_message.id, synthetic_agent_id, org_id),
526            assembled.runtime_agent.max_iterations,
527        );
528
529        let mut previous_response_id: Option<String> = None;
530        let mut last_reason_result: Option<everruns_core::ReasonResult> = None;
531
532        loop {
533            match state_machine.next_action() {
534                TurnAction::ExecuteInput => {
535                    let ctx = state_machine.context();
536                    let base_context =
537                        AtomContext::new(ctx.session_id, ctx.turn_id, ctx.input_message_id);
538                    execute_input_activity(
539                        self,
540                        org_id,
541                        InputAtomInput {
542                            context: base_context,
543                        },
544                    )
545                    .await?;
546                    state_machine.on_input_completed();
547                }
548                TurnAction::ExecuteReason => {
549                    let ctx = state_machine.context();
550                    let base_context =
551                        AtomContext::new(ctx.session_id, ctx.turn_id, ctx.input_message_id);
552                    let reason_result = execute_reason_activity(
553                        self,
554                        org_id,
555                        ReasonInput {
556                            context: base_context.next_exec(),
557                            harness_id: session.harness_id,
558                            agent_id: session.agent_id,
559                            org_id,
560                            mcp_tool_definitions: vec![],
561                            previous_response_id: previous_response_id.take(),
562                            iteration: state_machine.current_iteration() as u32 + 1,
563                        },
564                    )
565                    .await?;
566                    previous_response_id = reason_result.response_id.clone();
567                    state_machine.on_reason_completed(
568                        reason_result.text.clone(),
569                        reason_result.has_tool_calls,
570                        reason_result.tool_calls.len(),
571                        reason_result.success,
572                        reason_result.error.clone(),
573                        false,
574                    );
575                    if reason_result.has_tool_calls {
576                        last_reason_result = Some(reason_result);
577                    }
578                }
579                TurnAction::ExecuteAct => {
580                    let reason_result = last_reason_result
581                        .take()
582                        .expect("ExecuteAct requires a prior ReasonResult");
583                    let ctx = state_machine.context();
584                    let base_context =
585                        AtomContext::new(ctx.session_id, ctx.turn_id, ctx.input_message_id);
586                    execute_act_activity(
587                        self,
588                        ActInput {
589                            org_id: Some(org_id),
590                            context: base_context.next_exec(),
591                            harness_id: session.harness_id,
592                            agent_id: session.agent_id,
593                            tool_calls: reason_result.tool_calls,
594                            tool_definitions: reason_result.tool_definitions,
595                            locale: reason_result.locale,
596                            blueprint_id: None,
597                            network_access: reason_result.network_access,
598                            // Request-level `parallel_tool_calls` is not yet
599                            // plumbed into the reason path; the act scheduler
600                            // defaults to its class-aware concurrent schedule.
601                            parallel_tool_calls: None,
602                        },
603                    )
604                    .await?;
605                    state_machine.on_act_completed();
606                }
607                TurnAction::Complete(outcome) => {
608                    let ctx = state_machine.context();
609                    let lifecycle =
610                        RuntimeSessionLifecycle::new(self.clone(), org_id, ctx.session_id);
611                    let turn_succeeded = matches!(
612                        &outcome,
613                        TurnOutcome::Success { .. } | TurnOutcome::MaxIterationsReached { .. }
614                    );
615                    match &outcome {
616                        TurnOutcome::Success { iterations, .. }
617                        | TurnOutcome::MaxIterationsReached { iterations, .. } => {
618                            lifecycle
619                                .turn_completed(
620                                    ctx.turn_id,
621                                    ctx.input_message_id,
622                                    *iterations as u32,
623                                    None,
624                                    None,
625                                )
626                                .await;
627                        }
628                        TurnOutcome::Failed { error, .. } => {
629                            lifecycle
630                                .turn_failed(ctx.turn_id, ctx.input_message_id, error, None)
631                                .await;
632                        }
633                    }
634                    // turn_end lifecycle hooks (advisory). Fired after the
635                    // terminal turn event for both success and failure.
636                    lifecycle
637                        .fire_turn_end_hooks(
638                            session.harness_id,
639                            session.agent_id,
640                            ctx.turn_id,
641                            turn_succeeded,
642                        )
643                        .await;
644                    return Ok(TurnResult::from_outcome(outcome, ctx.turn_id));
645                }
646            }
647        }
648    }
649
650    pub async fn run_text_turn(
651        &self,
652        session_id: SessionId,
653        text: impl Into<String>,
654    ) -> Result<TurnResult> {
655        self.run_turn(session_id, InputMessage::user(text)).await
656    }
657
658    /// Load the current message history for a session.
659    pub async fn messages(&self, session_id: SessionId) -> Result<Vec<Message>> {
660        self.message_store.load(session_id).await
661    }
662
663    /// Read a file from the in-memory session filesystem.
664    pub async fn read_file(
665        &self,
666        session_id: SessionId,
667        path: &str,
668    ) -> Result<Option<SessionFile>> {
669        self.file_store.read_file(session_id, path).await
670    }
671
672    /// Assemble the current runtime context for a session without executing a turn.
673    pub async fn load_context(&self, session_id: SessionId) -> Result<AssembledTurnContext> {
674        let session = self
675            .session_store
676            .get_session(session_id)
677            .await?
678            .ok_or_else(|| AgentLoopError::store(format!("session not found: {session_id}")))?;
679        self.inspect_context_with_ids(session_id, session.harness_id, session.agent_id)
680            .await
681    }
682
683    /// Return all collected events from the runtime event bus.
684    ///
685    /// Event buses that do not retain events return an empty `Vec` (see
686    /// [`EventBus::collected_events`]).
687    pub async fn events(&self) -> Result<Vec<Event>> {
688        Ok(self.event_bus.collected_events().await)
689    }
690
691    /// Execute a system command declared by a registered capability.
692    ///
693    /// Looks up the first capability whose `commands()` includes the named
694    /// command (in capability-resolution order) and delegates to its
695    /// `execute_command`. Returns an error if no capability declares the
696    /// requested name. The coding-CLI example uses this for `/model`
697    /// (provided by `ModelSwitcherCapability`) so the dispatch path stays
698    /// inside the capability instead of the TUI's local `handle_command`
699    /// branches.
700    pub async fn execute_command(
701        &self,
702        session_id: SessionId,
703        request: everruns_core::command::ExecuteCommandRequest,
704    ) -> Result<everruns_core::command::CommandResult> {
705        let ctx = self.load_context(session_id).await?;
706        let registry = self.platform_definition.capability_registry();
707        let exec_ctx = everruns_core::command::CommandExecutionContext { session_id };
708        for config in &ctx.resolved_capability_configs {
709            let Some(capability) = registry.get(config.capability_id()) else {
710                continue;
711            };
712            if capability.commands().iter().any(|c| c.name == request.name) {
713                return capability.execute_command(&request, &exec_ctx).await;
714            }
715        }
716        Err(AgentLoopError::config(format!(
717            "no capability declares command /{}",
718            request.name
719        )))
720    }
721
722    /// List slash commands available for a session.
723    ///
724    /// Resolves the session's harness/agent capability chain and aggregates
725    /// commands declared via [`Capability::commands`], deduplicated by name
726    /// (first occurrence wins, matching the order of resolved capabilities).
727    /// This is the embedded equivalent of the server's
728    /// `GET /v1/sessions/{id}/commands` system-commands list — skill
729    /// commands are not included here because skills are discovered via the
730    /// platform filesystem rather than the capability registry.
731    pub async fn list_commands(
732        &self,
733        session_id: SessionId,
734    ) -> Result<Vec<everruns_core::command::CommandDescriptor>> {
735        let ctx = self.load_context(session_id).await?;
736        let registry = self.platform_definition.capability_registry();
737        let mut seen = std::collections::HashSet::new();
738        let mut commands = Vec::new();
739        for config in &ctx.resolved_capability_configs {
740            let Some(capability) = registry.get(config.capability_id()) else {
741                continue;
742            };
743            for command in capability.commands() {
744                if seen.insert(command.name.clone()) {
745                    commands.push(command);
746                }
747            }
748        }
749        Ok(commands)
750    }
751
    /// Assemble the turn context for an explicit harness/agent pair.
    ///
    /// Thin wrapper over `inspect_turn_context` that supplies this runtime's
    /// harness, agent, session, message, and provider stores, the platform
    /// capability registry, and the session file store.
    // NOTE(review): the empty slice argument presumably means "no extra
    // inputs" for this call site; confirm against `inspect_turn_context`.
    async fn inspect_context_with_ids(
        &self,
        session_id: SessionId,
        harness_id: everruns_core::HarnessId,
        agent_id: Option<AgentId>,
    ) -> Result<AssembledTurnContext> {
        inspect_turn_context(
            self.harness_store.as_ref(),
            self.agent_store.as_ref(),
            self.session_store.as_ref(),
            self.message_store.as_ref(),
            self.provider_store.as_ref(),
            self.platform_definition.capability_registry(),
            session_id,
            harness_id,
            agent_id,
            &[],
            Some(self.file_store.clone()),
        )
        .await
    }
773}
774
/// Host adapter for the in-process runtime.
///
/// Every `_org_id` argument is ignored: each accessor returns the same
/// store or service handle held by this runtime, regardless of organization.
#[async_trait]
impl RuntimeHostAdapter for InProcessRuntime {
    /// Look up an agent by id in the runtime's agent store.
    async fn get_agent(&self, _org_id: i64, agent_id: AgentId) -> Result<Option<Agent>> {
        self.agent_store.get_agent(agent_id).await
    }

    /// Return the last entry of the harness chain resolved for `harness_id`,
    /// or `None` when the chain is empty.
    // NOTE(review): assumes `get_harness_chain` orders ancestors first, so the
    // last entry is the requested harness itself; confirm.
    async fn get_harness(&self, _org_id: i64, harness_id: HarnessId) -> Result<Option<Harness>> {
        let chain = self.harness_store.get_harness_chain(harness_id).await?;
        Ok(chain.into_iter().last())
    }

    /// Return the current session without recording `_status`.
    ///
    /// # Errors
    ///
    /// Returns a store error when the session does not exist, and propagates
    /// any error from the session store.
    async fn set_session_status(
        &self,
        _org_id: i64,
        session_id: SessionId,
        _status: SessionStatus,
    ) -> Result<Session> {
        // The in-process runtime does not persist status. Lifecycle callers
        // still emit their events; downstream consumers in-process don't
        // observe session.status.
        self.session_store
            .get_session(session_id)
            .await?
            .ok_or_else(|| AgentLoopError::store(format!("session not found: {session_id}")))
    }

    /// Gather the inputs for one turn:
    /// - the session (error if missing);
    /// - its agent (`None` when the session has no agent or the store returns
    ///   none);
    /// - the stored message history;
    /// - the provider store's default model;
    /// - tool definitions discovered from the session's scoped MCP servers.
    // NOTE(review): the model always comes from `get_default_model()`;
    // confirm that session/agent model overrides are applied elsewhere.
    async fn load_turn_context(
        &self,
        _org_id: i64,
        session_id: SessionId,
    ) -> Result<RuntimeHostTurnContext> {
        let session = self
            .session_store
            .get_session(session_id)
            .await?
            .ok_or_else(|| AgentLoopError::store(format!("session not found: {session_id}")))?;
        let agent = match session.agent_id {
            Some(agent_id) => self.agent_store.get_agent(agent_id).await?,
            None => None,
        };
        let messages = self.message_store.load(session_id).await?;
        let model = self.provider_store.get_default_model().await?;

        // Discover tools from the session's scoped MCP servers so they appear
        // to the LLM alongside built-in tools (specs/runtime-mcp.md D4).
        let scoped_servers = self.session_mcp_servers(&session, agent.as_ref()).await;
        let mcp_tool_definitions = if scoped_servers.is_empty() {
            vec![]
        } else {
            crate::mcp::discover_tool_definitions(
                &self.mcp_client(),
                session_id.uuid(),
                &scoped_servers,
            )
            .await
        };

        Ok(RuntimeHostTurnContext {
            agent,
            session,
            messages,
            model,
            mcp_tool_definitions,
        })
    }

    /// Build an MCP executor for the session's scoped MCP servers.
    ///
    /// Best-effort: a session-store error or a missing session yields `None`,
    /// and a failed agent lookup is treated as "no agent".
    async fn mcp_executor(
        &self,
        _org_id: i64,
        session_id: SessionId,
    ) -> Option<Arc<everruns_mcp::McpExecutor>> {
        // `.ok()??` flattens both the store error and the missing session into `None`.
        let session = self.session_store.get_session(session_id).await.ok()??;
        let agent = match session.agent_id {
            Some(agent_id) => self.agent_store.get_agent(agent_id).await.ok().flatten(),
            None => None,
        };
        let scoped_servers = self.session_mcp_servers(&session, agent.as_ref()).await;
        crate::mcp::build_executor(self.mcp_client(), &scoped_servers)
    }

    /// Return a clone of the platform definition's capability registry.
    fn capability_registry(&self) -> CapabilityRegistry {
        self.platform_definition.capability_registry().clone()
    }

    /// Return a clone of the platform definition's LLM driver registry.
    fn driver_registry(&self) -> DriverRegistry {
        self.platform_definition.driver_registry().clone()
    }

    /// Return the shared harness store; `_org_id` is ignored.
    fn harness_store(&self, _org_id: i64) -> Arc<dyn HarnessStore> {
        self.harness_store.clone()
    }

    /// Return the shared agent store; `_org_id` is ignored.
    fn agent_store(&self, _org_id: i64) -> Arc<dyn AgentStore> {
        self.agent_store.clone()
    }

    /// Return the shared session store; `_org_id` is ignored.
    fn session_store(&self, _org_id: i64) -> Arc<dyn SessionStore> {
        self.session_store.clone()
    }

    /// Return the session store, which doubles as the session mutator.
    fn session_mutator(&self, _org_id: i64) -> Arc<dyn SessionMutator> {
        self.session_store.clone()
    }

    /// Return the shared LLM provider store; `_org_id` is ignored.
    fn provider_store(&self, _org_id: i64) -> Arc<dyn LlmProviderStore> {
        self.provider_store.clone()
    }

    /// Return the shared message store as a read-side retriever.
    fn message_store(&self) -> Arc<dyn MessageRetriever> {
        self.message_store.clone()
    }

    /// Return the persisting emitter.
    ///
    /// It forwards events to its inner event bus and stores the messages
    /// carried by input, output-completed, and tool-completed events.
    fn event_emitter(&self) -> Arc<dyn EventEmitter> {
        Arc::new(self.persisting_emitter.clone())
    }

    /// Return the shared session file store.
    fn file_store(&self) -> Arc<dyn SessionFileSystem> {
        self.file_store.clone()
    }

    /// Always `Some`: the in-process runtime provides session storage.
    fn storage_store(&self) -> Option<Arc<dyn SessionStorageStore>> {
        Some(self.storage_store.clone())
    }

    /// Always `Some`: delegates to the platform definition's utility LLM service.
    fn utility_llm_service(&self) -> Option<Arc<dyn everruns_core::UtilityLlmService>> {
        Some(self.platform_definition.utility_llm_service())
    }

    /// Always `Some`: delegates to the platform definition's egress service.
    fn egress_service(&self) -> Option<Arc<dyn everruns_core::EgressService>> {
        Some(self.platform_definition.egress_service())
    }

    /// Always `Some`: returns the shared memory store; `_org_id` is ignored.
    fn memory_store(&self, _org_id: i64) -> Option<Arc<dyn MemoryStoreBackend>> {
        Some(self.memory_store.clone())
    }
}
911
912#[derive(Clone)]
913struct PersistingEventEmitter {
914    inner: Arc<dyn EventBus>,
915    message_store: Arc<dyn RuntimeMessageStore>,
916}
917
918impl PersistingEventEmitter {
919    fn new(inner: Arc<dyn EventBus>, message_store: Arc<dyn RuntimeMessageStore>) -> Self {
920        Self {
921            inner,
922            message_store,
923        }
924    }
925}
926
927#[async_trait]
928impl EventEmitter for PersistingEventEmitter {
929    async fn emit(&self, request: EventRequest) -> Result<Event> {
930        let event = self.inner.emit(request.clone()).await?;
931        if let Some(message) = message_from_event(&event.data) {
932            self.message_store
933                .store_message(request.session_id, message)
934                .await?;
935        }
936        Ok(event)
937    }
938}
939
940fn effective_overlay(
941    harness_chain: &[Harness],
942    agent: Option<&Agent>,
943    session: &Session,
944) -> AgentConfigOverlay {
945    let harness_layers = harness_chain.iter().map(AgentConfigOverlay::from);
946    let agent_layers = agent.into_iter().map(AgentConfigOverlay::from);
947    AgentConfigOverlay::fold(
948        harness_layers
949            .chain(agent_layers)
950            .chain([AgentConfigOverlay::from(session)]),
951    )
952}
953
954async fn seed_runtime_initial_files(
955    harness_store: &dyn RuntimeHarnessStore,
956    agent_store: &dyn RuntimeAgentStore,
957    file_store: &dyn SessionFileSystem,
958    session: &Session,
959) -> Result<()> {
960    let harness_chain = harness_store.get_harness_chain(session.harness_id).await?;
961    if harness_chain.is_empty() {
962        return Err(AgentLoopError::store(format!(
963            "harness not found while seeding files: {}",
964            session.harness_id
965        )));
966    }
967    let agent = match session.agent_id {
968        Some(agent_id) => Some(
969            agent_store
970                .get_agent(agent_id)
971                .await?
972                .ok_or_else(|| AgentLoopError::store(format!("agent not found: {agent_id}")))?,
973        ),
974        None => None,
975    };
976    let overlay = effective_overlay(&harness_chain, agent.as_ref(), session);
977    for file in &overlay.initial_files {
978        file_store.seed_initial_file(session.id, file).await?;
979    }
980    Ok(())
981}
982
983fn message_from_event(data: &EventData) -> Option<Message> {
984    match data {
985        EventData::InputMessage(data) => Some(data.message.clone()),
986        EventData::OutputMessageCompleted(OutputMessageCompletedData { message, .. }) => {
987            Some(message.clone())
988        }
989        EventData::ToolCompleted(data) => Some(tool_completed_to_message(data.clone())),
990        _ => None,
991    }
992}
993
994fn tool_completed_to_message(data: ToolCompletedData) -> Message {
995    let mut images: Vec<ToolResultImage> = Vec::new();
996    let metadata = tool_result_metadata(&data);
997    let result = data.result.map(|parts| {
998        for part in &parts {
999            if let ContentPart::Image(img) = part
1000                && let (Some(base64), Some(media_type)) = (&img.base64, &img.media_type)
1001            {
1002                images.push(ToolResultImage {
1003                    base64: base64.clone(),
1004                    media_type: media_type.clone(),
1005                });
1006            }
1007        }
1008
1009        let text_parts: Vec<&ContentPart> = parts
1010            .iter()
1011            .filter(|part| matches!(part, ContentPart::Text(_)))
1012            .collect();
1013        if text_parts.len() == 1
1014            && let ContentPart::Text(text) = text_parts[0]
1015        {
1016            return parse_structured_tool_result_text(&text.text);
1017        }
1018        if !text_parts.is_empty() {
1019            serde_json::to_value(&text_parts).unwrap_or_default()
1020        } else {
1021            serde_json::Value::Null
1022        }
1023    });
1024
1025    let mut message = if images.is_empty() {
1026        Message::tool_result(&data.tool_call_id, result, data.error)
1027    } else {
1028        Message::tool_result_with_images(&data.tool_call_id, result, images)
1029    };
1030    message.metadata = metadata;
1031    message
1032}
1033
1034fn tool_result_metadata(
1035    data: &ToolCompletedData,
1036) -> Option<std::collections::HashMap<String, serde_json::Value>> {
1037    let mut metadata = std::collections::HashMap::new();
1038    metadata.insert("tool_name".to_string(), serde_json::json!(data.tool_name));
1039    if let Some(fingerprint) = &data.tool_call_fingerprint {
1040        metadata.insert(
1041            "tool_call_fingerprint".to_string(),
1042            serde_json::json!(fingerprint),
1043        );
1044    }
1045    if let Some(fingerprint) = &data.tool_result_fingerprint {
1046        metadata.insert(
1047            "tool_result_fingerprint".to_string(),
1048            serde_json::json!(fingerprint),
1049        );
1050    }
1051    (!metadata.is_empty()).then_some(metadata)
1052}
1053
1054fn parse_structured_tool_result_text(text: &str) -> serde_json::Value {
1055    let trimmed = text.trim_start();
1056    if !trimmed.starts_with('{') && !trimmed.starts_with('[') {
1057        return serde_json::Value::String(text.to_string());
1058    }
1059
1060    match serde_json::from_str(text) {
1061        Ok(value @ (serde_json::Value::Object(_) | serde_json::Value::Array(_))) => value,
1062        _ => serde_json::Value::String(text.to_string()),
1063    }
1064}
1065
#[cfg(test)]
mod tool_completed_replay_tests {
    use super::*;

    /// Replay `data` through `tool_completed_to_message` and return the
    /// persisted `result` value, panicking if the message has none.
    fn replayed_result(data: ToolCompletedData) -> serde_json::Value {
        tool_completed_to_message(data)
            .tool_result_content()
            .and_then(|content| content.result.clone())
            .expect("tool result should be present")
    }

    #[test]
    fn tool_completed_replay_preserves_json_object_shape() {
        let payload = serde_json::json!({
            "path": "/workspace/src/lib.rs",
            "content": "1|fn main() {}"
        });
        let data = ToolCompletedData::success(
            "call_read".to_string(),
            "read_file".to_string(),
            vec![ContentPart::text(payload.to_string())],
            Some(1),
        );

        let result = replayed_result(data);

        assert_eq!(result["path"], "/workspace/src/lib.rs");
        assert_eq!(result["content"], "1|fn main() {}");
    }

    #[test]
    fn tool_completed_replay_keeps_scalar_json_as_text() {
        let data = ToolCompletedData::success(
            "call_scalar".to_string(),
            "custom_tool".to_string(),
            vec![ContentPart::text("123")],
            Some(1),
        );

        let result = replayed_result(data);

        assert_eq!(result, serde_json::Value::String("123".to_string()));
    }

    #[test]
    fn tool_completed_replay_preserves_fingerprints_as_metadata() {
        let data = ToolCompletedData::success(
            "call_read".to_string(),
            "read_file".to_string(),
            vec![ContentPart::text("{}")],
            Some(1),
        )
        .with_fingerprints("sha256:call".to_string(), "sha256:result".to_string());

        let metadata = tool_completed_to_message(data)
            .metadata
            .expect("metadata should be present");

        assert_eq!(metadata["tool_name"], "read_file");
        assert_eq!(metadata["tool_call_fingerprint"], "sha256:call");
        assert_eq!(metadata["tool_result_fingerprint"], "sha256:result");
    }
}
1131
#[cfg(test)]
mod org_id_mapping_tests {
    // Tests for `in_process_internal_org_id`, which maps public org ids to
    // internal i64 ids:
    // - the default public id maps to DEFAULT_ORG_ID;
    // - synthetic ids round-trip with `org_public_id_from_internal`;
    // - every other input hashes into [2, i64::MAX] and never lands on
    //   DEFAULT_ORG_ID.
    use super::*;
    use everruns_core::{DEFAULT_ORG_ID, DEFAULT_ORG_PUBLIC_ID, org_public_id_from_internal};

    #[test]
    fn default_public_id_maps_to_default_org() {
        assert_eq!(
            in_process_internal_org_id(DEFAULT_ORG_PUBLIC_ID),
            DEFAULT_ORG_ID
        );
    }

    #[test]
    fn invalid_public_id_does_not_fall_back_to_default() {
        for invalid in [
            "",
            "not-an-org",
            "org_short",
            "org_ZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZ",
            "ORG_00000000000000000000000000000001",
        ] {
            let mapped = in_process_internal_org_id(invalid);
            assert_ne!(mapped, DEFAULT_ORG_ID);
            assert!(
                mapped >= 2,
                "invalid input {invalid:?} should not map to default"
            );
        }
    }

    #[test]
    fn zero_public_id_does_not_fall_back_to_default() {
        // org_public_id_from_internal never produces this; a hand-crafted
        // all-zeros id is treated as invalid (raw == 0).
        let mapped = in_process_internal_org_id("org_00000000000000000000000000000000");
        assert_ne!(mapped, DEFAULT_ORG_ID);
        assert!(mapped >= 2, "all-zero id should not map to default");
    }

    #[test]
    fn synthetic_public_id_round_trips_with_internal_helper() {
        for internal in [1_i64, 2, 42, 1_000_000, i64::MAX - 1, i64::MAX] {
            let public = org_public_id_from_internal(internal);
            assert_eq!(
                in_process_internal_org_id(&public),
                internal,
                "round-trip failed for internal={internal}"
            );
        }
    }

    #[test]
    fn distinct_synthetic_ids_map_to_distinct_internal_ids() {
        let a = org_public_id_from_internal(7);
        let b = org_public_id_from_internal(8);
        assert_ne!(a, b);
        assert_ne!(
            in_process_internal_org_id(&a),
            in_process_internal_org_id(&b)
        );
    }

    #[test]
    fn high_entropy_uuid_style_id_hashes_into_reserved_range() {
        // First valid UUID-style id whose raw u128 exceeds i64::MAX
        // (top bit of the u128 set). It must hash to a positive i64 that
        // is neither 0 nor DEFAULT_ORG_ID.
        let high = "org_80000000000000000000000000000000";
        let mapped = in_process_internal_org_id(high);
        assert!(mapped >= 2, "mapped id {mapped} must be >= 2");
        assert_ne!(mapped, DEFAULT_ORG_ID);

        // Mapping is deterministic.
        assert_eq!(mapped, in_process_internal_org_id(high));
    }

    #[test]
    fn high_entropy_ids_are_isolated_from_each_other() {
        let a = in_process_internal_org_id("org_80000000000000000000000000000001");
        let b = in_process_internal_org_id("org_80000000000000000000000000000002");
        assert_ne!(a, b);
        assert_ne!(a, DEFAULT_ORG_ID);
        assert_ne!(b, DEFAULT_ORG_ID);
    }

    #[test]
    fn hash_uses_stable_sha256_truncation() {
        // SHA-256 with fixed big-endian first-8-byte truncation gives a value
        // we can pin. If this assertion ever breaks, callers depending on
        // build-stable mapping must be re-audited.
        let mapped = in_process_internal_org_id("org_80000000000000000000000000000000");
        let expected = {
            let digest = sha2::Sha256::digest(b"org_80000000000000000000000000000000");
            let mut buf = [0u8; 8];
            buf.copy_from_slice(&digest[..8]);
            let raw = u64::from_be_bytes(buf);
            // Reduce into [0, i64::MAX - 2], then shift into [2, i64::MAX].
            ((raw % ((i64::MAX - 1) as u64)) as i64) + 2
        };
        assert_eq!(mapped, expected);
    }

    #[test]
    fn oversize_input_is_bounded_and_maps_into_safe_range() {
        // Inputs past HASH_INPUT_CAP_BYTES are truncated before hashing, so
        // two oversize strings that agree on the first cap bytes DO map to
        // the same internal id (an accepted collision). The cap exists to
        // bound hashing work, not to widen the input space. This test only
        // pins that the result stays in the safe [2, i64::MAX] range and is
        // not DEFAULT_ORG_ID.
        let oversize = "x".repeat(super::HASH_INPUT_CAP_BYTES * 4);
        let mapped = in_process_internal_org_id(&oversize);
        assert!(mapped >= 2);
        assert_ne!(mapped, DEFAULT_ORG_ID);
    }
}
1242        let mapped = in_process_internal_org_id(&oversize);
1243        assert!(mapped >= 2);
1244        assert_ne!(mapped, DEFAULT_ORG_ID);
1245    }
1246}