Skip to main content

lash/
session.rs

1use crate::support::*;
2use lash_core::runtime::{DeliveryPolicy, QueuedWorkBatch, SlotPolicy};
3
4pub struct SessionBuilder {
5    pub(crate) core: LashCore,
6    pub(crate) session_id: String,
7    pub(crate) spec: SessionSpec,
8    pub(crate) parent_session_id: Option<String>,
9    pub(crate) session_execution_owner_id: Option<String>,
10    pub(crate) store: Option<Arc<dyn RuntimePersistence>>,
11    pub(crate) provider: Option<ProviderHandle>,
12    pub(crate) active_plugins: Vec<ActivePluginBinding>,
13    pub(crate) plugin_factories: Vec<Arc<dyn PluginFactory>>,
14}
15
/// RLM flavour of [`SessionBuilder`]: wraps a plain builder and additionally
/// carries the final-answer format applied to the state before opening.
#[cfg(feature = "rlm")]
pub struct RlmSessionBuilder {
    /// Wrapped builder that every setter and the final open delegate to.
    pub(crate) builder: SessionBuilder,
    /// Explicit final-answer format; when `None`, a default is chosen at open
    /// time depending on whether the session has a parent.
    pub(crate) rlm_final_answer_format: Option<lash_rlm_types::RlmFinalAnswerFormat>,
}
21
22impl SessionBuilder {
23    pub fn provider(mut self, provider: ProviderHandle) -> Self {
24        self.spec = self.spec.provider_id(provider.kind());
25        self.provider = Some(provider);
26        self
27    }
28
29    pub fn session_spec(mut self, spec: SessionSpec) -> Self {
30        self.spec = spec;
31        self
32    }
33
34    pub fn parent(mut self, parent_session_id: impl Into<String>) -> Self {
35        self.parent_session_id = Some(parent_session_id.into());
36        self
37    }
38
39    /// Use a stable owner id for durable session execution leases.
40    ///
41    /// This is only for hosts that already serialize one logical execution
42    /// lane, such as a durable workflow engine retrying the same invocation on
43    /// another process. Normal embedders should keep the default per-open UUID;
44    /// reusing an owner id across independently running handles makes them the
45    /// same lease owner.
46    pub fn session_execution_owner_id(mut self, owner_id: impl Into<String>) -> Self {
47        self.session_execution_owner_id = Some(owner_id.into());
48        self
49    }
50
51    /// Use a specific persistence store for this root session.
52    ///
53    /// This is the right API for a host-owned, pre-opened session database.
54    /// Managed child sessions never reuse this store; configure
55    /// `LashCoreBuilder::child_store_factory` when child sessions should also
56    /// persist.
57    pub fn store(mut self, store: Arc<dyn RuntimePersistence>) -> Self {
58        self.store = Some(store);
59        self
60    }
61
62    pub fn plugin<P: PluginBinding>(mut self, config: P::SessionConfig) -> Self {
63        self.active_plugins.push(ActivePluginBinding {
64            id: P::ID,
65            requires_turn_input: P::requires_turn_input(&config),
66        });
67        self.plugin_factories.push(P::factory(&config));
68        self
69    }
70
71    pub async fn open(self) -> Result<LashSession> {
72        let policy = self.session_policy();
73        let store = self.create_store(&policy).await?;
74        let state = self
75            .load_or_default_state(&policy, store.as_deref())
76            .await?;
77        self.open_resolved(policy, state, store).await
78    }
79
80    /// Open this session with a fresh resident graph, ignoring any persisted
81    /// session graph/checkpoint state that may already exist for the same
82    /// session id.
83    ///
84    /// The next successful commit writes a full replacement graph, so normal
85    /// embedders can use this to start over without manually calling
86    /// `load_persisted_session_state` or constructing a `RuntimeSessionState`.
87    /// Use [`Self::open`] for resume and [`Self::open_with_state`] only when
88    /// restoring explicit host-owned state.
89    pub async fn open_fresh(self) -> Result<LashSession> {
90        let policy = self.session_policy();
91        let store = self.create_store(&policy).await?;
92        let state = RuntimeSessionState {
93            session_id: self.session_id.clone(),
94            policy: policy.clone(),
95            graph_replace_required: true,
96            ..RuntimeSessionState::default()
97        };
98        self.open_resolved(policy, state, store).await
99    }
100
101    /// Open with an explicitly supplied runtime state.
102    ///
103    /// This is for advanced hosts that already own a complete state snapshot.
104    /// Normal embedders should use [`Self::open`] to resume according to Lash's
105    /// residency policy or [`Self::open_fresh`] to start over and replace prior
106    /// persisted state on the next commit.
107    pub async fn open_with_state(self, mut state: RuntimeSessionState) -> Result<LashSession> {
108        let policy = self.session_policy();
109        let store = self.create_store(&policy).await?;
110        if state.session_id != self.session_id {
111            return Err(EmbedError::StoreSessionMismatch {
112                loaded: state.session_id,
113                requested: self.session_id,
114            });
115        }
116        let recorded_provider_id = state.policy.recorded_provider_id().to_string();
117        state.policy = policy.clone();
118        state.policy.provider_id = recorded_provider_id;
119        self.open_resolved(policy, state, store).await
120    }
121
122    fn session_policy(&self) -> SessionPolicy {
123        let mut policy = self.spec.resolve_against(&self.core.policy);
124        policy.session_id = Some(self.session_id.clone());
125        policy
126    }
127
128    async fn load_or_default_state(
129        &self,
130        policy: &SessionPolicy,
131        store: Option<&dyn RuntimePersistence>,
132    ) -> Result<RuntimeSessionState> {
133        let state = match store {
134            Some(store) => {
135                let loaded = self.load_persisted_state_for_residency(store).await?;
136                let mut state = loaded.unwrap_or_else(|| RuntimeSessionState {
137                    session_id: self.session_id.clone(),
138                    policy: policy.clone(),
139                    ..RuntimeSessionState::default()
140                });
141                if state.session_id != self.session_id {
142                    return Err(EmbedError::StoreSessionMismatch {
143                        loaded: state.session_id,
144                        requested: self.session_id.clone(),
145                    });
146                }
147                let recorded_provider_id = state.policy.recorded_provider_id().to_string();
148                state.policy = policy.clone();
149                state.policy.provider_id = recorded_provider_id;
150                state
151            }
152            None => RuntimeSessionState {
153                session_id: self.session_id.clone(),
154                policy: policy.clone(),
155                ..RuntimeSessionState::default()
156            },
157        };
158        Ok(state)
159    }
160
161    async fn load_persisted_state_for_residency(
162        &self,
163        store: &dyn RuntimePersistence,
164    ) -> Result<Option<RuntimeSessionState>> {
165        load_persisted_state_for_residency(self.core.env.residency, store).await
166    }
167
168    async fn open_resolved(
169        self,
170        policy: SessionPolicy,
171        state: RuntimeSessionState,
172        store: Option<Arc<dyn RuntimePersistence>>,
173    ) -> Result<LashSession> {
174        let mut env = self.core.env.clone();
175        if let Some(provider) = self.provider.clone().or_else(|| self.core.provider.clone()) {
176            env.core.providers.provider_resolver =
177                Arc::new(lash_core::SingleProviderResolver::new(provider));
178        }
179        let plugin_host = build_plugin_host(
180            self.core.protocol_factory.as_ref(),
181            self.core.plugin_factories.as_ref(),
182            self.plugin_factories,
183        )?;
184        env.core = self
185            .core
186            .runtime_host_for_plugin_host(env.core.clone(), &plugin_host)?;
187        env.plugin_host = Some(Arc::new(plugin_host));
188        let effect_host = Arc::clone(&env.core.control.effect_host);
189        let drivers = self.core.work_driver.drivers().await;
190        env.process_work_driver = drivers.process.clone();
191        env.queued_work_driver = drivers.queued.clone();
192        let mut runtime = LashRuntime::from_environment(&env, policy, state, store).await?;
193        if let Some(owner_id) = self.session_execution_owner_id {
194            runtime.set_runtime_scope_id(owner_id);
195        }
196        if drivers.drive_process_on_open
197            && let Some(driver) = drivers.process.as_ref()
198        {
199            driver.claim_and_run_pending("session_open").await?;
200        }
201        let handle = RuntimeHandle::with_live_replay_store(
202            runtime,
203            Arc::clone(&self.core.live_replay_store),
204        );
205        Ok(LashSession {
206            runtime: handle,
207            effect_host,
208            parent_session_id: self.parent_session_id,
209            active_plugins: self.active_plugins,
210            process_phase_probe_slot: self.core.work_driver.phase_probe_slot(),
211            turn_cancels: crate::turn::TurnCancelRegistry::default(),
212        })
213    }
214
215    async fn create_store(
216        &self,
217        policy: &SessionPolicy,
218    ) -> Result<Option<Arc<dyn RuntimePersistence>>> {
219        if let Some(store) = self.store.as_ref() {
220            return Ok(Some(Arc::clone(store)));
221        }
222        let Some(factory) = self.core.store_factory.as_ref() else {
223            return Ok(None);
224        };
225        let request = SessionStoreCreateRequest {
226            session_id: self.session_id.clone(),
227            relation: self
228                .parent_session_id
229                .as_ref()
230                .map(|parent_session_id| lash_core::SessionRelation::Child {
231                    parent_session_id: parent_session_id.clone(),
232                    caused_by: None,
233                })
234                .unwrap_or_default(),
235            policy: policy.clone(),
236        };
237        factory
238            .create_store(&request)
239            .await
240            .map(Some)
241            .map_err(|message| EmbedError::StoreFactory {
242                session_id: self.session_id.clone(),
243                message,
244            })
245    }
246}
247
248pub(crate) async fn load_state_for_residency(
249    residency: Residency,
250    session_id: &str,
251    policy: &SessionPolicy,
252    store: &dyn RuntimePersistence,
253) -> Result<RuntimeSessionState> {
254    let mut state = load_persisted_state_for_residency(residency, store)
255        .await?
256        .unwrap_or_else(|| RuntimeSessionState {
257            session_id: session_id.to_string(),
258            policy: policy.clone(),
259            ..RuntimeSessionState::default()
260        });
261    if state.session_id != session_id {
262        return Err(EmbedError::StoreSessionMismatch {
263            loaded: state.session_id,
264            requested: session_id.to_string(),
265        });
266    }
267    let recorded_provider_id = state.policy.recorded_provider_id().to_string();
268    state.policy = policy.clone();
269    state.policy.provider_id = recorded_provider_id;
270    Ok(state)
271}
272
273async fn load_persisted_state_for_residency(
274    residency: Residency,
275    store: &dyn RuntimePersistence,
276) -> Result<Option<RuntimeSessionState>> {
277    match residency {
278        Residency::KeepAll => {
279            let loaded = lash_core::store::load_persisted_session_state(store)
280                .await
281                .map_err(|err| SessionError::Protocol(format!("failed to load store: {err}")))?;
282            Ok(loaded)
283        }
284        Residency::ActivePathOnly => {
285            let active = lash_core::store::load_persisted_session_state_active_path(store, None)
286                .await
287                .map_err(|err| {
288                    SessionError::Protocol(format!("failed to load active-path store: {err}"))
289                })?;
290            if active
291                .as_ref()
292                .is_some_and(|state| state.session_graph.nodes.is_empty())
293            {
294                let mut full = lash_core::store::load_persisted_session_state(store)
295                    .await
296                    .map_err(|err| {
297                        SessionError::Protocol(format!(
298                            "failed to heal active-path store from full graph: {err}"
299                        ))
300                    })?;
301                if let Some(state) = full.as_mut() {
302                    state.graph_replace_required = true;
303                }
304                return Ok(full);
305            }
306            Ok(active)
307        }
308    }
309}
310
311impl PromptLayerSink for SessionBuilder {
312    fn prompt_layer_mut(&mut self) -> &mut PromptLayer {
313        self.spec.prompt.get_or_insert_with(PromptLayer::new)
314    }
315}
316
#[cfg(feature = "rlm")]
impl RlmSessionBuilder {
    /// Forwards to [`SessionBuilder::provider`].
    pub fn provider(self, provider: ProviderHandle) -> Self {
        Self {
            builder: self.builder.provider(provider),
            ..self
        }
    }

