Skip to main content

lash_core/runtime/
builder.rs

1use std::sync::Arc;
2
3use crate::plugin::{PluginFactory, PluginHost, PluginSession};
4use crate::{
5    EffectHost, EmbeddedRuntimeHost, LashRuntime, PluginStack, ProcessRegistry, Residency,
6    RuntimeHostConfig, RuntimePersistence, RuntimeSessionState, SessionError, SessionPolicy,
7    SessionStoreFactory, TerminationPolicy,
8};
9
10enum PluginSource {
11    Host(PluginHost),
12    Session(Arc<PluginSession>),
13}
14
15pub(super) fn lashlang_abilities_for_process_registry(
16    mut abilities: lashlang::LashlangAbilities,
17    process_registry_available: bool,
18) -> lashlang::LashlangAbilities {
19    abilities = abilities.with_sleep();
20    if process_registry_available {
21        abilities.with_processes().with_process_signals()
22    } else {
23        abilities.processes = false;
24        abilities.process_signals = false;
25        abilities
26    }
27}
28
29pub struct EmbeddedRuntimeBuilder {
30    session_id: Option<String>,
31    policy: Option<SessionPolicy>,
32    initial_state: Option<RuntimeSessionState>,
33    plugin_source: PluginSource,
34    core: RuntimeHostConfig,
35    session_store_factory: Option<Arc<dyn SessionStoreFactory>>,
36    store: Option<Arc<dyn RuntimePersistence>>,
37    process_registry: Option<Arc<dyn ProcessRegistry>>,
38    residency: Residency,
39}
40
41impl Default for EmbeddedRuntimeBuilder {
42    fn default() -> Self {
43        Self {
44            session_id: None,
45            policy: None,
46            initial_state: None,
47            plugin_source: PluginSource::Host(PluginHost::empty()),
48            // `RuntimeHostConfig` has no `Default`; start from an explicitly
49            // named in-memory core. Callers that need durable stores override
50            // it with `with_runtime_host`.
51            core: RuntimeHostConfig::in_memory(),
52            session_store_factory: None,
53            store: None,
54            process_registry: None,
55            residency: Residency::default(),
56        }
57    }
58}
59
60impl EmbeddedRuntimeBuilder {
61    pub fn new() -> Self {
62        Self::default()
63    }
64
65    pub fn session_id(&self) -> Option<&str> {
66        self.session_id.as_deref()
67    }
68
69    pub fn policy(&self) -> Option<&SessionPolicy> {
70        self.policy.as_ref()
71    }
72
73    pub fn with_session_id(mut self, session_id: impl Into<String>) -> Self {
74        self.session_id = Some(session_id.into());
75        self
76    }
77
78    pub fn with_policy(mut self, policy: SessionPolicy) -> Self {
79        self.policy = Some(policy);
80        self
81    }
82
83    pub fn with_initial_state(mut self, state: RuntimeSessionState) -> Self {
84        self.initial_state = Some(state);
85        self
86    }
87
88    pub fn with_plugin_host(mut self, plugin_host: PluginHost) -> Self {
89        self.plugin_source = PluginSource::Host(plugin_host);
90        self
91    }
92
93    pub fn with_plugin_session(mut self, plugin_session: Arc<PluginSession>) -> Self {
94        self.plugin_source = PluginSource::Session(plugin_session);
95        self
96    }
97
98    pub fn with_plugin_factories(mut self, factories: Vec<Arc<dyn PluginFactory>>) -> Self {
99        let host = PluginHost::new(factories);
100        self.plugin_source = PluginSource::Host(host);
101        self
102    }
103
104    pub fn with_plugin_stack(self, stack: PluginStack) -> Self {
105        self.with_plugin_factories(stack.into_factories())
106    }
107
108    pub fn with_runtime_host(mut self, core: RuntimeHostConfig) -> Self {
109        self.core = core;
110        self
111    }
112
113    pub fn with_attachment_store(
114        mut self,
115        attachment_store: Arc<dyn crate::AttachmentStore>,
116    ) -> Self {
117        self.core.durability.attachment_store = attachment_store;
118        self
119    }
120
121    pub fn with_prompt_template(mut self, prompt_template: crate::PromptTemplate) -> Self {
122        self.core.prompt.prompt.template = Some(prompt_template);
123        self
124    }
125
126    pub fn with_prompt_contribution(mut self, contribution: crate::PromptContribution) -> Self {
127        self.core.prompt.prompt.add_contribution(contribution);
128        self
129    }
130
131    pub fn with_replaced_prompt_slot(
132        mut self,
133        slot: crate::PromptSlot,
134        contributions: impl IntoIterator<Item = crate::PromptContribution>,
135    ) -> Self {
136        self.core.prompt.prompt.replace_slot(slot, contributions);
137        self
138    }
139
140    pub fn with_cleared_prompt_slot(mut self, slot: crate::PromptSlot) -> Self {
141        self.core.prompt.prompt.clear_slot(slot);
142        self
143    }
144
145    pub fn with_prompt_layer(mut self, prompt: crate::PromptLayer) -> Self {
146        self.core.prompt.prompt = prompt;
147        self
148    }
149
150    pub fn with_trace_sink(mut self, sink: Option<Arc<dyn lash_trace::TraceSink>>) -> Self {
151        self.core.tracing.trace_sink = sink;
152        self
153    }
154
155    pub fn with_lashlang_execution_sink(
156        mut self,
157        sink: Option<Arc<dyn lash_trace::TraceSink>>,
158    ) -> Self {
159        self.core.tracing.lashlang_execution_sink = sink;
160        self
161    }
162
163    pub fn with_lashlang_execution_jsonl_path(mut self, path: Option<std::path::PathBuf>) -> Self {
164        self.core.tracing.lashlang_execution_sink = path.map(|path| {
165            Arc::new(lash_trace::JsonlTraceSink::new(path)) as Arc<dyn lash_trace::TraceSink>
166        });
167        self
168    }
169
170    pub fn with_trace_level(mut self, level: lash_trace::TraceLevel) -> Self {
171        self.core.tracing.trace_level = level;
172        self
173    }
174
175    pub fn with_trace_context(mut self, context: lash_trace::TraceContext) -> Self {
176        self.core.tracing.trace_context = context;
177        self
178    }
179
180    pub fn with_termination(mut self, termination: TerminationPolicy) -> Self {
181        self.core.control.termination = termination;
182        self
183    }
184
185    pub fn with_effect_host(mut self, effect_host: Arc<dyn EffectHost>) -> Self {
186        self.core.control.effect_host = effect_host;
187        self
188    }
189
190    pub fn with_provider_resolver(
191        mut self,
192        provider_resolver: Arc<dyn crate::RuntimeProviderResolver>,
193    ) -> Self {
194        self.core.providers.provider_resolver = provider_resolver;
195        self
196    }
197
198    pub fn with_session_store_factory(
199        mut self,
200        session_store_factory: Arc<dyn SessionStoreFactory>,
201    ) -> Self {
202        self.session_store_factory = Some(session_store_factory);
203        self
204    }
205
206    pub fn with_store(mut self, store: Arc<dyn RuntimePersistence>) -> Self {
207        self.store = Some(store);
208        self
209    }
210
211    pub fn with_process_registry(mut self, process_registry: Arc<dyn ProcessRegistry>) -> Self {
212        self.process_registry = Some(process_registry);
213        if let PluginSource::Host(host) = &mut self.plugin_source {
214            let abilities =
215                lashlang_abilities_for_process_registry(host.lashlang_abilities(), true);
216            *host = host.clone().with_lashlang_abilities(abilities);
217        }
218        self
219    }
220
221    /// Trim a rebuilt session's resident graph to match the host's residency.
222    ///
223    /// Defaults to [`Residency::KeepAll`]. Setting [`Residency::ActivePathOnly`]
224    /// makes a rebuilt runtime (e.g. a durable worker reconstructing a session to
225    /// run a background process) keep only the active path resident, matching the
226    /// live runtime's behavior instead of silently retaining the full graph.
227    pub fn with_residency(mut self, residency: Residency) -> Self {
228        self.residency = residency;
229        self
230    }
231
232    fn resolve_state_from_defaults(&self) -> RuntimeSessionState {
233        let mut state = self.initial_state.clone().unwrap_or_default();
234        if let Some(session_id) = &self.session_id {
235            state.session_id = session_id.clone();
236        }
237        if let Some(policy) = &self.policy {
238            state.policy = policy.clone();
239        }
240        state
241    }
242
243    async fn resolve_state(&self) -> Result<RuntimeSessionState, SessionError> {
244        if let Some(state) = &self.initial_state {
245            return Ok({
246                let mut state = state.clone();
247                if let Some(session_id) = &self.session_id {
248                    state.session_id = session_id.clone();
249                }
250                if let Some(policy) = &self.policy {
251                    let recorded_provider_id = state.policy.recorded_provider_id().to_string();
252                    state.policy.provider_id = recorded_provider_id;
253                    state.policy.session_id = policy.session_id.clone();
254                    if state.policy.model.id.trim().is_empty() {
255                        state.policy.model = policy.model.clone();
256                    }
257                }
258                state
259            });
260        }
261        if let Some(store) = &self.store {
262            if let Some(mut state) = crate::store::load_persisted_session_state(store.as_ref())
263                .await
264                .map_err(|err| SessionError::Protocol(format!("failed to load store: {err}")))?
265            {
266                if let Some(session_id) = &self.session_id
267                    && &state.session_id != session_id
268                {
269                    return Err(SessionError::Protocol(format!(
270                        "store is bound to session `{}` but builder requested `{session_id}`",
271                        state.session_id
272                    )));
273                }
274                if let Some(policy) = &self.policy {
275                    let recorded_provider_id = state.policy.recorded_provider_id().to_string();
276                    state.policy.provider_id = recorded_provider_id;
277                    state.policy.session_id = policy.session_id.clone();
278                    if state.policy.model.id.trim().is_empty() {
279                        state.policy.model = policy.model.clone();
280                    }
281                }
282                return Ok(state);
283            }
284            let mut state = self.resolve_state_from_defaults();
285            if let Some(policy) = &self.policy {
286                state.policy = policy.clone();
287            }
288            return Ok(state);
289        }
290        Ok(self.resolve_state_from_defaults())
291    }
292
293    fn resolve_plugins(
294        &self,
295        state: &RuntimeSessionState,
296    ) -> Result<Arc<PluginSession>, SessionError> {
297        match &self.plugin_source {
298            PluginSource::Session(session) => Ok(Arc::clone(session)),
299            PluginSource::Host(host) => host
300                .clone()
301                .with_lashlang_abilities(lashlang_abilities_for_process_registry(
302                    host.lashlang_abilities(),
303                    self.process_registry.is_some(),
304                ))
305                .isolated_registry()
306                .build_session(state.session_id.clone(), None)
307                .map_err(|err| SessionError::Protocol(err.to_string())),
308        }
309    }
310
311    pub async fn build(self) -> Result<LashRuntime, SessionError> {
312        let state = self.resolve_state().await?;
313        let plugins = self.resolve_plugins(&state)?;
314        let embedded_host = EmbeddedRuntimeHost::new(self.core)
315            .with_session_store_factory_option(self.session_store_factory.clone());
316        // `assemble_runtime` owns the (store, registry) wiring + residency so the
317        // worker rebuild cannot drift from the live open path.
318        LashRuntime::assemble_runtime(
319            state.policy.clone(),
320            embedded_host,
321            plugins,
322            self.store,
323            self.process_registry,
324            state,
325            self.residency,
326        )
327        .await
328    }
329
330    pub async fn build_ephemeral(mut self) -> Result<LashRuntime, SessionError> {
331        self.store = None;
332        self.build().await
333    }
334
335    pub async fn build_persistent(
336        mut self,
337        store: Arc<dyn RuntimePersistence>,
338    ) -> Result<LashRuntime, SessionError> {
339        self.store = Some(store);
340        self.build().await
341    }
342
343    pub async fn build_background_persistent(
344        mut self,
345        store: Arc<dyn RuntimePersistence>,
346        process_registry: Arc<dyn ProcessRegistry>,
347    ) -> Result<LashRuntime, SessionError> {
348        self.store = Some(store);
349        self = self.with_process_registry(process_registry);
350        self.build().await
351    }
352}
353
354impl LashRuntime {
355    pub fn builder() -> EmbeddedRuntimeBuilder {
356        EmbeddedRuntimeBuilder::new()
357    }
358}
359
360trait EmbeddedRuntimeHostExt {
361    fn with_session_store_factory_option(
362        self,
363        session_store_factory: Option<Arc<dyn SessionStoreFactory>>,
364    ) -> Self;
365}
366
367impl EmbeddedRuntimeHostExt for EmbeddedRuntimeHost {
368    fn with_session_store_factory_option(
369        mut self,
370        session_store_factory: Option<Arc<dyn SessionStoreFactory>>,
371    ) -> Self {
372        self.session_store_factory = session_store_factory;
373        self
374    }
375}