Skip to main content

lash/
core.rs

1use crate::support::*;
2use lash_core::runtime::{
3    ProcessCommand, ProcessEffectOutcome, RuntimeEffectCommand, RuntimeEffectEnvelope,
4    RuntimeEffectKind, RuntimeEffectLocalExecutor, RuntimeEffectOutcome, RuntimeInvocation,
5    RuntimeScope,
6};
7
8#[derive(Clone)]
9pub struct LashCore {
10    pub(crate) env: RuntimeEnvironment,
11    pub(crate) policy: SessionPolicy,
12    pub(crate) modes: Arc<BTreeMap<ModeId, ModePreset>>,
13    pub(crate) default_mode: ModeId,
14    pub(crate) store_factory: Option<Arc<dyn SessionStoreFactory>>,
15    pub(crate) plugin_factories: Arc<Vec<Arc<dyn PluginFactory>>>,
16    pub(crate) provider: Option<ProviderHandle>,
17    pub(crate) live_replay_store: Arc<dyn LiveReplayStore>,
18    /// Shared resolution of the process work runner. The poke it yields is
19    /// threaded onto every session's host so the process admin seam can wake
20    /// the runner after a successful start. Shared across `LashCore` clones so
21    /// the default inline runner is spawned at most once (Decision 3).
22    pub(crate) process_work_runner: Arc<ProcessWorkRunnerSlot>,
23}
24
25/// How a [`LashCore`] resolves its process work runner, decided at `build()`
26/// and shared across clones.
27pub(crate) enum ProcessWorkRunnerSetup {
28    /// No process registry is wired; there is nothing to run and no poke.
29    None,
30    /// Lazily spawn the default inline [`ProcessWorkRunner`] on first
31    /// `session().open()` (Decision 3: the runtime is guaranteed by then, and
32    /// `build()` is sync — some tests call it outside a tokio runtime). A store
33    /// factory is required to build the config (the worker rebuilds a session
34    /// runtime per process); a registry with no store factory is rejected at
35    /// build with [`EmbedError::ProcessRegistryRequiresStoreFactory`].
36    LazyDefault {
37        config: Box<DurableProcessWorkerConfig>,
38    },
39    /// The host wired an external runner (e.g. the Restate ingress-client
40    /// runner) and handed its driver to the core.
41    External { driver: ProcessWorkDriver },
42}
43
44#[derive(Clone, Default)]
45pub(crate) enum ProcessWorkSource {
46    #[default]
47    None,
48    Inline {
49        registry: Arc<dyn ProcessRegistry>,
50    },
51    External(ProcessWorkDriver),
52}
53
54impl ProcessWorkSource {
55    fn process_registry(&self) -> Option<Arc<dyn ProcessRegistry>> {
56        match self {
57            Self::None => None,
58            Self::Inline { registry } => Some(Arc::clone(registry)),
59            Self::External(driver) => Some(driver.process_registry()),
60        }
61    }
62
63    fn has_registry(&self) -> bool {
64        !matches!(self, Self::None)
65    }
66}
67
68/// Shared, lazily-initialized process-work-runner state for a [`LashCore`].
69///
70/// The once-guard ([`tokio::sync::OnceCell`]) makes the default inline runner
71/// spawn exactly once across `LashCore` clones, on the first `session().open()`
72/// that needs it. The resolved [`ProcessWorkPoke`] (if any) is then reused for
73/// every session host.
74pub(crate) struct ProcessWorkRunnerSlot {
75    setup: ProcessWorkRunnerSetup,
76    poke: tokio::sync::OnceCell<Option<ProcessWorkPoke>>,
77    phase_probe_slot: Option<lash_core::runtime::RuntimeTurnPhaseProbeSlot>,
78}
79
80impl ProcessWorkRunnerSlot {
81    fn new(setup: ProcessWorkRunnerSetup) -> Self {
82        let phase_probe_slot = match &setup {
83            ProcessWorkRunnerSetup::LazyDefault { config } => {
84                Some(config.turn_phase_probe_slot.clone())
85            }
86            ProcessWorkRunnerSetup::None | ProcessWorkRunnerSetup::External { .. } => None,
87        };
88        Self {
89            setup,
90            poke: tokio::sync::OnceCell::new(),
91            phase_probe_slot,
92        }
93    }
94
95    /// Resolve the poke for a session host, spawning the default inline runner
96    /// on first use. Idempotent: the once-guard ensures a single spawn.
97    pub(crate) async fn poke(&self) -> Option<ProcessWorkPoke> {
98        self.poke
99            .get_or_init(|| async {
100                match &self.setup {
101                    ProcessWorkRunnerSetup::None => None,
102                    ProcessWorkRunnerSetup::External { driver } => Some(driver.poke_handle()),
103                    ProcessWorkRunnerSetup::LazyDefault { config } => {
104                        let worker = DurableProcessWorker::new((**config).clone());
105                        Some(ProcessWorkRunner::inline(worker).spawn())
106                    }
107                }
108            })
109            .await
110            .clone()
111    }
112
113    pub(crate) fn phase_probe_slot(&self) -> Option<lash_core::runtime::RuntimeTurnPhaseProbeSlot> {
114        self.phase_probe_slot.clone()
115    }
116}
117
118#[derive(Clone, Debug, Default, serde::Serialize, serde::Deserialize)]
119pub struct SessionDeleteReport {
120    pub session_id: String,
121    pub process: Option<lash_core::ProcessSessionDeleteReport>,
122}
123
124impl LashCore {
125    pub fn builder() -> LashCoreBuilder {
126        LashCoreBuilder::default()
127    }
128
129    /// Preset for `standard` mode.
130    ///
131    /// Storage and effect durability are still host-owned choices. Provide the
132    /// effect host, Lashlang artifact store, and attachment store facets with
133    /// the builder setters before calling [`LashCoreBuilder::build`].
134    pub fn standard() -> LashCoreBuilder {
135        Self::builder()
136            .install_mode(ModePreset::standard())
137            .default_mode(ModeId::standard())
138            .plugins(default_runtime_stack())
139    }
140
141    /// Preset for `rlm` mode.
142    ///
143    /// Storage and effect durability are still host-owned choices. Provide the
144    /// effect host, Lashlang artifact store, and attachment store facets with
145    /// the builder setters before calling [`LashCoreBuilder::build`].
146    pub fn rlm() -> LashCoreBuilder {
147        Self::builder()
148            .install_mode(ModePreset::rlm())
149            .default_mode(ModeId::rlm())
150            .plugins(default_runtime_stack())
151    }
152
153    pub fn session(&self, session_id: impl Into<String>) -> SessionBuilder {
154        SessionBuilder {
155            core: self.clone(),
156            session_id: session_id.into(),
157            spec: SessionSpec::inherit(),
158            mode: None,
159            parent_session_id: None,
160            store: None,
161            provider: None,
162            active_plugins: Vec::new(),
163            plugin_factories: Vec::new(),
164            rlm_final_answer_format: None,
165        }
166    }
167
168    pub fn triggers(&self) -> crate::admin::CoreTriggerAdmin {
169        crate::admin::CoreTriggerAdmin { core: self.clone() }
170    }
171
172    pub fn processes(&self) -> crate::admin::Processes {
173        crate::admin::Processes { core: self.clone() }
174    }
175
176    pub fn completions(&self) -> crate::admin::Completions {
177        crate::admin::Completions { core: self.clone() }
178    }
179
180    pub fn effect_host(&self) -> Arc<dyn EffectHost> {
181        Arc::clone(&self.env.core.control.effect_host)
182    }
183
184    pub async fn delete_session(
185        &self,
186        session_id: impl AsRef<str>,
187        scoped_effect_controller: ScopedEffectController<'_>,
188    ) -> Result<SessionDeleteReport> {
189        let session_id = session_id.as_ref().to_string();
190        let Some(store_factory) = self.store_factory.as_ref() else {
191            return Err(EmbedError::MissingSessionStoreFactory);
192        };
193        let process = if let Some(process_registry) = self.env.process_registry.as_ref() {
194            let invocation = RuntimeInvocation::effect(
195                RuntimeScope::new(session_id.clone()),
196                format!("process:delete-session:{session_id}"),
197                RuntimeEffectKind::Process,
198                format!("{session_id}:delete-session"),
199            );
200            let outcome = scoped_effect_controller
201                .controller()
202                .execute_effect(
203                    RuntimeEffectEnvelope::new(
204                        invocation,
205                        RuntimeEffectCommand::process(ProcessCommand::DeleteSession {
206                            session_id: session_id.clone(),
207                        }),
208                    ),
209                    RuntimeEffectLocalExecutor::processes(Arc::clone(process_registry)),
210                )
211                .await
212                .map_err(|err| EmbedError::SessionDeleteProcess {
213                    session_id: session_id.clone(),
214                    message: err.to_string(),
215                })?;
216            match outcome {
217                RuntimeEffectOutcome::Process {
218                    result: ProcessEffectOutcome::DeleteSession { report },
219                } => Some(report),
220                other => {
221                    return Err(EmbedError::SessionDeleteProcess {
222                        session_id,
223                        message: format!(
224                            "process delete returned the wrong outcome: {}",
225                            other.kind().as_str()
226                        ),
227                    });
228                }
229            }
230        } else {
231            None
232        };
233        if let Some(trigger_store) = self.env.trigger_store.as_ref() {
234            trigger_store
235                .delete_session_subscriptions(&session_id)
236                .await
237                .map_err(|err| EmbedError::SessionDeleteProcess {
238                    session_id: session_id.clone(),
239                    message: err.to_string(),
240                })?;
241        }
242        self.env
243            .core
244            .control
245            .effect_host
246            .revoke_await_events_for_session(&session_id)
247            .await
248            .map_err(|err| EmbedError::SessionDeleteProcess {
249                session_id: session_id.clone(),
250                message: err.to_string(),
251            })?;
252        store_factory
253            .delete_session(&session_id)
254            .await
255            .map_err(|message| EmbedError::StoreFactory {
256                session_id: session_id.clone(),
257                message,
258            })?;
259        Ok(SessionDeleteReport {
260            session_id,
261            process,
262        })
263    }
264
265    pub fn installed_modes(&self) -> impl Iterator<Item = &ModeId> {
266        self.modes.keys()
267    }
268
269    pub fn process_registry(&self) -> Option<Arc<dyn ProcessRegistry>> {
270        self.env.process_registry.as_ref().cloned()
271    }
272
273    pub fn durable_process_worker_config(&self) -> Result<DurableProcessWorkerConfig> {
274        self.durable_process_worker_config_with_plugins(std::iter::empty::<Arc<dyn PluginFactory>>())
275    }
276
277    pub fn durable_process_worker_config_with_plugins(
278        &self,
279        extra_plugin_factories: impl IntoIterator<Item = Arc<dyn PluginFactory>>,
280    ) -> Result<DurableProcessWorkerConfig> {
281        let Some(process_registry) = self.process_registry() else {
282            return Err(EmbedError::MissingProcessRegistry);
283        };
284        let Some(store_factory) = self.store_factory.as_ref() else {
285            return Err(EmbedError::MissingProcessWorkerStoreFactory);
286        };
287        let plugin_host = build_plugin_host_for_mode(
288            &self.modes,
289            &self.default_mode,
290            self.plugin_factories.as_ref(),
291            extra_plugin_factories.into_iter().collect(),
292            true,
293        )?;
294        let mut config = DurableProcessWorkerConfig::new(
295            Arc::new(plugin_host),
296            self.env.core.clone(),
297            Arc::clone(store_factory),
298            process_registry,
299        )
300        .with_session_policy(self.policy.clone())
301        .with_residency(self.env.residency);
302        if let Some(trigger_store) = self.env.trigger_store.as_ref() {
303            config = config.with_trigger_store(Arc::clone(trigger_store));
304        }
305        Ok(config)
306    }
307}
308
309fn default_runtime_stack() -> PluginStack {
310    lash_plugin_tool_output_budget::tool_output_budget_stack()
311}
312
313#[derive(Default)]
314pub struct LashCoreBuilder {
315    pub(crate) modes: BTreeMap<ModeId, ModePreset>,
316    pub(crate) default_mode: Option<ModeId>,
317    session_spec: SessionSpec,
318    provider: Option<ProviderHandle>,
319    pub(crate) store_factory: Option<Arc<dyn SessionStoreFactory>>,
320    child_store_factory: Option<Arc<dyn SessionStoreFactory>>,
321    // `RuntimeHostConfig` has no `Default`: the three host-owned durability
322    // dependencies must be named. They are collected here and resolved in
323    // `build()`, which errors if any is unset.
324    effect_host: Option<Arc<dyn EffectHost>>,
325    attachment_store: Option<Arc<dyn AttachmentStore>>,
326    lashlang_artifact_store: Option<Arc<dyn lash_core::LashlangArtifactStore>>,
327    trigger_store: Option<Arc<dyn lash_core::TriggerStore>>,
328    // Benign core overrides applied on top of the resolved core.
329    prompt: Option<PromptLayer>,
330    trace_sink: Option<Arc<dyn lash_trace::TraceSink>>,
331    lashlang_execution_sink: Option<Arc<dyn lash_trace::TraceSink>>,
332    trace_level: Option<lash_trace::TraceLevel>,
333    trace_context: Option<lash_trace::TraceContext>,
334    termination: Option<TerminationPolicy>,
335    // Advanced full-config override; used as the base core when present.
336    runtime_host_config: Option<RuntimeHostConfig>,
337    tool_providers: Vec<Arc<dyn ToolProvider>>,
338    plugin_stack: PluginStack,
339    plugin_host: Option<PluginHost>,
340    residency: Option<Residency>,
341    // Single source of truth for process lifecycle support and process-work
342    // consumption.
343    process_work_source: ProcessWorkSource,
344    queued_work_poke: Option<QueuedWorkPoke>,
345    live_replay_store: Option<Arc<dyn LiveReplayStore>>,
346}
347
348impl LashCoreBuilder {
349    pub fn install_mode(mut self, preset: ModePreset) -> Self {
350        let mode_id = preset.mode_id.clone();
351        if self.default_mode.is_none() {
352            self.default_mode = Some(mode_id.clone());
353        }
354        self.modes.insert(mode_id, preset);
355        self
356    }
357
358    pub fn default_mode(mut self, mode: ModeId) -> Self {
359        self.default_mode = Some(mode);
360        self
361    }
362
363    pub fn provider(mut self, provider: ProviderHandle) -> Self {
364        self.session_spec = self.session_spec.provider_id(provider.kind());
365        self.provider = Some(provider);
366        self
367    }
368
369    pub fn model(mut self, model: lash_core::ModelSpec) -> Self {
370        self.session_spec = self.session_spec.model(model);
371        self
372    }
373
374    pub fn max_turns(mut self, max_turns: usize) -> Self {
375        self.session_spec = self.session_spec.max_turns(max_turns);
376        self
377    }
378
379    pub fn session_spec(mut self, spec: SessionSpec) -> Self {
380        self.session_spec = spec;
381        self
382    }
383
384    /// Configure a factory that can create a persistence store for any root
385    /// session opened from this core.
386    ///
387    /// The factory must honor `SessionStoreCreateRequest::session_id` and
388    /// return a store for that specific session. Do not use this to wrap one
389    /// pre-opened root store; pass root-only stores with
390    /// `LashCore::session(...).store(store)` instead.
391    pub fn store_factory(mut self, store_factory: Arc<dyn SessionStoreFactory>) -> Self {
392        self.store_factory = Some(store_factory);
393        self
394    }
395
396    /// Configure the persistence factory used by managed child sessions, such
397    /// as local subagents.
398    ///
399    /// Child factories must return a distinct store bound to the requested
400    /// child session id. Hosts that pass an explicit root store with
401    /// `SessionBuilder::store` should set this when child sessions need
402    /// persistence.
403    pub fn child_store_factory(mut self, store_factory: Arc<dyn SessionStoreFactory>) -> Self {
404        self.child_store_factory = Some(store_factory);
405        self
406    }
407
408    pub fn attachment_store(mut self, attachment_store: Arc<dyn AttachmentStore>) -> Self {
409        self.attachment_store = Some(attachment_store);
410        self
411    }
412
413    /// Set the deployment-level Lashlang artifact store (compiled module
414    /// cache, shared across the session tree). A durable store such as
415    /// `lash_sqlite_store::Store` implements it.
416    pub fn lashlang_artifact_store(
417        mut self,
418        artifact_store: Arc<dyn lash_core::LashlangArtifactStore>,
419    ) -> Self {
420        self.lashlang_artifact_store = Some(artifact_store);
421        self
422    }
423
424    /// Set the deployment effect host — the durability boundary every operation
425    /// crosses. Pass [`InlineEffectHost`](crate::durability::InlineEffectHost)
426    /// for in-process execution, or a workflow-backed host for durable
427    /// execution.
428    pub fn effect_host(mut self, effect_host: Arc<dyn EffectHost>) -> Self {
429        self.effect_host = Some(effect_host);
430        self
431    }
432
433    pub fn tools(mut self, tools: Arc<dyn ToolProvider>) -> Self {
434        self.tool_providers.push(tools);
435        self
436    }
437
438    pub fn plugin(mut self, plugin: Arc<dyn PluginFactory>) -> Self {
439        self.plugin_stack.push(plugin);
440        self
441    }
442
443    pub fn plugins(mut self, stack: PluginStack) -> Self {
444        self.plugin_stack = stack;
445        self
446    }
447
448    pub fn configure_plugins(mut self, configure: impl FnOnce(&mut PluginStack)) -> Self {
449        configure(&mut self.plugin_stack);
450        self
451    }
452
453    pub fn trace_sink(mut self, trace_sink: Arc<dyn lash_trace::TraceSink>) -> Self {
454        self.trace_sink = Some(trace_sink);
455        self
456    }
457
458    pub fn trace_jsonl_path(mut self, path: impl Into<std::path::PathBuf>) -> Self {
459        self.trace_sink = Some(Arc::new(lash_trace::JsonlTraceSink::new(path.into())));
460        self
461    }
462
463    pub fn lashlang_execution_sink(
464        mut self,
465        lashlang_execution_sink: Arc<dyn lash_trace::TraceSink>,
466    ) -> Self {
467        self.lashlang_execution_sink = Some(lashlang_execution_sink);
468        self
469    }
470
471    pub fn lashlang_execution_jsonl_path(mut self, path: impl Into<std::path::PathBuf>) -> Self {
472        self.lashlang_execution_sink = Some(Arc::new(lash_trace::JsonlTraceSink::new(path.into())));
473        self
474    }
475
476    pub fn trace_level(mut self, trace_level: lash_trace::TraceLevel) -> Self {
477        self.trace_level = Some(trace_level);
478        self
479    }
480
481    pub fn trace_context(mut self, trace_context: lash_trace::TraceContext) -> Self {
482        self.trace_context = Some(trace_context);
483        self
484    }
485
486    pub fn termination(mut self, termination: TerminationPolicy) -> Self {
487        self.termination = Some(termination);
488        self
489    }
490
491    pub fn residency(mut self, residency: Residency) -> Self {
492        self.residency = Some(residency);
493        self
494    }
495
496    /// Configure the bounded live replay buffer used by session observation
497    /// cursors. This is best-effort reconnect recovery only; durable state
498    /// still comes from the session store and [`SessionReadView`].
499    pub fn live_replay_store(mut self, live_replay_store: Arc<dyn LiveReplayStore>) -> Self {
500        self.live_replay_store = Some(live_replay_store);
501        self
502    }
503
504    /// Resolve the runtime host config, requiring the three host-owned
505    /// durability dependencies to have been named.
506    fn resolve_runtime_host_config(&mut self) -> Result<RuntimeHostConfig> {
507        if let Some(base) = self.runtime_host_config.take() {
508            return Ok(self.apply_core_overrides(base));
509        }
510        let effect_host = self
511            .effect_host
512            .take()
513            .ok_or(EmbedError::MissingEffectHost)?;
514        let lashlang_artifact_store = self
515            .lashlang_artifact_store
516            .take()
517            .ok_or(EmbedError::MissingLashlangArtifactStore)?;
518        let attachment_store = self
519            .attachment_store
520            .take()
521            .ok_or(EmbedError::MissingAttachmentStore)?;
522        let core = RuntimeHostConfig::new(effect_host, lashlang_artifact_store, attachment_store);
523        Ok(self.apply_core_overrides(core))
524    }
525
526    /// Apply benign + still-set dependency overrides on top of a base core.
527    fn apply_core_overrides(&mut self, mut core: RuntimeHostConfig) -> RuntimeHostConfig {
528        if let Some(effect_host) = self.effect_host.take() {
529            core.control.effect_host = effect_host;
530        }
531        if let Some(attachment_store) = self.attachment_store.take() {
532            core.durability.attachment_store = attachment_store;
533        }
534        if let Some(artifact_store) = self.lashlang_artifact_store.take() {
535            core.durability.lashlang_artifact_store = artifact_store;
536        }
537        if let Some(prompt) = self.prompt.take() {
538            core.prompt.prompt = prompt;
539        }
540        if let Some(trace_sink) = self.trace_sink.take() {
541            core.tracing.trace_sink = Some(trace_sink);
542        }
543        if let Some(lashlang_execution_sink) = self.lashlang_execution_sink.take() {
544            core.tracing.lashlang_execution_sink = Some(lashlang_execution_sink);
545        }
546        if let Some(trace_level) = self.trace_level.take() {
547            core.tracing.trace_level = trace_level;
548        }
549        if let Some(trace_context) = self.trace_context.take() {
550            core.tracing.trace_context = trace_context;
551        }
552        if let Some(termination) = self.termination.take() {
553            core.control.termination = termination;
554        }
555        core
556    }
557
558    /// Validate store peer-coherence of the wired durability dependencies.
559    ///
560    /// Durability is established by what the host wired; the per-invocation
561    /// durable controller is not visible here (the build-time controller is
562    /// inline by construction), so this checks the stores against each other
563    /// only — never the controller (see A5 in the durable-first wiring spec):
564    ///
565    /// - a durable session store factory requires a durable attachment and
566    ///   artifact store (they back the same session state);
567    /// - a durable process registry requires a session store factory that is
568    ///   itself durable (the registry's process records are meaningless without
569    ///   a durable session behind them).
570    fn ensure_store_peer_coherence(&self) -> Result<()> {
571        // Match `build()`'s wiring exactly: the session store factory it installs
572        // is `child_store_factory.or(store_factory)` (child takes precedence, root
573        // is the fallback). The coherence check must read the tier of that same
574        // effective factory, or a host that wires only a durable child factory
575        // (no root) is wrongly rejected though `build()` would wire it durably.
576        let session_store_tier = self
577            .child_store_factory
578            .as_ref()
579            .or(self.store_factory.as_ref())
580            .map(|factory| factory.durability_tier());
581        let attachment_tier = self
582            .attachment_store
583            .as_ref()
584            .map(|store| store.persistence().durability_tier());
585        let artifact_tier = self
586            .lashlang_artifact_store
587            .as_ref()
588            .map(|store| store.durability_tier());
589        let trigger_store_tier = self
590            .trigger_store
591            .as_ref()
592            .map(|store| store.durability_tier());
593
594        if session_store_tier == Some(DurabilityTier::Durable) {
595            if attachment_tier == Some(DurabilityTier::Inline) {
596                return Err(EmbedError::DurableStorePeerRequired {
597                    facet: "attachment store",
598                });
599            }
600            if artifact_tier == Some(DurabilityTier::Inline) {
601                return Err(EmbedError::DurableStorePeerRequired {
602                    facet: "artifact store",
603                });
604            }
605        }
606
607        if let Some(process_registry) = self.process_work_source.process_registry().as_ref()
608            && process_registry.durability_tier() == DurabilityTier::Durable
609        {
610            if session_store_tier != Some(DurabilityTier::Durable) {
611                return Err(EmbedError::DurableProcessRegistryRequiresStoreFactory);
612            }
613            if trigger_store_tier != Some(DurabilityTier::Durable) {
614                return Err(EmbedError::DurableStorePeerRequired {
615                    facet: "trigger store",
616                });
617            }
618        }
619
620        if trigger_store_tier == Some(DurabilityTier::Durable) {
621            if session_store_tier != Some(DurabilityTier::Durable) {
622                return Err(EmbedError::DurableStorePeerRequired {
623                    facet: "session store factory",
624                });
625            }
626            if let Some(process_registry) = self.process_work_source.process_registry().as_ref()
627                && process_registry.durability_tier() == DurabilityTier::Inline
628            {
629                return Err(EmbedError::DurableStorePeerRequired {
630                    facet: "process registry",
631                });
632            }
633        }
634
635        Ok(())
636    }
637
638    pub fn build(mut self) -> Result<LashCore> {
639        if self.modes.is_empty() {
640            return Err(EmbedError::NoModesInstalled);
641        }
642        self.ensure_store_peer_coherence()?;
643        let default_mode = self
644            .default_mode
645            .clone()
646            .ok_or(EmbedError::NoModesInstalled)?;
647        if !self.modes.contains_key(&default_mode) {
648            return Err(EmbedError::DefaultModeNotInstalled { mode: default_mode });
649        }
650        let provider_id = self
651            .session_spec
652            .provider_id
653            .clone()
654            .or_else(|| {
655                self.provider
656                    .as_ref()
657                    .map(|provider| provider.kind().to_string())
658            })
659            .unwrap_or_default();
660        let model = self
661            .session_spec
662            .model
663            .clone()
664            .ok_or(EmbedError::MissingModelSpec)?;
665
666        let base_policy = SessionPolicy {
667            provider_id,
668            model,
669            max_turns: self.session_spec.max_turns.flatten(),
670            ..SessionPolicy::default()
671        };
672        let policy = self.session_spec.resolve_against(&base_policy);
673
674        let mut core = self.resolve_runtime_host_config()?;
675        if let Some(provider) = self.provider.clone() {
676            core.providers.provider_resolver =
677                Arc::new(lash_core::SingleProviderResolver::new(provider));
678        }
679
680        let plugin_factories = if let Some(plugin_host) = self.plugin_host {
681            plugin_host.factories().to_vec()
682        } else {
683            let mut factories = Vec::new();
684            if !self.tool_providers.is_empty() {
685                let spec = self
686                    .tool_providers
687                    .into_iter()
688                    .fold(PluginSpec::new(), PluginSpec::with_tool_provider);
689                factories.push(Arc::new(StaticPluginFactory::new("embed_tools", spec))
690                    as Arc<dyn PluginFactory>);
691            }
692            factories.extend(self.plugin_stack.into_factories());
693            factories
694        };
695        let default_plugin_host = build_plugin_host_for_mode(
696            &self.modes,
697            &default_mode,
698            &plugin_factories,
699            Vec::new(),
700            self.process_work_source.has_registry(),
701        )?;
702
703        let process_registry = self.process_work_source.process_registry();
704
705        // Resolve the process work runner before the process source is moved
706        // into the environment. The default inline runner's config is built
707        // eagerly so a missing store factory fails loudly at build, not at
708        // first open. It is built from the *default-mode* plugin host (preset
709        // protocol plugin + process-lifecycle abilities), the same host the
710        // live runtime uses, so the worker can rebuild a runtime for a
711        // default-mode process.
712        let process_work_runner = Self::resolve_process_work_runner(
713            &self.process_work_source,
714            &default_plugin_host,
715            &core,
716            // The worker rebuilds sessions with the same factory `build()` wires
717            // below: `child_store_factory.or(store_factory)`.
718            self.child_store_factory
719                .as_ref()
720                .or(self.store_factory.as_ref()),
721            &policy,
722            self.residency.unwrap_or_default(),
723            self.trigger_store.as_ref(),
724        )?;
725
726        let mut env_builder = RuntimeEnvironment::builder()
727            .with_plugin_host(Arc::new(default_plugin_host))
728            .with_runtime_host_config(core);
729        if let Some(process_registry) = process_registry.as_ref() {
730            env_builder = env_builder.with_process_registry(Arc::clone(process_registry));
731        }
732        if let Some(residency) = self.residency {
733            env_builder = env_builder.with_residency(residency);
734        }
735        if let Some(child_store_factory) = self
736            .child_store_factory
737            .as_ref()
738            .or(self.store_factory.as_ref())
739        {
740            env_builder = env_builder.with_session_store_factory(Arc::clone(child_store_factory));
741        }
742        if let Some(trigger_store) = self.trigger_store.as_ref() {
743            env_builder = env_builder.with_trigger_store(Arc::clone(trigger_store));
744        }
745        if let Some(queued_work_poke) = self.queued_work_poke.clone() {
746            env_builder = env_builder.with_queued_work_poke(queued_work_poke);
747        }
748
749        let live_replay_store = self
750            .live_replay_store
751            .take()
752            .unwrap_or_else(|| Arc::new(InMemoryLiveReplayStore::default()));
753
754        Ok(LashCore {
755            env: env_builder.build(),
756            policy,
757            modes: Arc::new(self.modes),
758            default_mode,
759            store_factory: self.store_factory,
760            plugin_factories: Arc::new(plugin_factories),
761            provider: self.provider,
762            live_replay_store,
763            process_work_runner: Arc::new(ProcessWorkRunnerSlot::new(process_work_runner)),
764        })
765    }
766
767    /// Decide how a built [`LashCore`] sources its process work runner.
768    ///
769    /// - no registry => nothing to run ([`ProcessWorkRunnerSetup::None`]);
770    /// - external driver wired => use it ([`ProcessWorkRunnerSetup::External`]);
771    /// - inline registry wired => lazily spawn the default inline runner on first open. Its
772    ///   [`DurableProcessWorkerConfig`] is built eagerly when a store factory is
773    ///   present; without one the inline worker cannot rebuild session runtimes.
774    fn resolve_process_work_runner(
775        process_work_source: &ProcessWorkSource,
776        worker_plugin_host: &PluginHost,
777        core: &RuntimeHostConfig,
778        store_factory: Option<&Arc<dyn SessionStoreFactory>>,
779        policy: &SessionPolicy,
780        residency: lash_core::Residency,
781        trigger_store: Option<&Arc<dyn lash_core::TriggerStore>>,
782    ) -> Result<ProcessWorkRunnerSetup> {
783        let process_registry = match process_work_source {
784            ProcessWorkSource::None => return Ok(ProcessWorkRunnerSetup::None),
785            ProcessWorkSource::External(driver) => {
786                return Ok(ProcessWorkRunnerSetup::External {
787                    driver: driver.clone(),
788                });
789            }
790            ProcessWorkSource::Inline { registry } => Arc::clone(registry),
791        };
792        // The worker rebuilds a session runtime per process, so it needs a store
793        // factory; without one the default runner could not execute anything, so
794        // fail loudly rather than silently leave processes unexecuted.
795        let Some(store_factory) = store_factory else {
796            return Err(EmbedError::ProcessRegistryRequiresStoreFactory);
797        };
798        // The worker rebuilds with the *same* plugin host the live runtime uses
799        // for the default mode — including the mode preset's protocol plugin
800        // (which supplies the protocol session capability) and the
801        // process-lifecycle abilities. The bare builder `plugin_factories` omit
802        // the preset factory (added per-mode by `build_plugin_host_for_mode`), so
803        // a worker built from them would fail to rebuild a runtime ("missing
804        // protocol session capability").
805        let phase_probe_slot = lash_core::runtime::RuntimeTurnPhaseProbeSlot::default();
806        let config = Box::new(
807            DurableProcessWorkerConfig::new(
808                Arc::new(worker_plugin_host.clone()),
809                core.clone(),
810                Arc::clone(store_factory),
811                process_registry,
812            )
813            .with_session_policy(policy.clone())
814            .with_trigger_store(
815                trigger_store
816                    .cloned()
817                    .unwrap_or_else(|| Arc::new(lash_core::InMemoryTriggerStore::default())),
818            )
819            .with_residency(residency)
820            .with_turn_phase_probe_slot(phase_probe_slot),
821        );
822        Ok(ProcessWorkRunnerSetup::LazyDefault { config })
823    }
824
825    pub fn advanced(self) -> AdvancedLashCoreBuilder {
826        AdvancedLashCoreBuilder { builder: self }
827    }
828
829    pub fn process_registry(mut self, process_registry: Arc<dyn ProcessRegistry>) -> Self {
830        self.process_work_source = ProcessWorkSource::Inline {
831            registry: process_registry,
832        };
833        self
834    }
835
836    pub fn trigger_store(mut self, store: Arc<dyn lash_core::TriggerStore>) -> Self {
837        self.trigger_store = Some(store);
838        self
839    }
840
841    /// Configure an externally owned process work runner.
842    ///
843    /// Durable hosts construct a [`ProcessWorkDriver`] from the same process
844    /// registry and wake handle used by their deployment runner, then pass it
845    /// here. The driver registry becomes the core's process registry and no
846    /// inline runner is spawned.
847    pub fn process_work_driver(mut self, driver: ProcessWorkDriver) -> Self {
848        self.process_work_source = ProcessWorkSource::External(driver);
849        self
850    }
851
852    /// Wire the wake handle of an externally owned queued-work runner. The
853    /// runtime pokes it whenever new queued work lands for a session.
854    pub fn queued_work_poke(mut self, poke: QueuedWorkPoke) -> Self {
855        self.queued_work_poke = Some(poke);
856        self
857    }
858}
859
860pub(crate) fn build_plugin_host_for_mode(
861    modes: &BTreeMap<ModeId, ModePreset>,
862    mode: &ModeId,
863    common_factories: &[Arc<dyn PluginFactory>],
864    extra_factories: Vec<Arc<dyn PluginFactory>>,
865    process_lifecycle: bool,
866) -> Result<PluginHost> {
867    let preset = modes
868        .get(mode)
869        .ok_or_else(|| EmbedError::ModeNotInstalled { mode: mode.clone() })?;
870    let mut factories = Vec::with_capacity(1 + common_factories.len() + extra_factories.len());
871    factories.push(Arc::clone(&preset.factory));
872    factories.extend(common_factories.iter().cloned());
873    factories.extend(extra_factories);
874    let mut plugin_host = PluginHost::new(factories);
875    if process_lifecycle {
876        let abilities = plugin_host
877            .lashlang_abilities()
878            .with_processes()
879            .with_sleep()
880            .with_process_signals();
881        plugin_host = plugin_host.with_lashlang_abilities(abilities);
882    }
883    Ok(plugin_host)
884}
885
886impl PromptLayerSink for LashCoreBuilder {
887    fn prompt_layer_mut(&mut self) -> &mut PromptLayer {
888        self.prompt.get_or_insert_with(PromptLayer::new)
889    }
890}
891
892pub struct AdvancedLashCoreBuilder {
893    builder: LashCoreBuilder,
894}
895
896impl AdvancedLashCoreBuilder {
897    pub fn runtime_host_config(mut self, core: lash_core::RuntimeHostConfig) -> Self {
898        self.builder.runtime_host_config = Some(core);
899        self
900    }
901
902    pub fn plugin_host(mut self, plugin_host: PluginHost) -> Self {
903        self.builder.plugin_host = Some(plugin_host);
904        self
905    }
906
907    pub fn build(self) -> Result<LashCore> {
908        self.builder.build()
909    }
910}