Skip to main content

lash/
core.rs

1use crate::support::*;
2use lash_core::runtime::{
3    ProcessCommand, ProcessEffectOutcome, RuntimeEffectCommand, RuntimeEffectEnvelope,
4    RuntimeEffectKind, RuntimeEffectLocalExecutor, RuntimeEffectOutcome, RuntimeInvocation,
5    RuntimeScope,
6};
7
/// Shared, thread-safe hook that receives a [`RuntimeHostConfig`] by value
/// together with a borrowed [`PluginHost`] and returns the resulting config
/// (or an error). `LashCore::runtime_host_for_plugin_host` invokes it when set.
type RuntimeHostInstaller =
    Arc<dyn Fn(RuntimeHostConfig, &PluginHost) -> Result<RuntimeHostConfig> + Send + Sync>;
10
/// Core handle from which session builders and the admin facades are created.
///
/// The methods on this type clone the handle into every builder/facade they
/// return, so a `LashCore` is meant to be passed around by value.
#[derive(Clone)]
pub struct LashCore {
    /// Runtime environment. The methods here read `env.core` (base runtime
    /// host config), `env.process_registry`, `env.trigger_store` and
    /// `env.residency` from it.
    pub(crate) env: RuntimeEnvironment,
    /// Session policy; forwarded to durable process worker configs.
    pub(crate) policy: SessionPolicy,
    /// Optional protocol plugin factory, handed to `build_plugin_host`.
    pub(crate) protocol_factory: Option<Arc<dyn PluginFactory>>,
    /// Optional session store factory. `delete_session` and the durable
    /// process worker config both fail with an error when it is absent.
    pub(crate) store_factory: Option<Arc<dyn SessionStoreFactory>>,
    /// Plugin factories shared by every clone, handed to `build_plugin_host`.
    pub(crate) plugin_factories: Arc<Vec<Arc<dyn PluginFactory>>>,
    /// Optional provider handle configured on the builder.
    pub(crate) provider: Option<ProviderHandle>,
    /// Live replay store shared by every clone.
    pub(crate) live_replay_store: Arc<dyn LiveReplayStore>,
    /// Optional hook applied by `runtime_host_for_plugin_host`; when `None`
    /// the runtime host config is passed through unchanged.
    pub(crate) runtime_host_installer: Option<RuntimeHostInstaller>,
    /// Shared resolution of the process work runner. The poke it yields is
    /// threaded onto every session's host so the process admin seam can wake
    /// the runner after a successful start. Shared across `LashCore` clones so
    /// the default inline runner is spawned at most once (Decision 3).
    pub(crate) process_work_runner: Arc<ProcessWorkRunnerSlot>,
}
27
/// How a [`LashCore`] resolves its process work runner, decided at `build()`
/// and shared across clones.
///
/// The variant is consumed by [`ProcessWorkRunnerSlot::poke`], which turns it
/// into an optional [`ProcessWorkPoke`].
pub(crate) enum ProcessWorkRunnerSetup {
    /// No process registry is wired; there is nothing to run and no poke.
    None,
    /// Lazily spawn the default inline [`ProcessWorkRunner`] on first
    /// `session().open()` (Decision 3: the runtime is guaranteed by then, and
    /// `build()` is sync — some tests call it outside a tokio runtime). A store
    /// factory is required to build the config (the worker rebuilds a session
    /// runtime per process); a registry with no store factory is rejected at
    /// build with [`EmbedError::ProcessRegistryRequiresStoreFactory`].
    LazyDefault {
        /// Worker configuration cloned into the [`DurableProcessWorker`] when
        /// the runner is spawned. NOTE(review): presumably boxed to keep the
        /// enum small — confirm.
        config: Box<DurableProcessWorkerConfig>,
    },
    /// The host wired an external runner (e.g. the Restate ingress-client
    /// runner) and handed its driver to the core. The poke is obtained from
    /// the driver's `poke_handle()`.
    External { driver: ProcessWorkDriver },
}
46
/// Builder-side description of where process work comes from. It is the value
/// stored in `LashCoreBuilder::process_work_source`.
#[derive(Clone, Default)]
pub(crate) enum ProcessWorkSource {
    /// Nothing configured (the default); no process registry is available.
    #[default]
    None,
    /// A process registry was supplied directly.
    Inline {
        /// The registry handed to the builder.
        registry: Arc<dyn ProcessRegistry>,
    },
    /// An external driver was supplied; the registry is taken from the
    /// driver's `process_registry()`.
    External(ProcessWorkDriver),
}
56
57impl ProcessWorkSource {
58    fn process_registry(&self) -> Option<Arc<dyn ProcessRegistry>> {
59        match self {
60            Self::None => None,
61            Self::Inline { registry } => Some(Arc::clone(registry)),
62            Self::External(driver) => Some(driver.process_registry()),
63        }
64    }
65
66    #[cfg(feature = "rlm")]
67    fn has_registry(&self) -> bool {
68        !matches!(self, Self::None)
69    }
70}
71
/// Shared, lazily-initialized process-work-runner state for a [`LashCore`].
///
/// The once-guard ([`tokio::sync::OnceCell`]) makes the default inline runner
/// spawn exactly once across `LashCore` clones, on the first `session().open()`
/// that needs it. The resolved [`ProcessWorkPoke`] (if any) is then reused for
/// every session host.
pub(crate) struct ProcessWorkRunnerSlot {
    /// How the poke is resolved; fixed at construction.
    setup: ProcessWorkRunnerSetup,
    /// Cached result of the first resolution. The inner `Option` is `None`
    /// when the setup yields no poke.
    poke: tokio::sync::OnceCell<Option<ProcessWorkPoke>>,
    /// Copy of the worker config's `turn_phase_probe_slot`; only populated for
    /// the `LazyDefault` setup.
    phase_probe_slot: Option<lash_core::runtime::RuntimeTurnPhaseProbeSlot>,
}
83
84impl ProcessWorkRunnerSlot {
85    fn new(setup: ProcessWorkRunnerSetup) -> Self {
86        let phase_probe_slot = match &setup {
87            ProcessWorkRunnerSetup::LazyDefault { config } => {
88                Some(config.turn_phase_probe_slot.clone())
89            }
90            ProcessWorkRunnerSetup::None | ProcessWorkRunnerSetup::External { .. } => None,
91        };
92        Self {
93            setup,
94            poke: tokio::sync::OnceCell::new(),
95            phase_probe_slot,
96        }
97    }
98
99    /// Resolve the poke for a session host, spawning the default inline runner
100    /// on first use. Idempotent: the once-guard ensures a single spawn.
101    pub(crate) async fn poke(&self) -> Option<ProcessWorkPoke> {
102        self.poke
103            .get_or_init(|| async {
104                match &self.setup {
105                    ProcessWorkRunnerSetup::None => None,
106                    ProcessWorkRunnerSetup::External { driver } => Some(driver.poke_handle()),
107                    ProcessWorkRunnerSetup::LazyDefault { config } => {
108                        let worker = DurableProcessWorker::new((**config).clone());
109                        Some(ProcessWorkRunner::inline(worker).spawn())
110                    }
111                }
112            })
113            .await
114            .clone()
115    }
116
117    pub(crate) fn phase_probe_slot(&self) -> Option<lash_core::runtime::RuntimeTurnPhaseProbeSlot> {
118        self.phase_probe_slot.clone()
119    }
120}
121
/// Result of [`LashCore::delete_session`].
#[derive(Clone, Debug, Default, serde::Serialize, serde::Deserialize)]
pub struct SessionDeleteReport {
    /// Id of the session that was deleted.
    pub session_id: String,
    /// Report returned by the process registry's delete-session effect, or
    /// `None` when the core has no process registry.
    pub process: Option<lash_core::ProcessSessionDeleteReport>,
}
127
128impl LashCore {
129    pub fn builder() -> LashCoreBuilder {
130        LashCoreBuilder::default()
131    }
132
133    pub fn session(&self, session_id: impl Into<String>) -> SessionBuilder {
134        SessionBuilder {
135            core: self.clone(),
136            session_id: session_id.into(),
137            spec: SessionSpec::inherit(),
138            parent_session_id: None,
139            store: None,
140            provider: None,
141            active_plugins: Vec::new(),
142            plugin_factories: Vec::new(),
143        }
144    }
145
146    pub fn triggers(&self) -> crate::admin::CoreTriggerAdmin {
147        crate::admin::CoreTriggerAdmin { core: self.clone() }
148    }
149
150    pub fn processes(&self) -> crate::admin::Processes {
151        crate::admin::Processes { core: self.clone() }
152    }
153
154    pub fn completions(&self) -> crate::admin::Completions {
155        crate::admin::Completions { core: self.clone() }
156    }
157
158    pub fn effect_host(&self) -> Arc<dyn EffectHost> {
159        Arc::clone(&self.env.core.control.effect_host)
160    }
161
162    pub async fn delete_session(
163        &self,
164        session_id: impl AsRef<str>,
165        scoped_effect_controller: ScopedEffectController<'_>,
166    ) -> Result<SessionDeleteReport> {
167        let session_id = session_id.as_ref().to_string();
168        let Some(store_factory) = self.store_factory.as_ref() else {
169            return Err(EmbedError::MissingSessionStoreFactory);
170        };
171        let process = if let Some(process_registry) = self.env.process_registry.as_ref() {
172            let invocation = RuntimeInvocation::effect(
173                RuntimeScope::new(session_id.clone()),
174                format!("process:delete-session:{session_id}"),
175                RuntimeEffectKind::Process,
176                format!("{session_id}:delete-session"),
177            );
178            let outcome = scoped_effect_controller
179                .controller()
180                .execute_effect(
181                    RuntimeEffectEnvelope::new(
182                        invocation,
183                        RuntimeEffectCommand::process(ProcessCommand::DeleteSession {
184                            session_id: session_id.clone(),
185                        }),
186                    ),
187                    RuntimeEffectLocalExecutor::processes(Arc::clone(process_registry)),
188                )
189                .await
190                .map_err(|err| EmbedError::SessionDeleteProcess {
191                    session_id: session_id.clone(),
192                    message: err.to_string(),
193                })?;
194            match outcome {
195                RuntimeEffectOutcome::Process {
196                    result: ProcessEffectOutcome::DeleteSession { report },
197                } => Some(report),
198                other => {
199                    return Err(EmbedError::SessionDeleteProcess {
200                        session_id,
201                        message: format!(
202                            "process delete returned the wrong outcome: {}",
203                            other.kind().as_str()
204                        ),
205                    });
206                }
207            }
208        } else {
209            None
210        };
211        if let Some(trigger_store) = self.env.trigger_store.as_ref() {
212            trigger_store
213                .delete_session_subscriptions(&session_id)
214                .await
215                .map_err(|err| EmbedError::SessionDeleteProcess {
216                    session_id: session_id.clone(),
217                    message: err.to_string(),
218                })?;
219        }
220        self.env
221            .core
222            .control
223            .effect_host
224            .revoke_await_events_for_session(&session_id)
225            .await
226            .map_err(|err| EmbedError::SessionDeleteProcess {
227                session_id: session_id.clone(),
228                message: err.to_string(),
229            })?;
230        store_factory
231            .delete_session(&session_id)
232            .await
233            .map_err(|message| EmbedError::StoreFactory {
234                session_id: session_id.clone(),
235                message,
236            })?;
237        Ok(SessionDeleteReport {
238            session_id,
239            process,
240        })
241    }
242
243    pub fn process_registry(&self) -> Option<Arc<dyn ProcessRegistry>> {
244        self.env.process_registry.as_ref().cloned()
245    }
246
247    pub fn durable_process_worker_config(&self) -> Result<DurableProcessWorkerConfig> {
248        self.durable_process_worker_config_with_plugins(std::iter::empty::<Arc<dyn PluginFactory>>())
249    }
250
251    pub fn durable_process_worker_config_with_plugins(
252        &self,
253        extra_plugin_factories: impl IntoIterator<Item = Arc<dyn PluginFactory>>,
254    ) -> Result<DurableProcessWorkerConfig> {
255        let Some(process_registry) = self.process_registry() else {
256            return Err(EmbedError::MissingProcessRegistry);
257        };
258        let Some(store_factory) = self.store_factory.as_ref() else {
259            return Err(EmbedError::MissingProcessWorkerStoreFactory);
260        };
261        let plugin_host = build_plugin_host(
262            self.protocol_factory.as_ref(),
263            self.plugin_factories.as_ref(),
264            extra_plugin_factories.into_iter().collect(),
265        )?;
266        let runtime_host =
267            self.runtime_host_for_plugin_host(self.env.core.clone(), &plugin_host)?;
268        let mut config = DurableProcessWorkerConfig::new(
269            Arc::new(plugin_host),
270            runtime_host,
271            Arc::clone(store_factory),
272            process_registry,
273        )
274        .with_session_policy(self.policy.clone())
275        .with_residency(self.env.residency);
276        if let Some(trigger_store) = self.env.trigger_store.as_ref() {
277            config = config.with_trigger_store(Arc::clone(trigger_store));
278        }
279        Ok(config)
280    }
281
282    pub(crate) fn runtime_host_for_plugin_host(
283        &self,
284        runtime_host: RuntimeHostConfig,
285        plugin_host: &PluginHost,
286    ) -> Result<RuntimeHostConfig> {
287        match &self.runtime_host_installer {
288            Some(install) => install(runtime_host, plugin_host),
289            None => Ok(runtime_host),
290        }
291    }
292}
293
294fn default_runtime_stack() -> PluginStack {
295    lash_plugin_tool_output_budget::tool_output_budget_stack()
296}
297
/// A [`LashCore`] whose builder pre-installs the standard protocol plugin and
/// the default runtime plugin stack. Derefs to the wrapped [`LashCore`].
#[derive(Clone)]
pub struct StandardCore {
    /// The wrapped core.
    core: LashCore,
}
302
303impl StandardCore {
304    pub fn builder() -> StandardCoreBuilder {
305        StandardCoreBuilder {
306            inner: LashCore::builder()
307                .protocol_plugin(Arc::new(
308                    lash_protocol_standard::StandardProtocolPluginFactory::new(),
309                ))
310                .plugins(default_runtime_stack()),
311        }
312    }
313
314    pub fn session(&self, session_id: impl Into<String>) -> SessionBuilder {
315        self.core.session(session_id)
316    }
317
318    pub fn into_inner(self) -> LashCore {
319        self.core
320    }
321}
322
323impl std::ops::Deref for StandardCore {
324    type Target = LashCore;
325
326    fn deref(&self) -> &Self::Target {
327        &self.core
328    }
329}
330
/// Builder for [`StandardCore`]; a thin wrapper around [`LashCoreBuilder`].
pub struct StandardCoreBuilder {
    /// The wrapped builder that receives every forwarded setting.
    inner: LashCoreBuilder,
}
334
335impl StandardCoreBuilder {
336    pub fn build(self) -> Result<StandardCore> {
337        self.inner.build().map(|core| StandardCore { core })
338    }
339}
340
341impl PromptLayerSink for StandardCoreBuilder {
342    fn prompt_layer_mut(&mut self) -> &mut PromptLayer {
343        self.inner.prompt_layer_mut()
344    }
345}
346
/// A [`LashCore`] configured with the RLM protocol plugin. Derefs to the
/// wrapped [`LashCore`]. Only compiled with the `rlm` feature.
#[cfg(feature = "rlm")]
#[derive(Clone)]
pub struct RlmCore {
    /// The wrapped core.
    core: LashCore,
    /// RLM protocol config resolved at build time; used to construct the
    /// lashlang compile surface.
    surface_config: lash_protocol_rlm::RlmProtocolPluginConfig,
    /// Whether the builder had a process registry source when `build()` ran.
    process_lifecycle_available: bool,
}
354
#[cfg(feature = "rlm")]
impl RlmCore {
    /// Returns a builder preloaded with the default runtime plugin stack, the
    /// default RLM protocol config and a default projection registry. No
    /// lashlang artifact store or execution sink is set.
    pub fn builder() -> RlmCoreBuilder {
        let inner = LashCore::builder().plugins(default_runtime_stack());
        let config = lash_protocol_rlm::RlmProtocolPluginConfig::default();
        let projection_resolver = Arc::new(lash_protocol_rlm::ProjectionRegistry::default());
        RlmCoreBuilder {
            inner,
            config,
            projection_resolver,
            lashlang_artifact_store: None,
            lashlang_execution_sink: None,
        }
    }

