Skip to main content

lash/
session.rs

1use crate::support::*;
2use lash_core::runtime::{DeliveryPolicy, QueuedWorkBatch, SlotPolicy};
3
/// Builder that collects everything needed to open a [`LashSession`].
///
/// The chained setters consume and return the builder; one of the `open*`
/// methods consumes it to produce the session.
pub struct SessionBuilder {
    /// Core whose policy, environment, factories and work driver are used when opening.
    pub(crate) core: LashCore,
    /// Id of the session being opened; written into the resolved policy.
    pub(crate) session_id: String,
    /// Per-session spec, resolved against `core.policy` at open time.
    pub(crate) spec: SessionSpec,
    /// Parent session id, when this session is opened as a child.
    pub(crate) parent_session_id: Option<String>,
    /// Explicit persistence store; when `None`, `core.store_factory` (if any) is consulted.
    pub(crate) store: Option<Arc<dyn RuntimePersistence>>,
    /// Session-level provider; takes precedence over `core.provider` when opening.
    pub(crate) provider: Option<ProviderHandle>,
    /// Bindings recorded by `plugin`, handed to the opened session.
    pub(crate) active_plugins: Vec<ActivePluginBinding>,
    /// Factories recorded by `plugin`, used to build the plugin host.
    pub(crate) plugin_factories: Vec<Arc<dyn PluginFactory>>,
}
14
/// RLM flavour of [`SessionBuilder`]: wraps the plain builder and additionally
/// stamps RLM protocol turn options onto the state before opening.
#[cfg(feature = "rlm")]
pub struct RlmSessionBuilder {
    /// The wrapped builder that every setter delegates to.
    pub(crate) builder: SessionBuilder,
    /// Explicit final-answer format. When `None`, opening falls back to
    /// `Markdown` for sessions without a parent and `RawSubmitValue` otherwise.
    pub(crate) rlm_final_answer_format: Option<lash_rlm_types::RlmFinalAnswerFormat>,
}
20
21impl SessionBuilder {
22    pub fn provider(mut self, provider: ProviderHandle) -> Self {
23        self.spec = self.spec.provider_id(provider.kind());
24        self.provider = Some(provider);
25        self
26    }
27
28    pub fn session_spec(mut self, spec: SessionSpec) -> Self {
29        self.spec = spec;
30        self
31    }
32
33    pub fn parent(mut self, parent_session_id: impl Into<String>) -> Self {
34        self.parent_session_id = Some(parent_session_id.into());
35        self
36    }
37
38    /// Use a specific persistence store for this root session.
39    ///
40    /// This is the right API for a host-owned, pre-opened session database.
41    /// Managed child sessions never reuse this store; configure
42    /// `LashCoreBuilder::child_store_factory` when child sessions should also
43    /// persist.
44    pub fn store(mut self, store: Arc<dyn RuntimePersistence>) -> Self {
45        self.store = Some(store);
46        self
47    }
48
49    pub fn plugin<P: PluginBinding>(mut self, config: P::SessionConfig) -> Self {
50        self.active_plugins.push(ActivePluginBinding {
51            id: P::ID,
52            requires_turn_input: P::requires_turn_input(&config),
53        });
54        self.plugin_factories.push(P::factory(&config));
55        self
56    }
57
58    pub async fn open(self) -> Result<LashSession> {
59        let policy = self.session_policy();
60        let store = self.create_store(&policy).await?;
61        let state = self
62            .load_or_default_state(&policy, store.as_deref())
63            .await?;
64        self.open_resolved(policy, state, store).await
65    }
66
67    /// Open this session with a fresh resident graph, ignoring any persisted
68    /// session graph/checkpoint state that may already exist for the same
69    /// session id.
70    ///
71    /// The next successful commit writes a full replacement graph, so normal
72    /// embedders can use this to start over without manually calling
73    /// `load_persisted_session_state` or constructing a `RuntimeSessionState`.
74    /// Use [`Self::open`] for resume and [`Self::open_with_state`] only when
75    /// restoring explicit host-owned state.
76    pub async fn open_fresh(self) -> Result<LashSession> {
77        let policy = self.session_policy();
78        let store = self.create_store(&policy).await?;
79        let state = RuntimeSessionState {
80            session_id: self.session_id.clone(),
81            policy: policy.clone(),
82            graph_replace_required: true,
83            ..RuntimeSessionState::default()
84        };
85        self.open_resolved(policy, state, store).await
86    }
87
88    /// Open with an explicitly supplied runtime state.
89    ///
90    /// This is for advanced hosts that already own a complete state snapshot.
91    /// Normal embedders should use [`Self::open`] to resume according to Lash's
92    /// residency policy or [`Self::open_fresh`] to start over and replace prior
93    /// persisted state on the next commit.
94    pub async fn open_with_state(self, mut state: RuntimeSessionState) -> Result<LashSession> {
95        let policy = self.session_policy();
96        let store = self.create_store(&policy).await?;
97        if state.session_id != self.session_id {
98            return Err(EmbedError::StoreSessionMismatch {
99                loaded: state.session_id,
100                requested: self.session_id,
101            });
102        }
103        let recorded_provider_id = state.policy.recorded_provider_id().to_string();
104        state.policy = policy.clone();
105        state.policy.provider_id = recorded_provider_id;
106        self.open_resolved(policy, state, store).await
107    }
108
109    fn session_policy(&self) -> SessionPolicy {
110        let mut policy = self.spec.resolve_against(&self.core.policy);
111        policy.session_id = Some(self.session_id.clone());
112        policy
113    }
114
115    async fn load_or_default_state(
116        &self,
117        policy: &SessionPolicy,
118        store: Option<&dyn RuntimePersistence>,
119    ) -> Result<RuntimeSessionState> {
120        let state = match store {
121            Some(store) => {
122                let loaded = self.load_persisted_state_for_residency(store).await?;
123                let mut state = loaded.unwrap_or_else(|| RuntimeSessionState {
124                    session_id: self.session_id.clone(),
125                    policy: policy.clone(),
126                    ..RuntimeSessionState::default()
127                });
128                if state.session_id != self.session_id {
129                    return Err(EmbedError::StoreSessionMismatch {
130                        loaded: state.session_id,
131                        requested: self.session_id.clone(),
132                    });
133                }
134                let recorded_provider_id = state.policy.recorded_provider_id().to_string();
135                state.policy = policy.clone();
136                state.policy.provider_id = recorded_provider_id;
137                state
138            }
139            None => RuntimeSessionState {
140                session_id: self.session_id.clone(),
141                policy: policy.clone(),
142                ..RuntimeSessionState::default()
143            },
144        };
145        Ok(state)
146    }
147
148    async fn load_persisted_state_for_residency(
149        &self,
150        store: &dyn RuntimePersistence,
151    ) -> Result<Option<RuntimeSessionState>> {
152        load_persisted_state_for_residency(self.core.env.residency, store).await
153    }
154
    /// Final open step shared by every `open*` entry point: wires the
    /// environment, builds the runtime from `policy`/`state`/`store`, and
    /// packages the result as a [`LashSession`].
    ///
    /// # Errors
    ///
    /// Propagates failures from building the plugin host, deriving the runtime
    /// host, constructing the runtime, and the optional pending-process run.
    async fn open_resolved(
        self,
        policy: SessionPolicy,
        state: RuntimeSessionState,
        store: Option<Arc<dyn RuntimePersistence>>,
    ) -> Result<LashSession> {
        // Work on a per-session copy of the core environment.
        let mut env = self.core.env.clone();
        // A provider set on the builder wins over the core-level provider; when
        // neither is set the environment's resolver is left untouched.
        if let Some(provider) = self.provider.clone().or_else(|| self.core.provider.clone()) {
            env.core.providers.provider_resolver =
                Arc::new(lash_core::SingleProviderResolver::new(provider));
        }
        // Plugin host combines the core's protocol/plugin factories with the
        // factories registered on this builder.
        let plugin_host = build_plugin_host(
            self.core.protocol_factory.as_ref(),
            self.core.plugin_factories.as_ref(),
            self.plugin_factories,
        )?;
        env.core = self
            .core
            .runtime_host_for_plugin_host(env.core.clone(), &plugin_host)?;
        env.plugin_host = Some(Arc::new(plugin_host));
        // Captured after `env.core` was rebuilt, so the session sees the effect
        // host of the plugin-aware runtime host.
        let effect_host = Arc::clone(&env.core.control.effect_host);
        let drivers = self.core.work_driver.drivers().await;
        env.process_work_driver = drivers.process.clone();
        env.queued_work_driver = drivers.queued.clone();
        let runtime = LashRuntime::from_environment(&env, policy, state, store).await?;
        // Only when the driver set asks for it and a process driver exists:
        // run pending process work once as part of opening.
        if drivers.drive_process_on_open
            && let Some(driver) = drivers.process.as_ref()
        {
            driver.claim_and_run_pending("session_open").await?;
        }
        let handle = RuntimeHandle::with_live_replay_store(
            runtime,
            Arc::clone(&self.core.live_replay_store),
        );
        Ok(LashSession {
            runtime: handle,
            effect_host,
            parent_session_id: self.parent_session_id,
            active_plugins: self.active_plugins,
            process_phase_probe_slot: self.core.work_driver.phase_probe_slot(),
            // Each opened session starts with its own empty cancel registry.
            turn_cancels: crate::turn::TurnCancelRegistry::default(),
        })
    }
198
199    async fn create_store(
200        &self,
201        policy: &SessionPolicy,
202    ) -> Result<Option<Arc<dyn RuntimePersistence>>> {
203        if let Some(store) = self.store.as_ref() {
204            return Ok(Some(Arc::clone(store)));
205        }
206        let Some(factory) = self.core.store_factory.as_ref() else {
207            return Ok(None);
208        };
209        let request = SessionStoreCreateRequest {
210            session_id: self.session_id.clone(),
211            relation: self
212                .parent_session_id
213                .as_ref()
214                .map(|parent_session_id| lash_core::SessionRelation::Child {
215                    parent_session_id: parent_session_id.clone(),
216                    caused_by: None,
217                })
218                .unwrap_or_default(),
219            policy: policy.clone(),
220        };
221        factory
222            .create_store(&request)
223            .await
224            .map(Some)
225            .map_err(|message| EmbedError::StoreFactory {
226                session_id: self.session_id.clone(),
227                message,
228            })
229    }
230}
231
232pub(crate) async fn load_state_for_residency(
233    residency: Residency,
234    session_id: &str,
235    policy: &SessionPolicy,
236    store: &dyn RuntimePersistence,
237) -> Result<RuntimeSessionState> {
238    let mut state = load_persisted_state_for_residency(residency, store)
239        .await?
240        .unwrap_or_else(|| RuntimeSessionState {
241            session_id: session_id.to_string(),
242            policy: policy.clone(),
243            ..RuntimeSessionState::default()
244        });
245    if state.session_id != session_id {
246        return Err(EmbedError::StoreSessionMismatch {
247            loaded: state.session_id,
248            requested: session_id.to_string(),
249        });
250    }
251    let recorded_provider_id = state.policy.recorded_provider_id().to_string();
252    state.policy = policy.clone();
253    state.policy.provider_id = recorded_provider_id;
254    Ok(state)
255}
256
257async fn load_persisted_state_for_residency(
258    residency: Residency,
259    store: &dyn RuntimePersistence,
260) -> Result<Option<RuntimeSessionState>> {
261    match residency {
262        Residency::KeepAll => {
263            let loaded = lash_core::store::load_persisted_session_state(store)
264                .await
265                .map_err(|err| SessionError::Protocol(format!("failed to load store: {err}")))?;
266            Ok(loaded)
267        }
268        Residency::ActivePathOnly => {
269            let active = lash_core::store::load_persisted_session_state_active_path(store, None)
270                .await
271                .map_err(|err| {
272                    SessionError::Protocol(format!("failed to load active-path store: {err}"))
273                })?;
274            if active
275                .as_ref()
276                .is_some_and(|state| state.session_graph.nodes.is_empty())
277            {
278                let mut full = lash_core::store::load_persisted_session_state(store)
279                    .await
280                    .map_err(|err| {
281                        SessionError::Protocol(format!(
282                            "failed to heal active-path store from full graph: {err}"
283                        ))
284                    })?;
285                if let Some(state) = full.as_mut() {
286                    state.graph_replace_required = true;
287                }
288                return Ok(full);
289            }
290            Ok(active)
291        }
292    }
293}
294
295impl PromptLayerSink for SessionBuilder {
296    fn prompt_layer_mut(&mut self) -> &mut PromptLayer {
297        self.spec.prompt.get_or_insert_with(PromptLayer::new)
298    }
299}
300
301#[cfg(feature = "rlm")]
302impl RlmSessionBuilder {
303    pub fn provider(mut self, provider: ProviderHandle) -> Self {
304        self.builder = self.builder.provider(provider);
305        self
306    }
307
308    pub fn session_spec(mut self, spec: SessionSpec) -> Self {
309        self.builder = self.builder.session_spec(spec);
310        self
311    }
312
313    pub fn parent(mut self, parent_session_id: impl Into<String>) -> Self {
314        self.builder = self.builder.parent(parent_session_id);
315        self
316    }
317
318    pub fn store(mut self, store: Arc<dyn RuntimePersistence>) -> Self {
319        self.builder = self.builder.store(store);
320        self
321    }
322
323    pub fn plugin<P: PluginBinding>(mut self, config: P::SessionConfig) -> Self {
324        self.builder = self.builder.plugin::<P>(config);
325        self
326    }
327
328    pub async fn open(self) -> Result<LashSession> {
329        self.open_resolved(RlmOpenState::Resume).await
330    }
331
332    pub async fn open_fresh(self) -> Result<LashSession> {
333        self.open_resolved(RlmOpenState::Fresh).await
334    }
335
336    pub async fn open_with_state(self, state: RuntimeSessionState) -> Result<LashSession> {
337        self.open_resolved(RlmOpenState::Explicit(state)).await
338    }
339
340    async fn open_resolved(self, open_state: RlmOpenState) -> Result<LashSession> {
341        let Self {
342            builder,
343            rlm_final_answer_format,
344        } = self;
345        let policy = builder.session_policy();
346        let store = builder.create_store(&policy).await?;
347        let mut state = match open_state {
348            RlmOpenState::Resume => {
349                builder
350                    .load_or_default_state(&policy, store.as_deref())
351                    .await?
352            }
353            RlmOpenState::Fresh => RuntimeSessionState {
354                session_id: builder.session_id.clone(),
355                policy: policy.clone(),
356                graph_replace_required: true,
357                ..RuntimeSessionState::default()
358            },
359            RlmOpenState::Explicit(mut state) => {
360                if state.session_id != builder.session_id {
361                    return Err(EmbedError::StoreSessionMismatch {
362                        loaded: state.session_id,
363                        requested: builder.session_id.clone(),
364                    });
365                }
366                let recorded_provider_id = state.policy.recorded_provider_id().to_string();
367                state.policy = policy.clone();
368                state.policy.provider_id = recorded_provider_id;
369                state
370            }
371        };
372        apply_rlm_session_options(
373            builder.parent_session_id.is_none(),
374            rlm_final_answer_format,
375            &mut state,
376        )?;
377        builder.open_resolved(policy, state, store).await
378    }
379}
380
#[cfg(feature = "rlm")]
impl PromptLayerSink for RlmSessionBuilder {
    /// Expose the wrapped builder's prompt layer.
    fn prompt_layer_mut(&mut self) -> &mut PromptLayer {
        let inner = &mut self.builder;
        inner.prompt_layer_mut()
    }
}
387
/// How `RlmSessionBuilder::open_resolved` obtains its starting state.
#[cfg(feature = "rlm")]
enum RlmOpenState {
    /// Load persisted state when a store is available, else use a default state.
    Resume,
    /// Start from a default state flagged `graph_replace_required`.
    Fresh,
    /// Use the state supplied by the caller.
    Explicit(RuntimeSessionState),
}
394
/// Stamp the RLM final-answer format onto `state`'s protocol turn options.
///
/// The format is `explicit_format` when given; otherwise `Markdown` for a root
/// session and `RawSubmitValue` for a non-root one. Existing turn options are
/// decoded and kept, with only `final_answer_format` overwritten. The
/// resulting options are written to the state and to every agent frame.
///
/// # Errors
///
/// Propagates failures from decoding the existing options or encoding the
/// updated ones.
#[cfg(feature = "rlm")]
fn apply_rlm_session_options(
    is_root_session: bool,
    explicit_format: Option<lash_rlm_types::RlmFinalAnswerFormat>,
    state: &mut RuntimeSessionState,
) -> Result<()> {
    let final_answer_format = match explicit_format {
        Some(format) => format,
        None if is_root_session => lash_rlm_types::RlmFinalAnswerFormat::Markdown,
        None => lash_rlm_types::RlmFinalAnswerFormat::RawSubmitValue,
    };
    let mut extras: lash_rlm_types::RlmCreateExtras = if !state.protocol_turn_options.is_empty() {
        state.protocol_turn_options.decode()?
    } else {
        lash_rlm_types::RlmCreateExtras::default()
    };
    extras.final_answer_format = Some(final_answer_format);
    let options = ProtocolTurnOptions::typed(extras)?;
    for frame in &mut state.agent_frames {
        frame.protocol_turn_options = options.clone();
    }
    state.protocol_turn_options = options;
    Ok(())
}
421
#[cfg(all(test, feature = "rlm"))]
mod tests {
    use super::*;

