Skip to main content

everruns_runtime/
runtime.rs

1// In-process runtime builder and runner.
2// Decision: the public runtime is in-memory today, but uses the same core atoms
3// and capability resolution path as the durable worker so behavior stays close.
4
5use crate::backends::{
6    EventBus, RuntimeAgentStore, RuntimeBackends, RuntimeHarnessStore, RuntimeMessageStore,
7    RuntimeProviderStore, RuntimeSessionStore,
8};
9use crate::builders::SingleSessionBuilder;
10use crate::host::{
11    RuntimeHostAdapter, RuntimeHostTurnContext, RuntimeSessionLifecycle, execute_act_activity,
12    execute_input_activity, execute_reason_activity,
13};
14use crate::in_memory::{InMemorySessionFileStore, InMemorySessionFileSystemFactory};
15use async_trait::async_trait;
16use everruns_core::agent::Agent;
17use everruns_core::atoms::{ActInput, AtomContext, InputAtomInput, ReasonInput};
18use everruns_core::capabilities::{Capability, CapabilityRegistry};
19use everruns_core::config_layer::AgentConfigOverlay;
20use everruns_core::error::{AgentLoopError, Result};
21use everruns_core::events::{
22    Event, EventContext, EventData, EventRequest, InputMessageData, OutputMessageCompletedData,
23    ToolCompletedData,
24};
25use everruns_core::harness::Harness;
26use everruns_core::llm_driver_registry::{DriverRegistry, ProviderType};
27use everruns_core::llm_models::LlmProviderType;
28use everruns_core::llmsim_driver::{LlmSimConfig, LlmSimDriver};
29use everruns_core::message::{ContentPart, Message};
30use everruns_core::platform_definition::PlatformDefinition;
31use everruns_core::runtime_context::{AssembledTurnContext, inspect_turn_context};
32use everruns_core::session::{Session, SessionStatus};
33use everruns_core::session_file::{InitialFile, SessionFile};
34use everruns_core::tools::ToolResultImage;
35use everruns_core::traits::{
36    AgentStore, EventEmitter, HarnessStore, LlmProviderStore, ModelWithProvider, SessionMutator,
37    SessionStorageStore, SessionStore,
38};
39use everruns_core::turn::{TurnAction, TurnContext, TurnOutcome, TurnStateMachine};
40use everruns_core::typed_id::{AgentId, HarnessId, OrgId, SessionId};
41use everruns_core::{
42    InputMessage, MemoryStoreBackend, MessageRetriever, SessionFileSystem,
43    SessionFileSystemFactoryContext,
44};
45use sha2::{Digest, Sha256};
46use std::sync::Arc;
47
/// Cap on the input length hashed by [`hash_public_org_id`].
///
/// Legitimate org public ids are `org_<32hex>` (36 bytes). Bounding the
/// hashed prefix keeps worst-case cost predictable when an attacker-controlled
/// session carries an oversize string.
///
/// Only the first `HASH_INPUT_CAP_BYTES` bytes of the string take part in the
/// hash, so two oversize strings that share that prefix map to the same id.
const HASH_INPUT_CAP_BYTES: usize = 128;
54
55/// Derive an internal `i64` org id from the public `org_<32hex>` form on a
56/// [`Session`].
57///
58/// Round-trip with [`everruns_core::org_public_id_from_internal`]: when the
59/// public id was produced by that helper (i.e. the upper bits are zero and
60/// the value fits in a positive `i64`), this returns the original internal
61/// id unchanged. Other values are mapped into `[2, i64::MAX]` by hashing the
62/// original public id string so runtime namespaces do not fail open to the
63/// shared default org and avoid arithmetic collision gadgets.
64fn in_process_internal_org_id(public_org_id: &str) -> i64 {
65    if public_org_id == everruns_core::DEFAULT_ORG_PUBLIC_ID {
66        return everruns_core::DEFAULT_ORG_ID;
67    }
68
69    let Ok(parsed) = public_org_id.parse::<OrgId>() else {
70        return hash_public_org_id(public_org_id);
71    };
72    let raw: u128 = parsed.uuid().as_u128();
73    if raw == 0 {
74        return hash_public_org_id(public_org_id);
75    }
76
77    // Synthetic ids from `org_public_id_from_internal(i64)` always fit here,
78    // so the in-process runtime sees the same `org_id` the server used.
79    if raw <= i64::MAX as u128 {
80        return raw as i64;
81    }
82
83    hash_public_org_id(public_org_id)
84}
85
86// Use SHA-256 with a fixed truncation scheme so the mapping is stable across
87// Rust/binary upgrades and predictable for any embedder. Input is bounded to
88// `HASH_INPUT_CAP_BYTES` so attacker-controlled oversize org strings cannot
89// drive unbounded hashing work.
90fn hash_public_org_id(public_org_id: &str) -> i64 {
91    let bytes = public_org_id.as_bytes();
92    let bounded = &bytes[..bytes.len().min(HASH_INPUT_CAP_BYTES)];
93    let digest = Sha256::digest(bounded);
94    let mut buf = [0u8; 8];
95    buf.copy_from_slice(&digest[..8]);
96    let raw = u64::from_be_bytes(buf);
97    ((raw % ((i64::MAX - 1) as u64)) as i64) + 2
98}
99
/// Summary of one turn executed by [`InProcessRuntime::run_turn`].
#[derive(Debug, Clone)]
pub struct TurnResult {
    /// Final text response produced by the turn (empty when the turn failed).
    pub response: String,
    /// Number of reason iterations executed.
    pub iterations: usize,
    /// Total number of tool calls executed during the turn (reported as `0`
    /// for a failed turn).
    pub tool_calls_count: usize,
    /// Whether the turn completed without an unrecoverable failure. Reaching
    /// the iteration limit still counts as success.
    pub success: bool,
    /// Failure message when `success` is false.
    pub error: Option<String>,
    /// Turn identifier used to correlate emitted events.
    pub turn_id: everruns_core::typed_id::TurnId,
}
115
116impl TurnResult {
117    fn from_outcome(outcome: TurnOutcome, turn_id: everruns_core::typed_id::TurnId) -> Self {
118        match outcome {
119            TurnOutcome::Success {
120                response,
121                iterations,
122                tool_calls_count,
123            } => Self {
124                response,
125                iterations,
126                tool_calls_count,
127                success: true,
128                error: None,
129                turn_id,
130            },
131            TurnOutcome::Failed { error, iterations } => Self {
132                response: String::new(),
133                iterations,
134                tool_calls_count: 0,
135                success: false,
136                error: Some(error),
137                turn_id,
138            },
139            TurnOutcome::MaxIterationsReached {
140                response,
141                iterations,
142                tool_calls_count,
143            } => Self {
144                response,
145                iterations,
146                tool_calls_count,
147                success: true,
148                error: None,
149                turn_id,
150            },
151        }
152    }
153}
154
/// Builder for the public in-process runtime.
///
/// The builder owns a standalone runtime bundle:
/// - `PlatformDefinition` for capabilities and drivers
/// - in-memory stores for sessions, files, storage, memory, and messages
/// - seeded harness/agent/session entities
///
/// `build()` returns an [`InProcessRuntime`] that can execute turns in-process
/// without the durable engine or the control-plane server.
pub struct InProcessRuntimeBuilder {
    /// Platform definition supplying the capability registry, driver
    /// registry, and session filesystem factory.
    platform_definition: PlatformDefinition,
    /// When set, `build()` registers the `llmsim` driver with this config.
    llm_sim_config: Option<LlmSimConfig>,
    /// Default model; `build()` fails if this is still unset after any
    /// requested `llmsim` setup.
    default_model: Option<ModelWithProvider>,
    /// Custom backend bundle; `build()` falls back to
    /// `RuntimeBackends::in_memory()` when this is `None`.
    backends: Option<RuntimeBackends>,
    /// Context handed to the platform's session filesystem factory.
    session_file_system_factory_context: SessionFileSystemFactoryContext,
    /// Harnesses added to the harness store by `build()`.
    harnesses: Vec<Harness>,
    /// Agents added to the agent store by `build()`.
    agents: Vec<Agent>,
    /// Sessions added to the session store by `build()`.
    sessions: Vec<Session>,
    /// Session id recorded by `single_session`, if it was called.
    default_session_id: Option<SessionId>,
    /// Extra files seeded per session after the regular initial files.
    seeded_files: Vec<(SessionId, InitialFile)>,
}
176
177impl Default for InProcessRuntimeBuilder {
178    fn default() -> Self {
179        Self::new()
180    }
181}
182
183impl InProcessRuntimeBuilder {
184    /// Create a builder with built-in capabilities and no implicit LLM driver.
185    ///
186    /// Embedders must either:
187    /// - call [`Self::llm_sim`] for deterministic local examples/tests, or
188    /// - register their own driver(s) on the platform definition and set a
189    ///   default model via [`Self::default_model`].
190    pub fn new() -> Self {
191        Self {
192            platform_definition: PlatformDefinition::builder()
193                .capability_registry(CapabilityRegistry::with_builtins())
194                .driver_registry(DriverRegistry::new())
195                .session_file_system_factory(Arc::new(InMemorySessionFileSystemFactory))
196                .build(),
197            llm_sim_config: None,
198            default_model: None,
199            backends: None,
200            session_file_system_factory_context: SessionFileSystemFactoryContext::new(),
201            harnesses: Vec::new(),
202            agents: Vec::new(),
203            sessions: Vec::new(),
204            default_session_id: None,
205            seeded_files: Vec::new(),
206        }
207    }
208
209    /// Replace the platform definition used by the runtime.
210    pub fn platform_definition(mut self, platform_definition: PlatformDefinition) -> Self {
211        self.platform_definition = platform_definition;
212        self
213    }
214
215    /// Register an additional capability on the runtime platform.
216    pub fn capability<C: Capability + 'static>(mut self, capability: C) -> Self {
217        self.platform_definition
218            .capability_registry_mut()
219            .register(capability);
220        self
221    }
222
223    /// Replace the platform driver registry.
224    pub fn driver_registry(mut self, driver_registry: DriverRegistry) -> Self {
225        *self.platform_definition.driver_registry_mut() = driver_registry;
226        self
227    }
228
229    /// Register the built-in `llmsim` driver for deterministic local execution.
230    pub fn llm_sim(mut self, config: LlmSimConfig) -> Self {
231        self.llm_sim_config = Some(config);
232        self
233    }
234
235    /// Set the runtime default model used when sessions/agents do not override it.
236    pub fn default_model(mut self, model: ModelWithProvider) -> Self {
237        self.default_model = Some(model);
238        self
239    }
240
241    /// Supply a custom backend bundle instead of the built-in in-memory stores.
242    pub fn backends(mut self, backends: RuntimeBackends) -> Self {
243        self.backends = Some(backends);
244        self
245    }
246
247    /// Supply host dependencies needed by the platform session filesystem factory.
248    pub fn session_file_system_factory_context(
249        mut self,
250        context: SessionFileSystemFactoryContext,
251    ) -> Self {
252        self.session_file_system_factory_context = context;
253        self
254    }
255
256    /// Seed a harness into the runtime store.
257    pub fn harness(mut self, harness: Harness) -> Self {
258        self.harnesses.push(harness);
259        self
260    }
261
262    /// Seed an agent into the runtime store.
263    pub fn agent(mut self, agent: Agent) -> Self {
264        self.agents.push(agent);
265        self
266    }
267
268    /// Seed a session into the runtime store.
269    pub fn session(mut self, session: Session) -> Self {
270        self.sessions.push(session);
271        self
272    }
273
274    /// Seed one harness, one agent, and one session with a compact sub-builder.
275    ///
276    /// The generated session id is exposed from the built runtime via
277    /// [`InProcessRuntime::default_session_id`].
278    pub fn single_session<F>(mut self, configure: F) -> Self
279    where
280        F: FnOnce(SingleSessionBuilder) -> SingleSessionBuilder,
281    {
282        let (harness, agent, session, session_id) =
283            configure(SingleSessionBuilder::default()).build();
284        self.harnesses.push(harness);
285        self.agents.push(agent);
286        self.sessions.push(session);
287        self.default_session_id = Some(session_id);
288        self
289    }
290
291    /// Seed an additional text file directly into a session workspace.
292    ///
293    /// This is applied after harness/agent/session `initial_files` are merged.
294    pub fn seed_text_file(
295        mut self,
296        session_id: SessionId,
297        path: impl Into<String>,
298        content: impl Into<String>,
299    ) -> Self {
300        self.seeded_files.push((
301            session_id,
302            InitialFile {
303                path: path.into(),
304                content: content.into(),
305                encoding: "text".to_string(),
306                is_readonly: false,
307            },
308        ));
309        self
310    }
311
312    /// Build the in-process runtime.
313    ///
314    /// Returns a configuration error when no default model is available after
315    /// applying explicit configuration and any requested `llmsim` setup.
316    pub async fn build(mut self) -> Result<InProcessRuntime> {
317        let backends = match self.backends.take() {
318            Some(backends) => backends,
319            None => RuntimeBackends::in_memory(),
320        };
321        let file_store = resolve_session_file_system(
322            &self.platform_definition,
323            self.session_file_system_factory_context.clone(),
324        )
325        .await?;
326
327        if let Some(config) = self.llm_sim_config.take() {
328            let driver = LlmSimDriver::new(config);
329            self.platform_definition
330                .driver_registry_mut()
331                .register(ProviderType::LlmSim, move |_api_key, _base_url| {
332                    Box::new(driver.clone())
333                });
334
335            if self.default_model.is_none() {
336                self.default_model = Some(ModelWithProvider {
337                    model: "llmsim-model".to_string(),
338                    provider_type: LlmProviderType::LlmSim,
339                    api_key: Some("fake-key".to_string()),
340                    base_url: None,
341                });
342            }
343        }
344
345        let default_model = self.default_model.ok_or_else(|| {
346            AgentLoopError::config(
347                "in-process runtime requires a default model; call \
348                 InProcessRuntimeBuilder::default_model(...) or \
349                 InProcessRuntimeBuilder::llm_sim(...)",
350            )
351        })?;
352
353        backends
354            .provider_store
355            .set_default_model(default_model)
356            .await?;
357
358        for harness in &self.harnesses {
359            backends.harness_store.add_harness(harness.clone()).await?;
360        }
361        for agent in &self.agents {
362            backends.agent_store.add_agent(agent.clone()).await?;
363        }
364        for session in &self.sessions {
365            backends.session_store.add_session(session.clone()).await?;
366        }
367
368        for session in &self.sessions {
369            seed_runtime_initial_files(
370                backends.harness_store.as_ref(),
371                backends.agent_store.as_ref(),
372                file_store.as_ref(),
373                session,
374            )
375            .await?;
376        }
377
378        for (session_id, file) in &self.seeded_files {
379            file_store.seed_initial_file(*session_id, file).await?;
380        }
381
382        let persisting_emitter =
383            PersistingEventEmitter::new(backends.event_bus.clone(), backends.message_store.clone());
384
385        Ok(InProcessRuntime {
386            platform_definition: Arc::new(self.platform_definition),
387            harness_store: backends.harness_store,
388            agent_store: backends.agent_store,
389            session_store: backends.session_store,
390            default_session_id: self.default_session_id,
391            message_store: backends.message_store,
392            provider_store: backends.provider_store,
393            event_bus: backends.event_bus,
394            persisting_emitter,
395            file_store,
396            storage_store: backends.storage_store,
397            memory_store: backends.memory_store,
398        })
399    }
400}
401
402async fn resolve_session_file_system(
403    platform_definition: &PlatformDefinition,
404    file_system_factory_context: SessionFileSystemFactoryContext,
405) -> Result<Arc<dyn SessionFileSystem>> {
406    let file_system_factory = platform_definition.session_file_system_factory();
407    if file_system_factory.is_disabled() {
408        Ok(Arc::new(InMemorySessionFileStore::new()))
409    } else {
410        Ok(file_system_factory
411            .create_session_file_system(file_system_factory_context)
412            .await?)
413    }
414}
415
/// Public in-process runtime backed by either in-memory or custom stores.
///
/// This runtime is intended for embedders who want to execute Everruns
/// harnesses inside their own process while controlling capabilities,
/// harness definitions, and driver registrations directly in Rust.
///
/// Every field is either an `Arc`, a `Copy` id, or a clonable emitter, and
/// the type derives `Clone`.
#[derive(Clone)]
pub struct InProcessRuntime {
    /// Capability registry, driver registry, and platform services.
    platform_definition: Arc<PlatformDefinition>,
    /// Store used to resolve harnesses and harness chains.
    harness_store: Arc<dyn RuntimeHarnessStore>,
    /// Store used to resolve agents.
    agent_store: Arc<dyn RuntimeAgentStore>,
    /// Store used to look up sessions; also handed out as the session mutator.
    session_store: Arc<dyn RuntimeSessionStore>,
    /// Session id recorded by `InProcessRuntimeBuilder::single_session`, if any.
    default_session_id: Option<SessionId>,
    /// Message history store; input messages are written here by `run_turn`.
    message_store: Arc<dyn RuntimeMessageStore>,
    /// Provider store holding the runtime default model.
    provider_store: Arc<dyn RuntimeProviderStore>,
    /// Raw event bus; `run_turn` emits the input message event on it directly.
    event_bus: Arc<dyn EventBus>,
    /// Emitter handed to activities through `RuntimeHostAdapter::event_emitter`.
    persisting_emitter: PersistingEventEmitter,
    /// Session filesystem resolved at build time.
    file_store: Arc<dyn SessionFileSystem>,
    /// Session storage backend exposed to activities.
    storage_store: Arc<dyn SessionStorageStore>,
    /// Memory backend exposed to activities.
    memory_store: Arc<dyn MemoryStoreBackend>,
}
436
437impl InProcessRuntime {
438    /// Create a builder for the in-process runtime.
439    pub fn builder() -> InProcessRuntimeBuilder {
440        InProcessRuntimeBuilder::new()
441    }
442
443    /// Return the default session id seeded by
444    /// [`InProcessRuntimeBuilder::single_session`], if one was configured.
445    pub fn default_session_id(&self) -> Option<SessionId> {
446        self.default_session_id
447    }
448
449    /// Execute one turn for an existing session.
450    ///
451    /// The input message is stored in the runtime history, an `input.message`
452    /// event is emitted, and the turn then executes the shared core
453    /// `input -> reason -> act` state machine.
454    pub async fn run_turn(
455        &self,
456        session_id: SessionId,
457        input: impl Into<InputMessage>,
458    ) -> Result<TurnResult> {
459        let session = self
460            .session_store
461            .get_session(session_id)
462            .await?
463            .ok_or_else(|| AgentLoopError::store(format!("session not found: {session_id}")))?;
464
465        // Input message is recorded directly (and emitted via the raw bus so
466        // that PersistingEventEmitter does not double-store it). All
467        // subsequent activity-emitted events flow through the persisting
468        // emitter the adapter hands out.
469        let input_message = self
470            .message_store
471            .add_input_message(session_id, input.into())
472            .await?;
473        self.event_bus
474            .emit(EventRequest::new(
475                session_id,
476                EventContext::empty(),
477                InputMessageData::new(input_message.clone()),
478            ))
479            .await?;
480
481        let assembled = self
482            .inspect_context_with_ids(session_id, session.harness_id, session.agent_id)
483            .await?;
484        let synthetic_agent_id = session
485            .agent_id
486            .unwrap_or_else(|| AgentId::from_uuid(session.id.uuid()));
487        let org_id = in_process_internal_org_id(&session.organization_id);
488        let mut state_machine = TurnStateMachine::new(
489            TurnContext::new(session_id, input_message.id, synthetic_agent_id, org_id),
490            assembled.runtime_agent.max_iterations,
491        );
492
493        let mut previous_response_id: Option<String> = None;
494        let mut last_reason_result: Option<everruns_core::ReasonResult> = None;
495
496        loop {
497            match state_machine.next_action() {
498                TurnAction::ExecuteInput => {
499                    let ctx = state_machine.context();
500                    let base_context =
501                        AtomContext::new(ctx.session_id, ctx.turn_id, ctx.input_message_id);
502                    execute_input_activity(
503                        self,
504                        org_id,
505                        InputAtomInput {
506                            context: base_context,
507                        },
508                    )
509                    .await?;
510                    state_machine.on_input_completed();
511                }
512                TurnAction::ExecuteReason => {
513                    let ctx = state_machine.context();
514                    let base_context =
515                        AtomContext::new(ctx.session_id, ctx.turn_id, ctx.input_message_id);
516                    let reason_result = execute_reason_activity(
517                        self,
518                        org_id,
519                        ReasonInput {
520                            context: base_context.next_exec(),
521                            harness_id: session.harness_id,
522                            agent_id: session.agent_id,
523                            org_id,
524                            mcp_tool_definitions: vec![],
525                            previous_response_id: previous_response_id.take(),
526                            iteration: state_machine.current_iteration() as u32 + 1,
527                        },
528                    )
529                    .await?;
530                    previous_response_id = reason_result.response_id.clone();
531                    state_machine.on_reason_completed(
532                        reason_result.text.clone(),
533                        reason_result.has_tool_calls,
534                        reason_result.tool_calls.len(),
535                        reason_result.success,
536                        reason_result.error.clone(),
537                        false,
538                    );
539                    if reason_result.has_tool_calls {
540                        last_reason_result = Some(reason_result);
541                    }
542                }
543                TurnAction::ExecuteAct => {
544                    let reason_result = last_reason_result
545                        .take()
546                        .expect("ExecuteAct requires a prior ReasonResult");
547                    let ctx = state_machine.context();
548                    let base_context =
549                        AtomContext::new(ctx.session_id, ctx.turn_id, ctx.input_message_id);
550                    execute_act_activity(
551                        self,
552                        ActInput {
553                            org_id: Some(org_id),
554                            context: base_context.next_exec(),
555                            harness_id: session.harness_id,
556                            agent_id: session.agent_id,
557                            tool_calls: reason_result.tool_calls,
558                            tool_definitions: reason_result.tool_definitions,
559                            locale: reason_result.locale,
560                            blueprint_id: None,
561                            network_access: reason_result.network_access,
562                        },
563                    )
564                    .await?;
565                    state_machine.on_act_completed();
566                }
567                TurnAction::Complete(outcome) => {
568                    let ctx = state_machine.context();
569                    let lifecycle =
570                        RuntimeSessionLifecycle::new(self.clone(), org_id, ctx.session_id);
571                    match &outcome {
572                        TurnOutcome::Success { iterations, .. }
573                        | TurnOutcome::MaxIterationsReached { iterations, .. } => {
574                            lifecycle
575                                .turn_completed(
576                                    ctx.turn_id,
577                                    ctx.input_message_id,
578                                    *iterations as u32,
579                                    None,
580                                    None,
581                                )
582                                .await;
583                        }
584                        TurnOutcome::Failed { error, .. } => {
585                            lifecycle
586                                .turn_failed(ctx.turn_id, ctx.input_message_id, error, None)
587                                .await;
588                        }
589                    }
590                    return Ok(TurnResult::from_outcome(outcome, ctx.turn_id));
591                }
592            }
593        }
594    }
595
596    pub async fn run_text_turn(
597        &self,
598        session_id: SessionId,
599        text: impl Into<String>,
600    ) -> Result<TurnResult> {
601        self.run_turn(session_id, InputMessage::user(text)).await
602    }
603
604    /// Load the current message history for a session.
605    pub async fn messages(&self, session_id: SessionId) -> Result<Vec<Message>> {
606        self.message_store.load(session_id).await
607    }
608
609    /// Read a file from the in-memory session filesystem.
610    pub async fn read_file(
611        &self,
612        session_id: SessionId,
613        path: &str,
614    ) -> Result<Option<SessionFile>> {
615        self.file_store.read_file(session_id, path).await
616    }
617
618    /// Assemble the current runtime context for a session without executing a turn.
619    pub async fn load_context(&self, session_id: SessionId) -> Result<AssembledTurnContext> {
620        let session = self
621            .session_store
622            .get_session(session_id)
623            .await?
624            .ok_or_else(|| AgentLoopError::store(format!("session not found: {session_id}")))?;
625        self.inspect_context_with_ids(session_id, session.harness_id, session.agent_id)
626            .await
627    }
628
629    /// Return all collected events from the runtime event bus.
630    ///
631    /// Event buses that do not retain events return an empty `Vec` (see
632    /// [`EventBus::collected_events`]).
633    pub async fn events(&self) -> Result<Vec<Event>> {
634        Ok(self.event_bus.collected_events().await)
635    }
636
637    /// Execute a system command declared by a registered capability.
638    ///
639    /// Looks up the first capability whose `commands()` includes the named
640    /// command (in capability-resolution order) and delegates to its
641    /// `execute_command`. Returns an error if no capability declares the
642    /// requested name. The coding-CLI example uses this for `/model`
643    /// (provided by `ModelSwitcherCapability`) so the dispatch path stays
644    /// inside the capability instead of the TUI's local `handle_command`
645    /// branches.
646    pub async fn execute_command(
647        &self,
648        session_id: SessionId,
649        request: everruns_core::command::ExecuteCommandRequest,
650    ) -> Result<everruns_core::command::CommandResult> {
651        let ctx = self.load_context(session_id).await?;
652        let registry = self.platform_definition.capability_registry();
653        let exec_ctx = everruns_core::command::CommandExecutionContext { session_id };
654        for config in &ctx.resolved_capability_configs {
655            let Some(capability) = registry.get(config.capability_id()) else {
656                continue;
657            };
658            if capability.commands().iter().any(|c| c.name == request.name) {
659                return capability.execute_command(&request, &exec_ctx).await;
660            }
661        }
662        Err(AgentLoopError::config(format!(
663            "no capability declares command /{}",
664            request.name
665        )))
666    }
667
668    /// List slash commands available for a session.
669    ///
670    /// Resolves the session's harness/agent capability chain and aggregates
671    /// commands declared via [`Capability::commands`], deduplicated by name
672    /// (first occurrence wins, matching the order of resolved capabilities).
673    /// This is the embedded equivalent of the server's
674    /// `GET /v1/sessions/{id}/commands` system-commands list — skill
675    /// commands are not included here because skills are discovered via the
676    /// platform filesystem rather than the capability registry.
677    pub async fn list_commands(
678        &self,
679        session_id: SessionId,
680    ) -> Result<Vec<everruns_core::command::CommandDescriptor>> {
681        let ctx = self.load_context(session_id).await?;
682        let registry = self.platform_definition.capability_registry();
683        let mut seen = std::collections::HashSet::new();
684        let mut commands = Vec::new();
685        for config in &ctx.resolved_capability_configs {
686            let Some(capability) = registry.get(config.capability_id()) else {
687                continue;
688            };
689            for command in capability.commands() {
690                if seen.insert(command.name.clone()) {
691                    commands.push(command);
692                }
693            }
694        }
695        Ok(commands)
696    }
697
698    async fn inspect_context_with_ids(
699        &self,
700        session_id: SessionId,
701        harness_id: everruns_core::HarnessId,
702        agent_id: Option<AgentId>,
703    ) -> Result<AssembledTurnContext> {
704        inspect_turn_context(
705            self.harness_store.as_ref(),
706            self.agent_store.as_ref(),
707            self.session_store.as_ref(),
708            self.message_store.as_ref(),
709            self.provider_store.as_ref(),
710            self.platform_definition.capability_registry(),
711            session_id,
712            harness_id,
713            agent_id,
714            &[],
715            Some(self.file_store.clone()),
716        )
717        .await
718    }
719}
720
721#[async_trait]
722impl RuntimeHostAdapter for InProcessRuntime {
723    async fn get_agent(&self, _org_id: i64, agent_id: AgentId) -> Result<Option<Agent>> {
724        self.agent_store.get_agent(agent_id).await
725    }
726
727    async fn get_harness(&self, _org_id: i64, harness_id: HarnessId) -> Result<Option<Harness>> {
728        let chain = self.harness_store.get_harness_chain(harness_id).await?;
729        Ok(chain.into_iter().last())
730    }
731
732    async fn set_session_status(
733        &self,
734        _org_id: i64,
735        session_id: SessionId,
736        _status: SessionStatus,
737    ) -> Result<Session> {
738        // The in-process runtime does not persist status. Lifecycle callers
739        // still emit their events; downstream consumers in-process don't
740        // observe session.status.
741        self.session_store
742            .get_session(session_id)
743            .await?
744            .ok_or_else(|| AgentLoopError::store(format!("session not found: {session_id}")))
745    }
746
747    async fn load_turn_context(
748        &self,
749        _org_id: i64,
750        session_id: SessionId,
751    ) -> Result<RuntimeHostTurnContext> {
752        let session = self
753            .session_store
754            .get_session(session_id)
755            .await?
756            .ok_or_else(|| AgentLoopError::store(format!("session not found: {session_id}")))?;
757        let agent = match session.agent_id {
758            Some(agent_id) => self.agent_store.get_agent(agent_id).await?,
759            None => None,
760        };
761        let messages = self.message_store.load(session_id).await?;
762        let model = self.provider_store.get_default_model().await?;
763        Ok(RuntimeHostTurnContext {
764            agent,
765            session,
766            messages,
767            model,
768            mcp_tool_definitions: vec![],
769        })
770    }
771
772    fn capability_registry(&self) -> CapabilityRegistry {
773        self.platform_definition.capability_registry().clone()
774    }
775
776    fn driver_registry(&self) -> DriverRegistry {
777        self.platform_definition.driver_registry().clone()
778    }
779
780    fn harness_store(&self, _org_id: i64) -> Arc<dyn HarnessStore> {
781        self.harness_store.clone()
782    }
783
784    fn agent_store(&self, _org_id: i64) -> Arc<dyn AgentStore> {
785        self.agent_store.clone()
786    }
787
788    fn session_store(&self, _org_id: i64) -> Arc<dyn SessionStore> {
789        self.session_store.clone()
790    }
791
792    fn session_mutator(&self, _org_id: i64) -> Arc<dyn SessionMutator> {
793        self.session_store.clone()
794    }
795
796    fn provider_store(&self, _org_id: i64) -> Arc<dyn LlmProviderStore> {
797        self.provider_store.clone()
798    }
799
800    fn message_store(&self) -> Arc<dyn MessageRetriever> {
801        self.message_store.clone()
802    }
803
804    fn event_emitter(&self) -> Arc<dyn EventEmitter> {
805        Arc::new(self.persisting_emitter.clone())
806    }
807
808    fn file_store(&self) -> Arc<dyn SessionFileSystem> {
809        self.file_store.clone()
810    }
811
812    fn storage_store(&self) -> Option<Arc<dyn SessionStorageStore>> {
813        Some(self.storage_store.clone())
814    }
815
816    fn utility_llm_service(&self) -> Option<Arc<dyn everruns_core::UtilityLlmService>> {
817        Some(self.platform_definition.utility_llm_service())
818    }
819
820    fn egress_service(&self) -> Option<Arc<dyn everruns_core::EgressService>> {
821        Some(self.platform_definition.egress_service())
822    }
823
824    fn memory_store(&self, _org_id: i64) -> Option<Arc<dyn MemoryStoreBackend>> {
825        Some(self.memory_store.clone())
826    }
827}
828
829#[derive(Clone)]
830struct PersistingEventEmitter {
831    inner: Arc<dyn EventBus>,
832    message_store: Arc<dyn RuntimeMessageStore>,
833}
834
835impl PersistingEventEmitter {
836    fn new(inner: Arc<dyn EventBus>, message_store: Arc<dyn RuntimeMessageStore>) -> Self {
837        Self {
838            inner,
839            message_store,
840        }
841    }
842}
843
844#[async_trait]
845impl EventEmitter for PersistingEventEmitter {
846    async fn emit(&self, request: EventRequest) -> Result<Event> {
847        let event = self.inner.emit(request.clone()).await?;
848        if let Some(message) = message_from_event(&event.data) {
849            self.message_store
850                .store_message(request.session_id, message)
851                .await?;
852        }
853        Ok(event)
854    }
855}
856
857fn effective_overlay(
858    harness_chain: &[Harness],
859    agent: Option<&Agent>,
860    session: &Session,
861) -> AgentConfigOverlay {
862    let harness_layers = harness_chain.iter().map(AgentConfigOverlay::from);
863    let agent_layers = agent.into_iter().map(AgentConfigOverlay::from);
864    AgentConfigOverlay::fold(
865        harness_layers
866            .chain(agent_layers)
867            .chain([AgentConfigOverlay::from(session)]),
868    )
869}
870
871async fn seed_runtime_initial_files(
872    harness_store: &dyn RuntimeHarnessStore,
873    agent_store: &dyn RuntimeAgentStore,
874    file_store: &dyn SessionFileSystem,
875    session: &Session,
876) -> Result<()> {
877    let harness_chain = harness_store.get_harness_chain(session.harness_id).await?;
878    if harness_chain.is_empty() {
879        return Err(AgentLoopError::store(format!(
880            "harness not found while seeding files: {}",
881            session.harness_id
882        )));
883    }
884    let agent = match session.agent_id {
885        Some(agent_id) => Some(
886            agent_store
887                .get_agent(agent_id)
888                .await?
889                .ok_or_else(|| AgentLoopError::store(format!("agent not found: {agent_id}")))?,
890        ),
891        None => None,
892    };
893    let overlay = effective_overlay(&harness_chain, agent.as_ref(), session);
894    for file in &overlay.initial_files {
895        file_store.seed_initial_file(session.id, file).await?;
896    }
897    Ok(())
898}
899
900fn message_from_event(data: &EventData) -> Option<Message> {
901    match data {
902        EventData::InputMessage(data) => Some(data.message.clone()),
903        EventData::OutputMessageCompleted(OutputMessageCompletedData { message, .. }) => {
904            Some(message.clone())
905        }
906        EventData::ToolCompleted(data) => Some(tool_completed_to_message(data.clone())),
907        _ => None,
908    }
909}
910
911fn tool_completed_to_message(data: ToolCompletedData) -> Message {
912    let mut images: Vec<ToolResultImage> = Vec::new();
913    let metadata = tool_result_metadata(&data);
914    let result = data.result.map(|parts| {
915        for part in &parts {
916            if let ContentPart::Image(img) = part
917                && let (Some(base64), Some(media_type)) = (&img.base64, &img.media_type)
918            {
919                images.push(ToolResultImage {
920                    base64: base64.clone(),
921                    media_type: media_type.clone(),
922                });
923            }
924        }
925
926        let text_parts: Vec<&ContentPart> = parts
927            .iter()
928            .filter(|part| matches!(part, ContentPart::Text(_)))
929            .collect();
930        if text_parts.len() == 1
931            && let ContentPart::Text(text) = text_parts[0]
932        {
933            return parse_structured_tool_result_text(&text.text);
934        }
935        if !text_parts.is_empty() {
936            serde_json::to_value(&text_parts).unwrap_or_default()
937        } else {
938            serde_json::Value::Null
939        }
940    });
941
942    let mut message = if images.is_empty() {
943        Message::tool_result(&data.tool_call_id, result, data.error)
944    } else {
945        Message::tool_result_with_images(&data.tool_call_id, result, images)
946    };
947    message.metadata = metadata;
948    message
949}
950
951fn tool_result_metadata(
952    data: &ToolCompletedData,
953) -> Option<std::collections::HashMap<String, serde_json::Value>> {
954    let mut metadata = std::collections::HashMap::new();
955    metadata.insert("tool_name".to_string(), serde_json::json!(data.tool_name));
956    if let Some(fingerprint) = &data.tool_call_fingerprint {
957        metadata.insert(
958            "tool_call_fingerprint".to_string(),
959            serde_json::json!(fingerprint),
960        );
961    }
962    if let Some(fingerprint) = &data.tool_result_fingerprint {
963        metadata.insert(
964            "tool_result_fingerprint".to_string(),
965            serde_json::json!(fingerprint),
966        );
967    }
968    (!metadata.is_empty()).then_some(metadata)
969}
970
971fn parse_structured_tool_result_text(text: &str) -> serde_json::Value {
972    let trimmed = text.trim_start();
973    if !trimmed.starts_with('{') && !trimmed.starts_with('[') {
974        return serde_json::Value::String(text.to_string());
975    }
976
977    match serde_json::from_str(text) {
978        Ok(value @ (serde_json::Value::Object(_) | serde_json::Value::Array(_))) => value,
979        _ => serde_json::Value::String(text.to_string()),
980    }
981}
982
#[cfg(test)]
mod tool_completed_replay_tests {
    use super::*;

