// everruns_runtime/runtime.rs

// In-process runtime builder and runner.
// Decision: the public runtime is in-memory today, but uses the same core atoms
// and capability resolution path as the durable worker so behavior stays close.
4
5use crate::backends::{
6    EventBus, RuntimeAgentStore, RuntimeBackends, RuntimeHarnessStore, RuntimeMessageStore,
7    RuntimeProviderStore, RuntimeSessionStore,
8};
9use crate::builders::SingleSessionBuilder;
10use crate::host::{
11    RuntimeHostAdapter, RuntimeHostTurnContext, RuntimeSessionLifecycle, execute_act_activity,
12    execute_input_activity, execute_reason_activity,
13};
14use crate::in_memory::{InMemorySessionFileStore, InMemorySessionFileSystemFactory};
15use async_trait::async_trait;
16use everruns_core::agent::Agent;
17use everruns_core::atoms::{ActInput, AtomContext, InputAtomInput, ReasonInput};
18use everruns_core::capabilities::{Capability, CapabilityRegistry};
19use everruns_core::config_layer::AgentConfigOverlay;
20use everruns_core::error::{AgentLoopError, Result};
21use everruns_core::events::{
22    Event, EventContext, EventData, EventRequest, InputMessageData, OutputMessageCompletedData,
23    ToolCompletedData,
24};
25use everruns_core::harness::Harness;
26use everruns_core::llm_driver_registry::{DriverRegistry, ProviderType};
27use everruns_core::llm_models::LlmProviderType;
28use everruns_core::llmsim_driver::{LlmSimConfig, LlmSimDriver};
29use everruns_core::message::{ContentPart, Message};
30use everruns_core::platform_definition::PlatformDefinition;
31use everruns_core::runtime_context::{AssembledTurnContext, inspect_turn_context};
32use everruns_core::session::{Session, SessionStatus};
33use everruns_core::session_file::{InitialFile, SessionFile};
34use everruns_core::tools::ToolResultImage;
35use everruns_core::traits::{
36    AgentStore, EventEmitter, HarnessStore, LlmProviderStore, ModelWithProvider, SessionMutator,
37    SessionStorageStore, SessionStore,
38};
39use everruns_core::turn::{TurnAction, TurnContext, TurnOutcome, TurnStateMachine};
40use everruns_core::typed_id::{AgentId, HarnessId, SessionId};
41use everruns_core::{
42    InputMessage, MemoryStoreBackend, MessageRetriever, SessionFileSystem,
43    SessionFileSystemFactoryContext,
44};
45use std::sync::Arc;
46
/// Internal org id used by the in-process runtime when calling host
/// activity functions.
///
/// The in-process runtime is not multi-tenant; this value matches
/// `everruns_core::DEFAULT_ORG_ID` so the public-id mapping in
/// `org_public_id_from_internal` resolves to `DEFAULT_ORG_PUBLIC_ID` and
/// the org id `ActAtom` sees matches the prior (pre-host-activity) wiring.
const IN_PROCESS_ORG_ID: i64 = everruns_core::DEFAULT_ORG_ID;
53
/// Summary of one turn executed by [`InProcessRuntime::run_turn`].
#[derive(Debug, Clone)]
pub struct TurnResult {
    /// Final text response produced by the turn (empty when the turn failed).
    pub response: String,
    /// Number of reason iterations executed.
    pub iterations: usize,
    /// Total number of tool calls executed during the turn.
    pub tool_calls_count: usize,
    /// Whether the turn completed without an unrecoverable failure.
    ///
    /// A turn that stops because the iteration limit was reached is still
    /// reported as successful.
    pub success: bool,
    /// Failure message when `success` is false.
    pub error: Option<String>,
    /// Turn identifier used to correlate emitted events.
    pub turn_id: everruns_core::typed_id::TurnId,
}
69
70impl TurnResult {
71    fn from_outcome(outcome: TurnOutcome, turn_id: everruns_core::typed_id::TurnId) -> Self {
72        match outcome {
73            TurnOutcome::Success {
74                response,
75                iterations,
76                tool_calls_count,
77            } => Self {
78                response,
79                iterations,
80                tool_calls_count,
81                success: true,
82                error: None,
83                turn_id,
84            },
85            TurnOutcome::Failed { error, iterations } => Self {
86                response: String::new(),
87                iterations,
88                tool_calls_count: 0,
89                success: false,
90                error: Some(error),
91                turn_id,
92            },
93            TurnOutcome::MaxIterationsReached {
94                response,
95                iterations,
96                tool_calls_count,
97            } => Self {
98                response,
99                iterations,
100                tool_calls_count,
101                success: true,
102                error: None,
103                turn_id,
104            },
105        }
106    }
107}
108
/// Builder for the public in-process runtime.
///
/// The builder owns a standalone runtime bundle:
/// - `PlatformDefinition` for capabilities and drivers
/// - in-memory stores for sessions, files, storage, memory, and messages
/// - seeded harness/agent/session entities
///
/// `build()` returns an [`InProcessRuntime`] that can execute turns in-process
/// without the durable engine or the control-plane server.
pub struct InProcessRuntimeBuilder {
    /// Platform definition supplying the capability registry, driver registry,
    /// and session file system factory.
    platform_definition: PlatformDefinition,
    /// Pending `llmsim` configuration; the driver is registered during `build()`.
    llm_sim_config: Option<LlmSimConfig>,
    /// Explicit default model; when unset and `llmsim` is configured, `build()`
    /// falls back to a built-in `llmsim` model.
    default_model: Option<ModelWithProvider>,
    /// Custom backend bundle; `build()` uses in-memory backends when `None`.
    backends: Option<RuntimeBackends>,
    /// Context handed to the platform's session file system factory.
    session_file_system_factory_context: SessionFileSystemFactoryContext,
    /// Harnesses to seed into the harness store at build time.
    harnesses: Vec<Harness>,
    /// Agents to seed into the agent store at build time.
    agents: Vec<Agent>,
    /// Sessions to seed into the session store at build time.
    sessions: Vec<Session>,
    /// Session id produced by the most recent `single_session` call, if any.
    default_session_id: Option<SessionId>,
    /// Extra files seeded after the merged `initial_files` of each session.
    seeded_files: Vec<(SessionId, InitialFile)>,
}

