1use crate::support::*;
2use lash_core::runtime::{DeliveryPolicy, QueuedWorkBatch, SlotPolicy};
3
4pub struct SessionBuilder {
5 pub(crate) core: LashCore,
6 pub(crate) session_id: String,
7 pub(crate) spec: SessionSpec,
8 pub(crate) mode: Option<ModeId>,
9 pub(crate) parent_session_id: Option<String>,
10 pub(crate) store: Option<Arc<dyn RuntimePersistence>>,
11 pub(crate) provider: Option<ProviderHandle>,
12 pub(crate) active_plugins: Vec<ActivePluginBinding>,
13 pub(crate) plugin_factories: Vec<Arc<dyn PluginFactory>>,
14 pub(crate) rlm_final_answer_format: Option<lash_rlm_types::RlmFinalAnswerFormat>,
15}
16
17impl SessionBuilder {
18 pub fn standard(mut self) -> Self {
19 self.mode = Some(ModeId::standard());
20 self
21 }
22
23 pub fn rlm(mut self) -> Self {
24 self.mode = Some(ModeId::rlm());
25 self
26 }
27
28 pub fn mode(mut self, mode: ModeId) -> Self {
29 self.mode = Some(mode);
30 self
31 }
32
33 pub fn provider(mut self, provider: ProviderHandle) -> Self {
34 self.spec = self.spec.provider_id(provider.kind());
35 self.provider = Some(provider);
36 self
37 }
38
39 pub fn session_spec(mut self, spec: SessionSpec) -> Self {
40 self.spec = spec;
41 self
42 }
43
44 pub fn parent(mut self, parent_session_id: impl Into<String>) -> Self {
45 self.parent_session_id = Some(parent_session_id.into());
46 self
47 }
48
49 pub fn store(mut self, store: Arc<dyn RuntimePersistence>) -> Self {
56 self.store = Some(store);
57 self
58 }
59
60 pub fn plugin<P: PluginBinding>(mut self, config: P::SessionConfig) -> Self {
61 self.active_plugins.push(ActivePluginBinding {
62 id: P::ID,
63 requires_turn_input: P::requires_turn_input(&config),
64 });
65 self.plugin_factories.push(P::factory(&config));
66 self
67 }
68
69 pub async fn open(self) -> Result<LashSession> {
70 let (policy, mode) = self.session_policy()?;
71 let store = self.create_store(&policy).await?;
72 let mut state = self
73 .load_or_default_state(&policy, store.as_deref())
74 .await?;
75 self.apply_rlm_session_options(&mode, &mut state)?;
76 self.open_resolved(policy, mode, state, store).await
77 }
78
79 pub async fn open_fresh(self) -> Result<LashSession> {
89 let (policy, mode) = self.session_policy()?;
90 let store = self.create_store(&policy).await?;
91 let mut state = RuntimeSessionState {
92 session_id: self.session_id.clone(),
93 policy: policy.clone(),
94 graph_replace_required: true,
95 ..RuntimeSessionState::default()
96 };
97 self.apply_rlm_session_options(&mode, &mut state)?;
98 self.open_resolved(policy, mode, state, store).await
99 }
100
101 pub async fn open_with_state(self, mut state: RuntimeSessionState) -> Result<LashSession> {
108 let (policy, mode) = self.session_policy()?;
109 let store = self.create_store(&policy).await?;
110 if state.session_id != self.session_id {
111 return Err(EmbedError::StoreSessionMismatch {
112 loaded: state.session_id,
113 requested: self.session_id,
114 });
115 }
116 let recorded_provider_id = state.policy.recorded_provider_id().to_string();
117 state.policy = policy.clone();
118 state.policy.provider_id = recorded_provider_id;
119 self.apply_rlm_session_options(&mode, &mut state)?;
120 self.open_resolved(policy, mode, state, store).await
121 }
122
123 fn session_policy(&self) -> Result<(SessionPolicy, ModeId)> {
124 let mode = self
125 .mode
126 .clone()
127 .unwrap_or_else(|| self.core.default_mode.clone());
128 if !self.core.modes.contains_key(&mode) {
129 return Err(EmbedError::ModeNotInstalled { mode });
130 }
131 let mut policy = self.spec.resolve_against(&self.core.policy);
132 policy.session_id = Some(self.session_id.clone());
133 Ok((policy, mode))
134 }
135
136 async fn load_or_default_state(
137 &self,
138 policy: &SessionPolicy,
139 store: Option<&dyn RuntimePersistence>,
140 ) -> Result<RuntimeSessionState> {
141 let state = match store {
142 Some(store) => {
143 let loaded = self.load_persisted_state_for_residency(store).await?;
144 let mut state = loaded.unwrap_or_else(|| RuntimeSessionState {
145 session_id: self.session_id.clone(),
146 policy: policy.clone(),
147 ..RuntimeSessionState::default()
148 });
149 if state.session_id != self.session_id {
150 return Err(EmbedError::StoreSessionMismatch {
151 loaded: state.session_id,
152 requested: self.session_id.clone(),
153 });
154 }
155 let recorded_provider_id = state.policy.recorded_provider_id().to_string();
156 state.policy = policy.clone();
157 state.policy.provider_id = recorded_provider_id;
158 state
159 }
160 None => RuntimeSessionState {
161 session_id: self.session_id.clone(),
162 policy: policy.clone(),
163 ..RuntimeSessionState::default()
164 },
165 };
166 Ok(state)
167 }
168
169 async fn load_persisted_state_for_residency(
170 &self,
171 store: &dyn RuntimePersistence,
172 ) -> Result<Option<RuntimeSessionState>> {
173 match self.core.env.residency {
174 Residency::KeepAll => {
175 let loaded = lash_core::store::load_persisted_session_state(store)
176 .await
177 .map_err(|err| {
178 SessionError::Protocol(format!("failed to load store: {err}"))
179 })?;
180 Ok(loaded)
181 }
182 Residency::ActivePathOnly => {
183 let active =
184 lash_core::store::load_persisted_session_state_active_path(store, None)
185 .await
186 .map_err(|err| {
187 SessionError::Protocol(format!(
188 "failed to load active-path store: {err}"
189 ))
190 })?;
191 if active
192 .as_ref()
193 .is_some_and(|state| state.session_graph.nodes.is_empty())
194 {
195 let mut full = lash_core::store::load_persisted_session_state(store)
196 .await
197 .map_err(|err| {
198 SessionError::Protocol(format!(
199 "failed to heal active-path store from full graph: {err}"
200 ))
201 })?;
202 if let Some(state) = full.as_mut() {
203 state.graph_replace_required = true;
204 }
205 return Ok(full);
206 }
207 Ok(active)
208 }
209 }
210 }
211
212 fn apply_rlm_session_options(
213 &self,
214 mode: &ModeId,
215 state: &mut RuntimeSessionState,
216 ) -> Result<()> {
217 let Some(final_answer_format) = self.rlm_session_final_answer_format(mode) else {
218 return Ok(());
219 };
220 let mut extras = if state.protocol_turn_options.is_empty() {
221 lash_rlm_types::RlmCreateExtras::default()
222 } else {
223 state.protocol_turn_options.decode()?
224 };
225 extras.final_answer_format = Some(final_answer_format);
226 let options = ProtocolTurnOptions::typed(extras)?;
227 state.protocol_turn_options = options.clone();
228 for frame in &mut state.agent_frames {
229 frame.protocol_turn_options = options.clone();
230 }
231 Ok(())
232 }
233
234 fn rlm_session_final_answer_format(
235 &self,
236 mode: &ModeId,
237 ) -> Option<lash_rlm_types::RlmFinalAnswerFormat> {
238 if mode != &ModeId::rlm() {
239 return None;
240 }
241 self.rlm_final_answer_format.clone().or_else(|| {
242 if self.parent_session_id.is_none() {
243 Some(lash_rlm_types::RlmFinalAnswerFormat::Markdown)
244 } else {
245 Some(lash_rlm_types::RlmFinalAnswerFormat::RawSubmitValue)
246 }
247 })
248 }
249
250 async fn open_resolved(
251 self,
252 policy: SessionPolicy,
253 mode: ModeId,
254 state: RuntimeSessionState,
255 store: Option<Arc<dyn RuntimePersistence>>,
256 ) -> Result<LashSession> {
257 let mut env = self.core.env.clone();
258 if let Some(provider) = self.provider.clone().or_else(|| self.core.provider.clone()) {
259 env.core.providers.provider_resolver =
260 Arc::new(lash_core::SingleProviderResolver::new(provider));
261 }
262 let plugin_host = build_plugin_host_for_mode(
263 &self.core.modes,
264 &mode,
265 self.core.plugin_factories.as_ref(),
266 self.plugin_factories,
267 env.process_registry.is_some(),
268 )?;
269 env.plugin_host = Some(Arc::new(plugin_host));
270 let effect_host = Arc::clone(&env.core.control.effect_host);
271 env.process_work_poke = self.core.process_work_runner.poke().await;
276 let runtime = LashRuntime::from_environment(&env, policy, state, store).await?;
277 let handle = RuntimeHandle::with_live_replay_store(
278 runtime,
279 Arc::clone(&self.core.live_replay_store),
280 );
281 Ok(LashSession {
282 runtime: handle,
283 effect_host,
284 mode,
285 parent_session_id: self.parent_session_id,
286 active_plugins: self.active_plugins,
287 process_phase_probe_slot: self.core.process_work_runner.phase_probe_slot(),
288 turn_cancels: crate::turn::TurnCancelRegistry::default(),
289 })
290 }
291
292 async fn create_store(
293 &self,
294 policy: &SessionPolicy,
295 ) -> Result<Option<Arc<dyn RuntimePersistence>>> {
296 if let Some(store) = self.store.as_ref() {
297 return Ok(Some(Arc::clone(store)));
298 }
299 let Some(factory) = self.core.store_factory.as_ref() else {
300 return Ok(None);
301 };
302 let request = SessionStoreCreateRequest {
303 session_id: self.session_id.clone(),
304 relation: self
305 .parent_session_id
306 .as_ref()
307 .map(|parent_session_id| lash_core::SessionRelation::Child {
308 parent_session_id: parent_session_id.clone(),
309 caused_by: None,
310 })
311 .unwrap_or_default(),
312 policy: policy.clone(),
313 };
314 factory
315 .create_store(&request)
316 .await
317 .map(Some)
318 .map_err(|message| EmbedError::StoreFactory {
319 session_id: self.session_id.clone(),
320 message,
321 })
322 }
323}
324
325impl PromptLayerSink for SessionBuilder {
326 fn prompt_layer_mut(&mut self) -> &mut PromptLayer {
327 self.spec.prompt.get_or_insert_with(PromptLayer::new)
328 }
329}
330
331#[derive(Clone)]
332pub struct LashSession {
333 pub(crate) runtime: RuntimeHandle,
334 pub(crate) effect_host: Arc<dyn EffectHost>,
335 pub(crate) mode: ModeId,
336 pub(crate) parent_session_id: Option<String>,
337 pub(crate) active_plugins: Vec<ActivePluginBinding>,
338 pub(crate) process_phase_probe_slot: Option<lash_core::runtime::RuntimeTurnPhaseProbeSlot>,
339 pub(crate) turn_cancels: crate::turn::TurnCancelRegistry,
340}
341
342#[derive(Clone, Debug, Default)]
343pub struct SessionConfigPatch {
344 pub provider: Option<ProviderHandle>,
345 pub model: Option<ModelSpec>,
346 pub prompt: Option<PromptLayer>,
347}
348
349impl LashSession {
350 pub async fn close(self) -> Result<()> {
351 let runtime = self.runtime.writer();
352 let runtime = runtime.lock().await;
353 runtime.unregister_plugin_session()?;
354 Ok(())
355 }
356
357 pub fn session_id(&self) -> String {
358 self.runtime.observe().session_id().to_string()
359 }
360
361 pub fn policy_snapshot(&self) -> SessionPolicy {
362 self.runtime.observe().policy.clone()
363 }
364
365 pub fn observe(&self) -> ObservableSession {
366 ObservableSession {
367 runtime: self.runtime.clone(),
368 }
369 }
370
371 pub fn mode(&self) -> &ModeId {
372 &self.mode
373 }
374
375 pub fn parent_session_id(&self) -> Option<&str> {
376 self.parent_session_id.as_deref()
377 }
378
379 pub fn effect_host(&self) -> Arc<dyn EffectHost> {
380 Arc::clone(&self.effect_host)
381 }
382
383 pub fn turn(&self, input: TurnInput) -> TurnBuilder {
384 TurnBuilder {
385 runtime: self.runtime.clone(),
386 effect_host: Arc::clone(&self.effect_host),
387 active_plugins: self.active_plugins.clone(),
388 input,
389 cancel: CancellationToken::new(),
390 cancels: self.turn_cancels.clone(),
391 protocol_turn_options: None,
392 provider: None,
393 model: None,
394 turn_id: None,
395 }
396 }
397
398 pub fn queued_turn(&self) -> QueuedTurnBuilder {
399 QueuedTurnBuilder {
400 runtime: self.runtime.clone(),
401 effect_host: Arc::clone(&self.effect_host),
402 cancel: CancellationToken::new(),
403 cancels: self.turn_cancels.clone(),
404 batch_ids: Vec::new(),
405 drain_id: None,
406 }
407 }
408
409 pub fn cancel_running_turns(&self) -> usize {
424 self.turn_cancels.cancel_all()
425 }
426
427 pub fn admin(&self) -> SessionAdmin {
428 SessionAdmin {
429 runtime: self.runtime.clone(),
430 }
431 }
432
433 pub async fn configure(&self, patch: SessionConfigPatch) -> Result<()> {
434 self.admin().config().update(patch).await
435 }
436
437 pub fn tools(&self) -> ToolAdmin {
438 ToolAdmin::new(self.admin())
439 }
440
441 pub fn commands(&self) -> SessionCommandAdmin {
442 self.admin().commands()
443 }
444
445 pub fn triggers(&self) -> SessionTriggerAdmin {
446 self.admin().triggers()
447 }
448
449 pub fn processes(&self) -> SessionProcessAdmin {
450 SessionProcessAdmin::new(self.admin())
451 }
452
453 pub fn plugin_actions(&self) -> PluginActions {
454 PluginActions {
455 control: self.admin(),
456 }
457 }
458
459 pub fn enqueue(&self, input: TurnInput) -> EnqueueTurnBuilder<'_> {
460 EnqueueTurnBuilder {
461 session: self,
462 input,
463 id: None,
464 delivery_policy: DeliveryPolicy::AfterCurrentTurnCommit,
465 slot_policy: SlotPolicy::Exclusive,
466 }
467 }
468
469 pub async fn queued_work(&self) -> Result<Vec<QueuedWorkBatch>> {
470 let observation = self.runtime.observe();
471 let store = observation.queue_store.as_ref().ok_or_else(|| {
472 EmbedError::Runtime(lash_core::RuntimeError::new(
473 lash_core::RuntimeErrorCode::StoreCommitFailed,
474 "queued work inspection requires a persistent runtime store",
475 ))
476 })?;
477 store
478 .list_pending_queued_work(observation.session_id())
479 .await
480 .map_err(|err| {
481 EmbedError::Runtime(lash_core::RuntimeError::new(
482 lash_core::RuntimeErrorCode::StoreCommitFailed,
483 err.to_string(),
484 ))
485 })
486 }
487
488 pub async fn cancel_queued_work_batch(
489 &self,
490 batch_id: &str,
491 ) -> Result<Option<QueuedWorkBatch>> {
492 let session_id = self.session_id();
493 self.runtime
494 .cancel_queued_work_batch(&session_id, batch_id)
495 .await
496 .map_err(EmbedError::Runtime)
497 }
498
499 pub async fn await_queued_work_batch(&self, batch_id: &str) -> Result<()> {
511 let observation = self.runtime.observe();
512 let store = observation.queue_store.clone().ok_or_else(|| {
513 EmbedError::Runtime(lash_core::RuntimeError::new(
514 lash_core::RuntimeErrorCode::StoreCommitFailed,
515 "queued work inspection requires a persistent runtime store",
516 ))
517 })?;
518 let session_id = observation.session_id().to_string();
519 drop(observation);
520 let mut delay = std::time::Duration::from_millis(25);
521 loop {
522 let pending = store
523 .list_pending_queued_work(&session_id)
524 .await
525 .map_err(|err| {
526 EmbedError::Runtime(lash_core::RuntimeError::new(
527 lash_core::RuntimeErrorCode::StoreCommitFailed,
528 err.to_string(),
529 ))
530 })?;
531 if !pending.iter().any(|batch| batch.batch_id == batch_id) {
532 return Ok(());
533 }
534 tokio::time::sleep(delay).await;
535 delay = (delay * 2).min(std::time::Duration::from_millis(400));
536 }
537 }
538
539 pub fn read_view(&self) -> SessionReadView {
540 self.runtime.observe().read_view.clone()
541 }
542
543 pub fn usage_report(&self) -> SessionUsageReport {
544 self.runtime.observe().usage_report.clone()
545 }
546
547 pub async fn set_turn_phase_probe(
548 &self,
549 probe: Arc<dyn lash_core::runtime::RuntimeTurnPhaseProbe>,
550 ) {
551 let writer = self.runtime.writer();
552 let mut runtime = writer.lock().await;
553 runtime.set_turn_phase_probe(Arc::clone(&probe));
554 self.runtime.publish_from(&runtime);
555 if let Some(slot) = &self.process_phase_probe_slot {
556 let observation = self.runtime.observe();
557 slot.set_for_session(observation.session_id(), Arc::clone(&probe));
558 let current_frame = observation.persisted_state.current_agent_frame_id.as_str();
559 if !current_frame.is_empty() {
560 let scope = lash_core::SessionScope::for_agent_frame(
561 observation.session_id(),
562 current_frame,
563 );
564 slot.set_for_scope(&scope, probe);
565 }
566 }
567 }
568}
569
570#[derive(Clone)]
571pub struct ObservableSession {
572 pub(crate) runtime: RuntimeHandle,
573}
574
575impl ObservableSession {
576 fn snapshot(&self) -> Arc<RuntimeObservation> {
577 self.runtime.observe()
578 }
579
580 pub fn current_observation(&self) -> SessionObservation {
581 self.runtime.current_session_observation()
582 }
583
584 pub fn resume_from_cursor(&self, cursor: &SessionCursor) -> Result<SessionResume> {
585 self.runtime
586 .resume_session_observation(cursor)
587 .map_err(live_replay_error)
588 }
589
590 pub fn subscribe_from_cursor(
591 &self,
592 cursor: &SessionCursor,
593 ) -> Result<SessionObservationSubscription> {
594 self.runtime
595 .subscribe_session_observation(cursor)
596 .map_err(live_replay_error)
597 }
598
599 pub fn session_id(&self) -> String {
600 self.snapshot().session_id().to_string()
601 }
602
603 pub fn policy_snapshot(&self) -> SessionPolicy {
604 self.snapshot().policy.clone()
605 }
606
607 pub fn read_view(&self) -> SessionReadView {
608 self.snapshot().read_view.clone()
609 }
610
611 pub fn usage_report(&self) -> SessionUsageReport {
612 self.snapshot().usage_report.clone()
613 }
614
615 pub fn tool_state(&self) -> Option<ToolState> {
616 self.snapshot().tool_state.clone()
617 }
618
619 pub fn active_tool_manifests(&self) -> Vec<ToolManifest> {
620 self.snapshot()
621 .tool_state
622 .as_ref()
623 .map(ToolState::tool_manifests)
624 .unwrap_or_default()
625 }
626
627 pub async fn list_process_handles(&self) -> Vec<ProcessHandleSummary> {
628 self.snapshot().list_process_handles().await
629 }
630
631 pub async fn list_all_process_handles(&self) -> Vec<ProcessHandleSummary> {
632 self.snapshot().list_all_process_handles().await
633 }
634
635 pub fn process_scope(&self) -> SessionScope {
636 self.snapshot().process_scope()
637 }
638}
639
640fn live_replay_error(err: lash_core::LiveReplayStoreError) -> EmbedError {
641 EmbedError::Runtime(lash_core::RuntimeError::new(
642 RuntimeErrorCode::Other("live_replay".to_string()),
643 err.to_string(),
644 ))
645}
646
647pub struct EnqueueTurnBuilder<'a> {
648 session: &'a LashSession,
649 input: TurnInput,
650 id: Option<String>,
651 delivery_policy: DeliveryPolicy,
652 slot_policy: SlotPolicy,
653}
654
655impl<'a> EnqueueTurnBuilder<'a> {
656 pub fn id(mut self, id: impl Into<String>) -> Self {
657 self.id = Some(id.into());
658 self
659 }
660
661 pub fn delivery_policy(mut self, policy: DeliveryPolicy) -> Self {
662 self.delivery_policy = policy;
663 self
664 }
665
666 pub fn slot_policy(mut self, policy: SlotPolicy) -> Self {
667 self.slot_policy = policy;
668 self
669 }
670
671 pub async fn send(self) -> Result<QueuedWorkBatch> {
672 let source_key = self.id.map(|id| format!("host:{id}"));
673 self.session
674 .runtime
675 .enqueue_turn_input(
676 self.input,
677 self.delivery_policy,
678 self.slot_policy,
679 source_key,
680 )
681 .await
682 .map_err(EmbedError::Runtime)
683 }
684}
685
686impl<'a> std::future::IntoFuture for EnqueueTurnBuilder<'a> {
687 type Output = Result<QueuedWorkBatch>;
688 type IntoFuture =
689 std::pin::Pin<Box<dyn std::future::Future<Output = Result<QueuedWorkBatch>> + 'a>>;
690
691 fn into_future(self) -> Self::IntoFuture {
692 Box::pin(self.send())
693 }
694}