Skip to main content

everruns_runtime/
runtime.rs

1// In-process runtime builder and runner.
2// Decision: the public runtime is in-memory today, but uses the same core atoms
3// and capability resolution path as the durable worker so behavior stays close.
4
5use crate::backends::{
6    EventBus, RuntimeAgentStore, RuntimeBackends, RuntimeHarnessStore, RuntimeMessageStore,
7    RuntimeProviderStore, RuntimeSessionStore,
8};
9use crate::builders::SingleSessionBuilder;
10use crate::host::{
11    RuntimeHostAdapter, RuntimeHostTurnContext, RuntimeSessionLifecycle, execute_act_activity,
12    execute_input_activity, execute_reason_activity,
13};
14use crate::in_memory::{InMemorySessionFileStore, InMemorySessionFileSystemFactory};
15use async_trait::async_trait;
16use everruns_core::agent::Agent;
17use everruns_core::atoms::{ActInput, AtomContext, InputAtomInput, ReasonInput};
18use everruns_core::capabilities::{Capability, CapabilityRegistry};
19use everruns_core::config_layer::AgentConfigOverlay;
20use everruns_core::driver_registry::{DriverId, DriverRegistry};
21use everruns_core::error::{AgentLoopError, Result};
22use everruns_core::events::{
23    Event, EventContext, EventData, EventRequest, InputMessageData, OutputMessageCompletedData,
24    ToolCompletedData,
25};
26use everruns_core::harness::Harness;
27use everruns_core::llmsim_driver::{LlmSimConfig, LlmSimDriver};
28use everruns_core::message::{ContentPart, Message};
29use everruns_core::platform_definition::PlatformDefinition;
30use everruns_core::plugins::{PluginFileSet, compile_plugin};
31use everruns_core::runtime_context::{AssembledTurnContext, inspect_turn_context};
32use everruns_core::session::{Session, SessionStatus};
33use everruns_core::session_file::{InitialFile, SessionFile};
34use everruns_core::tools::ToolResultImage;
35use everruns_core::traits::{
36    AgentStore, EventEmitter, HarnessStore, ProviderStore, ResolvedModel, SessionMutator,
37    SessionStorageStore, SessionStore, UserConnectionResolver,
38};
39use everruns_core::turn::{TurnAction, TurnContext, TurnOutcome, TurnStateMachine};
40use everruns_core::typed_id::{AgentId, HarnessId, OrgId, SessionId};
41use everruns_core::{
42    AgentCapabilityConfig, InputMessage, MessageRetriever, SessionFileSystem,
43    SessionFileSystemFactoryContext, plugin_capability_id,
44};
45use sha2::{Digest, Sha256};
46use std::path::Path;
47use std::sync::Arc;
48
/// Cap, in bytes, on the input length hashed by [`hash_public_org_id`].
///
/// Legitimate org public ids are `org_<32hex>` (36 bytes). Bounding the
/// hashed prefix keeps worst-case cost predictable when an attacker-controlled
/// session carries an oversize string.
///
/// Only the first `HASH_INPUT_CAP_BYTES` bytes take part in the hash, so two
/// oversize strings that share that prefix map to the same internal id.
const HASH_INPUT_CAP_BYTES: usize = 128;
55
56/// Derive an internal `i64` org id from the public `org_<32hex>` form on a
57/// [`Session`].
58///
59/// Round-trip with [`everruns_core::org_public_id_from_internal`]: when the
60/// public id was produced by that helper (i.e. the upper bits are zero and
61/// the value fits in a positive `i64`), this returns the original internal
62/// id unchanged. Other values are mapped into `[2, i64::MAX]` by hashing the
63/// original public id string so runtime namespaces do not fail open to the
64/// shared default org and avoid arithmetic collision gadgets.
65///
66/// Exposed so embedders (e.g. `everruns-local`) can scope per-org stores to the
67/// same internal id the act path resolves a session's org to.
68pub fn in_process_internal_org_id(public_org_id: &str) -> i64 {
69    if public_org_id == everruns_core::DEFAULT_ORG_PUBLIC_ID {
70        return everruns_core::DEFAULT_ORG_ID;
71    }
72
73    let Ok(parsed) = public_org_id.parse::<OrgId>() else {
74        return hash_public_org_id(public_org_id);
75    };
76    let raw: u128 = parsed.uuid().as_u128();
77    if raw == 0 {
78        return hash_public_org_id(public_org_id);
79    }
80
81    // Synthetic ids from `org_public_id_from_internal(i64)` always fit here,
82    // so the in-process runtime sees the same `org_id` the server used.
83    if raw <= i64::MAX as u128 {
84        return raw as i64;
85    }
86
87    hash_public_org_id(public_org_id)
88}
89
90// Use SHA-256 with a fixed truncation scheme so the mapping is stable across
91// Rust/binary upgrades and predictable for any embedder. Input is bounded to
92// `HASH_INPUT_CAP_BYTES` so attacker-controlled oversize org strings cannot
93// drive unbounded hashing work.
94fn hash_public_org_id(public_org_id: &str) -> i64 {
95    let bytes = public_org_id.as_bytes();
96    let bounded = &bytes[..bytes.len().min(HASH_INPUT_CAP_BYTES)];
97    let digest = Sha256::digest(bounded);
98    let mut buf = [0u8; 8];
99    buf.copy_from_slice(&digest[..8]);
100    let raw = u64::from_be_bytes(buf);
101    ((raw % ((i64::MAX - 1) as u64)) as i64) + 2
102}
103
/// Flattened summary of one executed turn, built from a [`TurnOutcome`] and
/// the id of the turn that produced it.
#[derive(Debug, Clone)]
pub struct TurnResult {
    /// Final text response produced by the turn. Empty when the turn failed.
    pub response: String,
    /// Number of reason iterations executed.
    pub iterations: usize,
    /// Total number of tool calls executed during the turn. Reported as `0`
    /// when the turn failed.
    pub tool_calls_count: usize,
    /// `true` when the turn succeeded or stopped at the iteration limit;
    /// `false` when it failed or was sealed.
    pub success: bool,
    /// Failure message when `success` is false. For a sealed turn this is
    /// `"turn sealed: <reason>"`.
    pub error: Option<String>,
    /// Turn identifier used to correlate emitted events.
    pub turn_id: everruns_core::typed_id::TurnId,
}
119
120impl TurnResult {
121    fn from_outcome(outcome: TurnOutcome, turn_id: everruns_core::typed_id::TurnId) -> Self {
122        match outcome {
123            TurnOutcome::Success {
124                response,
125                iterations,
126                tool_calls_count,
127            } => Self {
128                response,
129                iterations,
130                tool_calls_count,
131                success: true,
132                error: None,
133                turn_id,
134            },
135            TurnOutcome::Failed { error, iterations } => Self {
136                response: String::new(),
137                iterations,
138                tool_calls_count: 0,
139                success: false,
140                error: Some(error),
141                turn_id,
142            },
143            TurnOutcome::MaxIterationsReached {
144                response,
145                iterations,
146                tool_calls_count,
147            } => Self {
148                response,
149                iterations,
150                tool_calls_count,
151                success: true,
152                error: None,
153                turn_id,
154            },
155            // A sealed turn was deliberately stopped (EVE-534) — distinct from a
156            // success. Report it as non-success carrying the seal reason.
157            TurnOutcome::Sealed {
158                reason,
159                response,
160                iterations,
161                tool_calls_count,
162            } => Self {
163                response,
164                iterations,
165                tool_calls_count,
166                success: false,
167                error: Some(format!("turn sealed: {reason}")),
168                turn_id,
169            },
170        }
171    }
172}
173
/// Builder for the public in-process runtime.
///
/// The builder owns a standalone runtime bundle:
/// - `PlatformDefinition` for capabilities and drivers
/// - in-memory stores for sessions, files, storage, memory, and messages
/// - seeded harness/agent/session entities
///
/// `build()` returns an [`InProcessRuntime`] that can execute turns in-process
/// without the durable engine or the control-plane server.
pub struct InProcessRuntimeBuilder {
    /// Platform definition holding the capability registry, driver registry,
    /// and session filesystem factory.
    platform_definition: PlatformDefinition,
    /// Pending `llmsim` configuration; consumed by `build()` to register the
    /// `llmsim` driver.
    llm_sim_config: Option<LlmSimConfig>,
    /// Explicit default model. `build()` fails when this is unset and no
    /// `llmsim` configuration was supplied.
    default_model: Option<ResolvedModel>,
    /// Custom backend bundle; `build()` falls back to
    /// `RuntimeBackends::in_memory()` when unset.
    backends: Option<RuntimeBackends>,
    /// Host dependencies passed to the platform session filesystem factory.
    session_file_system_factory_context: SessionFileSystemFactoryContext,
    /// Harnesses added to the harness store during `build()`.
    harnesses: Vec<Harness>,
    /// Agents added to the agent store during `build()`.
    agents: Vec<Agent>,
    /// Sessions added to the session store during `build()`.
    sessions: Vec<Session>,
    /// Session id produced by the most recent `single_session` call, if any.
    default_session_id: Option<SessionId>,
    /// Extra files seeded into session workspaces after the
    /// harness/agent/session `initial_files` have been applied.
    seeded_files: Vec<(SessionId, InitialFile)>,
    /// Auth provider for scoped MCP servers; `build()` substitutes
    /// `everruns_mcp::NoAuthProvider` when unset.
    mcp_auth_provider: Option<Arc<dyn everruns_mcp::McpAuthProvider>>,
    /// Hydrated capability configs for plugins loaded via [`Self::with_plugin_dir`].
    ///
    /// Keyed by `plugin:{name}`. Agents and harnesses reference these by the
    /// same `plugin:{name}` capability ref; the hydrated config carries the
    /// compiled `DeclarativeCapabilityDefinition` so no registry entry is needed.
    plugin_capability_configs: Vec<AgentCapabilityConfig>,
    /// Non-fatal warnings collected during plugin compilation.
    plugin_warnings: Vec<String>,
}
204
205impl Default for InProcessRuntimeBuilder {
206    fn default() -> Self {
207        Self::new()
208    }
209}
210
211impl InProcessRuntimeBuilder {
212    /// Create a builder with built-in capabilities and no implicit LLM driver.
213    ///
214    /// Embedders must either:
215    /// - call [`Self::llm_sim`] for deterministic local examples/tests, or
216    /// - register their own driver(s) on the platform definition and set a
217    ///   default model via [`Self::default_model`].
218    pub fn new() -> Self {
219        Self {
220            platform_definition: PlatformDefinition::builder()
221                .capability_registry(CapabilityRegistry::with_builtins())
222                .driver_registry(DriverRegistry::new())
223                .session_file_system_factory(Arc::new(InMemorySessionFileSystemFactory))
224                .build(),
225            llm_sim_config: None,
226            default_model: None,
227            backends: None,
228            session_file_system_factory_context: SessionFileSystemFactoryContext::new(),
229            harnesses: Vec::new(),
230            agents: Vec::new(),
231            sessions: Vec::new(),
232            default_session_id: None,
233            seeded_files: Vec::new(),
234            mcp_auth_provider: None,
235            plugin_capability_configs: Vec::new(),
236            plugin_warnings: Vec::new(),
237        }
238    }
239
240    /// Set the auth provider used to acquire credentials for scoped MCP
241    /// servers (specs/runtime-mcp.md D3). Defaults to no credentials, suitable
242    /// for unauthenticated servers or servers carrying literal auth headers.
243    pub fn mcp_auth_provider(mut self, provider: Arc<dyn everruns_mcp::McpAuthProvider>) -> Self {
244        self.mcp_auth_provider = Some(provider);
245        self
246    }
247
248    /// Replace the platform definition used by the runtime.
249    pub fn platform_definition(mut self, platform_definition: PlatformDefinition) -> Self {
250        self.platform_definition = platform_definition;
251        self
252    }
253
254    /// Register an additional capability on the runtime platform.
255    pub fn capability<C: Capability + 'static>(mut self, capability: C) -> Self {
256        self.platform_definition
257            .capability_registry_mut()
258            .register(capability);
259        self
260    }
261
262    /// Replace the platform driver registry.
263    pub fn driver_registry(mut self, driver_registry: DriverRegistry) -> Self {
264        *self.platform_definition.driver_registry_mut() = driver_registry;
265        self
266    }
267
268    /// Register the built-in `llmsim` driver for deterministic local execution.
269    pub fn llm_sim(mut self, config: LlmSimConfig) -> Self {
270        self.llm_sim_config = Some(config);
271        self
272    }
273
274    /// Set the runtime default model used when sessions/agents do not override it.
275    pub fn default_model(mut self, model: ResolvedModel) -> Self {
276        self.default_model = Some(model);
277        self
278    }
279
280    /// Supply a custom backend bundle instead of the built-in in-memory stores.
281    pub fn backends(mut self, backends: RuntimeBackends) -> Self {
282        self.backends = Some(backends);
283        self
284    }
285
286    /// Inject a session-task registry. Convenience over `backends(...)` that
287    /// initializes an in-memory backend bundle on first use, so embedders can
288    /// add the registry without assembling a full `RuntimeBackends`.
289    pub fn with_session_task_registry(
290        mut self,
291        registry: Arc<dyn everruns_core::session_task::SessionTaskRegistry>,
292    ) -> Self {
293        let backends = self
294            .backends
295            .take()
296            .unwrap_or_else(RuntimeBackends::in_memory);
297        self.backends = Some(backends.with_session_task_registry(registry));
298        self
299    }
300
301    /// Inject a per-org schedule store factory (see [`RuntimeBackends`]).
302    pub fn with_schedule_store_factory(
303        mut self,
304        factory: crate::backends::ScheduleStoreFactory,
305    ) -> Self {
306        let backends = self
307            .backends
308            .take()
309            .unwrap_or_else(RuntimeBackends::in_memory);
310        self.backends = Some(backends.with_schedule_store_factory(factory));
311        self
312    }
313
314    /// Inject a per-(org, session) platform store factory (see [`RuntimeBackends`]).
315    pub fn with_platform_store_factory(
316        mut self,
317        factory: crate::backends::PlatformStoreFactory,
318    ) -> Self {
319        let backends = self
320            .backends
321            .take()
322            .unwrap_or_else(RuntimeBackends::in_memory);
323        self.backends = Some(backends.with_platform_store_factory(factory));
324        self
325    }
326
327    /// Supply host dependencies needed by the platform session filesystem factory.
328    pub fn session_file_system_factory_context(
329        mut self,
330        context: SessionFileSystemFactoryContext,
331    ) -> Self {
332        self.session_file_system_factory_context = context;
333        self
334    }
335
336    /// Seed a harness into the runtime store.
337    pub fn harness(mut self, harness: Harness) -> Self {
338        self.harnesses.push(harness);
339        self
340    }
341
342    /// Seed an agent into the runtime store.
343    pub fn agent(mut self, agent: Agent) -> Self {
344        self.agents.push(agent);
345        self
346    }
347
348    /// Seed a session into the runtime store.
349    pub fn session(mut self, session: Session) -> Self {
350        self.sessions.push(session);
351        self
352    }
353
354    /// Seed one harness, one agent, and one session with a compact sub-builder.
355    ///
356    /// The generated session id is exposed from the built runtime via
357    /// [`InProcessRuntime::default_session_id`].
358    pub fn single_session<F>(mut self, configure: F) -> Self
359    where
360        F: FnOnce(SingleSessionBuilder) -> SingleSessionBuilder,
361    {
362        let (harness, agent, session, session_id) =
363            configure(SingleSessionBuilder::default()).build();
364        self.harnesses.push(harness);
365        self.agents.push(agent);
366        self.sessions.push(session);
367        self.default_session_id = Some(session_id);
368        self
369    }
370
371    /// Seed an additional text file directly into a session workspace.
372    ///
373    /// This is applied after harness/agent/session `initial_files` are merged.
374    pub fn seed_text_file(
375        mut self,
376        session_id: SessionId,
377        path: impl Into<String>,
378        content: impl Into<String>,
379    ) -> Self {
380        self.seeded_files.push((
381            session_id,
382            InitialFile {
383                path: path.into(),
384                content: content.into(),
385                encoding: "text".to_string(),
386                is_readonly: false,
387            },
388        ));
389        self
390    }
391
392    /// Load a plugin from a local directory and make it available as a
393    /// `plugin:{name}` capability.
394    ///
395    /// Reads the plugin directory via [`PluginFileSet::from_dir`] and compiles
396    /// it with [`compile_plugin`] at call time. A compilation failure is
397    /// surfaced immediately as a configuration error so the problem is visible
398    /// before the runtime is built. Non-fatal compilation warnings are logged
399    /// via `tracing::warn!` and also collected so they can be inspected on the
400    /// built runtime via [`InProcessRuntime::plugin_warnings`].
401    ///
402    /// After loading, agents and harnesses can reference the plugin by its
403    /// `plugin:{name}` capability ref. The hydrated config carries the compiled
404    /// `DeclarativeCapabilityDefinition`, which the core capability resolution
405    /// path recognises without a registry entry (same path as declarative
406    /// capabilities).
407    ///
408    /// When using [`Self::single_session`], call
409    /// [`SingleSessionBuilder::agent_plugin`] to add the capability ref to the
410    /// seeded agent, or use [`AgentBuilder::capability`] / `with_capability`
411    /// directly.
412    pub fn with_plugin_dir(mut self, path: &Path) -> Result<Self> {
413        let file_set = PluginFileSet::from_dir(path)
414            .map_err(|e| AgentLoopError::config(format!("plugin directory load failed: {e}")))?;
415        let compiled = compile_plugin(&file_set)
416            .map_err(|e| AgentLoopError::config(format!("plugin compilation failed: {e}")))?;
417
418        for warning in &compiled.warnings {
419            tracing::warn!(plugin = %compiled.definition.name, warning = %warning, "plugin compile warning");
420        }
421        self.plugin_warnings.extend(compiled.warnings);
422
423        let cap_id = plugin_capability_id(&compiled.definition.name);
424        let hydrated_config = serde_json::to_value(&compiled.definition)
425            .unwrap_or(serde_json::Value::Object(serde_json::Map::new()));
426        self.plugin_capability_configs
427            .push(AgentCapabilityConfig::with_config(cap_id, hydrated_config));
428
429        Ok(self)
430    }
431
432    /// Return a hydrated `AgentCapabilityConfig` for a previously loaded plugin.
433    ///
434    /// Returns `None` when no plugin with that name was loaded via
435    /// [`Self::with_plugin_dir`]. Primarily used by callers that need the
436    /// hydrated config to seed it onto a harness or agent before building.
437    pub fn plugin_capability(&self, name: &str) -> Option<AgentCapabilityConfig> {
438        let cap_id = plugin_capability_id(name);
439        self.plugin_capability_configs
440            .iter()
441            .find(|c| c.capability_id() == cap_id)
442            .cloned()
443    }
444
445    /// Build the in-process runtime.
446    ///
447    /// Returns a configuration error when no default model is available after
448    /// applying explicit configuration and any requested `llmsim` setup.
449    pub async fn build(mut self) -> Result<InProcessRuntime> {
450        let backends = match self.backends.take() {
451            Some(backends) => backends,
452            None => RuntimeBackends::in_memory(),
453        };
454        let file_store = resolve_session_file_system(
455            &self.platform_definition,
456            self.session_file_system_factory_context.clone(),
457        )
458        .await?;
459
460        if let Some(config) = self.llm_sim_config.take() {
461            let driver = LlmSimDriver::new(config);
462            // Replace intentionally: the platform may already register a
463            // built-in LlmSim driver, and the builder's config takes precedence.
464            self.platform_definition
465                .driver_registry_mut()
466                .register_or_replace(DriverId::LlmSim, move |_config| Box::new(driver.clone()));
467
468            if self.default_model.is_none() {
469                self.default_model = Some(ResolvedModel {
470                    model: "llmsim-model".to_string(),
471                    provider_type: DriverId::LlmSim,
472                    api_key: Some("fake-key".to_string()),
473                    base_url: None,
474                    provider_metadata: None,
475                });
476            }
477        }
478
479        let default_model = self.default_model.ok_or_else(|| {
480            AgentLoopError::config(
481                "in-process runtime requires a default model; call \
482                 InProcessRuntimeBuilder::default_model(...) or \
483                 InProcessRuntimeBuilder::llm_sim(...)",
484            )
485        })?;
486
487        backends
488            .provider_store
489            .set_default_model(default_model)
490            .await?;
491
492        // Hydrate bare plugin: refs in harnesses/agents/sessions with the
493        // compiled definition config so the capability resolution path can
494        // deserialise them without a registry entry (same path as declarative:).
495        for harness in &mut self.harnesses {
496            hydrate_plugin_refs(&mut harness.capabilities, &self.plugin_capability_configs);
497        }
498        for agent in &mut self.agents {
499            hydrate_plugin_refs(&mut agent.capabilities, &self.plugin_capability_configs);
500        }
501        for session in &mut self.sessions {
502            hydrate_plugin_refs(&mut session.capabilities, &self.plugin_capability_configs);
503        }
504
505        for harness in &self.harnesses {
506            backends.harness_store.add_harness(harness.clone()).await?;
507        }
508        for agent in &self.agents {
509            backends.agent_store.add_agent(agent.clone()).await?;
510        }
511        for session in &self.sessions {
512            backends.session_store.add_session(session.clone()).await?;
513        }
514
515        for session in &self.sessions {
516            seed_runtime_initial_files(
517                backends.harness_store.as_ref(),
518                backends.agent_store.as_ref(),
519                file_store.as_ref(),
520                session,
521            )
522            .await?;
523        }
524
525        for (session_id, file) in &self.seeded_files {
526            file_store.seed_initial_file(*session_id, file).await?;
527        }
528
529        let persisting_emitter =
530            PersistingEventEmitter::new(backends.event_bus.clone(), backends.message_store.clone());
531
532        Ok(InProcessRuntime {
533            platform_definition: Arc::new(self.platform_definition),
534            harness_store: backends.harness_store,
535            agent_store: backends.agent_store,
536            session_store: backends.session_store,
537            default_session_id: self.default_session_id,
538            message_store: backends.message_store,
539            provider_store: backends.provider_store,
540            event_bus: backends.event_bus,
541            persisting_emitter,
542            file_store,
543            storage_store: backends.storage_store,
544            connection_resolver: backends.connection_resolver,
545            session_task_registry: backends.session_task_registry,
546            schedule_store_factory: backends.schedule_store_factory,
547            platform_store_factory: backends.platform_store_factory,
548            mcp_auth_provider: self
549                .mcp_auth_provider
550                .unwrap_or_else(|| Arc::new(everruns_mcp::NoAuthProvider)),
551            mcp_discovery_cache: Arc::new(crate::mcp_cache::McpDiscoveryCache::new()),
552            plugin_warnings: self.plugin_warnings,
553        })
554    }
555}
556
557async fn resolve_session_file_system(
558    platform_definition: &PlatformDefinition,
559    file_system_factory_context: SessionFileSystemFactoryContext,
560) -> Result<Arc<dyn SessionFileSystem>> {
561    let file_system_factory = platform_definition.session_file_system_factory();
562    if file_system_factory.is_disabled() {
563        Ok(Arc::new(InMemorySessionFileStore::new()))
564    } else {
565        Ok(file_system_factory
566            .create_session_file_system(file_system_factory_context)
567            .await?)
568    }
569}
570
571#[derive(Clone)]
572/// Public in-process runtime backed by either in-memory or custom stores.
573///
574/// This runtime is intended for embedders who want to execute Everruns
575/// harnesses inside their own process while controlling capabilities,
576/// harness definitions, and driver registrations directly in Rust.
577pub struct InProcessRuntime {
578    platform_definition: Arc<PlatformDefinition>,
579    harness_store: Arc<dyn RuntimeHarnessStore>,
580    agent_store: Arc<dyn RuntimeAgentStore>,
581    session_store: Arc<dyn RuntimeSessionStore>,
582    default_session_id: Option<SessionId>,
583    message_store: Arc<dyn RuntimeMessageStore>,
584    provider_store: Arc<dyn RuntimeProviderStore>,
585    event_bus: Arc<dyn EventBus>,
586    persisting_emitter: PersistingEventEmitter,
587    file_store: Arc<dyn SessionFileSystem>,
588    storage_store: Arc<dyn SessionStorageStore>,
589    connection_resolver: Option<Arc<dyn UserConnectionResolver>>,
590    session_task_registry: Option<Arc<dyn everruns_core::session_task::SessionTaskRegistry>>,
591    schedule_store_factory: Option<crate::backends::ScheduleStoreFactory>,
592    platform_store_factory: Option<crate::backends::PlatformStoreFactory>,
593    mcp_auth_provider: Arc<dyn everruns_mcp::McpAuthProvider>,
594    mcp_discovery_cache: Arc<crate::mcp_cache::McpDiscoveryCache>,
595    /// Non-fatal warnings collected during plugin compilation (see
596    /// [`InProcessRuntimeBuilder::with_plugin_dir`]).
597    plugin_warnings: Vec<String>,
598}
599
600impl InProcessRuntime {
601    /// Build the shared MCP client over the platform egress boundary and the
602    /// configured auth provider.
603    fn mcp_client(&self) -> Arc<everruns_mcp::McpClient> {
604        Arc::new(everruns_mcp::McpClient::new(
605            self.platform_definition.egress_service(),
606            self.mcp_auth_provider.clone(),
607        ))
608    }
609
610    /// Resolve the effective scoped MCP servers for a session.
611    async fn session_mcp_servers(
612        &self,
613        session: &Session,
614        agent: Option<&Agent>,
615    ) -> everruns_core::ScopedMcpServers {
616        let harness_chain = self
617            .harness_store
618            .get_harness_chain(session.harness_id)
619            .await
620            .unwrap_or_default();
621        crate::mcp::merge_session_scoped_servers(&harness_chain, agent, session)
622    }
623    /// Create a builder for the in-process runtime.
624    pub fn builder() -> InProcessRuntimeBuilder {
625        InProcessRuntimeBuilder::new()
626    }
627
    /// Return the default session id seeded by
    /// [`InProcessRuntimeBuilder::single_session`], if one was configured.
    ///
    /// `None` when the runtime was built without `single_session`. When
    /// `single_session` was called more than once, this is the id from the
    /// last call.
    pub fn default_session_id(&self) -> Option<SessionId> {
        self.default_session_id
    }
633
634    /// Return non-fatal warnings collected during plugin compilation.
635    ///
636    /// Warnings are also emitted at `tracing::warn!` level when
637    /// [`InProcessRuntimeBuilder::with_plugin_dir`] is called.
638    pub fn plugin_warnings(&self) -> &[String] {
639        &self.plugin_warnings
640    }
641
642    /// Execute one turn for an existing session.
643    ///
644    /// The input message is stored in the runtime history, an `input.message`
645    /// event is emitted, and the turn then executes the shared core
646    /// `input -> reason -> act` state machine.
647    pub async fn run_turn(
648        &self,
649        session_id: SessionId,
650        input: impl Into<InputMessage>,
651    ) -> Result<TurnResult> {
652        let session = self
653            .session_store
654            .get_session(session_id)
655            .await?
656            .ok_or_else(|| AgentLoopError::store(format!("session not found: {session_id}")))?;
657
658        // Input message is recorded directly (and emitted via the raw bus so
659        // that PersistingEventEmitter does not double-store it). All
660        // subsequent activity-emitted events flow through the persisting
661        // emitter the adapter hands out.
662        let input_message = self
663            .message_store
664            .add_input_message(session_id, input.into())
665            .await?;
666        self.event_bus
667            .emit(EventRequest::new(
668                session_id,
669                EventContext::empty(),
670                InputMessageData::new(input_message.clone()),
671            ))
672            .await?;
673
674        let assembled = self
675            .inspect_context_with_ids(session_id, session.harness_id, session.agent_id)
676            .await?;
677        let synthetic_agent_id = session
678            .agent_id
679            .unwrap_or_else(|| AgentId::from_uuid(session.id.uuid()));
680        let org_id = in_process_internal_org_id(&session.organization_id);
681        let mut state_machine = TurnStateMachine::new(
682            TurnContext::new(session_id, input_message.id, synthetic_agent_id, org_id),
683            assembled.runtime_agent.max_iterations,
684        );
685
686        let mut previous_response_id: Option<String> = None;
687        let mut last_reason_result: Option<everruns_core::ReasonResult> = None;
688
689        loop {
690            match state_machine.next_action() {
691                TurnAction::ExecuteInput => {
692                    let ctx = state_machine.context();
693                    let base_context =
694                        AtomContext::new(ctx.session_id, ctx.turn_id, ctx.input_message_id)
695                            .with_workspace_id(session.workspace_id);
696                    execute_input_activity(
697                        self,
698                        org_id,
699                        InputAtomInput {
700                            context: base_context,
701                        },
702                    )
703                    .await?;
704                    state_machine.on_input_completed();
705                }
706                TurnAction::ExecuteReason => {
707                    let ctx = state_machine.context();
708                    let base_context =
709                        AtomContext::new(ctx.session_id, ctx.turn_id, ctx.input_message_id)
710                            .with_workspace_id(session.workspace_id);
711                    let reason_result = execute_reason_activity(
712                        self,
713                        org_id,
714                        ReasonInput {
715                            context: base_context.next_exec(),
716                            harness_id: session.harness_id,
717                            agent_id: session.agent_id,
718                            org_id,
719                            mcp_tool_definitions: vec![],
720                            previous_response_id: previous_response_id.take(),
721                            iteration: state_machine.current_iteration() as u32 + 1,
722                        },
723                    )
724                    .await?;
725                    previous_response_id = reason_result.response_id.clone();
726                    state_machine.on_reason_completed(
727                        reason_result.text.clone(),
728                        reason_result.has_tool_calls,
729                        reason_result.tool_calls.len(),
730                        reason_result.success,
731                        reason_result.error.clone(),
732                        false,
733                    );
734                    if reason_result.has_tool_calls {
735                        last_reason_result = Some(reason_result);
736                    }
737                }
738                TurnAction::ExecuteAct => {
739                    let reason_result = last_reason_result
740                        .take()
741                        .expect("ExecuteAct requires a prior ReasonResult");
742                    let ctx = state_machine.context();
743                    let base_context =
744                        AtomContext::new(ctx.session_id, ctx.turn_id, ctx.input_message_id)
745                            .with_workspace_id(session.workspace_id);
746                    execute_act_activity(
747                        self,
748                        ActInput {
749                            org_id: Some(org_id),
750                            context: base_context.next_exec(),
751                            harness_id: session.harness_id,
752                            agent_id: session.agent_id,
753                            tool_calls: reason_result.tool_calls,
754                            tool_definitions: reason_result.tool_definitions,
755                            locale: reason_result.locale,
756                            blueprint_id: None,
757                            network_access: reason_result.network_access,
758                            // Request-level parallel tool calling preference,
759                            // carried from agent config through reason (EVE-598).
760                            parallel_tool_calls: reason_result.parallel_tool_calls,
761                        },
762                    )
763                    .await?;
764                    state_machine.on_act_completed();
765                }
766                TurnAction::Complete(outcome) => {
767                    let ctx = state_machine.context();
768                    let lifecycle =
769                        RuntimeSessionLifecycle::new(self.clone(), org_id, ctx.session_id);
770                    let turn_succeeded = matches!(
771                        &outcome,
772                        TurnOutcome::Success { .. } | TurnOutcome::MaxIterationsReached { .. }
773                    );
774                    match &outcome {
775                        TurnOutcome::Success { iterations, .. }
776                        | TurnOutcome::MaxIterationsReached { iterations, .. } => {
777                            lifecycle
778                                .turn_completed(
779                                    ctx.turn_id,
780                                    ctx.input_message_id,
781                                    *iterations as u32,
782                                    None,
783                                    None,
784                                )
785                                .await;
786                        }
787                        TurnOutcome::Failed { error, .. } => {
788                            lifecycle
789                                .turn_failed(ctx.turn_id, ctx.input_message_id, error, None)
790                                .await;
791                        }
792                        TurnOutcome::Sealed {
793                            reason, iterations, ..
794                        } => {
795                            lifecycle
796                                .turn_sealed(
797                                    ctx.turn_id,
798                                    ctx.input_message_id,
799                                    reason.as_str(),
800                                    *iterations as u32,
801                                    None,
802                                )
803                                .await;
804                        }
805                    }
806                    // turn_end lifecycle hooks (advisory). Fired after the
807                    // terminal turn event for both success and failure.
808                    lifecycle
809                        .fire_turn_end_hooks(
810                            session.harness_id,
811                            session.agent_id,
812                            ctx.turn_id,
813                            turn_succeeded,
814                        )
815                        .await;
816                    return Ok(TurnResult::from_outcome(outcome, ctx.turn_id));
817                }
818            }
819        }
820    }
821
822    pub async fn run_text_turn(
823        &self,
824        session_id: SessionId,
825        text: impl Into<String>,
826    ) -> Result<TurnResult> {
827        self.run_turn(session_id, InputMessage::user(text)).await
828    }
829
830    /// Load the current message history for a session.
831    pub async fn messages(&self, session_id: SessionId) -> Result<Vec<Message>> {
832        self.message_store.load(session_id).await
833    }
834
835    /// Read a file from the in-memory session filesystem.
836    pub async fn read_file(
837        &self,
838        session_id: SessionId,
839        path: &str,
840    ) -> Result<Option<SessionFile>> {
841        self.file_store.read_file(session_id, path).await
842    }
843
844    /// Assemble the current runtime context for a session without executing a turn.
845    pub async fn load_context(&self, session_id: SessionId) -> Result<AssembledTurnContext> {
846        let session = self
847            .session_store
848            .get_session(session_id)
849            .await?
850            .ok_or_else(|| AgentLoopError::store(format!("session not found: {session_id}")))?;
851        self.inspect_context_with_ids(session_id, session.harness_id, session.agent_id)
852            .await
853    }
854
855    /// Return all collected events from the runtime event bus.
856    ///
857    /// Event buses that do not retain events return an empty `Vec` (see
858    /// [`EventBus::collected_events`]).
859    pub async fn events(&self) -> Result<Vec<Event>> {
860        Ok(self.event_bus.collected_events().await)
861    }
862
863    /// Execute a system command declared by a registered capability.
864    ///
865    /// Looks up the first capability whose `commands()` includes the named
866    /// command (in capability-resolution order) and delegates to its
867    /// `execute_command`. Returns an error if no capability declares the
868    /// requested name. The coding-CLI example uses this for `/model`
869    /// (provided by `ModelSwitcherCapability`) so the dispatch path stays
870    /// inside the capability instead of the TUI's local `handle_command`
871    /// branches.
872    pub async fn execute_command(
873        &self,
874        session_id: SessionId,
875        request: everruns_core::command::ExecuteCommandRequest,
876    ) -> Result<everruns_core::command::CommandResult> {
877        let ctx = self.load_context(session_id).await?;
878        let registry = self.platform_definition.capability_registry();
879        // Context-aware commands (e.g. /btw) get the same store-backed host
880        // facilities the server provides; the already-assembled context seeds
881        // the host so dispatch and execution assemble it once.
882        let host = everruns_core::command_host::StoreCommandHost::new(
883            session_id,
884            self.harness_store.clone(),
885            self.agent_store.clone(),
886            self.session_store.clone(),
887            self.message_store.clone(),
888            self.provider_store.clone(),
889            registry.clone(),
890            self.platform_definition.driver_registry().clone(),
891        )
892        .with_file_store(self.file_store.clone())
893        .with_assembled_context(ctx.clone());
894        let exec_ctx =
895            everruns_core::command::CommandExecutionContext::new(session_id, Arc::new(host));
896        for config in &ctx.resolved_capability_configs {
897            let Some(capability) = registry.get(config.capability_id()) else {
898                continue;
899            };
900            if capability.commands().iter().any(|c| c.name == request.name) {
901                return capability.execute_command(&request, &exec_ctx).await;
902            }
903        }
904        Err(AgentLoopError::config(format!(
905            "no capability declares command /{}",
906            request.name
907        )))
908    }
909
910    /// List slash commands available for a session.
911    ///
912    /// Resolves the session's harness/agent capability chain and aggregates
913    /// commands declared via [`Capability::commands`], deduplicated by name
914    /// (first occurrence wins, matching the order of resolved capabilities).
915    /// This is the embedded equivalent of the server's
916    /// `GET /v1/sessions/{id}/commands` system-commands list — skill
917    /// commands are not included here because skills are discovered via the
918    /// platform filesystem rather than the capability registry.
919    pub async fn list_commands(
920        &self,
921        session_id: SessionId,
922    ) -> Result<Vec<everruns_core::command::CommandDescriptor>> {
923        let ctx = self.load_context(session_id).await?;
924        let registry = self.platform_definition.capability_registry();
925        let mut seen = std::collections::HashSet::new();
926        let mut commands = Vec::new();
927        for config in &ctx.resolved_capability_configs {
928            let Some(capability) = registry.get(config.capability_id()) else {
929                continue;
930            };
931            for command in capability.commands() {
932                if seen.insert(command.name.clone()) {
933                    commands.push(command);
934                }
935            }
936        }
937        Ok(commands)
938    }
939
940    async fn inspect_context_with_ids(
941        &self,
942        session_id: SessionId,
943        harness_id: everruns_core::HarnessId,
944        agent_id: Option<AgentId>,
945    ) -> Result<AssembledTurnContext> {
946        inspect_turn_context(
947            self.harness_store.as_ref(),
948            self.agent_store.as_ref(),
949            self.session_store.as_ref(),
950            self.message_store.as_ref(),
951            self.provider_store.as_ref(),
952            self.platform_definition.capability_registry(),
953            session_id,
954            harness_id,
955            agent_id,
956            &[],
957            Some(self.file_store.clone()),
958        )
959        .await
960    }
961}
962
963#[async_trait]
964impl RuntimeHostAdapter for InProcessRuntime {
965    async fn get_agent(&self, _org_id: i64, agent_id: AgentId) -> Result<Option<Agent>> {
966        self.agent_store.get_agent(agent_id).await
967    }
968
969    async fn get_harness(&self, _org_id: i64, harness_id: HarnessId) -> Result<Option<Harness>> {
970        let chain = self.harness_store.get_harness_chain(harness_id).await?;
971        Ok(chain.into_iter().last())
972    }
973
974    async fn set_session_status(
975        &self,
976        _org_id: i64,
977        session_id: SessionId,
978        _status: SessionStatus,
979    ) -> Result<Session> {
980        // The in-process runtime does not persist status. Lifecycle callers
981        // still emit their events; downstream consumers in-process don't
982        // observe session.status.
983        self.session_store
984            .get_session(session_id)
985            .await?
986            .ok_or_else(|| AgentLoopError::store(format!("session not found: {session_id}")))
987    }
988
989    async fn load_turn_context(
990        &self,
991        _org_id: i64,
992        session_id: SessionId,
993    ) -> Result<RuntimeHostTurnContext> {
994        let mut session = self
995            .session_store
996            .get_session(session_id)
997            .await?
998            .ok_or_else(|| AgentLoopError::store(format!("session not found: {session_id}")))?;
999        // Fold runtime ARD attachments into the session config layer before
1000        // scoped MCP servers / capabilities are resolved (resource_discovery).
1001        everruns_core::ard_attachment::apply_session_attachments(
1002            self.storage_store.as_ref(),
1003            &mut session,
1004        )
1005        .await;
1006        let agent = match session.agent_id {
1007            Some(agent_id) => self.agent_store.get_agent(agent_id).await?,
1008            None => None,
1009        };
1010        let messages = self.message_store.load(session_id).await?;
1011        let model = self.provider_store.get_default_model().await?;
1012
1013        // Discover tools from the session's scoped MCP servers so they appear
1014        // to the LLM alongside built-in tools (specs/runtime-mcp.md D4).
1015        let scoped_servers = self.session_mcp_servers(&session, agent.as_ref()).await;
1016        let mcp_tool_definitions = if scoped_servers.is_empty() {
1017            vec![]
1018        } else {
1019            crate::mcp::discover_tool_definitions(
1020                &self.mcp_discovery_cache,
1021                self.mcp_client(),
1022                session_id.uuid(),
1023                &scoped_servers,
1024            )
1025            .await
1026        };
1027
1028        Ok(RuntimeHostTurnContext {
1029            agent,
1030            session,
1031            messages,
1032            model,
1033            mcp_tool_definitions,
1034        })
1035    }
1036
1037    async fn mcp_executor(
1038        &self,
1039        _org_id: i64,
1040        session_id: SessionId,
1041    ) -> Option<Arc<everruns_mcp::McpExecutor>> {
1042        let session = self.session_store.get_session(session_id).await.ok()??;
1043        let agent = match session.agent_id {
1044            Some(agent_id) => self.agent_store.get_agent(agent_id).await.ok().flatten(),
1045            None => None,
1046        };
1047        let scoped_servers = self.session_mcp_servers(&session, agent.as_ref()).await;
1048        crate::mcp::build_executor(self.mcp_client(), &scoped_servers)
1049    }
1050
1051    fn capability_registry(&self) -> CapabilityRegistry {
1052        self.platform_definition.capability_registry().clone()
1053    }
1054
1055    fn driver_registry(&self) -> DriverRegistry {
1056        self.platform_definition.driver_registry().clone()
1057    }
1058
1059    fn harness_store(&self, _org_id: i64) -> Arc<dyn HarnessStore> {
1060        self.harness_store.clone()
1061    }
1062
1063    fn agent_store(&self, _org_id: i64) -> Arc<dyn AgentStore> {
1064        self.agent_store.clone()
1065    }
1066
1067    fn session_store(&self, _org_id: i64) -> Arc<dyn SessionStore> {
1068        self.session_store.clone()
1069    }
1070
1071    fn session_mutator(&self, _org_id: i64) -> Arc<dyn SessionMutator> {
1072        self.session_store.clone()
1073    }
1074
1075    fn provider_store(&self, _org_id: i64) -> Arc<dyn ProviderStore> {
1076        self.provider_store.clone()
1077    }
1078
1079    fn message_store(&self) -> Arc<dyn MessageRetriever> {
1080        self.message_store.clone()
1081    }
1082
1083    fn event_emitter(&self) -> Arc<dyn EventEmitter> {
1084        Arc::new(self.persisting_emitter.clone())
1085    }
1086
1087    fn file_store(&self) -> Arc<dyn SessionFileSystem> {
1088        self.file_store.clone()
1089    }
1090
1091    fn storage_store(&self) -> Option<Arc<dyn SessionStorageStore>> {
1092        Some(self.storage_store.clone())
1093    }
1094
1095    fn connection_resolver(&self) -> Option<Arc<dyn UserConnectionResolver>> {
1096        self.connection_resolver.clone()
1097    }
1098
1099    fn session_task_registry(
1100        &self,
1101    ) -> Option<Arc<dyn everruns_core::session_task::SessionTaskRegistry>> {
1102        self.session_task_registry.clone()
1103    }
1104
1105    fn schedule_store(
1106        &self,
1107        org_id: i64,
1108    ) -> Option<Arc<dyn everruns_core::traits::SessionScheduleStore>> {
1109        self.schedule_store_factory
1110            .as_ref()
1111            .map(|factory| factory(org_id))
1112    }
1113
1114    fn platform_store(
1115        &self,
1116        org_id: i64,
1117        session_id: SessionId,
1118    ) -> Option<Arc<dyn everruns_core::platform_store::PlatformStore>> {
1119        self.platform_store_factory
1120            .as_ref()
1121            .map(|factory| factory(org_id, session_id))
1122    }
1123
1124    fn utility_llm_service(&self) -> Option<Arc<dyn everruns_core::UtilityLlmService>> {
1125        Some(self.platform_definition.utility_llm_service())
1126    }
1127
1128    fn egress_service(&self) -> Option<Arc<dyn everruns_core::EgressService>> {
1129        Some(self.platform_definition.egress_service())
1130    }
1131}
1132
1133#[derive(Clone)]
1134struct PersistingEventEmitter {
1135    inner: Arc<dyn EventBus>,
1136    message_store: Arc<dyn RuntimeMessageStore>,
1137}
1138
1139impl PersistingEventEmitter {
1140    fn new(inner: Arc<dyn EventBus>, message_store: Arc<dyn RuntimeMessageStore>) -> Self {
1141        Self {
1142            inner,
1143            message_store,
1144        }
1145    }
1146}
1147
1148#[async_trait]
1149impl EventEmitter for PersistingEventEmitter {
1150    async fn emit(&self, request: EventRequest) -> Result<Event> {
1151        let event = self.inner.emit(request.clone()).await?;
1152        if let Some(message) = message_from_event(&event.data) {
1153            self.message_store
1154                .store_message(request.session_id, message)
1155                .await?;
1156        }
1157        Ok(event)
1158    }
1159}
1160
1161fn effective_overlay(
1162    harness_chain: &[Harness],
1163    agent: Option<&Agent>,
1164    session: &Session,
1165) -> AgentConfigOverlay {
1166    let harness_layers = harness_chain.iter().map(AgentConfigOverlay::from);
1167    let agent_layers = agent.into_iter().map(AgentConfigOverlay::from);
1168    AgentConfigOverlay::fold(
1169        harness_layers
1170            .chain(agent_layers)
1171            .chain([AgentConfigOverlay::from(session)]),
1172    )
1173}
1174
1175/// Replace bare `plugin:{name}` refs (empty config) with the hydrated version
1176/// (config = serialised `DeclarativeCapabilityDefinition`) so that the core
1177/// capability resolution path can deserialise them without a registry entry.
1178///
1179/// Only replaces entries whose config is empty / `null`; entries that already
1180/// carry a non-empty config are left unchanged so explicit overrides are honoured.
1181fn hydrate_plugin_refs(
1182    capabilities: &mut [AgentCapabilityConfig],
1183    plugin_configs: &[AgentCapabilityConfig],
1184) {
1185    for cap in capabilities.iter_mut() {
1186        let cap_id = cap.capability_id();
1187        if !everruns_core::is_plugin_capability(cap_id) {
1188            continue;
1189        }
1190        // Only replace if the config is missing / empty so explicit overrides are honoured.
1191        let is_bare = cap.config.is_null()
1192            || cap
1193                .config
1194                .as_object()
1195                .map(|o| o.is_empty())
1196                .unwrap_or(false);
1197        if !is_bare {
1198            continue;
1199        }
1200        if let Some(hydrated) = plugin_configs.iter().find(|c| c.capability_id() == cap_id) {
1201            cap.config = hydrated.config.clone();
1202        }
1203    }
1204}
1205
1206async fn seed_runtime_initial_files(
1207    harness_store: &dyn RuntimeHarnessStore,
1208    agent_store: &dyn RuntimeAgentStore,
1209    file_store: &dyn SessionFileSystem,
1210    session: &Session,
1211) -> Result<()> {
1212    let harness_chain = harness_store.get_harness_chain(session.harness_id).await?;
1213    if harness_chain.is_empty() {
1214        return Err(AgentLoopError::store(format!(
1215            "harness not found while seeding files: {}",
1216            session.harness_id
1217        )));
1218    }
1219    let agent = match session.agent_id {
1220        Some(agent_id) => Some(
1221            agent_store
1222                .get_agent(agent_id)
1223                .await?
1224                .ok_or_else(|| AgentLoopError::store(format!("agent not found: {agent_id}")))?,
1225        ),
1226        None => None,
1227    };
1228    let overlay = effective_overlay(&harness_chain, agent.as_ref(), session);
1229    // Seed into the session's workspace (a shared workspace differs from the
1230    // session id; the default 1:1 case is equal).
1231    let seed_key = SessionId::from_uuid(session.workspace_id.uuid());
1232    for file in &overlay.initial_files {
1233        file_store.seed_initial_file(seed_key, file).await?;
1234    }
1235    Ok(())
1236}
1237
1238fn message_from_event(data: &EventData) -> Option<Message> {
1239    match data {
1240        EventData::InputMessage(data) => Some(data.message.clone()),
1241        EventData::OutputMessageCompleted(OutputMessageCompletedData { message, .. }) => {
1242            Some(message.clone())
1243        }
1244        EventData::ToolCompleted(data) => Some(tool_completed_to_message(data.clone())),
1245        _ => None,
1246    }
1247}
1248
1249fn tool_completed_to_message(data: ToolCompletedData) -> Message {
1250    let mut images: Vec<ToolResultImage> = Vec::new();
1251    let metadata = tool_result_metadata(&data);
1252    let result = data.result.map(|parts| {
1253        for part in &parts {
1254            if let ContentPart::Image(img) = part
1255                && let (Some(base64), Some(media_type)) = (&img.base64, &img.media_type)
1256            {
1257                images.push(ToolResultImage {
1258                    base64: base64.clone(),
1259                    media_type: media_type.clone(),
1260                });
1261            }
1262        }
1263
1264        let text_parts: Vec<&ContentPart> = parts
1265            .iter()
1266            .filter(|part| matches!(part, ContentPart::Text(_)))
1267            .collect();
1268        if text_parts.len() == 1
1269            && let ContentPart::Text(text) = text_parts[0]
1270        {
1271            return parse_structured_tool_result_text(&text.text);
1272        }
1273        if !text_parts.is_empty() {
1274            serde_json::to_value(&text_parts).unwrap_or_default()
1275        } else {
1276            serde_json::Value::Null
1277        }
1278    });
1279
1280    let mut message = if images.is_empty() {
1281        Message::tool_result(&data.tool_call_id, result, data.error)
1282    } else {
1283        Message::tool_result_with_images(&data.tool_call_id, result, images)
1284    };
1285    message.metadata = metadata;
1286    message
1287}
1288
1289fn tool_result_metadata(
1290    data: &ToolCompletedData,
1291) -> Option<std::collections::HashMap<String, serde_json::Value>> {
1292    let mut metadata = std::collections::HashMap::new();
1293    metadata.insert("tool_name".to_string(), serde_json::json!(data.tool_name));
1294    if let Some(fingerprint) = &data.tool_call_fingerprint {
1295        metadata.insert(
1296            "tool_call_fingerprint".to_string(),
1297            serde_json::json!(fingerprint),
1298        );
1299    }
1300    if let Some(fingerprint) = &data.tool_result_fingerprint {
1301        metadata.insert(
1302            "tool_result_fingerprint".to_string(),
1303            serde_json::json!(fingerprint),
1304        );
1305    }
1306    (!metadata.is_empty()).then_some(metadata)
1307}
1308
1309fn parse_structured_tool_result_text(text: &str) -> serde_json::Value {
1310    let trimmed = text.trim_start();
1311    if !trimmed.starts_with('{') && !trimmed.starts_with('[') {
1312        return serde_json::Value::String(text.to_string());
1313    }
1314
1315    match serde_json::from_str(text) {
1316        Ok(value @ (serde_json::Value::Object(_) | serde_json::Value::Array(_))) => value,
1317        _ => serde_json::Value::String(text.to_string()),
1318    }
1319}
1320
/// Tests for replaying `ToolCompleted` events into tool-result messages.
#[cfg(test)]
mod tool_completed_replay_tests {
    use super::*;

