1use crate::support::*;
2use lash_core::runtime::{
3 ProcessCommand, ProcessEffectOutcome, RuntimeEffectCommand, RuntimeEffectEnvelope,
4 RuntimeEffectKind, RuntimeEffectLocalExecutor, RuntimeEffectOutcome, RuntimeInvocation,
5 RuntimeScope,
6};
7
8type RuntimeHostInstaller =
9 Arc<dyn Fn(RuntimeHostConfig, &PluginHost) -> Result<RuntimeHostConfig> + Send + Sync>;
10
11#[derive(Clone)]
12pub struct LashCore {
13 pub(crate) env: RuntimeEnvironment,
14 pub(crate) policy: SessionPolicy,
15 pub(crate) protocol_factory: Option<Arc<dyn PluginFactory>>,
16 pub(crate) store_factory: Option<Arc<dyn SessionStoreFactory>>,
17 pub(crate) plugin_factories: Arc<Vec<Arc<dyn PluginFactory>>>,
18 pub(crate) provider: Option<ProviderHandle>,
19 pub(crate) live_replay_store: Arc<dyn LiveReplayStore>,
20 pub(crate) runtime_host_installer: Option<RuntimeHostInstaller>,
21 pub(crate) process_work_runner: Arc<ProcessWorkRunnerSlot>,
26}
27
28pub(crate) enum ProcessWorkRunnerSetup {
31 None,
33 LazyDefault {
40 config: Box<DurableProcessWorkerConfig>,
41 },
42 External { driver: ProcessWorkDriver },
45}
46
47#[derive(Clone, Default)]
48pub(crate) enum ProcessWorkSource {
49 #[default]
50 None,
51 Inline {
52 registry: Arc<dyn ProcessRegistry>,
53 },
54 External(ProcessWorkDriver),
55}
56
57impl ProcessWorkSource {
58 fn process_registry(&self) -> Option<Arc<dyn ProcessRegistry>> {
59 match self {
60 Self::None => None,
61 Self::Inline { registry } => Some(Arc::clone(registry)),
62 Self::External(driver) => Some(driver.process_registry()),
63 }
64 }
65
66 #[cfg(feature = "rlm")]
67 fn has_registry(&self) -> bool {
68 !matches!(self, Self::None)
69 }
70}
71
72pub(crate) struct ProcessWorkRunnerSlot {
79 setup: ProcessWorkRunnerSetup,
80 poke: tokio::sync::OnceCell<Option<ProcessWorkPoke>>,
81 phase_probe_slot: Option<lash_core::runtime::RuntimeTurnPhaseProbeSlot>,
82}
83
84impl ProcessWorkRunnerSlot {
85 fn new(setup: ProcessWorkRunnerSetup) -> Self {
86 let phase_probe_slot = match &setup {
87 ProcessWorkRunnerSetup::LazyDefault { config } => {
88 Some(config.turn_phase_probe_slot.clone())
89 }
90 ProcessWorkRunnerSetup::None | ProcessWorkRunnerSetup::External { .. } => None,
91 };
92 Self {
93 setup,
94 poke: tokio::sync::OnceCell::new(),
95 phase_probe_slot,
96 }
97 }
98
99 pub(crate) async fn poke(&self) -> Option<ProcessWorkPoke> {
102 self.poke
103 .get_or_init(|| async {
104 match &self.setup {
105 ProcessWorkRunnerSetup::None => None,
106 ProcessWorkRunnerSetup::External { driver } => Some(driver.poke_handle()),
107 ProcessWorkRunnerSetup::LazyDefault { config } => {
108 let worker = DurableProcessWorker::new((**config).clone());
109 Some(ProcessWorkRunner::inline(worker).spawn())
110 }
111 }
112 })
113 .await
114 .clone()
115 }
116
117 pub(crate) fn phase_probe_slot(&self) -> Option<lash_core::runtime::RuntimeTurnPhaseProbeSlot> {
118 self.phase_probe_slot.clone()
119 }
120}
121
122#[derive(Clone, Debug, Default, serde::Serialize, serde::Deserialize)]
123pub struct SessionDeleteReport {
124 pub session_id: String,
125 pub process: Option<lash_core::ProcessSessionDeleteReport>,
126}
127
128impl LashCore {
129 pub fn builder() -> LashCoreBuilder {
130 LashCoreBuilder::default()
131 }
132
133 pub fn session(&self, session_id: impl Into<String>) -> SessionBuilder {
134 SessionBuilder {
135 core: self.clone(),
136 session_id: session_id.into(),
137 spec: SessionSpec::inherit(),
138 parent_session_id: None,
139 store: None,
140 provider: None,
141 active_plugins: Vec::new(),
142 plugin_factories: Vec::new(),
143 }
144 }
145
146 pub fn triggers(&self) -> crate::admin::CoreTriggerAdmin {
147 crate::admin::CoreTriggerAdmin { core: self.clone() }
148 }
149
150 pub fn processes(&self) -> crate::admin::Processes {
151 crate::admin::Processes { core: self.clone() }
152 }
153
154 pub fn completions(&self) -> crate::admin::Completions {
155 crate::admin::Completions { core: self.clone() }
156 }
157
158 pub fn effect_host(&self) -> Arc<dyn EffectHost> {
159 Arc::clone(&self.env.core.control.effect_host)
160 }
161
162 pub async fn delete_session(
163 &self,
164 session_id: impl AsRef<str>,
165 scoped_effect_controller: ScopedEffectController<'_>,
166 ) -> Result<SessionDeleteReport> {
167 let session_id = session_id.as_ref().to_string();
168 let Some(store_factory) = self.store_factory.as_ref() else {
169 return Err(EmbedError::MissingSessionStoreFactory);
170 };
171 let process = if let Some(process_registry) = self.env.process_registry.as_ref() {
172 let invocation = RuntimeInvocation::effect(
173 RuntimeScope::new(session_id.clone()),
174 format!("process:delete-session:{session_id}"),
175 RuntimeEffectKind::Process,
176 format!("{session_id}:delete-session"),
177 );
178 let outcome = scoped_effect_controller
179 .controller()
180 .execute_effect(
181 RuntimeEffectEnvelope::new(
182 invocation,
183 RuntimeEffectCommand::process(ProcessCommand::DeleteSession {
184 session_id: session_id.clone(),
185 }),
186 ),
187 RuntimeEffectLocalExecutor::processes(Arc::clone(process_registry)),
188 )
189 .await
190 .map_err(|err| EmbedError::SessionDeleteProcess {
191 session_id: session_id.clone(),
192 message: err.to_string(),
193 })?;
194 match outcome {
195 RuntimeEffectOutcome::Process {
196 result: ProcessEffectOutcome::DeleteSession { report },
197 } => Some(report),
198 other => {
199 return Err(EmbedError::SessionDeleteProcess {
200 session_id,
201 message: format!(
202 "process delete returned the wrong outcome: {}",
203 other.kind().as_str()
204 ),
205 });
206 }
207 }
208 } else {
209 None
210 };
211 if let Some(trigger_store) = self.env.trigger_store.as_ref() {
212 trigger_store
213 .delete_session_subscriptions(&session_id)
214 .await
215 .map_err(|err| EmbedError::SessionDeleteProcess {
216 session_id: session_id.clone(),
217 message: err.to_string(),
218 })?;
219 }
220 self.env
221 .core
222 .control
223 .effect_host
224 .revoke_await_events_for_session(&session_id)
225 .await
226 .map_err(|err| EmbedError::SessionDeleteProcess {
227 session_id: session_id.clone(),
228 message: err.to_string(),
229 })?;
230 store_factory
231 .delete_session(&session_id)
232 .await
233 .map_err(|message| EmbedError::StoreFactory {
234 session_id: session_id.clone(),
235 message,
236 })?;
237 Ok(SessionDeleteReport {
238 session_id,
239 process,
240 })
241 }
242
243 pub fn process_registry(&self) -> Option<Arc<dyn ProcessRegistry>> {
244 self.env.process_registry.as_ref().cloned()
245 }
246
247 pub fn durable_process_worker_config(&self) -> Result<DurableProcessWorkerConfig> {
248 self.durable_process_worker_config_with_plugins(std::iter::empty::<Arc<dyn PluginFactory>>())
249 }
250
251 pub fn durable_process_worker_config_with_plugins(
252 &self,
253 extra_plugin_factories: impl IntoIterator<Item = Arc<dyn PluginFactory>>,
254 ) -> Result<DurableProcessWorkerConfig> {
255 let Some(process_registry) = self.process_registry() else {
256 return Err(EmbedError::MissingProcessRegistry);
257 };
258 let Some(store_factory) = self.store_factory.as_ref() else {
259 return Err(EmbedError::MissingProcessWorkerStoreFactory);
260 };
261 let plugin_host = build_plugin_host(
262 self.protocol_factory.as_ref(),
263 self.plugin_factories.as_ref(),
264 extra_plugin_factories.into_iter().collect(),
265 )?;
266 let runtime_host =
267 self.runtime_host_for_plugin_host(self.env.core.clone(), &plugin_host)?;
268 let mut config = DurableProcessWorkerConfig::new(
269 Arc::new(plugin_host),
270 runtime_host,
271 Arc::clone(store_factory),
272 process_registry,
273 )
274 .with_session_policy(self.policy.clone())
275 .with_residency(self.env.residency);
276 if let Some(trigger_store) = self.env.trigger_store.as_ref() {
277 config = config.with_trigger_store(Arc::clone(trigger_store));
278 }
279 Ok(config)
280 }
281
282 pub(crate) fn runtime_host_for_plugin_host(
283 &self,
284 runtime_host: RuntimeHostConfig,
285 plugin_host: &PluginHost,
286 ) -> Result<RuntimeHostConfig> {
287 match &self.runtime_host_installer {
288 Some(install) => install(runtime_host, plugin_host),
289 None => Ok(runtime_host),
290 }
291 }
292}
293
294fn default_runtime_stack() -> PluginStack {
295 lash_plugin_tool_output_budget::tool_output_budget_stack()
296}
297
298#[derive(Clone)]
299pub struct StandardCore {
300 core: LashCore,
301}
302
303impl StandardCore {
304 pub fn builder() -> StandardCoreBuilder {
305 StandardCoreBuilder {
306 inner: LashCore::builder()
307 .protocol_plugin(Arc::new(
308 lash_protocol_standard::StandardProtocolPluginFactory::new(),
309 ))
310 .plugins(default_runtime_stack()),
311 }
312 }
313
314 pub fn session(&self, session_id: impl Into<String>) -> SessionBuilder {
315 self.core.session(session_id)
316 }
317
318 pub fn into_inner(self) -> LashCore {
319 self.core
320 }
321}
322
323impl std::ops::Deref for StandardCore {
324 type Target = LashCore;
325
326 fn deref(&self) -> &Self::Target {
327 &self.core
328 }
329}
330
331pub struct StandardCoreBuilder {
332 inner: LashCoreBuilder,
333}
334
335impl StandardCoreBuilder {
336 pub fn build(self) -> Result<StandardCore> {
337 self.inner.build().map(|core| StandardCore { core })
338 }
339}
340
341impl PromptLayerSink for StandardCoreBuilder {
342 fn prompt_layer_mut(&mut self) -> &mut PromptLayer {
343 self.inner.prompt_layer_mut()
344 }
345}
346
/// [`LashCore`] preconfigured with the RLM protocol plugin and a lashlang
/// process engine.
///
/// Dereferences to the wrapped [`LashCore`].
#[cfg(feature = "rlm")]
#[derive(Clone)]
pub struct RlmCore {
    /// The wrapped core.
    core: LashCore,
    /// Resolved RLM protocol configuration used to derive lashlang surfaces.
    surface_config: lash_protocol_rlm::RlmProtocolPluginConfig,
    /// Whether a process registry was configured when the core was built.
    process_lifecycle_available: bool,
    /// Artifact store handed to lashlang module compilation.
    lashlang_artifact_store: Arc<dyn lash_lashlang_runtime::LashlangArtifactStore>,
}
355
#[cfg(feature = "rlm")]
impl RlmCore {
    /// Starts a builder with the default runtime plugin stack, the default
    /// RLM protocol configuration and a default projection registry.
    pub fn builder() -> RlmCoreBuilder {
        let inner = LashCore::builder().plugins(default_runtime_stack());
        let config = lash_protocol_rlm::RlmProtocolPluginConfig::default();
        let projection_resolver = Arc::new(lash_protocol_rlm::ProjectionRegistry::default());
        RlmCoreBuilder {
            inner,
            config,
            projection_resolver,
            lashlang_artifact_store: None,
            lashlang_execution_sink: None,
        }
    }