    /// Builds a successful tool completion whose result is one text part.
    fn success_with_text(call_id: &str, tool_name: &str, text: &str) -> ToolCompletedData {
        ToolCompletedData::success(
            call_id.to_string(),
            tool_name.to_string(),
            vec![ContentPart::text(text)],
            Some(1),
        )
    }

    /// Pulls the tool result value out of a replayed message.
    fn replayed_result(message: &Message) -> &serde_json::Value {
        message
            .tool_result_content()
            .and_then(|content| content.result.as_ref())
            .expect("tool result should be present")
    }

    #[test]
    fn tool_completed_replay_preserves_json_object_shape() {
        let payload = serde_json::json!({
            "path": "/workspace/src/lib.rs",
            "content": "1|fn main() {}"
        })
        .to_string();

        let message = tool_completed_to_message(success_with_text(
            "call_read",
            "read_file",
            &payload,
        ));
        let result = replayed_result(&message);

        assert_eq!(result["path"], "/workspace/src/lib.rs");
        assert_eq!(result["content"], "1|fn main() {}");
    }

    #[test]
    fn tool_completed_replay_keeps_scalar_json_as_text() {
        let message =
            tool_completed_to_message(success_with_text("call_scalar", "custom_tool", "123"));
        let result = replayed_result(&message);

        assert_eq!(result, &serde_json::Value::String("123".to_string()));
    }