    /// Build a successful `ToolCompleted` payload carrying one text part.
    fn text_success(call_id: &str, tool_name: &str, text: String) -> ToolCompletedData {
        ToolCompletedData::success(
            call_id.to_string(),
            tool_name.to_string(),
            vec![ContentPart::text(text)],
            Some(1),
        )
    }

    /// Borrow the JSON result of a replayed tool-result message.
    fn replayed_result(message: &Message) -> &serde_json::Value {
        message
            .tool_result_content()
            .and_then(|content| content.result.as_ref())
            .expect("tool result should be present")
    }

    #[test]
    fn tool_completed_replay_preserves_json_object_shape() {
        let payload = serde_json::json!({
            "path": "/workspace/src/lib.rs",
            "content": "1|fn main() {}"
        });
        let data = text_success("call_read", "read_file", payload.to_string());

        let message = tool_completed_to_message(data);
        let result = replayed_result(&message);

        assert_eq!(result["path"], "/workspace/src/lib.rs");
        assert_eq!(result["content"], "1|fn main() {}");
    }

    #[test]
    fn tool_completed_replay_keeps_scalar_json_as_text() {
        let data = text_success("call_scalar", "custom_tool", "123".to_string());

        let message = tool_completed_to_message(data);
        let result = replayed_result(&message);

        assert_eq!(result, &serde_json::Value::String("123".to_string()));
    }

