Skip to main content

lash/
core.rs

1use crate::support::*;
2use lash_core::runtime::{
3    ProcessCommand, ProcessEffectOutcome, RuntimeEffectCommand, RuntimeEffectEnvelope,
4    RuntimeEffectKind, RuntimeEffectLocalExecutor, RuntimeEffectOutcome, RuntimeInvocation,
5    RuntimeScope,
6};
7
8type RuntimeHostInstaller =
9    Arc<dyn Fn(RuntimeHostConfig, &PluginHost) -> Result<RuntimeHostConfig> + Send + Sync>;
10
11#[derive(Clone)]
12pub struct LashCore {
13    pub(crate) env: RuntimeEnvironment,
14    pub(crate) policy: SessionPolicy,
15    pub(crate) protocol_factory: Option<Arc<dyn PluginFactory>>,
16    pub(crate) store_factory: Option<Arc<dyn SessionStoreFactory>>,
17    pub(crate) plugin_factories: Arc<Vec<Arc<dyn PluginFactory>>>,
18    pub(crate) provider: Option<ProviderHandle>,
19    pub(crate) live_replay_store: Arc<dyn LiveReplayStore>,
20    pub(crate) runtime_host_installer: Option<RuntimeHostInstaller>,
21    /// Shared resolution of host-owned work drivers. Shared across `LashCore`
22    /// clones so inline process and queued drivers are constructed at most once.
23    pub(crate) work_driver: Arc<InlineWorkDriverSlot>,
24}
25
26/// How a [`LashCore`] resolves its process work driver, decided at `build()`
27/// and shared across clones.
28pub(crate) enum ProcessWorkDriverSetup {
29    /// No process registry is wired; there is nothing to run.
30    None,
31    /// Lazily construct the default inline process driver on first
32    /// `session().open()`. A store factory is required to build the config (the
33    /// worker rebuilds a session runtime per process); a registry with no store
34    /// factory is rejected at build with
35    /// [`EmbedError::ProcessRegistryRequiresStoreFactory`].
36    LazyDefault {
37        config: Box<DurableProcessWorkerConfig>,
38    },
39    /// The host wired an external driver.
40    External { driver: ProcessWorkDriver },
41}
42
43#[derive(Clone, Default)]
44pub(crate) enum ProcessWorkSource {
45    #[default]
46    None,
47    Inline {
48        registry: Arc<dyn ProcessRegistry>,
49    },
50    External(ProcessWorkDriver),
51}
52
53impl ProcessWorkSource {
54    fn process_registry(&self) -> Option<Arc<dyn ProcessRegistry>> {
55        match self {
56            Self::None => None,
57            Self::Inline { registry } => Some(Arc::clone(registry)),
58            Self::External(driver) => Some(driver.process_registry()),
59        }
60    }
61
62    #[cfg(feature = "rlm")]
63    fn has_registry(&self) -> bool {
64        !matches!(self, Self::None)
65    }
66}
67
68#[derive(Clone, Default)]
69pub(crate) enum QueuedWorkSource {
70    None,
71    #[default]
72    LazyDefault,
73    External(QueuedWorkDriver),
74}
75
76pub(crate) enum QueuedWorkDriverSetup {
77    None,
78    LazyDefault {
79        config: Arc<InlineQueuedWorkRunConfig>,
80    },
81    External {
82        driver: QueuedWorkDriver,
83    },
84}
85
86pub(crate) struct InlineWorkDriverSetup {
87    process: ProcessWorkDriverSetup,
88    queued: QueuedWorkDriverSetup,
89}
90
91#[derive(Clone, Default)]
92pub(crate) struct ResolvedWorkDrivers {
93    pub(crate) process: Option<ProcessWorkDriver>,
94    pub(crate) queued: Option<QueuedWorkDriver>,
95    pub(crate) drive_process_on_open: bool,
96}
97
98/// Shared, lazily-initialized host-work state for a [`LashCore`].
99///
100/// The once-guard ([`tokio::sync::OnceCell`]) constructs inline drivers exactly
101/// once across `LashCore` clones, on the first `session().open()` or admin path
102/// that needs them.
103pub(crate) struct InlineWorkDriverSlot {
104    setup: InlineWorkDriverSetup,
105    drivers: tokio::sync::OnceCell<ResolvedWorkDrivers>,
106    phase_probe_slot: Option<lash_core::runtime::RuntimeTurnPhaseProbeSlot>,
107}
108
109impl InlineWorkDriverSlot {
110    fn new(setup: InlineWorkDriverSetup) -> Self {
111        let phase_probe_slot = match &setup.process {
112            ProcessWorkDriverSetup::LazyDefault { config } => {
113                Some(config.turn_phase_probe_slot.clone())
114            }
115            ProcessWorkDriverSetup::None | ProcessWorkDriverSetup::External { .. } => None,
116        };
117        Self {
118            setup,
119            drivers: tokio::sync::OnceCell::new(),
120            phase_probe_slot,
121        }
122    }
123
124    /// Resolve host work drivers for a session host. Idempotent: the once-guard
125    /// ensures inline drivers are constructed once.
126    pub(crate) async fn drivers(&self) -> ResolvedWorkDrivers {
127        self.drivers
128            .get_or_init(|| async {
129                let queued = match &self.setup.queued {
130                    QueuedWorkDriverSetup::None => None,
131                    QueuedWorkDriverSetup::External { driver } => Some(driver.clone()),
132                    QueuedWorkDriverSetup::LazyDefault { config } => Some(QueuedWorkDriver::new(
133                        Arc::new(InlineQueuedWorkRunHandle::new(Arc::clone(config))),
134                    )),
135                };
136                let (process, drive_process_on_open) = match &self.setup.process {
137                    ProcessWorkDriverSetup::None => (None, false),
138                    ProcessWorkDriverSetup::External { driver } => (Some(driver.clone()), false),
139                    ProcessWorkDriverSetup::LazyDefault { config } => {
140                        let mut config = (**config).clone();
141                        if let Some(driver) = queued.clone() {
142                            config = config.with_queued_work_driver(driver);
143                        }
144                        let registry = Arc::clone(&config.process_registry);
145                        let worker = DurableProcessWorker::new(config);
146                        (Some(ProcessWorkDriver::inline(registry, worker)), true)
147                    }
148                };
149                ResolvedWorkDrivers {
150                    process,
151                    queued,
152                    drive_process_on_open,
153                }
154            })
155            .await
156            .clone()
157    }
158
159    pub(crate) fn phase_probe_slot(&self) -> Option<lash_core::runtime::RuntimeTurnPhaseProbeSlot> {
160        self.phase_probe_slot.clone()
161    }
162
163    fn configured_process_work_driver(&self) -> Option<ProcessWorkDriver> {
164        match &self.setup.process {
165            ProcessWorkDriverSetup::External { driver } => Some(driver.clone()),
166            ProcessWorkDriverSetup::None | ProcessWorkDriverSetup::LazyDefault { .. } => None,
167        }
168    }
169
170    fn configured_queued_work_driver(&self) -> Option<QueuedWorkDriver> {
171        match &self.setup.queued {
172            QueuedWorkDriverSetup::External { driver } => Some(driver.clone()),
173            QueuedWorkDriverSetup::None | QueuedWorkDriverSetup::LazyDefault { .. } => None,
174        }
175    }
176}
177
178pub(crate) struct InlineQueuedWorkRunConfig {
179    env: RuntimeEnvironment,
180    policy: SessionPolicy,
181    protocol_factory: Option<Arc<dyn PluginFactory>>,
182    plugin_factories: Arc<Vec<Arc<dyn PluginFactory>>>,
183    store_factory: Arc<dyn SessionStoreFactory>,
184    live_replay_store: Arc<dyn LiveReplayStore>,
185    runtime_host_installer: Option<RuntimeHostInstaller>,
186}
187
188impl InlineQueuedWorkRunConfig {
189    fn new(
190        env: RuntimeEnvironment,
191        policy: SessionPolicy,
192        protocol_factory: Option<Arc<dyn PluginFactory>>,
193        plugin_factories: Arc<Vec<Arc<dyn PluginFactory>>>,
194        store_factory: Arc<dyn SessionStoreFactory>,
195        live_replay_store: Arc<dyn LiveReplayStore>,
196        runtime_host_installer: Option<RuntimeHostInstaller>,
197    ) -> Self {
198        Self {
199            env,
200            policy,
201            protocol_factory,
202            plugin_factories,
203            store_factory,
204            live_replay_store,
205            runtime_host_installer,
206        }
207    }
208}
209
210struct InlineQueuedWorkRunHandle {
211    config: Arc<InlineQueuedWorkRunConfig>,
212}
213
214impl InlineQueuedWorkRunHandle {
215    fn new(config: Arc<InlineQueuedWorkRunConfig>) -> Self {
216        Self { config }
217    }
218}
219
220#[async_trait]
221impl QueuedWorkRunHandle for InlineQueuedWorkRunHandle {
222    async fn run_queued_work(
223        &self,
224        request: QueuedWorkRunRequest,
225    ) -> std::result::Result<(), lash_core::PluginError> {
226        let Some(session_id) = request.session_id else {
227            return Ok(());
228        };
229        let reason = request.reason;
230        let mut policy = self.config.policy.clone();
231        policy.session_id = Some(session_id.clone());
232        let store = self
233            .config
234            .store_factory
235            .create_store(&SessionStoreCreateRequest {
236                session_id: session_id.clone(),
237                relation: SessionRelation::default(),
238                policy: policy.clone(),
239            })
240            .await
241            .map_err(lash_core::PluginError::Session)?;
242        let state = crate::session::load_state_for_residency(
243            self.config.env.residency,
244            &session_id,
245            &policy,
246            store.as_ref(),
247        )
248        .await
249        .map_err(|err| lash_core::PluginError::Session(err.to_string()))?;
250        let plugin_host = build_plugin_host(
251            self.config.protocol_factory.as_ref(),
252            self.config.plugin_factories.as_ref(),
253            Vec::new(),
254        )
255        .map_err(|err| lash_core::PluginError::Session(err.to_string()))?;
256        let mut env = self.config.env.clone();
257        env.core = match &self.config.runtime_host_installer {
258            Some(install) => install(env.core.clone(), &plugin_host)
259                .map_err(|err| lash_core::PluginError::Session(err.to_string()))?,
260            None => env.core.clone(),
261        };
262        env.plugin_host = Some(Arc::new(plugin_host));
263        let effect_host = Arc::clone(&env.core.control.effect_host);
264        let runtime = LashRuntime::from_environment(&env, policy, state, Some(store))
265            .await
266            .map_err(|err| lash_core::PluginError::Session(err.to_string()))?;
267        let handle = RuntimeHandle::with_live_replay_store(
268            runtime,
269            Arc::clone(&self.config.live_replay_store),
270        );
271        let scope = lash_core::ExecutionScope::queue_drain(session_id, reason);
272        let scoped = effect_host
273            .scoped(scope)
274            .map_err(|err| lash_core::PluginError::Session(err.to_string()))?;
275        crate::turn::stream_next_queued_prepared_turn(
276            &handle,
277            crate::turn::TurnSinks::default(),
278            scoped,
279            CancellationToken::new(),
280            &[],
281        )
282        .await
283        .map_err(|err| lash_core::PluginError::Session(err.to_string()))?;
284        Ok(())
285    }
286}
287
288#[derive(Clone, Debug, Default, serde::Serialize, serde::Deserialize)]
289pub struct SessionDeleteReport {
290    pub session_id: String,
291    pub process: Option<lash_core::ProcessSessionDeleteReport>,
292}
293
294impl LashCore {
295    pub fn builder() -> LashCoreBuilder {
296        LashCoreBuilder::default()
297    }
298
299    pub fn session(&self, session_id: impl Into<String>) -> SessionBuilder {
300        SessionBuilder {
301            core: self.clone(),
302            session_id: session_id.into(),
303            spec: SessionSpec::inherit(),
304            parent_session_id: None,
305            session_execution_owner_id: None,
306            store: None,
307            provider: None,
308            active_plugins: Vec::new(),
309            plugin_factories: Vec::new(),
310        }
311    }
312
313    pub fn triggers(&self) -> crate::admin::CoreTriggerAdmin {
314        crate::admin::CoreTriggerAdmin { core: self.clone() }
315    }
316
317    pub fn processes(&self) -> crate::admin::Processes {
318        crate::admin::Processes { core: self.clone() }
319    }
320
321    pub fn completions(&self) -> crate::admin::Completions {
322        crate::admin::Completions { core: self.clone() }
323    }
324
325    pub fn effect_host(&self) -> Arc<dyn EffectHost> {
326        Arc::clone(&self.env.core.control.effect_host)
327    }
328
329    pub async fn delete_session(
330        &self,
331        session_id: impl AsRef<str>,
332        scoped_effect_controller: ScopedEffectController<'_>,
333    ) -> Result<SessionDeleteReport> {
334        let session_id = session_id.as_ref().to_string();
335        let Some(store_factory) = self.store_factory.as_ref() else {
336            return Err(EmbedError::MissingSessionStoreFactory);
337        };
338        let process = if let Some(process_registry) = self.env.process_registry.as_ref() {
339            let invocation = RuntimeInvocation::effect(
340                RuntimeScope::new(session_id.clone()),
341                format!("process:delete-session:{session_id}"),
342                RuntimeEffectKind::Process,
343                format!("{session_id}:delete-session"),
344            );
345            let outcome = scoped_effect_controller
346                .controller()
347                .execute_effect(
348                    RuntimeEffectEnvelope::new(
349                        invocation,
350                        RuntimeEffectCommand::process(ProcessCommand::DeleteSession {
351                            session_id: session_id.clone(),
352                        }),
353                    ),
354                    RuntimeEffectLocalExecutor::processes(Arc::clone(process_registry)),
355                )
356                .await
357                .map_err(|err| EmbedError::SessionDeleteProcess {
358                    session_id: session_id.clone(),
359                    message: err.to_string(),
360                })?;
361            match outcome {
362                RuntimeEffectOutcome::Process {
363                    result: ProcessEffectOutcome::DeleteSession { report },
364                } => Some(report),
365                other => {
366                    return Err(EmbedError::SessionDeleteProcess {
367                        session_id,
368                        message: format!(
369                            "process delete returned the wrong outcome: {}",
370                            other.kind().as_str()
371                        ),
372                    });
373                }
374            }
375        } else {
376            None
377        };
378        if let Some(trigger_store) = self.env.trigger_store.as_ref() {
379            trigger_store
380                .delete_session_subscriptions(&session_id)
381                .await
382                .map_err(|err| EmbedError::SessionDeleteProcess {
383                    session_id: session_id.clone(),
384                    message: err.to_string(),
385                })?;
386        }
387        self.env
388            .core
389            .control
390            .effect_host
391            .revoke_await_events_for_session(&session_id)
392            .await
393            .map_err(|err| EmbedError::SessionDeleteProcess {
394                session_id: session_id.clone(),
395                message: err.to_string(),
396            })?;
397        store_factory
398            .delete_session(&session_id)
399            .await
400            .map_err(|message| EmbedError::StoreFactory {
401                session_id: session_id.clone(),
402                message,
403            })?;
404        Ok(SessionDeleteReport {
405            session_id,
406            process,
407        })
408    }
409
410    pub fn process_registry(&self) -> Option<Arc<dyn ProcessRegistry>> {
411        self.env.process_registry.as_ref().cloned()
412    }
413
414    pub fn durable_process_worker_config(&self) -> Result<DurableProcessWorkerConfig> {
415        self.durable_process_worker_config_with_plugins(std::iter::empty::<Arc<dyn PluginFactory>>())
416    }
417
418    pub fn durable_process_worker_config_with_plugins(
419        &self,
420        extra_plugin_factories: impl IntoIterator<Item = Arc<dyn PluginFactory>>,
421    ) -> Result<DurableProcessWorkerConfig> {
422        let Some(process_registry) = self.process_registry() else {
423            return Err(EmbedError::MissingProcessRegistry);
424        };
425        let Some(store_factory) = self.store_factory.as_ref() else {
426            return Err(EmbedError::MissingProcessWorkerStoreFactory);
427        };
428        let plugin_host = build_plugin_host(
429            self.protocol_factory.as_ref(),
430            self.plugin_factories.as_ref(),
431            extra_plugin_factories.into_iter().collect(),
432        )?;
433        let runtime_host =
434            self.runtime_host_for_plugin_host(self.env.core.clone(), &plugin_host)?;
435        let mut config = DurableProcessWorkerConfig::new(
436            Arc::new(plugin_host),
437            runtime_host,
438            Arc::clone(store_factory),
439            process_registry,
440        )
441        .with_session_policy(self.policy.clone())
442        .with_residency(self.env.residency);
443        if let Some(trigger_store) = self.env.trigger_store.as_ref() {
444            config = config.with_trigger_store(Arc::clone(trigger_store));
445        }
446        if let Some(driver) = self.work_driver.configured_process_work_driver() {
447            config = config.with_process_work_driver(driver);
448        }
449        if let Some(driver) = self.work_driver.configured_queued_work_driver() {
450            config = config.with_queued_work_driver(driver);
451        }
452        Ok(config)
453    }
454
455    pub(crate) fn runtime_host_for_plugin_host(
456        &self,
457        runtime_host: RuntimeHostConfig,
458        plugin_host: &PluginHost,
459    ) -> Result<RuntimeHostConfig> {
460        match &self.runtime_host_installer {
461            Some(install) => install(runtime_host, plugin_host),
462            None => Ok(runtime_host),
463        }
464    }
465}
466
467fn default_runtime_stack() -> PluginStack {
468    lash_plugin_tool_output_budget::tool_output_budget_stack()
469}
470
471#[derive(Clone)]
472pub struct StandardCore {
473    core: LashCore,
474}
475
476impl StandardCore {
477    pub fn builder() -> StandardCoreBuilder {
478        StandardCoreBuilder {
479            inner: LashCore::builder()
480                .protocol_plugin(Arc::new(
481                    lash_protocol_standard::StandardProtocolPluginFactory::new(),
482                ))
483                .plugins(default_runtime_stack()),
484        }
485    }
486
487    pub fn session(&self, session_id: impl Into<String>) -> SessionBuilder {
488        self.core.session(session_id)
489    }
490
491    pub fn into_inner(self) -> LashCore {
492        self.core
493    }
494}
495
496impl std::ops::Deref for StandardCore {
497    type Target = LashCore;
498
499    fn deref(&self) -> &Self::Target {
500        &self.core
501    }
502}
503
504pub struct StandardCoreBuilder {
505    inner: LashCoreBuilder,
506}
507
508impl StandardCoreBuilder {
509    pub fn build(self) -> Result<StandardCore> {
510        self.inner.build().map(|core| StandardCore { core })
511    }
512}
513
514impl PromptLayerSink for StandardCoreBuilder {
515    fn prompt_layer_mut(&mut self) -> &mut PromptLayer {
516        self.inner.prompt_layer_mut()
517    }
518}
519
520#[cfg(feature = "rlm")]
521#[derive(Clone)]
522pub struct RlmCore {
523    core: LashCore,
524    surface_config: lash_protocol_rlm::RlmProtocolPluginConfig,
525    process_lifecycle_available: bool,
526    lashlang_artifact_store: Arc<dyn lash_lashlang_runtime::LashlangArtifactStore>,
527}
528
529#[cfg(feature = "rlm")]
530impl RlmCore {
531    pub fn builder() -> RlmCoreBuilder {
532        RlmCoreBuilder {
533            inner: LashCore::builder().plugins(default_runtime_stack()),
534            config: lash_protocol_rlm::RlmProtocolPluginConfig::default(),
535            projection_resolver: Arc::new(lash_protocol_rlm::ProjectionRegistry::default()),
536            deferred_tool_resolver: None,
537            lashlang_artifact_store: None,
538            lashlang_execution_sink: None,
539        }
540    }
541
542    pub fn session(&self, session_id: impl Into<String>) -> RlmSessionBuilder {
543        RlmSessionBuilder {
544            builder: self.core.session(session_id),
545            rlm_final_answer_format: None,
546        }
547    }
548
549    pub fn into_inner(self) -> LashCore {
550        self.core
551    }
552
553    pub fn lashlang_compile_surface(
554        &self,
555        request: crate::rlm::LashlangCompileSurfaceRequest,
556    ) -> Result<crate::rlm::LashlangCompileSurface> {
557        let plugin_host = build_plugin_host(
558            self.core.protocol_factory.as_ref(),
559            self.core.plugin_factories.as_ref(),
560            request.extra_plugin_factories,
561        )?;
562        let plugins = plugin_host.build_session_with_parent(
563            &request.session_id,
564            None,
565            None,
566            lash_core::plugin::SessionAuthorityContext {
567                plugin_options: request.execution_env_spec.plugin_options,
568                ..Default::default()
569            },
570        )?;
571        let tool_catalog = plugins.resolved_tool_catalog(&request.session_id)?;
572        let surface = crate::rlm::rlm_lashlang_surface(
573            &self.surface_config,
574            self.process_lifecycle_available,
575        )
576        .with_plugin_extensions(plugin_host.extensions())
577        .map_err(lash_core::PluginError::Registration)?;
578        let host_environment = surface
579            .host_environment(&tool_catalog)
580            .map_err(lash_core::PluginError::Registration)?;
581        Ok(crate::rlm::LashlangCompileSurface {
582            host_environment,
583            tool_catalog,
584            surface,
585        })
586    }
587
588    pub async fn compile_lashlang_module(
589        &self,
590        request: crate::rlm::LashlangModuleCompileRequest,
591    ) -> std::result::Result<crate::rlm::ModuleCompileOutput, crate::rlm::LashlangModuleCompileError>
592    {
593        let surface = self
594            .lashlang_compile_surface(crate::rlm::LashlangCompileSurfaceRequest {
595                session_id: request.session_id,
596                execution_env_spec: request.execution_env_spec,
597                extra_plugin_factories: request.extra_plugin_factories,
598            })
599            .map_err(|err| {
600                lashlang::ModuleCompileError::Link(lashlang::ModuleCompileDiagnostic {
601                    stage: lashlang::ModuleCompileStage::Link,
602                    message: err.to_string(),
603                    offset: None,
604                    span: None,
605                    line: None,
606                    column: None,
607                    diagnostic: Some(err.to_string()),
608                })
609            })?;
610        lashlang::compile_module(lashlang::ModuleCompileRequest {
611            source: &request.source,
612            environment: &surface.host_environment,
613            artifact_store: Some(self.lashlang_artifact_store.as_ref()),
614        })
615        .await
616    }
617}
618
619#[cfg(feature = "rlm")]
620impl std::ops::Deref for RlmCore {
621    type Target = LashCore;
622
623    fn deref(&self) -> &Self::Target {
624        &self.core
625    }
626}
627
628#[cfg(feature = "rlm")]
629pub struct RlmCoreBuilder {
630    inner: LashCoreBuilder,
631    config: lash_protocol_rlm::RlmProtocolPluginConfig,
632    projection_resolver: Arc<dyn lash_protocol_rlm::ProjectionResolver>,
633    deferred_tool_resolver: Option<lash_lashlang_runtime::SharedDeferredToolResolver>,
634    lashlang_artifact_store: Option<Arc<dyn lash_lashlang_runtime::LashlangArtifactStore>>,
635    lashlang_execution_sink: Option<Arc<dyn lash_trace::TraceSink>>,
636}
637
638#[cfg(feature = "rlm")]
639impl RlmCoreBuilder {
640    pub fn rlm_protocol_config(
641        mut self,
642        config: lash_protocol_rlm::RlmProtocolPluginConfig,
643    ) -> Self {
644        self.config = config;
645        self
646    }
647
648    pub fn projection_resolver(
649        mut self,
650        projection_resolver: Arc<dyn lash_protocol_rlm::ProjectionResolver>,
651    ) -> Self {
652        self.projection_resolver = projection_resolver;
653        self
654    }
655
656    /// Wire a host-provided RLM Deferred Tool Resolver. Most hosts ship none;
657    /// the CLI's MCP-discovery example wires one.
658    pub fn deferred_tool_resolver(
659        mut self,
660        resolver: lash_lashlang_runtime::SharedDeferredToolResolver,
661    ) -> Self {
662        self.deferred_tool_resolver = Some(resolver);
663        self
664    }
665
666    pub fn lashlang_artifact_store(
667        mut self,
668        artifact_store: Arc<dyn lash_lashlang_runtime::LashlangArtifactStore>,
669    ) -> Self {
670        self.lashlang_artifact_store = Some(artifact_store);
671        self
672    }
673
674    pub fn lashlang_execution_sink(
675        mut self,
676        lashlang_execution_sink: Arc<dyn lash_trace::TraceSink>,
677    ) -> Self {
678        self.lashlang_execution_sink = Some(lashlang_execution_sink);
679        self
680    }
681
682    pub fn lashlang_execution_jsonl_path(mut self, path: impl Into<std::path::PathBuf>) -> Self {
683        self.lashlang_execution_sink = Some(Arc::new(lash_trace::JsonlTraceSink::new(path.into())));
684        self
685    }
686
687    pub fn build(mut self) -> Result<RlmCore> {
688        let artifact_store = self
689            .lashlang_artifact_store
690            .clone()
691            .ok_or(EmbedError::MissingLashlangArtifactStore)?;
692        if self.inner.effective_session_store_tier() == Some(DurabilityTier::Durable)
693            && artifact_store.durability_tier()
694                == lash_lashlang_runtime::LashlangDurabilityTier::Inline
695        {
696            return Err(EmbedError::DurableStorePeerRequired {
697                facet: "artifact store",
698            });
699        }
700        let process_lifecycle_available = self.inner.process_work_source.has_registry();
701        let config = crate::rlm::rlm_protocol_config(self.config, process_lifecycle_available);
702        let trace_context = self.inner.resolved_trace_context();
703        let mut protocol_factory = lash_protocol_rlm::RlmProtocolPluginFactory::new(config.clone())
704            .with_projection_resolver(Arc::clone(&self.projection_resolver))
705            .with_lashlang_artifact_store(Arc::clone(&artifact_store))
706            .with_lashlang_execution_trace(
707                self.lashlang_execution_sink.clone(),
708                trace_context.clone(),
709            );
710        if let Some(resolver) = self.deferred_tool_resolver.clone() {
711            protocol_factory = protocol_factory.with_deferred_tool_resolver(resolver);
712        }
713        let protocol_factory = Arc::new(protocol_factory);
714        let engine_artifact_store = Arc::clone(&artifact_store);
715        let engine_config = config.clone();
716        let engine_sink = self.lashlang_execution_sink.clone();
717        self.inner.protocol_factory = Some(protocol_factory);
718        self.inner.runtime_host_installer = Some(Arc::new(move |runtime_host, plugin_host| {
719            let surface =
720                crate::rlm::rlm_lashlang_surface(&engine_config, process_lifecycle_available)
721                    .with_plugin_extensions(plugin_host.extensions())
722                    .map_err(lash_core::PluginError::Registration)?;
723            let engine = lash_lashlang_runtime::LashlangProcessEngine::new(
724                Arc::clone(&engine_artifact_store),
725                surface,
726            )
727            .with_execution_trace(
728                engine_sink.clone(),
729                runtime_host.tracing.trace_context.clone(),
730            );
731            Ok(runtime_host.with_process_engine(Arc::new(engine)))
732        }));
733        self.inner.build().map(|core| RlmCore {
734            core,
735            surface_config: config,
736            process_lifecycle_available,
737            lashlang_artifact_store: artifact_store,
738        })
739    }
740}
741
742#[cfg(feature = "rlm")]
743impl PromptLayerSink for RlmCoreBuilder {
744    fn prompt_layer_mut(&mut self) -> &mut PromptLayer {
745        self.inner.prompt_layer_mut()
746    }
747}
748
749macro_rules! forward_core_builder_methods {
750    ($builder:ident) => {
751        impl $builder {
752            pub fn provider(mut self, provider: ProviderHandle) -> Self {
753                self.inner = self.inner.provider(provider);
754                self
755            }
756
757            pub fn model(mut self, model: lash_core::ModelSpec) -> Self {
758                self.inner = self.inner.model(model);
759                self
760            }
761
762            pub fn max_turns(mut self, max_turns: usize) -> Self {
763                self.inner = self.inner.max_turns(max_turns);
764                self
765            }
766
767            pub fn session_spec(mut self, spec: SessionSpec) -> Self {
768                self.inner = self.inner.session_spec(spec);
769                self
770            }
771
772            pub fn store_factory(mut self, store_factory: Arc<dyn SessionStoreFactory>) -> Self {
773                self.inner = self.inner.store_factory(store_factory);
774                self
775            }
776
777            pub fn child_store_factory(
778                mut self,
779                store_factory: Arc<dyn SessionStoreFactory>,
780            ) -> Self {
781                self.inner = self.inner.child_store_factory(store_factory);
782                self
783            }
784
785            pub fn attachment_store(mut self, attachment_store: Arc<dyn AttachmentStore>) -> Self {
786                self.inner = self.inner.attachment_store(attachment_store);
787                self
788            }
789
790            pub fn process_env_store(
791                mut self,
792                process_env_store: Arc<dyn ProcessExecutionEnvStore>,
793            ) -> Self {
794                self.inner = self.inner.process_env_store(process_env_store);
795                self
796            }
797
798            pub fn effect_host(mut self, effect_host: Arc<dyn EffectHost>) -> Self {
799                self.inner = self.inner.effect_host(effect_host);
800                self
801            }
802
803            pub fn tools(mut self, tools: Arc<dyn ToolProvider>) -> Self {
804                self.inner = self.inner.tools(tools);
805                self
806            }
807
808            pub fn plugin(mut self, plugin: Arc<dyn PluginFactory>) -> Self {
809                self.inner = self.inner.plugin(plugin);
810                self
811            }
812
813            pub fn plugins(mut self, stack: PluginStack) -> Self {
814                self.inner = self.inner.plugins(stack);
815                self
816            }
817
818            pub fn configure_plugins(mut self, configure: impl FnOnce(&mut PluginStack)) -> Self {
819                self.inner = self.inner.configure_plugins(configure);
820                self
821            }
822
823            pub fn trace_sink(mut self, trace_sink: Arc<dyn lash_trace::TraceSink>) -> Self {
824                self.inner = self.inner.trace_sink(trace_sink);
825                self
826            }
827
828            pub fn trace_jsonl_path(mut self, path: impl Into<std::path::PathBuf>) -> Self {
829                self.inner = self.inner.trace_jsonl_path(path);
830                self
831            }
832
833            pub fn trace_level(mut self, trace_level: lash_trace::TraceLevel) -> Self {
834                self.inner = self.inner.trace_level(trace_level);
835                self
836            }
837
838            pub fn trace_context(mut self, trace_context: lash_trace::TraceContext) -> Self {
839                self.inner = self.inner.trace_context(trace_context);
840                self
841            }
842
843            pub fn termination(mut self, termination: TerminationPolicy) -> Self {
844                self.inner = self.inner.termination(termination);
845                self
846            }
847
848            pub fn residency(mut self, residency: Residency) -> Self {
849                self.inner = self.inner.residency(residency);
850                self
851            }
852
853            pub fn live_replay_store(
854                mut self,
855                live_replay_store: Arc<dyn LiveReplayStore>,
856            ) -> Self {
857                self.inner = self.inner.live_replay_store(live_replay_store);
858                self
859            }
860
861            pub fn process_registry(mut self, process_registry: Arc<dyn ProcessRegistry>) -> Self {
862                self.inner = self.inner.process_registry(process_registry);
863                self
864            }
865
866            pub fn trigger_store(mut self, store: Arc<dyn lash_core::TriggerStore>) -> Self {
867                self.inner = self.inner.trigger_store(store);
868                self
869            }
870
871            pub fn process_work_driver(mut self, driver: ProcessWorkDriver) -> Self {
872                self.inner = self.inner.process_work_driver(driver);
873                self
874            }
875
876            pub fn queued_work_driver(mut self, driver: QueuedWorkDriver) -> Self {
877                self.inner = self.inner.queued_work_driver(driver);
878                self
879            }
880
881            pub fn disable_queued_work_driver(mut self) -> Self {
882                self.inner = self.inner.disable_queued_work_driver();
883                self
884            }
885
886            pub fn runtime_host_config(mut self, core: RuntimeHostConfig) -> Self {
887                self.inner.runtime_host_config = Some(core);
888                self
889            }
890        }
891    };
892}
893
894forward_core_builder_methods!(StandardCoreBuilder);
895#[cfg(feature = "rlm")]
896forward_core_builder_methods!(RlmCoreBuilder);
897
898#[derive(Default)]
899pub struct LashCoreBuilder {
900    pub(crate) protocol_factory: Option<Arc<dyn PluginFactory>>,
901    session_spec: SessionSpec,
902    provider: Option<ProviderHandle>,
903    pub(crate) store_factory: Option<Arc<dyn SessionStoreFactory>>,
904    child_store_factory: Option<Arc<dyn SessionStoreFactory>>,
905    // `RuntimeHostConfig` has no `Default`: the generic host-owned durability
906    // dependencies must be named. They are collected here and resolved in
907    // `build()`, which errors if any is unset.
908    effect_host: Option<Arc<dyn EffectHost>>,
909    attachment_store: Option<Arc<dyn AttachmentStore>>,
910    process_env_store: Option<Arc<dyn ProcessExecutionEnvStore>>,
911    trigger_store: Option<Arc<dyn lash_core::TriggerStore>>,
912    // Benign core overrides applied on top of the resolved core.
913    prompt: Option<PromptLayer>,
914    trace_sink: Option<Arc<dyn lash_trace::TraceSink>>,
915    trace_level: Option<lash_trace::TraceLevel>,
916    trace_context: Option<lash_trace::TraceContext>,
917    termination: Option<TerminationPolicy>,
918    // Advanced full-config override; used as the base core when present.
919    runtime_host_config: Option<RuntimeHostConfig>,
920    tool_providers: Vec<Arc<dyn ToolProvider>>,
921    plugin_stack: PluginStack,
922    plugin_host: Option<PluginHost>,
923    residency: Option<Residency>,
924    // Single source of truth for process lifecycle support and process-work
925    // consumption.
926    process_work_source: ProcessWorkSource,
927    queued_work_source: QueuedWorkSource,
928    live_replay_store: Option<Arc<dyn LiveReplayStore>>,
929    runtime_host_installer: Option<RuntimeHostInstaller>,
930}
931
932impl LashCoreBuilder {
933    pub fn protocol_plugin(mut self, plugin: Arc<dyn PluginFactory>) -> Self {
934        self.protocol_factory = Some(plugin);
935        self
936    }
937
938    pub fn provider(mut self, provider: ProviderHandle) -> Self {
939        self.session_spec = self.session_spec.provider_id(provider.kind());
940        self.provider = Some(provider);
941        self
942    }
943
944    pub fn model(mut self, model: lash_core::ModelSpec) -> Self {
945        self.session_spec = self.session_spec.model(model);
946        self
947    }
948
949    pub fn max_turns(mut self, max_turns: usize) -> Self {
950        self.session_spec = self.session_spec.max_turns(max_turns);
951        self
952    }
953
954    pub fn session_spec(mut self, spec: SessionSpec) -> Self {
955        self.session_spec = spec;
956        self
957    }
958
959    /// Configure a factory that can create a persistence store for any root
960    /// session opened from this core.
961    ///
962    /// The factory must honor `SessionStoreCreateRequest::session_id` and
963    /// return a store for that specific session. Do not use this to wrap one
964    /// pre-opened root store; pass root-only stores with
965    /// `LashCore::session(...).store(store)` instead.
966    pub fn store_factory(mut self, store_factory: Arc<dyn SessionStoreFactory>) -> Self {
967        self.store_factory = Some(store_factory);
968        self
969    }
970
971    /// Configure the persistence factory used by managed child sessions, such
972    /// as local subagents.
973    ///
974    /// Child factories must return a distinct store bound to the requested
975    /// child session id. Hosts that pass an explicit root store with
976    /// `SessionBuilder::store` should set this when child sessions need
977    /// persistence.
978    pub fn child_store_factory(mut self, store_factory: Arc<dyn SessionStoreFactory>) -> Self {
979        self.child_store_factory = Some(store_factory);
980        self
981    }
982
983    pub fn attachment_store(mut self, attachment_store: Arc<dyn AttachmentStore>) -> Self {
984        self.attachment_store = Some(attachment_store);
985        self
986    }
987
988    pub fn process_env_store(
989        mut self,
990        process_env_store: Arc<dyn ProcessExecutionEnvStore>,
991    ) -> Self {
992        self.process_env_store = Some(process_env_store);
993        self
994    }
995
996    /// Set the deployment effect host — the durability boundary every operation
997    /// crosses. Pass [`InlineEffectHost`](crate::durability::InlineEffectHost)
998    /// for in-process execution, or a workflow-backed host for durable
999    /// execution.
1000    pub fn effect_host(mut self, effect_host: Arc<dyn EffectHost>) -> Self {
1001        self.effect_host = Some(effect_host);
1002        self
1003    }
1004
1005    pub fn tools(mut self, tools: Arc<dyn ToolProvider>) -> Self {
1006        self.tool_providers.push(tools);
1007        self
1008    }
1009
1010    pub fn plugin(mut self, plugin: Arc<dyn PluginFactory>) -> Self {
1011        self.plugin_stack.push(plugin);
1012        self
1013    }
1014
1015    pub fn plugins(mut self, stack: PluginStack) -> Self {
1016        self.plugin_stack = stack;
1017        self
1018    }
1019
1020    pub fn configure_plugins(mut self, configure: impl FnOnce(&mut PluginStack)) -> Self {
1021        configure(&mut self.plugin_stack);
1022        self
1023    }
1024
1025    pub fn trace_sink(mut self, trace_sink: Arc<dyn lash_trace::TraceSink>) -> Self {
1026        self.trace_sink = Some(trace_sink);
1027        self
1028    }
1029
1030    pub fn trace_jsonl_path(mut self, path: impl Into<std::path::PathBuf>) -> Self {
1031        self.trace_sink = Some(Arc::new(lash_trace::JsonlTraceSink::new(path.into())));
1032        self
1033    }
1034
1035    pub fn trace_level(mut self, trace_level: lash_trace::TraceLevel) -> Self {
1036        self.trace_level = Some(trace_level);
1037        self
1038    }
1039
1040    pub fn trace_context(mut self, trace_context: lash_trace::TraceContext) -> Self {
1041        self.trace_context = Some(trace_context);
1042        self
1043    }
1044
1045    pub fn termination(mut self, termination: TerminationPolicy) -> Self {
1046        self.termination = Some(termination);
1047        self
1048    }
1049
1050    pub fn residency(mut self, residency: Residency) -> Self {
1051        self.residency = Some(residency);
1052        self
1053    }
1054
1055    /// Configure the bounded live replay buffer used by session observation
1056    /// cursors. This is best-effort reconnect recovery only; durable state
1057    /// still comes from the session store and [`SessionReadView`].
1058    pub fn live_replay_store(mut self, live_replay_store: Arc<dyn LiveReplayStore>) -> Self {
1059        self.live_replay_store = Some(live_replay_store);
1060        self
1061    }
1062
1063    /// Resolve the runtime host config, requiring the generic host-owned
1064    /// durability dependencies to have been named.
1065    fn resolve_runtime_host_config(&mut self) -> Result<RuntimeHostConfig> {
1066        if let Some(base) = self.runtime_host_config.take() {
1067            return Ok(self.apply_core_overrides(base));
1068        }
1069        let effect_host = self
1070            .effect_host
1071            .take()
1072            .ok_or(EmbedError::MissingEffectHost)?;
1073        let attachment_store = self
1074            .attachment_store
1075            .take()
1076            .ok_or(EmbedError::MissingAttachmentStore)?;
1077        let process_env_store = self
1078            .process_env_store
1079            .take()
1080            .ok_or(EmbedError::MissingProcessEnvStore)?;
1081        let core = RuntimeHostConfig::new(effect_host, attachment_store, process_env_store);
1082        Ok(self.apply_core_overrides(core))
1083    }
1084
1085    /// Apply benign + still-set dependency overrides on top of a base core.
1086    fn apply_core_overrides(&mut self, mut core: RuntimeHostConfig) -> RuntimeHostConfig {
1087        if let Some(effect_host) = self.effect_host.take() {
1088            core.control.effect_host = effect_host;
1089        }
1090        if let Some(attachment_store) = self.attachment_store.take() {
1091            core.durability.attachment_store = attachment_store;
1092        }
1093        if let Some(process_env_store) = self.process_env_store.take() {
1094            core.durability.process_env_store = process_env_store;
1095        }
1096        if let Some(prompt) = self.prompt.take() {
1097            core.prompt.prompt = prompt;
1098        }
1099        if let Some(trace_sink) = self.trace_sink.take() {
1100            core.tracing.trace_sink = Some(trace_sink);
1101        }
1102        if let Some(trace_level) = self.trace_level.take() {
1103            core.tracing.trace_level = trace_level;
1104        }
1105        if let Some(trace_context) = self.trace_context.take() {
1106            core.tracing.trace_context = trace_context;
1107        }
1108        if let Some(termination) = self.termination.take() {
1109            core.control.termination = termination;
1110        }
1111        core
1112    }
1113
1114    /// Validate store peer-coherence of the wired durability dependencies.
1115    ///
1116    /// Durability is established by what the host wired; the per-invocation
1117    /// durable controller is not visible here (the build-time controller is
1118    /// inline by construction), so this checks the stores against each other
1119    /// only — never the controller (see A5 in the durable-first wiring spec):
1120    ///
1121    /// - a durable session store factory requires a durable attachment and
1122    ///   artifact store (they back the same session state);
1123    /// - a durable process registry requires a session store factory that is
1124    ///   itself durable (the registry's process records are meaningless without
1125    ///   a durable session behind them).
1126    fn effective_session_store_tier(&self) -> Option<DurabilityTier> {
1127        self.child_store_factory
1128            .as_ref()
1129            .or(self.store_factory.as_ref())
1130            .map(|factory| factory.durability_tier())
1131    }
1132
1133    #[cfg(feature = "rlm")]
1134    fn resolved_trace_context(&self) -> lash_trace::TraceContext {
1135        self.trace_context
1136            .clone()
1137            .or_else(|| {
1138                self.runtime_host_config
1139                    .as_ref()
1140                    .map(|core| core.tracing.trace_context.clone())
1141            })
1142            .unwrap_or_default()
1143    }
1144
1145    fn ensure_store_peer_coherence(&self) -> Result<()> {
1146        // Match `build()`'s wiring exactly: the session store factory it installs
1147        // is `child_store_factory.or(store_factory)` (child takes precedence, root
1148        // is the fallback). The coherence check must read the tier of that same
1149        // effective factory, or a host that wires only a durable child factory
1150        // (no root) is wrongly rejected though `build()` would wire it durably.
1151        let session_store_tier = self.effective_session_store_tier();
1152        let attachment_tier = self
1153            .attachment_store
1154            .as_ref()
1155            .map(|store| store.persistence().durability_tier())
1156            .or_else(|| {
1157                self.runtime_host_config.as_ref().map(|core| {
1158                    core.durability
1159                        .attachment_store
1160                        .persistence()
1161                        .durability_tier()
1162                })
1163            });
1164        let process_env_tier = self
1165            .process_env_store
1166            .as_ref()
1167            .map(|store| store.durability_tier())
1168            .or_else(|| {
1169                self.runtime_host_config
1170                    .as_ref()
1171                    .map(|core| core.durability.process_env_store.durability_tier())
1172            });
1173        let effect_host_tier = self
1174            .effect_host
1175            .as_ref()
1176            .map(|host| host.durability_tier())
1177            .or_else(|| {
1178                self.runtime_host_config
1179                    .as_ref()
1180                    .map(|core| core.control.effect_host.durability_tier())
1181            });
1182        let trigger_store_tier = self
1183            .trigger_store
1184            .as_ref()
1185            .map(|store| store.durability_tier());
1186
1187        if session_store_tier == Some(DurabilityTier::Durable) {
1188            if attachment_tier == Some(DurabilityTier::Inline) {
1189                return Err(EmbedError::DurableStorePeerRequired {
1190                    facet: "attachment store",
1191                });
1192            }
1193            if process_env_tier == Some(DurabilityTier::Inline) {
1194                return Err(EmbedError::DurableStorePeerRequired {
1195                    facet: "process execution environment store",
1196                });
1197            }
1198        }
1199
1200        if let Some(process_registry) = self.process_work_source.process_registry().as_ref()
1201            && process_registry.durability_tier() == DurabilityTier::Durable
1202        {
1203            if session_store_tier != Some(DurabilityTier::Durable) {
1204                return Err(EmbedError::DurableProcessRegistryRequiresStoreFactory);
1205            }
1206            if trigger_store_tier != Some(DurabilityTier::Durable) {
1207                return Err(EmbedError::DurableStorePeerRequired {
1208                    facet: "trigger store",
1209                });
1210            }
1211            if process_env_tier != Some(DurabilityTier::Durable) {
1212                return Err(EmbedError::DurableStorePeerRequired {
1213                    facet: "process execution environment store",
1214                });
1215            }
1216        }
1217
1218        if trigger_store_tier == Some(DurabilityTier::Durable) {
1219            if session_store_tier != Some(DurabilityTier::Durable) {
1220                return Err(EmbedError::DurableStorePeerRequired {
1221                    facet: "session store factory",
1222                });
1223            }
1224            if process_env_tier != Some(DurabilityTier::Durable) {
1225                return Err(EmbedError::DurableStorePeerRequired {
1226                    facet: "process execution environment store",
1227                });
1228            }
1229            if let Some(process_registry) = self.process_work_source.process_registry().as_ref()
1230                && process_registry.durability_tier() == DurabilityTier::Inline
1231            {
1232                return Err(EmbedError::DurableStorePeerRequired {
1233                    facet: "process registry",
1234                });
1235            }
1236        }
1237
1238        if effect_host_tier == Some(DurabilityTier::Durable) {
1239            if attachment_tier != Some(DurabilityTier::Durable) {
1240                return Err(EmbedError::DurableStorePeerRequired {
1241                    facet: "attachment store",
1242                });
1243            }
1244            if process_env_tier != Some(DurabilityTier::Durable) {
1245                return Err(EmbedError::DurableStorePeerRequired {
1246                    facet: "process execution environment store",
1247                });
1248            }
1249        }
1250
1251        Ok(())
1252    }
1253
1254    pub fn build(mut self) -> Result<LashCore> {
1255        self.ensure_store_peer_coherence()?;
1256        let protocol_factory = self.protocol_factory.clone();
1257        if protocol_factory.is_none() && self.plugin_host.is_none() {
1258            return Err(EmbedError::MissingProtocolPlugin);
1259        }
1260        let provider_id = self
1261            .session_spec
1262            .provider_id
1263            .clone()
1264            .or_else(|| {
1265                self.provider
1266                    .as_ref()
1267                    .map(|provider| provider.kind().to_string())
1268            })
1269            .unwrap_or_default();
1270        let model = self
1271            .session_spec
1272            .model
1273            .clone()
1274            .ok_or(EmbedError::MissingModelSpec)?;
1275
1276        let base_policy = SessionPolicy {
1277            provider_id,
1278            model,
1279            max_turns: self.session_spec.max_turns.flatten(),
1280            ..SessionPolicy::default()
1281        };
1282        let policy = self.session_spec.resolve_against(&base_policy);
1283
1284        let mut core = self.resolve_runtime_host_config()?;
1285        if let Some(provider) = self.provider.clone() {
1286            core.providers.provider_resolver =
1287                Arc::new(lash_core::SingleProviderResolver::new(provider));
1288        }
1289        let plugin_factories = if let Some(plugin_host) = self.plugin_host {
1290            plugin_host.factories().to_vec()
1291        } else {
1292            let mut factories = Vec::new();
1293            if !self.tool_providers.is_empty() {
1294                let spec = self
1295                    .tool_providers
1296                    .into_iter()
1297                    .fold(PluginSpec::new(), PluginSpec::with_tool_provider);
1298                factories.push(Arc::new(StaticPluginFactory::new("embed_tools", spec))
1299                    as Arc<dyn PluginFactory>);
1300            }
1301            factories.extend(self.plugin_stack.into_factories());
1302            factories
1303        };
1304        let default_plugin_host =
1305            build_plugin_host(protocol_factory.as_ref(), &plugin_factories, Vec::new())?;
1306        if let Some(install) = &self.runtime_host_installer {
1307            core = install(core, &default_plugin_host)?;
1308        }
1309
1310        let process_registry = self.process_work_source.process_registry();
1311
1312        // Resolve process work before the process source is moved into the
1313        // environment. The default inline driver's config is built
1314        // eagerly so a missing store factory fails loudly at build, not at
1315        // first open. It is built from the same single-protocol plugin host the
1316        // live runtime uses, so the worker can rebuild a runtime for a process.
1317        let process_work_driver = Self::resolve_process_work_driver(
1318            &self.process_work_source,
1319            &default_plugin_host,
1320            &core,
1321            // The worker rebuilds sessions with the same factory `build()` wires
1322            // below: `child_store_factory.or(store_factory)`.
1323            self.child_store_factory
1324                .as_ref()
1325                .or(self.store_factory.as_ref()),
1326            &policy,
1327            self.residency.unwrap_or_default(),
1328            self.trigger_store.as_ref(),
1329        )?;
1330
1331        let live_replay_clock = Arc::clone(&core.clock);
1332        let mut env_builder = RuntimeEnvironment::builder()
1333            .with_plugin_host(Arc::new(default_plugin_host))
1334            .with_runtime_host_config(core);
1335        if let Some(process_registry) = process_registry.as_ref() {
1336            env_builder = env_builder.with_process_registry(Arc::clone(process_registry));
1337        }
1338        if let Some(residency) = self.residency {
1339            env_builder = env_builder.with_residency(residency);
1340        }
1341        if let Some(child_store_factory) = self
1342            .child_store_factory
1343            .as_ref()
1344            .or(self.store_factory.as_ref())
1345        {
1346            env_builder = env_builder.with_session_store_factory(Arc::clone(child_store_factory));
1347        }
1348        if let Some(trigger_store) = self.trigger_store.as_ref() {
1349            env_builder = env_builder.with_trigger_store(Arc::clone(trigger_store));
1350        }
1351        let live_replay_store = self.live_replay_store.take().unwrap_or_else(|| {
1352            Arc::new(InMemoryLiveReplayStore::with_clock(
1353                lash_core::InMemoryLiveReplayStoreConfig::default(),
1354                live_replay_clock,
1355            ))
1356        });
1357        let env = env_builder.build();
1358        let queued_work_driver = Self::resolve_queued_work_driver(
1359            &self.queued_work_source,
1360            env.clone(),
1361            policy.clone(),
1362            protocol_factory.clone(),
1363            Arc::new(plugin_factories.clone()),
1364            self.child_store_factory
1365                .as_ref()
1366                .or(self.store_factory.as_ref()),
1367            Arc::clone(&live_replay_store),
1368            self.runtime_host_installer.clone(),
1369        );
1370        let work_driver = InlineWorkDriverSetup {
1371            process: process_work_driver,
1372            queued: queued_work_driver,
1373        };
1374
1375        Ok(LashCore {
1376            env,
1377            policy,
1378            store_factory: self.store_factory,
1379            plugin_factories: Arc::new(plugin_factories),
1380            provider: self.provider,
1381            live_replay_store,
1382            protocol_factory,
1383            runtime_host_installer: self.runtime_host_installer,
1384            work_driver: Arc::new(InlineWorkDriverSlot::new(work_driver)),
1385        })
1386    }
1387
1388    /// Decide how a built [`LashCore`] sources its process work driver.
1389    ///
1390    /// - no registry => nothing to run ([`ProcessWorkDriverSetup::None`]);
1391    /// - external driver wired => use it ([`ProcessWorkDriverSetup::External`]);
1392    /// - inline registry wired => lazily construct the default inline driver on first open. Its
1393    ///   [`DurableProcessWorkerConfig`] is built eagerly when a store factory is
1394    ///   present; without one the inline worker cannot rebuild session runtimes.
1395    fn resolve_process_work_driver(
1396        process_work_source: &ProcessWorkSource,
1397        worker_plugin_host: &PluginHost,
1398        core: &RuntimeHostConfig,
1399        store_factory: Option<&Arc<dyn SessionStoreFactory>>,
1400        policy: &SessionPolicy,
1401        residency: lash_core::Residency,
1402        trigger_store: Option<&Arc<dyn lash_core::TriggerStore>>,
1403    ) -> Result<ProcessWorkDriverSetup> {
1404        let process_registry = match process_work_source {
1405            ProcessWorkSource::None => return Ok(ProcessWorkDriverSetup::None),
1406            ProcessWorkSource::External(driver) => {
1407                return Ok(ProcessWorkDriverSetup::External {
1408                    driver: driver.clone(),
1409                });
1410            }
1411            ProcessWorkSource::Inline { registry } => Arc::clone(registry),
1412        };
1413        // The worker rebuilds a session runtime per process, so it needs a store
1414        // factory; without one the default runner could not execute anything, so
1415        // fail loudly rather than silently leave processes unexecuted.
1416        let Some(store_factory) = store_factory else {
1417            return Err(EmbedError::ProcessRegistryRequiresStoreFactory);
1418        };
1419        // The worker rebuilds with the same plugin host the live runtime uses,
1420        // including the protocol plugin that supplies the protocol session
1421        // capability.
1422        let phase_probe_slot = lash_core::runtime::RuntimeTurnPhaseProbeSlot::default();
1423        let config = Box::new(
1424            DurableProcessWorkerConfig::new(
1425                Arc::new(worker_plugin_host.clone()),
1426                core.clone(),
1427                Arc::clone(store_factory),
1428                process_registry,
1429            )
1430            .with_session_policy(policy.clone())
1431            .with_trigger_store(trigger_store.cloned().unwrap_or_else(|| {
1432                Arc::new(lash_core::InMemoryTriggerStore::with_clock(Arc::clone(
1433                    &core.clock,
1434                )))
1435            }))
1436            .with_residency(residency)
1437            .with_turn_phase_probe_slot(phase_probe_slot),
1438        );
1439        Ok(ProcessWorkDriverSetup::LazyDefault { config })
1440    }
1441
1442    #[allow(clippy::too_many_arguments)]
1443    fn resolve_queued_work_driver(
1444        queued_work_source: &QueuedWorkSource,
1445        env: RuntimeEnvironment,
1446        policy: SessionPolicy,
1447        protocol_factory: Option<Arc<dyn PluginFactory>>,
1448        plugin_factories: Arc<Vec<Arc<dyn PluginFactory>>>,
1449        store_factory: Option<&Arc<dyn SessionStoreFactory>>,
1450        live_replay_store: Arc<dyn LiveReplayStore>,
1451        runtime_host_installer: Option<RuntimeHostInstaller>,
1452    ) -> QueuedWorkDriverSetup {
1453        match queued_work_source {
1454            QueuedWorkSource::None => QueuedWorkDriverSetup::None,
1455            QueuedWorkSource::External(driver) => QueuedWorkDriverSetup::External {
1456                driver: driver.clone(),
1457            },
1458            QueuedWorkSource::LazyDefault => match store_factory {
1459                Some(store_factory) => QueuedWorkDriverSetup::LazyDefault {
1460                    config: Arc::new(InlineQueuedWorkRunConfig::new(
1461                        env,
1462                        policy,
1463                        protocol_factory,
1464                        plugin_factories,
1465                        Arc::clone(store_factory),
1466                        live_replay_store,
1467                        runtime_host_installer,
1468                    )),
1469                },
1470                None => QueuedWorkDriverSetup::None,
1471            },
1472        }
1473    }
1474
1475    pub fn advanced(self) -> AdvancedLashCoreBuilder {
1476        AdvancedLashCoreBuilder { builder: self }
1477    }
1478
1479    pub fn process_registry(mut self, process_registry: Arc<dyn ProcessRegistry>) -> Self {
1480        self.process_work_source = ProcessWorkSource::Inline {
1481            registry: process_registry,
1482        };
1483        self
1484    }
1485
1486    pub fn trigger_store(mut self, store: Arc<dyn lash_core::TriggerStore>) -> Self {
1487        self.trigger_store = Some(store);
1488        self
1489    }
1490
1491    /// Configure an externally owned process work runner.
1492    ///
1493    /// Durable hosts construct a [`ProcessWorkDriver`] from the same process
1494    /// registry and wake handle used by their deployment runner, then pass it
1495    /// here. The driver registry becomes the core's process registry and no
1496    /// inline runner is spawned.
1497    pub fn process_work_driver(mut self, driver: ProcessWorkDriver) -> Self {
1498        self.process_work_source = ProcessWorkSource::External(driver);
1499        self
1500    }
1501
1502    /// Configure an externally owned queued-work driver.
1503    pub fn queued_work_driver(mut self, driver: QueuedWorkDriver) -> Self {
1504        self.queued_work_source = QueuedWorkSource::External(driver);
1505        self
1506    }
1507
1508    pub fn disable_queued_work_driver(mut self) -> Self {
1509        self.queued_work_source = QueuedWorkSource::None;
1510        self
1511    }
1512}
1513
1514pub(crate) fn build_plugin_host(
1515    protocol_factory: Option<&Arc<dyn PluginFactory>>,
1516    common_factories: &[Arc<dyn PluginFactory>],
1517    extra_factories: Vec<Arc<dyn PluginFactory>>,
1518) -> Result<PluginHost> {
1519    let mut factories = Vec::with_capacity(
1520        usize::from(protocol_factory.is_some()) + common_factories.len() + extra_factories.len(),
1521    );
1522    if let Some(protocol_factory) = protocol_factory {
1523        factories.push(Arc::clone(protocol_factory));
1524    }
1525    factories.extend(common_factories.iter().cloned());
1526    factories.extend(extra_factories);
1527    Ok(PluginHost::new(factories))
1528}
1529
1530impl PromptLayerSink for LashCoreBuilder {
1531    fn prompt_layer_mut(&mut self) -> &mut PromptLayer {
1532        self.prompt.get_or_insert_with(PromptLayer::new)
1533    }
1534}
1535
1536pub struct AdvancedLashCoreBuilder {
1537    builder: LashCoreBuilder,
1538}
1539
1540impl AdvancedLashCoreBuilder {
1541    pub fn runtime_host_config(mut self, core: lash_core::RuntimeHostConfig) -> Self {
1542        self.builder.runtime_host_config = Some(core);
1543        self
1544    }
1545
1546    pub fn plugin_host(mut self, plugin_host: PluginHost) -> Self {
1547        self.builder.plugin_host = Some(plugin_host);
1548        self
1549    }
1550
1551    pub fn build(self) -> Result<LashCore> {
1552        self.builder.build()
1553    }
1554}