    /// Options already stored on the state keep their termination mode, while
    /// a missing final-answer format is filled with the root default.
    #[test]
    fn apply_rlm_session_options_preserves_existing_termination() -> Result<()> {
        let seeded = lash_rlm_types::RlmCreateExtras {
            termination: lash_rlm_types::RlmTermination::ProseOrSubmit,
            final_answer_format: None,
        };
        let mut state = RuntimeSessionState {
            protocol_turn_options: ProtocolTurnOptions::typed(seeded)?,
            ..RuntimeSessionState::default()
        };

        apply_rlm_session_options(true, None, &mut state)?;

        let decoded: lash_rlm_types::RlmCreateExtras = state.protocol_turn_options.decode()?;
        assert_eq!(
            decoded.termination,
            lash_rlm_types::RlmTermination::ProseOrSubmit
        );
        assert_eq!(
            decoded.final_answer_format,
            Some(lash_rlm_types::RlmFinalAnswerFormat::Markdown)
        );
        Ok(())
    }
}
449
/// Handle to an opened session.
///
/// Cloning yields another handle over the same runtime handle and the same
/// turn-cancel registry.
#[derive(Clone)]
pub struct LashSession {
    /// Runtime handle used for observation, writes and queue operations.
    pub(crate) runtime: RuntimeHandle,
    /// Effect host captured from the runtime environment at open time.
    pub(crate) effect_host: Arc<dyn EffectHost>,
    /// Parent session id, when opened as a child.
    pub(crate) parent_session_id: Option<String>,
    /// Plugin bindings activated on the builder; copied into each turn builder.
    pub(crate) active_plugins: Vec<ActivePluginBinding>,
    /// Probe slot obtained from the core work driver, if it provides one.
    pub(crate) process_phase_probe_slot: Option<lash_core::runtime::RuntimeTurnPhaseProbeSlot>,
    /// Registry behind `cancel_running_turns`; handed to every turn builder.
    pub(crate) turn_cancels: crate::turn::TurnCancelRegistry,
}
459
/// Set of optional configuration changes applied through
/// `LashSession::configure`.
///
/// NOTE(review): a `None` field presumably leaves that setting unchanged —
/// confirm against the config admin's `update`.
#[derive(Clone, Debug, Default)]
pub struct SessionConfigPatch {
    /// Provider to switch to, if any.
    pub provider: Option<ProviderHandle>,
    /// Model to switch to, if any.
    pub model: Option<ModelSpec>,
    /// Prompt layer to apply, if any.
    pub prompt: Option<PromptLayer>,
}
466
467impl LashSession {
468    pub async fn close(self) -> Result<()> {
469        let runtime = self.runtime.writer();
470        let runtime = runtime.lock().await;
471        runtime.unregister_plugin_session()?;
472        Ok(())
473    }
474
475    pub fn session_id(&self) -> String {
476        self.runtime.observe().session_id().to_string()
477    }
478
479    pub fn policy_snapshot(&self) -> SessionPolicy {
480        self.runtime.observe().policy.clone()
481    }
482
483    pub fn observe(&self) -> ObservableSession {
484        ObservableSession {
485            runtime: self.runtime.clone(),
486        }
487    }
488
489    pub fn parent_session_id(&self) -> Option<&str> {
490        self.parent_session_id.as_deref()
491    }
492
493    pub fn effect_host(&self) -> Arc<dyn EffectHost> {
494        Arc::clone(&self.effect_host)
495    }
496
497    pub fn turn(&self, input: TurnInput) -> TurnBuilder {
498        TurnBuilder {
499            runtime: self.runtime.clone(),
500            effect_host: Arc::clone(&self.effect_host),
501            active_plugins: self.active_plugins.clone(),
502            input,
503            cancel: CancellationToken::new(),
504            cancels: self.turn_cancels.clone(),
505            protocol_turn_options: None,
506            provider: None,
507            model: None,
508            turn_id: None,
509        }
510    }
511
512    pub fn queued_turn(&self) -> QueuedTurnBuilder {
513        QueuedTurnBuilder {
514            runtime: self.runtime.clone(),
515            effect_host: Arc::clone(&self.effect_host),
516            cancel: CancellationToken::new(),
517            cancels: self.turn_cancels.clone(),
518            batch_ids: Vec::new(),
519            drain_id: None,
520        }
521    }
522
523    /// Cancel every turn currently executing through this opened session
524    /// (including its clones) and report how many were signalled.
525    ///
526    /// This is the affordance behind a UI "stop" control: hold a clone of the
527    /// session wherever the stop arrives and call this, instead of threading a
528    /// [`CancellationToken`](crate::CancellationToken) into every turn call
529    /// ([`TurnBuilder::cancel`](crate::TurnBuilder::cancel) remains the
530    /// per-turn hook when you need one). A cancelled turn finishes with
531    /// `TurnOutcome::Stopped(TurnStop::Cancelled)` and commits like any other
532    /// turn; the session stays usable.
533    ///
534    /// Scope: turns started from this `LashSession` instance and its clones.
535    /// A handle opened separately for the same session id has its own
536    /// registry and is not reached.
537    pub fn cancel_running_turns(&self) -> usize {
538        self.turn_cancels.cancel_all()
539    }
540
541    pub fn admin(&self) -> SessionAdmin {
542        SessionAdmin {
543            runtime: self.runtime.clone(),
544        }
545    }
546
547    pub async fn configure(&self, patch: SessionConfigPatch) -> Result<()> {
548        self.admin().config().update(patch).await
549    }
550
551    pub fn tools(&self) -> ToolAdmin {
552        ToolAdmin::new(self.admin())
553    }
554
555    pub fn commands(&self) -> SessionCommandAdmin {
556        self.admin().commands()
557    }
558
559    pub fn triggers(&self) -> SessionTriggerAdmin {
560        self.admin().triggers()
561    }
562
563    pub fn processes(&self) -> SessionProcessAdmin {
564        SessionProcessAdmin::new(self.admin())
565    }
566
567    pub fn plugin_operations(&self) -> PluginOperations {
568        PluginOperations {
569            control: self.admin(),
570        }
571    }
572
573    pub fn enqueue(&self, input: TurnInput) -> EnqueueTurnBuilder<'_> {
574        EnqueueTurnBuilder {
575            session: self,
576            input,
577            id: None,
578            delivery_policy: DeliveryPolicy::AfterCurrentTurnCommit,
579            slot_policy: SlotPolicy::Exclusive,
580        }
581    }
582
583    pub async fn queued_work(&self) -> Result<Vec<QueuedWorkBatch>> {
584        let observation = self.runtime.observe();
585        let store = observation.queue_store.as_ref().ok_or_else(|| {
586            EmbedError::Runtime(lash_core::RuntimeError::new(
587                lash_core::RuntimeErrorCode::StoreCommitFailed,
588                "queued work inspection requires a persistent runtime store",
589            ))
590        })?;
591        store
592            .list_pending_queued_work(observation.session_id())
593            .await
594            .map_err(|err| {
595                EmbedError::Runtime(lash_core::RuntimeError::new(
596                    lash_core::RuntimeErrorCode::StoreCommitFailed,
597                    err.to_string(),
598                ))
599            })
600    }
601
602    pub async fn cancel_queued_work_batch(
603        &self,
604        batch_id: &str,
605    ) -> Result<Option<QueuedWorkBatch>> {
606        let session_id = self.session_id();
607        self.runtime
608            .cancel_queued_work_batch(&session_id, batch_id)
609            .await
610            .map_err(EmbedError::Runtime)
611    }
612
613    /// Resolve once `batch_id` is no longer pending in the queue store —
614    /// drained by whoever runs queued work (a queued-work runner, a durable
615    /// worker, or another handle's [`queued_turn`](Self::queued_turn)) or
616    /// cancelled. This is the enqueue-and-observe side of the queue: the
617    /// caller never claims the work itself.
618    ///
619    /// Completion is read from the persistent queue store, so it observes
620    /// drains performed by other session handles and other processes alike.
621    /// There is no built-in deadline — nothing resolves if nothing drains the
622    /// queue, so bound it with `tokio::time::timeout` when the worker may be
623    /// unavailable. A batch id the store has never seen resolves immediately.
624    pub async fn await_queued_work_batch(&self, batch_id: &str) -> Result<()> {
625        let observation = self.runtime.observe();
626        let store = observation.queue_store.clone().ok_or_else(|| {
627            EmbedError::Runtime(lash_core::RuntimeError::new(
628                lash_core::RuntimeErrorCode::StoreCommitFailed,
629                "queued work inspection requires a persistent runtime store",
630            ))
631        })?;
632        let session_id = observation.session_id().to_string();
633        drop(observation);
634        let mut delay = std::time::Duration::from_millis(25);
635        loop {
636            let pending = store
637                .list_pending_queued_work(&session_id)
638                .await
639                .map_err(|err| {
640                    EmbedError::Runtime(lash_core::RuntimeError::new(
641                        lash_core::RuntimeErrorCode::StoreCommitFailed,
642                        err.to_string(),
643                    ))
644                })?;
645            if !pending.iter().any(|batch| batch.batch_id == batch_id) {
646                return Ok(());
647            }
648            tokio::time::sleep(delay).await;
649            delay = (delay * 2).min(std::time::Duration::from_millis(400));
650        }
651    }
652
653    pub fn read_view(&self) -> SessionReadView {
654        self.runtime.observe().read_view.clone()
655    }
656
657    pub fn usage_report(&self) -> SessionUsageReport {
658        self.runtime.observe().usage_report.clone()
659    }
660
    /// Install `probe` as the runtime's turn phase probe and, when a process
    /// probe slot is available, register it there as well.
    ///
    /// The slot registration covers the session id and, if the persisted state
    /// names a current agent frame, that frame's scope too.
    pub async fn set_turn_phase_probe(
        &self,
        probe: Arc<dyn lash_core::runtime::RuntimeTurnPhaseProbe>,
    ) {
        let writer = self.runtime.writer();
        // The writer guard stays alive until the end of this function.
        let mut runtime = writer.lock().await;
        runtime.set_turn_phase_probe(Arc::clone(&probe));
        // Publish before observing below, so the observation is taken after
        // the probe was installed.
        self.runtime.publish_from(&runtime);
        if let Some(slot) = &self.process_phase_probe_slot {
            let observation = self.runtime.observe();
            slot.set_for_session(observation.session_id(), Arc::clone(&probe));
            let current_frame = observation.persisted_state.current_agent_frame_id.as_str();
            // An empty frame id means there is no current frame to scope to.
            if !current_frame.is_empty() {
                let scope = lash_core::SessionScope::for_agent_frame(
                    observation.session_id(),
                    current_frame,
                );
                slot.set_for_scope(&scope, probe);
            }
        }
    }
