1use crate::support::*;
2use lash_core::runtime::{DeliveryPolicy, QueuedWorkBatch, SlotPolicy};
3
4pub struct SessionBuilder {
5 pub(crate) core: LashCore,
6 pub(crate) session_id: String,
7 pub(crate) spec: SessionSpec,
8 pub(crate) parent_session_id: Option<String>,
9 pub(crate) store: Option<Arc<dyn RuntimePersistence>>,
10 pub(crate) provider: Option<ProviderHandle>,
11 pub(crate) active_plugins: Vec<ActivePluginBinding>,
12 pub(crate) plugin_factories: Vec<Arc<dyn PluginFactory>>,
13}
14
15#[cfg(feature = "rlm")]
16pub struct RlmSessionBuilder {
17 pub(crate) builder: SessionBuilder,
18 pub(crate) rlm_final_answer_format: Option<lash_rlm_types::RlmFinalAnswerFormat>,
19}
20
21impl SessionBuilder {
22 pub fn provider(mut self, provider: ProviderHandle) -> Self {
23 self.spec = self.spec.provider_id(provider.kind());
24 self.provider = Some(provider);
25 self
26 }
27
28 pub fn session_spec(mut self, spec: SessionSpec) -> Self {
29 self.spec = spec;
30 self
31 }
32
33 pub fn parent(mut self, parent_session_id: impl Into<String>) -> Self {
34 self.parent_session_id = Some(parent_session_id.into());
35 self
36 }
37
38 pub fn store(mut self, store: Arc<dyn RuntimePersistence>) -> Self {
45 self.store = Some(store);
46 self
47 }
48
49 pub fn plugin<P: PluginBinding>(mut self, config: P::SessionConfig) -> Self {
50 self.active_plugins.push(ActivePluginBinding {
51 id: P::ID,
52 requires_turn_input: P::requires_turn_input(&config),
53 });
54 self.plugin_factories.push(P::factory(&config));
55 self
56 }
57
58 pub async fn open(self) -> Result<LashSession> {
59 let policy = self.session_policy();
60 let store = self.create_store(&policy).await?;
61 let state = self
62 .load_or_default_state(&policy, store.as_deref())
63 .await?;
64 self.open_resolved(policy, state, store).await
65 }
66
67 pub async fn open_fresh(self) -> Result<LashSession> {
77 let policy = self.session_policy();
78 let store = self.create_store(&policy).await?;
79 let state = RuntimeSessionState {
80 session_id: self.session_id.clone(),
81 policy: policy.clone(),
82 graph_replace_required: true,
83 ..RuntimeSessionState::default()
84 };
85 self.open_resolved(policy, state, store).await
86 }
87
88 pub async fn open_with_state(self, mut state: RuntimeSessionState) -> Result<LashSession> {
95 let policy = self.session_policy();
96 let store = self.create_store(&policy).await?;
97 if state.session_id != self.session_id {
98 return Err(EmbedError::StoreSessionMismatch {
99 loaded: state.session_id,
100 requested: self.session_id,
101 });
102 }
103 let recorded_provider_id = state.policy.recorded_provider_id().to_string();
104 state.policy = policy.clone();
105 state.policy.provider_id = recorded_provider_id;
106 self.open_resolved(policy, state, store).await
107 }
108
109 fn session_policy(&self) -> SessionPolicy {
110 let mut policy = self.spec.resolve_against(&self.core.policy);
111 policy.session_id = Some(self.session_id.clone());
112 policy
113 }
114
115 async fn load_or_default_state(
116 &self,
117 policy: &SessionPolicy,
118 store: Option<&dyn RuntimePersistence>,
119 ) -> Result<RuntimeSessionState> {
120 let state = match store {
121 Some(store) => {
122 let loaded = self.load_persisted_state_for_residency(store).await?;
123 let mut state = loaded.unwrap_or_else(|| RuntimeSessionState {
124 session_id: self.session_id.clone(),
125 policy: policy.clone(),
126 ..RuntimeSessionState::default()
127 });
128 if state.session_id != self.session_id {
129 return Err(EmbedError::StoreSessionMismatch {
130 loaded: state.session_id,
131 requested: self.session_id.clone(),
132 });
133 }
134 let recorded_provider_id = state.policy.recorded_provider_id().to_string();
135 state.policy = policy.clone();
136 state.policy.provider_id = recorded_provider_id;
137 state
138 }
139 None => RuntimeSessionState {
140 session_id: self.session_id.clone(),
141 policy: policy.clone(),
142 ..RuntimeSessionState::default()
143 },
144 };
145 Ok(state)
146 }
147
148 async fn load_persisted_state_for_residency(
149 &self,
150 store: &dyn RuntimePersistence,
151 ) -> Result<Option<RuntimeSessionState>> {
152 match self.core.env.residency {
153 Residency::KeepAll => {
154 let loaded = lash_core::store::load_persisted_session_state(store)
155 .await
156 .map_err(|err| {
157 SessionError::Protocol(format!("failed to load store: {err}"))
158 })?;
159 Ok(loaded)
160 }
161 Residency::ActivePathOnly => {
162 let active =
163 lash_core::store::load_persisted_session_state_active_path(store, None)
164 .await
165 .map_err(|err| {
166 SessionError::Protocol(format!(
167 "failed to load active-path store: {err}"
168 ))
169 })?;
170 if active
171 .as_ref()
172 .is_some_and(|state| state.session_graph.nodes.is_empty())
173 {
174 let mut full = lash_core::store::load_persisted_session_state(store)
175 .await
176 .map_err(|err| {
177 SessionError::Protocol(format!(
178 "failed to heal active-path store from full graph: {err}"
179 ))
180 })?;
181 if let Some(state) = full.as_mut() {
182 state.graph_replace_required = true;
183 }
184 return Ok(full);
185 }
186 Ok(active)
187 }
188 }
189 }
190
191 async fn open_resolved(
192 self,
193 policy: SessionPolicy,
194 state: RuntimeSessionState,
195 store: Option<Arc<dyn RuntimePersistence>>,
196 ) -> Result<LashSession> {
197 let mut env = self.core.env.clone();
198 if let Some(provider) = self.provider.clone().or_else(|| self.core.provider.clone()) {
199 env.core.providers.provider_resolver =
200 Arc::new(lash_core::SingleProviderResolver::new(provider));
201 }
202 let plugin_host = build_plugin_host(
203 self.core.protocol_factory.as_ref(),
204 self.core.plugin_factories.as_ref(),
205 self.plugin_factories,
206 )?;
207 env.core = self
208 .core
209 .runtime_host_for_plugin_host(env.core.clone(), &plugin_host)?;
210 env.plugin_host = Some(Arc::new(plugin_host));
211 let effect_host = Arc::clone(&env.core.control.effect_host);
212 env.process_work_poke = self.core.process_work_runner.poke().await;
217 let runtime = LashRuntime::from_environment(&env, policy, state, store).await?;
218 let handle = RuntimeHandle::with_live_replay_store(
219 runtime,
220 Arc::clone(&self.core.live_replay_store),
221 );
222 Ok(LashSession {
223 runtime: handle,
224 effect_host,
225 parent_session_id: self.parent_session_id,
226 active_plugins: self.active_plugins,
227 process_phase_probe_slot: self.core.process_work_runner.phase_probe_slot(),
228 turn_cancels: crate::turn::TurnCancelRegistry::default(),
229 })
230 }
231
232 async fn create_store(
233 &self,
234 policy: &SessionPolicy,
235 ) -> Result<Option<Arc<dyn RuntimePersistence>>> {
236 if let Some(store) = self.store.as_ref() {
237 return Ok(Some(Arc::clone(store)));
238 }
239 let Some(factory) = self.core.store_factory.as_ref() else {
240 return Ok(None);
241 };
242 let request = SessionStoreCreateRequest {
243 session_id: self.session_id.clone(),
244 relation: self
245 .parent_session_id
246 .as_ref()
247 .map(|parent_session_id| lash_core::SessionRelation::Child {
248 parent_session_id: parent_session_id.clone(),
249 caused_by: None,
250 })
251 .unwrap_or_default(),
252 policy: policy.clone(),
253 };
254 factory
255 .create_store(&request)
256 .await
257 .map(Some)
258 .map_err(|message| EmbedError::StoreFactory {
259 session_id: self.session_id.clone(),
260 message,
261 })
262 }
263}
264
265impl PromptLayerSink for SessionBuilder {
266 fn prompt_layer_mut(&mut self) -> &mut PromptLayer {
267 self.spec.prompt.get_or_insert_with(PromptLayer::new)
268 }
269}
270
271#[cfg(feature = "rlm")]
272impl RlmSessionBuilder {
273 pub fn provider(mut self, provider: ProviderHandle) -> Self {
274 self.builder = self.builder.provider(provider);
275 self
276 }
277
278 pub fn session_spec(mut self, spec: SessionSpec) -> Self {
279 self.builder = self.builder.session_spec(spec);
280 self
281 }
282
283 pub fn parent(mut self, parent_session_id: impl Into<String>) -> Self {
284 self.builder = self.builder.parent(parent_session_id);
285 self
286 }
287
288 pub fn store(mut self, store: Arc<dyn RuntimePersistence>) -> Self {
289 self.builder = self.builder.store(store);
290 self
291 }
292
293 pub fn plugin<P: PluginBinding>(mut self, config: P::SessionConfig) -> Self {
294 self.builder = self.builder.plugin::<P>(config);
295 self
296 }
297
298 pub async fn open(self) -> Result<LashSession> {
299 self.open_resolved(RlmOpenState::Resume).await
300 }
301
302 pub async fn open_fresh(self) -> Result<LashSession> {
303 self.open_resolved(RlmOpenState::Fresh).await
304 }
305
306 pub async fn open_with_state(self, state: RuntimeSessionState) -> Result<LashSession> {
307 self.open_resolved(RlmOpenState::Explicit(state)).await
308 }
309
310 async fn open_resolved(self, open_state: RlmOpenState) -> Result<LashSession> {
311 let Self {
312 builder,
313 rlm_final_answer_format,
314 } = self;
315 let policy = builder.session_policy();
316 let store = builder.create_store(&policy).await?;
317 let mut state = match open_state {
318 RlmOpenState::Resume => {
319 builder
320 .load_or_default_state(&policy, store.as_deref())
321 .await?
322 }
323 RlmOpenState::Fresh => RuntimeSessionState {
324 session_id: builder.session_id.clone(),
325 policy: policy.clone(),
326 graph_replace_required: true,
327 ..RuntimeSessionState::default()
328 },
329 RlmOpenState::Explicit(mut state) => {
330 if state.session_id != builder.session_id {
331 return Err(EmbedError::StoreSessionMismatch {
332 loaded: state.session_id,
333 requested: builder.session_id.clone(),
334 });
335 }
336 let recorded_provider_id = state.policy.recorded_provider_id().to_string();
337 state.policy = policy.clone();
338 state.policy.provider_id = recorded_provider_id;
339 state
340 }
341 };
342 apply_rlm_session_options(
343 builder.parent_session_id.is_none(),
344 rlm_final_answer_format,
345 &mut state,
346 )?;
347 builder.open_resolved(policy, state, store).await
348 }
349}
350
351#[cfg(feature = "rlm")]
352impl PromptLayerSink for RlmSessionBuilder {
353 fn prompt_layer_mut(&mut self) -> &mut PromptLayer {
354 self.builder.prompt_layer_mut()
355 }
356}
357
358#[cfg(feature = "rlm")]
359enum RlmOpenState {
360 Resume,
361 Fresh,
362 Explicit(RuntimeSessionState),
363}
364
365#[cfg(feature = "rlm")]
366fn apply_rlm_session_options(
367 is_root_session: bool,
368 explicit_format: Option<lash_rlm_types::RlmFinalAnswerFormat>,
369 state: &mut RuntimeSessionState,
370) -> Result<()> {
371 let final_answer_format = explicit_format.unwrap_or_else(|| {
372 if is_root_session {
373 lash_rlm_types::RlmFinalAnswerFormat::Markdown
374 } else {
375 lash_rlm_types::RlmFinalAnswerFormat::RawSubmitValue
376 }
377 });
378 let mut extras = if state.protocol_turn_options.is_empty() {
379 lash_rlm_types::RlmCreateExtras::default()
380 } else {
381 state.protocol_turn_options.decode()?
382 };
383 extras.final_answer_format = Some(final_answer_format);
384 let options = ProtocolTurnOptions::typed(extras)?;
385 state.protocol_turn_options = options.clone();
386 for frame in &mut state.agent_frames {
387 frame.protocol_turn_options = options.clone();
388 }
389 Ok(())
390}
391
392#[derive(Clone)]
393pub struct LashSession {
394 pub(crate) runtime: RuntimeHandle,
395 pub(crate) effect_host: Arc<dyn EffectHost>,
396 pub(crate) parent_session_id: Option<String>,
397 pub(crate) active_plugins: Vec<ActivePluginBinding>,
398 pub(crate) process_phase_probe_slot: Option<lash_core::runtime::RuntimeTurnPhaseProbeSlot>,
399 pub(crate) turn_cancels: crate::turn::TurnCancelRegistry,
400}
401
402#[derive(Clone, Debug, Default)]
403pub struct SessionConfigPatch {
404 pub provider: Option<ProviderHandle>,
405 pub model: Option<ModelSpec>,
406 pub prompt: Option<PromptLayer>,
407}
408
409impl LashSession {
410 pub async fn close(self) -> Result<()> {
411 let runtime = self.runtime.writer();
412 let runtime = runtime.lock().await;
413 runtime.unregister_plugin_session()?;
414 Ok(())
415 }
416
417 pub fn session_id(&self) -> String {
418 self.runtime.observe().session_id().to_string()
419 }
420
421 pub fn policy_snapshot(&self) -> SessionPolicy {
422 self.runtime.observe().policy.clone()
423 }
424
425 pub fn observe(&self) -> ObservableSession {
426 ObservableSession {
427 runtime: self.runtime.clone(),
428 }
429 }
430
431 pub fn parent_session_id(&self) -> Option<&str> {
432 self.parent_session_id.as_deref()
433 }
434
435 pub fn effect_host(&self) -> Arc<dyn EffectHost> {
436 Arc::clone(&self.effect_host)
437 }
438
439 pub fn turn(&self, input: TurnInput) -> TurnBuilder {
440 TurnBuilder {
441 runtime: self.runtime.clone(),
442 effect_host: Arc::clone(&self.effect_host),
443 active_plugins: self.active_plugins.clone(),
444 input,
445 cancel: CancellationToken::new(),
446 cancels: self.turn_cancels.clone(),
447 protocol_turn_options: None,
448 provider: None,
449 model: None,
450 turn_id: None,
451 }
452 }
453
454 pub fn queued_turn(&self) -> QueuedTurnBuilder {
455 QueuedTurnBuilder {
456 runtime: self.runtime.clone(),
457 effect_host: Arc::clone(&self.effect_host),
458 cancel: CancellationToken::new(),
459 cancels: self.turn_cancels.clone(),
460 batch_ids: Vec::new(),
461 drain_id: None,
462 }
463 }
464
465 pub fn cancel_running_turns(&self) -> usize {
480 self.turn_cancels.cancel_all()
481 }
482
483 pub fn admin(&self) -> SessionAdmin {
484 SessionAdmin {
485 runtime: self.runtime.clone(),
486 }
487 }
488
489 pub async fn configure(&self, patch: SessionConfigPatch) -> Result<()> {
490 self.admin().config().update(patch).await
491 }
492
493 pub fn tools(&self) -> ToolAdmin {
494 ToolAdmin::new(self.admin())
495 }
496
497 pub fn commands(&self) -> SessionCommandAdmin {
498 self.admin().commands()
499 }
500
501 pub fn triggers(&self) -> SessionTriggerAdmin {
502 self.admin().triggers()
503 }
504
505 pub fn processes(&self) -> SessionProcessAdmin {
506 SessionProcessAdmin::new(self.admin())
507 }
508
509 pub fn plugin_actions(&self) -> PluginActions {
510 PluginActions {
511 control: self.admin(),
512 }
513 }
514
515 pub fn enqueue(&self, input: TurnInput) -> EnqueueTurnBuilder<'_> {
516 EnqueueTurnBuilder {
517 session: self,
518 input,
519 id: None,
520 delivery_policy: DeliveryPolicy::AfterCurrentTurnCommit,
521 slot_policy: SlotPolicy::Exclusive,
522 }
523 }
524
525 pub async fn queued_work(&self) -> Result<Vec<QueuedWorkBatch>> {
526 let observation = self.runtime.observe();
527 let store = observation.queue_store.as_ref().ok_or_else(|| {
528 EmbedError::Runtime(lash_core::RuntimeError::new(
529 lash_core::RuntimeErrorCode::StoreCommitFailed,
530 "queued work inspection requires a persistent runtime store",
531 ))
532 })?;
533 store
534 .list_pending_queued_work(observation.session_id())
535 .await
536 .map_err(|err| {
537 EmbedError::Runtime(lash_core::RuntimeError::new(
538 lash_core::RuntimeErrorCode::StoreCommitFailed,
539 err.to_string(),
540 ))
541 })
542 }
543
544 pub async fn cancel_queued_work_batch(
545 &self,
546 batch_id: &str,
547 ) -> Result<Option<QueuedWorkBatch>> {
548 let session_id = self.session_id();
549 self.runtime
550 .cancel_queued_work_batch(&session_id, batch_id)
551 .await
552 .map_err(EmbedError::Runtime)
553 }
554
555 pub async fn await_queued_work_batch(&self, batch_id: &str) -> Result<()> {
567 let observation = self.runtime.observe();
568 let store = observation.queue_store.clone().ok_or_else(|| {
569 EmbedError::Runtime(lash_core::RuntimeError::new(
570 lash_core::RuntimeErrorCode::StoreCommitFailed,
571 "queued work inspection requires a persistent runtime store",
572 ))
573 })?;
574 let session_id = observation.session_id().to_string();
575 drop(observation);
576 let mut delay = std::time::Duration::from_millis(25);
577 loop {
578 let pending = store
579 .list_pending_queued_work(&session_id)
580 .await
581 .map_err(|err| {
582 EmbedError::Runtime(lash_core::RuntimeError::new(
583 lash_core::RuntimeErrorCode::StoreCommitFailed,
584 err.to_string(),
585 ))
586 })?;
587 if !pending.iter().any(|batch| batch.batch_id == batch_id) {
588 return Ok(());
589 }
590 tokio::time::sleep(delay).await;
591 delay = (delay * 2).min(std::time::Duration::from_millis(400));
592 }
593 }
594
595 pub fn read_view(&self) -> SessionReadView {
596 self.runtime.observe().read_view.clone()
597 }
598
599 pub fn usage_report(&self) -> SessionUsageReport {
600 self.runtime.observe().usage_report.clone()
601 }
602
603 pub async fn set_turn_phase_probe(
604 &self,
605 probe: Arc<dyn lash_core::runtime::RuntimeTurnPhaseProbe>,
606 ) {
607 let writer = self.runtime.writer();
608 let mut runtime = writer.lock().await;
609 runtime.set_turn_phase_probe(Arc::clone(&probe));
610 self.runtime.publish_from(&runtime);
611 if let Some(slot) = &self.process_phase_probe_slot {
612 let observation = self.runtime.observe();
613 slot.set_for_session(observation.session_id(), Arc::clone(&probe));
614 let current_frame = observation.persisted_state.current_agent_frame_id.as_str();
615 if !current_frame.is_empty() {
616 let scope = lash_core::SessionScope::for_agent_frame(
617 observation.session_id(),
618 current_frame,
619 );
620 slot.set_for_scope(&scope, probe);
621 }
622 }
623 }
624}
625
626#[derive(Clone)]
627pub struct ObservableSession {
628 pub(crate) runtime: RuntimeHandle,
629}
630
631impl ObservableSession {
632 fn snapshot(&self) -> Arc<RuntimeObservation> {
633 self.runtime.observe()
634 }
635
636 pub fn current_observation(&self) -> SessionObservation {
637 self.runtime.current_session_observation()
638 }
639
640 pub fn resume_from_cursor(&self, cursor: &SessionCursor) -> Result<SessionResume> {
641 self.runtime
642 .resume_session_observation(cursor)
643 .map_err(live_replay_error)
644 }
645
646 pub fn subscribe_from_cursor(
647 &self,
648 cursor: &SessionCursor,
649 ) -> Result<SessionObservationSubscription> {
650 self.runtime
651 .subscribe_session_observation(cursor)
652 .map_err(live_replay_error)
653 }
654
655 pub fn session_id(&self) -> String {
656 self.snapshot().session_id().to_string()
657 }
658
659 pub fn policy_snapshot(&self) -> SessionPolicy {
660 self.snapshot().policy.clone()
661 }
662
663 pub fn read_view(&self) -> SessionReadView {
664 self.snapshot().read_view.clone()
665 }
666
667 pub fn usage_report(&self) -> SessionUsageReport {
668 self.snapshot().usage_report.clone()
669 }
670
671 pub fn tool_state(&self) -> Option<ToolState> {
672 self.snapshot().tool_state.clone()
673 }
674
675 pub fn active_tool_manifests(&self) -> Vec<ToolManifest> {
676 self.snapshot()
677 .tool_state
678 .as_ref()
679 .map(ToolState::tool_manifests)
680 .unwrap_or_default()
681 }
682
683 pub async fn list_process_handles(&self) -> Vec<ProcessHandleSummary> {
684 self.snapshot().list_process_handles().await
685 }
686
687 pub async fn list_all_process_handles(&self) -> Vec<ProcessHandleSummary> {
688 self.snapshot().list_all_process_handles().await
689 }
690
691 pub fn process_scope(&self) -> SessionScope {
692 self.snapshot().process_scope()
693 }
694}
695
696fn live_replay_error(err: lash_core::LiveReplayStoreError) -> EmbedError {
697 EmbedError::Runtime(lash_core::RuntimeError::new(
698 RuntimeErrorCode::Other("live_replay".to_string()),
699 err.to_string(),
700 ))
701}
702
703pub struct EnqueueTurnBuilder<'a> {
704 session: &'a LashSession,
705 input: TurnInput,
706 id: Option<String>,
707 delivery_policy: DeliveryPolicy,
708 slot_policy: SlotPolicy,
709}
710
711impl<'a> EnqueueTurnBuilder<'a> {
712 pub fn id(mut self, id: impl Into<String>) -> Self {
713 self.id = Some(id.into());
714 self
715 }
716
717 pub fn delivery_policy(mut self, policy: DeliveryPolicy) -> Self {
718 self.delivery_policy = policy;
719 self
720 }
721
722 pub fn slot_policy(mut self, policy: SlotPolicy) -> Self {
723 self.slot_policy = policy;
724 self
725 }
726
727 pub async fn send(self) -> Result<QueuedWorkBatch> {
728 let source_key = self.id.map(|id| format!("host:{id}"));
729 self.session
730 .runtime
731 .enqueue_turn_input(
732 self.input,
733 self.delivery_policy,
734 self.slot_policy,
735 source_key,
736 )
737 .await
738 .map_err(EmbedError::Runtime)
739 }
740}
741
742impl<'a> std::future::IntoFuture for EnqueueTurnBuilder<'a> {
743 type Output = Result<QueuedWorkBatch>;
744 type IntoFuture =
745 std::pin::Pin<Box<dyn std::future::Future<Output = Result<QueuedWorkBatch>> + 'a>>;
746
747 fn into_future(self) -> Self::IntoFuture {
748 Box::pin(self.send())
749 }
750}