Skip to main content

lash/
core.rs

1use crate::support::*;
2use lash_core::runtime::{
3    ProcessCommand, ProcessEffectOutcome, RuntimeEffectCommand, RuntimeEffectEnvelope,
4    RuntimeEffectKind, RuntimeEffectLocalExecutor, RuntimeEffectOutcome, RuntimeInvocation,
5    RuntimeScope,
6};
7
8type RuntimeHostInstaller =
9    Arc<dyn Fn(RuntimeHostConfig, &PluginHost) -> Result<RuntimeHostConfig> + Send + Sync>;
10
11#[derive(Clone)]
12pub struct LashCore {
13    pub(crate) env: RuntimeEnvironment,
14    pub(crate) policy: SessionPolicy,
15    pub(crate) protocol_factory: Option<Arc<dyn PluginFactory>>,
16    pub(crate) store_factory: Option<Arc<dyn SessionStoreFactory>>,
17    pub(crate) plugin_factories: Arc<Vec<Arc<dyn PluginFactory>>>,
18    pub(crate) provider: Option<ProviderHandle>,
19    pub(crate) live_replay_store: Arc<dyn LiveReplayStore>,
20    pub(crate) runtime_host_installer: Option<RuntimeHostInstaller>,
21    /// Shared resolution of host-owned work drivers. Shared across `LashCore`
22    /// clones so inline process and queued drivers are constructed at most once.
23    pub(crate) work_driver: Arc<InlineWorkDriverSlot>,
24}
25
26/// How a [`LashCore`] resolves its process work driver, decided at `build()`
27/// and shared across clones.
28pub(crate) enum ProcessWorkDriverSetup {
29    /// No process registry is wired; there is nothing to run.
30    None,
31    /// Lazily construct the default inline process driver on first
32    /// `session().open()`. A store factory is required to build the config (the
33    /// worker rebuilds a session runtime per process); a registry with no store
34    /// factory is rejected at build with
35    /// [`EmbedError::ProcessRegistryRequiresStoreFactory`].
36    LazyDefault {
37        config: Box<DurableProcessWorkerConfig>,
38    },
39    /// The host wired an external driver.
40    External { driver: ProcessWorkDriver },
41}
42
43#[derive(Clone, Default)]
44pub(crate) enum ProcessWorkSource {
45    #[default]
46    None,
47    Inline {
48        registry: Arc<dyn ProcessRegistry>,
49    },
50    External(ProcessWorkDriver),
51}
52
53impl ProcessWorkSource {
54    fn process_registry(&self) -> Option<Arc<dyn ProcessRegistry>> {
55        match self {
56            Self::None => None,
57            Self::Inline { registry } => Some(Arc::clone(registry)),
58            Self::External(driver) => Some(driver.process_registry()),
59        }
60    }
61
62    #[cfg(feature = "rlm")]
63    fn has_registry(&self) -> bool {
64        !matches!(self, Self::None)
65    }
66}
67
68#[derive(Clone)]
69pub(crate) enum QueuedWorkSource {
70    None,
71    LazyDefault,
72    External(QueuedWorkDriver),
73}
74
75impl Default for QueuedWorkSource {
76    fn default() -> Self {
77        Self::LazyDefault
78    }
79}
80
81pub(crate) enum QueuedWorkDriverSetup {
82    None,
83    LazyDefault {
84        config: Arc<InlineQueuedWorkRunConfig>,
85    },
86    External {
87        driver: QueuedWorkDriver,
88    },
89}
90
91pub(crate) struct InlineWorkDriverSetup {
92    process: ProcessWorkDriverSetup,
93    queued: QueuedWorkDriverSetup,
94}
95
96#[derive(Clone, Default)]
97pub(crate) struct ResolvedWorkDrivers {
98    pub(crate) process: Option<ProcessWorkDriver>,
99    pub(crate) queued: Option<QueuedWorkDriver>,
100    pub(crate) drive_process_on_open: bool,
101}
102
103/// Shared, lazily-initialized host-work state for a [`LashCore`].
104///
105/// The once-guard ([`tokio::sync::OnceCell`]) constructs inline drivers exactly
106/// once across `LashCore` clones, on the first `session().open()` or admin path
107/// that needs them.
108pub(crate) struct InlineWorkDriverSlot {
109    setup: InlineWorkDriverSetup,
110    drivers: tokio::sync::OnceCell<ResolvedWorkDrivers>,
111    phase_probe_slot: Option<lash_core::runtime::RuntimeTurnPhaseProbeSlot>,
112}
113
114impl InlineWorkDriverSlot {
115    fn new(setup: InlineWorkDriverSetup) -> Self {
116        let phase_probe_slot = match &setup.process {
117            ProcessWorkDriverSetup::LazyDefault { config } => {
118                Some(config.turn_phase_probe_slot.clone())
119            }
120            ProcessWorkDriverSetup::None | ProcessWorkDriverSetup::External { .. } => None,
121        };
122        Self {
123            setup,
124            drivers: tokio::sync::OnceCell::new(),
125            phase_probe_slot,
126        }
127    }
128
129    /// Resolve host work drivers for a session host. Idempotent: the once-guard
130    /// ensures inline drivers are constructed once.
131    pub(crate) async fn drivers(&self) -> ResolvedWorkDrivers {
132        self.drivers
133            .get_or_init(|| async {
134                let queued = match &self.setup.queued {
135                    QueuedWorkDriverSetup::None => None,
136                    QueuedWorkDriverSetup::External { driver } => Some(driver.clone()),
137                    QueuedWorkDriverSetup::LazyDefault { config } => Some(QueuedWorkDriver::new(
138                        Arc::new(InlineQueuedWorkRunHandle::new(Arc::clone(config))),
139                    )),
140                };
141                let (process, drive_process_on_open) = match &self.setup.process {
142                    ProcessWorkDriverSetup::None => (None, false),
143                    ProcessWorkDriverSetup::External { driver } => (Some(driver.clone()), false),
144                    ProcessWorkDriverSetup::LazyDefault { config } => {
145                        let mut config = (**config).clone();
146                        if let Some(driver) = queued.clone() {
147                            config = config.with_queued_work_driver(driver);
148                        }
149                        let registry = Arc::clone(&config.process_registry);
150                        let worker = DurableProcessWorker::new(config);
151                        (Some(ProcessWorkDriver::inline(registry, worker)), true)
152                    }
153                };
154                ResolvedWorkDrivers {
155                    process,
156                    queued,
157                    drive_process_on_open,
158                }
159            })
160            .await
161            .clone()
162    }
163
164    pub(crate) fn phase_probe_slot(&self) -> Option<lash_core::runtime::RuntimeTurnPhaseProbeSlot> {
165        self.phase_probe_slot.clone()
166    }
167
168    fn configured_process_work_driver(&self) -> Option<ProcessWorkDriver> {
169        match &self.setup.process {
170            ProcessWorkDriverSetup::External { driver } => Some(driver.clone()),
171            ProcessWorkDriverSetup::None | ProcessWorkDriverSetup::LazyDefault { .. } => None,
172        }
173    }
174
175    fn configured_queued_work_driver(&self) -> Option<QueuedWorkDriver> {
176        match &self.setup.queued {
177            QueuedWorkDriverSetup::External { driver } => Some(driver.clone()),
178            QueuedWorkDriverSetup::None | QueuedWorkDriverSetup::LazyDefault { .. } => None,
179        }
180    }
181}
182
183pub(crate) struct InlineQueuedWorkRunConfig {
184    env: RuntimeEnvironment,
185    policy: SessionPolicy,
186    protocol_factory: Option<Arc<dyn PluginFactory>>,
187    plugin_factories: Arc<Vec<Arc<dyn PluginFactory>>>,
188    store_factory: Arc<dyn SessionStoreFactory>,
189    live_replay_store: Arc<dyn LiveReplayStore>,
190    runtime_host_installer: Option<RuntimeHostInstaller>,
191}
192
193impl InlineQueuedWorkRunConfig {
194    fn new(
195        env: RuntimeEnvironment,
196        policy: SessionPolicy,
197        protocol_factory: Option<Arc<dyn PluginFactory>>,
198        plugin_factories: Arc<Vec<Arc<dyn PluginFactory>>>,
199        store_factory: Arc<dyn SessionStoreFactory>,
200        live_replay_store: Arc<dyn LiveReplayStore>,
201        runtime_host_installer: Option<RuntimeHostInstaller>,
202    ) -> Self {
203        Self {
204            env,
205            policy,
206            protocol_factory,
207            plugin_factories,
208            store_factory,
209            live_replay_store,
210            runtime_host_installer,
211        }
212    }
213}
214
215struct InlineQueuedWorkRunHandle {
216    config: Arc<InlineQueuedWorkRunConfig>,
217}
218
219impl InlineQueuedWorkRunHandle {
220    fn new(config: Arc<InlineQueuedWorkRunConfig>) -> Self {
221        Self { config }
222    }
223}
224
225#[async_trait]
226impl QueuedWorkRunHandle for InlineQueuedWorkRunHandle {
227    async fn run_queued_work(
228        &self,
229        request: QueuedWorkRunRequest,
230    ) -> std::result::Result<(), lash_core::PluginError> {
231        let Some(session_id) = request.session_id else {
232            return Ok(());
233        };
234        let reason = request.reason;
235        let mut policy = self.config.policy.clone();
236        policy.session_id = Some(session_id.clone());
237        let store = self
238            .config
239            .store_factory
240            .create_store(&SessionStoreCreateRequest {
241                session_id: session_id.clone(),
242                relation: SessionRelation::default(),
243                policy: policy.clone(),
244            })
245            .await
246            .map_err(lash_core::PluginError::Session)?;
247        let state = crate::session::load_state_for_residency(
248            self.config.env.residency,
249            &session_id,
250            &policy,
251            store.as_ref(),
252        )
253        .await
254        .map_err(|err| lash_core::PluginError::Session(err.to_string()))?;
255        let plugin_host = build_plugin_host(
256            self.config.protocol_factory.as_ref(),
257            self.config.plugin_factories.as_ref(),
258            Vec::new(),
259        )
260        .map_err(|err| lash_core::PluginError::Session(err.to_string()))?;
261        let mut env = self.config.env.clone();
262        env.core = match &self.config.runtime_host_installer {
263            Some(install) => install(env.core.clone(), &plugin_host)
264                .map_err(|err| lash_core::PluginError::Session(err.to_string()))?,
265            None => env.core.clone(),
266        };
267        env.plugin_host = Some(Arc::new(plugin_host));
268        let effect_host = Arc::clone(&env.core.control.effect_host);
269        let runtime = LashRuntime::from_environment(&env, policy, state, Some(store))
270            .await
271            .map_err(|err| lash_core::PluginError::Session(err.to_string()))?;
272        let handle = RuntimeHandle::with_live_replay_store(
273            runtime,
274            Arc::clone(&self.config.live_replay_store),
275        );
276        let scope = lash_core::ExecutionScope::queue_drain(session_id, reason);
277        let scoped = effect_host
278            .scoped(scope)
279            .map_err(|err| lash_core::PluginError::Session(err.to_string()))?;
280        crate::turn::stream_next_queued_prepared_turn(
281            &handle,
282            crate::turn::TurnSinks::default(),
283            scoped,
284            CancellationToken::new(),
285            &[],
286        )
287        .await
288        .map_err(|err| lash_core::PluginError::Session(err.to_string()))?;
289        Ok(())
290    }
291}
292
293#[derive(Clone, Debug, Default, serde::Serialize, serde::Deserialize)]
294pub struct SessionDeleteReport {
295    pub session_id: String,
296    pub process: Option<lash_core::ProcessSessionDeleteReport>,
297}
298
299impl LashCore {
300    pub fn builder() -> LashCoreBuilder {
301        LashCoreBuilder::default()
302    }
303
304    pub fn session(&self, session_id: impl Into<String>) -> SessionBuilder {
305        SessionBuilder {
306            core: self.clone(),
307            session_id: session_id.into(),
308            spec: SessionSpec::inherit(),
309            parent_session_id: None,
310            store: None,
311            provider: None,
312            active_plugins: Vec::new(),
313            plugin_factories: Vec::new(),
314        }
315    }
316
317    pub fn triggers(&self) -> crate::admin::CoreTriggerAdmin {
318        crate::admin::CoreTriggerAdmin { core: self.clone() }
319    }
320
321    pub fn processes(&self) -> crate::admin::Processes {
322        crate::admin::Processes { core: self.clone() }
323    }
324
325    pub fn completions(&self) -> crate::admin::Completions {
326        crate::admin::Completions { core: self.clone() }
327    }
328
329    pub fn effect_host(&self) -> Arc<dyn EffectHost> {
330        Arc::clone(&self.env.core.control.effect_host)
331    }
332
333    pub async fn delete_session(
334        &self,
335        session_id: impl AsRef<str>,
336        scoped_effect_controller: ScopedEffectController<'_>,
337    ) -> Result<SessionDeleteReport> {
338        let session_id = session_id.as_ref().to_string();
339        let Some(store_factory) = self.store_factory.as_ref() else {
340            return Err(EmbedError::MissingSessionStoreFactory);
341        };
342        let process = if let Some(process_registry) = self.env.process_registry.as_ref() {
343            let invocation = RuntimeInvocation::effect(
344                RuntimeScope::new(session_id.clone()),
345                format!("process:delete-session:{session_id}"),
346                RuntimeEffectKind::Process,
347                format!("{session_id}:delete-session"),
348            );
349            let outcome = scoped_effect_controller
350                .controller()
351                .execute_effect(
352                    RuntimeEffectEnvelope::new(
353                        invocation,
354                        RuntimeEffectCommand::process(ProcessCommand::DeleteSession {
355                            session_id: session_id.clone(),
356                        }),
357                    ),
358                    RuntimeEffectLocalExecutor::processes(Arc::clone(process_registry)),
359                )
360                .await
361                .map_err(|err| EmbedError::SessionDeleteProcess {
362                    session_id: session_id.clone(),
363                    message: err.to_string(),
364                })?;
365            match outcome {
366                RuntimeEffectOutcome::Process {
367                    result: ProcessEffectOutcome::DeleteSession { report },
368                } => Some(report),
369                other => {
370                    return Err(EmbedError::SessionDeleteProcess {
371                        session_id,
372                        message: format!(
373                            "process delete returned the wrong outcome: {}",
374                            other.kind().as_str()
375                        ),
376                    });
377                }
378            }
379        } else {
380            None
381        };
382        if let Some(trigger_store) = self.env.trigger_store.as_ref() {
383            trigger_store
384                .delete_session_subscriptions(&session_id)
385                .await
386                .map_err(|err| EmbedError::SessionDeleteProcess {
387                    session_id: session_id.clone(),
388                    message: err.to_string(),
389                })?;
390        }
391        self.env
392            .core
393            .control
394            .effect_host
395            .revoke_await_events_for_session(&session_id)
396            .await
397            .map_err(|err| EmbedError::SessionDeleteProcess {
398                session_id: session_id.clone(),
399                message: err.to_string(),
400            })?;
401        store_factory
402            .delete_session(&session_id)
403            .await
404            .map_err(|message| EmbedError::StoreFactory {
405                session_id: session_id.clone(),
406                message,
407            })?;
408        Ok(SessionDeleteReport {
409            session_id,
410            process,
411        })
412    }
413
414    pub fn process_registry(&self) -> Option<Arc<dyn ProcessRegistry>> {
415        self.env.process_registry.as_ref().cloned()
416    }
417
418    pub fn durable_process_worker_config(&self) -> Result<DurableProcessWorkerConfig> {
419        self.durable_process_worker_config_with_plugins(std::iter::empty::<Arc<dyn PluginFactory>>())
420    }
421
422    pub fn durable_process_worker_config_with_plugins(
423        &self,
424        extra_plugin_factories: impl IntoIterator<Item = Arc<dyn PluginFactory>>,
425    ) -> Result<DurableProcessWorkerConfig> {
426        let Some(process_registry) = self.process_registry() else {
427            return Err(EmbedError::MissingProcessRegistry);
428        };
429        let Some(store_factory) = self.store_factory.as_ref() else {
430            return Err(EmbedError::MissingProcessWorkerStoreFactory);
431        };
432        let plugin_host = build_plugin_host(
433            self.protocol_factory.as_ref(),
434            self.plugin_factories.as_ref(),
435            extra_plugin_factories.into_iter().collect(),
436        )?;
437        let runtime_host =
438            self.runtime_host_for_plugin_host(self.env.core.clone(), &plugin_host)?;
439        let mut config = DurableProcessWorkerConfig::new(
440            Arc::new(plugin_host),
441            runtime_host,
442            Arc::clone(store_factory),
443            process_registry,
444        )
445        .with_session_policy(self.policy.clone())
446        .with_residency(self.env.residency);
447        if let Some(trigger_store) = self.env.trigger_store.as_ref() {
448            config = config.with_trigger_store(Arc::clone(trigger_store));
449        }
450        if let Some(driver) = self.work_driver.configured_process_work_driver() {
451            config = config.with_process_work_driver(driver);
452        }
453        if let Some(driver) = self.work_driver.configured_queued_work_driver() {
454            config = config.with_queued_work_driver(driver);
455        }
456        Ok(config)
457    }
458
459    pub(crate) fn runtime_host_for_plugin_host(
460        &self,
461        runtime_host: RuntimeHostConfig,
462        plugin_host: &PluginHost,
463    ) -> Result<RuntimeHostConfig> {
464        match &self.runtime_host_installer {
465            Some(install) => install(runtime_host, plugin_host),
466            None => Ok(runtime_host),
467        }
468    }
469}
470
471fn default_runtime_stack() -> PluginStack {
472    lash_plugin_tool_output_budget::tool_output_budget_stack()
473}
474
475#[derive(Clone)]
476pub struct StandardCore {
477    core: LashCore,
478}
479
480impl StandardCore {
481    pub fn builder() -> StandardCoreBuilder {
482        StandardCoreBuilder {
483            inner: LashCore::builder()
484                .protocol_plugin(Arc::new(
485                    lash_protocol_standard::StandardProtocolPluginFactory::new(),
486                ))
487                .plugins(default_runtime_stack()),
488        }
489    }
490
491    pub fn session(&self, session_id: impl Into<String>) -> SessionBuilder {
492        self.core.session(session_id)
493    }
494
495    pub fn into_inner(self) -> LashCore {
496        self.core
497    }
498}
499
500impl std::ops::Deref for StandardCore {
501    type Target = LashCore;
502
503    fn deref(&self) -> &Self::Target {
504        &self.core
505    }
506}
507
508pub struct StandardCoreBuilder {
509    inner: LashCoreBuilder,
510}
511
512impl StandardCoreBuilder {
513    pub fn build(self) -> Result<StandardCore> {
514        self.inner.build().map(|core| StandardCore { core })
515    }
516}
517
518impl PromptLayerSink for StandardCoreBuilder {
519    fn prompt_layer_mut(&mut self) -> &mut PromptLayer {
520        self.inner.prompt_layer_mut()
521    }
522}
523
524#[cfg(feature = "rlm")]
525#[derive(Clone)]
526pub struct RlmCore {
527    core: LashCore,
528    surface_config: lash_protocol_rlm::RlmProtocolPluginConfig,
529    process_lifecycle_available: bool,
530    lashlang_artifact_store: Arc<dyn lash_lashlang_runtime::LashlangArtifactStore>,
531}
532
533#[cfg(feature = "rlm")]
534impl RlmCore {
535    pub fn builder() -> RlmCoreBuilder {
536        RlmCoreBuilder {
537            inner: LashCore::builder().plugins(default_runtime_stack()),
538            config: lash_protocol_rlm::RlmProtocolPluginConfig::default(),
539            projection_resolver: Arc::new(lash_protocol_rlm::ProjectionRegistry::default()),
540            lashlang_artifact_store: None,
541            lashlang_execution_sink: None,
542        }
543    }
544
545    pub fn session(&self, session_id: impl Into<String>) -> RlmSessionBuilder {
546        RlmSessionBuilder {
547            builder: self.core.session(session_id),
548            rlm_final_answer_format: None,
549        }
550    }
551
552    pub fn into_inner(self) -> LashCore {
553        self.core
554    }
555
556    pub fn lashlang_compile_surface(
557        &self,
558        request: crate::rlm::LashlangCompileSurfaceRequest,
559    ) -> Result<crate::rlm::LashlangCompileSurface> {
560        let plugin_host = build_plugin_host(
561            self.core.protocol_factory.as_ref(),
562            self.core.plugin_factories.as_ref(),
563            request.extra_plugin_factories,
564        )?;
565        let plugins = plugin_host
566            .build_session_with_parent(
567                &request.session_id,
568                None,
569                None,
570                lash_core::plugin::SessionAuthorityContext {
571                    plugin_options: request.execution_env_spec.plugin_options,
572                    ..Default::default()
573                },
574            )
575            .map_err(lash_core::PluginError::from)?;
576        let tool_catalog = plugins.resolved_tool_catalog(&request.session_id)?;
577        let surface = crate::rlm::rlm_lashlang_surface(
578            &self.surface_config,
579            self.process_lifecycle_available,
580        )
581        .with_plugin_extensions(plugin_host.extensions())
582        .map_err(lash_core::PluginError::Registration)?;
583        let host_environment = surface
584            .host_environment(&tool_catalog)
585            .map_err(lash_core::PluginError::Registration)?;
586        Ok(crate::rlm::LashlangCompileSurface {
587            host_environment,
588            tool_catalog,
589            surface,
590        })
591    }
592
593    pub async fn compile_lashlang_module(
594        &self,
595        request: crate::rlm::LashlangModuleCompileRequest,
596    ) -> std::result::Result<crate::rlm::ModuleCompileOutput, crate::rlm::LashlangModuleCompileError>
597    {
598        let surface = self
599            .lashlang_compile_surface(crate::rlm::LashlangCompileSurfaceRequest {
600                session_id: request.session_id,
601                execution_env_spec: request.execution_env_spec,
602                extra_plugin_factories: request.extra_plugin_factories,
603            })
604            .map_err(|err| {
605                lashlang::ModuleCompileError::Link(lashlang::ModuleCompileDiagnostic {
606                    stage: lashlang::ModuleCompileStage::Link,
607                    message: err.to_string(),
608                    offset: None,
609                    span: None,
610                    line: None,
611                    column: None,
612                    diagnostic: Some(err.to_string()),
613                })
614            })?;
615        lashlang::compile_module(lashlang::ModuleCompileRequest {
616            source: &request.source,
617            environment: &surface.host_environment,
618            artifact_store: Some(self.lashlang_artifact_store.as_ref()),
619        })
620        .await
621    }
622}
623
624#[cfg(feature = "rlm")]
625impl std::ops::Deref for RlmCore {
626    type Target = LashCore;
627
628    fn deref(&self) -> &Self::Target {
629        &self.core
630    }
631}
632
633#[cfg(feature = "rlm")]
634pub struct RlmCoreBuilder {
635    inner: LashCoreBuilder,
636    config: lash_protocol_rlm::RlmProtocolPluginConfig,
637    projection_resolver: Arc<dyn lash_protocol_rlm::ProjectionResolver>,
638    lashlang_artifact_store: Option<Arc<dyn lash_lashlang_runtime::LashlangArtifactStore>>,
639    lashlang_execution_sink: Option<Arc<dyn lash_trace::TraceSink>>,
640}
641
642#[cfg(feature = "rlm")]
643impl RlmCoreBuilder {
644    pub fn rlm_protocol_config(
645        mut self,
646        config: lash_protocol_rlm::RlmProtocolPluginConfig,
647    ) -> Self {
648        self.config = config;
649        self
650    }
651
652    pub fn projection_resolver(
653        mut self,
654        projection_resolver: Arc<dyn lash_protocol_rlm::ProjectionResolver>,
655    ) -> Self {
656        self.projection_resolver = projection_resolver;
657        self
658    }
659
660    pub fn lashlang_artifact_store(
661        mut self,
662        artifact_store: Arc<dyn lash_lashlang_runtime::LashlangArtifactStore>,
663    ) -> Self {
664        self.lashlang_artifact_store = Some(artifact_store);
665        self
666    }
667
668    pub fn lashlang_execution_sink(
669        mut self,
670        lashlang_execution_sink: Arc<dyn lash_trace::TraceSink>,
671    ) -> Self {
672        self.lashlang_execution_sink = Some(lashlang_execution_sink);
673        self
674    }
675
676    pub fn lashlang_execution_jsonl_path(mut self, path: impl Into<std::path::PathBuf>) -> Self {
677        self.lashlang_execution_sink = Some(Arc::new(lash_trace::JsonlTraceSink::new(path.into())));
678        self
679    }
680
681    pub fn build(mut self) -> Result<RlmCore> {
682        let artifact_store = self
683            .lashlang_artifact_store
684            .clone()
685            .ok_or(EmbedError::MissingLashlangArtifactStore)?;
686        if self.inner.effective_session_store_tier() == Some(DurabilityTier::Durable)
687            && artifact_store.durability_tier()
688                == lash_lashlang_runtime::LashlangDurabilityTier::Inline
689        {
690            return Err(EmbedError::DurableStorePeerRequired {
691                facet: "artifact store",
692            });
693        }
694        let process_lifecycle_available = self.inner.process_work_source.has_registry();
695        let config = crate::rlm::rlm_protocol_config(self.config, process_lifecycle_available);
696        let trace_context = self.inner.resolved_trace_context();
697        let protocol_factory = Arc::new(
698            lash_protocol_rlm::RlmProtocolPluginFactory::new(config.clone())
699                .with_projection_resolver(Arc::clone(&self.projection_resolver))
700                .with_lashlang_artifact_store(Arc::clone(&artifact_store))
701                .with_lashlang_execution_trace(
702                    self.lashlang_execution_sink.clone(),
703                    trace_context.clone(),
704                ),
705        );
706        let engine_artifact_store = Arc::clone(&artifact_store);
707        let engine_config = config.clone();
708        let engine_sink = self.lashlang_execution_sink.clone();
709        self.inner.protocol_factory = Some(protocol_factory);
710        self.inner.runtime_host_installer = Some(Arc::new(move |runtime_host, plugin_host| {
711            let surface =
712                crate::rlm::rlm_lashlang_surface(&engine_config, process_lifecycle_available)
713                    .with_plugin_extensions(plugin_host.extensions())
714                    .map_err(lash_core::PluginError::Registration)?;
715            let engine = lash_lashlang_runtime::LashlangProcessEngine::new(
716                Arc::clone(&engine_artifact_store),
717                surface,
718            )
719            .with_execution_trace(
720                engine_sink.clone(),
721                runtime_host.tracing.trace_context.clone(),
722            );
723            Ok(runtime_host.with_process_engine(Arc::new(engine)))
724        }));
725        self.inner.build().map(|core| RlmCore {
726            core,
727            surface_config: config,
728            process_lifecycle_available,
729            lashlang_artifact_store: artifact_store,
730        })
731    }
732}
733
734#[cfg(feature = "rlm")]
735impl PromptLayerSink for RlmCoreBuilder {
736    fn prompt_layer_mut(&mut self) -> &mut PromptLayer {
737        self.inner.prompt_layer_mut()
738    }
739}
740
741macro_rules! forward_core_builder_methods {
742    ($builder:ident) => {
743        impl $builder {
744            pub fn provider(mut self, provider: ProviderHandle) -> Self {
745                self.inner = self.inner.provider(provider);
746                self
747            }
748
749            pub fn model(mut self, model: lash_core::ModelSpec) -> Self {
750                self.inner = self.inner.model(model);
751                self
752            }
753
754            pub fn max_turns(mut self, max_turns: usize) -> Self {
755                self.inner = self.inner.max_turns(max_turns);
756                self
757            }
758
759            pub fn session_spec(mut self, spec: SessionSpec) -> Self {
760                self.inner = self.inner.session_spec(spec);
761                self
762            }
763
764            pub fn store_factory(mut self, store_factory: Arc<dyn SessionStoreFactory>) -> Self {
765                self.inner = self.inner.store_factory(store_factory);
766                self
767            }
768
769            pub fn child_store_factory(
770                mut self,
771                store_factory: Arc<dyn SessionStoreFactory>,
772            ) -> Self {
773                self.inner = self.inner.child_store_factory(store_factory);
774                self
775            }
776
777            pub fn attachment_store(mut self, attachment_store: Arc<dyn AttachmentStore>) -> Self {
778                self.inner = self.inner.attachment_store(attachment_store);
779                self
780            }
781
782            pub fn process_env_store(
783                mut self,
784                process_env_store: Arc<dyn ProcessExecutionEnvStore>,
785            ) -> Self {
786                self.inner = self.inner.process_env_store(process_env_store);
787                self
788            }
789
790            pub fn effect_host(mut self, effect_host: Arc<dyn EffectHost>) -> Self {
791                self.inner = self.inner.effect_host(effect_host);
792                self
793            }
794
795            pub fn tools(mut self, tools: Arc<dyn ToolProvider>) -> Self {
796                self.inner = self.inner.tools(tools);
797                self
798            }
799
800            pub fn plugin(mut self, plugin: Arc<dyn PluginFactory>) -> Self {
801                self.inner = self.inner.plugin(plugin);
802                self
803            }
804
805            pub fn plugins(mut self, stack: PluginStack) -> Self {
806                self.inner = self.inner.plugins(stack);
807                self
808            }
809
810            pub fn configure_plugins(mut self, configure: impl FnOnce(&mut PluginStack)) -> Self {
811                self.inner = self.inner.configure_plugins(configure);
812                self
813            }
814
815            pub fn trace_sink(mut self, trace_sink: Arc<dyn lash_trace::TraceSink>) -> Self {
816                self.inner = self.inner.trace_sink(trace_sink);
817                self
818            }
819
820            pub fn trace_jsonl_path(mut self, path: impl Into<std::path::PathBuf>) -> Self {
821                self.inner = self.inner.trace_jsonl_path(path);
822                self
823            }
824
825            pub fn trace_level(mut self, trace_level: lash_trace::TraceLevel) -> Self {
826                self.inner = self.inner.trace_level(trace_level);
827                self
828            }
829
830            pub fn trace_context(mut self, trace_context: lash_trace::TraceContext) -> Self {
831                self.inner = self.inner.trace_context(trace_context);
832                self
833            }
834
835            pub fn termination(mut self, termination: TerminationPolicy) -> Self {
836                self.inner = self.inner.termination(termination);
837                self
838            }
839
840            pub fn residency(mut self, residency: Residency) -> Self {
841                self.inner = self.inner.residency(residency);
842                self
843            }
844
845            pub fn live_replay_store(
846                mut self,
847                live_replay_store: Arc<dyn LiveReplayStore>,
848            ) -> Self {
849                self.inner = self.inner.live_replay_store(live_replay_store);
850                self
851            }
852
853            pub fn process_registry(mut self, process_registry: Arc<dyn ProcessRegistry>) -> Self {
854                self.inner = self.inner.process_registry(process_registry);
855                self
856            }
857
858            pub fn trigger_store(mut self, store: Arc<dyn lash_core::TriggerStore>) -> Self {
859                self.inner = self.inner.trigger_store(store);
860                self
861            }
862
863            pub fn process_work_driver(mut self, driver: ProcessWorkDriver) -> Self {
864                self.inner = self.inner.process_work_driver(driver);
865                self
866            }
867
868            pub fn queued_work_driver(mut self, driver: QueuedWorkDriver) -> Self {
869                self.inner = self.inner.queued_work_driver(driver);
870                self
871            }
872
873            pub fn disable_queued_work_driver(mut self) -> Self {
874                self.inner = self.inner.disable_queued_work_driver();
875                self
876            }
877
878            pub fn runtime_host_config(mut self, core: RuntimeHostConfig) -> Self {
879                self.inner.runtime_host_config = Some(core);
880                self
881            }
882        }
883    };
884}
885
886forward_core_builder_methods!(StandardCoreBuilder);
887#[cfg(feature = "rlm")]
888forward_core_builder_methods!(RlmCoreBuilder);
889
890#[derive(Default)]
891pub struct LashCoreBuilder {
892    pub(crate) protocol_factory: Option<Arc<dyn PluginFactory>>,
893    session_spec: SessionSpec,
894    provider: Option<ProviderHandle>,
895    pub(crate) store_factory: Option<Arc<dyn SessionStoreFactory>>,
896    child_store_factory: Option<Arc<dyn SessionStoreFactory>>,
897    // `RuntimeHostConfig` has no `Default`: the generic host-owned durability
898    // dependencies must be named. They are collected here and resolved in
899    // `build()`, which errors if any is unset.
900    effect_host: Option<Arc<dyn EffectHost>>,
901    attachment_store: Option<Arc<dyn AttachmentStore>>,
902    process_env_store: Option<Arc<dyn ProcessExecutionEnvStore>>,
903    trigger_store: Option<Arc<dyn lash_core::TriggerStore>>,
904    // Benign core overrides applied on top of the resolved core.
905    prompt: Option<PromptLayer>,
906    trace_sink: Option<Arc<dyn lash_trace::TraceSink>>,
907    trace_level: Option<lash_trace::TraceLevel>,
908    trace_context: Option<lash_trace::TraceContext>,
909    termination: Option<TerminationPolicy>,
910    // Advanced full-config override; used as the base core when present.
911    runtime_host_config: Option<RuntimeHostConfig>,
912    tool_providers: Vec<Arc<dyn ToolProvider>>,
913    plugin_stack: PluginStack,
914    plugin_host: Option<PluginHost>,
915    residency: Option<Residency>,
916    // Single source of truth for process lifecycle support and process-work
917    // consumption.
918    process_work_source: ProcessWorkSource,
919    queued_work_source: QueuedWorkSource,
920    live_replay_store: Option<Arc<dyn LiveReplayStore>>,
921    runtime_host_installer: Option<RuntimeHostInstaller>,
922}
923
924impl LashCoreBuilder {
925    pub fn protocol_plugin(mut self, plugin: Arc<dyn PluginFactory>) -> Self {
926        self.protocol_factory = Some(plugin);
927        self
928    }
929
930    pub fn provider(mut self, provider: ProviderHandle) -> Self {
931        self.session_spec = self.session_spec.provider_id(provider.kind());
932        self.provider = Some(provider);
933        self
934    }
935
936    pub fn model(mut self, model: lash_core::ModelSpec) -> Self {
937        self.session_spec = self.session_spec.model(model);
938        self
939    }
940
941    pub fn max_turns(mut self, max_turns: usize) -> Self {
942        self.session_spec = self.session_spec.max_turns(max_turns);
943        self
944    }
945
946    pub fn session_spec(mut self, spec: SessionSpec) -> Self {
947        self.session_spec = spec;
948        self
949    }
950
951    /// Configure a factory that can create a persistence store for any root
952    /// session opened from this core.
953    ///
954    /// The factory must honor `SessionStoreCreateRequest::session_id` and
955    /// return a store for that specific session. Do not use this to wrap one
956    /// pre-opened root store; pass root-only stores with
957    /// `LashCore::session(...).store(store)` instead.
958    pub fn store_factory(mut self, store_factory: Arc<dyn SessionStoreFactory>) -> Self {
959        self.store_factory = Some(store_factory);
960        self
961    }
962
963    /// Configure the persistence factory used by managed child sessions, such
964    /// as local subagents.
965    ///
966    /// Child factories must return a distinct store bound to the requested
967    /// child session id. Hosts that pass an explicit root store with
968    /// `SessionBuilder::store` should set this when child sessions need
969    /// persistence.
970    pub fn child_store_factory(mut self, store_factory: Arc<dyn SessionStoreFactory>) -> Self {
971        self.child_store_factory = Some(store_factory);
972        self
973    }
974
975    pub fn attachment_store(mut self, attachment_store: Arc<dyn AttachmentStore>) -> Self {
976        self.attachment_store = Some(attachment_store);
977        self
978    }
979
980    pub fn process_env_store(
981        mut self,
982        process_env_store: Arc<dyn ProcessExecutionEnvStore>,
983    ) -> Self {
984        self.process_env_store = Some(process_env_store);
985        self
986    }
987
988    /// Set the deployment effect host — the durability boundary every operation
989    /// crosses. Pass [`InlineEffectHost`](crate::durability::InlineEffectHost)
990    /// for in-process execution, or a workflow-backed host for durable
991    /// execution.
992    pub fn effect_host(mut self, effect_host: Arc<dyn EffectHost>) -> Self {
993        self.effect_host = Some(effect_host);
994        self
995    }
996
997    pub fn tools(mut self, tools: Arc<dyn ToolProvider>) -> Self {
998        self.tool_providers.push(tools);
999        self
1000    }
1001
1002    pub fn plugin(mut self, plugin: Arc<dyn PluginFactory>) -> Self {
1003        self.plugin_stack.push(plugin);
1004        self
1005    }
1006
1007    pub fn plugins(mut self, stack: PluginStack) -> Self {
1008        self.plugin_stack = stack;
1009        self
1010    }
1011
1012    pub fn configure_plugins(mut self, configure: impl FnOnce(&mut PluginStack)) -> Self {
1013        configure(&mut self.plugin_stack);
1014        self
1015    }
1016
1017    pub fn trace_sink(mut self, trace_sink: Arc<dyn lash_trace::TraceSink>) -> Self {
1018        self.trace_sink = Some(trace_sink);
1019        self
1020    }
1021
1022    pub fn trace_jsonl_path(mut self, path: impl Into<std::path::PathBuf>) -> Self {
1023        self.trace_sink = Some(Arc::new(lash_trace::JsonlTraceSink::new(path.into())));
1024        self
1025    }
1026
1027    pub fn trace_level(mut self, trace_level: lash_trace::TraceLevel) -> Self {
1028        self.trace_level = Some(trace_level);
1029        self
1030    }
1031
1032    pub fn trace_context(mut self, trace_context: lash_trace::TraceContext) -> Self {
1033        self.trace_context = Some(trace_context);
1034        self
1035    }
1036
1037    pub fn termination(mut self, termination: TerminationPolicy) -> Self {
1038        self.termination = Some(termination);
1039        self
1040    }
1041
1042    pub fn residency(mut self, residency: Residency) -> Self {
1043        self.residency = Some(residency);
1044        self
1045    }
1046
1047    /// Configure the bounded live replay buffer used by session observation
1048    /// cursors. This is best-effort reconnect recovery only; durable state
1049    /// still comes from the session store and [`SessionReadView`].
1050    pub fn live_replay_store(mut self, live_replay_store: Arc<dyn LiveReplayStore>) -> Self {
1051        self.live_replay_store = Some(live_replay_store);
1052        self
1053    }
1054
1055    /// Resolve the runtime host config, requiring the generic host-owned
1056    /// durability dependencies to have been named.
1057    fn resolve_runtime_host_config(&mut self) -> Result<RuntimeHostConfig> {
1058        if let Some(base) = self.runtime_host_config.take() {
1059            return Ok(self.apply_core_overrides(base));
1060        }
1061        let effect_host = self
1062            .effect_host
1063            .take()
1064            .ok_or(EmbedError::MissingEffectHost)?;
1065        let attachment_store = self
1066            .attachment_store
1067            .take()
1068            .ok_or(EmbedError::MissingAttachmentStore)?;
1069        let process_env_store = self
1070            .process_env_store
1071            .take()
1072            .ok_or(EmbedError::MissingProcessEnvStore)?;
1073        let core = RuntimeHostConfig::new(effect_host, attachment_store, process_env_store);
1074        Ok(self.apply_core_overrides(core))
1075    }
1076
1077    /// Apply benign + still-set dependency overrides on top of a base core.
1078    fn apply_core_overrides(&mut self, mut core: RuntimeHostConfig) -> RuntimeHostConfig {
1079        if let Some(effect_host) = self.effect_host.take() {
1080            core.control.effect_host = effect_host;
1081        }
1082        if let Some(attachment_store) = self.attachment_store.take() {
1083            core.durability.attachment_store = attachment_store;
1084        }
1085        if let Some(process_env_store) = self.process_env_store.take() {
1086            core.durability.process_env_store = process_env_store;
1087        }
1088        if let Some(prompt) = self.prompt.take() {
1089            core.prompt.prompt = prompt;
1090        }
1091        if let Some(trace_sink) = self.trace_sink.take() {
1092            core.tracing.trace_sink = Some(trace_sink);
1093        }
1094        if let Some(trace_level) = self.trace_level.take() {
1095            core.tracing.trace_level = trace_level;
1096        }
1097        if let Some(trace_context) = self.trace_context.take() {
1098            core.tracing.trace_context = trace_context;
1099        }
1100        if let Some(termination) = self.termination.take() {
1101            core.control.termination = termination;
1102        }
1103        core
1104    }
1105
1106    /// Validate store peer-coherence of the wired durability dependencies.
1107    ///
1108    /// Durability is established by what the host wired; the per-invocation
1109    /// durable controller is not visible here (the build-time controller is
1110    /// inline by construction), so this checks the stores against each other
1111    /// only — never the controller (see A5 in the durable-first wiring spec):
1112    ///
1113    /// - a durable session store factory requires a durable attachment and
1114    ///   artifact store (they back the same session state);
1115    /// - a durable process registry requires a session store factory that is
1116    ///   itself durable (the registry's process records are meaningless without
1117    ///   a durable session behind them).
1118    fn effective_session_store_tier(&self) -> Option<DurabilityTier> {
1119        self.child_store_factory
1120            .as_ref()
1121            .or(self.store_factory.as_ref())
1122            .map(|factory| factory.durability_tier())
1123    }
1124
1125    #[cfg(feature = "rlm")]
1126    fn resolved_trace_context(&self) -> lash_trace::TraceContext {
1127        self.trace_context
1128            .clone()
1129            .or_else(|| {
1130                self.runtime_host_config
1131                    .as_ref()
1132                    .map(|core| core.tracing.trace_context.clone())
1133            })
1134            .unwrap_or_default()
1135    }
1136
1137    fn ensure_store_peer_coherence(&self) -> Result<()> {
1138        // Match `build()`'s wiring exactly: the session store factory it installs
1139        // is `child_store_factory.or(store_factory)` (child takes precedence, root
1140        // is the fallback). The coherence check must read the tier of that same
1141        // effective factory, or a host that wires only a durable child factory
1142        // (no root) is wrongly rejected though `build()` would wire it durably.
1143        let session_store_tier = self.effective_session_store_tier();
1144        let attachment_tier = self
1145            .attachment_store
1146            .as_ref()
1147            .map(|store| store.persistence().durability_tier())
1148            .or_else(|| {
1149                self.runtime_host_config.as_ref().map(|core| {
1150                    core.durability
1151                        .attachment_store
1152                        .persistence()
1153                        .durability_tier()
1154                })
1155            });
1156        let process_env_tier = self
1157            .process_env_store
1158            .as_ref()
1159            .map(|store| store.durability_tier())
1160            .or_else(|| {
1161                self.runtime_host_config
1162                    .as_ref()
1163                    .map(|core| core.durability.process_env_store.durability_tier())
1164            });
1165        let effect_host_tier = self
1166            .effect_host
1167            .as_ref()
1168            .map(|host| host.durability_tier())
1169            .or_else(|| {
1170                self.runtime_host_config
1171                    .as_ref()
1172                    .map(|core| core.control.effect_host.durability_tier())
1173            });
1174        let trigger_store_tier = self
1175            .trigger_store
1176            .as_ref()
1177            .map(|store| store.durability_tier());
1178
1179        if session_store_tier == Some(DurabilityTier::Durable) {
1180            if attachment_tier == Some(DurabilityTier::Inline) {
1181                return Err(EmbedError::DurableStorePeerRequired {
1182                    facet: "attachment store",
1183                });
1184            }
1185            if process_env_tier == Some(DurabilityTier::Inline) {
1186                return Err(EmbedError::DurableStorePeerRequired {
1187                    facet: "process execution environment store",
1188                });
1189            }
1190        }
1191
1192        if let Some(process_registry) = self.process_work_source.process_registry().as_ref()
1193            && process_registry.durability_tier() == DurabilityTier::Durable
1194        {
1195            if session_store_tier != Some(DurabilityTier::Durable) {
1196                return Err(EmbedError::DurableProcessRegistryRequiresStoreFactory);
1197            }
1198            if trigger_store_tier != Some(DurabilityTier::Durable) {
1199                return Err(EmbedError::DurableStorePeerRequired {
1200                    facet: "trigger store",
1201                });
1202            }
1203            if process_env_tier != Some(DurabilityTier::Durable) {
1204                return Err(EmbedError::DurableStorePeerRequired {
1205                    facet: "process execution environment store",
1206                });
1207            }
1208        }
1209
1210        if trigger_store_tier == Some(DurabilityTier::Durable) {
1211            if session_store_tier != Some(DurabilityTier::Durable) {
1212                return Err(EmbedError::DurableStorePeerRequired {
1213                    facet: "session store factory",
1214                });
1215            }
1216            if process_env_tier != Some(DurabilityTier::Durable) {
1217                return Err(EmbedError::DurableStorePeerRequired {
1218                    facet: "process execution environment store",
1219                });
1220            }
1221            if let Some(process_registry) = self.process_work_source.process_registry().as_ref()
1222                && process_registry.durability_tier() == DurabilityTier::Inline
1223            {
1224                return Err(EmbedError::DurableStorePeerRequired {
1225                    facet: "process registry",
1226                });
1227            }
1228        }
1229
1230        if effect_host_tier == Some(DurabilityTier::Durable) {
1231            if attachment_tier != Some(DurabilityTier::Durable) {
1232                return Err(EmbedError::DurableStorePeerRequired {
1233                    facet: "attachment store",
1234                });
1235            }
1236            if process_env_tier != Some(DurabilityTier::Durable) {
1237                return Err(EmbedError::DurableStorePeerRequired {
1238                    facet: "process execution environment store",
1239                });
1240            }
1241        }
1242
1243        Ok(())
1244    }
1245
1246    pub fn build(mut self) -> Result<LashCore> {
1247        self.ensure_store_peer_coherence()?;
1248        let protocol_factory = self.protocol_factory.clone();
1249        if protocol_factory.is_none() && self.plugin_host.is_none() {
1250            return Err(EmbedError::MissingProtocolPlugin);
1251        }
1252        let provider_id = self
1253            .session_spec
1254            .provider_id
1255            .clone()
1256            .or_else(|| {
1257                self.provider
1258                    .as_ref()
1259                    .map(|provider| provider.kind().to_string())
1260            })
1261            .unwrap_or_default();
1262        let model = self
1263            .session_spec
1264            .model
1265            .clone()
1266            .ok_or(EmbedError::MissingModelSpec)?;
1267
1268        let base_policy = SessionPolicy {
1269            provider_id,
1270            model,
1271            max_turns: self.session_spec.max_turns.flatten(),
1272            ..SessionPolicy::default()
1273        };
1274        let policy = self.session_spec.resolve_against(&base_policy);
1275
1276        let mut core = self.resolve_runtime_host_config()?;
1277        if let Some(provider) = self.provider.clone() {
1278            core.providers.provider_resolver =
1279                Arc::new(lash_core::SingleProviderResolver::new(provider));
1280        }
1281        let plugin_factories = if let Some(plugin_host) = self.plugin_host {
1282            plugin_host.factories().to_vec()
1283        } else {
1284            let mut factories = Vec::new();
1285            if !self.tool_providers.is_empty() {
1286                let spec = self
1287                    .tool_providers
1288                    .into_iter()
1289                    .fold(PluginSpec::new(), PluginSpec::with_tool_provider);
1290                factories.push(Arc::new(StaticPluginFactory::new("embed_tools", spec))
1291                    as Arc<dyn PluginFactory>);
1292            }
1293            factories.extend(self.plugin_stack.into_factories());
1294            factories
1295        };
1296        let default_plugin_host =
1297            build_plugin_host(protocol_factory.as_ref(), &plugin_factories, Vec::new())?;
1298        if let Some(install) = &self.runtime_host_installer {
1299            core = install(core, &default_plugin_host)?;
1300        }
1301
1302        let process_registry = self.process_work_source.process_registry();
1303
1304        // Resolve process work before the process source is moved into the
1305        // environment. The default inline driver's config is built
1306        // eagerly so a missing store factory fails loudly at build, not at
1307        // first open. It is built from the same single-protocol plugin host the
1308        // live runtime uses, so the worker can rebuild a runtime for a process.
1309        let process_work_driver = Self::resolve_process_work_driver(
1310            &self.process_work_source,
1311            &default_plugin_host,
1312            &core,
1313            // The worker rebuilds sessions with the same factory `build()` wires
1314            // below: `child_store_factory.or(store_factory)`.
1315            self.child_store_factory
1316                .as_ref()
1317                .or(self.store_factory.as_ref()),
1318            &policy,
1319            self.residency.unwrap_or_default(),
1320            self.trigger_store.as_ref(),
1321        )?;
1322
1323        let live_replay_clock = Arc::clone(&core.clock);
1324        let mut env_builder = RuntimeEnvironment::builder()
1325            .with_plugin_host(Arc::new(default_plugin_host))
1326            .with_runtime_host_config(core);
1327        if let Some(process_registry) = process_registry.as_ref() {
1328            env_builder = env_builder.with_process_registry(Arc::clone(process_registry));
1329        }
1330        if let Some(residency) = self.residency {
1331            env_builder = env_builder.with_residency(residency);
1332        }
1333        if let Some(child_store_factory) = self
1334            .child_store_factory
1335            .as_ref()
1336            .or(self.store_factory.as_ref())
1337        {
1338            env_builder = env_builder.with_session_store_factory(Arc::clone(child_store_factory));
1339        }
1340        if let Some(trigger_store) = self.trigger_store.as_ref() {
1341            env_builder = env_builder.with_trigger_store(Arc::clone(trigger_store));
1342        }
1343        let live_replay_store = self.live_replay_store.take().unwrap_or_else(|| {
1344            Arc::new(InMemoryLiveReplayStore::with_clock(
1345                lash_core::InMemoryLiveReplayStoreConfig::default(),
1346                live_replay_clock,
1347            ))
1348        });
1349        let env = env_builder.build();
1350        let queued_work_driver = Self::resolve_queued_work_driver(
1351            &self.queued_work_source,
1352            env.clone(),
1353            policy.clone(),
1354            protocol_factory.clone(),
1355            Arc::new(plugin_factories.clone()),
1356            self.child_store_factory
1357                .as_ref()
1358                .or(self.store_factory.as_ref()),
1359            Arc::clone(&live_replay_store),
1360            self.runtime_host_installer.clone(),
1361        );
1362        let work_driver = InlineWorkDriverSetup {
1363            process: process_work_driver,
1364            queued: queued_work_driver,
1365        };
1366
1367        Ok(LashCore {
1368            env,
1369            policy,
1370            store_factory: self.store_factory,
1371            plugin_factories: Arc::new(plugin_factories),
1372            provider: self.provider,
1373            live_replay_store,
1374            protocol_factory,
1375            runtime_host_installer: self.runtime_host_installer,
1376            work_driver: Arc::new(InlineWorkDriverSlot::new(work_driver)),
1377        })
1378    }
1379
1380    /// Decide how a built [`LashCore`] sources its process work driver.
1381    ///
1382    /// - no registry => nothing to run ([`ProcessWorkDriverSetup::None`]);
1383    /// - external driver wired => use it ([`ProcessWorkDriverSetup::External`]);
1384    /// - inline registry wired => lazily construct the default inline driver on first open. Its
1385    ///   [`DurableProcessWorkerConfig`] is built eagerly when a store factory is
1386    ///   present; without one the inline worker cannot rebuild session runtimes.
1387    fn resolve_process_work_driver(
1388        process_work_source: &ProcessWorkSource,
1389        worker_plugin_host: &PluginHost,
1390        core: &RuntimeHostConfig,
1391        store_factory: Option<&Arc<dyn SessionStoreFactory>>,
1392        policy: &SessionPolicy,
1393        residency: lash_core::Residency,
1394        trigger_store: Option<&Arc<dyn lash_core::TriggerStore>>,
1395    ) -> Result<ProcessWorkDriverSetup> {
1396        let process_registry = match process_work_source {
1397            ProcessWorkSource::None => return Ok(ProcessWorkDriverSetup::None),
1398            ProcessWorkSource::External(driver) => {
1399                return Ok(ProcessWorkDriverSetup::External {
1400                    driver: driver.clone(),
1401                });
1402            }
1403            ProcessWorkSource::Inline { registry } => Arc::clone(registry),
1404        };
1405        // The worker rebuilds a session runtime per process, so it needs a store
1406        // factory; without one the default runner could not execute anything, so
1407        // fail loudly rather than silently leave processes unexecuted.
1408        let Some(store_factory) = store_factory else {
1409            return Err(EmbedError::ProcessRegistryRequiresStoreFactory);
1410        };
1411        // The worker rebuilds with the same plugin host the live runtime uses,
1412        // including the protocol plugin that supplies the protocol session
1413        // capability.
1414        let phase_probe_slot = lash_core::runtime::RuntimeTurnPhaseProbeSlot::default();
1415        let config = Box::new(
1416            DurableProcessWorkerConfig::new(
1417                Arc::new(worker_plugin_host.clone()),
1418                core.clone(),
1419                Arc::clone(store_factory),
1420                process_registry,
1421            )
1422            .with_session_policy(policy.clone())
1423            .with_trigger_store(trigger_store.cloned().unwrap_or_else(|| {
1424                Arc::new(lash_core::InMemoryTriggerStore::with_clock(Arc::clone(
1425                    &core.clock,
1426                )))
1427            }))
1428            .with_residency(residency)
1429            .with_turn_phase_probe_slot(phase_probe_slot),
1430        );
1431        Ok(ProcessWorkDriverSetup::LazyDefault { config })
1432    }
1433
1434    fn resolve_queued_work_driver(
1435        queued_work_source: &QueuedWorkSource,
1436        env: RuntimeEnvironment,
1437        policy: SessionPolicy,
1438        protocol_factory: Option<Arc<dyn PluginFactory>>,
1439        plugin_factories: Arc<Vec<Arc<dyn PluginFactory>>>,
1440        store_factory: Option<&Arc<dyn SessionStoreFactory>>,
1441        live_replay_store: Arc<dyn LiveReplayStore>,
1442        runtime_host_installer: Option<RuntimeHostInstaller>,
1443    ) -> QueuedWorkDriverSetup {
1444        match queued_work_source {
1445            QueuedWorkSource::None => QueuedWorkDriverSetup::None,
1446            QueuedWorkSource::External(driver) => QueuedWorkDriverSetup::External {
1447                driver: driver.clone(),
1448            },
1449            QueuedWorkSource::LazyDefault => match store_factory {
1450                Some(store_factory) => QueuedWorkDriverSetup::LazyDefault {
1451                    config: Arc::new(InlineQueuedWorkRunConfig::new(
1452                        env,
1453                        policy,
1454                        protocol_factory,
1455                        plugin_factories,
1456                        Arc::clone(store_factory),
1457                        live_replay_store,
1458                        runtime_host_installer,
1459                    )),
1460                },
1461                None => QueuedWorkDriverSetup::None,
1462            },
1463        }
1464    }
1465
1466    pub fn advanced(self) -> AdvancedLashCoreBuilder {
1467        AdvancedLashCoreBuilder { builder: self }
1468    }
1469
1470    pub fn process_registry(mut self, process_registry: Arc<dyn ProcessRegistry>) -> Self {
1471        self.process_work_source = ProcessWorkSource::Inline {
1472            registry: process_registry,
1473        };
1474        self
1475    }
1476
1477    pub fn trigger_store(mut self, store: Arc<dyn lash_core::TriggerStore>) -> Self {
1478        self.trigger_store = Some(store);
1479        self
1480    }
1481
1482    /// Configure an externally owned process work runner.
1483    ///
1484    /// Durable hosts construct a [`ProcessWorkDriver`] from the same process
1485    /// registry and wake handle used by their deployment runner, then pass it
1486    /// here. The driver registry becomes the core's process registry and no
1487    /// inline runner is spawned.
1488    pub fn process_work_driver(mut self, driver: ProcessWorkDriver) -> Self {
1489        self.process_work_source = ProcessWorkSource::External(driver);
1490        self
1491    }
1492
1493    /// Configure an externally owned queued-work driver.
1494    pub fn queued_work_driver(mut self, driver: QueuedWorkDriver) -> Self {
1495        self.queued_work_source = QueuedWorkSource::External(driver);
1496        self
1497    }
1498
1499    pub fn disable_queued_work_driver(mut self) -> Self {
1500        self.queued_work_source = QueuedWorkSource::None;
1501        self
1502    }
1503}
1504
1505pub(crate) fn build_plugin_host(
1506    protocol_factory: Option<&Arc<dyn PluginFactory>>,
1507    common_factories: &[Arc<dyn PluginFactory>],
1508    extra_factories: Vec<Arc<dyn PluginFactory>>,
1509) -> Result<PluginHost> {
1510    let mut factories = Vec::with_capacity(
1511        usize::from(protocol_factory.is_some()) + common_factories.len() + extra_factories.len(),
1512    );
1513    if let Some(protocol_factory) = protocol_factory {
1514        factories.push(Arc::clone(protocol_factory));
1515    }
1516    factories.extend(common_factories.iter().cloned());
1517    factories.extend(extra_factories);
1518    Ok(PluginHost::new(factories))
1519}
1520
1521impl PromptLayerSink for LashCoreBuilder {
1522    fn prompt_layer_mut(&mut self) -> &mut PromptLayer {
1523        self.prompt.get_or_insert_with(PromptLayer::new)
1524    }
1525}
1526
1527pub struct AdvancedLashCoreBuilder {
1528    builder: LashCoreBuilder,
1529}
1530
1531impl AdvancedLashCoreBuilder {
1532    pub fn runtime_host_config(mut self, core: lash_core::RuntimeHostConfig) -> Self {
1533        self.builder.runtime_host_config = Some(core);
1534        self
1535    }
1536
1537    pub fn plugin_host(mut self, plugin_host: PluginHost) -> Self {
1538        self.builder.plugin_host = Some(plugin_host);
1539        self
1540    }
1541
1542    pub fn build(self) -> Result<LashCore> {
1543        self.builder.build()
1544    }
1545}