    #[test]
    fn tool_completed_replay_preserves_fingerprints_as_metadata() {
        let data = text_success("call_read", "read_file", "{}".to_string())
            .with_fingerprints("sha256:call".to_string(), "sha256:result".to_string());

        let message = tool_completed_to_message(data);
        let metadata = message.metadata.expect("metadata should be present");

        assert_eq!(metadata["tool_name"], "read_file");
        assert_eq!(metadata["tool_call_fingerprint"], "sha256:call");
        assert_eq!(metadata["tool_result_fingerprint"], "sha256:result");
    }
}
1386
/// Tests for mapping public org ids onto in-process internal org ids.
#[cfg(test)]
mod org_id_mapping_tests {
    use super::*;
    use everruns_core::{DEFAULT_ORG_ID, DEFAULT_ORG_PUBLIC_ID, org_public_id_from_internal};

    #[test]
    fn default_public_id_maps_to_default_org() {
        let mapped = in_process_internal_org_id(DEFAULT_ORG_PUBLIC_ID);
        assert_eq!(mapped, DEFAULT_ORG_ID);
    }

    #[test]
    fn invalid_public_id_does_not_fall_back_to_default() {
        const INVALID_IDS: [&str; 5] = [
            "",
            "not-an-org",
            "org_short",
            "org_ZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZ",
            "ORG_00000000000000000000000000000001",
        ];
        for invalid in INVALID_IDS {
            let mapped = in_process_internal_org_id(invalid);
            assert_ne!(mapped, DEFAULT_ORG_ID);
            assert!(
                mapped >= 2,
                "invalid input {invalid:?} should not map to default"
            );
        }
    }