    /// Forwards to [`SessionBuilder::session_spec`].
    pub fn session_spec(self, spec: SessionSpec) -> Self {
        Self {
            builder: self.builder.session_spec(spec),
            ..self
        }
    }

    /// Forwards to [`SessionBuilder::parent`].
    pub fn parent(self, parent_session_id: impl Into<String>) -> Self {
        Self {
            builder: self.builder.parent(parent_session_id),
            ..self
        }
    }

    /// Forwards to [`SessionBuilder::session_execution_owner_id`].
    pub fn session_execution_owner_id(self, owner_id: impl Into<String>) -> Self {
        Self {
            builder: self.builder.session_execution_owner_id(owner_id),
            ..self
        }
    }

    /// Forwards to [`SessionBuilder::store`].
    pub fn store(self, store: Arc<dyn RuntimePersistence>) -> Self {
        Self {
            builder: self.builder.store(store),
            ..self
        }
    }

    /// Forwards to [`SessionBuilder::plugin`].
    pub fn plugin<P: PluginBinding>(self, config: P::SessionConfig) -> Self {
        Self {
            builder: self.builder.plugin::<P>(config),
            ..self
        }
    }

    /// Open the session, resuming persisted state when a store provides it.
    pub async fn open(self) -> Result<LashSession> {
        self.open_resolved(RlmOpenState::Resume).await
    }