    /// Starts an RLM session builder on the wrapped core, with no final-answer
    /// format override.
    pub fn session(&self, session_id: impl Into<String>) -> RlmSessionBuilder {
        let builder = self.core.session(session_id);
        RlmSessionBuilder {
            builder,
            rlm_final_answer_format: None,
        }
    }

    /// Unwraps into the underlying [`LashCore`].
    pub fn into_inner(self) -> LashCore {
        let Self { core, .. } = self;
        core
    }

    /// Assembles the lashlang compile surface for the session named in
    /// `request`: a plugin host is built from the core's factories plus the
    /// request's extra factories, a plugin session is created for the request's
    /// session id, and the resolved tool catalog is combined with the RLM
    /// surface extended by the plugin host's extensions.
    ///
    /// # Errors
    ///
    /// Fails when the plugin host or plugin session cannot be built, when the
    /// tool catalog cannot be resolved, or when registering the plugin
    /// extensions on the surface fails.
    pub fn lashlang_compile_surface(
        &self,
        request: crate::rlm::LashlangCompileSurfaceRequest,
    ) -> Result<crate::rlm::LashlangCompileSurface> {
        let crate::rlm::LashlangCompileSurfaceRequest {
            session_id,
            extra_plugin_factories,
            execution_env_spec,
            ..
        } = request;

        let plugin_host = build_plugin_host(
            self.core.protocol_factory.as_ref(),
            self.core.plugin_factories.as_ref(),
            extra_plugin_factories,
        )?;

        let authority = lash_core::plugin::SessionAuthorityContext {
            plugin_options: execution_env_spec.plugin_options,
            ..Default::default()
        };
        let plugins = plugin_host
            .build_session_with_parent(&session_id, None, None, authority)
            .map_err(lash_core::PluginError::from)?;
        let tool_catalog = plugins.resolved_tool_catalog(&session_id)?;

        let base_surface = crate::rlm::rlm_lashlang_surface(
            &self.surface_config,
            self.process_lifecycle_available,
        );
        let surface = base_surface
            .with_plugin_extensions(plugin_host.extensions())
            .map_err(lash_core::PluginError::Registration)?;
        let host_environment = surface.host_environment(&tool_catalog);

        Ok(crate::rlm::LashlangCompileSurface {
            host_environment,
            tool_catalog,
            surface,
        })
    }
}
413
/// Exposes the wrapped [`LashCore`] API directly on [`RlmCore`].
#[cfg(feature = "rlm")]
impl std::ops::Deref for RlmCore {
    type Target = LashCore;