    /// Begins configuring an RLM session with the given id.
    pub fn session(&self, session_id: impl Into<String>) -> RlmSessionBuilder {
        let builder = self.core.session(session_id);
        RlmSessionBuilder {
            builder,
            rlm_final_answer_format: None,
        }
    }

    /// Unwraps into the underlying [`LashCore`].
    pub fn into_inner(self) -> LashCore {
        let Self { core, .. } = self;
        core
    }

    /// Resolves the lashlang compile surface for a session.
    ///
    /// Builds a plugin host (core plugins plus the request's extras), creates
    /// a parentless session on it with the request's plugin options, resolves
    /// the tool catalog, and derives the host environment from the RLM
    /// surface extended by the plugin host's extensions.
    ///
    /// # Errors
    ///
    /// Fails when the plugin host or session cannot be built, the tool
    /// catalog cannot be resolved, or plugin extensions fail to register.
    pub fn lashlang_compile_surface(
        &self,
        request: crate::rlm::LashlangCompileSurfaceRequest,
    ) -> Result<crate::rlm::LashlangCompileSurface> {
        let crate::rlm::LashlangCompileSurfaceRequest {
            session_id,
            execution_env_spec,
            extra_plugin_factories,
        } = request;

        let plugin_host = build_plugin_host(
            self.core.protocol_factory.as_ref(),
            self.core.plugin_factories.as_ref(),
            extra_plugin_factories,
        )?;
        let authority = lash_core::plugin::SessionAuthorityContext {
            plugin_options: execution_env_spec.plugin_options,
            ..Default::default()
        };
        let plugins = plugin_host
            .build_session_with_parent(&session_id, None, None, authority)
            .map_err(lash_core::PluginError::from)?;
        let tool_catalog = plugins.resolved_tool_catalog(&session_id)?;

        let base_surface = crate::rlm::rlm_lashlang_surface(
            &self.surface_config,
            self.process_lifecycle_available,
        );
        let surface = base_surface
            .with_plugin_extensions(plugin_host.extensions())
            .map_err(lash_core::PluginError::Registration)?;
        let host_environment = surface.host_environment(&tool_catalog);

        Ok(crate::rlm::LashlangCompileSurface {
            host_environment,
            tool_catalog,
            surface,
        })
    }