    /// Open the session from default state flagged for graph replacement,
    /// ignoring whatever is persisted.
    pub async fn open_fresh(self) -> Result<LashSession> {
        self.open_resolved(RlmOpenState::Fresh).await
    }

    /// Open the session from a caller-supplied runtime state.
    ///
    /// # Errors
    ///
    /// Returns `EmbedError::StoreSessionMismatch` when `state` belongs to a
    /// different session id; this is checked before any store is created.
    pub async fn open_with_state(self, state: RuntimeSessionState) -> Result<LashSession> {
        self.open_resolved(RlmOpenState::Explicit(state)).await
    }

    /// Shared open path: resolve policy and store, obtain the starting state
    /// for the requested mode, apply the RLM options, then hand over to the
    /// wrapped builder.
    async fn open_resolved(self, open_state: RlmOpenState) -> Result<LashSession> {
        let Self {
            builder,
            rlm_final_answer_format,
        } = self;

        // Reject a foreign explicit snapshot up front, before the store
        // factory is given a chance to create anything for a doomed call.
        let open_state = match open_state {
            RlmOpenState::Explicit(state) if state.session_id != builder.session_id => {
                return Err(EmbedError::StoreSessionMismatch {
                    loaded: state.session_id,
                    requested: builder.session_id.clone(),
                });
            }
            other => other,
        };

        let policy = builder.session_policy();
        let store = builder.create_store(&policy).await?;
        let mut state = match open_state {
            RlmOpenState::Resume => {
                builder
                    .load_or_default_state(&policy, store.as_deref())
                    .await?
            }
            RlmOpenState::Fresh => RuntimeSessionState {
                session_id: builder.session_id.clone(),
                policy: policy.clone(),
                graph_replace_required: true,
                ..RuntimeSessionState::default()
            },
            RlmOpenState::Explicit(mut state) => {
                // Swap in the resolved policy but keep the provider id the
                // snapshot recorded.
                let recorded_provider_id = state.policy.recorded_provider_id().to_string();
                state.policy = policy.clone();
                state.policy.provider_id = recorded_provider_id;
                state
            }
        };

        // A builder without a parent is treated as the root session.
        apply_rlm_session_options(
            builder.parent_session_id.is_none(),
            rlm_final_answer_format,
            &mut state,
        )?;
        builder.open_resolved(policy, state, store).await
    }
}
401
#[cfg(feature = "rlm")]
impl PromptLayerSink for RlmSessionBuilder {
    /// Exposes the wrapped builder's prompt layer.
    fn prompt_layer_mut(&mut self) -> &mut PromptLayer {
        let inner = &mut self.builder;
        inner.prompt_layer_mut()
    }
}
408
/// How `RlmSessionBuilder` should obtain the state it opens with.
// NOTE(review): the lint is silenced because `Explicit` holds a whole
// `RuntimeSessionState` by value; presumably boxing is not worth it for a
// value that is consumed right after construction — confirm.
#[cfg(feature = "rlm")]
#[allow(clippy::large_enum_variant)]
enum RlmOpenState {
    /// Load persisted state, or default state when nothing is available.
    Resume,
    /// Start from default state flagged for graph replacement.
    Fresh,
    /// Use the state supplied by the caller.
    Explicit(RuntimeSessionState),
}
416
/// Write the RLM final-answer format into the state's protocol turn options.
///
/// The format is `explicit_format` when given; otherwise `Markdown` for a
/// root session and `RawSubmitValue` for any other session. Non-empty
/// existing options are decoded first, so their remaining fields are kept.
/// The resulting options are stored on the state and copied onto every agent
/// frame, replacing whatever options each frame had.
///
/// # Errors
///
/// Fails when the existing options cannot be decoded or the updated extras
/// cannot be encoded.
#[cfg(feature = "rlm")]
fn apply_rlm_session_options(
    is_root_session: bool,
    explicit_format: Option<lash_rlm_types::RlmFinalAnswerFormat>,
    state: &mut RuntimeSessionState,
) -> Result<()> {
    let final_answer_format = match (explicit_format, is_root_session) {
        (Some(format), _) => format,
        (None, true) => lash_rlm_types::RlmFinalAnswerFormat::Markdown,
        (None, false) => lash_rlm_types::RlmFinalAnswerFormat::RawSubmitValue,
    };

    let mut extras: lash_rlm_types::RlmCreateExtras = if state.protocol_turn_options.is_empty() {
        Default::default()
    } else {
        state.protocol_turn_options.decode()?
    };
    extras.final_answer_format = Some(final_answer_format);

    let options = ProtocolTurnOptions::typed(extras)?;
    for frame in &mut state.agent_frames {
        frame.protocol_turn_options = options.clone();
    }
    state.protocol_turn_options = options;
    Ok(())
}
443
#[cfg(all(test, feature = "rlm"))]
mod tests {
    use super::*;

