Skip to main content

everruns_runtime/
runtime.rs

1// In-process runtime builder and runner.
2// Decision: the public runtime is in-memory today, but uses the same core atoms
3// and capability resolution path as the durable worker so behavior stays close.
4
5use crate::backends::{
6    EventBus, RuntimeAgentStore, RuntimeBackends, RuntimeHarnessStore, RuntimeMessageStore,
7    RuntimeProviderStore, RuntimeSessionStore,
8};
9use crate::builders::SingleSessionBuilder;
10use crate::host::{
11    RuntimeHostAdapter, RuntimeHostTurnContext, RuntimeSessionLifecycle, execute_act_activity,
12    execute_input_activity, execute_reason_activity,
13};
14use crate::in_memory::{InMemorySessionFileStore, InMemorySessionFileSystemFactory};
15use async_trait::async_trait;
16use everruns_core::agent::Agent;
17use everruns_core::atoms::{ActInput, AtomContext, InputAtomInput, ReasonInput};
18use everruns_core::capabilities::{Capability, CapabilityRegistry};
19use everruns_core::config_layer::AgentConfigOverlay;
20use everruns_core::error::{AgentLoopError, Result};
21use everruns_core::events::{
22    Event, EventContext, EventData, EventRequest, InputMessageData, OutputMessageCompletedData,
23    ToolCompletedData,
24};
25use everruns_core::harness::Harness;
26use everruns_core::llm_driver_registry::{DriverId, DriverRegistry};
27use everruns_core::llmsim_driver::{LlmSimConfig, LlmSimDriver};
28use everruns_core::message::{ContentPart, Message};
29use everruns_core::platform_definition::PlatformDefinition;
30use everruns_core::plugins::{PluginFileSet, compile_plugin};
31use everruns_core::runtime_context::{AssembledTurnContext, inspect_turn_context};
32use everruns_core::session::{Session, SessionStatus};
33use everruns_core::session_file::{InitialFile, SessionFile};
34use everruns_core::tools::ToolResultImage;
35use everruns_core::traits::{
36    AgentStore, EventEmitter, HarnessStore, ProviderStore, ResolvedModel, SessionMutator,
37    SessionStorageStore, SessionStore, UserConnectionResolver,
38};
39use everruns_core::turn::{TurnAction, TurnContext, TurnOutcome, TurnStateMachine};
40use everruns_core::typed_id::{AgentId, HarnessId, OrgId, SessionId};
41use everruns_core::{
42    AgentCapabilityConfig, InputMessage, MessageRetriever, SessionFileSystem,
43    SessionFileSystemFactoryContext, plugin_capability_id,
44};
45use sha2::{Digest, Sha256};
46use std::path::Path;
47use std::sync::Arc;
48
49/// Cap on the input length hashed by [`hash_public_org_id`].
50///
51/// Legitimate org public ids are `org_<32hex>` (36 bytes). Bounding the
52/// hashed prefix keeps worst-case cost predictable when an attacker-controlled
53/// session carries an oversize string.
54const HASH_INPUT_CAP_BYTES: usize = 128;
55
56/// Derive an internal `i64` org id from the public `org_<32hex>` form on a
57/// [`Session`].
58///
59/// Round-trip with [`everruns_core::org_public_id_from_internal`]: when the
60/// public id was produced by that helper (i.e. the upper bits are zero and
61/// the value fits in a positive `i64`), this returns the original internal
62/// id unchanged. Other values are mapped into `[2, i64::MAX]` by hashing the
63/// original public id string so runtime namespaces do not fail open to the
64/// shared default org and avoid arithmetic collision gadgets.
65fn in_process_internal_org_id(public_org_id: &str) -> i64 {
66    if public_org_id == everruns_core::DEFAULT_ORG_PUBLIC_ID {
67        return everruns_core::DEFAULT_ORG_ID;
68    }
69
70    let Ok(parsed) = public_org_id.parse::<OrgId>() else {
71        return hash_public_org_id(public_org_id);
72    };
73    let raw: u128 = parsed.uuid().as_u128();
74    if raw == 0 {
75        return hash_public_org_id(public_org_id);
76    }
77
78    // Synthetic ids from `org_public_id_from_internal(i64)` always fit here,
79    // so the in-process runtime sees the same `org_id` the server used.
80    if raw <= i64::MAX as u128 {
81        return raw as i64;
82    }
83
84    hash_public_org_id(public_org_id)
85}
86
87// Use SHA-256 with a fixed truncation scheme so the mapping is stable across
88// Rust/binary upgrades and predictable for any embedder. Input is bounded to
89// `HASH_INPUT_CAP_BYTES` so attacker-controlled oversize org strings cannot
90// drive unbounded hashing work.
91fn hash_public_org_id(public_org_id: &str) -> i64 {
92    let bytes = public_org_id.as_bytes();
93    let bounded = &bytes[..bytes.len().min(HASH_INPUT_CAP_BYTES)];
94    let digest = Sha256::digest(bounded);
95    let mut buf = [0u8; 8];
96    buf.copy_from_slice(&digest[..8]);
97    let raw = u64::from_be_bytes(buf);
98    ((raw % ((i64::MAX - 1) as u64)) as i64) + 2
99}
100
101#[derive(Debug, Clone)]
102pub struct TurnResult {
103    /// Final text response produced by the turn.
104    pub response: String,
105    /// Number of reason iterations executed.
106    pub iterations: usize,
107    /// Total number of tool calls executed during the turn.
108    pub tool_calls_count: usize,
109    /// Whether the turn completed without an unrecoverable failure.
110    pub success: bool,
111    /// Failure message when `success` is false.
112    pub error: Option<String>,
113    /// Turn identifier used to correlate emitted events.
114    pub turn_id: everruns_core::typed_id::TurnId,
115}
116
117impl TurnResult {
118    fn from_outcome(outcome: TurnOutcome, turn_id: everruns_core::typed_id::TurnId) -> Self {
119        match outcome {
120            TurnOutcome::Success {
121                response,
122                iterations,
123                tool_calls_count,
124            } => Self {
125                response,
126                iterations,
127                tool_calls_count,
128                success: true,
129                error: None,
130                turn_id,
131            },
132            TurnOutcome::Failed { error, iterations } => Self {
133                response: String::new(),
134                iterations,
135                tool_calls_count: 0,
136                success: false,
137                error: Some(error),
138                turn_id,
139            },
140            TurnOutcome::MaxIterationsReached {
141                response,
142                iterations,
143                tool_calls_count,
144            } => Self {
145                response,
146                iterations,
147                tool_calls_count,
148                success: true,
149                error: None,
150                turn_id,
151            },
152        }
153    }
154}
155
156/// Builder for the public in-process runtime.
157///
158/// The builder owns a standalone runtime bundle:
159/// - `PlatformDefinition` for capabilities and drivers
160/// - in-memory stores for sessions, files, storage, memory, and messages
161/// - seeded harness/agent/session entities
162///
163/// `build()` returns an [`InProcessRuntime`] that can execute turns in-process
164/// without the durable engine or the control-plane server.
165pub struct InProcessRuntimeBuilder {
166    platform_definition: PlatformDefinition,
167    llm_sim_config: Option<LlmSimConfig>,
168    default_model: Option<ResolvedModel>,
169    backends: Option<RuntimeBackends>,
170    session_file_system_factory_context: SessionFileSystemFactoryContext,
171    harnesses: Vec<Harness>,
172    agents: Vec<Agent>,
173    sessions: Vec<Session>,
174    default_session_id: Option<SessionId>,
175    seeded_files: Vec<(SessionId, InitialFile)>,
176    mcp_auth_provider: Option<Arc<dyn everruns_mcp::McpAuthProvider>>,
177    /// Hydrated capability configs for plugins loaded via [`Self::with_plugin_dir`].
178    ///
179    /// Keyed by `plugin:{name}`. Agents and harnesses reference these by the
180    /// same `plugin:{name}` capability ref; the hydrated config carries the
181    /// compiled `DeclarativeCapabilityDefinition` so no registry entry is needed.
182    plugin_capability_configs: Vec<AgentCapabilityConfig>,
183    /// Non-fatal warnings collected during plugin compilation.
184    plugin_warnings: Vec<String>,
185}
186
187impl Default for InProcessRuntimeBuilder {
188    fn default() -> Self {
189        Self::new()
190    }
191}
192
193impl InProcessRuntimeBuilder {
194    /// Create a builder with built-in capabilities and no implicit LLM driver.
195    ///
196    /// Embedders must either:
197    /// - call [`Self::llm_sim`] for deterministic local examples/tests, or
198    /// - register their own driver(s) on the platform definition and set a
199    ///   default model via [`Self::default_model`].
200    pub fn new() -> Self {
201        Self {
202            platform_definition: PlatformDefinition::builder()
203                .capability_registry(CapabilityRegistry::with_builtins())
204                .driver_registry(DriverRegistry::new())
205                .session_file_system_factory(Arc::new(InMemorySessionFileSystemFactory))
206                .build(),
207            llm_sim_config: None,
208            default_model: None,
209            backends: None,
210            session_file_system_factory_context: SessionFileSystemFactoryContext::new(),
211            harnesses: Vec::new(),
212            agents: Vec::new(),
213            sessions: Vec::new(),
214            default_session_id: None,
215            seeded_files: Vec::new(),
216            mcp_auth_provider: None,
217            plugin_capability_configs: Vec::new(),
218            plugin_warnings: Vec::new(),
219        }
220    }
221
222    /// Set the auth provider used to acquire credentials for scoped MCP
223    /// servers (specs/runtime-mcp.md D3). Defaults to no credentials, suitable
224    /// for unauthenticated servers or servers carrying literal auth headers.
225    pub fn mcp_auth_provider(mut self, provider: Arc<dyn everruns_mcp::McpAuthProvider>) -> Self {
226        self.mcp_auth_provider = Some(provider);
227        self
228    }
229
230    /// Replace the platform definition used by the runtime.
231    pub fn platform_definition(mut self, platform_definition: PlatformDefinition) -> Self {
232        self.platform_definition = platform_definition;
233        self
234    }
235
236    /// Register an additional capability on the runtime platform.
237    pub fn capability<C: Capability + 'static>(mut self, capability: C) -> Self {
238        self.platform_definition
239            .capability_registry_mut()
240            .register(capability);
241        self
242    }
243
244    /// Replace the platform driver registry.
245    pub fn driver_registry(mut self, driver_registry: DriverRegistry) -> Self {
246        *self.platform_definition.driver_registry_mut() = driver_registry;
247        self
248    }
249
250    /// Register the built-in `llmsim` driver for deterministic local execution.
251    pub fn llm_sim(mut self, config: LlmSimConfig) -> Self {
252        self.llm_sim_config = Some(config);
253        self
254    }
255
256    /// Set the runtime default model used when sessions/agents do not override it.
257    pub fn default_model(mut self, model: ResolvedModel) -> Self {
258        self.default_model = Some(model);
259        self
260    }
261
262    /// Supply a custom backend bundle instead of the built-in in-memory stores.
263    pub fn backends(mut self, backends: RuntimeBackends) -> Self {
264        self.backends = Some(backends);
265        self
266    }
267
268    /// Supply host dependencies needed by the platform session filesystem factory.
269    pub fn session_file_system_factory_context(
270        mut self,
271        context: SessionFileSystemFactoryContext,
272    ) -> Self {
273        self.session_file_system_factory_context = context;
274        self
275    }
276
277    /// Seed a harness into the runtime store.
278    pub fn harness(mut self, harness: Harness) -> Self {
279        self.harnesses.push(harness);
280        self
281    }
282
283    /// Seed an agent into the runtime store.
284    pub fn agent(mut self, agent: Agent) -> Self {
285        self.agents.push(agent);
286        self
287    }
288
289    /// Seed a session into the runtime store.
290    pub fn session(mut self, session: Session) -> Self {
291        self.sessions.push(session);
292        self
293    }
294
295    /// Seed one harness, one agent, and one session with a compact sub-builder.
296    ///
297    /// The generated session id is exposed from the built runtime via
298    /// [`InProcessRuntime::default_session_id`].
299    pub fn single_session<F>(mut self, configure: F) -> Self
300    where
301        F: FnOnce(SingleSessionBuilder) -> SingleSessionBuilder,
302    {
303        let (harness, agent, session, session_id) =
304            configure(SingleSessionBuilder::default()).build();
305        self.harnesses.push(harness);
306        self.agents.push(agent);
307        self.sessions.push(session);
308        self.default_session_id = Some(session_id);
309        self
310    }
311
312    /// Seed an additional text file directly into a session workspace.
313    ///
314    /// This is applied after harness/agent/session `initial_files` are merged.
315    pub fn seed_text_file(
316        mut self,
317        session_id: SessionId,
318        path: impl Into<String>,
319        content: impl Into<String>,
320    ) -> Self {
321        self.seeded_files.push((
322            session_id,
323            InitialFile {
324                path: path.into(),
325                content: content.into(),
326                encoding: "text".to_string(),
327                is_readonly: false,
328            },
329        ));
330        self
331    }
332
333    /// Load a plugin from a local directory and make it available as a
334    /// `plugin:{name}` capability.
335    ///
336    /// Reads the plugin directory via [`PluginFileSet::from_dir`] and compiles
337    /// it with [`compile_plugin`] at call time. A compilation failure is
338    /// surfaced immediately as a configuration error so the problem is visible
339    /// before the runtime is built. Non-fatal compilation warnings are logged
340    /// via `tracing::warn!` and also collected so they can be inspected on the
341    /// built runtime via [`InProcessRuntime::plugin_warnings`].
342    ///
343    /// After loading, agents and harnesses can reference the plugin by its
344    /// `plugin:{name}` capability ref. The hydrated config carries the compiled
345    /// `DeclarativeCapabilityDefinition`, which the core capability resolution
346    /// path recognises without a registry entry (same path as declarative
347    /// capabilities).
348    ///
349    /// When using [`Self::single_session`], call
350    /// [`SingleSessionBuilder::agent_plugin`] to add the capability ref to the
351    /// seeded agent, or use [`AgentBuilder::capability`] / `with_capability`
352    /// directly.
353    pub fn with_plugin_dir(mut self, path: &Path) -> Result<Self> {
354        let file_set = PluginFileSet::from_dir(path)
355            .map_err(|e| AgentLoopError::config(format!("plugin directory load failed: {e}")))?;
356        let compiled = compile_plugin(&file_set)
357            .map_err(|e| AgentLoopError::config(format!("plugin compilation failed: {e}")))?;
358
359        for warning in &compiled.warnings {
360            tracing::warn!(plugin = %compiled.definition.name, warning = %warning, "plugin compile warning");
361        }
362        self.plugin_warnings.extend(compiled.warnings);
363
364        let cap_id = plugin_capability_id(&compiled.definition.name);
365        let hydrated_config = serde_json::to_value(&compiled.definition)
366            .unwrap_or(serde_json::Value::Object(serde_json::Map::new()));
367        self.plugin_capability_configs
368            .push(AgentCapabilityConfig::with_config(cap_id, hydrated_config));
369
370        Ok(self)
371    }
372
373    /// Return a hydrated `AgentCapabilityConfig` for a previously loaded plugin.
374    ///
375    /// Returns `None` when no plugin with that name was loaded via
376    /// [`Self::with_plugin_dir`]. Primarily used by callers that need the
377    /// hydrated config to seed it onto a harness or agent before building.
378    pub fn plugin_capability(&self, name: &str) -> Option<AgentCapabilityConfig> {
379        let cap_id = plugin_capability_id(name);
380        self.plugin_capability_configs
381            .iter()
382            .find(|c| c.capability_id() == cap_id)
383            .cloned()
384    }
385
386    /// Build the in-process runtime.
387    ///
388    /// Returns a configuration error when no default model is available after
389    /// applying explicit configuration and any requested `llmsim` setup.
390    pub async fn build(mut self) -> Result<InProcessRuntime> {
391        let backends = match self.backends.take() {
392            Some(backends) => backends,
393            None => RuntimeBackends::in_memory(),
394        };
395        let file_store = resolve_session_file_system(
396            &self.platform_definition,
397            self.session_file_system_factory_context.clone(),
398        )
399        .await?;
400
401        if let Some(config) = self.llm_sim_config.take() {
402            let driver = LlmSimDriver::new(config);
403            // Replace intentionally: the platform may already register a
404            // built-in LlmSim driver, and the builder's config takes precedence.
405            self.platform_definition
406                .driver_registry_mut()
407                .register_or_replace(DriverId::LlmSim, move |_config| Box::new(driver.clone()));
408
409            if self.default_model.is_none() {
410                self.default_model = Some(ResolvedModel {
411                    model: "llmsim-model".to_string(),
412                    provider_type: DriverId::LlmSim,
413                    api_key: Some("fake-key".to_string()),
414                    base_url: None,
415                    provider_metadata: None,
416                });
417            }
418        }
419
420        let default_model = self.default_model.ok_or_else(|| {
421            AgentLoopError::config(
422                "in-process runtime requires a default model; call \
423                 InProcessRuntimeBuilder::default_model(...) or \
424                 InProcessRuntimeBuilder::llm_sim(...)",
425            )
426        })?;
427
428        backends
429            .provider_store
430            .set_default_model(default_model)
431            .await?;
432
433        // Hydrate bare plugin: refs in harnesses/agents/sessions with the
434        // compiled definition config so the capability resolution path can
435        // deserialise them without a registry entry (same path as declarative:).
436        for harness in &mut self.harnesses {
437            hydrate_plugin_refs(&mut harness.capabilities, &self.plugin_capability_configs);
438        }
439        for agent in &mut self.agents {
440            hydrate_plugin_refs(&mut agent.capabilities, &self.plugin_capability_configs);
441        }
442        for session in &mut self.sessions {
443            hydrate_plugin_refs(&mut session.capabilities, &self.plugin_capability_configs);
444        }
445
446        for harness in &self.harnesses {
447            backends.harness_store.add_harness(harness.clone()).await?;
448        }
449        for agent in &self.agents {
450            backends.agent_store.add_agent(agent.clone()).await?;
451        }
452        for session in &self.sessions {
453            backends.session_store.add_session(session.clone()).await?;
454        }
455
456        for session in &self.sessions {
457            seed_runtime_initial_files(
458                backends.harness_store.as_ref(),
459                backends.agent_store.as_ref(),
460                file_store.as_ref(),
461                session,
462            )
463            .await?;
464        }
465
466        for (session_id, file) in &self.seeded_files {
467            file_store.seed_initial_file(*session_id, file).await?;
468        }
469
470        let persisting_emitter =
471            PersistingEventEmitter::new(backends.event_bus.clone(), backends.message_store.clone());
472
473        Ok(InProcessRuntime {
474            platform_definition: Arc::new(self.platform_definition),
475            harness_store: backends.harness_store,
476            agent_store: backends.agent_store,
477            session_store: backends.session_store,
478            default_session_id: self.default_session_id,
479            message_store: backends.message_store,
480            provider_store: backends.provider_store,
481            event_bus: backends.event_bus,
482            persisting_emitter,
483            file_store,
484            storage_store: backends.storage_store,
485            connection_resolver: backends.connection_resolver,
486            mcp_auth_provider: self
487                .mcp_auth_provider
488                .unwrap_or_else(|| Arc::new(everruns_mcp::NoAuthProvider)),
489            mcp_discovery_cache: Arc::new(crate::mcp_cache::McpDiscoveryCache::new()),
490            plugin_warnings: self.plugin_warnings,
491        })
492    }
493}
494
495async fn resolve_session_file_system(
496    platform_definition: &PlatformDefinition,
497    file_system_factory_context: SessionFileSystemFactoryContext,
498) -> Result<Arc<dyn SessionFileSystem>> {
499    let file_system_factory = platform_definition.session_file_system_factory();
500    if file_system_factory.is_disabled() {
501        Ok(Arc::new(InMemorySessionFileStore::new()))
502    } else {
503        Ok(file_system_factory
504            .create_session_file_system(file_system_factory_context)
505            .await?)
506    }
507}
508
509#[derive(Clone)]
510/// Public in-process runtime backed by either in-memory or custom stores.
511///
512/// This runtime is intended for embedders who want to execute Everruns
513/// harnesses inside their own process while controlling capabilities,
514/// harness definitions, and driver registrations directly in Rust.
515pub struct InProcessRuntime {
516    platform_definition: Arc<PlatformDefinition>,
517    harness_store: Arc<dyn RuntimeHarnessStore>,
518    agent_store: Arc<dyn RuntimeAgentStore>,
519    session_store: Arc<dyn RuntimeSessionStore>,
520    default_session_id: Option<SessionId>,
521    message_store: Arc<dyn RuntimeMessageStore>,
522    provider_store: Arc<dyn RuntimeProviderStore>,
523    event_bus: Arc<dyn EventBus>,
524    persisting_emitter: PersistingEventEmitter,
525    file_store: Arc<dyn SessionFileSystem>,
526    storage_store: Arc<dyn SessionStorageStore>,
527    connection_resolver: Option<Arc<dyn UserConnectionResolver>>,
528    mcp_auth_provider: Arc<dyn everruns_mcp::McpAuthProvider>,
529    mcp_discovery_cache: Arc<crate::mcp_cache::McpDiscoveryCache>,
530    /// Non-fatal warnings collected during plugin compilation (see
531    /// [`InProcessRuntimeBuilder::with_plugin_dir`]).
532    plugin_warnings: Vec<String>,
533}
534
535impl InProcessRuntime {
536    /// Build the shared MCP client over the platform egress boundary and the
537    /// configured auth provider.
538    fn mcp_client(&self) -> Arc<everruns_mcp::McpClient> {
539        Arc::new(everruns_mcp::McpClient::new(
540            self.platform_definition.egress_service(),
541            self.mcp_auth_provider.clone(),
542        ))
543    }
544
545    /// Resolve the effective scoped MCP servers for a session.
546    async fn session_mcp_servers(
547        &self,
548        session: &Session,
549        agent: Option<&Agent>,
550    ) -> everruns_core::ScopedMcpServers {
551        let harness_chain = self
552            .harness_store
553            .get_harness_chain(session.harness_id)
554            .await
555            .unwrap_or_default();
556        crate::mcp::merge_session_scoped_servers(&harness_chain, agent, session)
557    }
558    /// Create a builder for the in-process runtime.
559    pub fn builder() -> InProcessRuntimeBuilder {
560        InProcessRuntimeBuilder::new()
561    }
562
563    /// Return the default session id seeded by
564    /// [`InProcessRuntimeBuilder::single_session`], if one was configured.
565    pub fn default_session_id(&self) -> Option<SessionId> {
566        self.default_session_id
567    }
568
569    /// Return non-fatal warnings collected during plugin compilation.
570    ///
571    /// Warnings are also emitted at `tracing::warn!` level when
572    /// [`InProcessRuntimeBuilder::with_plugin_dir`] is called.
573    pub fn plugin_warnings(&self) -> &[String] {
574        &self.plugin_warnings
575    }
576
577    /// Execute one turn for an existing session.
578    ///
579    /// The input message is stored in the runtime history, an `input.message`
580    /// event is emitted, and the turn then executes the shared core
581    /// `input -> reason -> act` state machine.
582    pub async fn run_turn(
583        &self,
584        session_id: SessionId,
585        input: impl Into<InputMessage>,
586    ) -> Result<TurnResult> {
587        let session = self
588            .session_store
589            .get_session(session_id)
590            .await?
591            .ok_or_else(|| AgentLoopError::store(format!("session not found: {session_id}")))?;
592
593        // Input message is recorded directly (and emitted via the raw bus so
594        // that PersistingEventEmitter does not double-store it). All
595        // subsequent activity-emitted events flow through the persisting
596        // emitter the adapter hands out.
597        let input_message = self
598            .message_store
599            .add_input_message(session_id, input.into())
600            .await?;
601        self.event_bus
602            .emit(EventRequest::new(
603                session_id,
604                EventContext::empty(),
605                InputMessageData::new(input_message.clone()),
606            ))
607            .await?;
608
609        let assembled = self
610            .inspect_context_with_ids(session_id, session.harness_id, session.agent_id)
611            .await?;
612        let synthetic_agent_id = session
613            .agent_id
614            .unwrap_or_else(|| AgentId::from_uuid(session.id.uuid()));
615        let org_id = in_process_internal_org_id(&session.organization_id);
616        let mut state_machine = TurnStateMachine::new(
617            TurnContext::new(session_id, input_message.id, synthetic_agent_id, org_id),
618            assembled.runtime_agent.max_iterations,
619        );
620
621        let mut previous_response_id: Option<String> = None;
622        let mut last_reason_result: Option<everruns_core::ReasonResult> = None;
623
624        loop {
625            match state_machine.next_action() {
626                TurnAction::ExecuteInput => {
627                    let ctx = state_machine.context();
628                    let base_context =
629                        AtomContext::new(ctx.session_id, ctx.turn_id, ctx.input_message_id)
630                            .with_workspace_id(session.workspace_id);
631                    execute_input_activity(
632                        self,
633                        org_id,
634                        InputAtomInput {
635                            context: base_context,
636                        },
637                    )
638                    .await?;
639                    state_machine.on_input_completed();
640                }
641                TurnAction::ExecuteReason => {
642                    let ctx = state_machine.context();
643                    let base_context =
644                        AtomContext::new(ctx.session_id, ctx.turn_id, ctx.input_message_id)
645                            .with_workspace_id(session.workspace_id);
646                    let reason_result = execute_reason_activity(
647                        self,
648                        org_id,
649                        ReasonInput {
650                            context: base_context.next_exec(),
651                            harness_id: session.harness_id,
652                            agent_id: session.agent_id,
653                            org_id,
654                            mcp_tool_definitions: vec![],
655                            previous_response_id: previous_response_id.take(),
656                            iteration: state_machine.current_iteration() as u32 + 1,
657                        },
658                    )
659                    .await?;
660                    previous_response_id = reason_result.response_id.clone();
661                    state_machine.on_reason_completed(
662                        reason_result.text.clone(),
663                        reason_result.has_tool_calls,
664                        reason_result.tool_calls.len(),
665                        reason_result.success,
666                        reason_result.error.clone(),
667                        false,
668                    );
669                    if reason_result.has_tool_calls {
670                        last_reason_result = Some(reason_result);
671                    }
672                }
673                TurnAction::ExecuteAct => {
674                    let reason_result = last_reason_result
675                        .take()
676                        .expect("ExecuteAct requires a prior ReasonResult");
677                    let ctx = state_machine.context();
678                    let base_context =
679                        AtomContext::new(ctx.session_id, ctx.turn_id, ctx.input_message_id)
680                            .with_workspace_id(session.workspace_id);
681                    execute_act_activity(
682                        self,
683                        ActInput {
684                            org_id: Some(org_id),
685                            context: base_context.next_exec(),
686                            harness_id: session.harness_id,
687                            agent_id: session.agent_id,
688                            tool_calls: reason_result.tool_calls,
689                            tool_definitions: reason_result.tool_definitions,
690                            locale: reason_result.locale,
691                            blueprint_id: None,
692                            network_access: reason_result.network_access,
693                            // Request-level `parallel_tool_calls` is not yet
694                            // plumbed into the reason path; the act scheduler
695                            // defaults to its class-aware concurrent schedule.
696                            parallel_tool_calls: None,
697                        },
698                    )
699                    .await?;
700                    state_machine.on_act_completed();
701                }
702                TurnAction::Complete(outcome) => {
703                    let ctx = state_machine.context();
704                    let lifecycle =
705                        RuntimeSessionLifecycle::new(self.clone(), org_id, ctx.session_id);
706                    let turn_succeeded = matches!(
707                        &outcome,
708                        TurnOutcome::Success { .. } | TurnOutcome::MaxIterationsReached { .. }
709                    );
710                    match &outcome {
711                        TurnOutcome::Success { iterations, .. }
712                        | TurnOutcome::MaxIterationsReached { iterations, .. } => {
713                            lifecycle
714                                .turn_completed(
715                                    ctx.turn_id,
716                                    ctx.input_message_id,
717                                    *iterations as u32,
718                                    None,
719                                    None,
720                                )
721                                .await;
722                        }
723                        TurnOutcome::Failed { error, .. } => {
724                            lifecycle
725                                .turn_failed(ctx.turn_id, ctx.input_message_id, error, None)
726                                .await;
727                        }
728                    }
729                    // turn_end lifecycle hooks (advisory). Fired after the
730                    // terminal turn event for both success and failure.
731                    lifecycle
732                        .fire_turn_end_hooks(
733                            session.harness_id,
734                            session.agent_id,
735                            ctx.turn_id,
736                            turn_succeeded,
737                        )
738                        .await;
739                    return Ok(TurnResult::from_outcome(outcome, ctx.turn_id));
740                }
741            }
742        }
743    }
744
745    pub async fn run_text_turn(
746        &self,
747        session_id: SessionId,
748        text: impl Into<String>,
749    ) -> Result<TurnResult> {
750        self.run_turn(session_id, InputMessage::user(text)).await
751    }
752
753    /// Load the current message history for a session.
754    pub async fn messages(&self, session_id: SessionId) -> Result<Vec<Message>> {
755        self.message_store.load(session_id).await
756    }
757
758    /// Read a file from the in-memory session filesystem.
759    pub async fn read_file(
760        &self,
761        session_id: SessionId,
762        path: &str,
763    ) -> Result<Option<SessionFile>> {
764        self.file_store.read_file(session_id, path).await
765    }
766
767    /// Assemble the current runtime context for a session without executing a turn.
768    pub async fn load_context(&self, session_id: SessionId) -> Result<AssembledTurnContext> {
769        let session = self
770            .session_store
771            .get_session(session_id)
772            .await?
773            .ok_or_else(|| AgentLoopError::store(format!("session not found: {session_id}")))?;
774        self.inspect_context_with_ids(session_id, session.harness_id, session.agent_id)
775            .await
776    }
777
778    /// Return all collected events from the runtime event bus.
779    ///
780    /// Event buses that do not retain events return an empty `Vec` (see
781    /// [`EventBus::collected_events`]).
782    pub async fn events(&self) -> Result<Vec<Event>> {
783        Ok(self.event_bus.collected_events().await)
784    }
785
786    /// Execute a system command declared by a registered capability.
787    ///
788    /// Looks up the first capability whose `commands()` includes the named
789    /// command (in capability-resolution order) and delegates to its
790    /// `execute_command`. Returns an error if no capability declares the
791    /// requested name. The coding-CLI example uses this for `/model`
792    /// (provided by `ModelSwitcherCapability`) so the dispatch path stays
793    /// inside the capability instead of the TUI's local `handle_command`
794    /// branches.
795    pub async fn execute_command(
796        &self,
797        session_id: SessionId,
798        request: everruns_core::command::ExecuteCommandRequest,
799    ) -> Result<everruns_core::command::CommandResult> {
800        let ctx = self.load_context(session_id).await?;
801        let registry = self.platform_definition.capability_registry();
802        // Context-aware commands (e.g. /btw) get the same store-backed host
803        // facilities the server provides; the already-assembled context seeds
804        // the host so dispatch and execution assemble it once.
805        let host = everruns_core::command_host::StoreCommandHost::new(
806            session_id,
807            self.harness_store.clone(),
808            self.agent_store.clone(),
809            self.session_store.clone(),
810            self.message_store.clone(),
811            self.provider_store.clone(),
812            registry.clone(),
813            self.platform_definition.driver_registry().clone(),
814        )
815        .with_file_store(self.file_store.clone())
816        .with_assembled_context(ctx.clone());
817        let exec_ctx =
818            everruns_core::command::CommandExecutionContext::new(session_id, Arc::new(host));
819        for config in &ctx.resolved_capability_configs {
820            let Some(capability) = registry.get(config.capability_id()) else {
821                continue;
822            };
823            if capability.commands().iter().any(|c| c.name == request.name) {
824                return capability.execute_command(&request, &exec_ctx).await;
825            }
826        }
827        Err(AgentLoopError::config(format!(
828            "no capability declares command /{}",
829            request.name
830        )))
831    }
832
833    /// List slash commands available for a session.
834    ///
835    /// Resolves the session's harness/agent capability chain and aggregates
836    /// commands declared via [`Capability::commands`], deduplicated by name
837    /// (first occurrence wins, matching the order of resolved capabilities).
838    /// This is the embedded equivalent of the server's
839    /// `GET /v1/sessions/{id}/commands` system-commands list — skill
840    /// commands are not included here because skills are discovered via the
841    /// platform filesystem rather than the capability registry.
842    pub async fn list_commands(
843        &self,
844        session_id: SessionId,
845    ) -> Result<Vec<everruns_core::command::CommandDescriptor>> {
846        let ctx = self.load_context(session_id).await?;
847        let registry = self.platform_definition.capability_registry();
848        let mut seen = std::collections::HashSet::new();
849        let mut commands = Vec::new();
850        for config in &ctx.resolved_capability_configs {
851            let Some(capability) = registry.get(config.capability_id()) else {
852                continue;
853            };
854            for command in capability.commands() {
855                if seen.insert(command.name.clone()) {
856                    commands.push(command);
857                }
858            }
859        }
860        Ok(commands)
861    }
862
863    async fn inspect_context_with_ids(
864        &self,
865        session_id: SessionId,
866        harness_id: everruns_core::HarnessId,
867        agent_id: Option<AgentId>,
868    ) -> Result<AssembledTurnContext> {
869        inspect_turn_context(
870            self.harness_store.as_ref(),
871            self.agent_store.as_ref(),
872            self.session_store.as_ref(),
873            self.message_store.as_ref(),
874            self.provider_store.as_ref(),
875            self.platform_definition.capability_registry(),
876            session_id,
877            harness_id,
878            agent_id,
879            &[],
880            Some(self.file_store.clone()),
881        )
882        .await
883    }
884}
885
886#[async_trait]
887impl RuntimeHostAdapter for InProcessRuntime {
888    async fn get_agent(&self, _org_id: i64, agent_id: AgentId) -> Result<Option<Agent>> {
889        self.agent_store.get_agent(agent_id).await
890    }
891
892    async fn get_harness(&self, _org_id: i64, harness_id: HarnessId) -> Result<Option<Harness>> {
893        let chain = self.harness_store.get_harness_chain(harness_id).await?;
894        Ok(chain.into_iter().last())
895    }
896
897    async fn set_session_status(
898        &self,
899        _org_id: i64,
900        session_id: SessionId,
901        _status: SessionStatus,
902    ) -> Result<Session> {
903        // The in-process runtime does not persist status. Lifecycle callers
904        // still emit their events; downstream consumers in-process don't
905        // observe session.status.
906        self.session_store
907            .get_session(session_id)
908            .await?
909            .ok_or_else(|| AgentLoopError::store(format!("session not found: {session_id}")))
910    }
911
912    async fn load_turn_context(
913        &self,
914        _org_id: i64,
915        session_id: SessionId,
916    ) -> Result<RuntimeHostTurnContext> {
917        let session = self
918            .session_store
919            .get_session(session_id)
920            .await?
921            .ok_or_else(|| AgentLoopError::store(format!("session not found: {session_id}")))?;
922        let agent = match session.agent_id {
923            Some(agent_id) => self.agent_store.get_agent(agent_id).await?,
924            None => None,
925        };
926        let messages = self.message_store.load(session_id).await?;
927        let model = self.provider_store.get_default_model().await?;
928
929        // Discover tools from the session's scoped MCP servers so they appear
930        // to the LLM alongside built-in tools (specs/runtime-mcp.md D4).
931        let scoped_servers = self.session_mcp_servers(&session, agent.as_ref()).await;
932        let mcp_tool_definitions = if scoped_servers.is_empty() {
933            vec![]
934        } else {
935            crate::mcp::discover_tool_definitions(
936                &self.mcp_discovery_cache,
937                self.mcp_client(),
938                session_id.uuid(),
939                &scoped_servers,
940            )
941            .await
942        };
943
944        Ok(RuntimeHostTurnContext {
945            agent,
946            session,
947            messages,
948            model,
949            mcp_tool_definitions,
950        })
951    }
952
953    async fn mcp_executor(
954        &self,
955        _org_id: i64,
956        session_id: SessionId,
957    ) -> Option<Arc<everruns_mcp::McpExecutor>> {
958        let session = self.session_store.get_session(session_id).await.ok()??;
959        let agent = match session.agent_id {
960            Some(agent_id) => self.agent_store.get_agent(agent_id).await.ok().flatten(),
961            None => None,
962        };
963        let scoped_servers = self.session_mcp_servers(&session, agent.as_ref()).await;
964        crate::mcp::build_executor(self.mcp_client(), &scoped_servers)
965    }
966
967    fn capability_registry(&self) -> CapabilityRegistry {
968        self.platform_definition.capability_registry().clone()
969    }
970
971    fn driver_registry(&self) -> DriverRegistry {
972        self.platform_definition.driver_registry().clone()
973    }
974
975    fn harness_store(&self, _org_id: i64) -> Arc<dyn HarnessStore> {
976        self.harness_store.clone()
977    }
978
979    fn agent_store(&self, _org_id: i64) -> Arc<dyn AgentStore> {
980        self.agent_store.clone()
981    }
982
983    fn session_store(&self, _org_id: i64) -> Arc<dyn SessionStore> {
984        self.session_store.clone()
985    }
986
987    fn session_mutator(&self, _org_id: i64) -> Arc<dyn SessionMutator> {
988        self.session_store.clone()
989    }
990
991    fn provider_store(&self, _org_id: i64) -> Arc<dyn ProviderStore> {
992        self.provider_store.clone()
993    }
994
995    fn message_store(&self) -> Arc<dyn MessageRetriever> {
996        self.message_store.clone()
997    }
998
999    fn event_emitter(&self) -> Arc<dyn EventEmitter> {
1000        Arc::new(self.persisting_emitter.clone())
1001    }
1002
1003    fn file_store(&self) -> Arc<dyn SessionFileSystem> {
1004        self.file_store.clone()
1005    }
1006
1007    fn storage_store(&self) -> Option<Arc<dyn SessionStorageStore>> {
1008        Some(self.storage_store.clone())
1009    }
1010
1011    fn connection_resolver(&self) -> Option<Arc<dyn UserConnectionResolver>> {
1012        self.connection_resolver.clone()
1013    }
1014
1015    fn utility_llm_service(&self) -> Option<Arc<dyn everruns_core::UtilityLlmService>> {
1016        Some(self.platform_definition.utility_llm_service())
1017    }
1018
1019    fn egress_service(&self) -> Option<Arc<dyn everruns_core::EgressService>> {
1020        Some(self.platform_definition.egress_service())
1021    }
1022}
1023
1024#[derive(Clone)]
1025struct PersistingEventEmitter {
1026    inner: Arc<dyn EventBus>,
1027    message_store: Arc<dyn RuntimeMessageStore>,
1028}
1029
1030impl PersistingEventEmitter {
1031    fn new(inner: Arc<dyn EventBus>, message_store: Arc<dyn RuntimeMessageStore>) -> Self {
1032        Self {
1033            inner,
1034            message_store,
1035        }
1036    }
1037}
1038
1039#[async_trait]
1040impl EventEmitter for PersistingEventEmitter {
1041    async fn emit(&self, request: EventRequest) -> Result<Event> {
1042        let event = self.inner.emit(request.clone()).await?;
1043        if let Some(message) = message_from_event(&event.data) {
1044            self.message_store
1045                .store_message(request.session_id, message)
1046                .await?;
1047        }
1048        Ok(event)
1049    }
1050}
1051
1052fn effective_overlay(
1053    harness_chain: &[Harness],
1054    agent: Option<&Agent>,
1055    session: &Session,
1056) -> AgentConfigOverlay {
1057    let harness_layers = harness_chain.iter().map(AgentConfigOverlay::from);
1058    let agent_layers = agent.into_iter().map(AgentConfigOverlay::from);
1059    AgentConfigOverlay::fold(
1060        harness_layers
1061            .chain(agent_layers)
1062            .chain([AgentConfigOverlay::from(session)]),
1063    )
1064}
1065
1066/// Replace bare `plugin:{name}` refs (empty config) with the hydrated version
1067/// (config = serialised `DeclarativeCapabilityDefinition`) so that the core
1068/// capability resolution path can deserialise them without a registry entry.
1069///
1070/// Only replaces entries whose config is empty / `null`; entries that already
1071/// carry a non-empty config are left unchanged so explicit overrides are honoured.
1072fn hydrate_plugin_refs(
1073    capabilities: &mut [AgentCapabilityConfig],
1074    plugin_configs: &[AgentCapabilityConfig],
1075) {
1076    for cap in capabilities.iter_mut() {
1077        let cap_id = cap.capability_id();
1078        if !everruns_core::is_plugin_capability(cap_id) {
1079            continue;
1080        }
1081        // Only replace if the config is missing / empty so explicit overrides are honoured.
1082        let is_bare = cap.config.is_null()
1083            || cap
1084                .config
1085                .as_object()
1086                .map(|o| o.is_empty())
1087                .unwrap_or(false);
1088        if !is_bare {
1089            continue;
1090        }
1091        if let Some(hydrated) = plugin_configs.iter().find(|c| c.capability_id() == cap_id) {
1092            cap.config = hydrated.config.clone();
1093        }
1094    }
1095}
1096
1097async fn seed_runtime_initial_files(
1098    harness_store: &dyn RuntimeHarnessStore,
1099    agent_store: &dyn RuntimeAgentStore,
1100    file_store: &dyn SessionFileSystem,
1101    session: &Session,
1102) -> Result<()> {
1103    let harness_chain = harness_store.get_harness_chain(session.harness_id).await?;
1104    if harness_chain.is_empty() {
1105        return Err(AgentLoopError::store(format!(
1106            "harness not found while seeding files: {}",
1107            session.harness_id
1108        )));
1109    }
1110    let agent = match session.agent_id {
1111        Some(agent_id) => Some(
1112            agent_store
1113                .get_agent(agent_id)
1114                .await?
1115                .ok_or_else(|| AgentLoopError::store(format!("agent not found: {agent_id}")))?,
1116        ),
1117        None => None,
1118    };
1119    let overlay = effective_overlay(&harness_chain, agent.as_ref(), session);
1120    // Seed into the session's workspace (a shared workspace differs from the
1121    // session id; the default 1:1 case is equal).
1122    let seed_key = SessionId::from_uuid(session.workspace_id.uuid());
1123    for file in &overlay.initial_files {
1124        file_store.seed_initial_file(seed_key, file).await?;
1125    }
1126    Ok(())
1127}
1128
1129fn message_from_event(data: &EventData) -> Option<Message> {
1130    match data {
1131        EventData::InputMessage(data) => Some(data.message.clone()),
1132        EventData::OutputMessageCompleted(OutputMessageCompletedData { message, .. }) => {
1133            Some(message.clone())
1134        }
1135        EventData::ToolCompleted(data) => Some(tool_completed_to_message(data.clone())),
1136        _ => None,
1137    }
1138}
1139
1140fn tool_completed_to_message(data: ToolCompletedData) -> Message {
1141    let mut images: Vec<ToolResultImage> = Vec::new();
1142    let metadata = tool_result_metadata(&data);
1143    let result = data.result.map(|parts| {
1144        for part in &parts {
1145            if let ContentPart::Image(img) = part
1146                && let (Some(base64), Some(media_type)) = (&img.base64, &img.media_type)
1147            {
1148                images.push(ToolResultImage {
1149                    base64: base64.clone(),
1150                    media_type: media_type.clone(),
1151                });
1152            }
1153        }
1154
1155        let text_parts: Vec<&ContentPart> = parts
1156            .iter()
1157            .filter(|part| matches!(part, ContentPart::Text(_)))
1158            .collect();
1159        if text_parts.len() == 1
1160            && let ContentPart::Text(text) = text_parts[0]
1161        {
1162            return parse_structured_tool_result_text(&text.text);
1163        }
1164        if !text_parts.is_empty() {
1165            serde_json::to_value(&text_parts).unwrap_or_default()
1166        } else {
1167            serde_json::Value::Null
1168        }
1169    });
1170
1171    let mut message = if images.is_empty() {
1172        Message::tool_result(&data.tool_call_id, result, data.error)
1173    } else {
1174        Message::tool_result_with_images(&data.tool_call_id, result, images)
1175    };
1176    message.metadata = metadata;
1177    message
1178}
1179
1180fn tool_result_metadata(
1181    data: &ToolCompletedData,
1182) -> Option<std::collections::HashMap<String, serde_json::Value>> {
1183    let mut metadata = std::collections::HashMap::new();
1184    metadata.insert("tool_name".to_string(), serde_json::json!(data.tool_name));
1185    if let Some(fingerprint) = &data.tool_call_fingerprint {
1186        metadata.insert(
1187            "tool_call_fingerprint".to_string(),
1188            serde_json::json!(fingerprint),
1189        );
1190    }
1191    if let Some(fingerprint) = &data.tool_result_fingerprint {
1192        metadata.insert(
1193            "tool_result_fingerprint".to_string(),
1194            serde_json::json!(fingerprint),
1195        );
1196    }
1197    (!metadata.is_empty()).then_some(metadata)
1198}
1199
1200fn parse_structured_tool_result_text(text: &str) -> serde_json::Value {
1201    let trimmed = text.trim_start();
1202    if !trimmed.starts_with('{') && !trimmed.starts_with('[') {
1203        return serde_json::Value::String(text.to_string());
1204    }
1205
1206    match serde_json::from_str(text) {
1207        Ok(value @ (serde_json::Value::Object(_) | serde_json::Value::Array(_))) => value,
1208        _ => serde_json::Value::String(text.to_string()),
1209    }
1210}
1211
1212#[cfg(test)]
1213mod tool_completed_replay_tests {
1214    use super::*;
1215
1216    #[test]
1217    fn tool_completed_replay_preserves_json_object_shape() {
1218        let data = ToolCompletedData::success(
1219            "call_read".to_string(),
1220            "read_file".to_string(),
1221            vec![ContentPart::text(
1222                serde_json::json!({
1223                    "path": "/workspace/src/lib.rs",
1224                    "content": "1|fn main() {}"
1225                })
1226                .to_string(),
1227            )],
1228            Some(1),
1229        );
1230
1231        let message = tool_completed_to_message(data);
1232        let result = message
1233            .tool_result_content()
1234            .and_then(|content| content.result.as_ref())
1235            .expect("tool result should be present");
1236
1237        assert_eq!(result["path"], "/workspace/src/lib.rs");
1238        assert_eq!(result["content"], "1|fn main() {}");
1239    }
1240
1241    #[test]
1242    fn tool_completed_replay_keeps_scalar_json_as_text() {
1243        let data = ToolCompletedData::success(
1244            "call_scalar".to_string(),
1245            "custom_tool".to_string(),
1246            vec![ContentPart::text("123")],
1247            Some(1),
1248        );
1249
1250        let message = tool_completed_to_message(data);
1251        let result = message
1252            .tool_result_content()
1253            .and_then(|content| content.result.as_ref())
1254            .expect("tool result should be present");
1255
1256        assert_eq!(result, &serde_json::Value::String("123".to_string()));
1257    }
1258
1259    #[test]
1260    fn tool_completed_replay_preserves_fingerprints_as_metadata() {
1261        let data = ToolCompletedData::success(
1262            "call_read".to_string(),
1263            "read_file".to_string(),
1264            vec![ContentPart::text("{}")],
1265            Some(1),
1266        )
1267        .with_fingerprints("sha256:call".to_string(), "sha256:result".to_string());
1268
1269        let message = tool_completed_to_message(data);
1270        let metadata = message.metadata.expect("metadata should be present");
1271
1272        assert_eq!(metadata["tool_name"], "read_file");
1273        assert_eq!(metadata["tool_call_fingerprint"], "sha256:call");
1274        assert_eq!(metadata["tool_result_fingerprint"], "sha256:result");
1275    }
1276}
1277
1278#[cfg(test)]
1279mod org_id_mapping_tests {
1280    use super::*;
1281    use everruns_core::{DEFAULT_ORG_ID, DEFAULT_ORG_PUBLIC_ID, org_public_id_from_internal};
1282
1283    #[test]
1284    fn default_public_id_maps_to_default_org() {
1285        assert_eq!(
1286            in_process_internal_org_id(DEFAULT_ORG_PUBLIC_ID),
1287            DEFAULT_ORG_ID
1288        );
1289    }
1290
1291    #[test]
1292    fn invalid_public_id_does_not_fall_back_to_default() {
1293        for invalid in [
1294            "",
1295            "not-an-org",
1296            "org_short",
1297            "org_ZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZ",
1298            "ORG_00000000000000000000000000000001",
1299        ] {
1300            let mapped = in_process_internal_org_id(invalid);
1301            assert_ne!(mapped, everruns_core::DEFAULT_ORG_ID);
1302            assert!(
1303                mapped >= 2,
1304                "invalid input {invalid:?} should not map to default"
1305            );
1306        }
1307    }
1308
1309    #[test]
1310    fn zero_public_id_does_not_fall_back_to_default() {
1311        // org_public_id_from_internal never produces this; a hand-crafted
1312        // all-zeros id is treated as invalid (raw == 0).
1313        let mapped = in_process_internal_org_id("org_00000000000000000000000000000000");
1314        assert_ne!(mapped, everruns_core::DEFAULT_ORG_ID);
1315        assert!(mapped >= 2, "all-zero id should not map to default");
1316    }
1317
1318    #[test]
1319    fn synthetic_public_id_round_trips_with_internal_helper() {
1320        for internal in [1_i64, 2, 42, 1_000_000, i64::MAX - 1, i64::MAX] {
1321            let public = org_public_id_from_internal(internal);
1322            assert_eq!(
1323                in_process_internal_org_id(&public),
1324                internal,
1325                "round-trip failed for internal={internal}"
1326            );
1327        }
1328    }
1329
1330    #[test]
1331    fn distinct_synthetic_ids_map_to_distinct_internal_ids() {
1332        let a = org_public_id_from_internal(7);
1333        let b = org_public_id_from_internal(8);
1334        assert_ne!(a, b);
1335        assert_ne!(
1336            in_process_internal_org_id(&a),
1337            in_process_internal_org_id(&b)
1338        );
1339    }
1340
1341    #[test]
1342    fn high_entropy_uuid_style_id_hashes_into_reserved_range() {
1343        // First valid UUID-style id whose raw u128 exceeds i64::MAX
1344        // (top bit of the u128 set). It must hash to a positive i64 that
1345        // is neither 0 nor DEFAULT_ORG_ID.
1346        let high = "org_80000000000000000000000000000000";
1347        let mapped = in_process_internal_org_id(high);
1348        assert!(mapped >= 2, "mapped id {mapped} must be >= 2");
1349        assert_ne!(mapped, DEFAULT_ORG_ID);
1350
1351        // Mapping is deterministic.
1352        assert_eq!(mapped, in_process_internal_org_id(high));
1353    }
1354
1355    #[test]
1356    fn high_entropy_ids_are_isolated_from_each_other() {
1357        let a = in_process_internal_org_id("org_80000000000000000000000000000001");
1358        let b = in_process_internal_org_id("org_80000000000000000000000000000002");
1359        assert_ne!(a, b);
1360        assert_ne!(a, DEFAULT_ORG_ID);
1361        assert_ne!(b, DEFAULT_ORG_ID);
1362    }
1363
1364    #[test]
1365    fn hash_uses_stable_sha256_truncation() {
1366        // SHA-256 with fixed big-endian first-8-byte truncation gives a value
1367        // we can pin. If this assertion ever breaks, callers depending on
1368        // build-stable mapping must be re-audited.
1369        let mapped = in_process_internal_org_id("org_80000000000000000000000000000000");
1370        let expected = {
1371            let digest = sha2::Sha256::digest(b"org_80000000000000000000000000000000");
1372            let mut buf = [0u8; 8];
1373            buf.copy_from_slice(&digest[..8]);
1374            let raw = u64::from_be_bytes(buf);
1375            ((raw % ((i64::MAX - 1) as u64)) as i64) + 2
1376        };
1377        assert_eq!(mapped, expected);
1378    }
1379
1380    #[test]
1381    fn oversize_input_is_bounded_and_does_not_collide_silently() {
1382        // Inputs past HASH_INPUT_CAP_BYTES are truncated before hashing, so
1383        // two oversize strings that agree on the first cap bytes map to the
1384        // same internal id. We only assert the result stays in the safe
1385        // [2, i64::MAX] range and is not DEFAULT_ORG_ID — the cap exists to
1386        // bound work, not to widen the input space.
1387        let oversize = "x".repeat(super::HASH_INPUT_CAP_BYTES * 4);
1388        let mapped = in_process_internal_org_id(&oversize);
1389        assert!(mapped >= 2);
1390        assert_ne!(mapped, DEFAULT_ORG_ID);
1391    }
1392}