impl Default for InProcessRuntimeBuilder {
    /// Equivalent to [`InProcessRuntimeBuilder::new`].
    fn default() -> Self {
        Self::new()
    }
}
136
137impl InProcessRuntimeBuilder {
138    /// Create a builder with built-in capabilities and no implicit LLM driver.
139    ///
140    /// Embedders must either:
141    /// - call [`Self::llm_sim`] for deterministic local examples/tests, or
142    /// - register their own driver(s) on the platform definition and set a
143    ///   default model via [`Self::default_model`].
144    pub fn new() -> Self {
145        Self {
146            platform_definition: PlatformDefinition::builder()
147                .capability_registry(CapabilityRegistry::with_builtins())
148                .driver_registry(DriverRegistry::new())
149                .session_file_system_factory(Arc::new(InMemorySessionFileSystemFactory))
150                .build(),
151            llm_sim_config: None,
152            default_model: None,
153            backends: None,
154            session_file_system_factory_context: SessionFileSystemFactoryContext::new(),
155            harnesses: Vec::new(),
156            agents: Vec::new(),
157            sessions: Vec::new(),
158            default_session_id: None,
159            seeded_files: Vec::new(),
160        }
161    }
162
163    /// Replace the platform definition used by the runtime.
164    pub fn platform_definition(mut self, platform_definition: PlatformDefinition) -> Self {
165        self.platform_definition = platform_definition;
166        self
167    }
168
169    /// Register an additional capability on the runtime platform.
170    pub fn capability<C: Capability + 'static>(mut self, capability: C) -> Self {
171        self.platform_definition
172            .capability_registry_mut()
173            .register(capability);
174        self
175    }
176
177    /// Replace the platform driver registry.
178    pub fn driver_registry(mut self, driver_registry: DriverRegistry) -> Self {
179        *self.platform_definition.driver_registry_mut() = driver_registry;
180        self
181    }
182
183    /// Register the built-in `llmsim` driver for deterministic local execution.
184    pub fn llm_sim(mut self, config: LlmSimConfig) -> Self {
185        self.llm_sim_config = Some(config);
186        self
187    }
188
189    /// Set the runtime default model used when sessions/agents do not override it.
190    pub fn default_model(mut self, model: ModelWithProvider) -> Self {
191        self.default_model = Some(model);
192        self
193    }
194
195    /// Supply a custom backend bundle instead of the built-in in-memory stores.
196    pub fn backends(mut self, backends: RuntimeBackends) -> Self {
197        self.backends = Some(backends);
198        self
199    }
200
201    /// Supply host dependencies needed by the platform session filesystem factory.
202    pub fn session_file_system_factory_context(
203        mut self,
204        context: SessionFileSystemFactoryContext,
205    ) -> Self {
206        self.session_file_system_factory_context = context;
207        self
208    }
209
210    /// Seed a harness into the runtime store.
211    pub fn harness(mut self, harness: Harness) -> Self {
212        self.harnesses.push(harness);
213        self
214    }
215
216    /// Seed an agent into the runtime store.
217    pub fn agent(mut self, agent: Agent) -> Self {
218        self.agents.push(agent);
219        self
220    }
221
222    /// Seed a session into the runtime store.
223    pub fn session(mut self, session: Session) -> Self {
224        self.sessions.push(session);
225        self
226    }
227
228    /// Seed one harness, one agent, and one session with a compact sub-builder.
229    ///
230    /// The generated session id is exposed from the built runtime via
231    /// [`InProcessRuntime::default_session_id`].
232    pub fn single_session<F>(mut self, configure: F) -> Self
233    where
234        F: FnOnce(SingleSessionBuilder) -> SingleSessionBuilder,
235    {
236        let (harness, agent, session, session_id) =
237            configure(SingleSessionBuilder::default()).build();
238        self.harnesses.push(harness);
239        self.agents.push(agent);
240        self.sessions.push(session);
241        self.default_session_id = Some(session_id);
242        self
243    }
244
245    /// Seed an additional text file directly into a session workspace.
246    ///
247    /// This is applied after harness/agent/session `initial_files` are merged.
248    pub fn seed_text_file(
249        mut self,
250        session_id: SessionId,
251        path: impl Into<String>,
252        content: impl Into<String>,
253    ) -> Self {
254        self.seeded_files.push((
255            session_id,
256            InitialFile {
257                path: path.into(),
258                content: content.into(),
259                encoding: "text".to_string(),
260                is_readonly: false,
261            },
262        ));
263        self
264    }
265
266    /// Build the in-process runtime.
267    ///
268    /// Returns a configuration error when no default model is available after
269    /// applying explicit configuration and any requested `llmsim` setup.
270    pub async fn build(mut self) -> Result<InProcessRuntime> {
271        let backends = match self.backends.take() {
272            Some(backends) => backends,
273            None => RuntimeBackends::in_memory(),
274        };
275        let file_store = resolve_session_file_system(
276            &self.platform_definition,
277            self.session_file_system_factory_context.clone(),
278        )
279        .await?;
280
281        if let Some(config) = self.llm_sim_config.take() {
282            let driver = LlmSimDriver::new(config);
283            self.platform_definition
284                .driver_registry_mut()
285                .register(ProviderType::LlmSim, move |_api_key, _base_url| {
286                    Box::new(driver.clone())
287                });
288
289            if self.default_model.is_none() {
290                self.default_model = Some(ModelWithProvider {
291                    model: "llmsim-model".to_string(),
292                    provider_type: LlmProviderType::LlmSim,
293                    api_key: Some("fake-key".to_string()),
294                    base_url: None,
295                });
296            }
297        }
298
299        let default_model = self.default_model.ok_or_else(|| {
300            AgentLoopError::config(
301                "in-process runtime requires a default model; call \
302                 InProcessRuntimeBuilder::default_model(...) or \
303                 InProcessRuntimeBuilder::llm_sim(...)",
304            )
305        })?;
306
307        backends
308            .provider_store
309            .set_default_model(default_model)
310            .await?;
311
312        for harness in &self.harnesses {
313            backends.harness_store.add_harness(harness.clone()).await?;
314        }
315        for agent in &self.agents {
316            backends.agent_store.add_agent(agent.clone()).await?;
317        }
318        for session in &self.sessions {
319            backends.session_store.add_session(session.clone()).await?;
320        }
321
322        for session in &self.sessions {
323            seed_runtime_initial_files(
324                backends.harness_store.as_ref(),
325                backends.agent_store.as_ref(),
326                file_store.as_ref(),
327                session,
328            )
329            .await?;
330        }
331
332        for (session_id, file) in &self.seeded_files {
333            file_store.seed_initial_file(*session_id, file).await?;
334        }
335
336        let persisting_emitter =
337            PersistingEventEmitter::new(backends.event_bus.clone(), backends.message_store.clone());
338
339        Ok(InProcessRuntime {
340            platform_definition: Arc::new(self.platform_definition),
341            harness_store: backends.harness_store,
342            agent_store: backends.agent_store,
343            session_store: backends.session_store,
344            default_session_id: self.default_session_id,
345            message_store: backends.message_store,
346            provider_store: backends.provider_store,
347            event_bus: backends.event_bus,
348            persisting_emitter,
349            file_store,
350            storage_store: backends.storage_store,
351            memory_store: backends.memory_store,
352        })
353    }
354}
355
356async fn resolve_session_file_system(
357    platform_definition: &PlatformDefinition,
358    file_system_factory_context: SessionFileSystemFactoryContext,
359) -> Result<Arc<dyn SessionFileSystem>> {
360    let file_system_factory = platform_definition.session_file_system_factory();
361    if file_system_factory.is_disabled() {
362        Ok(Arc::new(InMemorySessionFileStore::new()))
363    } else {
364        Ok(file_system_factory
365            .create_session_file_system(file_system_factory_context)
366            .await?)
367    }
368}
369
/// Public in-process runtime backed by either in-memory or custom stores.
///
/// This runtime is intended for embedders who want to execute Everruns
/// harnesses inside their own process while controlling capabilities,
/// harness definitions, and driver registrations directly in Rust.
#[derive(Clone)]
pub struct InProcessRuntime {
    /// Shared platform definition; source of the capability and driver registries.
    platform_definition: Arc<PlatformDefinition>,
    /// Store holding seeded harnesses.
    harness_store: Arc<dyn RuntimeHarnessStore>,
    /// Store holding seeded agents.
    agent_store: Arc<dyn RuntimeAgentStore>,
    /// Store holding seeded sessions.
    session_store: Arc<dyn RuntimeSessionStore>,
    /// Session id seeded via `InProcessRuntimeBuilder::single_session`, if any.
    default_session_id: Option<SessionId>,
    /// Message history store for sessions.
    message_store: Arc<dyn RuntimeMessageStore>,
    /// Provider store holding the runtime default model.
    provider_store: Arc<dyn RuntimeProviderStore>,
    /// Raw event bus; `run_turn` emits the input-message event on it directly.
    event_bus: Arc<dyn EventBus>,
    /// Emitter handed to host activities; forwards to the bus and also stores
    /// message-bearing events in `message_store`.
    persisting_emitter: PersistingEventEmitter,
    /// Session file system resolved at build time.
    file_store: Arc<dyn SessionFileSystem>,
    /// Session storage backend exposed to host activities.
    storage_store: Arc<dyn SessionStorageStore>,
    /// Memory backend exposed to host activities.
    memory_store: Arc<dyn MemoryStoreBackend>,
}
390
391impl InProcessRuntime {
392    /// Create a builder for the in-process runtime.
393    pub fn builder() -> InProcessRuntimeBuilder {
394        InProcessRuntimeBuilder::new()
395    }
396
397    /// Return the default session id seeded by
398    /// [`InProcessRuntimeBuilder::single_session`], if one was configured.
399    pub fn default_session_id(&self) -> Option<SessionId> {
400        self.default_session_id
401    }
402
403    /// Execute one turn for an existing session.
404    ///
405    /// The input message is stored in the runtime history, an `input.message`
406    /// event is emitted, and the turn then executes the shared core
407    /// `input -> reason -> act` state machine.
408    pub async fn run_turn(
409        &self,
410        session_id: SessionId,
411        input: impl Into<InputMessage>,
412    ) -> Result<TurnResult> {
413        let session = self
414            .session_store
415            .get_session(session_id)
416            .await?
417            .ok_or_else(|| AgentLoopError::store(format!("session not found: {session_id}")))?;
418
419        // Input message is recorded directly (and emitted via the raw bus so
420        // that PersistingEventEmitter does not double-store it). All
421        // subsequent activity-emitted events flow through the persisting
422        // emitter the adapter hands out.
423        let input_message = self
424            .message_store
425            .add_input_message(session_id, input.into())
426            .await?;
427        self.event_bus
428            .emit(EventRequest::new(
429                session_id,
430                EventContext::empty(),
431                InputMessageData::new(input_message.clone()),
432            ))
433            .await?;
434
435        let assembled = self
436            .inspect_context_with_ids(session_id, session.harness_id, session.agent_id)
437            .await?;
438        let synthetic_agent_id = session
439            .agent_id
440            .unwrap_or_else(|| AgentId::from_uuid(session.id.uuid()));
441        let org_id: i64 = IN_PROCESS_ORG_ID;
442        let mut state_machine = TurnStateMachine::new(
443            TurnContext::new(session_id, input_message.id, synthetic_agent_id, org_id),
444            assembled.runtime_agent.max_iterations,
445        );
446
447        let mut previous_response_id: Option<String> = None;
448        let mut last_reason_result: Option<everruns_core::ReasonResult> = None;
449
450        loop {
451            match state_machine.next_action() {
452                TurnAction::ExecuteInput => {
453                    let ctx = state_machine.context();
454                    let base_context =
455                        AtomContext::new(ctx.session_id, ctx.turn_id, ctx.input_message_id);
456                    execute_input_activity(
457                        self,
458                        org_id,
459                        InputAtomInput {
460                            context: base_context,
461                        },
462                    )
463                    .await?;
464                    state_machine.on_input_completed();
465                }
466                TurnAction::ExecuteReason => {
467                    let ctx = state_machine.context();
468                    let base_context =
469                        AtomContext::new(ctx.session_id, ctx.turn_id, ctx.input_message_id);
470                    let reason_result = execute_reason_activity(
471                        self,
472                        org_id,
473                        ReasonInput {
474                            context: base_context.next_exec(),
475                            harness_id: session.harness_id,
476                            agent_id: session.agent_id,
477                            org_id,
478                            mcp_tool_definitions: vec![],
479                            previous_response_id: previous_response_id.take(),
480                            iteration: state_machine.current_iteration() as u32 + 1,
481                        },
482                    )
483                    .await?;
484                    previous_response_id = reason_result.response_id.clone();
485                    state_machine.on_reason_completed(
486                        reason_result.text.clone(),
487                        reason_result.has_tool_calls,
488                        reason_result.tool_calls.len(),
489                        reason_result.success,
490                        reason_result.error.clone(),
491                        false,
492                    );
493                    if reason_result.has_tool_calls {
494                        last_reason_result = Some(reason_result);
495                    }
496                }
497                TurnAction::ExecuteAct => {
498                    let reason_result = last_reason_result
499                        .take()
500                        .expect("ExecuteAct requires a prior ReasonResult");
501                    let ctx = state_machine.context();
502                    let base_context =
503                        AtomContext::new(ctx.session_id, ctx.turn_id, ctx.input_message_id);
504                    execute_act_activity(
505                        self,
506                        ActInput {
507                            org_id: Some(org_id),
508                            context: base_context.next_exec(),
509                            harness_id: session.harness_id,
510                            agent_id: session.agent_id,
511                            tool_calls: reason_result.tool_calls,
512                            tool_definitions: reason_result.tool_definitions,
513                            locale: reason_result.locale,
514                            blueprint_id: None,
515                            network_access: reason_result.network_access,
516                        },
517                    )
518                    .await?;
519                    state_machine.on_act_completed();
520                }
521                TurnAction::Complete(outcome) => {
522                    let ctx = state_machine.context();
523                    let lifecycle =
524                        RuntimeSessionLifecycle::new(self.clone(), org_id, ctx.session_id);
525                    match &outcome {
526                        TurnOutcome::Success { iterations, .. }
527                        | TurnOutcome::MaxIterationsReached { iterations, .. } => {
528                            lifecycle
529                                .turn_completed(
530                                    ctx.turn_id,
531                                    ctx.input_message_id,
532                                    *iterations as u32,
533                                    None,
534                                    None,
535                                )
536                                .await;
537                        }
538                        TurnOutcome::Failed { error, .. } => {
539                            lifecycle
540                                .turn_failed(ctx.turn_id, ctx.input_message_id, error, None)
541                                .await;
542                        }
543                    }
544                    return Ok(TurnResult::from_outcome(outcome, ctx.turn_id));
545                }
546            }
547        }
548    }
549
550    pub async fn run_text_turn(
551        &self,
552        session_id: SessionId,
553        text: impl Into<String>,
554    ) -> Result<TurnResult> {
555        self.run_turn(session_id, InputMessage::user(text)).await
556    }
557
558    /// Load the current message history for a session.
559    pub async fn messages(&self, session_id: SessionId) -> Result<Vec<Message>> {
560        self.message_store.load(session_id).await
561    }
562
563    /// Read a file from the in-memory session filesystem.
564    pub async fn read_file(
565        &self,
566        session_id: SessionId,
567        path: &str,
568    ) -> Result<Option<SessionFile>> {
569        self.file_store.read_file(session_id, path).await
570    }
571
572    /// Assemble the current runtime context for a session without executing a turn.
573    pub async fn load_context(&self, session_id: SessionId) -> Result<AssembledTurnContext> {
574        let session = self
575            .session_store
576            .get_session(session_id)
577            .await?
578            .ok_or_else(|| AgentLoopError::store(format!("session not found: {session_id}")))?;
579        self.inspect_context_with_ids(session_id, session.harness_id, session.agent_id)
580            .await
581    }
582
583    /// Return all collected events from the runtime event bus.
584    ///
585    /// Event buses that do not retain events return an empty `Vec` (see
586    /// [`EventBus::collected_events`]).
587    pub async fn events(&self) -> Result<Vec<Event>> {
588        Ok(self.event_bus.collected_events().await)
589    }
590
591    async fn inspect_context_with_ids(
592        &self,
593        session_id: SessionId,
594        harness_id: everruns_core::HarnessId,
595        agent_id: Option<AgentId>,
596    ) -> Result<AssembledTurnContext> {
597        inspect_turn_context(
598            self.harness_store.as_ref(),
599            self.agent_store.as_ref(),
600            self.session_store.as_ref(),
601            self.message_store.as_ref(),
602            self.provider_store.as_ref(),
603            self.platform_definition.capability_registry(),
604            session_id,
605            harness_id,
606            agent_id,
607            &[],
608            Some(self.file_store.clone()),
609        )
610        .await
611    }
612}
613
614#[async_trait]
615impl RuntimeHostAdapter for InProcessRuntime {
616    async fn get_agent(&self, _org_id: i64, agent_id: AgentId) -> Result<Option<Agent>> {
617        self.agent_store.get_agent(agent_id).await
618    }
619
620    async fn get_harness(&self, _org_id: i64, harness_id: HarnessId) -> Result<Option<Harness>> {
621        let chain = self.harness_store.get_harness_chain(harness_id).await?;
622        Ok(chain.into_iter().last())
623    }
624
625    async fn set_session_status(
626        &self,
627        _org_id: i64,
628        session_id: SessionId,
629        _status: SessionStatus,
630    ) -> Result<Session> {
631        // The in-process runtime does not persist status. Lifecycle callers
632        // still emit their events; downstream consumers in-process don't
633        // observe session.status.
634        self.session_store
635            .get_session(session_id)
636            .await?
637            .ok_or_else(|| AgentLoopError::store(format!("session not found: {session_id}")))
638    }
639
640    async fn load_turn_context(
641        &self,
642        _org_id: i64,
643        session_id: SessionId,
644    ) -> Result<RuntimeHostTurnContext> {
645        let session = self
646            .session_store
647            .get_session(session_id)
648            .await?
649            .ok_or_else(|| AgentLoopError::store(format!("session not found: {session_id}")))?;
650        let agent = match session.agent_id {
651            Some(agent_id) => self.agent_store.get_agent(agent_id).await?,
652            None => None,
653        };
654        let messages = self.message_store.load(session_id).await?;
655        let model = self.provider_store.get_default_model().await?;
656        Ok(RuntimeHostTurnContext {
657            agent,
658            session,
659            messages,
660            model,
661            mcp_tool_definitions: vec![],
662        })
663    }
664
665    fn capability_registry(&self) -> CapabilityRegistry {
666        self.platform_definition.capability_registry().clone()
667    }
668
669    fn driver_registry(&self) -> DriverRegistry {
670        self.platform_definition.driver_registry().clone()
671    }
672
673    fn harness_store(&self, _org_id: i64) -> Arc<dyn HarnessStore> {
674        self.harness_store.clone()
675    }
676
677    fn agent_store(&self, _org_id: i64) -> Arc<dyn AgentStore> {
678        self.agent_store.clone()
679    }
680
681    fn session_store(&self, _org_id: i64) -> Arc<dyn SessionStore> {
682        self.session_store.clone()
683    }
684
685    fn session_mutator(&self, _org_id: i64) -> Arc<dyn SessionMutator> {
686        self.session_store.clone()
687    }
688
689    fn provider_store(&self, _org_id: i64) -> Arc<dyn LlmProviderStore> {
690        self.provider_store.clone()
691    }
692
693    fn message_store(&self) -> Arc<dyn MessageRetriever> {
694        self.message_store.clone()
695    }
696
697    fn event_emitter(&self) -> Arc<dyn EventEmitter> {
698        Arc::new(self.persisting_emitter.clone())
699    }
700
701    fn file_store(&self) -> Arc<dyn SessionFileSystem> {
702        self.file_store.clone()
703    }
704
705    fn storage_store(&self) -> Option<Arc<dyn SessionStorageStore>> {
706        Some(self.storage_store.clone())
707    }
708
709    fn memory_store(&self, _org_id: i64) -> Option<Arc<dyn MemoryStoreBackend>> {
710        Some(self.memory_store.clone())
711    }
712}
713
714#[derive(Clone)]
715struct PersistingEventEmitter {
716    inner: Arc<dyn EventBus>,
717    message_store: Arc<dyn RuntimeMessageStore>,
718}
719
720impl PersistingEventEmitter {
721    fn new(inner: Arc<dyn EventBus>, message_store: Arc<dyn RuntimeMessageStore>) -> Self {
722        Self {
723            inner,
724            message_store,
725        }
726    }
727}
728
729#[async_trait]
730impl EventEmitter for PersistingEventEmitter {
731    async fn emit(&self, request: EventRequest) -> Result<Event> {
732        let event = self.inner.emit(request.clone()).await?;
733        if let Some(message) = message_from_event(&event.data) {
734            self.message_store
735                .store_message(request.session_id, message)
736                .await?;
737        }
738        Ok(event)
739    }
740}
741
742fn effective_overlay(
743    harness_chain: &[Harness],
744    agent: Option<&Agent>,
745    session: &Session,
746) -> AgentConfigOverlay {
747    let harness_layers = harness_chain.iter().map(AgentConfigOverlay::from);
748    let agent_layers = agent.into_iter().map(AgentConfigOverlay::from);
749    AgentConfigOverlay::fold(
750        harness_layers
751            .chain(agent_layers)
752            .chain([AgentConfigOverlay::from(session)]),
753    )
754}
755
756async fn seed_runtime_initial_files(
757    harness_store: &dyn RuntimeHarnessStore,
758    agent_store: &dyn RuntimeAgentStore,
759    file_store: &dyn SessionFileSystem,
760    session: &Session,
761) -> Result<()> {
762    let harness_chain = harness_store.get_harness_chain(session.harness_id).await?;
763    if harness_chain.is_empty() {
764        return Err(AgentLoopError::store(format!(
765            "harness not found while seeding files: {}",
766            session.harness_id
767        )));
768    }
769    let agent = match session.agent_id {
770        Some(agent_id) => Some(
771            agent_store
772                .get_agent(agent_id)
773                .await?
774                .ok_or_else(|| AgentLoopError::store(format!("agent not found: {agent_id}")))?,
775        ),
776        None => None,
777    };
778    let overlay = effective_overlay(&harness_chain, agent.as_ref(), session);
779    for file in &overlay.initial_files {
780        file_store.seed_initial_file(session.id, file).await?;
781    }
782    Ok(())
783}
784
785fn message_from_event(data: &EventData) -> Option<Message> {
786    match data {
787        EventData::InputMessage(data) => Some(data.message.clone()),
788        EventData::OutputMessageCompleted(OutputMessageCompletedData { message, .. }) => {
789            Some(message.clone())
790        }
791        EventData::ToolCompleted(data) => Some(tool_completed_to_message(data.clone())),
792        _ => None,
793    }
794}
795
796fn tool_completed_to_message(data: ToolCompletedData) -> Message {
797    let mut images: Vec<ToolResultImage> = Vec::new();
798    let result = data.result.map(|parts| {
799        for part in &parts {
800            if let ContentPart::Image(img) = part
801                && let (Some(base64), Some(media_type)) = (&img.base64, &img.media_type)
802            {
803                images.push(ToolResultImage {
804                    base64: base64.clone(),
805                    media_type: media_type.clone(),
806                });
807            }
808        }
809
810        let text_parts: Vec<&ContentPart> = parts
811            .iter()
812            .filter(|part| matches!(part, ContentPart::Text(_)))
813            .collect();
814        if text_parts.len() == 1
815            && let ContentPart::Text(text) = text_parts[0]
816        {
817            return serde_json::Value::String(text.text.clone());
818        }
819        if !text_parts.is_empty() {
820            serde_json::to_value(&text_parts).unwrap_or_default()
821        } else {
822            serde_json::Value::Null
823        }
824    });
825
826    if images.is_empty() {
827        Message::tool_result(&data.tool_call_id, result, data.error)
828    } else {
829        Message::tool_result_with_images(&data.tool_call_id, result, images)
830    }
831}