    /// Applying the RLM options to a root session keeps a termination mode
    /// that was already stored and fills in `Markdown` as the answer format.
    #[test]
    fn apply_rlm_session_options_preserves_existing_termination() -> Result<()> {
        let seeded = lash_rlm_types::RlmCreateExtras {
            termination: lash_rlm_types::RlmTermination::ProseOrSubmit,
            final_answer_format: None,
        };
        let mut state = RuntimeSessionState {
            protocol_turn_options: ProtocolTurnOptions::typed(seeded)?,
            ..Default::default()
        };

        apply_rlm_session_options(true, None, &mut state)?;

        let decoded: lash_rlm_types::RlmCreateExtras = state.protocol_turn_options.decode()?;
        assert_eq!(
            decoded.termination,
            lash_rlm_types::RlmTermination::ProseOrSubmit
        );
        assert_eq!(
            decoded.final_answer_format,
            Some(lash_rlm_types::RlmFinalAnswerFormat::Markdown)
        );
        Ok(())
    }
}
472
473#[derive(Clone)]
474pub struct LashSession {
475    pub(crate) runtime: RuntimeHandle,
476    pub(crate) effect_host: Arc<dyn EffectHost>,
477    pub(crate) parent_session_id: Option<String>,
478    pub(crate) active_plugins: Vec<ActivePluginBinding>,
479    pub(crate) process_phase_probe_slot: Option<lash_core::runtime::RuntimeTurnPhaseProbeSlot>,
480    pub(crate) turn_cancels: crate::turn::TurnCancelRegistry,
481}
482
483#[derive(Clone, Debug, Default)]
484pub struct SessionConfigPatch {
485    pub provider: Option<ProviderHandle>,
486    pub model: Option<ModelSpec>,
487    pub prompt: Option<PromptLayer>,
488}
489
490impl LashSession {
491    pub async fn close(self) -> Result<()> {
492        let runtime = self.runtime.writer();
493        let runtime = runtime.lock().await;
494        runtime.unregister_plugin_session()?;
495        Ok(())
496    }
497
498    pub fn session_id(&self) -> String {
499        self.runtime.observe().session_id().to_string()
500    }
501
502    pub fn policy_snapshot(&self) -> SessionPolicy {
503        self.runtime.observe().policy.clone()
504    }
505
506    pub fn observe(&self) -> ObservableSession {
507        ObservableSession {
508            runtime: self.runtime.clone(),
509        }
510    }
511
512    pub fn parent_session_id(&self) -> Option<&str> {
513        self.parent_session_id.as_deref()
514    }
515
516    pub fn effect_host(&self) -> Arc<dyn EffectHost> {
517        Arc::clone(&self.effect_host)
518    }
519
520    pub fn turn(&self, input: TurnInput) -> TurnBuilder {
521        TurnBuilder {
522            runtime: self.runtime.clone(),
523            effect_host: Arc::clone(&self.effect_host),
524            active_plugins: self.active_plugins.clone(),
525            input,
526            cancel: CancellationToken::new(),
527            cancels: self.turn_cancels.clone(),
528            protocol_turn_options: None,
529            provider: None,
530            model: None,
531            turn_id: None,
532        }
533    }
534
535    pub fn queued_turn(&self) -> QueuedTurnBuilder {
536        QueuedTurnBuilder {
537            runtime: self.runtime.clone(),
538            effect_host: Arc::clone(&self.effect_host),
539            cancel: CancellationToken::new(),
540            cancels: self.turn_cancels.clone(),
541            batch_ids: Vec::new(),
542            drain_id: None,
543        }
544    }
545
546    /// Cancel every turn currently executing through this opened session
547    /// (including its clones) and report how many were signalled.
548    ///
549    /// This is the affordance behind a UI "stop" control: hold a clone of the
550    /// session wherever the stop arrives and call this, instead of threading a
551    /// [`CancellationToken`](crate::CancellationToken) into every turn call
552    /// ([`TurnBuilder::cancel`](crate::TurnBuilder::cancel) remains the
553    /// per-turn hook when you need one). A cancelled turn finishes with
554    /// `TurnOutcome::Stopped(TurnStop::Cancelled)` and commits like any other
555    /// turn; the session stays usable.
556    ///
557    /// Scope: turns started from this `LashSession` instance and its clones.
558    /// A handle opened separately for the same session id has its own
559    /// registry and is not reached.
560    pub fn cancel_running_turns(&self) -> usize {
561        self.turn_cancels.cancel_all()
562    }
563
564    pub fn admin(&self) -> SessionAdmin {
565        SessionAdmin {
566            runtime: self.runtime.clone(),
567        }
568    }
569
570    pub async fn configure(&self, patch: SessionConfigPatch) -> Result<()> {
571        self.admin().config().update(patch).await
572    }
573
574    pub fn tools(&self) -> ToolAdmin {
575        ToolAdmin::new(self.admin())
576    }
577
578    pub fn commands(&self) -> SessionCommandAdmin {
579        self.admin().commands()
580    }
581
582    pub fn triggers(&self) -> SessionTriggerAdmin {
583        self.admin().triggers()
584    }
585
586    pub fn processes(&self) -> SessionProcessAdmin {
587        SessionProcessAdmin::new(self.admin())
588    }
589
590    pub fn plugin_operations(&self) -> PluginOperations {
591        PluginOperations {
592            control: self.admin(),
593        }
594    }
595
596    pub fn enqueue(&self, input: TurnInput) -> EnqueueTurnBuilder<'_> {
597        EnqueueTurnBuilder {
598            session: self,
599            input,
600            id: None,
601            delivery_policy: DeliveryPolicy::AfterCurrentTurnCommit,
602            slot_policy: SlotPolicy::Exclusive,
603        }
604    }
605
606    pub async fn queued_work(&self) -> Result<Vec<QueuedWorkBatch>> {
607        let observation = self.runtime.observe();
608        let store = observation.queue_store.as_ref().ok_or_else(|| {
609            EmbedError::Runtime(lash_core::RuntimeError::new(
610                lash_core::RuntimeErrorCode::StoreCommitFailed,
611                "queued work inspection requires a persistent runtime store",
612            ))
613        })?;
614        store
615            .list_pending_queued_work(observation.session_id())
616            .await
617            .map_err(|err| {
618                EmbedError::Runtime(lash_core::RuntimeError::new(
619                    lash_core::RuntimeErrorCode::StoreCommitFailed,
620                    err.to_string(),
621                ))
622            })
623    }
624
625    pub async fn cancel_queued_work_batch(
626        &self,
627        batch_id: &str,
628    ) -> Result<Option<QueuedWorkBatch>> {
629        let session_id = self.session_id();
630        self.runtime
631            .cancel_queued_work_batch(&session_id, batch_id)
632            .await
633            .map_err(EmbedError::Runtime)
634    }
635
636    /// Resolve once `batch_id` is no longer pending in the queue store —
637    /// drained by whoever runs queued work (a queued-work runner, a durable
638    /// worker, or another handle's [`queued_turn`](Self::queued_turn)) or
639    /// cancelled. This is the enqueue-and-observe side of the queue: the
640    /// caller never claims the work itself.
641    ///
642    /// Completion is read from the persistent queue store, so it observes
643    /// drains performed by other session handles and other processes alike.
644    /// There is no built-in deadline — nothing resolves if nothing drains the
645    /// queue, so bound it with `tokio::time::timeout` when the worker may be
646    /// unavailable. A batch id the store has never seen resolves immediately.
647    pub async fn await_queued_work_batch(&self, batch_id: &str) -> Result<()> {
648        let observation = self.runtime.observe();
649        let store = observation.queue_store.clone().ok_or_else(|| {
650            EmbedError::Runtime(lash_core::RuntimeError::new(
651                lash_core::RuntimeErrorCode::StoreCommitFailed,
652                "queued work inspection requires a persistent runtime store",
653            ))
654        })?;
655        let session_id = observation.session_id().to_string();
656        drop(observation);
657        let mut delay = std::time::Duration::from_millis(25);
658        loop {
659            let pending = store
660                .list_pending_queued_work(&session_id)
661                .await
662                .map_err(|err| {
663                    EmbedError::Runtime(lash_core::RuntimeError::new(
664                        lash_core::RuntimeErrorCode::StoreCommitFailed,
665                        err.to_string(),
666                    ))
667                })?;
668            if !pending.iter().any(|batch| batch.batch_id == batch_id) {
669                return Ok(());
670            }
671            tokio::time::sleep(delay).await;
672            delay = (delay * 2).min(std::time::Duration::from_millis(400));
673        }
674    }
675
676    pub fn read_view(&self) -> SessionReadView {
677        self.runtime.observe().read_view.clone()
678    }
679
680    pub fn usage_report(&self) -> SessionUsageReport {
681        self.runtime.observe().usage_report.clone()
682    }
683
684    pub async fn set_turn_phase_probe(
685        &self,
686        probe: Arc<dyn lash_core::runtime::RuntimeTurnPhaseProbe>,
687    ) {
688        let writer = self.runtime.writer();
689        let mut runtime = writer.lock().await;
690        runtime.set_turn_phase_probe(Arc::clone(&probe));
691        self.runtime.publish_from(&runtime);
692        if let Some(slot) = &self.process_phase_probe_slot {
693            let observation = self.runtime.observe();
694            slot.set_for_session(observation.session_id(), Arc::clone(&probe));
695            let current_frame = observation.persisted_state.current_agent_frame_id.as_str();
696            if !current_frame.is_empty() {
697                let scope = lash_core::SessionScope::for_agent_frame(
698                    observation.session_id(),
699                    current_frame,
700                );
701                slot.set_for_scope(&scope, probe);
702            }
703        }
704    }
705}
706
707#[derive(Clone)]
708pub struct ObservableSession {
709    pub(crate) runtime: RuntimeHandle,
710}
711
712impl ObservableSession {
713    fn snapshot(&self) -> Arc<RuntimeObservation> {
714        self.runtime.observe()
715    }
716
717    pub fn current_observation(&self) -> SessionObservation {
718        self.runtime.current_session_observation()
719    }
720
721    pub fn resume_from_cursor(&self, cursor: &SessionCursor) -> Result<SessionResume> {
722        self.runtime
723            .resume_session_observation(cursor)
724            .map_err(live_replay_error)
725    }
726
727    pub fn subscribe_from_cursor(
728        &self,
729        cursor: &SessionCursor,
730    ) -> Result<SessionObservationSubscription> {
731        self.runtime
732            .subscribe_session_observation(cursor)
733            .map_err(live_replay_error)
734    }
735
736    pub fn session_id(&self) -> String {
737        self.snapshot().session_id().to_string()
738    }
739
740    pub fn policy_snapshot(&self) -> SessionPolicy {
741        self.snapshot().policy.clone()
742    }
743
744    pub fn read_view(&self) -> SessionReadView {
745        self.snapshot().read_view.clone()
746    }
747
748    pub fn usage_report(&self) -> SessionUsageReport {
749        self.snapshot().usage_report.clone()
750    }
751
752    pub fn tool_state(&self) -> Option<ToolState> {
753        self.snapshot().tool_state.clone()
754    }
755
756    pub fn active_tool_manifests(&self) -> Vec<ToolManifest> {
757        self.snapshot()
758            .tool_state
759            .as_ref()
760            .map(ToolState::tool_manifests)
761            .unwrap_or_default()
762    }
763
764    pub async fn list_process_handles(&self) -> Vec<ProcessHandleSummary> {
765        self.snapshot().list_process_handles().await
766    }
767
768    pub async fn list_all_process_handles(&self) -> Vec<ProcessHandleSummary> {
769        self.snapshot().list_all_process_handles().await
770    }
771
772    pub fn process_scope(&self) -> SessionScope {
773        self.snapshot().process_scope()
774    }
775}
776
777fn live_replay_error(err: lash_core::LiveReplayStoreError) -> EmbedError {
778    EmbedError::Runtime(lash_core::RuntimeError::new(
779        RuntimeErrorCode::Other("live_replay".to_string()),
780        err.to_string(),
781    ))
782}
783
784pub struct EnqueueTurnBuilder<'a> {
785    session: &'a LashSession,
786    input: TurnInput,
787    id: Option<String>,
788    delivery_policy: DeliveryPolicy,
789    slot_policy: SlotPolicy,
790}
791
792impl<'a> EnqueueTurnBuilder<'a> {
793    pub fn id(mut self, id: impl Into<String>) -> Self {
794        self.id = Some(id.into());
795        self
796    }
797
798    pub fn delivery_policy(mut self, policy: DeliveryPolicy) -> Self {
799        self.delivery_policy = policy;
800        self
801    }
802
803    pub fn slot_policy(mut self, policy: SlotPolicy) -> Self {
804        self.slot_policy = policy;
805        self
806    }
807
808    pub async fn send(self) -> Result<QueuedWorkBatch> {
809        let source_key = self.id.map(|id| format!("host:{id}"));
810        self.session
811            .runtime
812            .enqueue_turn_input(
813                self.input,
814                self.delivery_policy,
815                self.slot_policy,
816                source_key,
817            )
818            .await
819            .map_err(EmbedError::Runtime)
820    }
821}
822
823impl<'a> std::future::IntoFuture for EnqueueTurnBuilder<'a> {
824    type Output = Result<QueuedWorkBatch>;
825    type IntoFuture =
826        std::pin::Pin<Box<dyn std::future::Future<Output = Result<QueuedWorkBatch>> + 'a>>;
827
828    fn into_future(self) -> Self::IntoFuture {
829        Box::pin(self.send())
830    }
831}