Skip to main content

lash/
core.rs

1use crate::support::*;
2use lash_core::runtime::{
3    ProcessCommand, ProcessEffectOutcome, RuntimeEffectCommand, RuntimeEffectEnvelope,
4    RuntimeEffectKind, RuntimeEffectLocalExecutor, RuntimeEffectOutcome, RuntimeInvocation,
5    RuntimeScope,
6};
7
8#[derive(Clone)]
9pub struct LashCore {
10    pub(crate) env: RuntimeEnvironment,
11    pub(crate) policy: SessionPolicy,
12    pub(crate) modes: Arc<BTreeMap<ModeId, ModePreset>>,
13    pub(crate) default_mode: ModeId,
14    pub(crate) store_factory: Option<Arc<dyn SessionStoreFactory>>,
15    pub(crate) plugin_factories: Arc<Vec<Arc<dyn PluginFactory>>>,
16    pub(crate) provider: Option<ProviderHandle>,
17    pub(crate) live_replay_store: Arc<dyn LiveReplayStore>,
18    /// Shared resolution of the process work runner. The poke it yields is
19    /// threaded onto every session's host so the process control seam can wake
20    /// the runner after a successful start. Shared across `LashCore` clones so
21    /// the default inline runner is spawned at most once (Decision 3).
22    pub(crate) process_work_runner: Arc<ProcessWorkRunnerSlot>,
23}
24
25/// How a [`LashCore`] resolves its process work runner, decided at `build()`
26/// and shared across clones.
27pub(crate) enum ProcessWorkRunnerSetup {
28    /// No process registry is wired; there is nothing to run and no poke.
29    None,
30    /// Lazily spawn the default inline [`ProcessWorkRunner`] on first
31    /// `session().open()` (Decision 3: the runtime is guaranteed by then, and
32    /// `build()` is sync — some tests call it outside a tokio runtime). A store
33    /// factory is required to build the config (the worker rebuilds a session
34    /// runtime per process); a registry with no store factory is rejected at
35    /// build with [`EmbedError::ProcessRegistryRequiresStoreFactory`].
36    LazyDefault {
37        config: Box<DurableProcessWorkerConfig>,
38    },
39    /// The host wired an external runner (e.g. the Restate ingress-client
40    /// runner) and handed its driver to the core.
41    External { driver: ProcessWorkDriver },
42}
43
44#[derive(Clone, Default)]
45pub(crate) enum ProcessWorkSource {
46    #[default]
47    None,
48    Inline {
49        registry: Arc<dyn ProcessRegistry>,
50    },
51    External(ProcessWorkDriver),
52}
53
54impl ProcessWorkSource {
55    fn process_registry(&self) -> Option<Arc<dyn ProcessRegistry>> {
56        match self {
57            Self::None => None,
58            Self::Inline { registry } => Some(Arc::clone(registry)),
59            Self::External(driver) => Some(driver.process_registry()),
60        }
61    }
62
63    fn has_registry(&self) -> bool {
64        !matches!(self, Self::None)
65    }
66}
67
68/// Shared, lazily-initialized process-work-runner state for a [`LashCore`].
69///
70/// The once-guard ([`tokio::sync::OnceCell`]) makes the default inline runner
71/// spawn exactly once across `LashCore` clones, on the first `session().open()`
72/// that needs it. The resolved [`ProcessWorkPoke`] (if any) is then reused for
73/// every session host.
74pub(crate) struct ProcessWorkRunnerSlot {
75    setup: ProcessWorkRunnerSetup,
76    poke: tokio::sync::OnceCell<Option<ProcessWorkPoke>>,
77}
78
79impl ProcessWorkRunnerSlot {
80    fn new(setup: ProcessWorkRunnerSetup) -> Self {
81        Self {
82            setup,
83            poke: tokio::sync::OnceCell::new(),
84        }
85    }
86
87    /// Resolve the poke for a session host, spawning the default inline runner
88    /// on first use. Idempotent: the once-guard ensures a single spawn.
89    pub(crate) async fn poke(&self) -> Option<ProcessWorkPoke> {
90        self.poke
91            .get_or_init(|| async {
92                match &self.setup {
93                    ProcessWorkRunnerSetup::None => None,
94                    ProcessWorkRunnerSetup::External { driver } => Some(driver.poke_handle()),
95                    ProcessWorkRunnerSetup::LazyDefault { config } => {
96                        let worker = DurableProcessWorker::new((**config).clone());
97                        Some(ProcessWorkRunner::inline(worker).spawn())
98                    }
99                }
100            })
101            .await
102            .clone()
103    }
104}
105
106#[derive(Clone, Debug, Default, serde::Serialize, serde::Deserialize)]
107pub struct SessionDeleteReport {
108    pub session_id: String,
109    pub process: Option<lash_core::ProcessSessionDeleteReport>,
110}
111
112impl LashCore {
113    pub fn builder() -> LashCoreBuilder {
114        LashCoreBuilder::default()
115    }
116
117    /// Preset for `standard` mode.
118    ///
119    /// Storage and effect durability are still host-owned choices. Provide the
120    /// effect host, Lashlang artifact store, and attachment store facets with
121    /// the builder setters before calling [`LashCoreBuilder::build`].
122    pub fn standard() -> LashCoreBuilder {
123        Self::builder()
124            .install_mode(ModePreset::standard())
125            .default_mode(ModeId::standard())
126            .plugins(default_runtime_stack())
127    }
128
129    /// Preset for `rlm` mode.
130    ///
131    /// Storage and effect durability are still host-owned choices. Provide the
132    /// effect host, Lashlang artifact store, and attachment store facets with
133    /// the builder setters before calling [`LashCoreBuilder::build`].
134    pub fn rlm() -> LashCoreBuilder {
135        Self::builder()
136            .install_mode(ModePreset::rlm())
137            .default_mode(ModeId::rlm())
138            .plugins(default_runtime_stack())
139    }
140
141    pub fn session(&self, session_id: impl Into<String>) -> SessionBuilder {
142        SessionBuilder {
143            core: self.clone(),
144            session_id: session_id.into(),
145            spec: SessionSpec::inherit(),
146            mode: None,
147            parent_session_id: None,
148            store: None,
149            provider: None,
150            active_plugins: Vec::new(),
151            plugin_factories: Vec::new(),
152            rlm_final_answer_format: None,
153        }
154    }
155
156    pub fn host_events(&self) -> crate::control::HostEventsControl {
157        crate::control::HostEventsControl { core: self.clone() }
158    }
159
160    pub fn processes(&self) -> crate::control::Processes {
161        crate::control::Processes { core: self.clone() }
162    }
163
164    pub fn effect_host(&self) -> Arc<dyn EffectHost> {
165        Arc::clone(&self.env.core.control.effect_host)
166    }
167
168    pub async fn delete_session(
169        &self,
170        session_id: impl AsRef<str>,
171        scoped_effect_controller: ScopedEffectController<'_>,
172    ) -> Result<SessionDeleteReport> {
173        let session_id = session_id.as_ref().to_string();
174        let Some(store_factory) = self.store_factory.as_ref() else {
175            return Err(EmbedError::MissingSessionStoreFactory);
176        };
177        let process = if let Some(process_registry) = self.env.process_registry.as_ref() {
178            let invocation = RuntimeInvocation::effect(
179                RuntimeScope::new(session_id.clone()),
180                format!("process:delete-session:{session_id}"),
181                RuntimeEffectKind::Process,
182                format!("{session_id}:delete-session"),
183            );
184            let outcome = scoped_effect_controller
185                .controller()
186                .execute_effect(
187                    RuntimeEffectEnvelope::new(
188                        invocation,
189                        RuntimeEffectCommand::process(ProcessCommand::DeleteSession {
190                            session_id: session_id.clone(),
191                        }),
192                    ),
193                    RuntimeEffectLocalExecutor::process_control(Arc::clone(process_registry)),
194                )
195                .await
196                .map_err(|err| EmbedError::SessionDeleteProcess {
197                    session_id: session_id.clone(),
198                    message: err.to_string(),
199                })?;
200            match outcome {
201                RuntimeEffectOutcome::Process {
202                    result: ProcessEffectOutcome::DeleteSession { report },
203                } => Some(report),
204                other => {
205                    return Err(EmbedError::SessionDeleteProcess {
206                        session_id,
207                        message: format!(
208                            "process delete returned the wrong outcome: {}",
209                            other.kind().as_str()
210                        ),
211                    });
212                }
213            }
214        } else {
215            None
216        };
217        if let Some(host_event_store) = self.env.host_event_store.as_ref() {
218            host_event_store
219                .delete_session_subscriptions(&session_id)
220                .await
221                .map_err(|err| EmbedError::SessionDeleteProcess {
222                    session_id: session_id.clone(),
223                    message: err.to_string(),
224                })?;
225        }
226        store_factory
227            .delete_session(&session_id)
228            .await
229            .map_err(|message| EmbedError::StoreFactory {
230                session_id: session_id.clone(),
231                message,
232            })?;
233        Ok(SessionDeleteReport {
234            session_id,
235            process,
236        })
237    }
238
239    pub fn installed_modes(&self) -> impl Iterator<Item = &ModeId> {
240        self.modes.keys()
241    }
242
243    pub fn process_registry(&self) -> Option<Arc<dyn ProcessRegistry>> {
244        self.env.process_registry.as_ref().cloned()
245    }
246
247    pub fn durable_process_worker_config(&self) -> Result<DurableProcessWorkerConfig> {
248        self.durable_process_worker_config_with_plugins(std::iter::empty::<Arc<dyn PluginFactory>>())
249    }
250
251    pub fn durable_process_worker_config_with_plugins(
252        &self,
253        extra_plugin_factories: impl IntoIterator<Item = Arc<dyn PluginFactory>>,
254    ) -> Result<DurableProcessWorkerConfig> {
255        let Some(process_registry) = self.process_registry() else {
256            return Err(EmbedError::MissingProcessRegistry);
257        };
258        let Some(store_factory) = self.store_factory.as_ref() else {
259            return Err(EmbedError::MissingProcessWorkerStoreFactory);
260        };
261        let plugin_host = build_plugin_host_for_mode(
262            &self.modes,
263            &self.default_mode,
264            self.plugin_factories.as_ref(),
265            extra_plugin_factories.into_iter().collect(),
266            true,
267        )?;
268        let mut config = DurableProcessWorkerConfig::new(
269            Arc::new(plugin_host),
270            self.env.core.clone(),
271            Arc::clone(store_factory),
272            process_registry,
273        )
274        .with_session_policy(self.policy.clone())
275        .with_residency(self.env.residency);
276        if let Some(host_event_store) = self.env.host_event_store.as_ref() {
277            config = config.with_host_event_store(Arc::clone(host_event_store));
278        }
279        Ok(config)
280    }
281}
282
283fn default_runtime_stack() -> PluginStack {
284    lash_plugin_tool_output_budget::tool_output_budget_stack()
285}
286
287#[derive(Default)]
288pub struct LashCoreBuilder {
289    pub(crate) modes: BTreeMap<ModeId, ModePreset>,
290    pub(crate) default_mode: Option<ModeId>,
291    session_spec: SessionSpec,
292    provider: Option<ProviderHandle>,
293    pub(crate) store_factory: Option<Arc<dyn SessionStoreFactory>>,
294    child_store_factory: Option<Arc<dyn SessionStoreFactory>>,
295    // `RuntimeHostConfig` has no `Default`: the three host-owned durability
296    // dependencies must be named. They are collected here and resolved in
297    // `build()`, which errors if any is unset.
298    effect_host: Option<Arc<dyn EffectHost>>,
299    attachment_store: Option<Arc<dyn AttachmentStore>>,
300    lashlang_artifact_store: Option<Arc<dyn lash_core::LashlangArtifactStore>>,
301    host_event_store: Option<Arc<dyn lash_core::HostEventStore>>,
302    // Benign core overrides applied on top of the resolved core.
303    prompt: Option<PromptLayer>,
304    trace_sink: Option<Arc<dyn lash_trace::TraceSink>>,
305    lashlang_execution_sink: Option<Arc<dyn lash_trace::TraceSink>>,
306    trace_level: Option<lash_trace::TraceLevel>,
307    trace_context: Option<lash_trace::TraceContext>,
308    termination: Option<TerminationPolicy>,
309    // Advanced full-config override; used as the base core when present.
310    runtime_host_config: Option<RuntimeHostConfig>,
311    tool_providers: Vec<Arc<dyn ToolProvider>>,
312    plugin_stack: PluginStack,
313    plugin_host: Option<PluginHost>,
314    residency: Option<Residency>,
315    // Single source of truth for process lifecycle support and process-work
316    // consumption.
317    process_work_source: ProcessWorkSource,
318    queued_work_poke: Option<QueuedWorkPoke>,
319    live_replay_store: Option<Arc<dyn LiveReplayStore>>,
320}
321
322impl LashCoreBuilder {
323    pub fn install_mode(mut self, preset: ModePreset) -> Self {
324        let mode_id = preset.mode_id.clone();
325        if self.default_mode.is_none() {
326            self.default_mode = Some(mode_id.clone());
327        }
328        self.modes.insert(mode_id, preset);
329        self
330    }
331
332    pub fn default_mode(mut self, mode: ModeId) -> Self {
333        self.default_mode = Some(mode);
334        self
335    }
336
337    pub fn provider(mut self, provider: ProviderHandle) -> Self {
338        self.session_spec = self.session_spec.provider_id(provider.kind());
339        self.provider = Some(provider);
340        self
341    }
342
343    pub fn model(mut self, model: lash_core::ModelSpec) -> Self {
344        self.session_spec = self.session_spec.model(model);
345        self
346    }
347
348    pub fn max_turns(mut self, max_turns: usize) -> Self {
349        self.session_spec = self.session_spec.max_turns(max_turns);
350        self
351    }
352
353    pub fn session_spec(mut self, spec: SessionSpec) -> Self {
354        self.session_spec = spec;
355        self
356    }
357
358    /// Configure a factory that can create a persistence store for any root
359    /// session opened from this core.
360    ///
361    /// The factory must honor `SessionStoreCreateRequest::session_id` and
362    /// return a store for that specific session. Do not use this to wrap one
363    /// pre-opened root store; pass root-only stores with
364    /// `LashCore::session(...).store(store)` instead.
365    pub fn store_factory(mut self, store_factory: Arc<dyn SessionStoreFactory>) -> Self {
366        self.store_factory = Some(store_factory);
367        self
368    }
369
370    /// Configure the persistence factory used by managed child sessions, such
371    /// as local subagents.
372    ///
373    /// Child factories must return a distinct store bound to the requested
374    /// child session id. Hosts that pass an explicit root store with
375    /// `SessionBuilder::store` should set this when child sessions need
376    /// persistence.
377    pub fn child_store_factory(mut self, store_factory: Arc<dyn SessionStoreFactory>) -> Self {
378        self.child_store_factory = Some(store_factory);
379        self
380    }
381
382    pub fn attachment_store(mut self, attachment_store: Arc<dyn AttachmentStore>) -> Self {
383        self.attachment_store = Some(attachment_store);
384        self
385    }
386
387    /// Set the deployment-level Lashlang artifact store (compiled module
388    /// cache, shared across the session tree). A durable store such as
389    /// `lash_sqlite_store::Store` implements it.
390    pub fn lashlang_artifact_store(
391        mut self,
392        artifact_store: Arc<dyn lash_core::LashlangArtifactStore>,
393    ) -> Self {
394        self.lashlang_artifact_store = Some(artifact_store);
395        self
396    }
397
398    /// Set the deployment effect host — the durability boundary every operation
399    /// crosses. Pass [`InlineEffectHost`](crate::durability::InlineEffectHost)
400    /// for in-process execution, or a workflow-backed host for durable
401    /// execution.
402    pub fn effect_host(mut self, effect_host: Arc<dyn EffectHost>) -> Self {
403        self.effect_host = Some(effect_host);
404        self
405    }
406
407    pub fn tools(mut self, tools: Arc<dyn ToolProvider>) -> Self {
408        self.tool_providers.push(tools);
409        self
410    }
411
412    pub fn plugin(mut self, plugin: Arc<dyn PluginFactory>) -> Self {
413        self.plugin_stack.push(plugin);
414        self
415    }
416
417    pub fn plugins(mut self, stack: PluginStack) -> Self {
418        self.plugin_stack = stack;
419        self
420    }
421
422    pub fn configure_plugins(mut self, configure: impl FnOnce(&mut PluginStack)) -> Self {
423        configure(&mut self.plugin_stack);
424        self
425    }
426
427    pub fn trace_sink(mut self, trace_sink: Arc<dyn lash_trace::TraceSink>) -> Self {
428        self.trace_sink = Some(trace_sink);
429        self
430    }
431
432    pub fn trace_jsonl_path(mut self, path: impl Into<std::path::PathBuf>) -> Self {
433        self.trace_sink = Some(Arc::new(lash_trace::JsonlTraceSink::new(path.into())));
434        self
435    }
436
437    pub fn lashlang_execution_sink(
438        mut self,
439        lashlang_execution_sink: Arc<dyn lash_trace::TraceSink>,
440    ) -> Self {
441        self.lashlang_execution_sink = Some(lashlang_execution_sink);
442        self
443    }
444
445    pub fn lashlang_execution_jsonl_path(mut self, path: impl Into<std::path::PathBuf>) -> Self {
446        self.lashlang_execution_sink = Some(Arc::new(lash_trace::JsonlTraceSink::new(path.into())));
447        self
448    }
449
450    pub fn trace_level(mut self, trace_level: lash_trace::TraceLevel) -> Self {
451        self.trace_level = Some(trace_level);
452        self
453    }
454
455    pub fn trace_context(mut self, trace_context: lash_trace::TraceContext) -> Self {
456        self.trace_context = Some(trace_context);
457        self
458    }
459
460    pub fn termination(mut self, termination: TerminationPolicy) -> Self {
461        self.termination = Some(termination);
462        self
463    }
464
465    pub fn residency(mut self, residency: Residency) -> Self {
466        self.residency = Some(residency);
467        self
468    }
469
470    /// Configure the bounded live replay buffer used by session observation
471    /// cursors. This is best-effort reconnect recovery only; durable state
472    /// still comes from the session store and [`SessionReadView`].
473    pub fn live_replay_store(mut self, live_replay_store: Arc<dyn LiveReplayStore>) -> Self {
474        self.live_replay_store = Some(live_replay_store);
475        self
476    }
477
478    /// Resolve the runtime host config, requiring the three host-owned
479    /// durability dependencies to have been named.
480    fn resolve_runtime_host_config(&mut self) -> Result<RuntimeHostConfig> {
481        if let Some(base) = self.runtime_host_config.take() {
482            return Ok(self.apply_core_overrides(base));
483        }
484        let effect_host = self
485            .effect_host
486            .take()
487            .ok_or(EmbedError::MissingEffectHost)?;
488        let lashlang_artifact_store = self
489            .lashlang_artifact_store
490            .take()
491            .ok_or(EmbedError::MissingLashlangArtifactStore)?;
492        let attachment_store = self
493            .attachment_store
494            .take()
495            .ok_or(EmbedError::MissingAttachmentStore)?;
496        let core = RuntimeHostConfig::new(effect_host, lashlang_artifact_store, attachment_store);
497        Ok(self.apply_core_overrides(core))
498    }
499
500    /// Apply benign + still-set dependency overrides on top of a base core.
501    fn apply_core_overrides(&mut self, mut core: RuntimeHostConfig) -> RuntimeHostConfig {
502        if let Some(effect_host) = self.effect_host.take() {
503            core.control.effect_host = effect_host;
504        }
505        if let Some(attachment_store) = self.attachment_store.take() {
506            core.durability.attachment_store = attachment_store;
507        }
508        if let Some(artifact_store) = self.lashlang_artifact_store.take() {
509            core.durability.lashlang_artifact_store = artifact_store;
510        }
511        if let Some(prompt) = self.prompt.take() {
512            core.prompt.prompt = prompt;
513        }
514        if let Some(trace_sink) = self.trace_sink.take() {
515            core.tracing.trace_sink = Some(trace_sink);
516        }
517        if let Some(lashlang_execution_sink) = self.lashlang_execution_sink.take() {
518            core.tracing.lashlang_execution_sink = Some(lashlang_execution_sink);
519        }
520        if let Some(trace_level) = self.trace_level.take() {
521            core.tracing.trace_level = trace_level;
522        }
523        if let Some(trace_context) = self.trace_context.take() {
524            core.tracing.trace_context = trace_context;
525        }
526        if let Some(termination) = self.termination.take() {
527            core.control.termination = termination;
528        }
529        core
530    }
531
532    /// Validate store peer-coherence of the wired durability dependencies.
533    ///
534    /// Durability is established by what the host wired; the per-invocation
535    /// durable controller is not visible here (the build-time controller is
536    /// inline by construction), so this checks the stores against each other
537    /// only — never the controller (see A5 in the durable-first wiring spec):
538    ///
539    /// - a durable session store factory requires a durable attachment and
540    ///   artifact store (they back the same session state);
541    /// - a durable process registry requires a session store factory that is
542    ///   itself durable (the registry's process records are meaningless without
543    ///   a durable session behind them).
544    fn ensure_store_peer_coherence(&self) -> Result<()> {
545        // Match `build()`'s wiring exactly: the session store factory it installs
546        // is `child_store_factory.or(store_factory)` (child takes precedence, root
547        // is the fallback). The coherence check must read the tier of that same
548        // effective factory, or a host that wires only a durable child factory
549        // (no root) is wrongly rejected though `build()` would wire it durably.
550        let session_store_tier = self
551            .child_store_factory
552            .as_ref()
553            .or(self.store_factory.as_ref())
554            .map(|factory| factory.durability_tier());
555        let attachment_tier = self
556            .attachment_store
557            .as_ref()
558            .map(|store| store.persistence().durability_tier());
559        let artifact_tier = self
560            .lashlang_artifact_store
561            .as_ref()
562            .map(|store| store.durability_tier());
563        let host_event_store_tier = self
564            .host_event_store
565            .as_ref()
566            .map(|store| store.durability_tier());
567
568        if session_store_tier == Some(DurabilityTier::Durable) {
569            if attachment_tier == Some(DurabilityTier::Inline) {
570                return Err(EmbedError::DurableStorePeerRequired {
571                    facet: "attachment store",
572                });
573            }
574            if artifact_tier == Some(DurabilityTier::Inline) {
575                return Err(EmbedError::DurableStorePeerRequired {
576                    facet: "artifact store",
577                });
578            }
579        }
580
581        if let Some(process_registry) = self.process_work_source.process_registry().as_ref()
582            && process_registry.durability_tier() == DurabilityTier::Durable
583        {
584            if session_store_tier != Some(DurabilityTier::Durable) {
585                return Err(EmbedError::DurableProcessRegistryRequiresStoreFactory);
586            }
587            if host_event_store_tier != Some(DurabilityTier::Durable) {
588                return Err(EmbedError::DurableStorePeerRequired {
589                    facet: "host event store",
590                });
591            }
592        }
593
594        if host_event_store_tier == Some(DurabilityTier::Durable) {
595            if session_store_tier != Some(DurabilityTier::Durable) {
596                return Err(EmbedError::DurableStorePeerRequired {
597                    facet: "session store factory",
598                });
599            }
600            if let Some(process_registry) = self.process_work_source.process_registry().as_ref()
601                && process_registry.durability_tier() == DurabilityTier::Inline
602            {
603                return Err(EmbedError::DurableStorePeerRequired {
604                    facet: "process registry",
605                });
606            }
607        }
608
609        Ok(())
610    }
611
612    pub fn build(mut self) -> Result<LashCore> {
613        if self.modes.is_empty() {
614            return Err(EmbedError::NoModesInstalled);
615        }
616        self.ensure_store_peer_coherence()?;
617        let default_mode = self
618            .default_mode
619            .clone()
620            .ok_or(EmbedError::NoModesInstalled)?;
621        if !self.modes.contains_key(&default_mode) {
622            return Err(EmbedError::DefaultModeNotInstalled { mode: default_mode });
623        }
624        let provider_id = self
625            .session_spec
626            .provider_id
627            .clone()
628            .or_else(|| {
629                self.provider
630                    .as_ref()
631                    .map(|provider| provider.kind().to_string())
632            })
633            .unwrap_or_default();
634        let model = self
635            .session_spec
636            .model
637            .clone()
638            .ok_or(EmbedError::MissingModelSpec)?;
639
640        let base_policy = SessionPolicy {
641            provider_id,
642            model,
643            max_turns: self.session_spec.max_turns.flatten(),
644            ..SessionPolicy::default()
645        };
646        let policy = self.session_spec.resolve_against(&base_policy);
647
648        let mut core = self.resolve_runtime_host_config()?;
649        if let Some(provider) = self.provider.clone() {
650            core.providers.provider_resolver =
651                Arc::new(lash_core::SingleProviderResolver::new(provider));
652        }
653
654        let plugin_factories = if let Some(plugin_host) = self.plugin_host {
655            plugin_host.factories().to_vec()
656        } else {
657            let mut factories = Vec::new();
658            if !self.tool_providers.is_empty() {
659                let spec = self
660                    .tool_providers
661                    .into_iter()
662                    .fold(PluginSpec::new(), PluginSpec::with_tool_provider);
663                factories.push(Arc::new(StaticPluginFactory::new("embed_tools", spec))
664                    as Arc<dyn PluginFactory>);
665            }
666            factories.extend(self.plugin_stack.into_factories());
667            factories
668        };
669        let default_plugin_host = build_plugin_host_for_mode(
670            &self.modes,
671            &default_mode,
672            &plugin_factories,
673            Vec::new(),
674            self.process_work_source.has_registry(),
675        )?;
676
677        let process_registry = self.process_work_source.process_registry();
678
679        // Resolve the process work runner before the process source is moved
680        // into the environment. The default inline runner's config is built
681        // eagerly so a missing store factory fails loudly at build, not at
682        // first open. It is built from the *default-mode* plugin host (preset
683        // protocol plugin + process-lifecycle abilities), the same host the
684        // live runtime uses, so the worker can rebuild a runtime for a
685        // default-mode process.
686        let process_work_runner = Self::resolve_process_work_runner(
687            &self.process_work_source,
688            &default_plugin_host,
689            &core,
690            // The worker rebuilds sessions with the same factory `build()` wires
691            // below: `child_store_factory.or(store_factory)`.
692            self.child_store_factory
693                .as_ref()
694                .or(self.store_factory.as_ref()),
695            &policy,
696            self.residency.unwrap_or_default(),
697            self.host_event_store.as_ref(),
698        )?;
699
700        let mut env_builder = RuntimeEnvironment::builder()
701            .with_plugin_host(Arc::new(default_plugin_host))
702            .with_runtime_host_config(core);
703        if let Some(process_registry) = process_registry.as_ref() {
704            env_builder = env_builder.with_process_registry(Arc::clone(process_registry));
705        }
706        if let Some(residency) = self.residency {
707            env_builder = env_builder.with_residency(residency);
708        }
709        if let Some(child_store_factory) = self
710            .child_store_factory
711            .as_ref()
712            .or(self.store_factory.as_ref())
713        {
714            env_builder = env_builder.with_session_store_factory(Arc::clone(child_store_factory));
715        }
716        if let Some(host_event_store) = self.host_event_store.as_ref() {
717            env_builder = env_builder.with_host_event_store(Arc::clone(host_event_store));
718        }
719        if let Some(queued_work_poke) = self.queued_work_poke.clone() {
720            env_builder = env_builder.with_queued_work_poke(queued_work_poke);
721        }
722
723        let live_replay_store = self
724            .live_replay_store
725            .take()
726            .unwrap_or_else(|| Arc::new(InMemoryLiveReplayStore::default()));
727
728        Ok(LashCore {
729            env: env_builder.build(),
730            policy,
731            modes: Arc::new(self.modes),
732            default_mode,
733            store_factory: self.store_factory,
734            plugin_factories: Arc::new(plugin_factories),
735            provider: self.provider,
736            live_replay_store,
737            process_work_runner: Arc::new(ProcessWorkRunnerSlot::new(process_work_runner)),
738        })
739    }
740
741    /// Decide how a built [`LashCore`] sources its process work runner.
742    ///
743    /// - no registry => nothing to run ([`ProcessWorkRunnerSetup::None`]);
744    /// - external driver wired => use it ([`ProcessWorkRunnerSetup::External`]);
745    /// - inline registry wired => lazily spawn the default inline runner on first open. Its
746    ///   [`DurableProcessWorkerConfig`] is built eagerly when a store factory is
747    ///   present; without one the inline worker cannot rebuild session runtimes.
748    fn resolve_process_work_runner(
749        process_work_source: &ProcessWorkSource,
750        worker_plugin_host: &PluginHost,
751        core: &RuntimeHostConfig,
752        store_factory: Option<&Arc<dyn SessionStoreFactory>>,
753        policy: &SessionPolicy,
754        residency: lash_core::Residency,
755        host_event_store: Option<&Arc<dyn lash_core::HostEventStore>>,
756    ) -> Result<ProcessWorkRunnerSetup> {
757        let process_registry = match process_work_source {
758            ProcessWorkSource::None => return Ok(ProcessWorkRunnerSetup::None),
759            ProcessWorkSource::External(driver) => {
760                return Ok(ProcessWorkRunnerSetup::External {
761                    driver: driver.clone(),
762                });
763            }
764            ProcessWorkSource::Inline { registry } => Arc::clone(registry),
765        };
766        // The worker rebuilds a session runtime per process, so it needs a store
767        // factory; without one the default runner could not execute anything, so
768        // fail loudly rather than silently leave processes unexecuted.
769        let Some(store_factory) = store_factory else {
770            return Err(EmbedError::ProcessRegistryRequiresStoreFactory);
771        };
772        // The worker rebuilds with the *same* plugin host the live runtime uses
773        // for the default mode — including the mode preset's protocol plugin
774        // (which supplies the protocol session capability) and the
775        // process-lifecycle abilities. The bare builder `plugin_factories` omit
776        // the preset factory (added per-mode by `build_plugin_host_for_mode`), so
777        // a worker built from them would fail to rebuild a runtime ("missing
778        // protocol session capability").
779        let config = Box::new(
780            DurableProcessWorkerConfig::new(
781                Arc::new(worker_plugin_host.clone()),
782                core.clone(),
783                Arc::clone(store_factory),
784                process_registry,
785            )
786            .with_session_policy(policy.clone())
787            .with_host_event_store(
788                host_event_store
789                    .cloned()
790                    .unwrap_or_else(|| Arc::new(lash_core::InMemoryHostEventStore::default())),
791            )
792            .with_residency(residency),
793        );
794        Ok(ProcessWorkRunnerSetup::LazyDefault { config })
795    }
796
797    pub fn advanced(self) -> AdvancedLashCoreBuilder {
798        AdvancedLashCoreBuilder { builder: self }
799    }
800
801    pub fn process_registry(mut self, process_registry: Arc<dyn ProcessRegistry>) -> Self {
802        self.process_work_source = ProcessWorkSource::Inline {
803            registry: process_registry,
804        };
805        self
806    }
807
808    pub fn host_event_store(mut self, store: Arc<dyn lash_core::HostEventStore>) -> Self {
809        self.host_event_store = Some(store);
810        self
811    }
812
813    /// Configure an externally owned process work runner.
814    ///
815    /// Durable hosts construct a [`ProcessWorkDriver`] from the same process
816    /// registry and wake handle used by their deployment runner, then pass it
817    /// here. The driver registry becomes the core's process registry and no
818    /// inline runner is spawned.
819    pub fn process_work_driver(mut self, driver: ProcessWorkDriver) -> Self {
820        self.process_work_source = ProcessWorkSource::External(driver);
821        self
822    }
823
824    /// Wire the wake handle of an externally owned queued-work runner. The
825    /// runtime pokes it whenever new queued work lands for a session.
826    pub fn queued_work_poke(mut self, poke: QueuedWorkPoke) -> Self {
827        self.queued_work_poke = Some(poke);
828        self
829    }
830}
831
832pub(crate) fn build_plugin_host_for_mode(
833    modes: &BTreeMap<ModeId, ModePreset>,
834    mode: &ModeId,
835    common_factories: &[Arc<dyn PluginFactory>],
836    extra_factories: Vec<Arc<dyn PluginFactory>>,
837    process_lifecycle: bool,
838) -> Result<PluginHost> {
839    let preset = modes
840        .get(mode)
841        .ok_or_else(|| EmbedError::ModeNotInstalled { mode: mode.clone() })?;
842    let mut factories = Vec::with_capacity(1 + common_factories.len() + extra_factories.len());
843    factories.push(Arc::clone(&preset.factory));
844    factories.extend(common_factories.iter().cloned());
845    factories.extend(extra_factories);
846    let mut plugin_host = PluginHost::new(factories);
847    if process_lifecycle {
848        let abilities = plugin_host
849            .lashlang_abilities()
850            .with_processes()
851            .with_sleep()
852            .with_process_signals();
853        plugin_host = plugin_host.with_lashlang_abilities(abilities);
854    }
855    Ok(plugin_host)
856}
857
858impl PromptLayerSink for LashCoreBuilder {
859    fn prompt_layer_mut(&mut self) -> &mut PromptLayer {
860        self.prompt.get_or_insert_with(PromptLayer::new)
861    }
862}
863
864pub struct AdvancedLashCoreBuilder {
865    builder: LashCoreBuilder,
866}
867
868impl AdvancedLashCoreBuilder {
869    pub fn runtime_host_config(mut self, core: lash_core::RuntimeHostConfig) -> Self {
870        self.builder.runtime_host_config = Some(core);
871        self
872    }
873
874    pub fn plugin_host(mut self, plugin_host: PluginHost) -> Self {
875        self.builder.plugin_host = Some(plugin_host);
876        self
877    }
878
879    pub fn build(self) -> Result<LashCore> {
880        self.builder.build()
881    }
882}