Skip to main content

lash/
core.rs

1use crate::support::*;
2use lash_core::runtime::{
3    ProcessCommand, ProcessEffectOutcome, RuntimeEffectCommand, RuntimeEffectEnvelope,
4    RuntimeEffectKind, RuntimeEffectLocalExecutor, RuntimeEffectOutcome, RuntimeInvocation,
5    RuntimeScope,
6};
7
8type RuntimeHostInstaller =
9    Arc<dyn Fn(RuntimeHostConfig, &PluginHost) -> Result<RuntimeHostConfig> + Send + Sync>;
10
11#[derive(Clone)]
12pub struct LashCore {
13    pub(crate) env: RuntimeEnvironment,
14    pub(crate) policy: SessionPolicy,
15    pub(crate) protocol_factory: Option<Arc<dyn PluginFactory>>,
16    pub(crate) store_factory: Option<Arc<dyn SessionStoreFactory>>,
17    pub(crate) plugin_factories: Arc<Vec<Arc<dyn PluginFactory>>>,
18    pub(crate) provider: Option<ProviderHandle>,
19    pub(crate) live_replay_store: Arc<dyn LiveReplayStore>,
20    pub(crate) runtime_host_installer: Option<RuntimeHostInstaller>,
21    /// Shared resolution of the process work runner. The poke it yields is
22    /// threaded onto every session's host so the process admin seam can wake
23    /// the runner after a successful start. Shared across `LashCore` clones so
24    /// the default inline runner is spawned at most once (Decision 3).
25    pub(crate) process_work_runner: Arc<ProcessWorkRunnerSlot>,
26}
27
28/// How a [`LashCore`] resolves its process work runner, decided at `build()`
29/// and shared across clones.
30pub(crate) enum ProcessWorkRunnerSetup {
31    /// No process registry is wired; there is nothing to run and no poke.
32    None,
33    /// Lazily spawn the default inline [`ProcessWorkRunner`] on first
34    /// `session().open()` (Decision 3: the runtime is guaranteed by then, and
35    /// `build()` is sync — some tests call it outside a tokio runtime). A store
36    /// factory is required to build the config (the worker rebuilds a session
37    /// runtime per process); a registry with no store factory is rejected at
38    /// build with [`EmbedError::ProcessRegistryRequiresStoreFactory`].
39    LazyDefault {
40        config: Box<DurableProcessWorkerConfig>,
41    },
42    /// The host wired an external runner (e.g. the Restate ingress-client
43    /// runner) and handed its driver to the core.
44    External { driver: ProcessWorkDriver },
45}
46
47#[derive(Clone, Default)]
48pub(crate) enum ProcessWorkSource {
49    #[default]
50    None,
51    Inline {
52        registry: Arc<dyn ProcessRegistry>,
53    },
54    External(ProcessWorkDriver),
55}
56
57impl ProcessWorkSource {
58    fn process_registry(&self) -> Option<Arc<dyn ProcessRegistry>> {
59        match self {
60            Self::None => None,
61            Self::Inline { registry } => Some(Arc::clone(registry)),
62            Self::External(driver) => Some(driver.process_registry()),
63        }
64    }
65
66    #[cfg(feature = "rlm")]
67    fn has_registry(&self) -> bool {
68        !matches!(self, Self::None)
69    }
70}
71
72/// Shared, lazily-initialized process-work-runner state for a [`LashCore`].
73///
74/// The once-guard ([`tokio::sync::OnceCell`]) makes the default inline runner
75/// spawn exactly once across `LashCore` clones, on the first `session().open()`
76/// that needs it. The resolved [`ProcessWorkPoke`] (if any) is then reused for
77/// every session host.
78pub(crate) struct ProcessWorkRunnerSlot {
79    setup: ProcessWorkRunnerSetup,
80    poke: tokio::sync::OnceCell<Option<ProcessWorkPoke>>,
81    phase_probe_slot: Option<lash_core::runtime::RuntimeTurnPhaseProbeSlot>,
82}
83
84impl ProcessWorkRunnerSlot {
85    fn new(setup: ProcessWorkRunnerSetup) -> Self {
86        let phase_probe_slot = match &setup {
87            ProcessWorkRunnerSetup::LazyDefault { config } => {
88                Some(config.turn_phase_probe_slot.clone())
89            }
90            ProcessWorkRunnerSetup::None | ProcessWorkRunnerSetup::External { .. } => None,
91        };
92        Self {
93            setup,
94            poke: tokio::sync::OnceCell::new(),
95            phase_probe_slot,
96        }
97    }
98
99    /// Resolve the poke for a session host, spawning the default inline runner
100    /// on first use. Idempotent: the once-guard ensures a single spawn.
101    pub(crate) async fn poke(&self) -> Option<ProcessWorkPoke> {
102        self.poke
103            .get_or_init(|| async {
104                match &self.setup {
105                    ProcessWorkRunnerSetup::None => None,
106                    ProcessWorkRunnerSetup::External { driver } => Some(driver.poke_handle()),
107                    ProcessWorkRunnerSetup::LazyDefault { config } => {
108                        let worker = DurableProcessWorker::new((**config).clone());
109                        Some(ProcessWorkRunner::inline(worker).spawn())
110                    }
111                }
112            })
113            .await
114            .clone()
115    }
116
117    pub(crate) fn phase_probe_slot(&self) -> Option<lash_core::runtime::RuntimeTurnPhaseProbeSlot> {
118        self.phase_probe_slot.clone()
119    }
120}
121
122#[derive(Clone, Debug, Default, serde::Serialize, serde::Deserialize)]
123pub struct SessionDeleteReport {
124    pub session_id: String,
125    pub process: Option<lash_core::ProcessSessionDeleteReport>,
126}
127
128impl LashCore {
129    pub fn builder() -> LashCoreBuilder {
130        LashCoreBuilder::default()
131    }
132
133    pub fn session(&self, session_id: impl Into<String>) -> SessionBuilder {
134        SessionBuilder {
135            core: self.clone(),
136            session_id: session_id.into(),
137            spec: SessionSpec::inherit(),
138            parent_session_id: None,
139            store: None,
140            provider: None,
141            active_plugins: Vec::new(),
142            plugin_factories: Vec::new(),
143        }
144    }
145
146    pub fn triggers(&self) -> crate::admin::CoreTriggerAdmin {
147        crate::admin::CoreTriggerAdmin { core: self.clone() }
148    }
149
150    pub fn processes(&self) -> crate::admin::Processes {
151        crate::admin::Processes { core: self.clone() }
152    }
153
154    pub fn completions(&self) -> crate::admin::Completions {
155        crate::admin::Completions { core: self.clone() }
156    }
157
158    pub fn effect_host(&self) -> Arc<dyn EffectHost> {
159        Arc::clone(&self.env.core.control.effect_host)
160    }
161
162    pub async fn delete_session(
163        &self,
164        session_id: impl AsRef<str>,
165        scoped_effect_controller: ScopedEffectController<'_>,
166    ) -> Result<SessionDeleteReport> {
167        let session_id = session_id.as_ref().to_string();
168        let Some(store_factory) = self.store_factory.as_ref() else {
169            return Err(EmbedError::MissingSessionStoreFactory);
170        };
171        let process = if let Some(process_registry) = self.env.process_registry.as_ref() {
172            let invocation = RuntimeInvocation::effect(
173                RuntimeScope::new(session_id.clone()),
174                format!("process:delete-session:{session_id}"),
175                RuntimeEffectKind::Process,
176                format!("{session_id}:delete-session"),
177            );
178            let outcome = scoped_effect_controller
179                .controller()
180                .execute_effect(
181                    RuntimeEffectEnvelope::new(
182                        invocation,
183                        RuntimeEffectCommand::process(ProcessCommand::DeleteSession {
184                            session_id: session_id.clone(),
185                        }),
186                    ),
187                    RuntimeEffectLocalExecutor::processes(Arc::clone(process_registry)),
188                )
189                .await
190                .map_err(|err| EmbedError::SessionDeleteProcess {
191                    session_id: session_id.clone(),
192                    message: err.to_string(),
193                })?;
194            match outcome {
195                RuntimeEffectOutcome::Process {
196                    result: ProcessEffectOutcome::DeleteSession { report },
197                } => Some(report),
198                other => {
199                    return Err(EmbedError::SessionDeleteProcess {
200                        session_id,
201                        message: format!(
202                            "process delete returned the wrong outcome: {}",
203                            other.kind().as_str()
204                        ),
205                    });
206                }
207            }
208        } else {
209            None
210        };
211        if let Some(trigger_store) = self.env.trigger_store.as_ref() {
212            trigger_store
213                .delete_session_subscriptions(&session_id)
214                .await
215                .map_err(|err| EmbedError::SessionDeleteProcess {
216                    session_id: session_id.clone(),
217                    message: err.to_string(),
218                })?;
219        }
220        self.env
221            .core
222            .control
223            .effect_host
224            .revoke_await_events_for_session(&session_id)
225            .await
226            .map_err(|err| EmbedError::SessionDeleteProcess {
227                session_id: session_id.clone(),
228                message: err.to_string(),
229            })?;
230        store_factory
231            .delete_session(&session_id)
232            .await
233            .map_err(|message| EmbedError::StoreFactory {
234                session_id: session_id.clone(),
235                message,
236            })?;
237        Ok(SessionDeleteReport {
238            session_id,
239            process,
240        })
241    }
242
243    pub fn process_registry(&self) -> Option<Arc<dyn ProcessRegistry>> {
244        self.env.process_registry.as_ref().cloned()
245    }
246
247    pub fn durable_process_worker_config(&self) -> Result<DurableProcessWorkerConfig> {
248        self.durable_process_worker_config_with_plugins(std::iter::empty::<Arc<dyn PluginFactory>>())
249    }
250
251    pub fn durable_process_worker_config_with_plugins(
252        &self,
253        extra_plugin_factories: impl IntoIterator<Item = Arc<dyn PluginFactory>>,
254    ) -> Result<DurableProcessWorkerConfig> {
255        let Some(process_registry) = self.process_registry() else {
256            return Err(EmbedError::MissingProcessRegistry);
257        };
258        let Some(store_factory) = self.store_factory.as_ref() else {
259            return Err(EmbedError::MissingProcessWorkerStoreFactory);
260        };
261        let plugin_host = build_plugin_host(
262            self.protocol_factory.as_ref(),
263            self.plugin_factories.as_ref(),
264            extra_plugin_factories.into_iter().collect(),
265        )?;
266        let runtime_host =
267            self.runtime_host_for_plugin_host(self.env.core.clone(), &plugin_host)?;
268        let mut config = DurableProcessWorkerConfig::new(
269            Arc::new(plugin_host),
270            runtime_host,
271            Arc::clone(store_factory),
272            process_registry,
273        )
274        .with_session_policy(self.policy.clone())
275        .with_residency(self.env.residency);
276        if let Some(trigger_store) = self.env.trigger_store.as_ref() {
277            config = config.with_trigger_store(Arc::clone(trigger_store));
278        }
279        Ok(config)
280    }
281
282    pub(crate) fn runtime_host_for_plugin_host(
283        &self,
284        runtime_host: RuntimeHostConfig,
285        plugin_host: &PluginHost,
286    ) -> Result<RuntimeHostConfig> {
287        match &self.runtime_host_installer {
288            Some(install) => install(runtime_host, plugin_host),
289            None => Ok(runtime_host),
290        }
291    }
292}
293
294fn default_runtime_stack() -> PluginStack {
295    lash_plugin_tool_output_budget::tool_output_budget_stack()
296}
297
298#[derive(Clone)]
299pub struct StandardCore {
300    core: LashCore,
301}
302
303impl StandardCore {
304    pub fn builder() -> StandardCoreBuilder {
305        StandardCoreBuilder {
306            inner: LashCore::builder()
307                .protocol_plugin(Arc::new(
308                    lash_protocol_standard::StandardProtocolPluginFactory::new(),
309                ))
310                .plugins(default_runtime_stack()),
311        }
312    }
313
314    pub fn session(&self, session_id: impl Into<String>) -> SessionBuilder {
315        self.core.session(session_id)
316    }
317
318    pub fn into_inner(self) -> LashCore {
319        self.core
320    }
321}
322
323impl std::ops::Deref for StandardCore {
324    type Target = LashCore;
325
326    fn deref(&self) -> &Self::Target {
327        &self.core
328    }
329}
330
331pub struct StandardCoreBuilder {
332    inner: LashCoreBuilder,
333}
334
335impl StandardCoreBuilder {
336    pub fn build(self) -> Result<StandardCore> {
337        self.inner.build().map(|core| StandardCore { core })
338    }
339}
340
341impl PromptLayerSink for StandardCoreBuilder {
342    fn prompt_layer_mut(&mut self) -> &mut PromptLayer {
343        self.inner.prompt_layer_mut()
344    }
345}
346
347#[cfg(feature = "rlm")]
348#[derive(Clone)]
349pub struct RlmCore {
350    core: LashCore,
351    surface_config: lash_protocol_rlm::RlmProtocolPluginConfig,
352    process_lifecycle_available: bool,
353    lashlang_artifact_store: Arc<dyn lash_lashlang_runtime::LashlangArtifactStore>,
354}
355
356#[cfg(feature = "rlm")]
357impl RlmCore {
358    pub fn builder() -> RlmCoreBuilder {
359        RlmCoreBuilder {
360            inner: LashCore::builder().plugins(default_runtime_stack()),
361            config: lash_protocol_rlm::RlmProtocolPluginConfig::default(),
362            projection_resolver: Arc::new(lash_protocol_rlm::ProjectionRegistry::default()),
363            lashlang_artifact_store: None,
364            lashlang_execution_sink: None,
365        }
366    }
367
368    pub fn session(&self, session_id: impl Into<String>) -> RlmSessionBuilder {
369        RlmSessionBuilder {
370            builder: self.core.session(session_id),
371            rlm_final_answer_format: None,
372        }
373    }
374
375    pub fn into_inner(self) -> LashCore {
376        self.core
377    }
378
379    pub fn lashlang_compile_surface(
380        &self,
381        request: crate::rlm::LashlangCompileSurfaceRequest,
382    ) -> Result<crate::rlm::LashlangCompileSurface> {
383        let plugin_host = build_plugin_host(
384            self.core.protocol_factory.as_ref(),
385            self.core.plugin_factories.as_ref(),
386            request.extra_plugin_factories,
387        )?;
388        let plugins = plugin_host
389            .build_session_with_parent(
390                &request.session_id,
391                None,
392                None,
393                lash_core::plugin::SessionAuthorityContext {
394                    plugin_options: request.execution_env_spec.plugin_options,
395                    ..Default::default()
396                },
397            )
398            .map_err(lash_core::PluginError::from)?;
399        let tool_catalog = plugins.resolved_tool_catalog(&request.session_id)?;
400        let surface = crate::rlm::rlm_lashlang_surface(
401            &self.surface_config,
402            self.process_lifecycle_available,
403        )
404        .with_plugin_extensions(plugin_host.extensions())
405        .map_err(lash_core::PluginError::Registration)?;
406        let host_environment = surface.host_environment(&tool_catalog);
407        Ok(crate::rlm::LashlangCompileSurface {
408            host_environment,
409            tool_catalog,
410            surface,
411        })
412    }
413
414    pub async fn compile_lashlang_module(
415        &self,
416        request: crate::rlm::LashlangModuleCompileRequest,
417    ) -> std::result::Result<crate::rlm::ModuleCompileOutput, crate::rlm::LashlangModuleCompileError>
418    {
419        let surface = self
420            .lashlang_compile_surface(crate::rlm::LashlangCompileSurfaceRequest {
421                session_id: request.session_id,
422                execution_env_spec: request.execution_env_spec,
423                extra_plugin_factories: request.extra_plugin_factories,
424            })
425            .map_err(|err| {
426                lashlang::ModuleCompileError::Link(lashlang::ModuleCompileDiagnostic {
427                    stage: lashlang::ModuleCompileStage::Link,
428                    message: err.to_string(),
429                    offset: None,
430                    span: None,
431                    line: None,
432                    column: None,
433                    diagnostic: Some(err.to_string()),
434                })
435            })?;
436        lashlang::compile_module(lashlang::ModuleCompileRequest {
437            source: &request.source,
438            environment: &surface.host_environment,
439            artifact_store: Some(self.lashlang_artifact_store.as_ref()),
440        })
441        .await
442    }
443}
444
445#[cfg(feature = "rlm")]
446impl std::ops::Deref for RlmCore {
447    type Target = LashCore;
448
449    fn deref(&self) -> &Self::Target {
450        &self.core
451    }
452}
453
454#[cfg(feature = "rlm")]
455pub struct RlmCoreBuilder {
456    inner: LashCoreBuilder,
457    config: lash_protocol_rlm::RlmProtocolPluginConfig,
458    projection_resolver: Arc<dyn lash_protocol_rlm::ProjectionResolver>,
459    lashlang_artifact_store: Option<Arc<dyn lash_lashlang_runtime::LashlangArtifactStore>>,
460    lashlang_execution_sink: Option<Arc<dyn lash_trace::TraceSink>>,
461}
462
463#[cfg(feature = "rlm")]
464impl RlmCoreBuilder {
465    pub fn rlm_protocol_config(
466        mut self,
467        config: lash_protocol_rlm::RlmProtocolPluginConfig,
468    ) -> Self {
469        self.config = config;
470        self
471    }
472
473    pub fn projection_resolver(
474        mut self,
475        projection_resolver: Arc<dyn lash_protocol_rlm::ProjectionResolver>,
476    ) -> Self {
477        self.projection_resolver = projection_resolver;
478        self
479    }
480
481    pub fn lashlang_artifact_store(
482        mut self,
483        artifact_store: Arc<dyn lash_lashlang_runtime::LashlangArtifactStore>,
484    ) -> Self {
485        self.lashlang_artifact_store = Some(artifact_store);
486        self
487    }
488
489    pub fn lashlang_execution_sink(
490        mut self,
491        lashlang_execution_sink: Arc<dyn lash_trace::TraceSink>,
492    ) -> Self {
493        self.lashlang_execution_sink = Some(lashlang_execution_sink);
494        self
495    }
496
497    pub fn lashlang_execution_jsonl_path(mut self, path: impl Into<std::path::PathBuf>) -> Self {
498        self.lashlang_execution_sink = Some(Arc::new(lash_trace::JsonlTraceSink::new(path.into())));
499        self
500    }
501
502    pub fn build(mut self) -> Result<RlmCore> {
503        let artifact_store = self
504            .lashlang_artifact_store
505            .clone()
506            .ok_or(EmbedError::MissingLashlangArtifactStore)?;
507        if self.inner.effective_session_store_tier() == Some(DurabilityTier::Durable)
508            && artifact_store.durability_tier()
509                == lash_lashlang_runtime::LashlangDurabilityTier::Inline
510        {
511            return Err(EmbedError::DurableStorePeerRequired {
512                facet: "artifact store",
513            });
514        }
515        let process_lifecycle_available = self.inner.process_work_source.has_registry();
516        let config = crate::rlm::rlm_protocol_config(self.config, process_lifecycle_available);
517        let trace_context = self.inner.resolved_trace_context();
518        let protocol_factory = Arc::new(
519            lash_protocol_rlm::RlmProtocolPluginFactory::new(config.clone())
520                .with_projection_resolver(Arc::clone(&self.projection_resolver))
521                .with_lashlang_artifact_store(Arc::clone(&artifact_store))
522                .with_lashlang_execution_trace(
523                    self.lashlang_execution_sink.clone(),
524                    trace_context.clone(),
525                ),
526        );
527        let engine_artifact_store = Arc::clone(&artifact_store);
528        let engine_config = config.clone();
529        let engine_sink = self.lashlang_execution_sink.clone();
530        self.inner.protocol_factory = Some(protocol_factory);
531        self.inner.runtime_host_installer = Some(Arc::new(move |runtime_host, plugin_host| {
532            let surface =
533                crate::rlm::rlm_lashlang_surface(&engine_config, process_lifecycle_available)
534                    .with_plugin_extensions(plugin_host.extensions())
535                    .map_err(lash_core::PluginError::Registration)?;
536            let engine = lash_lashlang_runtime::LashlangProcessEngine::new(
537                Arc::clone(&engine_artifact_store),
538                surface,
539            )
540            .with_execution_trace(
541                engine_sink.clone(),
542                runtime_host.tracing.trace_context.clone(),
543            );
544            Ok(runtime_host.with_process_engine(Arc::new(engine)))
545        }));
546        self.inner.build().map(|core| RlmCore {
547            core,
548            surface_config: config,
549            process_lifecycle_available,
550            lashlang_artifact_store: artifact_store,
551        })
552    }
553}
554
555#[cfg(feature = "rlm")]
556impl PromptLayerSink for RlmCoreBuilder {
557    fn prompt_layer_mut(&mut self) -> &mut PromptLayer {
558        self.inner.prompt_layer_mut()
559    }
560}
561
562macro_rules! forward_core_builder_methods {
563    ($builder:ident) => {
564        impl $builder {
565            pub fn provider(mut self, provider: ProviderHandle) -> Self {
566                self.inner = self.inner.provider(provider);
567                self
568            }
569
570            pub fn model(mut self, model: lash_core::ModelSpec) -> Self {
571                self.inner = self.inner.model(model);
572                self
573            }
574
575            pub fn max_turns(mut self, max_turns: usize) -> Self {
576                self.inner = self.inner.max_turns(max_turns);
577                self
578            }
579
580            pub fn session_spec(mut self, spec: SessionSpec) -> Self {
581                self.inner = self.inner.session_spec(spec);
582                self
583            }
584
585            pub fn store_factory(mut self, store_factory: Arc<dyn SessionStoreFactory>) -> Self {
586                self.inner = self.inner.store_factory(store_factory);
587                self
588            }
589
590            pub fn child_store_factory(
591                mut self,
592                store_factory: Arc<dyn SessionStoreFactory>,
593            ) -> Self {
594                self.inner = self.inner.child_store_factory(store_factory);
595                self
596            }
597
598            pub fn attachment_store(mut self, attachment_store: Arc<dyn AttachmentStore>) -> Self {
599                self.inner = self.inner.attachment_store(attachment_store);
600                self
601            }
602
603            pub fn process_env_store(
604                mut self,
605                process_env_store: Arc<dyn ProcessExecutionEnvStore>,
606            ) -> Self {
607                self.inner = self.inner.process_env_store(process_env_store);
608                self
609            }
610
611            pub fn effect_host(mut self, effect_host: Arc<dyn EffectHost>) -> Self {
612                self.inner = self.inner.effect_host(effect_host);
613                self
614            }
615
616            pub fn tools(mut self, tools: Arc<dyn ToolProvider>) -> Self {
617                self.inner = self.inner.tools(tools);
618                self
619            }
620
621            pub fn plugin(mut self, plugin: Arc<dyn PluginFactory>) -> Self {
622                self.inner = self.inner.plugin(plugin);
623                self
624            }
625
626            pub fn plugins(mut self, stack: PluginStack) -> Self {
627                self.inner = self.inner.plugins(stack);
628                self
629            }
630
631            pub fn configure_plugins(mut self, configure: impl FnOnce(&mut PluginStack)) -> Self {
632                self.inner = self.inner.configure_plugins(configure);
633                self
634            }
635
636            pub fn trace_sink(mut self, trace_sink: Arc<dyn lash_trace::TraceSink>) -> Self {
637                self.inner = self.inner.trace_sink(trace_sink);
638                self
639            }
640
641            pub fn trace_jsonl_path(mut self, path: impl Into<std::path::PathBuf>) -> Self {
642                self.inner = self.inner.trace_jsonl_path(path);
643                self
644            }
645
646            pub fn trace_level(mut self, trace_level: lash_trace::TraceLevel) -> Self {
647                self.inner = self.inner.trace_level(trace_level);
648                self
649            }
650
651            pub fn trace_context(mut self, trace_context: lash_trace::TraceContext) -> Self {
652                self.inner = self.inner.trace_context(trace_context);
653                self
654            }
655
656            pub fn termination(mut self, termination: TerminationPolicy) -> Self {
657                self.inner = self.inner.termination(termination);
658                self
659            }
660
661            pub fn residency(mut self, residency: Residency) -> Self {
662                self.inner = self.inner.residency(residency);
663                self
664            }
665
666            pub fn live_replay_store(
667                mut self,
668                live_replay_store: Arc<dyn LiveReplayStore>,
669            ) -> Self {
670                self.inner = self.inner.live_replay_store(live_replay_store);
671                self
672            }
673
674            pub fn process_registry(mut self, process_registry: Arc<dyn ProcessRegistry>) -> Self {
675                self.inner = self.inner.process_registry(process_registry);
676                self
677            }
678
679            pub fn trigger_store(mut self, store: Arc<dyn lash_core::TriggerStore>) -> Self {
680                self.inner = self.inner.trigger_store(store);
681                self
682            }
683
684            pub fn process_work_driver(mut self, driver: ProcessWorkDriver) -> Self {
685                self.inner = self.inner.process_work_driver(driver);
686                self
687            }
688
689            pub fn queued_work_poke(mut self, poke: QueuedWorkPoke) -> Self {
690                self.inner = self.inner.queued_work_poke(poke);
691                self
692            }
693
694            pub fn runtime_host_config(mut self, core: RuntimeHostConfig) -> Self {
695                self.inner.runtime_host_config = Some(core);
696                self
697            }
698        }
699    };
700}
701
702forward_core_builder_methods!(StandardCoreBuilder);
703#[cfg(feature = "rlm")]
704forward_core_builder_methods!(RlmCoreBuilder);
705
706#[derive(Default)]
707pub struct LashCoreBuilder {
708    pub(crate) protocol_factory: Option<Arc<dyn PluginFactory>>,
709    session_spec: SessionSpec,
710    provider: Option<ProviderHandle>,
711    pub(crate) store_factory: Option<Arc<dyn SessionStoreFactory>>,
712    child_store_factory: Option<Arc<dyn SessionStoreFactory>>,
713    // `RuntimeHostConfig` has no `Default`: the generic host-owned durability
714    // dependencies must be named. They are collected here and resolved in
715    // `build()`, which errors if any is unset.
716    effect_host: Option<Arc<dyn EffectHost>>,
717    attachment_store: Option<Arc<dyn AttachmentStore>>,
718    process_env_store: Option<Arc<dyn ProcessExecutionEnvStore>>,
719    trigger_store: Option<Arc<dyn lash_core::TriggerStore>>,
720    // Benign core overrides applied on top of the resolved core.
721    prompt: Option<PromptLayer>,
722    trace_sink: Option<Arc<dyn lash_trace::TraceSink>>,
723    trace_level: Option<lash_trace::TraceLevel>,
724    trace_context: Option<lash_trace::TraceContext>,
725    termination: Option<TerminationPolicy>,
726    // Advanced full-config override; used as the base core when present.
727    runtime_host_config: Option<RuntimeHostConfig>,
728    tool_providers: Vec<Arc<dyn ToolProvider>>,
729    plugin_stack: PluginStack,
730    plugin_host: Option<PluginHost>,
731    residency: Option<Residency>,
732    // Single source of truth for process lifecycle support and process-work
733    // consumption.
734    process_work_source: ProcessWorkSource,
735    queued_work_poke: Option<QueuedWorkPoke>,
736    live_replay_store: Option<Arc<dyn LiveReplayStore>>,
737    runtime_host_installer: Option<RuntimeHostInstaller>,
738}
739
740impl LashCoreBuilder {
741    pub fn protocol_plugin(mut self, plugin: Arc<dyn PluginFactory>) -> Self {
742        self.protocol_factory = Some(plugin);
743        self
744    }
745
746    pub fn provider(mut self, provider: ProviderHandle) -> Self {
747        self.session_spec = self.session_spec.provider_id(provider.kind());
748        self.provider = Some(provider);
749        self
750    }
751
752    pub fn model(mut self, model: lash_core::ModelSpec) -> Self {
753        self.session_spec = self.session_spec.model(model);
754        self
755    }
756
757    pub fn max_turns(mut self, max_turns: usize) -> Self {
758        self.session_spec = self.session_spec.max_turns(max_turns);
759        self
760    }
761
762    pub fn session_spec(mut self, spec: SessionSpec) -> Self {
763        self.session_spec = spec;
764        self
765    }
766
767    /// Configure a factory that can create a persistence store for any root
768    /// session opened from this core.
769    ///
770    /// The factory must honor `SessionStoreCreateRequest::session_id` and
771    /// return a store for that specific session. Do not use this to wrap one
772    /// pre-opened root store; pass root-only stores with
773    /// `LashCore::session(...).store(store)` instead.
774    pub fn store_factory(mut self, store_factory: Arc<dyn SessionStoreFactory>) -> Self {
775        self.store_factory = Some(store_factory);
776        self
777    }
778
779    /// Configure the persistence factory used by managed child sessions, such
780    /// as local subagents.
781    ///
782    /// Child factories must return a distinct store bound to the requested
783    /// child session id. Hosts that pass an explicit root store with
784    /// `SessionBuilder::store` should set this when child sessions need
785    /// persistence.
786    pub fn child_store_factory(mut self, store_factory: Arc<dyn SessionStoreFactory>) -> Self {
787        self.child_store_factory = Some(store_factory);
788        self
789    }
790
791    pub fn attachment_store(mut self, attachment_store: Arc<dyn AttachmentStore>) -> Self {
792        self.attachment_store = Some(attachment_store);
793        self
794    }
795
796    pub fn process_env_store(
797        mut self,
798        process_env_store: Arc<dyn ProcessExecutionEnvStore>,
799    ) -> Self {
800        self.process_env_store = Some(process_env_store);
801        self
802    }
803
804    /// Set the deployment effect host — the durability boundary every operation
805    /// crosses. Pass [`InlineEffectHost`](crate::durability::InlineEffectHost)
806    /// for in-process execution, or a workflow-backed host for durable
807    /// execution.
808    pub fn effect_host(mut self, effect_host: Arc<dyn EffectHost>) -> Self {
809        self.effect_host = Some(effect_host);
810        self
811    }
812
813    pub fn tools(mut self, tools: Arc<dyn ToolProvider>) -> Self {
814        self.tool_providers.push(tools);
815        self
816    }
817
818    pub fn plugin(mut self, plugin: Arc<dyn PluginFactory>) -> Self {
819        self.plugin_stack.push(plugin);
820        self
821    }
822
823    pub fn plugins(mut self, stack: PluginStack) -> Self {
824        self.plugin_stack = stack;
825        self
826    }
827
828    pub fn configure_plugins(mut self, configure: impl FnOnce(&mut PluginStack)) -> Self {
829        configure(&mut self.plugin_stack);
830        self
831    }
832
833    pub fn trace_sink(mut self, trace_sink: Arc<dyn lash_trace::TraceSink>) -> Self {
834        self.trace_sink = Some(trace_sink);
835        self
836    }
837
838    pub fn trace_jsonl_path(mut self, path: impl Into<std::path::PathBuf>) -> Self {
839        self.trace_sink = Some(Arc::new(lash_trace::JsonlTraceSink::new(path.into())));
840        self
841    }
842
843    pub fn trace_level(mut self, trace_level: lash_trace::TraceLevel) -> Self {
844        self.trace_level = Some(trace_level);
845        self
846    }
847
848    pub fn trace_context(mut self, trace_context: lash_trace::TraceContext) -> Self {
849        self.trace_context = Some(trace_context);
850        self
851    }
852
853    pub fn termination(mut self, termination: TerminationPolicy) -> Self {
854        self.termination = Some(termination);
855        self
856    }
857
858    pub fn residency(mut self, residency: Residency) -> Self {
859        self.residency = Some(residency);
860        self
861    }
862
863    /// Configure the bounded live replay buffer used by session observation
864    /// cursors. This is best-effort reconnect recovery only; durable state
865    /// still comes from the session store and [`SessionReadView`].
866    pub fn live_replay_store(mut self, live_replay_store: Arc<dyn LiveReplayStore>) -> Self {
867        self.live_replay_store = Some(live_replay_store);
868        self
869    }
870
871    /// Resolve the runtime host config, requiring the generic host-owned
872    /// durability dependencies to have been named.
873    fn resolve_runtime_host_config(&mut self) -> Result<RuntimeHostConfig> {
874        if let Some(base) = self.runtime_host_config.take() {
875            return Ok(self.apply_core_overrides(base));
876        }
877        let effect_host = self
878            .effect_host
879            .take()
880            .ok_or(EmbedError::MissingEffectHost)?;
881        let attachment_store = self
882            .attachment_store
883            .take()
884            .ok_or(EmbedError::MissingAttachmentStore)?;
885        let process_env_store = self
886            .process_env_store
887            .take()
888            .ok_or(EmbedError::MissingProcessEnvStore)?;
889        let core = RuntimeHostConfig::new(effect_host, attachment_store, process_env_store);
890        Ok(self.apply_core_overrides(core))
891    }
892
893    /// Apply benign + still-set dependency overrides on top of a base core.
894    fn apply_core_overrides(&mut self, mut core: RuntimeHostConfig) -> RuntimeHostConfig {
895        if let Some(effect_host) = self.effect_host.take() {
896            core.control.effect_host = effect_host;
897        }
898        if let Some(attachment_store) = self.attachment_store.take() {
899            core.durability.attachment_store = attachment_store;
900        }
901        if let Some(process_env_store) = self.process_env_store.take() {
902            core.durability.process_env_store = process_env_store;
903        }
904        if let Some(prompt) = self.prompt.take() {
905            core.prompt.prompt = prompt;
906        }
907        if let Some(trace_sink) = self.trace_sink.take() {
908            core.tracing.trace_sink = Some(trace_sink);
909        }
910        if let Some(trace_level) = self.trace_level.take() {
911            core.tracing.trace_level = trace_level;
912        }
913        if let Some(trace_context) = self.trace_context.take() {
914            core.tracing.trace_context = trace_context;
915        }
916        if let Some(termination) = self.termination.take() {
917            core.control.termination = termination;
918        }
919        core
920    }
921
922    /// Validate store peer-coherence of the wired durability dependencies.
923    ///
924    /// Durability is established by what the host wired; the per-invocation
925    /// durable controller is not visible here (the build-time controller is
926    /// inline by construction), so this checks the stores against each other
927    /// only — never the controller (see A5 in the durable-first wiring spec):
928    ///
929    /// - a durable session store factory requires a durable attachment and
930    ///   artifact store (they back the same session state);
931    /// - a durable process registry requires a session store factory that is
932    ///   itself durable (the registry's process records are meaningless without
933    ///   a durable session behind them).
934    fn effective_session_store_tier(&self) -> Option<DurabilityTier> {
935        self.child_store_factory
936            .as_ref()
937            .or(self.store_factory.as_ref())
938            .map(|factory| factory.durability_tier())
939    }
940
941    #[cfg(feature = "rlm")]
942    fn resolved_trace_context(&self) -> lash_trace::TraceContext {
943        self.trace_context
944            .clone()
945            .or_else(|| {
946                self.runtime_host_config
947                    .as_ref()
948                    .map(|core| core.tracing.trace_context.clone())
949            })
950            .unwrap_or_default()
951    }
952
953    fn ensure_store_peer_coherence(&self) -> Result<()> {
954        // Match `build()`'s wiring exactly: the session store factory it installs
955        // is `child_store_factory.or(store_factory)` (child takes precedence, root
956        // is the fallback). The coherence check must read the tier of that same
957        // effective factory, or a host that wires only a durable child factory
958        // (no root) is wrongly rejected though `build()` would wire it durably.
959        let session_store_tier = self.effective_session_store_tier();
960        let attachment_tier = self
961            .attachment_store
962            .as_ref()
963            .map(|store| store.persistence().durability_tier())
964            .or_else(|| {
965                self.runtime_host_config.as_ref().map(|core| {
966                    core.durability
967                        .attachment_store
968                        .persistence()
969                        .durability_tier()
970                })
971            });
972        let process_env_tier = self
973            .process_env_store
974            .as_ref()
975            .map(|store| store.durability_tier())
976            .or_else(|| {
977                self.runtime_host_config
978                    .as_ref()
979                    .map(|core| core.durability.process_env_store.durability_tier())
980            });
981        let effect_host_tier = self
982            .effect_host
983            .as_ref()
984            .map(|host| host.durability_tier())
985            .or_else(|| {
986                self.runtime_host_config
987                    .as_ref()
988                    .map(|core| core.control.effect_host.durability_tier())
989            });
990        let trigger_store_tier = self
991            .trigger_store
992            .as_ref()
993            .map(|store| store.durability_tier());
994
995        if session_store_tier == Some(DurabilityTier::Durable) {
996            if attachment_tier == Some(DurabilityTier::Inline) {
997                return Err(EmbedError::DurableStorePeerRequired {
998                    facet: "attachment store",
999                });
1000            }
1001            if process_env_tier == Some(DurabilityTier::Inline) {
1002                return Err(EmbedError::DurableStorePeerRequired {
1003                    facet: "process execution environment store",
1004                });
1005            }
1006        }
1007
1008        if let Some(process_registry) = self.process_work_source.process_registry().as_ref()
1009            && process_registry.durability_tier() == DurabilityTier::Durable
1010        {
1011            if session_store_tier != Some(DurabilityTier::Durable) {
1012                return Err(EmbedError::DurableProcessRegistryRequiresStoreFactory);
1013            }
1014            if trigger_store_tier != Some(DurabilityTier::Durable) {
1015                return Err(EmbedError::DurableStorePeerRequired {
1016                    facet: "trigger store",
1017                });
1018            }
1019            if process_env_tier != Some(DurabilityTier::Durable) {
1020                return Err(EmbedError::DurableStorePeerRequired {
1021                    facet: "process execution environment store",
1022                });
1023            }
1024        }
1025
1026        if trigger_store_tier == Some(DurabilityTier::Durable) {
1027            if session_store_tier != Some(DurabilityTier::Durable) {
1028                return Err(EmbedError::DurableStorePeerRequired {
1029                    facet: "session store factory",
1030                });
1031            }
1032            if process_env_tier != Some(DurabilityTier::Durable) {
1033                return Err(EmbedError::DurableStorePeerRequired {
1034                    facet: "process execution environment store",
1035                });
1036            }
1037            if let Some(process_registry) = self.process_work_source.process_registry().as_ref()
1038                && process_registry.durability_tier() == DurabilityTier::Inline
1039            {
1040                return Err(EmbedError::DurableStorePeerRequired {
1041                    facet: "process registry",
1042                });
1043            }
1044        }
1045
1046        if effect_host_tier == Some(DurabilityTier::Durable) {
1047            if attachment_tier != Some(DurabilityTier::Durable) {
1048                return Err(EmbedError::DurableStorePeerRequired {
1049                    facet: "attachment store",
1050                });
1051            }
1052            if process_env_tier != Some(DurabilityTier::Durable) {
1053                return Err(EmbedError::DurableStorePeerRequired {
1054                    facet: "process execution environment store",
1055                });
1056            }
1057        }
1058
1059        Ok(())
1060    }
1061
1062    pub fn build(mut self) -> Result<LashCore> {
1063        self.ensure_store_peer_coherence()?;
1064        let protocol_factory = self.protocol_factory.clone();
1065        if protocol_factory.is_none() && self.plugin_host.is_none() {
1066            return Err(EmbedError::MissingProtocolPlugin);
1067        }
1068        let provider_id = self
1069            .session_spec
1070            .provider_id
1071            .clone()
1072            .or_else(|| {
1073                self.provider
1074                    .as_ref()
1075                    .map(|provider| provider.kind().to_string())
1076            })
1077            .unwrap_or_default();
1078        let model = self
1079            .session_spec
1080            .model
1081            .clone()
1082            .ok_or(EmbedError::MissingModelSpec)?;
1083
1084        let base_policy = SessionPolicy {
1085            provider_id,
1086            model,
1087            max_turns: self.session_spec.max_turns.flatten(),
1088            ..SessionPolicy::default()
1089        };
1090        let policy = self.session_spec.resolve_against(&base_policy);
1091
1092        let mut core = self.resolve_runtime_host_config()?;
1093        if let Some(provider) = self.provider.clone() {
1094            core.providers.provider_resolver =
1095                Arc::new(lash_core::SingleProviderResolver::new(provider));
1096        }
1097        let plugin_factories = if let Some(plugin_host) = self.plugin_host {
1098            plugin_host.factories().to_vec()
1099        } else {
1100            let mut factories = Vec::new();
1101            if !self.tool_providers.is_empty() {
1102                let spec = self
1103                    .tool_providers
1104                    .into_iter()
1105                    .fold(PluginSpec::new(), PluginSpec::with_tool_provider);
1106                factories.push(Arc::new(StaticPluginFactory::new("embed_tools", spec))
1107                    as Arc<dyn PluginFactory>);
1108            }
1109            factories.extend(self.plugin_stack.into_factories());
1110            factories
1111        };
1112        let default_plugin_host =
1113            build_plugin_host(protocol_factory.as_ref(), &plugin_factories, Vec::new())?;
1114        if let Some(install) = &self.runtime_host_installer {
1115            core = install(core, &default_plugin_host)?;
1116        }
1117
1118        let process_registry = self.process_work_source.process_registry();
1119
1120        // Resolve the process work runner before the process source is moved
1121        // into the environment. The default inline runner's config is built
1122        // eagerly so a missing store factory fails loudly at build, not at
1123        // first open. It is built from the same single-protocol plugin host the
1124        // live runtime uses, so the worker can rebuild a runtime for a process.
1125        let process_work_runner = Self::resolve_process_work_runner(
1126            &self.process_work_source,
1127            &default_plugin_host,
1128            &core,
1129            // The worker rebuilds sessions with the same factory `build()` wires
1130            // below: `child_store_factory.or(store_factory)`.
1131            self.child_store_factory
1132                .as_ref()
1133                .or(self.store_factory.as_ref()),
1134            &policy,
1135            self.residency.unwrap_or_default(),
1136            self.trigger_store.as_ref(),
1137        )?;
1138
1139        let mut env_builder = RuntimeEnvironment::builder()
1140            .with_plugin_host(Arc::new(default_plugin_host))
1141            .with_runtime_host_config(core);
1142        if let Some(process_registry) = process_registry.as_ref() {
1143            env_builder = env_builder.with_process_registry(Arc::clone(process_registry));
1144        }
1145        if let Some(residency) = self.residency {
1146            env_builder = env_builder.with_residency(residency);
1147        }
1148        if let Some(child_store_factory) = self
1149            .child_store_factory
1150            .as_ref()
1151            .or(self.store_factory.as_ref())
1152        {
1153            env_builder = env_builder.with_session_store_factory(Arc::clone(child_store_factory));
1154        }
1155        if let Some(trigger_store) = self.trigger_store.as_ref() {
1156            env_builder = env_builder.with_trigger_store(Arc::clone(trigger_store));
1157        }
1158        if let Some(queued_work_poke) = self.queued_work_poke.clone() {
1159            env_builder = env_builder.with_queued_work_poke(queued_work_poke);
1160        }
1161
1162        let live_replay_store = self
1163            .live_replay_store
1164            .take()
1165            .unwrap_or_else(|| Arc::new(InMemoryLiveReplayStore::default()));
1166
1167        Ok(LashCore {
1168            env: env_builder.build(),
1169            policy,
1170            store_factory: self.store_factory,
1171            plugin_factories: Arc::new(plugin_factories),
1172            provider: self.provider,
1173            live_replay_store,
1174            protocol_factory,
1175            runtime_host_installer: self.runtime_host_installer,
1176            process_work_runner: Arc::new(ProcessWorkRunnerSlot::new(process_work_runner)),
1177        })
1178    }
1179
1180    /// Decide how a built [`LashCore`] sources its process work runner.
1181    ///
1182    /// - no registry => nothing to run ([`ProcessWorkRunnerSetup::None`]);
1183    /// - external driver wired => use it ([`ProcessWorkRunnerSetup::External`]);
1184    /// - inline registry wired => lazily spawn the default inline runner on first open. Its
1185    ///   [`DurableProcessWorkerConfig`] is built eagerly when a store factory is
1186    ///   present; without one the inline worker cannot rebuild session runtimes.
1187    fn resolve_process_work_runner(
1188        process_work_source: &ProcessWorkSource,
1189        worker_plugin_host: &PluginHost,
1190        core: &RuntimeHostConfig,
1191        store_factory: Option<&Arc<dyn SessionStoreFactory>>,
1192        policy: &SessionPolicy,
1193        residency: lash_core::Residency,
1194        trigger_store: Option<&Arc<dyn lash_core::TriggerStore>>,
1195    ) -> Result<ProcessWorkRunnerSetup> {
1196        let process_registry = match process_work_source {
1197            ProcessWorkSource::None => return Ok(ProcessWorkRunnerSetup::None),
1198            ProcessWorkSource::External(driver) => {
1199                return Ok(ProcessWorkRunnerSetup::External {
1200                    driver: driver.clone(),
1201                });
1202            }
1203            ProcessWorkSource::Inline { registry } => Arc::clone(registry),
1204        };
1205        // The worker rebuilds a session runtime per process, so it needs a store
1206        // factory; without one the default runner could not execute anything, so
1207        // fail loudly rather than silently leave processes unexecuted.
1208        let Some(store_factory) = store_factory else {
1209            return Err(EmbedError::ProcessRegistryRequiresStoreFactory);
1210        };
1211        // The worker rebuilds with the same plugin host the live runtime uses,
1212        // including the protocol plugin that supplies the protocol session
1213        // capability.
1214        let phase_probe_slot = lash_core::runtime::RuntimeTurnPhaseProbeSlot::default();
1215        let config = Box::new(
1216            DurableProcessWorkerConfig::new(
1217                Arc::new(worker_plugin_host.clone()),
1218                core.clone(),
1219                Arc::clone(store_factory),
1220                process_registry,
1221            )
1222            .with_session_policy(policy.clone())
1223            .with_trigger_store(
1224                trigger_store
1225                    .cloned()
1226                    .unwrap_or_else(|| Arc::new(lash_core::InMemoryTriggerStore::default())),
1227            )
1228            .with_residency(residency)
1229            .with_turn_phase_probe_slot(phase_probe_slot),
1230        );
1231        Ok(ProcessWorkRunnerSetup::LazyDefault { config })
1232    }
1233
1234    pub fn advanced(self) -> AdvancedLashCoreBuilder {
1235        AdvancedLashCoreBuilder { builder: self }
1236    }
1237
1238    pub fn process_registry(mut self, process_registry: Arc<dyn ProcessRegistry>) -> Self {
1239        self.process_work_source = ProcessWorkSource::Inline {
1240            registry: process_registry,
1241        };
1242        self
1243    }
1244
1245    pub fn trigger_store(mut self, store: Arc<dyn lash_core::TriggerStore>) -> Self {
1246        self.trigger_store = Some(store);
1247        self
1248    }
1249
1250    /// Configure an externally owned process work runner.
1251    ///
1252    /// Durable hosts construct a [`ProcessWorkDriver`] from the same process
1253    /// registry and wake handle used by their deployment runner, then pass it
1254    /// here. The driver registry becomes the core's process registry and no
1255    /// inline runner is spawned.
1256    pub fn process_work_driver(mut self, driver: ProcessWorkDriver) -> Self {
1257        self.process_work_source = ProcessWorkSource::External(driver);
1258        self
1259    }
1260
1261    /// Wire the wake handle of an externally owned queued-work runner. The
1262    /// runtime pokes it whenever new queued work lands for a session.
1263    pub fn queued_work_poke(mut self, poke: QueuedWorkPoke) -> Self {
1264        self.queued_work_poke = Some(poke);
1265        self
1266    }
1267}
1268
1269pub(crate) fn build_plugin_host(
1270    protocol_factory: Option<&Arc<dyn PluginFactory>>,
1271    common_factories: &[Arc<dyn PluginFactory>],
1272    extra_factories: Vec<Arc<dyn PluginFactory>>,
1273) -> Result<PluginHost> {
1274    let mut factories = Vec::with_capacity(
1275        usize::from(protocol_factory.is_some()) + common_factories.len() + extra_factories.len(),
1276    );
1277    if let Some(protocol_factory) = protocol_factory {
1278        factories.push(Arc::clone(protocol_factory));
1279    }
1280    factories.extend(common_factories.iter().cloned());
1281    factories.extend(extra_factories);
1282    Ok(PluginHost::new(factories))
1283}
1284
1285impl PromptLayerSink for LashCoreBuilder {
1286    fn prompt_layer_mut(&mut self) -> &mut PromptLayer {
1287        self.prompt.get_or_insert_with(PromptLayer::new)
1288    }
1289}
1290
1291pub struct AdvancedLashCoreBuilder {
1292    builder: LashCoreBuilder,
1293}
1294
1295impl AdvancedLashCoreBuilder {
1296    pub fn runtime_host_config(mut self, core: lash_core::RuntimeHostConfig) -> Self {
1297        self.builder.runtime_host_config = Some(core);
1298        self
1299    }
1300
1301    pub fn plugin_host(mut self, plugin_host: PluginHost) -> Self {
1302        self.builder.plugin_host = Some(plugin_host);
1303        self
1304    }
1305
1306    pub fn build(self) -> Result<LashCore> {
1307        self.builder.build()
1308    }
1309}