682}
683
/// Cloneable read-side view of a session, created by `LashSession::observe`.
#[derive(Clone)]
pub struct ObservableSession {
    /// Runtime handle shared with the originating session.
    pub(crate) runtime: RuntimeHandle,
}
688
689impl ObservableSession {
690    fn snapshot(&self) -> Arc<RuntimeObservation> {
691        self.runtime.observe()
692    }
693
694    pub fn current_observation(&self) -> SessionObservation {
695        self.runtime.current_session_observation()
696    }
697
698    pub fn resume_from_cursor(&self, cursor: &SessionCursor) -> Result<SessionResume> {
699        self.runtime
700            .resume_session_observation(cursor)
701            .map_err(live_replay_error)
702    }
703
704    pub fn subscribe_from_cursor(
705        &self,
706        cursor: &SessionCursor,
707    ) -> Result<SessionObservationSubscription> {
708        self.runtime
709            .subscribe_session_observation(cursor)
710            .map_err(live_replay_error)
711    }
712
713    pub fn session_id(&self) -> String {
714        self.snapshot().session_id().to_string()
715    }
716
717    pub fn policy_snapshot(&self) -> SessionPolicy {
718        self.snapshot().policy.clone()
719    }
720
721    pub fn read_view(&self) -> SessionReadView {
722        self.snapshot().read_view.clone()
723    }
724
725    pub fn usage_report(&self) -> SessionUsageReport {
726        self.snapshot().usage_report.clone()
727    }
728
729    pub fn tool_state(&self) -> Option<ToolState> {
730        self.snapshot().tool_state.clone()
731    }
732
733    pub fn active_tool_manifests(&self) -> Vec<ToolManifest> {
734        self.snapshot()
735            .tool_state
736            .as_ref()
737            .map(ToolState::tool_manifests)
738            .unwrap_or_default()
739    }
740
741    pub async fn list_process_handles(&self) -> Vec<ProcessHandleSummary> {
742        self.snapshot().list_process_handles().await
743    }
744
745    pub async fn list_all_process_handles(&self) -> Vec<ProcessHandleSummary> {
746        self.snapshot().list_all_process_handles().await
747    }
748
749    pub fn process_scope(&self) -> SessionScope {
750        self.snapshot().process_scope()
751    }
752}
753
754fn live_replay_error(err: lash_core::LiveReplayStoreError) -> EmbedError {
755    EmbedError::Runtime(lash_core::RuntimeError::new(
756        RuntimeErrorCode::Other("live_replay".to_string()),
757        err.to_string(),
758    ))
759}
760
/// Builder for enqueueing turn input on a session, created by
/// `LashSession::enqueue`.
///
/// Finish with `send`, or await the builder directly.
pub struct EnqueueTurnBuilder<'a> {
    /// Session whose runtime receives the enqueued input.
    session: &'a LashSession,
    /// Turn input to enqueue.
    input: TurnInput,
    /// Optional host-supplied id; sent as the source key `host:{id}`.
    id: Option<String>,
    /// When the queued input is delivered.
    delivery_policy: DeliveryPolicy,
    /// Slot policy passed along with the enqueue request.
    slot_policy: SlotPolicy,
}
768
769impl<'a> EnqueueTurnBuilder<'a> {
770    pub fn id(mut self, id: impl Into<String>) -> Self {
771        self.id = Some(id.into());
772        self
773    }
774
775    pub fn delivery_policy(mut self, policy: DeliveryPolicy) -> Self {
776        self.delivery_policy = policy;
777        self
778    }
779
780    pub fn slot_policy(mut self, policy: SlotPolicy) -> Self {
781        self.slot_policy = policy;
782        self
783    }
784
785    pub async fn send(self) -> Result<QueuedWorkBatch> {
786        let source_key = self.id.map(|id| format!("host:{id}"));
787        self.session
788            .runtime
789            .enqueue_turn_input(
790                self.input,
791                self.delivery_policy,
792                self.slot_policy,
793                source_key,
794            )
795            .await
796            .map_err(EmbedError::Runtime)
797    }
798}
799
800impl<'a> std::future::IntoFuture for EnqueueTurnBuilder<'a> {
801    type Output = Result<QueuedWorkBatch>;
802    type IntoFuture =
803        std::pin::Pin<Box<dyn std::future::Future<Output = Result<QueuedWorkBatch>> + 'a>>;
804
805    fn into_future(self) -> Self::IntoFuture {
806        Box::pin(self.send())
807    }
808}