1use crate::support::*;
2use lash_core::runtime::{
3 ProcessCommand, ProcessEffectOutcome, RuntimeEffectCommand, RuntimeEffectEnvelope,
4 RuntimeEffectKind, RuntimeEffectLocalExecutor, RuntimeEffectOutcome, RuntimeInvocation,
5 RuntimeScope,
6};
7
8#[derive(Clone)]
9pub struct LashCore {
10 pub(crate) env: RuntimeEnvironment,
11 pub(crate) policy: SessionPolicy,
12 pub(crate) protocol_factory: Option<Arc<dyn PluginFactory>>,
13 pub(crate) store_factory: Option<Arc<dyn SessionStoreFactory>>,
14 pub(crate) plugin_factories: Arc<Vec<Arc<dyn PluginFactory>>>,
15 pub(crate) provider: Option<ProviderHandle>,
16 pub(crate) live_replay_store: Arc<dyn LiveReplayStore>,
17 pub(crate) process_lifecycle_available: bool,
21 pub(crate) work_driver: Arc<InlineWorkDriverSlot>,
24}
25
26pub(crate) enum ProcessWorkDriverSetup {
29 None,
31 LazyDefault {
37 config: Box<DurableProcessWorkerConfig>,
38 },
39 External { driver: ProcessWorkDriver },
41}
42
43#[derive(Clone, Default)]
44pub(crate) enum ProcessWorkSource {
45 #[default]
46 None,
47 Inline {
48 registry: Arc<dyn ProcessRegistry>,
49 hub: Option<lash_core::ProcessChangeHub>,
50 },
51 External(ProcessWorkDriver),
52}
53
54impl ProcessWorkSource {
55 fn process_registry(&self) -> Option<Arc<dyn ProcessRegistry>> {
56 match self {
57 Self::None => None,
58 Self::Inline { registry, .. } => Some(Arc::clone(registry)),
59 Self::External(driver) => Some(driver.process_registry()),
60 }
61 }
62
63 fn has_registry(&self) -> bool {
64 !matches!(self, Self::None)
65 }
66
67 fn watched(self, sink: Option<Arc<dyn lash_core::ProcessEventSink>>) -> Self {
68 match self {
69 Self::Inline {
70 registry,
71 hub: None,
72 } => {
73 let (registry, hub) = lash_core::watch_process_registry_with_sink(registry, sink);
74 Self::Inline {
75 registry,
76 hub: Some(hub),
77 }
78 }
79 other => other,
83 }
84 }
85}
86
87#[derive(Clone, Default)]
88pub(crate) enum QueuedWorkSource {
89 None,
90 #[default]
91 LazyDefault,
92 External(QueuedWorkDriver),
93}
94
95pub(crate) enum QueuedWorkDriverSetup {
96 None,
97 LazyDefault {
98 config: Arc<InlineQueuedWorkRunConfig>,
99 },
100 External {
101 driver: QueuedWorkDriver,
102 },
103}
104
105pub(crate) struct InlineWorkDriverSetup {
106 process: ProcessWorkDriverSetup,
107 queued: QueuedWorkDriverSetup,
108}
109
110#[derive(Clone, Default)]
111pub(crate) struct ResolvedWorkDrivers {
112 pub(crate) process: Option<ProcessWorkDriver>,
113 pub(crate) queued: Option<QueuedWorkDriver>,
114 pub(crate) drive_process_on_open: bool,
115}
116
117pub(crate) struct InlineWorkDriverSlot {
123 setup: InlineWorkDriverSetup,
124 drivers: tokio::sync::OnceCell<ResolvedWorkDrivers>,
125 phase_probe_slot: Option<lash_core::runtime::RuntimeTurnPhaseProbeSlot>,
126}
127
128impl InlineWorkDriverSlot {
129 fn new(setup: InlineWorkDriverSetup) -> Self {
130 let phase_probe_slot = match &setup.process {
131 ProcessWorkDriverSetup::LazyDefault { config } => {
132 Some(config.turn_phase_probe_slot.clone())
133 }
134 ProcessWorkDriverSetup::None | ProcessWorkDriverSetup::External { .. } => None,
135 };
136 Self {
137 setup,
138 drivers: tokio::sync::OnceCell::new(),
139 phase_probe_slot,
140 }
141 }
142
143 pub(crate) async fn drivers(&self) -> ResolvedWorkDrivers {
146 self.drivers
147 .get_or_init(|| async {
148 let queued = match &self.setup.queued {
149 QueuedWorkDriverSetup::None => None,
150 QueuedWorkDriverSetup::External { driver } => Some(driver.clone()),
151 QueuedWorkDriverSetup::LazyDefault { config } => Some(QueuedWorkDriver::new(
152 Arc::new(InlineQueuedWorkRunHandle::new(Arc::clone(config))),
153 )),
154 };
155 let (process, drive_process_on_open) = match &self.setup.process {
156 ProcessWorkDriverSetup::None => (None, false),
157 ProcessWorkDriverSetup::External { driver } => (Some(driver.clone()), false),
158 ProcessWorkDriverSetup::LazyDefault { config } => {
159 let mut config = (**config).clone();
160 if let Some(driver) = queued.clone() {
161 config = config.with_queued_work_driver(driver);
162 }
163 let registry = Arc::clone(&config.process_registry);
164 let hub = config.process_change_hub.clone();
165 let worker = DurableProcessWorker::new(config);
166 let driver = if let Some(hub) = hub {
167 ProcessWorkDriver::from_watched(
168 registry,
169 hub,
170 Arc::new(lash_core::InlineProcessRunHandle::new(worker)),
171 )
172 } else {
173 ProcessWorkDriver::inline(registry, worker)
174 };
175 (Some(driver), true)
176 }
177 };
178 ResolvedWorkDrivers {
179 process,
180 queued,
181 drive_process_on_open,
182 }
183 })
184 .await
185 .clone()
186 }
187
188 pub(crate) fn phase_probe_slot(&self) -> Option<lash_core::runtime::RuntimeTurnPhaseProbeSlot> {
189 self.phase_probe_slot.clone()
190 }
191
192 fn configured_process_work_driver(&self) -> Option<ProcessWorkDriver> {
193 match &self.setup.process {
194 ProcessWorkDriverSetup::External { driver } => Some(driver.clone()),
195 ProcessWorkDriverSetup::None | ProcessWorkDriverSetup::LazyDefault { .. } => None,
196 }
197 }
198
199 fn configured_queued_work_driver(&self) -> Option<QueuedWorkDriver> {
200 match &self.setup.queued {
201 QueuedWorkDriverSetup::External { driver } => Some(driver.clone()),
202 QueuedWorkDriverSetup::None | QueuedWorkDriverSetup::LazyDefault { .. } => None,
203 }
204 }
205}
206
207pub(crate) struct InlineQueuedWorkRunConfig {
208 env: RuntimeEnvironment,
209 policy: SessionPolicy,
210 protocol_factory: Option<Arc<dyn PluginFactory>>,
211 plugin_factories: Arc<Vec<Arc<dyn PluginFactory>>>,
212 store_factory: Arc<dyn SessionStoreFactory>,
213 live_replay_store: Arc<dyn LiveReplayStore>,
214 process_lifecycle_available: bool,
215}
216
217impl InlineQueuedWorkRunConfig {
218 fn new(
219 env: RuntimeEnvironment,
220 policy: SessionPolicy,
221 protocol_factory: Option<Arc<dyn PluginFactory>>,
222 plugin_factories: Arc<Vec<Arc<dyn PluginFactory>>>,
223 store_factory: Arc<dyn SessionStoreFactory>,
224 live_replay_store: Arc<dyn LiveReplayStore>,
225 process_lifecycle_available: bool,
226 ) -> Self {
227 Self {
228 env,
229 policy,
230 protocol_factory,
231 plugin_factories,
232 store_factory,
233 live_replay_store,
234 process_lifecycle_available,
235 }
236 }
237}
238
239struct InlineQueuedWorkRunHandle {
240 config: Arc<InlineQueuedWorkRunConfig>,
241}
242
243impl InlineQueuedWorkRunHandle {
244 fn new(config: Arc<InlineQueuedWorkRunConfig>) -> Self {
245 Self { config }
246 }
247}
248
249#[async_trait]
250impl QueuedWorkRunHandle for InlineQueuedWorkRunHandle {
251 async fn run_queued_work(
252 &self,
253 request: QueuedWorkRunRequest,
254 ) -> std::result::Result<(), lash_core::PluginError> {
255 let Some(session_id) = request.session_id else {
256 return Ok(());
257 };
258 let reason = request.reason;
259 let mut policy = self.config.policy.clone();
260 policy.session_id = Some(session_id.clone());
261 let store = self
262 .config
263 .store_factory
264 .create_store(&SessionStoreCreateRequest {
265 session_id: session_id.clone(),
266 relation: SessionRelation::default(),
267 policy: policy.clone(),
268 })
269 .await
270 .map_err(lash_core::PluginError::Session)?;
271 let state = crate::session::load_state_for_residency(
272 self.config.env.residency,
273 &session_id,
274 &policy,
275 store.as_ref(),
276 )
277 .await
278 .map_err(|err| lash_core::PluginError::Session(err.to_string()))?;
279 let plugin_host = build_plugin_host(
280 self.config.protocol_factory.as_ref(),
281 self.config.plugin_factories.as_ref(),
282 Vec::new(),
283 )
284 .map_err(|err| lash_core::PluginError::Session(err.to_string()))?;
285 let mut env = self.config.env.clone();
286 env.core = plugin_host
287 .install_process_engine_contributions(
288 env.core.clone(),
289 self.config.process_lifecycle_available,
290 )
291 .map_err(|err| lash_core::PluginError::Session(err.to_string()))?;
292 env.plugin_host = Some(Arc::new(plugin_host));
293 let effect_host = Arc::clone(&env.core.control.effect_host);
294 let runtime = LashRuntime::from_environment(&env, policy, state, Some(store))
295 .await
296 .map_err(|err| lash_core::PluginError::Session(err.to_string()))?;
297 let handle = RuntimeHandle::with_live_replay_store(
298 runtime,
299 Arc::clone(&self.config.live_replay_store),
300 );
301 let scope = lash_core::ExecutionScope::queue_drain(session_id, reason);
302 let scoped = effect_host
303 .scoped(scope)
304 .map_err(|err| lash_core::PluginError::Session(err.to_string()))?;
305 crate::turn::stream_next_queued_prepared_turn(
306 &handle,
307 crate::turn::TurnSinks::default(),
308 scoped,
309 CancellationToken::new(),
310 &[],
311 )
312 .await
313 .map_err(|err| lash_core::PluginError::Session(err.to_string()))?;
314 Ok(())
315 }
316}
317
318#[derive(Clone, Debug, Default, serde::Serialize, serde::Deserialize)]
319pub struct SessionDeleteReport {
320 pub session_id: String,
321 pub process: Option<lash_core::ProcessSessionDeleteReport>,
322}
323
324impl LashCore {
325 pub fn builder() -> LashCoreBuilder {
326 LashCoreBuilder::default()
327 }
328
329 pub fn standard_builder() -> LashCoreBuilder {
332 LashCore::builder()
333 .protocol_plugin(Arc::new(
334 lash_protocol_standard::StandardProtocolPluginFactory::new(),
335 ))
336 .plugins(default_runtime_stack())
337 }
338
339 #[cfg(feature = "rlm")]
347 pub fn rlm_builder(factory: crate::rlm::RlmProtocolPluginFactory) -> LashCoreBuilder {
348 LashCore::builder()
349 .protocol_plugin(Arc::new(factory))
350 .plugins(default_runtime_stack())
351 }
352
353 pub fn session(&self, session_id: impl Into<String>) -> SessionBuilder {
354 SessionBuilder {
355 core: self.clone(),
356 session_id: session_id.into(),
357 spec: SessionSpec::inherit(),
358 parent_session_id: None,
359 session_execution_owner: None,
360 store: None,
361 provider: None,
362 active_plugins: Vec::new(),
363 plugin_factories: Vec::new(),
364 plugin_options: PluginOptions::default(),
365 }
366 }
367
368 pub async fn resume(&self, parked: ParkedSession) -> Result<LashSession> {
381 let plugin_host = build_plugin_host(
386 self.protocol_factory.as_ref(),
387 self.plugin_factories.as_ref(),
388 Vec::new(),
389 )?;
390 let mut env = self.env.clone();
391 env.core = plugin_host.install_process_engine_contributions(
392 env.core.clone(),
393 self.process_lifecycle_available,
394 )?;
395 env.plugin_host = Some(Arc::new(plugin_host));
396 let effect_host = Arc::clone(&env.core.control.effect_host);
397 let drivers = self.work_driver.drivers().await;
398 env.process_work_driver = drivers.process.clone();
399 env.queued_work_driver = drivers.queued.clone();
400 let runtime = LashRuntime::resume(parked.inner, &env).await?;
401 let handle =
402 RuntimeHandle::with_live_replay_store(runtime, Arc::clone(&self.live_replay_store));
403 Ok(LashSession {
404 runtime: handle,
405 effect_host,
406 parent_session_id: None,
407 active_plugins: Vec::new(),
408 process_phase_probe_slot: self.work_driver.phase_probe_slot(),
409 turn_cancels: crate::turn::TurnCancelRegistry::default(),
410 })
411 }
412
413 pub fn flush_trace_sink(&self) -> Result<()> {
424 if let Some(sink) = self.env.core.tracing.trace_sink.as_ref() {
425 sink.flush()?;
426 }
427 Ok(())
428 }
429
430 pub fn triggers(&self) -> crate::admin::CoreTriggerAdmin {
431 crate::admin::CoreTriggerAdmin { core: self.clone() }
432 }
433
434 pub fn processes(&self) -> crate::process_admin::Processes {
435 crate::process_admin::Processes { core: self.clone() }
436 }
437
438 pub fn completions(&self) -> crate::admin::Completions {
439 crate::admin::Completions { core: self.clone() }
440 }
441
442 pub fn effect_host(&self) -> Arc<dyn EffectHost> {
443 Arc::clone(&self.env.core.control.effect_host)
444 }
445
446 pub async fn delete_session(
447 &self,
448 session_id: impl AsRef<str>,
449 scoped_effect_controller: ScopedEffectController<'_>,
450 ) -> Result<SessionDeleteReport> {
451 let session_id = session_id.as_ref().to_string();
452 let Some(store_factory) = self.store_factory.as_ref() else {
453 return Err(EmbedError::MissingSessionStoreFactory);
454 };
455 let process = if let Some(process_registry) = self.env.process_registry.as_ref() {
456 let invocation = RuntimeInvocation::effect(
457 RuntimeScope::new(session_id.clone()),
458 format!("process:delete-session:{session_id}"),
459 RuntimeEffectKind::Process,
460 format!("{session_id}:delete-session"),
461 );
462 let outcome = scoped_effect_controller
463 .controller()
464 .execute_effect(
465 RuntimeEffectEnvelope::new(
466 invocation,
467 RuntimeEffectCommand::process(ProcessCommand::DeleteSession {
468 session_id: session_id.clone(),
469 }),
470 ),
471 RuntimeEffectLocalExecutor::processes(
472 Arc::clone(process_registry),
473 self.env.process_work_driver.clone(),
474 ),
475 )
476 .await
477 .map_err(|err| EmbedError::SessionDeleteProcess {
478 session_id: session_id.clone(),
479 message: err.to_string(),
480 })?;
481 match outcome {
482 RuntimeEffectOutcome::Process {
483 result: ProcessEffectOutcome::DeleteSession { report },
484 } => Some(report),
485 other => {
486 return Err(EmbedError::SessionDeleteProcess {
487 session_id,
488 message: format!(
489 "process delete returned the wrong outcome: {}",
490 other.kind().as_str()
491 ),
492 });
493 }
494 }
495 } else {
496 None
497 };
498 if let Some(trigger_store) = self.env.trigger_store.as_ref() {
499 trigger_store
500 .delete_session_subscriptions(&session_id)
501 .await
502 .map_err(|err| EmbedError::SessionDeleteProcess {
503 session_id: session_id.clone(),
504 message: err.to_string(),
505 })?;
506 }
507 self.env
508 .core
509 .control
510 .effect_host
511 .revoke_await_events_for_session(&session_id)
512 .await
513 .map_err(|err| EmbedError::SessionDeleteProcess {
514 session_id: session_id.clone(),
515 message: err.to_string(),
516 })?;
517 store_factory
518 .delete_session(&session_id)
519 .await
520 .map_err(|message| EmbedError::StoreFactory {
521 session_id: session_id.clone(),
522 message,
523 })?;
524 Ok(SessionDeleteReport {
525 session_id,
526 process,
527 })
528 }
529
530 pub fn process_registry(&self) -> Option<Arc<dyn ProcessRegistry>> {
531 self.env.process_registry.as_ref().cloned()
532 }
533
534 pub fn durable_process_worker_config(&self) -> Result<DurableProcessWorkerConfig> {
535 self.durable_process_worker_config_with_plugins(std::iter::empty::<Arc<dyn PluginFactory>>())
536 }
537
538 pub fn durable_process_worker_config_with_plugins(
539 &self,
540 extra_plugin_factories: impl IntoIterator<Item = Arc<dyn PluginFactory>>,
541 ) -> Result<DurableProcessWorkerConfig> {
542 let Some(process_registry) = self.process_registry() else {
543 return Err(EmbedError::MissingProcessRegistry);
544 };
545 let Some(store_factory) = self.store_factory.as_ref() else {
546 return Err(EmbedError::MissingProcessWorkerStoreFactory);
547 };
548 let plugin_host = build_plugin_host(
549 self.protocol_factory.as_ref(),
550 self.plugin_factories.as_ref(),
551 extra_plugin_factories.into_iter().collect(),
552 )?;
553 let runtime_host = plugin_host.install_process_engine_contributions(
554 self.env.core.clone(),
555 self.process_lifecycle_available,
556 )?;
557 let mut config = DurableProcessWorkerConfig::new(
558 Arc::new(plugin_host),
559 runtime_host,
560 Arc::clone(store_factory),
561 process_registry,
562 )
563 .with_session_policy(self.policy.clone())
564 .with_residency(self.env.residency);
565 if let Some(trigger_store) = self.env.trigger_store.as_ref() {
566 config = config.with_trigger_store(Arc::clone(trigger_store));
567 }
568 if let Some(driver) = self.work_driver.configured_process_work_driver() {
569 config = config
570 .with_change_hub(driver.change_hub())
571 .with_process_work_driver(driver);
572 }
573 if let Some(driver) = self.work_driver.configured_queued_work_driver() {
574 config = config.with_queued_work_driver(driver);
575 }
576 Ok(config)
577 }
578}
579
580fn default_runtime_stack() -> PluginStack {
581 lash_plugin_tool_output_budget::tool_output_budget_stack()
582}
583
584#[derive(Default)]
585pub struct LashCoreBuilder {
586 pub(crate) protocol_factory: Option<Arc<dyn PluginFactory>>,
587 session_spec: SessionSpec,
588 provider: Option<ProviderHandle>,
589 pub(crate) store_factory: Option<Arc<dyn SessionStoreFactory>>,
590 child_store_factory: Option<Arc<dyn SessionStoreFactory>>,
591 effect_host: Option<Arc<dyn EffectHost>>,
595 attachment_store: Option<Arc<dyn AttachmentStore>>,
596 process_env_store: Option<Arc<dyn ProcessExecutionEnvStore>>,
597 trigger_store: Option<Arc<dyn lash_core::TriggerStore>>,
598 prompt: Option<PromptLayer>,
600 trace_sink: Option<Arc<dyn lash_trace::TraceSink>>,
601 trace_level: Option<lash_trace::TraceLevel>,
602 trace_context: Option<lash_trace::TraceContext>,
603 termination: Option<TerminationPolicy>,
604 runtime_host_config: Option<RuntimeHostConfig>,
606 tool_providers: Vec<Arc<dyn ToolProvider>>,
607 plugin_stack: PluginStack,
608 plugin_host: Option<PluginHost>,
609 residency: Option<Residency>,
610 lease_timings: Option<lash_core::LeaseTimings>,
611 process_work_source: ProcessWorkSource,
614 process_event_sink: Option<Arc<dyn lash_core::ProcessEventSink>>,
617 queued_work_source: QueuedWorkSource,
618 live_replay_store: Option<Arc<dyn LiveReplayStore>>,
619}
620
621impl LashCoreBuilder {
622 pub fn protocol_plugin(mut self, plugin: Arc<dyn PluginFactory>) -> Self {
623 self.protocol_factory = Some(plugin);
624 self
625 }
626
627 pub fn provider(mut self, provider: ProviderHandle) -> Self {
628 self.session_spec = self.session_spec.provider_id(provider.kind());
629 self.provider = Some(provider);
630 self
631 }
632
633 pub fn model(mut self, model: lash_core::ModelSpec) -> Self {
634 self.session_spec = self.session_spec.model(model);
635 self
636 }
637
638 pub fn max_turns(mut self, max_turns: usize) -> Self {
639 self.session_spec = self.session_spec.max_turns(max_turns);
640 self
641 }
642
643 pub fn session_spec(mut self, spec: SessionSpec) -> Self {
644 self.session_spec = spec;
645 self
646 }
647
648 pub fn store_factory(mut self, store_factory: Arc<dyn SessionStoreFactory>) -> Self {
656 self.store_factory = Some(store_factory);
657 self
658 }
659
660 pub fn child_store_factory(mut self, store_factory: Arc<dyn SessionStoreFactory>) -> Self {
668 self.child_store_factory = Some(store_factory);
669 self
670 }
671
672 pub fn attachment_store(mut self, attachment_store: Arc<dyn AttachmentStore>) -> Self {
673 self.attachment_store = Some(attachment_store);
674 self
675 }
676
677 pub fn process_env_store(
678 mut self,
679 process_env_store: Arc<dyn ProcessExecutionEnvStore>,
680 ) -> Self {
681 self.process_env_store = Some(process_env_store);
682 self
683 }
684
685 pub fn effect_host(mut self, effect_host: Arc<dyn EffectHost>) -> Self {
690 self.effect_host = Some(effect_host);
691 self
692 }
693
694 pub fn tools(mut self, tools: Arc<dyn ToolProvider>) -> Self {
695 self.tool_providers.push(tools);
696 self
697 }
698
699 pub fn plugin(mut self, plugin: Arc<dyn PluginFactory>) -> Self {
700 self.plugin_stack.push(plugin);
701 self
702 }
703
704 pub fn plugins(mut self, stack: PluginStack) -> Self {
705 self.plugin_stack = stack;
706 self
707 }
708
709 pub fn configure_plugins(mut self, configure: impl FnOnce(&mut PluginStack)) -> Self {
710 configure(&mut self.plugin_stack);
711 self
712 }
713
714 pub fn trace_sink(mut self, trace_sink: Arc<dyn lash_trace::TraceSink>) -> Self {
715 self.trace_sink = Some(trace_sink);
716 self
717 }
718
719 pub fn trace_jsonl_path(mut self, path: impl Into<std::path::PathBuf>) -> Self {
720 self.trace_sink = Some(Arc::new(lash_trace::JsonlTraceSink::new(path.into())));
721 self
722 }
723
724 pub fn trace_level(mut self, trace_level: lash_trace::TraceLevel) -> Self {
725 self.trace_level = Some(trace_level);
726 self
727 }
728
729 pub fn trace_context(mut self, trace_context: lash_trace::TraceContext) -> Self {
730 self.trace_context = Some(trace_context);
731 self
732 }
733
734 pub fn termination(mut self, termination: TerminationPolicy) -> Self {
735 self.termination = Some(termination);
736 self
737 }
738
739 pub fn residency(mut self, residency: Residency) -> Self {
740 self.residency = Some(residency);
741 self
742 }
743
744 pub fn lease_timings(mut self, lease_timings: lash_core::LeaseTimings) -> Self {
757 self.lease_timings = Some(lease_timings);
758 self
759 }
760
761 pub fn live_replay_store(mut self, live_replay_store: Arc<dyn LiveReplayStore>) -> Self {
765 self.live_replay_store = Some(live_replay_store);
766 self
767 }
768
769 fn resolve_runtime_host_config(&mut self) -> Result<RuntimeHostConfig> {
772 if let Some(base) = self.runtime_host_config.take() {
773 return Ok(self.apply_core_overrides(base));
774 }
775 let effect_host = self
776 .effect_host
777 .take()
778 .ok_or(EmbedError::MissingEffectHost)?;
779 let attachment_store = self
780 .attachment_store
781 .take()
782 .ok_or(EmbedError::MissingAttachmentStore)?;
783 let process_env_store = self
784 .process_env_store
785 .take()
786 .ok_or(EmbedError::MissingProcessEnvStore)?;
787 let core = RuntimeHostConfig::new(effect_host, attachment_store, process_env_store);
788 Ok(self.apply_core_overrides(core))
789 }
790
791 fn apply_core_overrides(&mut self, mut core: RuntimeHostConfig) -> RuntimeHostConfig {
793 if let Some(effect_host) = self.effect_host.take() {
794 core.control.effect_host = effect_host;
795 }
796 if let Some(attachment_store) = self.attachment_store.take() {
797 core.durability.attachment_store = attachment_store;
798 }
799 if let Some(process_env_store) = self.process_env_store.take() {
800 core.durability.process_env_store = process_env_store;
801 }
802 if let Some(prompt) = self.prompt.take() {
803 core.prompt.prompt = prompt;
804 }
805 if let Some(trace_sink) = self.trace_sink.take() {
806 core.tracing.trace_sink = Some(trace_sink);
807 }
808 if let Some(trace_level) = self.trace_level.take() {
809 core.tracing.trace_level = trace_level;
810 }
811 if let Some(trace_context) = self.trace_context.take() {
812 core.tracing.trace_context = trace_context;
813 }
814 if let Some(termination) = self.termination.take() {
815 core.control.termination = termination;
816 }
817 if let Some(lease_timings) = self.lease_timings.take() {
818 core.control.lease_timings = lease_timings;
819 }
820 core
821 }
822
823 fn effective_session_store_tier(&self) -> Option<DurabilityTier> {
836 self.child_store_factory
837 .as_ref()
838 .or(self.store_factory.as_ref())
839 .map(|factory| factory.durability_tier())
840 }
841
842 fn ensure_store_peer_coherence(
851 session_store_tier: Option<DurabilityTier>,
852 trigger_store_tier: Option<DurabilityTier>,
853 process_registry_tier: Option<DurabilityTier>,
854 core: &RuntimeHostConfig,
855 ) -> Result<()> {
856 let attachment_tier = Some(
857 core.durability
858 .attachment_store
859 .persistence()
860 .durability_tier(),
861 );
862 let process_env_tier = Some(core.durability.process_env_store.durability_tier());
863 let effect_host_tier = Some(core.control.effect_host.durability_tier());
864
865 if session_store_tier == Some(DurabilityTier::Durable) {
866 if attachment_tier == Some(DurabilityTier::Inline) {
867 return Err(EmbedError::DurableStorePeerRequired {
868 facet: "attachment store",
869 });
870 }
871 if process_env_tier == Some(DurabilityTier::Inline) {
872 return Err(EmbedError::DurableStorePeerRequired {
873 facet: "process execution environment store",
874 });
875 }
876 for engine in core.process_engines.engines() {
879 if engine.durability_tier() == DurabilityTier::Inline {
880 return Err(EmbedError::DurableStorePeerRequired {
881 facet: engine.kind(),
882 });
883 }
884 }
885 }
886
887 if process_registry_tier == Some(DurabilityTier::Durable) {
888 if session_store_tier != Some(DurabilityTier::Durable) {
889 return Err(EmbedError::DurableProcessRegistryRequiresStoreFactory);
890 }
891 if trigger_store_tier != Some(DurabilityTier::Durable) {
892 return Err(EmbedError::DurableStorePeerRequired {
893 facet: "trigger store",
894 });
895 }
896 if process_env_tier != Some(DurabilityTier::Durable) {
897 return Err(EmbedError::DurableStorePeerRequired {
898 facet: "process execution environment store",
899 });
900 }
901 }
902
903 if trigger_store_tier == Some(DurabilityTier::Durable) {
904 if session_store_tier != Some(DurabilityTier::Durable) {
905 return Err(EmbedError::DurableStorePeerRequired {
906 facet: "session store factory",
907 });
908 }
909 if process_env_tier != Some(DurabilityTier::Durable) {
910 return Err(EmbedError::DurableStorePeerRequired {
911 facet: "process execution environment store",
912 });
913 }
914 if process_registry_tier == Some(DurabilityTier::Inline) {
915 return Err(EmbedError::DurableStorePeerRequired {
916 facet: "process registry",
917 });
918 }
919 }
920
921 if effect_host_tier == Some(DurabilityTier::Durable) {
922 if attachment_tier != Some(DurabilityTier::Durable) {
923 return Err(EmbedError::DurableStorePeerRequired {
924 facet: "attachment store",
925 });
926 }
927 if process_env_tier != Some(DurabilityTier::Durable) {
928 return Err(EmbedError::DurableStorePeerRequired {
929 facet: "process execution environment store",
930 });
931 }
932 }
933
934 Ok(())
935 }
936
937 pub fn build(mut self) -> Result<LashCore> {
938 let protocol_factory = self.protocol_factory.clone();
939 if protocol_factory.is_none() && self.plugin_host.is_none() {
940 return Err(EmbedError::MissingProtocolPlugin);
941 }
942 let provider_id = self
943 .session_spec
944 .provider_id
945 .clone()
946 .or_else(|| {
947 self.provider
948 .as_ref()
949 .map(|provider| provider.kind().to_string())
950 })
951 .unwrap_or_default();
952 let model = self
953 .session_spec
954 .model
955 .clone()
956 .ok_or(EmbedError::MissingModelSpec)?;
957
958 let base_policy = SessionPolicy {
959 provider_id,
960 model,
961 max_turns: self.session_spec.max_turns.flatten(),
962 ..SessionPolicy::default()
963 };
964 let policy = self.session_spec.resolve_against(&base_policy);
965
966 let session_store_tier = self.effective_session_store_tier();
969 let trigger_store_tier = self
970 .trigger_store
971 .as_ref()
972 .map(|store| store.durability_tier());
973 let process_work_source = self
974 .process_work_source
975 .clone()
976 .watched(self.process_event_sink.clone());
977 let process_registry_tier = process_work_source
978 .process_registry()
979 .map(|registry| registry.durability_tier());
980
981 let mut core = self.resolve_runtime_host_config()?;
982 if let Some(provider) = self.provider.clone() {
983 core.providers.provider_resolver =
984 Arc::new(lash_core::SingleProviderResolver::new(provider));
985 }
986 let plugin_factories = if let Some(plugin_host) = self.plugin_host {
987 plugin_host.factories().to_vec()
988 } else {
989 let mut factories = Vec::new();
990 if !self.tool_providers.is_empty() {
991 let spec = self
992 .tool_providers
993 .into_iter()
994 .fold(PluginSpec::new(), PluginSpec::with_tool_provider);
995 factories.push(Arc::new(StaticPluginFactory::new("embed_tools", spec))
996 as Arc<dyn PluginFactory>);
997 }
998 factories.extend(self.plugin_stack.into_factories());
999 factories
1000 };
1001 let default_plugin_host =
1002 build_plugin_host(protocol_factory.as_ref(), &plugin_factories, Vec::new())?;
1003 let process_lifecycle_available = process_work_source.has_registry();
1007 let core_with_engines = default_plugin_host
1015 .install_process_engine_contributions(core.clone(), process_lifecycle_available)?;
1016 Self::ensure_store_peer_coherence(
1019 session_store_tier,
1020 trigger_store_tier,
1021 process_registry_tier,
1022 &core_with_engines,
1023 )?;
1024
1025 let process_registry = process_work_source.process_registry();
1026
1027 let process_work_driver = Self::resolve_process_work_driver(
1033 &process_work_source,
1034 &default_plugin_host,
1035 &core,
1036 process_lifecycle_available,
1037 self.child_store_factory
1040 .as_ref()
1041 .or(self.store_factory.as_ref()),
1042 &policy,
1043 self.residency.unwrap_or_default(),
1044 self.trigger_store.as_ref(),
1045 )?;
1046
1047 let live_replay_clock = Arc::clone(&core.clock);
1048 let mut env_builder = RuntimeEnvironment::builder()
1049 .with_plugin_host(Arc::new(default_plugin_host))
1050 .with_runtime_host_config(core);
1051 if let Some(process_registry) = process_registry.as_ref() {
1052 env_builder = env_builder.with_process_registry(Arc::clone(process_registry));
1053 }
1054 if let Some(residency) = self.residency {
1055 env_builder = env_builder.with_residency(residency);
1056 }
1057 if let Some(child_store_factory) = self
1058 .child_store_factory
1059 .as_ref()
1060 .or(self.store_factory.as_ref())
1061 {
1062 env_builder = env_builder.with_session_store_factory(Arc::clone(child_store_factory));
1063 }
1064 if let Some(trigger_store) = self.trigger_store.as_ref() {
1065 env_builder = env_builder.with_trigger_store(Arc::clone(trigger_store));
1066 }
1067 let live_replay_store = self.live_replay_store.take().unwrap_or_else(|| {
1068 Arc::new(InMemoryLiveReplayStore::with_clock(
1069 lash_core::InMemoryLiveReplayStoreConfig::default(),
1070 live_replay_clock,
1071 ))
1072 });
1073 let env = env_builder.build();
1074 let queued_work_driver = Self::resolve_queued_work_driver(
1075 &self.queued_work_source,
1076 env.clone(),
1077 policy.clone(),
1078 protocol_factory.clone(),
1079 Arc::new(plugin_factories.clone()),
1080 self.child_store_factory
1081 .as_ref()
1082 .or(self.store_factory.as_ref()),
1083 Arc::clone(&live_replay_store),
1084 process_lifecycle_available,
1085 );
1086 let work_driver = InlineWorkDriverSetup {
1087 process: process_work_driver,
1088 queued: queued_work_driver,
1089 };
1090
1091 Ok(LashCore {
1092 env,
1093 policy,
1094 store_factory: self.store_factory,
1095 plugin_factories: Arc::new(plugin_factories),
1096 provider: self.provider,
1097 live_replay_store,
1098 protocol_factory,
1099 process_lifecycle_available,
1100 work_driver: Arc::new(InlineWorkDriverSlot::new(work_driver)),
1101 })
1102 }
1103
1104 #[allow(clippy::too_many_arguments)]
1115 fn resolve_process_work_driver(
1116 process_work_source: &ProcessWorkSource,
1117 worker_plugin_host: &PluginHost,
1118 core: &RuntimeHostConfig,
1119 process_lifecycle_available: bool,
1120 store_factory: Option<&Arc<dyn SessionStoreFactory>>,
1121 policy: &SessionPolicy,
1122 residency: lash_core::Residency,
1123 trigger_store: Option<&Arc<dyn lash_core::TriggerStore>>,
1124 ) -> Result<ProcessWorkDriverSetup> {
1125 let (process_registry, process_change_hub) = match process_work_source {
1126 ProcessWorkSource::None => return Ok(ProcessWorkDriverSetup::None),
1127 ProcessWorkSource::External(driver) => {
1128 return Ok(ProcessWorkDriverSetup::External {
1129 driver: driver.clone(),
1130 });
1131 }
1132 ProcessWorkSource::Inline { registry, hub } => (Arc::clone(registry), hub.clone()),
1133 };
1134 let Some(store_factory) = store_factory else {
1138 return Err(EmbedError::ProcessRegistryRequiresStoreFactory);
1139 };
1140 let runtime_host = worker_plugin_host
1145 .install_process_engine_contributions(core.clone(), process_lifecycle_available)?;
1146 let phase_probe_slot = lash_core::runtime::RuntimeTurnPhaseProbeSlot::default();
1147 let mut config = DurableProcessWorkerConfig::new(
1148 Arc::new(worker_plugin_host.clone()),
1149 runtime_host,
1150 Arc::clone(store_factory),
1151 process_registry,
1152 )
1153 .with_session_policy(policy.clone())
1154 .with_trigger_store(trigger_store.cloned().unwrap_or_else(|| {
1155 Arc::new(lash_core::InMemoryTriggerStore::with_clock(Arc::clone(
1156 &core.clock,
1157 )))
1158 }))
1159 .with_residency(residency)
1160 .with_turn_phase_probe_slot(phase_probe_slot);
1161 if let Some(hub) = process_change_hub {
1162 config = config.with_change_hub(hub);
1163 }
1164 let config = Box::new(config);
1165 Ok(ProcessWorkDriverSetup::LazyDefault { config })
1166 }
1167
1168 #[allow(clippy::too_many_arguments)]
1169 fn resolve_queued_work_driver(
1170 queued_work_source: &QueuedWorkSource,
1171 env: RuntimeEnvironment,
1172 policy: SessionPolicy,
1173 protocol_factory: Option<Arc<dyn PluginFactory>>,
1174 plugin_factories: Arc<Vec<Arc<dyn PluginFactory>>>,
1175 store_factory: Option<&Arc<dyn SessionStoreFactory>>,
1176 live_replay_store: Arc<dyn LiveReplayStore>,
1177 process_lifecycle_available: bool,
1178 ) -> QueuedWorkDriverSetup {
1179 match queued_work_source {
1180 QueuedWorkSource::None => QueuedWorkDriverSetup::None,
1181 QueuedWorkSource::External(driver) => QueuedWorkDriverSetup::External {
1182 driver: driver.clone(),
1183 },
1184 QueuedWorkSource::LazyDefault => match store_factory {
1185 Some(store_factory) => QueuedWorkDriverSetup::LazyDefault {
1186 config: Arc::new(InlineQueuedWorkRunConfig::new(
1187 env,
1188 policy,
1189 protocol_factory,
1190 plugin_factories,
1191 Arc::clone(store_factory),
1192 live_replay_store,
1193 process_lifecycle_available,
1194 )),
1195 },
1196 None => QueuedWorkDriverSetup::None,
1197 },
1198 }
1199 }
1200
1201 pub fn advanced(self) -> AdvancedLashCoreBuilder {
1202 AdvancedLashCoreBuilder { builder: self }
1203 }
1204
1205 pub fn process_registry(mut self, process_registry: Arc<dyn ProcessRegistry>) -> Self {
1206 self.process_work_source = ProcessWorkSource::Inline {
1207 registry: process_registry,
1208 hub: None,
1209 };
1210 self
1211 }
1212
1213 pub fn process_event_sink(mut self, sink: Arc<dyn lash_core::ProcessEventSink>) -> Self {
1228 self.process_event_sink = Some(sink);
1229 self
1230 }
1231
1232 pub fn trigger_store(mut self, store: Arc<dyn lash_core::TriggerStore>) -> Self {
1233 self.trigger_store = Some(store);
1234 self
1235 }
1236
1237 pub fn process_work_driver(mut self, driver: ProcessWorkDriver) -> Self {
1244 self.process_work_source = ProcessWorkSource::External(driver);
1245 self
1246 }
1247
1248 pub fn queued_work_driver(mut self, driver: QueuedWorkDriver) -> Self {
1250 self.queued_work_source = QueuedWorkSource::External(driver);
1251 self
1252 }
1253
1254 pub fn disable_queued_work_driver(mut self) -> Self {
1255 self.queued_work_source = QueuedWorkSource::None;
1256 self
1257 }
1258}
1259
1260pub(crate) fn build_plugin_host(
1261 protocol_factory: Option<&Arc<dyn PluginFactory>>,
1262 common_factories: &[Arc<dyn PluginFactory>],
1263 extra_factories: Vec<Arc<dyn PluginFactory>>,
1264) -> Result<PluginHost> {
1265 let mut factories = Vec::with_capacity(
1266 usize::from(protocol_factory.is_some()) + common_factories.len() + extra_factories.len(),
1267 );
1268 if let Some(protocol_factory) = protocol_factory {
1269 factories.push(Arc::clone(protocol_factory));
1270 }
1271 factories.extend(common_factories.iter().cloned());
1272 factories.extend(extra_factories);
1273 Ok(PluginHost::new(factories))
1274}
1275
1276impl PromptLayerSink for LashCoreBuilder {
1277 fn prompt_layer_mut(&mut self) -> &mut PromptLayer {
1278 self.prompt.get_or_insert_with(PromptLayer::new)
1279 }
1280}
1281
1282pub struct AdvancedLashCoreBuilder {
1283 builder: LashCoreBuilder,
1284}
1285
1286impl AdvancedLashCoreBuilder {
1287 pub fn runtime_host_config(mut self, core: lash_core::RuntimeHostConfig) -> Self {
1288 self.builder.runtime_host_config = Some(core);
1289 self
1290 }
1291
1292 pub fn plugin_host(mut self, plugin_host: PluginHost) -> Self {
1293 self.builder.plugin_host = Some(plugin_host);
1294 self
1295 }
1296
1297 pub fn build(self) -> Result<LashCore> {
1298 self.builder.build()
1299 }
1300}