    #[test]
    fn tool_completed_replay_preserves_fingerprints_as_metadata() {
        let data = success_with_text("call_read", "read_file", "{}")
            .with_fingerprints("sha256:call".to_string(), "sha256:result".to_string());

        let metadata = tool_completed_to_message(data)
            .metadata
            .expect("metadata should be present");

        assert_eq!(metadata["tool_name"], "read_file");
        assert_eq!(metadata["tool_call_fingerprint"], "sha256:call");
        assert_eq!(metadata["tool_result_fingerprint"], "sha256:result");
    }
}
1048
#[cfg(test)]
mod org_id_mapping_tests {
    use super::*;
    use everruns_core::{DEFAULT_ORG_ID, DEFAULT_ORG_PUBLIC_ID, org_public_id_from_internal};

    #[test]
    fn default_public_id_maps_to_default_org() {
        let mapped = in_process_internal_org_id(DEFAULT_ORG_PUBLIC_ID);
        assert_eq!(mapped, DEFAULT_ORG_ID);
    }

    #[test]
    fn invalid_public_id_does_not_fall_back_to_default() {
        let invalid_ids = [
            "",
            "not-an-org",
            "org_short",
            "org_ZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZ",
            "ORG_00000000000000000000000000000001",
        ];
        for invalid in invalid_ids {
            let mapped = in_process_internal_org_id(invalid);
            assert_ne!(mapped, DEFAULT_ORG_ID);
            assert!(
                mapped >= 2,
                "invalid input {invalid:?} should not map to default"
            );
        }
    }