    #[test]
    fn zero_public_id_does_not_fall_back_to_default() {
        // org_public_id_from_internal never emits this value; a hand-written
        // all-zeros id is handled as invalid (raw == 0).
        let all_zero = "org_00000000000000000000000000000000";
        let mapped = in_process_internal_org_id(all_zero);
        assert_ne!(mapped, DEFAULT_ORG_ID);
        assert!(mapped >= 2, "all-zero id should not map to default");
    }

    #[test]
    fn synthetic_public_id_round_trips_with_internal_helper() {
        let samples = [1_i64, 2, 42, 1_000_000, i64::MAX - 1, i64::MAX];
        for internal in samples {
            let public = org_public_id_from_internal(internal);
            assert_eq!(
                in_process_internal_org_id(&public),
                internal,
                "round-trip failed for internal={internal}"
            );
        }
    }

    #[test]
    fn distinct_synthetic_ids_map_to_distinct_internal_ids() {
        let first = org_public_id_from_internal(7);
        let second = org_public_id_from_internal(8);
        assert_ne!(first, second);
        assert_ne!(
            in_process_internal_org_id(&first),
            in_process_internal_org_id(&second)
        );
    }

    #[test]
    fn high_entropy_uuid_style_id_hashes_into_reserved_range() {
        // Smallest UUID-style id whose raw u128 is above i64::MAX (top bit of
        // the u128 set). The mapped value must be a positive i64 other than
        // 0 and DEFAULT_ORG_ID.
        let high = "org_80000000000000000000000000000000";
        let mapped = in_process_internal_org_id(high);
        assert!(mapped >= 2, "mapped id {mapped} must be >= 2");
        assert_ne!(mapped, DEFAULT_ORG_ID);

        // Repeating the call gives the same answer.
        assert_eq!(mapped, in_process_internal_org_id(high));
    }

