Skip to main content

lash/
core.rs

1use crate::support::*;
2use lash_core::runtime::{
3    ProcessCommand, ProcessEffectOutcome, RuntimeEffectCommand, RuntimeEffectEnvelope,
4    RuntimeEffectKind, RuntimeEffectLocalExecutor, RuntimeEffectOutcome, RuntimeInvocation,
5    RuntimeScope,
6};
7
/// A built lash core: the shared configuration from which sessions are opened
/// via [`LashCore::session`].
///
/// Produced by [`LashCoreBuilder::build`]. Cloning shares the `Arc`-wrapped
/// state (modes, plugin factories, process work runner slot).
#[derive(Clone)]
pub struct LashCore {
    /// Runtime environment assembled in `build()` from the default-mode plugin
    /// host, the resolved runtime host config, and the optional process
    /// registry, residency, session store factory, host event store, and
    /// queued-work poke.
    pub(crate) env: RuntimeEnvironment,
    /// Session policy resolved from the builder's session spec.
    pub(crate) policy: SessionPolicy,
    /// Installed mode presets, keyed by mode id.
    pub(crate) modes: Arc<BTreeMap<ModeId, ModePreset>>,
    /// Mode used when a session does not pick one; always present in `modes`
    /// (validated by `build()`).
    pub(crate) default_mode: ModeId,
    /// Root session store factory; required by `delete_session` and by the
    /// `durable_process_worker_config*` accessors.
    pub(crate) store_factory: Option<Arc<dyn SessionStoreFactory>>,
    /// Builder-level plugin factories (the mode preset's own factory is added
    /// per mode when a plugin host is built).
    pub(crate) plugin_factories: Arc<Vec<Arc<dyn PluginFactory>>>,
    /// Provider handle supplied to the builder, if any.
    pub(crate) provider: Option<ProviderHandle>,
    /// Observer over the wired process registry; `None` when no registry is
    /// wired.
    pub(crate) process_observer: Option<ProcessWorkObserver>,
    /// Shared resolution of the process work runner. The poke it yields is
    /// threaded onto every session's host so the process control seam can wake
    /// the runner after a successful start. Shared across `LashCore` clones so
    /// the default inline runner is spawned at most once (Decision 3).
    pub(crate) process_work_runner: Arc<ProcessWorkRunnerSlot>,
}
24
/// How a [`LashCore`] resolves its process work runner, decided at `build()`
/// and shared across clones.
///
/// The variant is chosen from the builder's process work source: no source
/// maps to `None`, an inline registry to `LazyDefault`, and an external driver
/// to `External`.
pub(crate) enum ProcessWorkRunnerSetup {
    /// No process registry is wired; there is nothing to run and no poke.
    None,
    /// Lazily spawn the default inline [`ProcessWorkRunner`] on first
    /// `session().open()` (Decision 3: the runtime is guaranteed by then, and
    /// `build()` is sync — some tests call it outside a tokio runtime). A store
    /// factory is required to build the config (the worker rebuilds a session
    /// runtime per process); a registry with no store factory is rejected at
    /// build with [`EmbedError::ProcessRegistryRequiresStoreFactory`].
    LazyDefault {
        /// Worker configuration built eagerly in `build()`; it is cloned into
        /// the worker when the runner is first spawned.
        config: Box<DurableProcessWorkerConfig>,
    },
    /// The host wired an external runner (e.g. the Restate ingress-client
    /// runner) and handed its driver to the core. The driver's poke handle is
    /// reused; no inline runner is spawned.
    External { driver: ProcessWorkDriver },
}
43
/// Builder-side record of where process lifecycle support comes from.
///
/// It answers two questions for `build()`: which process registry (if any) the
/// core exposes, and which runner consumes the process work.
#[derive(Clone, Default)]
pub(crate) enum ProcessWorkSource {
    /// No process registry is wired (the default).
    #[default]
    None,
    /// A registry wired directly on the builder; `build()` pairs it with the
    /// lazily spawned default inline runner.
    Inline {
        registry: Arc<dyn ProcessRegistry>,
    },
    /// An externally owned runner's driver; the driver's registry is used as
    /// the core's process registry.
    External(ProcessWorkDriver),
}
53
54impl ProcessWorkSource {
55    fn process_registry(&self) -> Option<Arc<dyn ProcessRegistry>> {
56        match self {
57            Self::None => None,
58            Self::Inline { registry } => Some(Arc::clone(registry)),
59            Self::External(driver) => Some(driver.process_registry()),
60        }
61    }
62
63    fn has_registry(&self) -> bool {
64        !matches!(self, Self::None)
65    }
66}
67
/// Shared, lazily-initialized process-work-runner state for a [`LashCore`].
///
/// The once-guard ([`tokio::sync::OnceCell`]) makes the default inline runner
/// spawn exactly once across `LashCore` clones, on the first `session().open()`
/// that needs it. The resolved [`ProcessWorkPoke`] (if any) is then reused for
/// every session host.
pub(crate) struct ProcessWorkRunnerSlot {
    /// How the runner is sourced; fixed when the slot is created.
    setup: ProcessWorkRunnerSetup,
    /// Memoized poke. The inner `Option` is `None` when no runner is
    /// configured (`ProcessWorkRunnerSetup::None`).
    poke: tokio::sync::OnceCell<Option<ProcessWorkPoke>>,
}
78
79impl ProcessWorkRunnerSlot {
80    fn new(setup: ProcessWorkRunnerSetup) -> Self {
81        Self {
82            setup,
83            poke: tokio::sync::OnceCell::new(),
84        }
85    }
86
87    /// Resolve the poke for a session host, spawning the default inline runner
88    /// on first use. Idempotent: the once-guard ensures a single spawn.
89    pub(crate) async fn poke(&self) -> Option<ProcessWorkPoke> {
90        self.poke
91            .get_or_init(|| async {
92                match &self.setup {
93                    ProcessWorkRunnerSetup::None => None,
94                    ProcessWorkRunnerSetup::External { driver } => Some(driver.poke_handle()),
95                    ProcessWorkRunnerSetup::LazyDefault { config } => {
96                        let worker = DurableProcessWorker::new((**config).clone());
97                        Some(ProcessWorkRunner::inline(worker).spawn())
98                    }
99                }
100            })
101            .await
102            .clone()
103    }
104}
105
/// Outcome of [`LashCore::delete_session`].
#[derive(Clone, Debug, Default, serde::Serialize, serde::Deserialize)]
pub struct SessionDeleteReport {
    /// Id of the session that was deleted.
    pub session_id: String,
    /// Report from the process delete-session effect; `None` when the core has
    /// no process registry wired.
    pub process: Option<lash_core::ProcessSessionDeleteReport>,
}
111
112impl LashCore {
113    pub fn builder() -> LashCoreBuilder {
114        LashCoreBuilder::default()
115    }
116
117    /// Preset for `standard` mode.
118    ///
119    /// Storage and effect durability are still host-owned choices. Provide the
120    /// effect host, Lashlang artifact store, and attachment store facets with
121    /// the builder setters before calling [`LashCoreBuilder::build`].
122    pub fn standard() -> LashCoreBuilder {
123        Self::builder()
124            .install_mode(ModePreset::standard())
125            .default_mode(ModeId::standard())
126            .plugins(default_runtime_stack())
127    }
128
129    /// Preset for `rlm` mode.
130    ///
131    /// Storage and effect durability are still host-owned choices. Provide the
132    /// effect host, Lashlang artifact store, and attachment store facets with
133    /// the builder setters before calling [`LashCoreBuilder::build`].
134    pub fn rlm() -> LashCoreBuilder {
135        Self::builder()
136            .install_mode(ModePreset::rlm())
137            .default_mode(ModeId::rlm())
138            .plugins(default_runtime_stack())
139    }
140
141    pub fn session(&self, session_id: impl Into<String>) -> SessionBuilder {
142        SessionBuilder {
143            core: self.clone(),
144            session_id: session_id.into(),
145            spec: SessionSpec::inherit(),
146            mode: None,
147            parent_session_id: None,
148            store: None,
149            provider: None,
150            active_plugins: Vec::new(),
151            plugin_factories: Vec::new(),
152            rlm_final_answer_format: None,
153        }
154    }
155
156    pub fn host_events(&self) -> crate::control::HostEventsControl {
157        crate::control::HostEventsControl { core: self.clone() }
158    }
159
160    pub fn effect_host(&self) -> Arc<dyn EffectHost> {
161        Arc::clone(&self.env.core.control.effect_host)
162    }
163
164    pub async fn delete_session(
165        &self,
166        session_id: impl AsRef<str>,
167        scoped_effect_controller: ScopedEffectController<'_>,
168    ) -> Result<SessionDeleteReport> {
169        let session_id = session_id.as_ref().to_string();
170        let Some(store_factory) = self.store_factory.as_ref() else {
171            return Err(EmbedError::MissingSessionStoreFactory);
172        };
173        let process = if let Some(process_registry) = self.env.process_registry.as_ref() {
174            let invocation = RuntimeInvocation::effect(
175                RuntimeScope::new(session_id.clone()),
176                format!("process:delete-session:{session_id}"),
177                RuntimeEffectKind::Process,
178                format!("{session_id}:delete-session"),
179            );
180            let outcome = scoped_effect_controller
181                .controller()
182                .execute_effect(
183                    RuntimeEffectEnvelope::new(
184                        invocation,
185                        RuntimeEffectCommand::Process {
186                            command: ProcessCommand::DeleteSession {
187                                session_id: session_id.clone(),
188                            },
189                        },
190                    ),
191                    RuntimeEffectLocalExecutor::process_control(Arc::clone(process_registry)),
192                )
193                .await
194                .map_err(|err| EmbedError::SessionDeleteProcess {
195                    session_id: session_id.clone(),
196                    message: err.to_string(),
197                })?;
198            match outcome {
199                RuntimeEffectOutcome::Process {
200                    result: ProcessEffectOutcome::DeleteSession { report },
201                } => Some(report),
202                other => {
203                    return Err(EmbedError::SessionDeleteProcess {
204                        session_id,
205                        message: format!(
206                            "process delete returned the wrong outcome: {}",
207                            other.kind().as_str()
208                        ),
209                    });
210                }
211            }
212        } else {
213            None
214        };
215        store_factory
216            .delete_session(&session_id)
217            .await
218            .map_err(|message| EmbedError::StoreFactory {
219                session_id: session_id.clone(),
220                message,
221            })?;
222        Ok(SessionDeleteReport {
223            session_id,
224            process,
225        })
226    }
227
228    pub fn installed_modes(&self) -> impl Iterator<Item = &ModeId> {
229        self.modes.keys()
230    }
231
232    pub fn process_observer(&self) -> Option<&ProcessWorkObserver> {
233        self.process_observer.as_ref()
234    }
235
236    pub fn process_registry(&self) -> Option<Arc<dyn ProcessRegistry>> {
237        self.env.process_registry.as_ref().cloned()
238    }
239
240    pub fn durable_process_worker_config(&self) -> Result<DurableProcessWorkerConfig> {
241        self.durable_process_worker_config_with_plugins(std::iter::empty::<Arc<dyn PluginFactory>>())
242    }
243
244    pub fn durable_process_worker_config_with_plugins(
245        &self,
246        extra_plugin_factories: impl IntoIterator<Item = Arc<dyn PluginFactory>>,
247    ) -> Result<DurableProcessWorkerConfig> {
248        let Some(process_registry) = self.process_registry() else {
249            return Err(EmbedError::MissingProcessRegistry);
250        };
251        let Some(store_factory) = self.store_factory.as_ref() else {
252            return Err(EmbedError::MissingProcessWorkerStoreFactory);
253        };
254        let plugin_host = build_plugin_host_for_mode(
255            &self.modes,
256            &self.default_mode,
257            self.plugin_factories.as_ref(),
258            extra_plugin_factories.into_iter().collect(),
259            true,
260        )?;
261        Ok(DurableProcessWorkerConfig::new(
262            Arc::new(plugin_host),
263            self.env.core.clone(),
264            Arc::clone(store_factory),
265            process_registry,
266        )
267        .with_session_policy(self.policy.clone())
268        .with_residency(self.env.residency))
269    }
270}
271
272fn default_runtime_stack() -> PluginStack {
273    lash_plugin_tool_output_budget::tool_output_budget_stack()
274}
275
/// Builder for [`LashCore`].
///
/// Collects modes, session defaults, host-owned durability dependencies,
/// plugins, and process-work wiring; [`LashCoreBuilder::build`] validates and
/// assembles them.
#[derive(Default)]
pub struct LashCoreBuilder {
    // Installed mode presets, keyed by mode id.
    pub(crate) modes: BTreeMap<ModeId, ModePreset>,
    // Default mode; the first installed mode unless set explicitly.
    pub(crate) default_mode: Option<ModeId>,
    // Session defaults (provider id, model, max turns) resolved into the
    // core's session policy in `build()`.
    session_spec: SessionSpec,
    provider: Option<ProviderHandle>,
    // Root session store factory.
    pub(crate) store_factory: Option<Arc<dyn SessionStoreFactory>>,
    // Store factory for managed child sessions; `build()` prefers it over the
    // root factory when wiring the runtime environment.
    child_store_factory: Option<Arc<dyn SessionStoreFactory>>,
    // `RuntimeHostConfig` has no `Default`: the three host-owned durability
    // dependencies must be named. They are collected here and resolved in
    // `build()`, which errors if any is unset.
    effect_host: Option<Arc<dyn EffectHost>>,
    attachment_store: Option<Arc<dyn AttachmentStore>>,
    lashlang_artifact_store: Option<Arc<dyn lash_core::LashlangArtifactStore>>,
    host_event_store: Option<Arc<dyn lash_core::HostEventStore>>,
    // Benign core overrides applied on top of the resolved core.
    prompt: Option<PromptLayer>,
    trace_sink: Option<Arc<dyn lash_trace::TraceSink>>,
    lashlang_execution_sink: Option<Arc<dyn lash_trace::TraceSink>>,
    trace_level: Option<lash_trace::TraceLevel>,
    trace_context: Option<lash_trace::TraceContext>,
    termination: Option<TerminationPolicy>,
    // Advanced full-config override; used as the base core when present.
    runtime_host_config: Option<RuntimeHostConfig>,
    // Tool providers; `build()` folds them into a single `embed_tools` plugin
    // factory.
    tool_providers: Vec<Arc<dyn ToolProvider>>,
    plugin_stack: PluginStack,
    // When set, its factories are used instead of the tool providers and the
    // plugin stack.
    plugin_host: Option<PluginHost>,
    residency: Option<Residency>,
    // Single source of truth for process lifecycle support and process-work
    // consumption.
    process_work_source: ProcessWorkSource,
    // Forwarded to the runtime environment in `build()` when set.
    queued_work_poke: Option<QueuedWorkPoke>,
}
309
310impl LashCoreBuilder {
311    pub fn install_mode(mut self, preset: ModePreset) -> Self {
312        let mode_id = preset.mode_id.clone();
313        if self.default_mode.is_none() {
314            self.default_mode = Some(mode_id.clone());
315        }
316        self.modes.insert(mode_id, preset);
317        self
318    }
319
320    pub fn default_mode(mut self, mode: ModeId) -> Self {
321        self.default_mode = Some(mode);
322        self
323    }
324
325    pub fn mode(mut self, mode: ModeId) -> Self {
326        self.default_mode = Some(mode);
327        self
328    }
329
330    pub fn provider(mut self, provider: ProviderHandle) -> Self {
331        self.session_spec = self.session_spec.provider_id(provider.kind());
332        self.provider = Some(provider);
333        self
334    }
335
336    pub fn model(mut self, model: lash_core::ModelSpec) -> Self {
337        self.session_spec = self.session_spec.model(model);
338        self
339    }
340
341    pub fn max_turns(mut self, max_turns: usize) -> Self {
342        self.session_spec = self.session_spec.max_turns(max_turns);
343        self
344    }
345
346    pub fn session_spec(mut self, spec: SessionSpec) -> Self {
347        self.session_spec = spec;
348        self
349    }
350
351    /// Configure a factory that can create a persistence store for any root
352    /// session opened from this core.
353    ///
354    /// The factory must honor `SessionStoreCreateRequest::session_id` and
355    /// return a store for that specific session. Do not use this to wrap one
356    /// pre-opened root store; pass root-only stores with
357    /// `LashCore::session(...).store(store)` instead.
358    pub fn store_factory(mut self, store_factory: Arc<dyn SessionStoreFactory>) -> Self {
359        self.store_factory = Some(store_factory);
360        self
361    }
362
363    /// Configure the persistence factory used by managed child sessions, such
364    /// as local subagents.
365    ///
366    /// Child factories must return a distinct store bound to the requested
367    /// child session id. Hosts that pass an explicit root store with
368    /// `SessionBuilder::store` should set this when child sessions need
369    /// persistence.
370    pub fn child_store_factory(mut self, store_factory: Arc<dyn SessionStoreFactory>) -> Self {
371        self.child_store_factory = Some(store_factory);
372        self
373    }
374
375    pub fn attachment_store(mut self, attachment_store: Arc<dyn AttachmentStore>) -> Self {
376        self.attachment_store = Some(attachment_store);
377        self
378    }
379
380    /// Set the deployment-level Lashlang artifact store (compiled module
381    /// cache, shared across the session tree). A durable store such as
382    /// `lash_sqlite_store::Store` implements it.
383    pub fn lashlang_artifact_store(
384        mut self,
385        artifact_store: Arc<dyn lash_core::LashlangArtifactStore>,
386    ) -> Self {
387        self.lashlang_artifact_store = Some(artifact_store);
388        self
389    }
390
391    /// Set the deployment effect host — the durability boundary every operation
392    /// crosses. Pass [`InlineEffectHost`](crate::durability::InlineEffectHost)
393    /// for in-process execution, or a workflow-backed host for durable
394    /// execution.
395    pub fn effect_host(mut self, effect_host: Arc<dyn EffectHost>) -> Self {
396        self.effect_host = Some(effect_host);
397        self
398    }
399
400    pub fn tools(mut self, tools: Arc<dyn ToolProvider>) -> Self {
401        self.tool_providers.push(tools);
402        self
403    }
404
405    pub fn plugin(mut self, plugin: Arc<dyn PluginFactory>) -> Self {
406        self.plugin_stack.push(plugin);
407        self
408    }
409
410    pub fn plugins(mut self, stack: PluginStack) -> Self {
411        self.plugin_stack = stack;
412        self
413    }
414
415    pub fn configure_plugins(mut self, configure: impl FnOnce(&mut PluginStack)) -> Self {
416        configure(&mut self.plugin_stack);
417        self
418    }
419
420    pub fn trace_sink(mut self, trace_sink: Option<Arc<dyn lash_trace::TraceSink>>) -> Self {
421        self.trace_sink = trace_sink;
422        self
423    }
424
425    pub fn trace_jsonl_path(mut self, path: Option<std::path::PathBuf>) -> Self {
426        self.trace_sink = path.map(|path| {
427            Arc::new(lash_trace::JsonlTraceSink::new(path)) as Arc<dyn lash_trace::TraceSink>
428        });
429        self
430    }
431
432    pub fn lashlang_execution_sink(
433        mut self,
434        lashlang_execution_sink: Option<Arc<dyn lash_trace::TraceSink>>,
435    ) -> Self {
436        self.lashlang_execution_sink = lashlang_execution_sink;
437        self
438    }
439
440    pub fn lashlang_execution_jsonl_path(mut self, path: Option<std::path::PathBuf>) -> Self {
441        self.lashlang_execution_sink = path.map(|path| {
442            Arc::new(lash_trace::JsonlTraceSink::new(path)) as Arc<dyn lash_trace::TraceSink>
443        });
444        self
445    }
446
447    pub fn trace_level(mut self, trace_level: lash_trace::TraceLevel) -> Self {
448        self.trace_level = Some(trace_level);
449        self
450    }
451
452    pub fn trace_context(mut self, trace_context: lash_trace::TraceContext) -> Self {
453        self.trace_context = Some(trace_context);
454        self
455    }
456
457    pub fn termination(mut self, termination: TerminationPolicy) -> Self {
458        self.termination = Some(termination);
459        self
460    }
461
462    pub fn residency(mut self, residency: Residency) -> Self {
463        self.residency = Some(residency);
464        self
465    }
466
467    /// Resolve the runtime host config, requiring the three host-owned
468    /// durability dependencies to have been named.
469    fn resolve_runtime_host_config(&mut self) -> Result<RuntimeHostConfig> {
470        if let Some(base) = self.runtime_host_config.take() {
471            return Ok(self.apply_core_overrides(base));
472        }
473        let effect_host = self
474            .effect_host
475            .take()
476            .ok_or(EmbedError::MissingEffectHost)?;
477        let lashlang_artifact_store = self
478            .lashlang_artifact_store
479            .take()
480            .ok_or(EmbedError::MissingLashlangArtifactStore)?;
481        let attachment_store = self
482            .attachment_store
483            .take()
484            .ok_or(EmbedError::MissingAttachmentStore)?;
485        let core = RuntimeHostConfig::new(effect_host, lashlang_artifact_store, attachment_store);
486        Ok(self.apply_core_overrides(core))
487    }
488
489    /// Apply benign + still-set dependency overrides on top of a base core.
490    fn apply_core_overrides(&mut self, mut core: RuntimeHostConfig) -> RuntimeHostConfig {
491        if let Some(effect_host) = self.effect_host.take() {
492            core.control.effect_host = effect_host;
493        }
494        if let Some(attachment_store) = self.attachment_store.take() {
495            core.durability.attachment_store = attachment_store;
496        }
497        if let Some(artifact_store) = self.lashlang_artifact_store.take() {
498            core.durability.lashlang_artifact_store = artifact_store;
499        }
500        if let Some(prompt) = self.prompt.take() {
501            core.prompt.prompt = prompt;
502        }
503        if let Some(trace_sink) = self.trace_sink.take() {
504            core.tracing.trace_sink = Some(trace_sink);
505        }
506        if let Some(lashlang_execution_sink) = self.lashlang_execution_sink.take() {
507            core.tracing.lashlang_execution_sink = Some(lashlang_execution_sink);
508        }
509        if let Some(trace_level) = self.trace_level.take() {
510            core.tracing.trace_level = trace_level;
511        }
512        if let Some(trace_context) = self.trace_context.take() {
513            core.tracing.trace_context = trace_context;
514        }
515        if let Some(termination) = self.termination.take() {
516            core.control.termination = termination;
517        }
518        core
519    }
520
521    /// Validate store peer-coherence of the wired durability dependencies.
522    ///
523    /// Durability is established by what the host wired; the per-invocation
524    /// durable controller is not visible here (the build-time controller is
525    /// inline by construction), so this checks the stores against each other
526    /// only — never the controller (see A5 in the durable-first wiring spec):
527    ///
528    /// - a durable session store factory requires a durable attachment and
529    ///   artifact store (they back the same session state);
530    /// - a durable process registry requires a session store factory that is
531    ///   itself durable (the registry's process records are meaningless without
532    ///   a durable session behind them).
533    fn ensure_store_peer_coherence(&self) -> Result<()> {
534        // Match `build()`'s wiring exactly: the session store factory it installs
535        // is `child_store_factory.or(store_factory)` (child takes precedence, root
536        // is the fallback). The coherence check must read the tier of that same
537        // effective factory, or a host that wires only a durable child factory
538        // (no root) is wrongly rejected though `build()` would wire it durably.
539        let session_store_tier = self
540            .child_store_factory
541            .as_ref()
542            .or(self.store_factory.as_ref())
543            .map(|factory| factory.durability_tier());
544        let attachment_tier = self
545            .attachment_store
546            .as_ref()
547            .map(|store| store.persistence().durability_tier());
548        let artifact_tier = self
549            .lashlang_artifact_store
550            .as_ref()
551            .map(|store| store.durability_tier());
552        let host_event_store_tier = self
553            .host_event_store
554            .as_ref()
555            .map(|store| store.durability_tier());
556
557        if session_store_tier == Some(DurabilityTier::Durable) {
558            if attachment_tier == Some(DurabilityTier::Inline) {
559                return Err(EmbedError::DurableStorePeerRequired {
560                    facet: "attachment store",
561                });
562            }
563            if artifact_tier == Some(DurabilityTier::Inline) {
564                return Err(EmbedError::DurableStorePeerRequired {
565                    facet: "artifact store",
566                });
567            }
568        }
569
570        if let Some(process_registry) = self.process_work_source.process_registry().as_ref() {
571            if process_registry.durability_tier() == DurabilityTier::Durable {
572                if session_store_tier != Some(DurabilityTier::Durable) {
573                    return Err(EmbedError::DurableProcessRegistryRequiresStoreFactory);
574                }
575                if host_event_store_tier != Some(DurabilityTier::Durable) {
576                    return Err(EmbedError::DurableStorePeerRequired {
577                        facet: "host event store",
578                    });
579                }
580            }
581        }
582
583        if host_event_store_tier == Some(DurabilityTier::Durable) {
584            if session_store_tier != Some(DurabilityTier::Durable) {
585                return Err(EmbedError::DurableStorePeerRequired {
586                    facet: "session store factory",
587                });
588            }
589            if let Some(process_registry) = self.process_work_source.process_registry().as_ref()
590                && process_registry.durability_tier() == DurabilityTier::Inline
591            {
592                return Err(EmbedError::DurableStorePeerRequired {
593                    facet: "process registry",
594                });
595            }
596        }
597
598        Ok(())
599    }
600
601    pub fn build(mut self) -> Result<LashCore> {
602        if self.modes.is_empty() {
603            return Err(EmbedError::NoModesInstalled);
604        }
605        self.ensure_store_peer_coherence()?;
606        let default_mode = self
607            .default_mode
608            .clone()
609            .ok_or(EmbedError::NoModesInstalled)?;
610        if !self.modes.contains_key(&default_mode) {
611            return Err(EmbedError::DefaultModeNotInstalled { mode: default_mode });
612        }
613        let provider_id = self
614            .session_spec
615            .provider_id
616            .clone()
617            .or_else(|| {
618                self.provider
619                    .as_ref()
620                    .map(|provider| provider.kind().to_string())
621            })
622            .unwrap_or_default();
623        let model = self
624            .session_spec
625            .model
626            .clone()
627            .ok_or(EmbedError::MissingModelSpec)?;
628
629        let base_policy = SessionPolicy {
630            provider_id,
631            model,
632            max_turns: self.session_spec.max_turns.flatten(),
633            ..SessionPolicy::default()
634        };
635        let policy = self.session_spec.resolve_against(&base_policy);
636
637        let mut core = self.resolve_runtime_host_config()?;
638        if let Some(provider) = self.provider.clone() {
639            core.providers.provider_resolver =
640                Arc::new(lash_core::SingleProviderResolver::new(provider));
641        }
642
643        let plugin_factories = if let Some(plugin_host) = self.plugin_host {
644            plugin_host.factories().to_vec()
645        } else {
646            let mut factories = Vec::new();
647            if !self.tool_providers.is_empty() {
648                let spec = self
649                    .tool_providers
650                    .into_iter()
651                    .fold(PluginSpec::new(), PluginSpec::with_tool_provider);
652                factories.push(Arc::new(StaticPluginFactory::new("embed_tools", spec))
653                    as Arc<dyn PluginFactory>);
654            }
655            factories.extend(self.plugin_stack.into_factories());
656            factories
657        };
658        let default_plugin_host = build_plugin_host_for_mode(
659            &self.modes,
660            &default_mode,
661            &plugin_factories,
662            Vec::new(),
663            self.process_work_source.has_registry(),
664        )?;
665
666        let process_registry = self.process_work_source.process_registry();
667
668        // Resolve the process work runner before the process source is moved
669        // into the environment. The default inline runner's config is built
670        // eagerly so a missing store factory fails loudly at build, not at
671        // first open. It is built from the *default-mode* plugin host (preset
672        // protocol plugin + process-lifecycle abilities), the same host the
673        // live runtime uses, so the worker can rebuild a runtime for a
674        // default-mode process.
675        let process_work_runner = Self::resolve_process_work_runner(
676            &self.process_work_source,
677            &default_plugin_host,
678            &core,
679            // The worker rebuilds sessions with the same factory `build()` wires
680            // below: `child_store_factory.or(store_factory)`.
681            self.child_store_factory
682                .as_ref()
683                .or(self.store_factory.as_ref()),
684            &policy,
685            self.residency.unwrap_or_default(),
686            self.host_event_store.as_ref(),
687        )?;
688
689        let mut env_builder = RuntimeEnvironment::builder()
690            .with_plugin_host(Arc::new(default_plugin_host))
691            .with_runtime_host_config(core);
692        if let Some(process_registry) = process_registry.as_ref() {
693            env_builder = env_builder.with_process_registry(Arc::clone(process_registry));
694        }
695        if let Some(residency) = self.residency {
696            env_builder = env_builder.with_residency(residency);
697        }
698        if let Some(child_store_factory) = self
699            .child_store_factory
700            .as_ref()
701            .or(self.store_factory.as_ref())
702        {
703            env_builder = env_builder.with_session_store_factory(Arc::clone(child_store_factory));
704        }
705        if let Some(host_event_store) = self.host_event_store.as_ref() {
706            env_builder = env_builder.with_host_event_store(Arc::clone(host_event_store));
707        }
708        if let Some(queued_work_poke) = self.queued_work_poke.clone() {
709            env_builder = env_builder.with_queued_work_poke(queued_work_poke);
710        }
711
712        Ok(LashCore {
713            env: env_builder.build(),
714            policy,
715            modes: Arc::new(self.modes),
716            default_mode,
717            store_factory: self.store_factory,
718            plugin_factories: Arc::new(plugin_factories),
719            provider: self.provider,
720            process_observer: process_registry.map(ProcessWorkObserver::new),
721            process_work_runner: Arc::new(ProcessWorkRunnerSlot::new(process_work_runner)),
722        })
723    }
724
725    /// Decide how a built [`LashCore`] sources its process work runner.
726    ///
727    /// - no registry => nothing to run ([`ProcessWorkRunnerSetup::None`]);
728    /// - external driver wired => use it ([`ProcessWorkRunnerSetup::External`]);
729    /// - inline registry wired => lazily spawn the default inline runner on first open. Its
730    ///   [`DurableProcessWorkerConfig`] is built eagerly when a store factory is
731    ///   present; without one the inline worker cannot rebuild session runtimes.
732    fn resolve_process_work_runner(
733        process_work_source: &ProcessWorkSource,
734        worker_plugin_host: &PluginHost,
735        core: &RuntimeHostConfig,
736        store_factory: Option<&Arc<dyn SessionStoreFactory>>,
737        policy: &SessionPolicy,
738        residency: lash_core::Residency,
739        host_event_store: Option<&Arc<dyn lash_core::HostEventStore>>,
740    ) -> Result<ProcessWorkRunnerSetup> {
741        let process_registry = match process_work_source {
742            ProcessWorkSource::None => return Ok(ProcessWorkRunnerSetup::None),
743            ProcessWorkSource::External(driver) => {
744                return Ok(ProcessWorkRunnerSetup::External {
745                    driver: driver.clone(),
746                });
747            }
748            ProcessWorkSource::Inline { registry } => Arc::clone(registry),
749        };
750        // The worker rebuilds a session runtime per process, so it needs a store
751        // factory; without one the default runner could not execute anything, so
752        // fail loudly rather than silently leave processes unexecuted.
753        let Some(store_factory) = store_factory else {
754            return Err(EmbedError::ProcessRegistryRequiresStoreFactory);
755        };
756        // The worker rebuilds with the *same* plugin host the live runtime uses
757        // for the default mode — including the mode preset's protocol plugin
758        // (which supplies the protocol session capability) and the
759        // process-lifecycle abilities. The bare builder `plugin_factories` omit
760        // the preset factory (added per-mode by `build_plugin_host_for_mode`), so
761        // a worker built from them would fail to rebuild a runtime ("missing
762        // protocol session capability").
763        let config = Box::new(
764            DurableProcessWorkerConfig::new(
765                Arc::new(worker_plugin_host.clone()),
766                core.clone(),
767                Arc::clone(store_factory),
768                process_registry,
769            )
770            .with_session_policy(policy.clone())
771            .with_host_event_store(
772                host_event_store
773                    .cloned()
774                    .unwrap_or_else(|| Arc::new(lash_core::InMemoryHostEventStore::default())),
775            )
776            .with_residency(residency),
777        );
778        Ok(ProcessWorkRunnerSetup::LazyDefault { config })
779    }
780
781    pub fn advanced(self) -> AdvancedLashCoreBuilder {
782        AdvancedLashCoreBuilder { builder: self }
783    }
784
785    pub fn process_registry(mut self, process_registry: Arc<dyn ProcessRegistry>) -> Self {
786        self.process_work_source = ProcessWorkSource::Inline {
787            registry: process_registry,
788        };
789        self
790    }
791
792    pub fn host_event_store(mut self, store: Arc<dyn lash_core::HostEventStore>) -> Self {
793        self.host_event_store = Some(store);
794        self
795    }
796
797    /// Configure an externally owned process work runner.
798    ///
799    /// Durable hosts construct a [`ProcessWorkDriver`] from the same process
800    /// registry and wake handle used by their deployment runner, then pass it
801    /// here. The driver registry becomes the core's process registry and no
802    /// inline runner is spawned.
803    pub fn process_work_driver(mut self, driver: ProcessWorkDriver) -> Self {
804        self.process_work_source = ProcessWorkSource::External(driver);
805        self
806    }
807
808    pub fn with_queued_work_runner(mut self, poke: QueuedWorkPoke) -> Self {
809        self.queued_work_poke = Some(poke);
810        self
811    }
812}
813
814pub(crate) fn build_plugin_host_for_mode(
815    modes: &BTreeMap<ModeId, ModePreset>,
816    mode: &ModeId,
817    common_factories: &[Arc<dyn PluginFactory>],
818    extra_factories: Vec<Arc<dyn PluginFactory>>,
819    process_lifecycle: bool,
820) -> Result<PluginHost> {
821    let preset = modes
822        .get(mode)
823        .ok_or_else(|| EmbedError::ModeNotInstalled { mode: mode.clone() })?;
824    let mut factories = Vec::with_capacity(1 + common_factories.len() + extra_factories.len());
825    factories.push(Arc::clone(&preset.factory));
826    factories.extend(common_factories.iter().cloned());
827    factories.extend(extra_factories);
828    let mut plugin_host = PluginHost::new(factories);
829    if process_lifecycle {
830        let abilities = plugin_host
831            .lashlang_abilities()
832            .with_processes()
833            .with_sleep()
834            .with_process_signals();
835        plugin_host = plugin_host.with_lashlang_abilities(abilities);
836    }
837    Ok(plugin_host)
838}
839
840impl PromptLayerSink for LashCoreBuilder {
841    fn prompt_layer_mut(&mut self) -> &mut PromptLayer {
842        self.prompt.get_or_insert_with(PromptLayer::new)
843    }
844}
845
846pub struct AdvancedLashCoreBuilder {
847    builder: LashCoreBuilder,
848}
849
850impl AdvancedLashCoreBuilder {
851    pub fn runtime_host_config(mut self, core: lash_core::RuntimeHostConfig) -> Self {
852        self.builder.runtime_host_config = Some(core);
853        self
854    }
855
856    pub fn plugin_host(mut self, plugin_host: PluginHost) -> Self {
857        self.builder.plugin_host = Some(plugin_host);
858        self
859    }
860
861    pub fn build(self) -> Result<LashCore> {
862        self.builder.build()
863    }
864}