    #[test]
    fn zero_public_id_does_not_fall_back_to_default() {
        // An all-zeros id can only be hand-crafted, since
        // org_public_id_from_internal never emits it; it is handled as
        // invalid input (raw == 0).
        let mapped = in_process_internal_org_id("org_00000000000000000000000000000000");
        assert_ne!(mapped, DEFAULT_ORG_ID);
        assert!(mapped >= 2, "all-zero id should not map to default");
    }

    #[test]
    fn synthetic_public_id_round_trips_with_internal_helper() {
        let internals = [1_i64, 2, 42, 1_000_000, i64::MAX - 1, i64::MAX];
        for internal in internals {
            let public = org_public_id_from_internal(internal);
            assert_eq!(
                in_process_internal_org_id(&public),
                internal,
                "round-trip failed for internal={internal}"
            );
        }
    }

    #[test]
    fn distinct_synthetic_ids_map_to_distinct_internal_ids() {
        let first = org_public_id_from_internal(7);
        let second = org_public_id_from_internal(8);
        assert_ne!(first, second);
        assert_ne!(
            in_process_internal_org_id(&first),
            in_process_internal_org_id(&second)
        );
    }

    #[test]
    fn high_entropy_uuid_style_id_hashes_into_reserved_range() {
        // Smallest UUID-style id whose raw u128 is above i64::MAX (its top
        // bit is set). The mapping must land on a positive i64 that is
        // neither 0 nor DEFAULT_ORG_ID.
        let high = "org_80000000000000000000000000000000";
        let mapped = in_process_internal_org_id(high);
        assert!(mapped >= 2, "mapped id {mapped} must be >= 2");
        assert_ne!(mapped, DEFAULT_ORG_ID);

        // Asking again must give the same answer.
        assert_eq!(mapped, in_process_internal_org_id(high));
    }

