1use std::pin::Pin;
2use std::task::{Context, Poll};
3
4use crate::support::*;
5use futures_util::Stream;
6use lash_core::runtime::{DeliveryPolicy, QueuedWorkBatch, SlotPolicy};
7use lash_core::{LiveReplayGap, LiveReplayStoreError, SessionObservationEvent};
8
9pub struct SessionBuilder {
10 pub(crate) core: LashCore,
11 pub(crate) session_id: String,
12 pub(crate) spec: SessionSpec,
13 pub(crate) parent_session_id: Option<String>,
14 pub(crate) session_execution_owner_id: Option<String>,
15 pub(crate) store: Option<Arc<dyn RuntimePersistence>>,
16 pub(crate) provider: Option<ProviderHandle>,
17 pub(crate) active_plugins: Vec<ActivePluginBinding>,
18 pub(crate) plugin_factories: Vec<Arc<dyn PluginFactory>>,
19}
20
21#[cfg(feature = "rlm")]
22pub struct RlmSessionBuilder {
23 pub(crate) builder: SessionBuilder,
24 pub(crate) rlm_final_answer_format: Option<lash_rlm_types::RlmFinalAnswerFormat>,
25}
26
27impl SessionBuilder {
28 pub fn provider(mut self, provider: ProviderHandle) -> Self {
29 self.spec = self.spec.provider_id(provider.kind());
30 self.provider = Some(provider);
31 self
32 }
33
34 pub fn session_spec(mut self, spec: SessionSpec) -> Self {
35 self.spec = spec;
36 self
37 }
38
39 pub fn parent(mut self, parent_session_id: impl Into<String>) -> Self {
40 self.parent_session_id = Some(parent_session_id.into());
41 self
42 }
43
44 pub fn session_execution_owner_id(mut self, owner_id: impl Into<String>) -> Self {
52 self.session_execution_owner_id = Some(owner_id.into());
53 self
54 }
55
56 pub fn store(mut self, store: Arc<dyn RuntimePersistence>) -> Self {
63 self.store = Some(store);
64 self
65 }
66
67 pub fn plugin<P: PluginBinding>(mut self, config: P::SessionConfig) -> Self {
68 self.active_plugins.push(ActivePluginBinding {
69 id: P::ID,
70 requires_turn_input: P::requires_turn_input(&config),
71 });
72 self.plugin_factories.push(P::factory(&config));
73 self
74 }
75
76 pub async fn open(self) -> Result<LashSession> {
77 let policy = self.session_policy();
78 let store = self.create_store(&policy).await?;
79 let state = self
80 .load_or_default_state(&policy, store.as_deref())
81 .await?;
82 self.open_resolved(policy, state, store).await
83 }
84
85 pub async fn open_fresh(self) -> Result<LashSession> {
95 let policy = self.session_policy();
96 let store = self.create_store(&policy).await?;
97 let state = RuntimeSessionState {
98 session_id: self.session_id.clone(),
99 policy: policy.clone(),
100 graph_replace_required: true,
101 ..RuntimeSessionState::default()
102 };
103 self.open_resolved(policy, state, store).await
104 }
105
106 pub async fn open_with_state(self, mut state: RuntimeSessionState) -> Result<LashSession> {
113 let policy = self.session_policy();
114 let store = self.create_store(&policy).await?;
115 if state.session_id != self.session_id {
116 return Err(EmbedError::StoreSessionMismatch {
117 loaded: state.session_id,
118 requested: self.session_id,
119 });
120 }
121 let recorded_provider_id = state.policy.recorded_provider_id().to_string();
122 state.policy = policy.clone();
123 state.policy.provider_id = recorded_provider_id;
124 self.open_resolved(policy, state, store).await
125 }
126
127 fn session_policy(&self) -> SessionPolicy {
128 let mut policy = self.spec.resolve_against(&self.core.policy);
129 policy.session_id = Some(self.session_id.clone());
130 policy
131 }
132
133 async fn load_or_default_state(
134 &self,
135 policy: &SessionPolicy,
136 store: Option<&dyn RuntimePersistence>,
137 ) -> Result<RuntimeSessionState> {
138 let state = match store {
139 Some(store) => {
140 let loaded = self.load_persisted_state_for_residency(store).await?;
141 let mut state = loaded.unwrap_or_else(|| RuntimeSessionState {
142 session_id: self.session_id.clone(),
143 policy: policy.clone(),
144 ..RuntimeSessionState::default()
145 });
146 if state.session_id != self.session_id {
147 return Err(EmbedError::StoreSessionMismatch {
148 loaded: state.session_id,
149 requested: self.session_id.clone(),
150 });
151 }
152 let recorded_provider_id = state.policy.recorded_provider_id().to_string();
153 state.policy = policy.clone();
154 state.policy.provider_id = recorded_provider_id;
155 state
156 }
157 None => RuntimeSessionState {
158 session_id: self.session_id.clone(),
159 policy: policy.clone(),
160 ..RuntimeSessionState::default()
161 },
162 };
163 Ok(state)
164 }
165
166 async fn load_persisted_state_for_residency(
167 &self,
168 store: &dyn RuntimePersistence,
169 ) -> Result<Option<RuntimeSessionState>> {
170 load_persisted_state_for_residency(self.core.env.residency, store).await
171 }
172
173 async fn open_resolved(
174 self,
175 policy: SessionPolicy,
176 state: RuntimeSessionState,
177 store: Option<Arc<dyn RuntimePersistence>>,
178 ) -> Result<LashSession> {
179 let mut env = self.core.env.clone();
180 if let Some(provider) = self.provider.clone().or_else(|| self.core.provider.clone()) {
181 env.core.providers.provider_resolver =
182 Arc::new(lash_core::SingleProviderResolver::new(provider));
183 }
184 let plugin_host = build_plugin_host(
185 self.core.protocol_factory.as_ref(),
186 self.core.plugin_factories.as_ref(),
187 self.plugin_factories,
188 )?;
189 env.core = self
190 .core
191 .runtime_host_for_plugin_host(env.core.clone(), &plugin_host)?;
192 env.plugin_host = Some(Arc::new(plugin_host));
193 let effect_host = Arc::clone(&env.core.control.effect_host);
194 let drivers = self.core.work_driver.drivers().await;
195 env.process_work_driver = drivers.process.clone();
196 env.queued_work_driver = drivers.queued.clone();
197 let mut runtime = LashRuntime::from_environment(&env, policy, state, store).await?;
198 if let Some(owner_id) = self.session_execution_owner_id {
199 runtime.set_runtime_scope_id(owner_id);
200 }
201 if drivers.drive_process_on_open
202 && let Some(driver) = drivers.process.as_ref()
203 {
204 driver.claim_and_run_pending("session_open").await?;
205 }
206 let handle = RuntimeHandle::with_live_replay_store(
207 runtime,
208 Arc::clone(&self.core.live_replay_store),
209 );
210 Ok(LashSession {
211 runtime: handle,
212 effect_host,
213 parent_session_id: self.parent_session_id,
214 active_plugins: self.active_plugins,
215 process_phase_probe_slot: self.core.work_driver.phase_probe_slot(),
216 turn_cancels: crate::turn::TurnCancelRegistry::default(),
217 })
218 }
219
220 async fn create_store(
221 &self,
222 policy: &SessionPolicy,
223 ) -> Result<Option<Arc<dyn RuntimePersistence>>> {
224 if let Some(store) = self.store.as_ref() {
225 return Ok(Some(Arc::clone(store)));
226 }
227 let Some(factory) = self.core.store_factory.as_ref() else {
228 return Ok(None);
229 };
230 let request = SessionStoreCreateRequest {
231 session_id: self.session_id.clone(),
232 relation: self
233 .parent_session_id
234 .as_ref()
235 .map(|parent_session_id| lash_core::SessionRelation::Child {
236 parent_session_id: parent_session_id.clone(),
237 caused_by: None,
238 })
239 .unwrap_or_default(),
240 policy: policy.clone(),
241 };
242 factory
243 .create_store(&request)
244 .await
245 .map(Some)
246 .map_err(|message| EmbedError::StoreFactory {
247 session_id: self.session_id.clone(),
248 message,
249 })
250 }
251}
252
253pub(crate) async fn load_state_for_residency(
254 residency: Residency,
255 session_id: &str,
256 policy: &SessionPolicy,
257 store: &dyn RuntimePersistence,
258) -> Result<RuntimeSessionState> {
259 let mut state = load_persisted_state_for_residency(residency, store)
260 .await?
261 .unwrap_or_else(|| RuntimeSessionState {
262 session_id: session_id.to_string(),
263 policy: policy.clone(),
264 ..RuntimeSessionState::default()
265 });
266 if state.session_id != session_id {
267 return Err(EmbedError::StoreSessionMismatch {
268 loaded: state.session_id,
269 requested: session_id.to_string(),
270 });
271 }
272 let recorded_provider_id = state.policy.recorded_provider_id().to_string();
273 state.policy = policy.clone();
274 state.policy.provider_id = recorded_provider_id;
275 Ok(state)
276}
277
278async fn load_persisted_state_for_residency(
279 residency: Residency,
280 store: &dyn RuntimePersistence,
281) -> Result<Option<RuntimeSessionState>> {
282 match residency {
283 Residency::KeepAll => {
284 let loaded = lash_core::store::load_persisted_session_state(store)
285 .await
286 .map_err(|err| SessionError::Protocol(format!("failed to load store: {err}")))?;
287 Ok(loaded)
288 }
289 Residency::ActivePathOnly => {
290 let active = lash_core::store::load_persisted_session_state_active_path(store, None)
291 .await
292 .map_err(|err| {
293 SessionError::Protocol(format!("failed to load active-path store: {err}"))
294 })?;
295 if active
296 .as_ref()
297 .is_some_and(|state| state.session_graph.nodes.is_empty())
298 {
299 let mut full = lash_core::store::load_persisted_session_state(store)
300 .await
301 .map_err(|err| {
302 SessionError::Protocol(format!(
303 "failed to heal active-path store from full graph: {err}"
304 ))
305 })?;
306 if let Some(state) = full.as_mut() {
307 state.graph_replace_required = true;
308 }
309 return Ok(full);
310 }
311 Ok(active)
312 }
313 }
314}
315
316impl PromptLayerSink for SessionBuilder {
317 fn prompt_layer_mut(&mut self) -> &mut PromptLayer {
318 self.spec.prompt.get_or_insert_with(PromptLayer::new)
319 }
320}
321
322#[cfg(feature = "rlm")]
323impl RlmSessionBuilder {
324 pub fn provider(mut self, provider: ProviderHandle) -> Self {
325 self.builder = self.builder.provider(provider);
326 self
327 }
328
329 pub fn session_spec(mut self, spec: SessionSpec) -> Self {
330 self.builder = self.builder.session_spec(spec);
331 self
332 }
333
334 pub fn parent(mut self, parent_session_id: impl Into<String>) -> Self {
335 self.builder = self.builder.parent(parent_session_id);
336 self
337 }
338
339 pub fn session_execution_owner_id(mut self, owner_id: impl Into<String>) -> Self {
340 self.builder = self.builder.session_execution_owner_id(owner_id);
341 self
342 }
343
344 pub fn store(mut self, store: Arc<dyn RuntimePersistence>) -> Self {
345 self.builder = self.builder.store(store);
346 self
347 }
348
349 pub fn plugin<P: PluginBinding>(mut self, config: P::SessionConfig) -> Self {
350 self.builder = self.builder.plugin::<P>(config);
351 self
352 }
353
354 pub async fn open(self) -> Result<LashSession> {
355 self.open_resolved(RlmOpenState::Resume).await
356 }
357
358 pub async fn open_fresh(self) -> Result<LashSession> {
359 self.open_resolved(RlmOpenState::Fresh).await
360 }
361
362 pub async fn open_with_state(self, state: RuntimeSessionState) -> Result<LashSession> {
363 self.open_resolved(RlmOpenState::Explicit(state)).await
364 }
365
366 async fn open_resolved(self, open_state: RlmOpenState) -> Result<LashSession> {
367 let Self {
368 builder,
369 rlm_final_answer_format,
370 } = self;
371 let policy = builder.session_policy();
372 let store = builder.create_store(&policy).await?;
373 let mut state = match open_state {
374 RlmOpenState::Resume => {
375 builder
376 .load_or_default_state(&policy, store.as_deref())
377 .await?
378 }
379 RlmOpenState::Fresh => RuntimeSessionState {
380 session_id: builder.session_id.clone(),
381 policy: policy.clone(),
382 graph_replace_required: true,
383 ..RuntimeSessionState::default()
384 },
385 RlmOpenState::Explicit(mut state) => {
386 if state.session_id != builder.session_id {
387 return Err(EmbedError::StoreSessionMismatch {
388 loaded: state.session_id,
389 requested: builder.session_id.clone(),
390 });
391 }
392 let recorded_provider_id = state.policy.recorded_provider_id().to_string();
393 state.policy = policy.clone();
394 state.policy.provider_id = recorded_provider_id;
395 state
396 }
397 };
398 apply_rlm_session_options(
399 builder.parent_session_id.is_none(),
400 rlm_final_answer_format,
401 &mut state,
402 )?;
403 builder.open_resolved(policy, state, store).await
404 }
405}
406
407#[cfg(feature = "rlm")]
408impl PromptLayerSink for RlmSessionBuilder {
409 fn prompt_layer_mut(&mut self) -> &mut PromptLayer {
410 self.builder.prompt_layer_mut()
411 }
412}
413
414#[cfg(feature = "rlm")]
415#[allow(clippy::large_enum_variant)]
416enum RlmOpenState {
417 Resume,
418 Fresh,
419 Explicit(RuntimeSessionState),
420}
421
422#[cfg(feature = "rlm")]
423fn apply_rlm_session_options(
424 is_root_session: bool,
425 explicit_format: Option<lash_rlm_types::RlmFinalAnswerFormat>,
426 state: &mut RuntimeSessionState,
427) -> Result<()> {
428 let final_answer_format = explicit_format.unwrap_or({
429 if is_root_session {
430 lash_rlm_types::RlmFinalAnswerFormat::Markdown
431 } else {
432 lash_rlm_types::RlmFinalAnswerFormat::RawSubmitValue
433 }
434 });
435 let mut extras = if state.protocol_turn_options.is_empty() {
436 lash_rlm_types::RlmCreateExtras::default()
437 } else {
438 state.protocol_turn_options.decode()?
439 };
440 extras.final_answer_format = Some(final_answer_format);
441 let options = ProtocolTurnOptions::typed(extras)?;
442 state.protocol_turn_options = options.clone();
443 for frame in &mut state.agent_frames {
444 frame.protocol_turn_options = options.clone();
445 }
446 Ok(())
447}
448
449#[cfg(all(test, feature = "rlm"))]
450mod tests {
451 use super::*;
452
453 #[test]
454 fn apply_rlm_session_options_preserves_existing_termination() -> Result<()> {
455 let mut state = RuntimeSessionState {
456 protocol_turn_options: ProtocolTurnOptions::typed(lash_rlm_types::RlmCreateExtras {
457 termination: lash_rlm_types::RlmTermination::ProseOrSubmit,
458 final_answer_format: None,
459 })?,
460 ..Default::default()
461 };
462
463 apply_rlm_session_options(true, None, &mut state)?;
464
465 let extras: lash_rlm_types::RlmCreateExtras = state.protocol_turn_options.decode()?;
466 assert_eq!(
467 extras.termination,
468 lash_rlm_types::RlmTermination::ProseOrSubmit
469 );
470 assert_eq!(
471 extras.final_answer_format,
472 Some(lash_rlm_types::RlmFinalAnswerFormat::Markdown)
473 );
474 Ok(())
475 }
476}
477
478#[derive(Clone)]
479pub struct LashSession {
480 pub(crate) runtime: RuntimeHandle,
481 pub(crate) effect_host: Arc<dyn EffectHost>,
482 pub(crate) parent_session_id: Option<String>,
483 pub(crate) active_plugins: Vec<ActivePluginBinding>,
484 pub(crate) process_phase_probe_slot: Option<lash_core::runtime::RuntimeTurnPhaseProbeSlot>,
485 pub(crate) turn_cancels: crate::turn::TurnCancelRegistry,
486}
487
488#[derive(Clone, Debug, Default)]
489pub struct SessionConfigPatch {
490 pub provider: Option<ProviderHandle>,
491 pub model: Option<ModelSpec>,
492 pub prompt: Option<PromptLayer>,
493}
494
495impl LashSession {
496 pub async fn close(self) -> Result<()> {
497 let runtime = self.runtime.writer();
498 let runtime = runtime.lock().await;
499 runtime.unregister_plugin_session()?;
500 Ok(())
501 }
502
503 pub fn session_id(&self) -> String {
504 self.runtime.observe().session_id().to_string()
505 }
506
507 pub fn policy_snapshot(&self) -> SessionPolicy {
508 self.runtime.observe().policy.clone()
509 }
510
511 pub fn observe(&self) -> ObservableSession {
512 ObservableSession {
513 runtime: self.runtime.clone(),
514 }
515 }
516
517 pub fn parent_session_id(&self) -> Option<&str> {
518 self.parent_session_id.as_deref()
519 }
520
521 pub fn effect_host(&self) -> Arc<dyn EffectHost> {
522 Arc::clone(&self.effect_host)
523 }
524
525 pub fn turn(&self, input: TurnInput) -> TurnBuilder {
526 TurnBuilder {
527 runtime: self.runtime.clone(),
528 effect_host: Arc::clone(&self.effect_host),
529 active_plugins: self.active_plugins.clone(),
530 input,
531 cancel: CancellationToken::new(),
532 cancels: self.turn_cancels.clone(),
533 protocol_turn_options: None,
534 provider: None,
535 model: None,
536 turn_id: None,
537 }
538 }
539
540 pub fn queued_turn(&self) -> QueuedTurnBuilder {
541 QueuedTurnBuilder {
542 runtime: self.runtime.clone(),
543 effect_host: Arc::clone(&self.effect_host),
544 cancel: CancellationToken::new(),
545 cancels: self.turn_cancels.clone(),
546 batch_ids: Vec::new(),
547 drain_id: None,
548 }
549 }
550
551 pub fn cancel_running_turns(&self) -> usize {
566 self.turn_cancels.cancel_all()
567 }
568
569 pub fn admin(&self) -> SessionAdmin {
570 SessionAdmin {
571 runtime: self.runtime.clone(),
572 }
573 }
574
575 pub async fn configure(&self, patch: SessionConfigPatch) -> Result<()> {
576 self.admin().config().update(patch).await
577 }
578
579 pub fn tools(&self) -> ToolAdmin {
580 ToolAdmin::new(self.admin())
581 }
582
583 pub fn commands(&self) -> SessionCommandAdmin {
584 self.admin().commands()
585 }
586
587 pub fn triggers(&self) -> SessionTriggerAdmin {
588 self.admin().triggers()
589 }
590
591 pub fn processes(&self) -> SessionProcessAdmin {
592 SessionProcessAdmin::new(self.admin())
593 }
594
595 pub fn plugin_operations(&self) -> PluginOperations {
596 PluginOperations {
597 control: self.admin(),
598 }
599 }
600
601 pub fn enqueue(&self, input: TurnInput) -> EnqueueTurnBuilder<'_> {
602 EnqueueTurnBuilder {
603 session: self,
604 input,
605 id: None,
606 delivery_policy: DeliveryPolicy::AfterCurrentTurnCommit,
607 slot_policy: SlotPolicy::Exclusive,
608 }
609 }
610
611 pub async fn queued_work(&self) -> Result<Vec<QueuedWorkBatch>> {
612 let observation = self.runtime.observe();
613 let store = observation.queue_store.as_ref().ok_or_else(|| {
614 EmbedError::Runtime(lash_core::RuntimeError::new(
615 lash_core::RuntimeErrorCode::StoreCommitFailed,
616 "queued work inspection requires a persistent runtime store",
617 ))
618 })?;
619 store
620 .list_pending_queued_work(observation.session_id())
621 .await
622 .map_err(|err| {
623 EmbedError::Runtime(lash_core::RuntimeError::new(
624 lash_core::RuntimeErrorCode::StoreCommitFailed,
625 err.to_string(),
626 ))
627 })
628 }
629
630 pub async fn cancel_queued_work_batch(
631 &self,
632 batch_id: &str,
633 ) -> Result<Option<QueuedWorkBatch>> {
634 let session_id = self.session_id();
635 self.runtime
636 .cancel_queued_work_batch(&session_id, batch_id)
637 .await
638 .map_err(EmbedError::Runtime)
639 }
640
641 pub async fn await_queued_work_batch(&self, batch_id: &str) -> Result<()> {
653 let observation = self.runtime.observe();
654 let store = observation.queue_store.clone().ok_or_else(|| {
655 EmbedError::Runtime(lash_core::RuntimeError::new(
656 lash_core::RuntimeErrorCode::StoreCommitFailed,
657 "queued work inspection requires a persistent runtime store",
658 ))
659 })?;
660 let session_id = observation.session_id().to_string();
661 drop(observation);
662 let mut delay = std::time::Duration::from_millis(25);
663 loop {
664 let pending = store
665 .list_pending_queued_work(&session_id)
666 .await
667 .map_err(|err| {
668 EmbedError::Runtime(lash_core::RuntimeError::new(
669 lash_core::RuntimeErrorCode::StoreCommitFailed,
670 err.to_string(),
671 ))
672 })?;
673 if !pending.iter().any(|batch| batch.batch_id == batch_id) {
674 return Ok(());
675 }
676 tokio::time::sleep(delay).await;
677 delay = (delay * 2).min(std::time::Duration::from_millis(400));
678 }
679 }
680
681 pub fn read_view(&self) -> SessionReadView {
682 self.runtime.observe().read_view.clone()
683 }
684
685 pub fn usage_report(&self) -> SessionUsageReport {
686 self.runtime.observe().usage_report.clone()
687 }
688
689 pub async fn set_turn_phase_probe(
690 &self,
691 probe: Arc<dyn lash_core::runtime::RuntimeTurnPhaseProbe>,
692 ) {
693 let writer = self.runtime.writer();
694 let mut runtime = writer.lock().await;
695 runtime.set_turn_phase_probe(Arc::clone(&probe));
696 self.runtime.publish_from(&runtime);
697 if let Some(slot) = &self.process_phase_probe_slot {
698 let observation = self.runtime.observe();
699 slot.set_for_session(observation.session_id(), Arc::clone(&probe));
700 let current_frame = observation.persisted_state.current_agent_frame_id.as_str();
701 if !current_frame.is_empty() {
702 let scope = lash_core::SessionScope::for_agent_frame(
703 observation.session_id(),
704 current_frame,
705 );
706 slot.set_for_scope(&scope, probe);
707 }
708 }
709 }
710}
711
712#[derive(Clone)]
713pub struct ObservableSession {
714 pub(crate) runtime: RuntimeHandle,
715}
716
717impl ObservableSession {
718 fn snapshot(&self) -> Arc<RuntimeObservation> {
719 self.runtime.observe()
720 }
721
722 pub fn current_observation(&self) -> SessionObservation {
723 self.runtime.current_session_observation()
724 }
725
726 pub fn resume_from_cursor(&self, cursor: &SessionCursor) -> Result<SessionResume> {
727 self.runtime
728 .resume_session_observation(cursor)
729 .map_err(live_replay_error)
730 }
731
732 pub fn subscribe_from_cursor(
733 &self,
734 cursor: &SessionCursor,
735 ) -> Result<SessionObservationSubscription> {
736 self.runtime
737 .subscribe_session_observation(cursor)
738 .map_err(live_replay_error)
739 }
740
741 pub fn subscribe_and_recover(&self, cursor: SessionCursor) -> SessionObservationStream {
750 SessionObservationStream {
751 observable: self.clone(),
752 cursor,
753 subscription: None,
754 done: false,
755 }
756 }
757
758 pub fn session_id(&self) -> String {
759 self.snapshot().session_id().to_string()
760 }
761
762 pub fn policy_snapshot(&self) -> SessionPolicy {
763 self.snapshot().policy.clone()
764 }
765
766 pub fn read_view(&self) -> SessionReadView {
767 self.snapshot().read_view.clone()
768 }
769
770 pub fn usage_report(&self) -> SessionUsageReport {
771 self.snapshot().usage_report.clone()
772 }
773
774 pub fn tool_state(&self) -> Option<ToolState> {
775 self.snapshot().tool_state.clone()
776 }
777
778 pub fn active_tool_manifests(&self) -> Vec<ToolManifest> {
779 self.snapshot()
780 .tool_state
781 .as_ref()
782 .map(ToolState::tool_manifests)
783 .unwrap_or_default()
784 }
785
786 pub async fn list_process_handles(&self) -> Vec<ProcessHandleSummary> {
787 self.snapshot().list_process_handles().await
788 }
789
790 pub async fn list_all_process_handles(&self) -> Vec<ProcessHandleSummary> {
791 self.snapshot().list_all_process_handles().await
792 }
793
794 pub fn process_scope(&self) -> SessionScope {
795 self.snapshot().process_scope()
796 }
797}
798
799#[derive(Clone, Debug)]
800pub enum SessionObservationStreamItem {
801 Event(SessionObservationEvent),
803 Gap {
805 observation: SessionObservation,
806 gap: LiveReplayGap,
807 },
808}
809
810pub struct SessionObservationStream {
812 observable: ObservableSession,
813 cursor: SessionCursor,
814 subscription: Option<lash_core::LiveReplaySubscription>,
815 done: bool,
816}
817
818impl SessionObservationStream {
819 pub fn cursor(&self) -> &SessionCursor {
820 &self.cursor
821 }
822}
823
824impl Stream for SessionObservationStream {
825 type Item = Result<SessionObservationStreamItem>;
826
827 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
828 loop {
829 if self.done {
830 return Poll::Ready(None);
831 }
832 if self.subscription.is_none() {
833 match self.observable.subscribe_from_cursor(&self.cursor) {
834 Ok(SessionObservationSubscription::Subscribed(subscription)) => {
835 self.subscription = Some(subscription);
836 }
837 Ok(SessionObservationSubscription::Gap { observation, gap }) => {
838 self.cursor = gap.latest_cursor.clone();
839 return Poll::Ready(Some(Ok(SessionObservationStreamItem::Gap {
840 observation,
841 gap,
842 })));
843 }
844 Err(err) => {
845 self.done = true;
846 return Poll::Ready(Some(Err(err)));
847 }
848 }
849 }
850
851 let Some(subscription) = self.subscription.as_mut() else {
852 continue;
853 };
854 match Pin::new(subscription).poll_next(cx) {
855 Poll::Pending => return Poll::Pending,
856 Poll::Ready(Some(Ok(event))) => {
857 self.cursor = event.cursor.clone();
858 return Poll::Ready(Some(Ok(SessionObservationStreamItem::Event(event))));
859 }
860 Poll::Ready(Some(Err(LiveReplayStoreError::SubscriberLagged(_)))) => {
861 self.subscription = None;
862 continue;
863 }
864 Poll::Ready(Some(Err(err))) => {
865 self.done = true;
866 return Poll::Ready(Some(Err(live_replay_error(err))));
867 }
868 Poll::Ready(None) => {
869 self.done = true;
870 return Poll::Ready(None);
871 }
872 }
873 }
874 }
875}
876
877fn live_replay_error(err: lash_core::LiveReplayStoreError) -> EmbedError {
878 EmbedError::Runtime(lash_core::RuntimeError::new(
879 RuntimeErrorCode::Other("live_replay".to_string()),
880 err.to_string(),
881 ))
882}
883
884pub struct EnqueueTurnBuilder<'a> {
885 session: &'a LashSession,
886 input: TurnInput,
887 id: Option<String>,
888 delivery_policy: DeliveryPolicy,
889 slot_policy: SlotPolicy,
890}
891
892impl<'a> EnqueueTurnBuilder<'a> {
893 pub fn id(mut self, id: impl Into<String>) -> Self {
894 self.id = Some(id.into());
895 self
896 }
897
898 pub fn delivery_policy(mut self, policy: DeliveryPolicy) -> Self {
899 self.delivery_policy = policy;
900 self
901 }
902
903 pub fn slot_policy(mut self, policy: SlotPolicy) -> Self {
904 self.slot_policy = policy;
905 self
906 }
907
908 pub async fn send(self) -> Result<QueuedWorkBatch> {
909 let source_key = self.id.map(|id| format!("host:{id}"));
910 self.session
911 .runtime
912 .enqueue_turn_input(
913 self.input,
914 self.delivery_policy,
915 self.slot_policy,
916 source_key,
917 )
918 .await
919 .map_err(EmbedError::Runtime)
920 }
921}
922
923impl<'a> std::future::IntoFuture for EnqueueTurnBuilder<'a> {
924 type Output = Result<QueuedWorkBatch>;
925 type IntoFuture =
926 std::pin::Pin<Box<dyn std::future::Future<Output = Result<QueuedWorkBatch>> + 'a>>;
927
928 fn into_future(self) -> Self::IntoFuture {
929 Box::pin(self.send())
930 }
931}