    fn deref(&self) -> &LashCore {
        let Self { core, .. } = self;
        core
    }
}
422
/// Builder for [`RlmCore`]. Wraps a [`LashCoreBuilder`] and adds the
/// RLM-specific settings consumed by `build()`.
#[cfg(feature = "rlm")]
pub struct RlmCoreBuilder {
    /// The wrapped builder that receives every forwarded setting.
    inner: LashCoreBuilder,
    /// RLM protocol plugin config supplied by the caller (default otherwise).
    config: lash_protocol_rlm::RlmProtocolPluginConfig,
    /// Projection resolver handed to the RLM protocol plugin factory.
    projection_resolver: Arc<dyn lash_protocol_rlm::ProjectionResolver>,
    /// Lashlang artifact store; `build()` fails when it is missing.
    lashlang_artifact_store: Option<Arc<dyn lash_lashlang_runtime::LashlangArtifactStore>>,
    /// Optional trace sink for lashlang execution traces.
    lashlang_execution_sink: Option<Arc<dyn lash_trace::TraceSink>>,
}
431
#[cfg(feature = "rlm")]
impl RlmCoreBuilder {
    /// Replaces the RLM protocol plugin config.
    pub fn rlm_protocol_config(self, config: lash_protocol_rlm::RlmProtocolPluginConfig) -> Self {
        Self { config, ..self }
    }

    /// Replaces the projection resolver given to the RLM protocol plugin.
    pub fn projection_resolver(
        self,
        projection_resolver: Arc<dyn lash_protocol_rlm::ProjectionResolver>,
    ) -> Self {
        Self {
            projection_resolver,
            ..self
        }
    }

    /// Sets the lashlang artifact store. Required before `build()`.
    pub fn lashlang_artifact_store(
        self,
        artifact_store: Arc<dyn lash_lashlang_runtime::LashlangArtifactStore>,
    ) -> Self {
        Self {
            lashlang_artifact_store: Some(artifact_store),
            ..self
        }
    }

    /// Sets the trace sink that receives lashlang execution traces.
    pub fn lashlang_execution_sink(
        self,
        lashlang_execution_sink: Arc<dyn lash_trace::TraceSink>,
    ) -> Self {
        Self {
            lashlang_execution_sink: Some(lashlang_execution_sink),
            ..self
        }
    }