    #[test]
    fn high_entropy_ids_are_isolated_from_each_other() {
        let first = in_process_internal_org_id("org_80000000000000000000000000000001");
        let second = in_process_internal_org_id("org_80000000000000000000000000000002");
        assert_ne!(first, second);
        assert_ne!(first, DEFAULT_ORG_ID);
        assert_ne!(second, DEFAULT_ORG_ID);
    }

    #[test]
    fn hash_uses_stable_sha256_truncation() {
        // The expected value is pinned by recomputing it here: SHA-256, first
        // eight bytes read big-endian. If this ever fails, every caller that
        // relies on the mapping being stable across builds needs a re-audit.
        const INPUT: &str = "org_80000000000000000000000000000000";

        let digest = sha2::Sha256::digest(INPUT.as_bytes());
        let mut prefix = [0u8; 8];
        prefix.copy_from_slice(&digest[..8]);
        let raw = u64::from_be_bytes(prefix);
        let expected = ((raw % ((i64::MAX - 1) as u64)) as i64) + 2;

        assert_eq!(in_process_internal_org_id(INPUT), expected);
    }

    #[test]
    fn oversize_input_is_bounded_and_does_not_collide_silently() {
        // Anything longer than HASH_INPUT_CAP_BYTES is cut down before
        // hashing, so oversize strings sharing their first cap bytes end up
        // with the same internal id. The cap limits work; it does not widen
        // the input space. Hence the only checks here are that the result is
        // inside the safe [2, i64::MAX] range and is not DEFAULT_ORG_ID.
        let oversize = "x".repeat(super::HASH_INPUT_CAP_BYTES * 4);
        let mapped = in_process_internal_org_id(&oversize);
        assert!(mapped >= 2);
        assert_ne!(mapped, DEFAULT_ORG_ID);
    }
}
1163}