    /// Compiles a lashlang module against the session's compile surface.
    ///
    /// A failure to resolve the surface is reported as a link-stage compile
    /// diagnostic carrying the error text and no source position.
    pub async fn compile_lashlang_module(
        &self,
        request: crate::rlm::LashlangModuleCompileRequest,
    ) -> std::result::Result<crate::rlm::ModuleCompileOutput, crate::rlm::LashlangModuleCompileError>
    {
        let surface_request = crate::rlm::LashlangCompileSurfaceRequest {
            session_id: request.session_id,
            execution_env_spec: request.execution_env_spec,
            extra_plugin_factories: request.extra_plugin_factories,
        };
        let surface = self
            .lashlang_compile_surface(surface_request)
            .map_err(|err| {
                // The same text fills both the message and the diagnostic.
                let message = err.to_string();
                lashlang::ModuleCompileError::Link(lashlang::ModuleCompileDiagnostic {
                    stage: lashlang::ModuleCompileStage::Link,
                    diagnostic: Some(message.clone()),
                    message,
                    offset: None,
                    span: None,
                    line: None,
                    column: None,
                })
            })?;

        let compile_request = lashlang::ModuleCompileRequest {
            source: &request.source,
            environment: &surface.host_environment,
            artifact_store: Some(self.lashlang_artifact_store.as_ref()),
        };
        lashlang::compile_module(compile_request).await
    }
}
444
/// Gives `RlmCore` direct access to the wrapped core's API.
#[cfg(feature = "rlm")]
impl std::ops::Deref for RlmCore {
    type Target = LashCore;

    fn deref(&self) -> &LashCore {
        &self.core
    }
}
453
/// Builder for [`RlmCore`].
#[cfg(feature = "rlm")]
pub struct RlmCoreBuilder {
    /// Underlying core builder that receives the shared options.
    inner: LashCoreBuilder,
    /// RLM protocol configuration supplied by the caller (or the default).
    config: lash_protocol_rlm::RlmProtocolPluginConfig,
    /// Projection resolver handed to the RLM protocol plugin factory.
    projection_resolver: Arc<dyn lash_protocol_rlm::ProjectionResolver>,
    /// Lashlang artifact store; must be set before `build`.
    lashlang_artifact_store: Option<Arc<dyn lash_lashlang_runtime::LashlangArtifactStore>>,
    /// Optional trace sink for lashlang execution traces.
    lashlang_execution_sink: Option<Arc<dyn lash_trace::TraceSink>>,
}
462
#[cfg(feature = "rlm")]
impl RlmCoreBuilder {
    /// Replaces the RLM protocol plugin configuration.
    pub fn rlm_protocol_config(
        self,
        config: lash_protocol_rlm::RlmProtocolPluginConfig,
    ) -> Self {
        Self { config, ..self }
    }