    /// Sets the lashlang execution sink to a JSONL trace sink writing to
    /// `path`.
    pub fn lashlang_execution_jsonl_path(self, path: impl Into<std::path::PathBuf>) -> Self {
        let sink = lash_trace::JsonlTraceSink::new(path.into());
        self.lashlang_execution_sink(Arc::new(sink))
    }

    /// Finalizes the builder into an [`RlmCore`].
    ///
    /// Installs the RLM protocol plugin factory and a runtime host installer
    /// that attaches a lashlang process engine, then builds the inner core.
    ///
    /// # Errors
    ///
    /// * [`EmbedError::MissingLashlangArtifactStore`] when no artifact store
    ///   was set.
    /// * [`EmbedError::DurableStorePeerRequired`] when the effective session
    ///   store tier is durable but the artifact store reports the inline tier.
    /// * Any error from the inner builder's `build()`.
    pub fn build(mut self) -> Result<RlmCore> {
        let Some(artifact_store) = self.lashlang_artifact_store.clone() else {
            return Err(EmbedError::MissingLashlangArtifactStore);
        };

        // A durable session store must not be paired with an inline-only
        // artifact store. The artifact tier is only queried when the session
        // store is durable.
        let session_store_is_durable =
            self.inner.effective_session_store_tier() == Some(DurabilityTier::Durable);
        if session_store_is_durable
            && artifact_store.durability_tier()
                == lash_lashlang_runtime::LashlangDurabilityTier::Inline
        {
            return Err(EmbedError::DurableStorePeerRequired {
                facet: "artifact store",
            });
        }

        let process_lifecycle_available = self.inner.process_work_source.has_registry();
        let config = crate::rlm::rlm_protocol_config(self.config, process_lifecycle_available);
        let trace_context = self.inner.resolved_trace_context();

        let factory = lash_protocol_rlm::RlmProtocolPluginFactory::new(config.clone())
            .with_projection_resolver(Arc::clone(&self.projection_resolver))
            .with_lashlang_artifact_store(Arc::clone(&artifact_store))
            .with_lashlang_execution_trace(
                self.lashlang_execution_sink.clone(),
                trace_context.clone(),
            );
        let protocol_factory = Arc::new(factory);

        // Values captured by the installer closure below.
        let engine_artifact_store = Arc::clone(&artifact_store);
        let engine_config = config.clone();
        let engine_sink = self.lashlang_execution_sink.clone();

        self.inner.protocol_factory = Some(protocol_factory);
        self.inner.runtime_host_installer = Some(Arc::new(move |runtime_host, plugin_host| {
            let base_surface =
                crate::rlm::rlm_lashlang_surface(&engine_config, process_lifecycle_available);
            let surface = base_surface
                .with_plugin_extensions(plugin_host.extensions())
                .map_err(lash_core::PluginError::Registration)?;
            let engine = lash_lashlang_runtime::LashlangProcessEngine::new(
                Arc::clone(&engine_artifact_store),
                surface,
            )
            .with_execution_trace(
                engine_sink.clone(),
                runtime_host.tracing.trace_context.clone(),
            );
            Ok(runtime_host.with_process_engine(Arc::new(engine)))
        }));

        let core = self.inner.build()?;
        Ok(RlmCore {
            core,
            surface_config: config,
            process_lifecycle_available,
        })
    }
}
522
/// Prompt-layer edits on the wrapper land on the inner builder's layer.
#[cfg(feature = "rlm")]
impl PromptLayerSink for RlmCoreBuilder {
    fn prompt_layer_mut(&mut self) -> &mut PromptLayer {
        let Self { inner, .. } = self;
        inner.prompt_layer_mut()
    }
}
529
// Generates, for a wrapper builder `$builder` that has an
// `inner: LashCoreBuilder` field, an inherent `impl` whose methods forward
// each setting to the inner builder and return the wrapper, so call chains
// stay on the wrapper type.
macro_rules! forward_core_builder_methods {
    ($builder:ident) => {
        impl $builder {
            pub fn provider(mut self, provider: ProviderHandle) -> Self {
                self.inner = self.inner.provider(provider);
                self
            }

            pub fn model(mut self, model: lash_core::ModelSpec) -> Self {
                self.inner = self.inner.model(model);
                self
            }

            pub fn max_turns(mut self, max_turns: usize) -> Self {
                self.inner = self.inner.max_turns(max_turns);
                self
            }

            pub fn session_spec(mut self, spec: SessionSpec) -> Self {
                self.inner = self.inner.session_spec(spec);
                self
            }

            pub fn store_factory(mut self, store_factory: Arc<dyn SessionStoreFactory>) -> Self {
                self.inner = self.inner.store_factory(store_factory);
                self
            }

            pub fn child_store_factory(
                mut self,
                store_factory: Arc<dyn SessionStoreFactory>,
            ) -> Self {
                self.inner = self.inner.child_store_factory(store_factory);
                self
            }

            pub fn attachment_store(mut self, attachment_store: Arc<dyn AttachmentStore>) -> Self {
                self.inner = self.inner.attachment_store(attachment_store);
                self
            }

            pub fn process_env_store(
                mut self,
                process_env_store: Arc<dyn ProcessExecutionEnvStore>,
            ) -> Self {
                self.inner = self.inner.process_env_store(process_env_store);
                self
            }

            pub fn effect_host(mut self, effect_host: Arc<dyn EffectHost>) -> Self {
                self.inner = self.inner.effect_host(effect_host);
                self
            }

            pub fn tools(mut self, tools: Arc<dyn ToolProvider>) -> Self {
                self.inner = self.inner.tools(tools);
                self
            }

            pub fn plugin(mut self, plugin: Arc<dyn PluginFactory>) -> Self {
                self.inner = self.inner.plugin(plugin);
                self
            }

            // Replaces the inner builder's whole plugin stack, including any
            // stack the wrapper's `builder()` pre-installed.
            pub fn plugins(mut self, stack: PluginStack) -> Self {
                self.inner = self.inner.plugins(stack);
                self
            }

            pub fn configure_plugins(mut self, configure: impl FnOnce(&mut PluginStack)) -> Self {
                self.inner = self.inner.configure_plugins(configure);
                self
            }

            pub fn trace_sink(mut self, trace_sink: Arc<dyn lash_trace::TraceSink>) -> Self {
                self.inner = self.inner.trace_sink(trace_sink);
                self
            }

            pub fn trace_jsonl_path(mut self, path: impl Into<std::path::PathBuf>) -> Self {
                self.inner = self.inner.trace_jsonl_path(path);
                self
            }

            pub fn trace_level(mut self, trace_level: lash_trace::TraceLevel) -> Self {
                self.inner = self.inner.trace_level(trace_level);
                self
            }

            pub fn trace_context(mut self, trace_context: lash_trace::TraceContext) -> Self {
                self.inner = self.inner.trace_context(trace_context);
                self
            }

            pub fn termination(mut self, termination: TerminationPolicy) -> Self {
                self.inner = self.inner.termination(termination);
                self
            }

            pub fn residency(mut self, residency: Residency) -> Self {
                self.inner = self.inner.residency(residency);
                self
            }

            pub fn live_replay_store(
                mut self,
                live_replay_store: Arc<dyn LiveReplayStore>,
            ) -> Self {
                self.inner = self.inner.live_replay_store(live_replay_store);
                self
            }

            pub fn process_registry(mut self, process_registry: Arc<dyn ProcessRegistry>) -> Self {
                self.inner = self.inner.process_registry(process_registry);
                self
            }

            pub fn trigger_store(mut self, store: Arc<dyn lash_core::TriggerStore>) -> Self {
                self.inner = self.inner.trigger_store(store);
                self
            }

            pub fn process_work_driver(mut self, driver: ProcessWorkDriver) -> Self {
                self.inner = self.inner.process_work_driver(driver);
                self
            }

            pub fn queued_work_poke(mut self, poke: QueuedWorkPoke) -> Self {
                self.inner = self.inner.queued_work_poke(poke);
                self
            }

            // Unlike the forwarders above, this writes the inner builder's
            // field directly rather than calling a method on it.
            // NOTE(review): presumably because `LashCoreBuilder` exposes no
            // public setter for the full-config override — confirm.
            pub fn runtime_host_config(mut self, core: RuntimeHostConfig) -> Self {
                self.inner.runtime_host_config = Some(core);
                self
            }
        }
    };
}
669
// Give both wrapper builders the forwarding setters; the RLM builder only
// exists when the `rlm` feature is enabled.
forward_core_builder_methods!(StandardCoreBuilder);
#[cfg(feature = "rlm")]
forward_core_builder_methods!(RlmCoreBuilder);
673
/// Builder for [`LashCore`]. Every field starts at its `Default` value and is
/// filled in by the chained setter methods.
#[derive(Default)]
pub struct LashCoreBuilder {
    /// Protocol plugin factory, set via `protocol_plugin`.
    pub(crate) protocol_factory: Option<Arc<dyn PluginFactory>>,
    /// Session spec; `provider`, `model` and `max_turns` update it in place,
    /// `session_spec` replaces it wholesale.
    session_spec: SessionSpec,
    /// Provider handle, set via `provider`.
    provider: Option<ProviderHandle>,
    /// Store factory for root sessions.
    pub(crate) store_factory: Option<Arc<dyn SessionStoreFactory>>,
    /// Store factory for managed child sessions.
    child_store_factory: Option<Arc<dyn SessionStoreFactory>>,
    // `RuntimeHostConfig` has no `Default`: the generic host-owned durability
    // dependencies must be named. They are collected here and resolved in
    // `build()`, which errors if any is unset.
    effect_host: Option<Arc<dyn EffectHost>>,
    attachment_store: Option<Arc<dyn AttachmentStore>>,
    process_env_store: Option<Arc<dyn ProcessExecutionEnvStore>>,
    trigger_store: Option<Arc<dyn lash_core::TriggerStore>>,
    // Benign core overrides applied on top of the resolved core.
    prompt: Option<PromptLayer>,
    trace_sink: Option<Arc<dyn lash_trace::TraceSink>>,
    trace_level: Option<lash_trace::TraceLevel>,
    trace_context: Option<lash_trace::TraceContext>,
    termination: Option<TerminationPolicy>,
    // Advanced full-config override; used as the base core when present.
    runtime_host_config: Option<RuntimeHostConfig>,
    /// Tool providers, appended one at a time via `tools`.
    tool_providers: Vec<Arc<dyn ToolProvider>>,
    /// Plugin stack; `plugin` pushes onto it, `plugins` replaces it and
    /// `configure_plugins` mutates it in place.
    plugin_stack: PluginStack,
    plugin_host: Option<PluginHost>,
    residency: Option<Residency>,
    // Single source of truth for process lifecycle support and process-work
    // consumption.
    process_work_source: ProcessWorkSource,
    queued_work_poke: Option<QueuedWorkPoke>,
    live_replay_store: Option<Arc<dyn LiveReplayStore>>,
    /// Optional hook later exposed as `LashCore::runtime_host_installer`; the
    /// RLM builder sets it in its `build()`.
    runtime_host_installer: Option<RuntimeHostInstaller>,
}
707
708impl LashCoreBuilder {
709    pub fn protocol_plugin(mut self, plugin: Arc<dyn PluginFactory>) -> Self {
710        self.protocol_factory = Some(plugin);
711        self
712    }
713
714    pub fn provider(mut self, provider: ProviderHandle) -> Self {
715        self.session_spec = self.session_spec.provider_id(provider.kind());
716        self.provider = Some(provider);
717        self
718    }
719
720    pub fn model(mut self, model: lash_core::ModelSpec) -> Self {
721        self.session_spec = self.session_spec.model(model);
722        self
723    }
724
725    pub fn max_turns(mut self, max_turns: usize) -> Self {
726        self.session_spec = self.session_spec.max_turns(max_turns);
727        self
728    }
729
730    pub fn session_spec(mut self, spec: SessionSpec) -> Self {
731        self.session_spec = spec;
732        self
733    }
734
735    /// Configure a factory that can create a persistence store for any root
736    /// session opened from this core.
737    ///
738    /// The factory must honor `SessionStoreCreateRequest::session_id` and
739    /// return a store for that specific session. Do not use this to wrap one
740    /// pre-opened root store; pass root-only stores with
741    /// `LashCore::session(...).store(store)` instead.
742    pub fn store_factory(mut self, store_factory: Arc<dyn SessionStoreFactory>) -> Self {
743        self.store_factory = Some(store_factory);
744        self
745    }
746
747    /// Configure the persistence factory used by managed child sessions, such
748    /// as local subagents.
749    ///
750    /// Child factories must return a distinct store bound to the requested
751    /// child session id. Hosts that pass an explicit root store with
752    /// `SessionBuilder::store` should set this when child sessions need
753    /// persistence.
754    pub fn child_store_factory(mut self, store_factory: Arc<dyn SessionStoreFactory>) -> Self {
755        self.child_store_factory = Some(store_factory);
756        self
757    }
758
759    pub fn attachment_store(mut self, attachment_store: Arc<dyn AttachmentStore>) -> Self {
760        self.attachment_store = Some(attachment_store);
761        self
762    }
763
764    pub fn process_env_store(
765        mut self,
766        process_env_store: Arc<dyn ProcessExecutionEnvStore>,
767    ) -> Self {
768        self.process_env_store = Some(process_env_store);
769        self
770    }
771
772    /// Set the deployment effect host — the durability boundary every operation
773    /// crosses. Pass [`InlineEffectHost`](crate::durability::InlineEffectHost)
774    /// for in-process execution, or a workflow-backed host for durable
775    /// execution.
776    pub fn effect_host(mut self, effect_host: Arc<dyn EffectHost>) -> Self {
777        self.effect_host = Some(effect_host);
778        self
779    }
780
781    pub fn tools(mut self, tools: Arc<dyn ToolProvider>) -> Self {
782        self.tool_providers.push(tools);
783        self
784    }
785
786    pub fn plugin(mut self, plugin: Arc<dyn PluginFactory>) -> Self {
787        self.plugin_stack.push(plugin);
788        self
789    }
790
791    pub fn plugins(mut self, stack: PluginStack) -> Self {
792        self.plugin_stack = stack;
793        self
794    }
795
796    pub fn configure_plugins(mut self, configure: impl FnOnce(&mut PluginStack)) -> Self {
797        configure(&mut self.plugin_stack);
798        self
799    }
800
801    pub fn trace_sink(mut self, trace_sink: Arc<dyn lash_trace::TraceSink>) -> Self {
802        self.trace_sink = Some(trace_sink);
803        self
804    }
805
806    pub fn trace_jsonl_path(mut self, path: impl Into<std::path::PathBuf>) -> Self {
807        self.trace_sink = Some(Arc::new(lash_trace::JsonlTraceSink::new(path.into())));
808        self
809    }
810
811    pub fn trace_level(mut self, trace_level: lash_trace::TraceLevel) -> Self {
812        self.trace_level = Some(trace_level);
813        self
814    }
815
816    pub fn trace_context(mut self, trace_context: lash_trace::TraceContext) -> Self {
817        self.trace_context = Some(trace_context);
818        self
819    }
820
821    pub fn termination(mut self, termination: TerminationPolicy) -> Self {
822        self.termination = Some(termination);
823        self
824    }
825
826    pub fn residency(mut self, residency: Residency) -> Self {
827        self.residency = Some(residency);
828        self
829    }
830
831    /// Configure the bounded live replay buffer used by session observation
832    /// cursors. This is best-effort reconnect recovery only; durable state
833    /// still comes from the session store and [`SessionReadView`].
834    pub fn live_replay_store(mut self, live_replay_store: Arc<dyn LiveReplayStore>) -> Self {
835        self.live_replay_store = Some(live_replay_store);
836        self
837    }
838
839    /// Resolve the runtime host config, requiring the generic host-owned
840    /// durability dependencies to have been named.
841    fn resolve_runtime_host_config(&mut self) -> Result<RuntimeHostConfig> {
842        if let Some(base) = self.runtime_host_config.take() {
843            return Ok(self.apply_core_overrides(base));
844        }
845        let effect_host = self
846            .effect_host
847            .take()
848            .ok_or(EmbedError::MissingEffectHost)?;
849        let attachment_store = self
850            .attachment_store
851            .take()
852            .ok_or(EmbedError::MissingAttachmentStore)?;
853        let process_env_store = self
854            .process_env_store
855            .take()
856            .ok_or(EmbedError::MissingProcessEnvStore)?;
857        let core = RuntimeHostConfig::new(effect_host, attachment_store, process_env_store);
858        Ok(self.apply_core_overrides(core))
859    }
860
861    /// Apply benign + still-set dependency overrides on top of a base core.
862    fn apply_core_overrides(&mut self, mut core: RuntimeHostConfig) -> RuntimeHostConfig {
863        if let Some(effect_host) = self.effect_host.take() {
864            core.control.effect_host = effect_host;
865        }
866        if let Some(attachment_store) = self.attachment_store.take() {
867            core.durability.attachment_store = attachment_store;
868        }
869        if let Some(process_env_store) = self.process_env_store.take() {
870            core.durability.process_env_store = process_env_store;
871        }
872        if let Some(prompt) = self.prompt.take() {
873            core.prompt.prompt = prompt;
874        }
875        if let Some(trace_sink) = self.trace_sink.take() {
876            core.tracing.trace_sink = Some(trace_sink);
877        }
878        if let Some(trace_level) = self.trace_level.take() {
879            core.tracing.trace_level = trace_level;
880        }
881        if let Some(trace_context) = self.trace_context.take() {
882            core.tracing.trace_context = trace_context;
883        }
884        if let Some(termination) = self.termination.take() {
885            core.control.termination = termination;
886        }
887        core
888    }
889
    /// Durability tier of the session store factory `build()` will wire:
    /// `child_store_factory` when set, otherwise `store_factory`. Returns
    /// `None` when neither factory is wired.
    fn effective_session_store_tier(&self) -> Option<DurabilityTier> {
        self.child_store_factory
            .as_ref()
            .or(self.store_factory.as_ref())
            .map(|factory| factory.durability_tier())
    }

    /// Trace context currently in effect for this builder: the explicit
    /// `trace_context` override when set, otherwise the one carried by a
    /// preset `runtime_host_config`, otherwise the default. Only compiled with
    /// the `rlm` feature.
    #[cfg(feature = "rlm")]
    fn resolved_trace_context(&self) -> lash_trace::TraceContext {
        self.trace_context
            .clone()
            .or_else(|| {
                self.runtime_host_config
                    .as_ref()
                    .map(|core| core.tracing.trace_context.clone())
            })
            .unwrap_or_default()
    }

    /// Validate store peer-coherence of the wired durability dependencies.
    ///
    /// Durability is established by what the host wired; the per-invocation
    /// durable controller is not visible here (the build-time controller is
    /// inline by construction), so this checks the stores against each other
    /// only — never the controller (see A5 in the durable-first wiring spec).
    /// NOTE(review): the effect-host rule below is also enforced here; confirm
    /// whether the effect host counts as the "controller" in that spec.
    ///
    /// Each tier is read from the builder's own slot when set, otherwise from a
    /// preset `runtime_host_config` (the trigger store and process registry are
    /// read from the builder only). Rules, checked in this order — the first
    /// violation is the error returned:
    ///
    /// - a durable session store factory must not be paired with an `Inline`
    ///   attachment store or an `Inline` process execution environment store;
    /// - a durable process registry requires a durable session store factory
    ///   ([`EmbedError::DurableProcessRegistryRequiresStoreFactory`]), a
    ///   durable trigger store and a durable process execution environment
    ///   store;
    /// - a durable trigger store requires a durable session store factory and
    ///   a durable process execution environment store, and any wired process
    ///   registry must not be `Inline`;
    /// - a durable effect host requires a durable attachment store and a
    ///   durable process execution environment store.
    ///
    /// Every violation other than the one named above is reported as
    /// [`EmbedError::DurableStorePeerRequired`] with the offending `facet`.
    fn ensure_store_peer_coherence(&self) -> Result<()> {
        // Match `build()`'s wiring exactly: the session store factory it installs
        // is `child_store_factory.or(store_factory)` (child takes precedence, root
        // is the fallback). The coherence check must read the tier of that same
        // effective factory, or a host that wires only a durable child factory
        // (no root) is wrongly rejected though `build()` would wire it durably.
        let session_store_tier = self.effective_session_store_tier();
        let attachment_tier = self
            .attachment_store
            .as_ref()
            .map(|store| store.persistence().durability_tier())
            .or_else(|| {
                self.runtime_host_config.as_ref().map(|core| {
                    core.durability
                        .attachment_store
                        .persistence()
                        .durability_tier()
                })
            });
        let process_env_tier = self
            .process_env_store
            .as_ref()
            .map(|store| store.durability_tier())
            .or_else(|| {
                self.runtime_host_config
                    .as_ref()
                    .map(|core| core.durability.process_env_store.durability_tier())
            });
        let effect_host_tier = self
            .effect_host
            .as_ref()
            .map(|host| host.durability_tier())
            .or_else(|| {
                self.runtime_host_config
                    .as_ref()
                    .map(|core| core.control.effect_host.durability_tier())
            });
        let trigger_store_tier = self
            .trigger_store
            .as_ref()
            .map(|store| store.durability_tier());

        // Durable session store: only an explicitly `Inline` peer is rejected; an
        // unset peer (`None`) passes this check.
        if session_store_tier == Some(DurabilityTier::Durable) {
            if attachment_tier == Some(DurabilityTier::Inline) {
                return Err(EmbedError::DurableStorePeerRequired {
                    facet: "attachment store",
                });
            }
            if process_env_tier == Some(DurabilityTier::Inline) {
                return Err(EmbedError::DurableStorePeerRequired {
                    facet: "process execution environment store",
                });
            }
        }

        // Durable process registry: every peer must be positively durable, so an
        // unset peer is rejected as well.
        if let Some(process_registry) = self.process_work_source.process_registry().as_ref()
            && process_registry.durability_tier() == DurabilityTier::Durable
        {
            if session_store_tier != Some(DurabilityTier::Durable) {
                return Err(EmbedError::DurableProcessRegistryRequiresStoreFactory);
            }
            if trigger_store_tier != Some(DurabilityTier::Durable) {
                return Err(EmbedError::DurableStorePeerRequired {
                    facet: "trigger store",
                });
            }
            if process_env_tier != Some(DurabilityTier::Durable) {
                return Err(EmbedError::DurableStorePeerRequired {
                    facet: "process execution environment store",
                });
            }
        }

        // Durable trigger store: session store and process env store must be
        // durable; a process registry is optional but, when wired, must not be
        // `Inline`.
        if trigger_store_tier == Some(DurabilityTier::Durable) {
            if session_store_tier != Some(DurabilityTier::Durable) {
                return Err(EmbedError::DurableStorePeerRequired {
                    facet: "session store factory",
                });
            }
            if process_env_tier != Some(DurabilityTier::Durable) {
                return Err(EmbedError::DurableStorePeerRequired {
                    facet: "process execution environment store",
                });
            }
            if let Some(process_registry) = self.process_work_source.process_registry().as_ref()
                && process_registry.durability_tier() == DurabilityTier::Inline
            {
                return Err(EmbedError::DurableStorePeerRequired {
                    facet: "process registry",
                });
            }
        }

        // Durable effect host: attachment store and process env store must both
        // be positively durable.
        if effect_host_tier == Some(DurabilityTier::Durable) {
            if attachment_tier != Some(DurabilityTier::Durable) {
                return Err(EmbedError::DurableStorePeerRequired {
                    facet: "attachment store",
                });
            }
            if process_env_tier != Some(DurabilityTier::Durable) {
                return Err(EmbedError::DurableStorePeerRequired {
                    facet: "process execution environment store",
                });
            }
        }

        Ok(())
    }
1029
    /// Validate the wiring and assemble a [`LashCore`].
    ///
    /// Steps, in order: check store peer-coherence, require a protocol plugin
    /// (or an explicit plugin host) and a model, resolve the session policy and
    /// the runtime host config, assemble the plugin host, run the optional
    /// runtime host installer, resolve the process work runner, and finally
    /// build the runtime environment.
    ///
    /// # Errors
    ///
    /// - any error from `ensure_store_peer_coherence`;
    /// - [`EmbedError::MissingProtocolPlugin`] when neither a protocol factory
    ///   nor a plugin host is set;
    /// - [`EmbedError::MissingModelSpec`] when the session spec names no model;
    /// - any error from `resolve_runtime_host_config`, `build_plugin_host`, the
    ///   runtime host installer, or `resolve_process_work_runner`.
    pub fn build(mut self) -> Result<LashCore> {
        self.ensure_store_peer_coherence()?;
        let protocol_factory = self.protocol_factory.clone();
        if protocol_factory.is_none() && self.plugin_host.is_none() {
            return Err(EmbedError::MissingProtocolPlugin);
        }
        // Provider id precedence: explicit session spec value, then the wired
        // provider's kind, then the type's default value.
        let provider_id = self
            .session_spec
            .provider_id
            .clone()
            .or_else(|| {
                self.provider
                    .as_ref()
                    .map(|provider| provider.kind().to_string())
            })
            .unwrap_or_default();
        let model = self
            .session_spec
            .model
            .clone()
            .ok_or(EmbedError::MissingModelSpec)?;

        let base_policy = SessionPolicy {
            provider_id,
            model,
            max_turns: self.session_spec.max_turns.flatten(),
            ..SessionPolicy::default()
        };
        let policy = self.session_spec.resolve_against(&base_policy);

        let mut core = self.resolve_runtime_host_config()?;
        // A wired provider replaces whatever resolver the host config carried.
        if let Some(provider) = self.provider.clone() {
            core.providers.provider_resolver =
                Arc::new(lash_core::SingleProviderResolver::new(provider));
        }
        // An explicit plugin host supplies the factory list as-is; tool
        // providers and the plugin stack are only consulted when none is set.
        let plugin_factories = if let Some(plugin_host) = self.plugin_host {
            plugin_host.factories().to_vec()
        } else {
            let mut factories = Vec::new();
            if !self.tool_providers.is_empty() {
                // All tool providers are folded into one static plugin named
                // "embed_tools", placed ahead of the plugin stack's factories.
                let spec = self
                    .tool_providers
                    .into_iter()
                    .fold(PluginSpec::new(), PluginSpec::with_tool_provider);
                factories.push(Arc::new(StaticPluginFactory::new("embed_tools", spec))
                    as Arc<dyn PluginFactory>);
            }
            factories.extend(self.plugin_stack.into_factories());
            factories
        };
        let default_plugin_host =
            build_plugin_host(protocol_factory.as_ref(), &plugin_factories, Vec::new())?;
        // The installer sees the assembled plugin host and may rewrite the host
        // config before it is handed to the environment.
        if let Some(install) = &self.runtime_host_installer {
            core = install(core, &default_plugin_host)?;
        }

        let process_registry = self.process_work_source.process_registry();

        // Resolve the process work runner before the process source is moved
        // into the environment. The default inline runner's config is built
        // eagerly so a missing store factory fails loudly at build, not at
        // first open. It is built from the same single-protocol plugin host the
        // live runtime uses, so the worker can rebuild a runtime for a process.
        let process_work_runner = Self::resolve_process_work_runner(
            &self.process_work_source,
            &default_plugin_host,
            &core,
            // The worker rebuilds sessions with the same factory `build()` wires
            // below: `child_store_factory.or(store_factory)`.
            self.child_store_factory
                .as_ref()
                .or(self.store_factory.as_ref()),
            &policy,
            self.residency.unwrap_or_default(),
            self.trigger_store.as_ref(),
        )?;

        let mut env_builder = RuntimeEnvironment::builder()
            .with_plugin_host(Arc::new(default_plugin_host))
            .with_runtime_host_config(core);
        if let Some(process_registry) = process_registry.as_ref() {
            env_builder = env_builder.with_process_registry(Arc::clone(process_registry));
        }
        if let Some(residency) = self.residency {
            env_builder = env_builder.with_residency(residency);
        }
        // Child factory takes precedence; the root store factory is the fallback.
        if let Some(child_store_factory) = self
            .child_store_factory
            .as_ref()
            .or(self.store_factory.as_ref())
        {
            env_builder = env_builder.with_session_store_factory(Arc::clone(child_store_factory));
        }
        if let Some(trigger_store) = self.trigger_store.as_ref() {
            env_builder = env_builder.with_trigger_store(Arc::clone(trigger_store));
        }
        if let Some(queued_work_poke) = self.queued_work_poke.clone() {
            env_builder = env_builder.with_queued_work_poke(queued_work_poke);
        }

        // Default to an in-memory replay buffer when the host named none.
        let live_replay_store = self
            .live_replay_store
            .take()
            .unwrap_or_else(|| Arc::new(InMemoryLiveReplayStore::default()));

        Ok(LashCore {
            env: env_builder.build(),
            policy,
            store_factory: self.store_factory,
            plugin_factories: Arc::new(plugin_factories),
            provider: self.provider,
            live_replay_store,
            protocol_factory,
            runtime_host_installer: self.runtime_host_installer,
            process_work_runner: Arc::new(ProcessWorkRunnerSlot::new(process_work_runner)),
        })
    }
1147
1148    /// Decide how a built [`LashCore`] sources its process work runner.
1149    ///
1150    /// - no registry => nothing to run ([`ProcessWorkRunnerSetup::None`]);
1151    /// - external driver wired => use it ([`ProcessWorkRunnerSetup::External`]);
1152    /// - inline registry wired => lazily spawn the default inline runner on first open. Its
1153    ///   [`DurableProcessWorkerConfig`] is built eagerly when a store factory is
1154    ///   present; without one the inline worker cannot rebuild session runtimes.
1155    fn resolve_process_work_runner(
1156        process_work_source: &ProcessWorkSource,
1157        worker_plugin_host: &PluginHost,
1158        core: &RuntimeHostConfig,
1159        store_factory: Option<&Arc<dyn SessionStoreFactory>>,
1160        policy: &SessionPolicy,
1161        residency: lash_core::Residency,
1162        trigger_store: Option<&Arc<dyn lash_core::TriggerStore>>,
1163    ) -> Result<ProcessWorkRunnerSetup> {
1164        let process_registry = match process_work_source {
1165            ProcessWorkSource::None => return Ok(ProcessWorkRunnerSetup::None),
1166            ProcessWorkSource::External(driver) => {
1167                return Ok(ProcessWorkRunnerSetup::External {
1168                    driver: driver.clone(),
1169                });
1170            }
1171            ProcessWorkSource::Inline { registry } => Arc::clone(registry),
1172        };
1173        // The worker rebuilds a session runtime per process, so it needs a store
1174        // factory; without one the default runner could not execute anything, so
1175        // fail loudly rather than silently leave processes unexecuted.
1176        let Some(store_factory) = store_factory else {
1177            return Err(EmbedError::ProcessRegistryRequiresStoreFactory);
1178        };
1179        // The worker rebuilds with the same plugin host the live runtime uses,
1180        // including the protocol plugin that supplies the protocol session
1181        // capability.
1182        let phase_probe_slot = lash_core::runtime::RuntimeTurnPhaseProbeSlot::default();
1183        let config = Box::new(
1184            DurableProcessWorkerConfig::new(
1185                Arc::new(worker_plugin_host.clone()),
1186                core.clone(),
1187                Arc::clone(store_factory),
1188                process_registry,
1189            )
1190            .with_session_policy(policy.clone())
1191            .with_trigger_store(
1192                trigger_store
1193                    .cloned()
1194                    .unwrap_or_else(|| Arc::new(lash_core::InMemoryTriggerStore::default())),
1195            )
1196            .with_residency(residency)
1197            .with_turn_phase_probe_slot(phase_probe_slot),
1198        );
1199        Ok(ProcessWorkRunnerSetup::LazyDefault { config })
1200    }
1201
1202    pub fn advanced(self) -> AdvancedLashCoreBuilder {
1203        AdvancedLashCoreBuilder { builder: self }
1204    }
1205
1206    pub fn process_registry(mut self, process_registry: Arc<dyn ProcessRegistry>) -> Self {
1207        self.process_work_source = ProcessWorkSource::Inline {
1208            registry: process_registry,
1209        };
1210        self
1211    }
1212
1213    pub fn trigger_store(mut self, store: Arc<dyn lash_core::TriggerStore>) -> Self {
1214        self.trigger_store = Some(store);
1215        self
1216    }
1217
1218    /// Configure an externally owned process work runner.
1219    ///
1220    /// Durable hosts construct a [`ProcessWorkDriver`] from the same process
1221    /// registry and wake handle used by their deployment runner, then pass it
1222    /// here. The driver registry becomes the core's process registry and no
1223    /// inline runner is spawned.
1224    pub fn process_work_driver(mut self, driver: ProcessWorkDriver) -> Self {
1225        self.process_work_source = ProcessWorkSource::External(driver);
1226        self
1227    }
1228
1229    /// Wire the wake handle of an externally owned queued-work runner. The
1230    /// runtime pokes it whenever new queued work lands for a session.
1231    pub fn queued_work_poke(mut self, poke: QueuedWorkPoke) -> Self {
1232        self.queued_work_poke = Some(poke);
1233        self
1234    }
1235}
1236
1237pub(crate) fn build_plugin_host(
1238    protocol_factory: Option<&Arc<dyn PluginFactory>>,
1239    common_factories: &[Arc<dyn PluginFactory>],
1240    extra_factories: Vec<Arc<dyn PluginFactory>>,
1241) -> Result<PluginHost> {
1242    let mut factories = Vec::with_capacity(
1243        usize::from(protocol_factory.is_some()) + common_factories.len() + extra_factories.len(),
1244    );
1245    if let Some(protocol_factory) = protocol_factory {
1246        factories.push(Arc::clone(protocol_factory));
1247    }
1248    factories.extend(common_factories.iter().cloned());
1249    factories.extend(extra_factories);
1250    Ok(PluginHost::new(factories))
1251}
1252
1253impl PromptLayerSink for LashCoreBuilder {
1254    fn prompt_layer_mut(&mut self) -> &mut PromptLayer {
1255        self.prompt.get_or_insert_with(PromptLayer::new)
1256    }
1257}
1258
1259pub struct AdvancedLashCoreBuilder {
1260    builder: LashCoreBuilder,
1261}
1262
1263impl AdvancedLashCoreBuilder {
1264    pub fn runtime_host_config(mut self, core: lash_core::RuntimeHostConfig) -> Self {
1265        self.builder.runtime_host_config = Some(core);
1266        self
1267    }
1268
1269    pub fn plugin_host(mut self, plugin_host: PluginHost) -> Self {
1270        self.builder.plugin_host = Some(plugin_host);
1271        self
1272    }
1273
1274    pub fn build(self) -> Result<LashCore> {
1275        self.builder.build()
1276    }
1277}