    #[test]
    fn high_entropy_ids_are_isolated_from_each_other() {
        let first = in_process_internal_org_id("org_80000000000000000000000000000001");
        let second = in_process_internal_org_id("org_80000000000000000000000000000002");
        assert_ne!(first, second);
        assert_ne!(first, DEFAULT_ORG_ID);
        assert_ne!(second, DEFAULT_ORG_ID);
    }

    #[test]
    fn hash_uses_stable_sha256_truncation() {
        // SHA-256 truncated to its first 8 bytes (big-endian) yields a value
        // that can be pinned. Should this assertion ever fail, callers that
        // depend on a build-stable mapping must be re-audited.
        let input = "org_80000000000000000000000000000000";
        let mapped = in_process_internal_org_id(input);
        let expected = {
            let digest = sha2::Sha256::digest(b"org_80000000000000000000000000000000");
            let mut prefix = [0u8; 8];
            prefix.copy_from_slice(&digest[..8]);
            let raw = u64::from_be_bytes(prefix);
            ((raw % ((i64::MAX - 1) as u64)) as i64) + 2
        };
        assert_eq!(mapped, expected);
    }

    #[test]
    fn oversize_input_is_bounded_and_does_not_collide_silently() {
        // Input beyond HASH_INPUT_CAP_BYTES is truncated before hashing, so
        // two oversize strings sharing their first cap bytes map to the same
        // internal id. The only claims checked here are that the result lies
        // in the safe [2, i64::MAX] range and differs from DEFAULT_ORG_ID —
        // the cap bounds work; it does not widen the input space.
        let oversize = "x".repeat(super::HASH_INPUT_CAP_BYTES * 4);
        let mapped = in_process_internal_org_id(&oversize);
        assert!(mapped >= 2);
        assert_ne!(mapped, DEFAULT_ORG_ID);
    }
}
1501}