// lash_core/runtime/builder.rs

1use std::sync::Arc;
2
3use crate::plugin::{PluginFactory, PluginHost, PluginSession};
4use crate::{
5    EffectHost, EmbeddedRuntimeHost, LashRuntime, PluginStack, ProcessRegistry, Residency,
6    RuntimeHostConfig, RuntimePersistence, RuntimeSessionState, SessionError, SessionPolicy,
7    SessionStoreFactory, TerminationPolicy,
8};
9
10enum PluginSource {
11    Host(PluginHost),
12    Session(Arc<PluginSession>),
13}
14
15pub struct EmbeddedRuntimeBuilder {
16    session_id: Option<String>,
17    policy: Option<SessionPolicy>,
18    plugin_options: crate::PluginOptions,
19    initial_state: Option<RuntimeSessionState>,
20    plugin_source: PluginSource,
21    core: RuntimeHostConfig,
22    session_store_factory: Option<Arc<dyn SessionStoreFactory>>,
23    trigger_store: Option<Arc<dyn crate::TriggerStore>>,
24    store: Option<Arc<dyn RuntimePersistence>>,
25    process_registry: Option<Arc<dyn ProcessRegistry>>,
26    process_work_driver: Option<crate::ProcessWorkDriver>,
27    queued_work_driver: Option<crate::QueuedWorkDriver>,
28    residency: Residency,
29}
30
31impl Default for EmbeddedRuntimeBuilder {
32    fn default() -> Self {
33        Self {
34            session_id: None,
35            policy: None,
36            plugin_options: crate::PluginOptions::default(),
37            initial_state: None,
38            plugin_source: PluginSource::Host(PluginHost::empty()),
39            // `RuntimeHostConfig` has no `Default`; start from an explicitly
40            // named in-memory core. Callers that need durable stores override
41            // it with `with_runtime_host`.
42            core: RuntimeHostConfig::in_memory(),
43            session_store_factory: None,
44            trigger_store: Some(Arc::new(crate::InMemoryTriggerStore::default())),
45            store: None,
46            process_registry: None,
47            process_work_driver: None,
48            queued_work_driver: None,
49            residency: Residency::default(),
50        }
51    }
52}
53
54impl EmbeddedRuntimeBuilder {
55    pub fn new() -> Self {
56        Self::default()
57    }
58
59    pub fn session_id(&self) -> Option<&str> {
60        self.session_id.as_deref()
61    }
62
63    pub fn policy(&self) -> Option<&SessionPolicy> {
64        self.policy.as_ref()
65    }
66
67    pub fn with_session_id(mut self, session_id: impl Into<String>) -> Self {
68        self.session_id = Some(session_id.into());
69        self
70    }
71
72    pub fn with_policy(mut self, policy: SessionPolicy) -> Self {
73        self.policy = Some(policy);
74        self
75    }
76
77    pub fn with_plugin_options(mut self, plugin_options: crate::PluginOptions) -> Self {
78        self.plugin_options = plugin_options;
79        self
80    }
81
82    pub fn with_initial_state(mut self, state: RuntimeSessionState) -> Self {
83        self.initial_state = Some(state);
84        self
85    }
86
87    pub fn with_plugin_host(mut self, plugin_host: PluginHost) -> Self {
88        self.plugin_source = PluginSource::Host(plugin_host);
89        self
90    }
91
92    pub fn with_plugin_session(mut self, plugin_session: Arc<PluginSession>) -> Self {
93        self.plugin_source = PluginSource::Session(plugin_session);
94        self
95    }
96
97    pub fn with_plugin_factories(mut self, factories: Vec<Arc<dyn PluginFactory>>) -> Self {
98        let host = PluginHost::new(factories);
99        self.plugin_source = PluginSource::Host(host);
100        self
101    }
102
103    pub fn with_plugin_stack(self, stack: PluginStack) -> Self {
104        self.with_plugin_factories(stack.into_factories())
105    }
106
107    pub fn with_runtime_host(mut self, core: RuntimeHostConfig) -> Self {
108        self.core = core;
109        self
110    }
111
112    pub fn with_attachment_store(
113        mut self,
114        attachment_store: Arc<dyn crate::AttachmentStore>,
115    ) -> Self {
116        self.core.durability.attachment_store = attachment_store;
117        self
118    }
119
120    pub fn with_prompt_template(mut self, prompt_template: crate::PromptTemplate) -> Self {
121        self.core.prompt.prompt.template = Some(prompt_template);
122        self
123    }
124
125    pub fn with_prompt_contribution(mut self, contribution: crate::PromptContribution) -> Self {
126        self.core.prompt.prompt.add_contribution(contribution);
127        self
128    }
129
130    pub fn with_replaced_prompt_slot(
131        mut self,
132        slot: crate::PromptSlot,
133        contributions: impl IntoIterator<Item = crate::PromptContribution>,
134    ) -> Self {
135        self.core.prompt.prompt.replace_slot(slot, contributions);
136        self
137    }
138
139    pub fn with_cleared_prompt_slot(mut self, slot: crate::PromptSlot) -> Self {
140        self.core.prompt.prompt.clear_slot(slot);
141        self
142    }
143
144    pub fn with_prompt_layer(mut self, prompt: crate::PromptLayer) -> Self {
145        self.core.prompt.prompt = prompt;
146        self
147    }
148
149    pub fn with_trace_sink(mut self, sink: Option<Arc<dyn lash_trace::TraceSink>>) -> Self {
150        self.core.tracing.trace_sink = sink;
151        self
152    }
153
154    pub fn with_trace_level(mut self, level: lash_trace::TraceLevel) -> Self {
155        self.core.tracing.trace_level = level;
156        self
157    }
158
159    pub fn with_trace_context(mut self, context: lash_trace::TraceContext) -> Self {
160        self.core.tracing.trace_context = context;
161        self
162    }
163
164    pub fn with_termination(mut self, termination: TerminationPolicy) -> Self {
165        self.core.control.termination = termination;
166        self
167    }
168
169    pub fn with_effect_host(mut self, effect_host: Arc<dyn EffectHost>) -> Self {
170        self.core.control.effect_host = effect_host;
171        self
172    }
173
174    pub fn with_provider_resolver(
175        mut self,
176        provider_resolver: Arc<dyn crate::RuntimeProviderResolver>,
177    ) -> Self {
178        self.core.providers.provider_resolver = provider_resolver;
179        self
180    }
181
182    pub fn with_session_store_factory(
183        mut self,
184        session_store_factory: Arc<dyn SessionStoreFactory>,
185    ) -> Self {
186        self.session_store_factory = Some(session_store_factory);
187        self
188    }
189
190    pub fn with_trigger_store(mut self, store: Arc<dyn crate::TriggerStore>) -> Self {
191        self.trigger_store = Some(store);
192        self
193    }
194
195    pub fn with_store(mut self, store: Arc<dyn RuntimePersistence>) -> Self {
196        self.store = Some(store);
197        self
198    }
199
200    pub fn with_process_registry(mut self, process_registry: Arc<dyn ProcessRegistry>) -> Self {
201        self.process_registry = Some(process_registry);
202        self
203    }
204
205    pub fn with_process_work_driver(mut self, driver: crate::ProcessWorkDriver) -> Self {
206        self.process_work_driver = Some(driver);
207        self
208    }
209
210    pub fn with_queued_work_driver(mut self, driver: crate::QueuedWorkDriver) -> Self {
211        self.queued_work_driver = Some(driver);
212        self
213    }
214
215    /// Trim a rebuilt session's resident graph to match the host's residency.
216    ///
217    /// Defaults to [`Residency::KeepAll`]. Setting [`Residency::ActivePathOnly`]
218    /// makes a rebuilt runtime (e.g. a durable worker reconstructing a session to
219    /// run a background process) keep only the active path resident, matching the
220    /// live runtime's behavior instead of silently retaining the full graph.
221    pub fn with_residency(mut self, residency: Residency) -> Self {
222        self.residency = residency;
223        self
224    }
225
226    fn resolve_state_from_defaults(&self) -> RuntimeSessionState {
227        let mut state = self.initial_state.clone().unwrap_or_default();
228        if let Some(session_id) = &self.session_id {
229            state.session_id = session_id.clone();
230        }
231        if let Some(policy) = &self.policy {
232            state.policy = policy.clone();
233        }
234        state
235    }
236
237    async fn resolve_state(&self) -> Result<RuntimeSessionState, SessionError> {
238        if let Some(state) = &self.initial_state {
239            return Ok({
240                let mut state = state.clone();
241                if let Some(session_id) = &self.session_id {
242                    state.session_id = session_id.clone();
243                }
244                if let Some(policy) = &self.policy {
245                    let recorded_provider_id = state.policy.recorded_provider_id().to_string();
246                    state.policy.provider_id = recorded_provider_id;
247                    state.policy.session_id = policy.session_id.clone();
248                    if state.policy.model.id.trim().is_empty() {
249                        state.policy.model = policy.model.clone();
250                    }
251                }
252                state
253            });
254        }
255        if let Some(store) = &self.store {
256            if let Some(mut state) = crate::store::load_persisted_session_state(store.as_ref())
257                .await
258                .map_err(|err| SessionError::Protocol(format!("failed to load store: {err}")))?
259            {
260                if let Some(session_id) = &self.session_id
261                    && &state.session_id != session_id
262                {
263                    return Err(SessionError::Protocol(format!(
264                        "store is bound to session `{}` but builder requested `{session_id}`",
265                        state.session_id
266                    )));
267                }
268                if let Some(policy) = &self.policy {
269                    let recorded_provider_id = state.policy.recorded_provider_id().to_string();
270                    state.policy.provider_id = recorded_provider_id;
271                    state.policy.session_id = policy.session_id.clone();
272                    if state.policy.model.id.trim().is_empty() {
273                        state.policy.model = policy.model.clone();
274                    }
275                }
276                return Ok(state);
277            }
278            let mut state = self.resolve_state_from_defaults();
279            if let Some(policy) = &self.policy {
280                state.policy = policy.clone();
281            }
282            return Ok(state);
283        }
284        Ok(self.resolve_state_from_defaults())
285    }
286
287    fn resolve_plugins(
288        &self,
289        state: &RuntimeSessionState,
290    ) -> Result<Arc<PluginSession>, SessionError> {
291        match &self.plugin_source {
292            PluginSource::Session(session) => Ok(Arc::clone(session)),
293            PluginSource::Host(host) => host
294                .clone()
295                .isolated_registry()
296                .build_session_with_parent(
297                    state.session_id.clone(),
298                    None,
299                    None,
300                    crate::plugin::SessionAuthorityContext {
301                        plugin_options: self.plugin_options.clone(),
302                        ..crate::plugin::SessionAuthorityContext::default()
303                    },
304                )
305                .map_err(|err| SessionError::Protocol(err.to_string())),
306        }
307    }
308
309    pub async fn build(self) -> Result<LashRuntime, SessionError> {
310        let state = self.resolve_state().await?;
311        let plugins = self.resolve_plugins(&state)?;
312        let embedded_host = EmbeddedRuntimeHost::new(self.core)
313            .with_session_store_factory_option(self.session_store_factory.clone())
314            .with_trigger_store_option(self.trigger_store.clone());
315        // `assemble_runtime` owns the (store, registry) wiring + residency so the
316        // worker rebuild cannot drift from the live open path.
317        let mut runtime = LashRuntime::assemble_runtime(
318            state.policy.clone(),
319            embedded_host,
320            plugins,
321            self.store,
322            self.process_registry,
323            state,
324            self.residency,
325        )
326        .await?;
327        runtime.host.process_work_driver = self.process_work_driver;
328        runtime.host.queued_work_driver = self.queued_work_driver;
329        Ok(runtime)
330    }
331
332    pub async fn build_ephemeral(mut self) -> Result<LashRuntime, SessionError> {
333        self.store = None;
334        self.build().await
335    }
336
337    pub async fn build_persistent(
338        mut self,
339        store: Arc<dyn RuntimePersistence>,
340    ) -> Result<LashRuntime, SessionError> {
341        self.store = Some(store);
342        self.build().await
343    }
344
345    pub async fn build_background_persistent(
346        mut self,
347        store: Arc<dyn RuntimePersistence>,
348        process_registry: Arc<dyn ProcessRegistry>,
349    ) -> Result<LashRuntime, SessionError> {
350        self.store = Some(store);
351        self = self.with_process_registry(process_registry);
352        self.build().await
353    }
354}
355
356impl LashRuntime {
357    pub fn builder() -> EmbeddedRuntimeBuilder {
358        EmbeddedRuntimeBuilder::new()
359    }
360}
361
362trait EmbeddedRuntimeHostExt {
363    fn with_session_store_factory_option(
364        self,
365        session_store_factory: Option<Arc<dyn SessionStoreFactory>>,
366    ) -> Self;
367
368    fn with_trigger_store_option(self, trigger_store: Option<Arc<dyn crate::TriggerStore>>)
369    -> Self;
370}
371
372impl EmbeddedRuntimeHostExt for EmbeddedRuntimeHost {
373    fn with_session_store_factory_option(
374        mut self,
375        session_store_factory: Option<Arc<dyn SessionStoreFactory>>,
376    ) -> Self {
377        self.session_store_factory = session_store_factory;
378        self
379    }
380
381    fn with_trigger_store_option(
382        mut self,
383        trigger_store: Option<Arc<dyn crate::TriggerStore>>,
384    ) -> Self {
385        self.trigger_store = trigger_store;
386        self
387    }
388}