Skip to main content

lash/
core.rs

1use crate::support::*;
2use lash_core::runtime::{
3    ProcessCommand, ProcessEffectOutcome, RuntimeEffectCommand, RuntimeEffectEnvelope,
4    RuntimeEffectKind, RuntimeEffectLocalExecutor, RuntimeEffectOutcome, RuntimeInvocation,
5    RuntimeScope,
6};
7
8#[derive(Clone)]
9pub struct LashCore {
10    pub(crate) env: RuntimeEnvironment,
11    pub(crate) policy: SessionPolicy,
12    pub(crate) modes: Arc<BTreeMap<ModeId, ModePreset>>,
13    pub(crate) default_mode: ModeId,
14    pub(crate) store_factory: Option<Arc<dyn SessionStoreFactory>>,
15    pub(crate) plugin_factories: Arc<Vec<Arc<dyn PluginFactory>>>,
16    pub(crate) provider: Option<ProviderHandle>,
17    pub(crate) live_replay_store: Arc<dyn LiveReplayStore>,
18    pub(crate) process_observer: Option<ProcessWorkObserver>,
19    /// Shared resolution of the process work runner. The poke it yields is
20    /// threaded onto every session's host so the process control seam can wake
21    /// the runner after a successful start. Shared across `LashCore` clones so
22    /// the default inline runner is spawned at most once (Decision 3).
23    pub(crate) process_work_runner: Arc<ProcessWorkRunnerSlot>,
24}
25
26/// How a [`LashCore`] resolves its process work runner, decided at `build()`
27/// and shared across clones.
28pub(crate) enum ProcessWorkRunnerSetup {
29    /// No process registry is wired; there is nothing to run and no poke.
30    None,
31    /// Lazily spawn the default inline [`ProcessWorkRunner`] on first
32    /// `session().open()` (Decision 3: the runtime is guaranteed by then, and
33    /// `build()` is sync — some tests call it outside a tokio runtime). A store
34    /// factory is required to build the config (the worker rebuilds a session
35    /// runtime per process); a registry with no store factory is rejected at
36    /// build with [`EmbedError::ProcessRegistryRequiresStoreFactory`].
37    LazyDefault {
38        config: Box<DurableProcessWorkerConfig>,
39    },
40    /// The host wired an external runner (e.g. the Restate ingress-client
41    /// runner) and handed its driver to the core.
42    External { driver: ProcessWorkDriver },
43}
44
45#[derive(Clone, Default)]
46pub(crate) enum ProcessWorkSource {
47    #[default]
48    None,
49    Inline {
50        registry: Arc<dyn ProcessRegistry>,
51    },
52    External(ProcessWorkDriver),
53}
54
55impl ProcessWorkSource {
56    fn process_registry(&self) -> Option<Arc<dyn ProcessRegistry>> {
57        match self {
58            Self::None => None,
59            Self::Inline { registry } => Some(Arc::clone(registry)),
60            Self::External(driver) => Some(driver.process_registry()),
61        }
62    }
63
64    fn has_registry(&self) -> bool {
65        !matches!(self, Self::None)
66    }
67}
68
69/// Shared, lazily-initialized process-work-runner state for a [`LashCore`].
70///
71/// The once-guard ([`tokio::sync::OnceCell`]) makes the default inline runner
72/// spawn exactly once across `LashCore` clones, on the first `session().open()`
73/// that needs it. The resolved [`ProcessWorkPoke`] (if any) is then reused for
74/// every session host.
75pub(crate) struct ProcessWorkRunnerSlot {
76    setup: ProcessWorkRunnerSetup,
77    poke: tokio::sync::OnceCell<Option<ProcessWorkPoke>>,
78}
79
80impl ProcessWorkRunnerSlot {
81    fn new(setup: ProcessWorkRunnerSetup) -> Self {
82        Self {
83            setup,
84            poke: tokio::sync::OnceCell::new(),
85        }
86    }
87
88    /// Resolve the poke for a session host, spawning the default inline runner
89    /// on first use. Idempotent: the once-guard ensures a single spawn.
90    pub(crate) async fn poke(&self) -> Option<ProcessWorkPoke> {
91        self.poke
92            .get_or_init(|| async {
93                match &self.setup {
94                    ProcessWorkRunnerSetup::None => None,
95                    ProcessWorkRunnerSetup::External { driver } => Some(driver.poke_handle()),
96                    ProcessWorkRunnerSetup::LazyDefault { config } => {
97                        let worker = DurableProcessWorker::new((**config).clone());
98                        Some(ProcessWorkRunner::inline(worker).spawn())
99                    }
100                }
101            })
102            .await
103            .clone()
104    }
105}
106
107#[derive(Clone, Debug, Default, serde::Serialize, serde::Deserialize)]
108pub struct SessionDeleteReport {
109    pub session_id: String,
110    pub process: Option<lash_core::ProcessSessionDeleteReport>,
111}
112
113impl LashCore {
114    pub fn builder() -> LashCoreBuilder {
115        LashCoreBuilder::default()
116    }
117
118    /// Preset for `standard` mode.
119    ///
120    /// Storage and effect durability are still host-owned choices. Provide the
121    /// effect host, Lashlang artifact store, and attachment store facets with
122    /// the builder setters before calling [`LashCoreBuilder::build`].
123    pub fn standard() -> LashCoreBuilder {
124        Self::builder()
125            .install_mode(ModePreset::standard())
126            .default_mode(ModeId::standard())
127            .plugins(default_runtime_stack())
128    }
129
130    /// Preset for `rlm` mode.
131    ///
132    /// Storage and effect durability are still host-owned choices. Provide the
133    /// effect host, Lashlang artifact store, and attachment store facets with
134    /// the builder setters before calling [`LashCoreBuilder::build`].
135    pub fn rlm() -> LashCoreBuilder {
136        Self::builder()
137            .install_mode(ModePreset::rlm())
138            .default_mode(ModeId::rlm())
139            .plugins(default_runtime_stack())
140    }
141
142    pub fn session(&self, session_id: impl Into<String>) -> SessionBuilder {
143        SessionBuilder {
144            core: self.clone(),
145            session_id: session_id.into(),
146            spec: SessionSpec::inherit(),
147            mode: None,
148            parent_session_id: None,
149            store: None,
150            provider: None,
151            active_plugins: Vec::new(),
152            plugin_factories: Vec::new(),
153            rlm_final_answer_format: None,
154        }
155    }
156
157    pub fn host_events(&self) -> crate::control::HostEventsControl {
158        crate::control::HostEventsControl { core: self.clone() }
159    }
160
161    pub fn effect_host(&self) -> Arc<dyn EffectHost> {
162        Arc::clone(&self.env.core.control.effect_host)
163    }
164
165    pub async fn delete_session(
166        &self,
167        session_id: impl AsRef<str>,
168        scoped_effect_controller: ScopedEffectController<'_>,
169    ) -> Result<SessionDeleteReport> {
170        let session_id = session_id.as_ref().to_string();
171        let Some(store_factory) = self.store_factory.as_ref() else {
172            return Err(EmbedError::MissingSessionStoreFactory);
173        };
174        let process = if let Some(process_registry) = self.env.process_registry.as_ref() {
175            let invocation = RuntimeInvocation::effect(
176                RuntimeScope::new(session_id.clone()),
177                format!("process:delete-session:{session_id}"),
178                RuntimeEffectKind::Process,
179                format!("{session_id}:delete-session"),
180            );
181            let outcome = scoped_effect_controller
182                .controller()
183                .execute_effect(
184                    RuntimeEffectEnvelope::new(
185                        invocation,
186                        RuntimeEffectCommand::Process {
187                            command: ProcessCommand::DeleteSession {
188                                session_id: session_id.clone(),
189                            },
190                        },
191                    ),
192                    RuntimeEffectLocalExecutor::process_control(Arc::clone(process_registry)),
193                )
194                .await
195                .map_err(|err| EmbedError::SessionDeleteProcess {
196                    session_id: session_id.clone(),
197                    message: err.to_string(),
198                })?;
199            match outcome {
200                RuntimeEffectOutcome::Process {
201                    result: ProcessEffectOutcome::DeleteSession { report },
202                } => Some(report),
203                other => {
204                    return Err(EmbedError::SessionDeleteProcess {
205                        session_id,
206                        message: format!(
207                            "process delete returned the wrong outcome: {}",
208                            other.kind().as_str()
209                        ),
210                    });
211                }
212            }
213        } else {
214            None
215        };
216        store_factory
217            .delete_session(&session_id)
218            .await
219            .map_err(|message| EmbedError::StoreFactory {
220                session_id: session_id.clone(),
221                message,
222            })?;
223        Ok(SessionDeleteReport {
224            session_id,
225            process,
226        })
227    }
228
229    pub fn installed_modes(&self) -> impl Iterator<Item = &ModeId> {
230        self.modes.keys()
231    }
232
233    pub fn process_observer(&self) -> Option<&ProcessWorkObserver> {
234        self.process_observer.as_ref()
235    }
236
237    pub fn process_registry(&self) -> Option<Arc<dyn ProcessRegistry>> {
238        self.env.process_registry.as_ref().cloned()
239    }
240
241    pub fn durable_process_worker_config(&self) -> Result<DurableProcessWorkerConfig> {
242        self.durable_process_worker_config_with_plugins(std::iter::empty::<Arc<dyn PluginFactory>>())
243    }
244
245    pub fn durable_process_worker_config_with_plugins(
246        &self,
247        extra_plugin_factories: impl IntoIterator<Item = Arc<dyn PluginFactory>>,
248    ) -> Result<DurableProcessWorkerConfig> {
249        let Some(process_registry) = self.process_registry() else {
250            return Err(EmbedError::MissingProcessRegistry);
251        };
252        let Some(store_factory) = self.store_factory.as_ref() else {
253            return Err(EmbedError::MissingProcessWorkerStoreFactory);
254        };
255        let plugin_host = build_plugin_host_for_mode(
256            &self.modes,
257            &self.default_mode,
258            self.plugin_factories.as_ref(),
259            extra_plugin_factories.into_iter().collect(),
260            true,
261        )?;
262        let mut config = DurableProcessWorkerConfig::new(
263            Arc::new(plugin_host),
264            self.env.core.clone(),
265            Arc::clone(store_factory),
266            process_registry,
267        )
268        .with_session_policy(self.policy.clone())
269        .with_residency(self.env.residency);
270        if let Some(host_event_store) = self.env.host_event_store.as_ref() {
271            config = config.with_host_event_store(Arc::clone(host_event_store));
272        }
273        Ok(config)
274    }
275}
276
277fn default_runtime_stack() -> PluginStack {
278    lash_plugin_tool_output_budget::tool_output_budget_stack()
279}
280
281#[derive(Default)]
282pub struct LashCoreBuilder {
283    pub(crate) modes: BTreeMap<ModeId, ModePreset>,
284    pub(crate) default_mode: Option<ModeId>,
285    session_spec: SessionSpec,
286    provider: Option<ProviderHandle>,
287    pub(crate) store_factory: Option<Arc<dyn SessionStoreFactory>>,
288    child_store_factory: Option<Arc<dyn SessionStoreFactory>>,
289    // `RuntimeHostConfig` has no `Default`: the three host-owned durability
290    // dependencies must be named. They are collected here and resolved in
291    // `build()`, which errors if any is unset.
292    effect_host: Option<Arc<dyn EffectHost>>,
293    attachment_store: Option<Arc<dyn AttachmentStore>>,
294    lashlang_artifact_store: Option<Arc<dyn lash_core::LashlangArtifactStore>>,
295    host_event_store: Option<Arc<dyn lash_core::HostEventStore>>,
296    // Benign core overrides applied on top of the resolved core.
297    prompt: Option<PromptLayer>,
298    trace_sink: Option<Arc<dyn lash_trace::TraceSink>>,
299    lashlang_execution_sink: Option<Arc<dyn lash_trace::TraceSink>>,
300    trace_level: Option<lash_trace::TraceLevel>,
301    trace_context: Option<lash_trace::TraceContext>,
302    termination: Option<TerminationPolicy>,
303    // Advanced full-config override; used as the base core when present.
304    runtime_host_config: Option<RuntimeHostConfig>,
305    tool_providers: Vec<Arc<dyn ToolProvider>>,
306    plugin_stack: PluginStack,
307    plugin_host: Option<PluginHost>,
308    residency: Option<Residency>,
309    // Single source of truth for process lifecycle support and process-work
310    // consumption.
311    process_work_source: ProcessWorkSource,
312    queued_work_poke: Option<QueuedWorkPoke>,
313    live_replay_store: Option<Arc<dyn LiveReplayStore>>,
314}
315
316impl LashCoreBuilder {
317    pub fn install_mode(mut self, preset: ModePreset) -> Self {
318        let mode_id = preset.mode_id.clone();
319        if self.default_mode.is_none() {
320            self.default_mode = Some(mode_id.clone());
321        }
322        self.modes.insert(mode_id, preset);
323        self
324    }
325
326    pub fn default_mode(mut self, mode: ModeId) -> Self {
327        self.default_mode = Some(mode);
328        self
329    }
330
331    pub fn provider(mut self, provider: ProviderHandle) -> Self {
332        self.session_spec = self.session_spec.provider_id(provider.kind());
333        self.provider = Some(provider);
334        self
335    }
336
337    pub fn model(mut self, model: lash_core::ModelSpec) -> Self {
338        self.session_spec = self.session_spec.model(model);
339        self
340    }
341
342    pub fn max_turns(mut self, max_turns: usize) -> Self {
343        self.session_spec = self.session_spec.max_turns(max_turns);
344        self
345    }
346
347    pub fn session_spec(mut self, spec: SessionSpec) -> Self {
348        self.session_spec = spec;
349        self
350    }
351
352    /// Configure a factory that can create a persistence store for any root
353    /// session opened from this core.
354    ///
355    /// The factory must honor `SessionStoreCreateRequest::session_id` and
356    /// return a store for that specific session. Do not use this to wrap one
357    /// pre-opened root store; pass root-only stores with
358    /// `LashCore::session(...).store(store)` instead.
359    pub fn store_factory(mut self, store_factory: Arc<dyn SessionStoreFactory>) -> Self {
360        self.store_factory = Some(store_factory);
361        self
362    }
363
364    /// Configure the persistence factory used by managed child sessions, such
365    /// as local subagents.
366    ///
367    /// Child factories must return a distinct store bound to the requested
368    /// child session id. Hosts that pass an explicit root store with
369    /// `SessionBuilder::store` should set this when child sessions need
370    /// persistence.
371    pub fn child_store_factory(mut self, store_factory: Arc<dyn SessionStoreFactory>) -> Self {
372        self.child_store_factory = Some(store_factory);
373        self
374    }
375
376    pub fn attachment_store(mut self, attachment_store: Arc<dyn AttachmentStore>) -> Self {
377        self.attachment_store = Some(attachment_store);
378        self
379    }
380
381    /// Set the deployment-level Lashlang artifact store (compiled module
382    /// cache, shared across the session tree). A durable store such as
383    /// `lash_sqlite_store::Store` implements it.
384    pub fn lashlang_artifact_store(
385        mut self,
386        artifact_store: Arc<dyn lash_core::LashlangArtifactStore>,
387    ) -> Self {
388        self.lashlang_artifact_store = Some(artifact_store);
389        self
390    }
391
392    /// Set the deployment effect host — the durability boundary every operation
393    /// crosses. Pass [`InlineEffectHost`](crate::durability::InlineEffectHost)
394    /// for in-process execution, or a workflow-backed host for durable
395    /// execution.
396    pub fn effect_host(mut self, effect_host: Arc<dyn EffectHost>) -> Self {
397        self.effect_host = Some(effect_host);
398        self
399    }
400
401    pub fn tools(mut self, tools: Arc<dyn ToolProvider>) -> Self {
402        self.tool_providers.push(tools);
403        self
404    }
405
406    pub fn plugin(mut self, plugin: Arc<dyn PluginFactory>) -> Self {
407        self.plugin_stack.push(plugin);
408        self
409    }
410
411    pub fn plugins(mut self, stack: PluginStack) -> Self {
412        self.plugin_stack = stack;
413        self
414    }
415
416    pub fn configure_plugins(mut self, configure: impl FnOnce(&mut PluginStack)) -> Self {
417        configure(&mut self.plugin_stack);
418        self
419    }
420
421    pub fn trace_sink(mut self, trace_sink: Arc<dyn lash_trace::TraceSink>) -> Self {
422        self.trace_sink = Some(trace_sink);
423        self
424    }
425
426    pub fn trace_jsonl_path(mut self, path: impl Into<std::path::PathBuf>) -> Self {
427        self.trace_sink = Some(Arc::new(lash_trace::JsonlTraceSink::new(path.into())));
428        self
429    }
430
431    pub fn lashlang_execution_sink(
432        mut self,
433        lashlang_execution_sink: Arc<dyn lash_trace::TraceSink>,
434    ) -> Self {
435        self.lashlang_execution_sink = Some(lashlang_execution_sink);
436        self
437    }
438
439    pub fn lashlang_execution_jsonl_path(mut self, path: impl Into<std::path::PathBuf>) -> Self {
440        self.lashlang_execution_sink = Some(Arc::new(lash_trace::JsonlTraceSink::new(path.into())));
441        self
442    }
443
444    pub fn trace_level(mut self, trace_level: lash_trace::TraceLevel) -> Self {
445        self.trace_level = Some(trace_level);
446        self
447    }
448
449    pub fn trace_context(mut self, trace_context: lash_trace::TraceContext) -> Self {
450        self.trace_context = Some(trace_context);
451        self
452    }
453
454    pub fn termination(mut self, termination: TerminationPolicy) -> Self {
455        self.termination = Some(termination);
456        self
457    }
458
459    pub fn residency(mut self, residency: Residency) -> Self {
460        self.residency = Some(residency);
461        self
462    }
463
464    /// Configure the bounded live replay buffer used by session observation
465    /// cursors. This is best-effort reconnect recovery only; durable state
466    /// still comes from the session store and [`SessionReadView`].
467    pub fn live_replay_store(mut self, live_replay_store: Arc<dyn LiveReplayStore>) -> Self {
468        self.live_replay_store = Some(live_replay_store);
469        self
470    }
471
472    /// Resolve the runtime host config, requiring the three host-owned
473    /// durability dependencies to have been named.
474    fn resolve_runtime_host_config(&mut self) -> Result<RuntimeHostConfig> {
475        if let Some(base) = self.runtime_host_config.take() {
476            return Ok(self.apply_core_overrides(base));
477        }
478        let effect_host = self
479            .effect_host
480            .take()
481            .ok_or(EmbedError::MissingEffectHost)?;
482        let lashlang_artifact_store = self
483            .lashlang_artifact_store
484            .take()
485            .ok_or(EmbedError::MissingLashlangArtifactStore)?;
486        let attachment_store = self
487            .attachment_store
488            .take()
489            .ok_or(EmbedError::MissingAttachmentStore)?;
490        let core = RuntimeHostConfig::new(effect_host, lashlang_artifact_store, attachment_store);
491        Ok(self.apply_core_overrides(core))
492    }
493
494    /// Apply benign + still-set dependency overrides on top of a base core.
495    fn apply_core_overrides(&mut self, mut core: RuntimeHostConfig) -> RuntimeHostConfig {
496        if let Some(effect_host) = self.effect_host.take() {
497            core.control.effect_host = effect_host;
498        }
499        if let Some(attachment_store) = self.attachment_store.take() {
500            core.durability.attachment_store = attachment_store;
501        }
502        if let Some(artifact_store) = self.lashlang_artifact_store.take() {
503            core.durability.lashlang_artifact_store = artifact_store;
504        }
505        if let Some(prompt) = self.prompt.take() {
506            core.prompt.prompt = prompt;
507        }
508        if let Some(trace_sink) = self.trace_sink.take() {
509            core.tracing.trace_sink = Some(trace_sink);
510        }
511        if let Some(lashlang_execution_sink) = self.lashlang_execution_sink.take() {
512            core.tracing.lashlang_execution_sink = Some(lashlang_execution_sink);
513        }
514        if let Some(trace_level) = self.trace_level.take() {
515            core.tracing.trace_level = trace_level;
516        }
517        if let Some(trace_context) = self.trace_context.take() {
518            core.tracing.trace_context = trace_context;
519        }
520        if let Some(termination) = self.termination.take() {
521            core.control.termination = termination;
522        }
523        core
524    }
525
526    /// Validate store peer-coherence of the wired durability dependencies.
527    ///
528    /// Durability is established by what the host wired; the per-invocation
529    /// durable controller is not visible here (the build-time controller is
530    /// inline by construction), so this checks the stores against each other
531    /// only — never the controller (see A5 in the durable-first wiring spec):
532    ///
533    /// - a durable session store factory requires a durable attachment and
534    ///   artifact store (they back the same session state);
535    /// - a durable process registry requires a session store factory that is
536    ///   itself durable (the registry's process records are meaningless without
537    ///   a durable session behind them).
538    fn ensure_store_peer_coherence(&self) -> Result<()> {
539        // Match `build()`'s wiring exactly: the session store factory it installs
540        // is `child_store_factory.or(store_factory)` (child takes precedence, root
541        // is the fallback). The coherence check must read the tier of that same
542        // effective factory, or a host that wires only a durable child factory
543        // (no root) is wrongly rejected though `build()` would wire it durably.
544        let session_store_tier = self
545            .child_store_factory
546            .as_ref()
547            .or(self.store_factory.as_ref())
548            .map(|factory| factory.durability_tier());
549        let attachment_tier = self
550            .attachment_store
551            .as_ref()
552            .map(|store| store.persistence().durability_tier());
553        let artifact_tier = self
554            .lashlang_artifact_store
555            .as_ref()
556            .map(|store| store.durability_tier());
557        let host_event_store_tier = self
558            .host_event_store
559            .as_ref()
560            .map(|store| store.durability_tier());
561
562        if session_store_tier == Some(DurabilityTier::Durable) {
563            if attachment_tier == Some(DurabilityTier::Inline) {
564                return Err(EmbedError::DurableStorePeerRequired {
565                    facet: "attachment store",
566                });
567            }
568            if artifact_tier == Some(DurabilityTier::Inline) {
569                return Err(EmbedError::DurableStorePeerRequired {
570                    facet: "artifact store",
571                });
572            }
573        }
574
575        if let Some(process_registry) = self.process_work_source.process_registry().as_ref()
576            && process_registry.durability_tier() == DurabilityTier::Durable
577        {
578            if session_store_tier != Some(DurabilityTier::Durable) {
579                return Err(EmbedError::DurableProcessRegistryRequiresStoreFactory);
580            }
581            if host_event_store_tier != Some(DurabilityTier::Durable) {
582                return Err(EmbedError::DurableStorePeerRequired {
583                    facet: "host event store",
584                });
585            }
586        }
587
588        if host_event_store_tier == Some(DurabilityTier::Durable) {
589            if session_store_tier != Some(DurabilityTier::Durable) {
590                return Err(EmbedError::DurableStorePeerRequired {
591                    facet: "session store factory",
592                });
593            }
594            if let Some(process_registry) = self.process_work_source.process_registry().as_ref()
595                && process_registry.durability_tier() == DurabilityTier::Inline
596            {
597                return Err(EmbedError::DurableStorePeerRequired {
598                    facet: "process registry",
599                });
600            }
601        }
602
603        Ok(())
604    }
605
606    pub fn build(mut self) -> Result<LashCore> {
607        if self.modes.is_empty() {
608            return Err(EmbedError::NoModesInstalled);
609        }
610        self.ensure_store_peer_coherence()?;
611        let default_mode = self
612            .default_mode
613            .clone()
614            .ok_or(EmbedError::NoModesInstalled)?;
615        if !self.modes.contains_key(&default_mode) {
616            return Err(EmbedError::DefaultModeNotInstalled { mode: default_mode });
617        }
618        let provider_id = self
619            .session_spec
620            .provider_id
621            .clone()
622            .or_else(|| {
623                self.provider
624                    .as_ref()
625                    .map(|provider| provider.kind().to_string())
626            })
627            .unwrap_or_default();
628        let model = self
629            .session_spec
630            .model
631            .clone()
632            .ok_or(EmbedError::MissingModelSpec)?;
633
634        let base_policy = SessionPolicy {
635            provider_id,
636            model,
637            max_turns: self.session_spec.max_turns.flatten(),
638            ..SessionPolicy::default()
639        };
640        let policy = self.session_spec.resolve_against(&base_policy);
641
642        let mut core = self.resolve_runtime_host_config()?;
643        if let Some(provider) = self.provider.clone() {
644            core.providers.provider_resolver =
645                Arc::new(lash_core::SingleProviderResolver::new(provider));
646        }
647
648        let plugin_factories = if let Some(plugin_host) = self.plugin_host {
649            plugin_host.factories().to_vec()
650        } else {
651            let mut factories = Vec::new();
652            if !self.tool_providers.is_empty() {
653                let spec = self
654                    .tool_providers
655                    .into_iter()
656                    .fold(PluginSpec::new(), PluginSpec::with_tool_provider);
657                factories.push(Arc::new(StaticPluginFactory::new("embed_tools", spec))
658                    as Arc<dyn PluginFactory>);
659            }
660            factories.extend(self.plugin_stack.into_factories());
661            factories
662        };
663        let default_plugin_host = build_plugin_host_for_mode(
664            &self.modes,
665            &default_mode,
666            &plugin_factories,
667            Vec::new(),
668            self.process_work_source.has_registry(),
669        )?;
670
671        let process_registry = self.process_work_source.process_registry();
672
673        // Resolve the process work runner before the process source is moved
674        // into the environment. The default inline runner's config is built
675        // eagerly so a missing store factory fails loudly at build, not at
676        // first open. It is built from the *default-mode* plugin host (preset
677        // protocol plugin + process-lifecycle abilities), the same host the
678        // live runtime uses, so the worker can rebuild a runtime for a
679        // default-mode process.
680        let process_work_runner = Self::resolve_process_work_runner(
681            &self.process_work_source,
682            &default_plugin_host,
683            &core,
684            // The worker rebuilds sessions with the same factory `build()` wires
685            // below: `child_store_factory.or(store_factory)`.
686            self.child_store_factory
687                .as_ref()
688                .or(self.store_factory.as_ref()),
689            &policy,
690            self.residency.unwrap_or_default(),
691            self.host_event_store.as_ref(),
692        )?;
693
694        let mut env_builder = RuntimeEnvironment::builder()
695            .with_plugin_host(Arc::new(default_plugin_host))
696            .with_runtime_host_config(core);
697        if let Some(process_registry) = process_registry.as_ref() {
698            env_builder = env_builder.with_process_registry(Arc::clone(process_registry));
699        }
700        if let Some(residency) = self.residency {
701            env_builder = env_builder.with_residency(residency);
702        }
703        if let Some(child_store_factory) = self
704            .child_store_factory
705            .as_ref()
706            .or(self.store_factory.as_ref())
707        {
708            env_builder = env_builder.with_session_store_factory(Arc::clone(child_store_factory));
709        }
710        if let Some(host_event_store) = self.host_event_store.as_ref() {
711            env_builder = env_builder.with_host_event_store(Arc::clone(host_event_store));
712        }
713        if let Some(queued_work_poke) = self.queued_work_poke.clone() {
714            env_builder = env_builder.with_queued_work_poke(queued_work_poke);
715        }
716
717        let live_replay_store = self
718            .live_replay_store
719            .take()
720            .unwrap_or_else(|| Arc::new(InMemoryLiveReplayStore::default()));
721
722        Ok(LashCore {
723            env: env_builder.build(),
724            policy,
725            modes: Arc::new(self.modes),
726            default_mode,
727            store_factory: self.store_factory,
728            plugin_factories: Arc::new(plugin_factories),
729            provider: self.provider,
730            live_replay_store,
731            process_observer: process_registry.map(ProcessWorkObserver::new),
732            process_work_runner: Arc::new(ProcessWorkRunnerSlot::new(process_work_runner)),
733        })
734    }
735
736    /// Decide how a built [`LashCore`] sources its process work runner.
737    ///
738    /// - no registry => nothing to run ([`ProcessWorkRunnerSetup::None`]);
739    /// - external driver wired => use it ([`ProcessWorkRunnerSetup::External`]);
740    /// - inline registry wired => lazily spawn the default inline runner on first open. Its
741    ///   [`DurableProcessWorkerConfig`] is built eagerly when a store factory is
742    ///   present; without one the inline worker cannot rebuild session runtimes.
743    fn resolve_process_work_runner(
744        process_work_source: &ProcessWorkSource,
745        worker_plugin_host: &PluginHost,
746        core: &RuntimeHostConfig,
747        store_factory: Option<&Arc<dyn SessionStoreFactory>>,
748        policy: &SessionPolicy,
749        residency: lash_core::Residency,
750        host_event_store: Option<&Arc<dyn lash_core::HostEventStore>>,
751    ) -> Result<ProcessWorkRunnerSetup> {
752        let process_registry = match process_work_source {
753            ProcessWorkSource::None => return Ok(ProcessWorkRunnerSetup::None),
754            ProcessWorkSource::External(driver) => {
755                return Ok(ProcessWorkRunnerSetup::External {
756                    driver: driver.clone(),
757                });
758            }
759            ProcessWorkSource::Inline { registry } => Arc::clone(registry),
760        };
761        // The worker rebuilds a session runtime per process, so it needs a store
762        // factory; without one the default runner could not execute anything, so
763        // fail loudly rather than silently leave processes unexecuted.
764        let Some(store_factory) = store_factory else {
765            return Err(EmbedError::ProcessRegistryRequiresStoreFactory);
766        };
767        // The worker rebuilds with the *same* plugin host the live runtime uses
768        // for the default mode — including the mode preset's protocol plugin
769        // (which supplies the protocol session capability) and the
770        // process-lifecycle abilities. The bare builder `plugin_factories` omit
771        // the preset factory (added per-mode by `build_plugin_host_for_mode`), so
772        // a worker built from them would fail to rebuild a runtime ("missing
773        // protocol session capability").
774        let config = Box::new(
775            DurableProcessWorkerConfig::new(
776                Arc::new(worker_plugin_host.clone()),
777                core.clone(),
778                Arc::clone(store_factory),
779                process_registry,
780            )
781            .with_session_policy(policy.clone())
782            .with_host_event_store(
783                host_event_store
784                    .cloned()
785                    .unwrap_or_else(|| Arc::new(lash_core::InMemoryHostEventStore::default())),
786            )
787            .with_residency(residency),
788        );
789        Ok(ProcessWorkRunnerSetup::LazyDefault { config })
790    }
791
792    pub fn advanced(self) -> AdvancedLashCoreBuilder {
793        AdvancedLashCoreBuilder { builder: self }
794    }
795
796    pub fn process_registry(mut self, process_registry: Arc<dyn ProcessRegistry>) -> Self {
797        self.process_work_source = ProcessWorkSource::Inline {
798            registry: process_registry,
799        };
800        self
801    }
802
803    pub fn host_event_store(mut self, store: Arc<dyn lash_core::HostEventStore>) -> Self {
804        self.host_event_store = Some(store);
805        self
806    }
807
808    /// Configure an externally owned process work runner.
809    ///
810    /// Durable hosts construct a [`ProcessWorkDriver`] from the same process
811    /// registry and wake handle used by their deployment runner, then pass it
812    /// here. The driver registry becomes the core's process registry and no
813    /// inline runner is spawned.
814    pub fn process_work_driver(mut self, driver: ProcessWorkDriver) -> Self {
815        self.process_work_source = ProcessWorkSource::External(driver);
816        self
817    }
818
819    /// Wire the wake handle of an externally owned queued-work runner. The
820    /// runtime pokes it whenever new queued work lands for a session.
821    pub fn queued_work_poke(mut self, poke: QueuedWorkPoke) -> Self {
822        self.queued_work_poke = Some(poke);
823        self
824    }
825}
826
827pub(crate) fn build_plugin_host_for_mode(
828    modes: &BTreeMap<ModeId, ModePreset>,
829    mode: &ModeId,
830    common_factories: &[Arc<dyn PluginFactory>],
831    extra_factories: Vec<Arc<dyn PluginFactory>>,
832    process_lifecycle: bool,
833) -> Result<PluginHost> {
834    let preset = modes
835        .get(mode)
836        .ok_or_else(|| EmbedError::ModeNotInstalled { mode: mode.clone() })?;
837    let mut factories = Vec::with_capacity(1 + common_factories.len() + extra_factories.len());
838    factories.push(Arc::clone(&preset.factory));
839    factories.extend(common_factories.iter().cloned());
840    factories.extend(extra_factories);
841    let mut plugin_host = PluginHost::new(factories);
842    if process_lifecycle {
843        let abilities = plugin_host
844            .lashlang_abilities()
845            .with_processes()
846            .with_sleep()
847            .with_process_signals();
848        plugin_host = plugin_host.with_lashlang_abilities(abilities);
849    }
850    Ok(plugin_host)
851}
852
853impl PromptLayerSink for LashCoreBuilder {
854    fn prompt_layer_mut(&mut self) -> &mut PromptLayer {
855        self.prompt.get_or_insert_with(PromptLayer::new)
856    }
857}
858
859pub struct AdvancedLashCoreBuilder {
860    builder: LashCoreBuilder,
861}
862
863impl AdvancedLashCoreBuilder {
864    pub fn runtime_host_config(mut self, core: lash_core::RuntimeHostConfig) -> Self {
865        self.builder.runtime_host_config = Some(core);
866        self
867    }
868
869    pub fn plugin_host(mut self, plugin_host: PluginHost) -> Self {
870        self.builder.plugin_host = Some(plugin_host);
871        self
872    }
873
874    pub fn build(self) -> Result<LashCore> {
875        self.builder.build()
876    }
877}