    /// Replaces the projection resolver.
    pub fn projection_resolver(
        self,
        projection_resolver: Arc<dyn lash_protocol_rlm::ProjectionResolver>,
    ) -> Self {
        Self {
            projection_resolver,
            ..self
        }
    }

    /// Sets the lashlang artifact store (required by [`Self::build`]).
    pub fn lashlang_artifact_store(
        self,
        artifact_store: Arc<dyn lash_lashlang_runtime::LashlangArtifactStore>,
    ) -> Self {
        Self {
            lashlang_artifact_store: Some(artifact_store),
            ..self
        }
    }

    /// Sets the trace sink that receives lashlang execution traces.
    pub fn lashlang_execution_sink(
        self,
        lashlang_execution_sink: Arc<dyn lash_trace::TraceSink>,
    ) -> Self {
        Self {
            lashlang_execution_sink: Some(lashlang_execution_sink),
            ..self
        }
    }

    /// Sends lashlang execution traces to a JSONL trace sink created for `path`.
    pub fn lashlang_execution_jsonl_path(self, path: impl Into<std::path::PathBuf>) -> Self {
        let sink = lash_trace::JsonlTraceSink::new(path.into());
        self.lashlang_execution_sink(Arc::new(sink))
    }

    /// Finalises the builder into an [`RlmCore`].
    ///
    /// Installs the RLM protocol plugin factory and a runtime host installer
    /// that attaches a lashlang process engine, then builds the inner core.
    ///
    /// # Errors
    ///
    /// * `EmbedError::MissingLashlangArtifactStore` when no artifact store
    ///   was set.
    /// * `EmbedError::DurableStorePeerRequired` when the effective session
    ///   store is durable but the artifact store is inline.
    /// * Any error from [`LashCoreBuilder::build`].
    pub fn build(self) -> Result<RlmCore> {
        let Self {
            mut inner,
            config,
            projection_resolver,
            lashlang_artifact_store,
            lashlang_execution_sink,
        } = self;

        let artifact_store =
            lashlang_artifact_store.ok_or(EmbedError::MissingLashlangArtifactStore)?;
        // A durable session store must not be paired with an inline artifact store.
        if inner.effective_session_store_tier() == Some(DurabilityTier::Durable)
            && artifact_store.durability_tier()
                == lash_lashlang_runtime::LashlangDurabilityTier::Inline
        {
            return Err(EmbedError::DurableStorePeerRequired {
                facet: "artifact store",
            });
        }

        let process_lifecycle_available = inner.process_work_source.has_registry();
        let config = crate::rlm::rlm_protocol_config(config, process_lifecycle_available);
        let trace_context = inner.resolved_trace_context();

        let protocol_factory = Arc::new(
            lash_protocol_rlm::RlmProtocolPluginFactory::new(config.clone())
                .with_projection_resolver(Arc::clone(&projection_resolver))
                .with_lashlang_artifact_store(Arc::clone(&artifact_store))
                .with_lashlang_execution_trace(lashlang_execution_sink.clone(), trace_context),
        );
        inner.protocol_factory = Some(protocol_factory);

        // State captured by the installer closure below.
        let installer_store = Arc::clone(&artifact_store);
        let installer_config = config.clone();
        let installer_sink = lashlang_execution_sink;
        inner.runtime_host_installer = Some(Arc::new(move |runtime_host, plugin_host| {
            let surface =
                crate::rlm::rlm_lashlang_surface(&installer_config, process_lifecycle_available)
                    .with_plugin_extensions(plugin_host.extensions())
                    .map_err(lash_core::PluginError::Registration)?;
            let engine = lash_lashlang_runtime::LashlangProcessEngine::new(
                Arc::clone(&installer_store),
                surface,
            )
            .with_execution_trace(
                installer_sink.clone(),
                runtime_host.tracing.trace_context.clone(),
            );
            Ok(runtime_host.with_process_engine(Arc::new(engine)))
        }));

        let core = inner.build()?;
        Ok(RlmCore {
            core,
            surface_config: config,
            process_lifecycle_available,
            lashlang_artifact_store: artifact_store,
        })
    }
}
554
/// Prompt layer edits go straight to the wrapped core builder.
#[cfg(feature = "rlm")]
impl PromptLayerSink for RlmCoreBuilder {
    fn prompt_layer_mut(&mut self) -> &mut PromptLayer {
        let inner = &mut self.inner;
        inner.prompt_layer_mut()
    }
}
561
562macro_rules! forward_core_builder_methods {
563 ($builder:ident) => {
564 impl $builder {
565 pub fn provider(mut self, provider: ProviderHandle) -> Self {
566 self.inner = self.inner.provider(provider);
567 self
568 }
569
570 pub fn model(mut self, model: lash_core::ModelSpec) -> Self {
571 self.inner = self.inner.model(model);
572 self
573 }
574
575 pub fn max_turns(mut self, max_turns: usize) -> Self {
576 self.inner = self.inner.max_turns(max_turns);
577 self
578 }
579
580 pub fn session_spec(mut self, spec: SessionSpec) -> Self {
581 self.inner = self.inner.session_spec(spec);
582 self
583 }
584
585 pub fn store_factory(mut self, store_factory: Arc<dyn SessionStoreFactory>) -> Self {
586 self.inner = self.inner.store_factory(store_factory);
587 self
588 }
589
590 pub fn child_store_factory(
591 mut self,
592 store_factory: Arc<dyn SessionStoreFactory>,
593 ) -> Self {
594 self.inner = self.inner.child_store_factory(store_factory);
595 self
596 }
597
598 pub fn attachment_store(mut self, attachment_store: Arc<dyn AttachmentStore>) -> Self {
599 self.inner = self.inner.attachment_store(attachment_store);
600 self
601 }
602
603 pub fn process_env_store(
604 mut self,
605 process_env_store: Arc<dyn ProcessExecutionEnvStore>,
606 ) -> Self {
607 self.inner = self.inner.process_env_store(process_env_store);
608 self
609 }
610
611 pub fn effect_host(mut self, effect_host: Arc<dyn EffectHost>) -> Self {
612 self.inner = self.inner.effect_host(effect_host);
613 self
614 }
615
616 pub fn tools(mut self, tools: Arc<dyn ToolProvider>) -> Self {
617 self.inner = self.inner.tools(tools);
618 self
619 }
620
621 pub fn plugin(mut self, plugin: Arc<dyn PluginFactory>) -> Self {
622 self.inner = self.inner.plugin(plugin);
623 self
624 }
625
626 pub fn plugins(mut self, stack: PluginStack) -> Self {
627 self.inner = self.inner.plugins(stack);
628 self
629 }
630
631 pub fn configure_plugins(mut self, configure: impl FnOnce(&mut PluginStack)) -> Self {
632 self.inner = self.inner.configure_plugins(configure);
633 self
634 }
635
636 pub fn trace_sink(mut self, trace_sink: Arc<dyn lash_trace::TraceSink>) -> Self {
637 self.inner = self.inner.trace_sink(trace_sink);
638 self
639 }
640
641 pub fn trace_jsonl_path(mut self, path: impl Into<std::path::PathBuf>) -> Self {
642 self.inner = self.inner.trace_jsonl_path(path);
643 self
644 }
645
646 pub fn trace_level(mut self, trace_level: lash_trace::TraceLevel) -> Self {
647 self.inner = self.inner.trace_level(trace_level);
648 self
649 }
650
651 pub fn trace_context(mut self, trace_context: lash_trace::TraceContext) -> Self {
652 self.inner = self.inner.trace_context(trace_context);
653 self
654 }
655
656 pub fn termination(mut self, termination: TerminationPolicy) -> Self {
657 self.inner = self.inner.termination(termination);
658 self
659 }
660
661 pub fn residency(mut self, residency: Residency) -> Self {
662 self.inner = self.inner.residency(residency);
663 self
664 }
665
666 pub fn live_replay_store(
667 mut self,
668 live_replay_store: Arc<dyn LiveReplayStore>,
669 ) -> Self {
670 self.inner = self.inner.live_replay_store(live_replay_store);
671 self
672 }
673
674 pub fn process_registry(mut self, process_registry: Arc<dyn ProcessRegistry>) -> Self {
675 self.inner = self.inner.process_registry(process_registry);
676 self
677 }
678
679 pub fn trigger_store(mut self, store: Arc<dyn lash_core::TriggerStore>) -> Self {
680 self.inner = self.inner.trigger_store(store);
681 self
682 }
683
684 pub fn process_work_driver(mut self, driver: ProcessWorkDriver) -> Self {
685 self.inner = self.inner.process_work_driver(driver);
686 self
687 }
688
689 pub fn queued_work_poke(mut self, poke: QueuedWorkPoke) -> Self {
690 self.inner = self.inner.queued_work_poke(poke);
691 self
692 }
693
694 pub fn runtime_host_config(mut self, core: RuntimeHostConfig) -> Self {
695 self.inner.runtime_host_config = Some(core);
696 self
697 }
698 }
699 };
700}
701
702forward_core_builder_methods!(StandardCoreBuilder);
703#[cfg(feature = "rlm")]
704forward_core_builder_methods!(RlmCoreBuilder);
705
706#[derive(Default)]
707pub struct LashCoreBuilder {
708 pub(crate) protocol_factory: Option<Arc<dyn PluginFactory>>,
709 session_spec: SessionSpec,
710 provider: Option<ProviderHandle>,
711 pub(crate) store_factory: Option<Arc<dyn SessionStoreFactory>>,
712 child_store_factory: Option<Arc<dyn SessionStoreFactory>>,
713 effect_host: Option<Arc<dyn EffectHost>>,
717 attachment_store: Option<Arc<dyn AttachmentStore>>,
718 process_env_store: Option<Arc<dyn ProcessExecutionEnvStore>>,
719 trigger_store: Option<Arc<dyn lash_core::TriggerStore>>,
720 prompt: Option<PromptLayer>,
722 trace_sink: Option<Arc<dyn lash_trace::TraceSink>>,
723 trace_level: Option<lash_trace::TraceLevel>,
724 trace_context: Option<lash_trace::TraceContext>,
725 termination: Option<TerminationPolicy>,
726 runtime_host_config: Option<RuntimeHostConfig>,
728 tool_providers: Vec<Arc<dyn ToolProvider>>,
729 plugin_stack: PluginStack,
730 plugin_host: Option<PluginHost>,
731 residency: Option<Residency>,
732 process_work_source: ProcessWorkSource,
735 queued_work_poke: Option<QueuedWorkPoke>,
736 live_replay_store: Option<Arc<dyn LiveReplayStore>>,
737 runtime_host_installer: Option<RuntimeHostInstaller>,
738}
739
740impl LashCoreBuilder {
741 pub fn protocol_plugin(mut self, plugin: Arc<dyn PluginFactory>) -> Self {
742 self.protocol_factory = Some(plugin);
743 self
744 }
745
746 pub fn provider(mut self, provider: ProviderHandle) -> Self {
747 self.session_spec = self.session_spec.provider_id(provider.kind());
748 self.provider = Some(provider);
749 self
750 }
751
752 pub fn model(mut self, model: lash_core::ModelSpec) -> Self {
753 self.session_spec = self.session_spec.model(model);
754 self
755 }
756
757 pub fn max_turns(mut self, max_turns: usize) -> Self {
758 self.session_spec = self.session_spec.max_turns(max_turns);
759 self
760 }
761
762 pub fn session_spec(mut self, spec: SessionSpec) -> Self {
763 self.session_spec = spec;
764 self
765 }
766
767 pub fn store_factory(mut self, store_factory: Arc<dyn SessionStoreFactory>) -> Self {
775 self.store_factory = Some(store_factory);
776 self
777 }
778
779 pub fn child_store_factory(mut self, store_factory: Arc<dyn SessionStoreFactory>) -> Self {
787 self.child_store_factory = Some(store_factory);
788 self
789 }
790
791 pub fn attachment_store(mut self, attachment_store: Arc<dyn AttachmentStore>) -> Self {
792 self.attachment_store = Some(attachment_store);
793 self
794 }
795
796 pub fn process_env_store(
797 mut self,
798 process_env_store: Arc<dyn ProcessExecutionEnvStore>,
799 ) -> Self {
800 self.process_env_store = Some(process_env_store);
801 self
802 }
803
804 pub fn effect_host(mut self, effect_host: Arc<dyn EffectHost>) -> Self {
809 self.effect_host = Some(effect_host);
810 self
811 }
812
813 pub fn tools(mut self, tools: Arc<dyn ToolProvider>) -> Self {
814 self.tool_providers.push(tools);
815 self
816 }
817
818 pub fn plugin(mut self, plugin: Arc<dyn PluginFactory>) -> Self {
819 self.plugin_stack.push(plugin);
820 self
821 }
822
823 pub fn plugins(mut self, stack: PluginStack) -> Self {
824 self.plugin_stack = stack;
825 self
826 }
827
828 pub fn configure_plugins(mut self, configure: impl FnOnce(&mut PluginStack)) -> Self {
829 configure(&mut self.plugin_stack);
830 self
831 }
832
833 pub fn trace_sink(mut self, trace_sink: Arc<dyn lash_trace::TraceSink>) -> Self {
834 self.trace_sink = Some(trace_sink);
835 self
836 }
837
838 pub fn trace_jsonl_path(mut self, path: impl Into<std::path::PathBuf>) -> Self {
839 self.trace_sink = Some(Arc::new(lash_trace::JsonlTraceSink::new(path.into())));
840 self
841 }
842
843 pub fn trace_level(mut self, trace_level: lash_trace::TraceLevel) -> Self {
844 self.trace_level = Some(trace_level);
845 self
846 }
847
848 pub fn trace_context(mut self, trace_context: lash_trace::TraceContext) -> Self {
849 self.trace_context = Some(trace_context);
850 self
851 }
852
853 pub fn termination(mut self, termination: TerminationPolicy) -> Self {
854 self.termination = Some(termination);
855 self
856 }
857
858 pub fn residency(mut self, residency: Residency) -> Self {
859 self.residency = Some(residency);
860 self
861 }
862
863 pub fn live_replay_store(mut self, live_replay_store: Arc<dyn LiveReplayStore>) -> Self {
867 self.live_replay_store = Some(live_replay_store);
868 self
869 }
870
871 fn resolve_runtime_host_config(&mut self) -> Result<RuntimeHostConfig> {
874 if let Some(base) = self.runtime_host_config.take() {
875 return Ok(self.apply_core_overrides(base));
876 }
877 let effect_host = self
878 .effect_host
879 .take()
880 .ok_or(EmbedError::MissingEffectHost)?;
881 let attachment_store = self
882 .attachment_store
883 .take()
884 .ok_or(EmbedError::MissingAttachmentStore)?;
885 let process_env_store = self
886 .process_env_store
887 .take()
888 .ok_or(EmbedError::MissingProcessEnvStore)?;
889 let core = RuntimeHostConfig::new(effect_host, attachment_store, process_env_store);
890 Ok(self.apply_core_overrides(core))
891 }
892
893 fn apply_core_overrides(&mut self, mut core: RuntimeHostConfig) -> RuntimeHostConfig {
895 if let Some(effect_host) = self.effect_host.take() {
896 core.control.effect_host = effect_host;
897 }
898 if let Some(attachment_store) = self.attachment_store.take() {
899 core.durability.attachment_store = attachment_store;
900 }
901 if let Some(process_env_store) = self.process_env_store.take() {
902 core.durability.process_env_store = process_env_store;
903 }
904 if let Some(prompt) = self.prompt.take() {
905 core.prompt.prompt = prompt;
906 }
907 if let Some(trace_sink) = self.trace_sink.take() {
908 core.tracing.trace_sink = Some(trace_sink);
909 }
910 if let Some(trace_level) = self.trace_level.take() {
911 core.tracing.trace_level = trace_level;
912 }
913 if let Some(trace_context) = self.trace_context.take() {
914 core.tracing.trace_context = trace_context;
915 }
916 if let Some(termination) = self.termination.take() {
917 core.control.termination = termination;
918 }
919 core
920 }
921
922 fn effective_session_store_tier(&self) -> Option<DurabilityTier> {
935 self.child_store_factory
936 .as_ref()
937 .or(self.store_factory.as_ref())
938 .map(|factory| factory.durability_tier())
939 }
940
941 #[cfg(feature = "rlm")]
942 fn resolved_trace_context(&self) -> lash_trace::TraceContext {
943 self.trace_context
944 .clone()
945 .or_else(|| {
946 self.runtime_host_config
947 .as_ref()
948 .map(|core| core.tracing.trace_context.clone())
949 })
950 .unwrap_or_default()
951 }
952
953 fn ensure_store_peer_coherence(&self) -> Result<()> {
954 let session_store_tier = self.effective_session_store_tier();
960 let attachment_tier = self
961 .attachment_store
962 .as_ref()
963 .map(|store| store.persistence().durability_tier())
964 .or_else(|| {
965 self.runtime_host_config.as_ref().map(|core| {
966 core.durability
967 .attachment_store
968 .persistence()
969 .durability_tier()
970 })
971 });
972 let process_env_tier = self
973 .process_env_store
974 .as_ref()
975 .map(|store| store.durability_tier())
976 .or_else(|| {
977 self.runtime_host_config
978 .as_ref()
979 .map(|core| core.durability.process_env_store.durability_tier())
980 });
981 let effect_host_tier = self
982 .effect_host
983 .as_ref()
984 .map(|host| host.durability_tier())
985 .or_else(|| {
986 self.runtime_host_config
987 .as_ref()
988 .map(|core| core.control.effect_host.durability_tier())
989 });
990 let trigger_store_tier = self
991 .trigger_store
992 .as_ref()
993 .map(|store| store.durability_tier());
994
995 if session_store_tier == Some(DurabilityTier::Durable) {
996 if attachment_tier == Some(DurabilityTier::Inline) {
997 return Err(EmbedError::DurableStorePeerRequired {
998 facet: "attachment store",
999 });
1000 }
1001 if process_env_tier == Some(DurabilityTier::Inline) {
1002 return Err(EmbedError::DurableStorePeerRequired {
1003 facet: "process execution environment store",
1004 });
1005 }
1006 }
1007
1008 if let Some(process_registry) = self.process_work_source.process_registry().as_ref()
1009 && process_registry.durability_tier() == DurabilityTier::Durable
1010 {
1011 if session_store_tier != Some(DurabilityTier::Durable) {
1012 return Err(EmbedError::DurableProcessRegistryRequiresStoreFactory);
1013 }
1014 if trigger_store_tier != Some(DurabilityTier::Durable) {
1015 return Err(EmbedError::DurableStorePeerRequired {
1016 facet: "trigger store",
1017 });
1018 }
1019 if process_env_tier != Some(DurabilityTier::Durable) {
1020 return Err(EmbedError::DurableStorePeerRequired {
1021 facet: "process execution environment store",
1022 });
1023 }
1024 }
1025
1026 if trigger_store_tier == Some(DurabilityTier::Durable) {
1027 if session_store_tier != Some(DurabilityTier::Durable) {
1028 return Err(EmbedError::DurableStorePeerRequired {
1029 facet: "session store factory",
1030 });
1031 }
1032 if process_env_tier != Some(DurabilityTier::Durable) {
1033 return Err(EmbedError::DurableStorePeerRequired {
1034 facet: "process execution environment store",
1035 });
1036 }
1037 if let Some(process_registry) = self.process_work_source.process_registry().as_ref()
1038 && process_registry.durability_tier() == DurabilityTier::Inline
1039 {
1040 return Err(EmbedError::DurableStorePeerRequired {
1041 facet: "process registry",
1042 });
1043 }
1044 }
1045
1046 if effect_host_tier == Some(DurabilityTier::Durable) {
1047 if attachment_tier != Some(DurabilityTier::Durable) {
1048 return Err(EmbedError::DurableStorePeerRequired {
1049 facet: "attachment store",
1050 });
1051 }
1052 if process_env_tier != Some(DurabilityTier::Durable) {
1053 return Err(EmbedError::DurableStorePeerRequired {
1054 facet: "process execution environment store",
1055 });
1056 }
1057 }
1058
1059 Ok(())
1060 }
1061
1062 pub fn build(mut self) -> Result<LashCore> {
1063 self.ensure_store_peer_coherence()?;
1064 let protocol_factory = self.protocol_factory.clone();
1065 if protocol_factory.is_none() && self.plugin_host.is_none() {
1066 return Err(EmbedError::MissingProtocolPlugin);
1067 }
1068 let provider_id = self
1069 .session_spec
1070 .provider_id
1071 .clone()
1072 .or_else(|| {
1073 self.provider
1074 .as_ref()
1075 .map(|provider| provider.kind().to_string())
1076 })
1077 .unwrap_or_default();
1078 let model = self
1079 .session_spec
1080 .model
1081 .clone()
1082 .ok_or(EmbedError::MissingModelSpec)?;
1083
1084 let base_policy = SessionPolicy {
1085 provider_id,
1086 model,
1087 max_turns: self.session_spec.max_turns.flatten(),
1088 ..SessionPolicy::default()
1089 };
1090 let policy = self.session_spec.resolve_against(&base_policy);
1091
1092 let mut core = self.resolve_runtime_host_config()?;
1093 if let Some(provider) = self.provider.clone() {
1094 core.providers.provider_resolver =
1095 Arc::new(lash_core::SingleProviderResolver::new(provider));
1096 }
1097 let plugin_factories = if let Some(plugin_host) = self.plugin_host {
1098 plugin_host.factories().to_vec()
1099 } else {
1100 let mut factories = Vec::new();
1101 if !self.tool_providers.is_empty() {
1102 let spec = self
1103 .tool_providers
1104 .into_iter()
1105 .fold(PluginSpec::new(), PluginSpec::with_tool_provider);
1106 factories.push(Arc::new(StaticPluginFactory::new("embed_tools", spec))
1107 as Arc<dyn PluginFactory>);
1108 }
1109 factories.extend(self.plugin_stack.into_factories());
1110 factories
1111 };
1112 let default_plugin_host =
1113 build_plugin_host(protocol_factory.as_ref(), &plugin_factories, Vec::new())?;
1114 if let Some(install) = &self.runtime_host_installer {
1115 core = install(core, &default_plugin_host)?;
1116 }
1117
1118 let process_registry = self.process_work_source.process_registry();
1119
1120 let process_work_runner = Self::resolve_process_work_runner(
1126 &self.process_work_source,
1127 &default_plugin_host,
1128 &core,
1129 self.child_store_factory
1132 .as_ref()
1133 .or(self.store_factory.as_ref()),
1134 &policy,
1135 self.residency.unwrap_or_default(),
1136 self.trigger_store.as_ref(),
1137 )?;
1138
1139 let mut env_builder = RuntimeEnvironment::builder()
1140 .with_plugin_host(Arc::new(default_plugin_host))
1141 .with_runtime_host_config(core);
1142 if let Some(process_registry) = process_registry.as_ref() {
1143 env_builder = env_builder.with_process_registry(Arc::clone(process_registry));
1144 }
1145 if let Some(residency) = self.residency {
1146 env_builder = env_builder.with_residency(residency);
1147 }
1148 if let Some(child_store_factory) = self
1149 .child_store_factory
1150 .as_ref()
1151 .or(self.store_factory.as_ref())
1152 {
1153 env_builder = env_builder.with_session_store_factory(Arc::clone(child_store_factory));
1154 }
1155 if let Some(trigger_store) = self.trigger_store.as_ref() {
1156 env_builder = env_builder.with_trigger_store(Arc::clone(trigger_store));
1157 }
1158 if let Some(queued_work_poke) = self.queued_work_poke.clone() {
1159 env_builder = env_builder.with_queued_work_poke(queued_work_poke);
1160 }
1161
1162 let live_replay_store = self
1163 .live_replay_store
1164 .take()
1165 .unwrap_or_else(|| Arc::new(InMemoryLiveReplayStore::default()));
1166
1167 Ok(LashCore {
1168 env: env_builder.build(),
1169 policy,
1170 store_factory: self.store_factory,
1171 plugin_factories: Arc::new(plugin_factories),
1172 provider: self.provider,
1173 live_replay_store,
1174 protocol_factory,
1175 runtime_host_installer: self.runtime_host_installer,
1176 process_work_runner: Arc::new(ProcessWorkRunnerSlot::new(process_work_runner)),
1177 })
1178 }
1179
1180 fn resolve_process_work_runner(
1188 process_work_source: &ProcessWorkSource,
1189 worker_plugin_host: &PluginHost,
1190 core: &RuntimeHostConfig,
1191 store_factory: Option<&Arc<dyn SessionStoreFactory>>,
1192 policy: &SessionPolicy,
1193 residency: lash_core::Residency,
1194 trigger_store: Option<&Arc<dyn lash_core::TriggerStore>>,
1195 ) -> Result<ProcessWorkRunnerSetup> {
1196 let process_registry = match process_work_source {
1197 ProcessWorkSource::None => return Ok(ProcessWorkRunnerSetup::None),
1198 ProcessWorkSource::External(driver) => {
1199 return Ok(ProcessWorkRunnerSetup::External {
1200 driver: driver.clone(),
1201 });
1202 }
1203 ProcessWorkSource::Inline { registry } => Arc::clone(registry),
1204 };
1205 let Some(store_factory) = store_factory else {
1209 return Err(EmbedError::ProcessRegistryRequiresStoreFactory);
1210 };
1211 let phase_probe_slot = lash_core::runtime::RuntimeTurnPhaseProbeSlot::default();
1215 let config = Box::new(
1216 DurableProcessWorkerConfig::new(
1217 Arc::new(worker_plugin_host.clone()),
1218 core.clone(),
1219 Arc::clone(store_factory),
1220 process_registry,
1221 )
1222 .with_session_policy(policy.clone())
1223 .with_trigger_store(
1224 trigger_store
1225 .cloned()
1226 .unwrap_or_else(|| Arc::new(lash_core::InMemoryTriggerStore::default())),
1227 )
1228 .with_residency(residency)
1229 .with_turn_phase_probe_slot(phase_probe_slot),
1230 );
1231 Ok(ProcessWorkRunnerSetup::LazyDefault { config })
1232 }
1233
1234 pub fn advanced(self) -> AdvancedLashCoreBuilder {
1235 AdvancedLashCoreBuilder { builder: self }
1236 }
1237
1238 pub fn process_registry(mut self, process_registry: Arc<dyn ProcessRegistry>) -> Self {
1239 self.process_work_source = ProcessWorkSource::Inline {
1240 registry: process_registry,
1241 };
1242 self
1243 }
1244
1245 pub fn trigger_store(mut self, store: Arc<dyn lash_core::TriggerStore>) -> Self {
1246 self.trigger_store = Some(store);
1247 self
1248 }
1249
1250 pub fn process_work_driver(mut self, driver: ProcessWorkDriver) -> Self {
1257 self.process_work_source = ProcessWorkSource::External(driver);
1258 self
1259 }
1260
1261 pub fn queued_work_poke(mut self, poke: QueuedWorkPoke) -> Self {
1264 self.queued_work_poke = Some(poke);
1265 self
1266 }
1267}
1268
1269pub(crate) fn build_plugin_host(
1270 protocol_factory: Option<&Arc<dyn PluginFactory>>,
1271 common_factories: &[Arc<dyn PluginFactory>],
1272 extra_factories: Vec<Arc<dyn PluginFactory>>,
1273) -> Result<PluginHost> {
1274 let mut factories = Vec::with_capacity(
1275 usize::from(protocol_factory.is_some()) + common_factories.len() + extra_factories.len(),
1276 );
1277 if let Some(protocol_factory) = protocol_factory {
1278 factories.push(Arc::clone(protocol_factory));
1279 }
1280 factories.extend(common_factories.iter().cloned());
1281 factories.extend(extra_factories);
1282 Ok(PluginHost::new(factories))
1283}
1284
1285impl PromptLayerSink for LashCoreBuilder {
1286 fn prompt_layer_mut(&mut self) -> &mut PromptLayer {
1287 self.prompt.get_or_insert_with(PromptLayer::new)
1288 }
1289}
1290
1291pub struct AdvancedLashCoreBuilder {
1292 builder: LashCoreBuilder,
1293}
1294
1295impl AdvancedLashCoreBuilder {
1296 pub fn runtime_host_config(mut self, core: lash_core::RuntimeHostConfig) -> Self {
1297 self.builder.runtime_host_config = Some(core);
1298 self
1299 }
1300
1301 pub fn plugin_host(mut self, plugin_host: PluginHost) -> Self {
1302 self.builder.plugin_host = Some(plugin_host);
1303 self
1304 }
1305
1306 pub fn build(self) -> Result<LashCore> {
1307 self.builder.build()
1308 }
1309}