Skip to main content

lash/
session.rs

1use std::pin::Pin;
2use std::task::{Context, Poll};
3
4use crate::support::*;
5use futures_util::Stream;
6use lash_core::runtime::{DeliveryPolicy, QueuedWorkBatch, SlotPolicy};
7use lash_core::{LiveReplayGap, LiveReplayStoreError, SessionObservationEvent};
8
/// Builder that collects per-session configuration before a [`LashSession`]
/// is opened.
pub struct SessionBuilder {
    /// Core handle whose policy, environment, store factory, plugin factories
    /// and work drivers are consulted when the session is opened.
    pub(crate) core: LashCore,
    /// Identifier of the session being opened.
    pub(crate) session_id: String,
    /// Session spec; resolved against the core policy at open time.
    pub(crate) spec: SessionSpec,
    /// Parent session id. When set, a factory-created store is requested with
    /// a child relation and the id is carried onto the opened session.
    pub(crate) parent_session_id: Option<String>,
    /// Optional owner id applied as the runtime scope id on open.
    pub(crate) session_execution_owner_id: Option<String>,
    /// Explicit persistence store; takes precedence over the core store
    /// factory.
    pub(crate) store: Option<Arc<dyn RuntimePersistence>>,
    /// Session-specific provider; the core provider is the fallback.
    pub(crate) provider: Option<ProviderHandle>,
    /// Plugin bindings registered through [`SessionBuilder::plugin`].
    pub(crate) active_plugins: Vec<ActivePluginBinding>,
    /// Plugin factories registered through [`SessionBuilder::plugin`].
    pub(crate) plugin_factories: Vec<Arc<dyn PluginFactory>>,
}
20
#[cfg(feature = "rlm")]
/// [`SessionBuilder`] wrapper that additionally carries the RLM final-answer
/// format applied to the session state when it is opened.
pub struct RlmSessionBuilder {
    /// Wrapped builder that all generic settings are forwarded to.
    pub(crate) builder: SessionBuilder,
    /// Explicit final-answer format. When `None`, sessions without a parent
    /// default to `Markdown` and sessions with a parent to `RawSubmitValue`.
    pub(crate) rlm_final_answer_format: Option<lash_rlm_types::RlmFinalAnswerFormat>,
}
26
27impl SessionBuilder {
28    pub fn provider(mut self, provider: ProviderHandle) -> Self {
29        self.spec = self.spec.provider_id(provider.kind());
30        self.provider = Some(provider);
31        self
32    }
33
34    pub fn session_spec(mut self, spec: SessionSpec) -> Self {
35        self.spec = spec;
36        self
37    }
38
39    pub fn parent(mut self, parent_session_id: impl Into<String>) -> Self {
40        self.parent_session_id = Some(parent_session_id.into());
41        self
42    }
43
44    /// Use a stable owner id for durable session execution leases.
45    ///
46    /// This is only for hosts that already serialize one logical execution
47    /// lane, such as a durable workflow engine retrying the same invocation on
48    /// another process. Normal embedders should keep the default per-open UUID;
49    /// reusing an owner id across independently running handles makes them the
50    /// same lease owner.
51    pub fn session_execution_owner_id(mut self, owner_id: impl Into<String>) -> Self {
52        self.session_execution_owner_id = Some(owner_id.into());
53        self
54    }
55
56    /// Use a specific persistence store for this root session.
57    ///
58    /// This is the right API for a host-owned, pre-opened session database.
59    /// Managed child sessions never reuse this store; configure
60    /// `LashCoreBuilder::child_store_factory` when child sessions should also
61    /// persist.
62    pub fn store(mut self, store: Arc<dyn RuntimePersistence>) -> Self {
63        self.store = Some(store);
64        self
65    }
66
67    pub fn plugin<P: PluginBinding>(mut self, config: P::SessionConfig) -> Self {
68        self.active_plugins.push(ActivePluginBinding {
69            id: P::ID,
70            requires_turn_input: P::requires_turn_input(&config),
71        });
72        self.plugin_factories.push(P::factory(&config));
73        self
74    }
75
76    pub async fn open(self) -> Result<LashSession> {
77        let policy = self.session_policy();
78        let store = self.create_store(&policy).await?;
79        let state = self
80            .load_or_default_state(&policy, store.as_deref())
81            .await?;
82        self.open_resolved(policy, state, store).await
83    }
84
85    /// Open this session with a fresh resident graph, ignoring any persisted
86    /// session graph/checkpoint state that may already exist for the same
87    /// session id.
88    ///
89    /// The next successful commit writes a full replacement graph, so normal
90    /// embedders can use this to start over without manually calling
91    /// `load_persisted_session_state` or constructing a `RuntimeSessionState`.
92    /// Use [`Self::open`] for resume and [`Self::open_with_state`] only when
93    /// restoring explicit host-owned state.
94    pub async fn open_fresh(self) -> Result<LashSession> {
95        let policy = self.session_policy();
96        let store = self.create_store(&policy).await?;
97        let state = RuntimeSessionState {
98            session_id: self.session_id.clone(),
99            policy: policy.clone(),
100            graph_replace_required: true,
101            ..RuntimeSessionState::default()
102        };
103        self.open_resolved(policy, state, store).await
104    }
105
106    /// Open with an explicitly supplied runtime state.
107    ///
108    /// This is for advanced hosts that already own a complete state snapshot.
109    /// Normal embedders should use [`Self::open`] to resume according to Lash's
110    /// residency policy or [`Self::open_fresh`] to start over and replace prior
111    /// persisted state on the next commit.
112    pub async fn open_with_state(self, mut state: RuntimeSessionState) -> Result<LashSession> {
113        let policy = self.session_policy();
114        let store = self.create_store(&policy).await?;
115        if state.session_id != self.session_id {
116            return Err(EmbedError::StoreSessionMismatch {
117                loaded: state.session_id,
118                requested: self.session_id,
119            });
120        }
121        let recorded_provider_id = state.policy.recorded_provider_id().to_string();
122        state.policy = policy.clone();
123        state.policy.provider_id = recorded_provider_id;
124        self.open_resolved(policy, state, store).await
125    }
126
127    fn session_policy(&self) -> SessionPolicy {
128        let mut policy = self.spec.resolve_against(&self.core.policy);
129        policy.session_id = Some(self.session_id.clone());
130        policy
131    }
132
133    async fn load_or_default_state(
134        &self,
135        policy: &SessionPolicy,
136        store: Option<&dyn RuntimePersistence>,
137    ) -> Result<RuntimeSessionState> {
138        let state = match store {
139            Some(store) => {
140                let loaded = self.load_persisted_state_for_residency(store).await?;
141                let mut state = loaded.unwrap_or_else(|| RuntimeSessionState {
142                    session_id: self.session_id.clone(),
143                    policy: policy.clone(),
144                    ..RuntimeSessionState::default()
145                });
146                if state.session_id != self.session_id {
147                    return Err(EmbedError::StoreSessionMismatch {
148                        loaded: state.session_id,
149                        requested: self.session_id.clone(),
150                    });
151                }
152                let recorded_provider_id = state.policy.recorded_provider_id().to_string();
153                state.policy = policy.clone();
154                state.policy.provider_id = recorded_provider_id;
155                state
156            }
157            None => RuntimeSessionState {
158                session_id: self.session_id.clone(),
159                policy: policy.clone(),
160                ..RuntimeSessionState::default()
161            },
162        };
163        Ok(state)
164    }
165
166    async fn load_persisted_state_for_residency(
167        &self,
168        store: &dyn RuntimePersistence,
169    ) -> Result<Option<RuntimeSessionState>> {
170        load_persisted_state_for_residency(self.core.env.residency, store).await
171    }
172
173    async fn open_resolved(
174        self,
175        policy: SessionPolicy,
176        state: RuntimeSessionState,
177        store: Option<Arc<dyn RuntimePersistence>>,
178    ) -> Result<LashSession> {
179        let mut env = self.core.env.clone();
180        if let Some(provider) = self.provider.clone().or_else(|| self.core.provider.clone()) {
181            env.core.providers.provider_resolver =
182                Arc::new(lash_core::SingleProviderResolver::new(provider));
183        }
184        let plugin_host = build_plugin_host(
185            self.core.protocol_factory.as_ref(),
186            self.core.plugin_factories.as_ref(),
187            self.plugin_factories,
188        )?;
189        env.core = self
190            .core
191            .runtime_host_for_plugin_host(env.core.clone(), &plugin_host)?;
192        env.plugin_host = Some(Arc::new(plugin_host));
193        let effect_host = Arc::clone(&env.core.control.effect_host);
194        let drivers = self.core.work_driver.drivers().await;
195        env.process_work_driver = drivers.process.clone();
196        env.queued_work_driver = drivers.queued.clone();
197        let mut runtime = LashRuntime::from_environment(&env, policy, state, store).await?;
198        if let Some(owner_id) = self.session_execution_owner_id {
199            runtime.set_runtime_scope_id(owner_id);
200        }
201        if drivers.drive_process_on_open
202            && let Some(driver) = drivers.process.as_ref()
203        {
204            driver.claim_and_run_pending("session_open").await?;
205        }
206        let handle = RuntimeHandle::with_live_replay_store(
207            runtime,
208            Arc::clone(&self.core.live_replay_store),
209        );
210        Ok(LashSession {
211            runtime: handle,
212            effect_host,
213            parent_session_id: self.parent_session_id,
214            active_plugins: self.active_plugins,
215            process_phase_probe_slot: self.core.work_driver.phase_probe_slot(),
216            turn_cancels: crate::turn::TurnCancelRegistry::default(),
217        })
218    }
219
220    async fn create_store(
221        &self,
222        policy: &SessionPolicy,
223    ) -> Result<Option<Arc<dyn RuntimePersistence>>> {
224        if let Some(store) = self.store.as_ref() {
225            return Ok(Some(Arc::clone(store)));
226        }
227        let Some(factory) = self.core.store_factory.as_ref() else {
228            return Ok(None);
229        };
230        let request = SessionStoreCreateRequest {
231            session_id: self.session_id.clone(),
232            relation: self
233                .parent_session_id
234                .as_ref()
235                .map(|parent_session_id| lash_core::SessionRelation::Child {
236                    parent_session_id: parent_session_id.clone(),
237                    caused_by: None,
238                })
239                .unwrap_or_default(),
240            policy: policy.clone(),
241        };
242        factory
243            .create_store(&request)
244            .await
245            .map(Some)
246            .map_err(|message| EmbedError::StoreFactory {
247                session_id: self.session_id.clone(),
248                message,
249            })
250    }
251}
252
253pub(crate) async fn load_state_for_residency(
254    residency: Residency,
255    session_id: &str,
256    policy: &SessionPolicy,
257    store: &dyn RuntimePersistence,
258) -> Result<RuntimeSessionState> {
259    let mut state = load_persisted_state_for_residency(residency, store)
260        .await?
261        .unwrap_or_else(|| RuntimeSessionState {
262            session_id: session_id.to_string(),
263            policy: policy.clone(),
264            ..RuntimeSessionState::default()
265        });
266    if state.session_id != session_id {
267        return Err(EmbedError::StoreSessionMismatch {
268            loaded: state.session_id,
269            requested: session_id.to_string(),
270        });
271    }
272    let recorded_provider_id = state.policy.recorded_provider_id().to_string();
273    state.policy = policy.clone();
274    state.policy.provider_id = recorded_provider_id;
275    Ok(state)
276}
277
278async fn load_persisted_state_for_residency(
279    residency: Residency,
280    store: &dyn RuntimePersistence,
281) -> Result<Option<RuntimeSessionState>> {
282    match residency {
283        Residency::KeepAll => {
284            let loaded = lash_core::store::load_persisted_session_state(store)
285                .await
286                .map_err(|err| SessionError::Protocol(format!("failed to load store: {err}")))?;
287            Ok(loaded)
288        }
289        Residency::ActivePathOnly => {
290            let active = lash_core::store::load_persisted_session_state_active_path(store, None)
291                .await
292                .map_err(|err| {
293                    SessionError::Protocol(format!("failed to load active-path store: {err}"))
294                })?;
295            if active
296                .as_ref()
297                .is_some_and(|state| state.session_graph.nodes.is_empty())
298            {
299                let mut full = lash_core::store::load_persisted_session_state(store)
300                    .await
301                    .map_err(|err| {
302                        SessionError::Protocol(format!(
303                            "failed to heal active-path store from full graph: {err}"
304                        ))
305                    })?;
306                if let Some(state) = full.as_mut() {
307                    state.graph_replace_required = true;
308                }
309                return Ok(full);
310            }
311            Ok(active)
312        }
313    }
314}
315
316impl PromptLayerSink for SessionBuilder {
317    fn prompt_layer_mut(&mut self) -> &mut PromptLayer {
318        self.spec.prompt.get_or_insert_with(PromptLayer::new)
319    }
320}
321
322#[cfg(feature = "rlm")]
323impl RlmSessionBuilder {
324    pub fn provider(mut self, provider: ProviderHandle) -> Self {
325        self.builder = self.builder.provider(provider);
326        self
327    }
328
329    pub fn session_spec(mut self, spec: SessionSpec) -> Self {
330        self.builder = self.builder.session_spec(spec);
331        self
332    }
333
334    pub fn parent(mut self, parent_session_id: impl Into<String>) -> Self {
335        self.builder = self.builder.parent(parent_session_id);
336        self
337    }
338
339    pub fn session_execution_owner_id(mut self, owner_id: impl Into<String>) -> Self {
340        self.builder = self.builder.session_execution_owner_id(owner_id);
341        self
342    }
343
344    pub fn store(mut self, store: Arc<dyn RuntimePersistence>) -> Self {
345        self.builder = self.builder.store(store);
346        self
347    }
348
349    pub fn plugin<P: PluginBinding>(mut self, config: P::SessionConfig) -> Self {
350        self.builder = self.builder.plugin::<P>(config);
351        self
352    }
353
354    pub async fn open(self) -> Result<LashSession> {
355        self.open_resolved(RlmOpenState::Resume).await
356    }
357
358    pub async fn open_fresh(self) -> Result<LashSession> {
359        self.open_resolved(RlmOpenState::Fresh).await
360    }
361
362    pub async fn open_with_state(self, state: RuntimeSessionState) -> Result<LashSession> {
363        self.open_resolved(RlmOpenState::Explicit(state)).await
364    }
365
366    async fn open_resolved(self, open_state: RlmOpenState) -> Result<LashSession> {
367        let Self {
368            builder,
369            rlm_final_answer_format,
370        } = self;
371        let policy = builder.session_policy();
372        let store = builder.create_store(&policy).await?;
373        let mut state = match open_state {
374            RlmOpenState::Resume => {
375                builder
376                    .load_or_default_state(&policy, store.as_deref())
377                    .await?
378            }
379            RlmOpenState::Fresh => RuntimeSessionState {
380                session_id: builder.session_id.clone(),
381                policy: policy.clone(),
382                graph_replace_required: true,
383                ..RuntimeSessionState::default()
384            },
385            RlmOpenState::Explicit(mut state) => {
386                if state.session_id != builder.session_id {
387                    return Err(EmbedError::StoreSessionMismatch {
388                        loaded: state.session_id,
389                        requested: builder.session_id.clone(),
390                    });
391                }
392                let recorded_provider_id = state.policy.recorded_provider_id().to_string();
393                state.policy = policy.clone();
394                state.policy.provider_id = recorded_provider_id;
395                state
396            }
397        };
398        apply_rlm_session_options(
399            builder.parent_session_id.is_none(),
400            rlm_final_answer_format,
401            &mut state,
402        )?;
403        builder.open_resolved(policy, state, store).await
404    }
405}
406
#[cfg(feature = "rlm")]
impl PromptLayerSink for RlmSessionBuilder {
    /// Expose the wrapped builder's prompt layer.
    fn prompt_layer_mut(&mut self) -> &mut PromptLayer {
        let inner = &mut self.builder;
        inner.prompt_layer_mut()
    }
}
413
#[cfg(feature = "rlm")]
#[allow(clippy::large_enum_variant)]
/// How `RlmSessionBuilder::open_resolved` obtains the starting state.
enum RlmOpenState {
    /// Load persisted state, or use a default state when none is available.
    Resume,
    /// Start from a default state flagged with `graph_replace_required`.
    Fresh,
    /// Use the caller-supplied state after checking its session id.
    Explicit(RuntimeSessionState),
}
421
#[cfg(feature = "rlm")]
/// Write the RLM final-answer format into `state`.
///
/// `explicit_format` wins when given; otherwise root sessions get `Markdown`
/// and non-root sessions get `RawSubmitValue`. Extras already stored in the
/// session-level turn options are decoded and kept, with only the
/// final-answer format overwritten. The resulting options replace the
/// session-level options and the options of every agent frame.
///
/// # Errors
///
/// Fails when the existing options cannot be decoded or the updated extras
/// cannot be encoded.
fn apply_rlm_session_options(
    is_root_session: bool,
    explicit_format: Option<lash_rlm_types::RlmFinalAnswerFormat>,
    state: &mut RuntimeSessionState,
) -> Result<()> {
    let chosen_format = match (explicit_format, is_root_session) {
        (Some(format), _) => format,
        (None, true) => lash_rlm_types::RlmFinalAnswerFormat::Markdown,
        (None, false) => lash_rlm_types::RlmFinalAnswerFormat::RawSubmitValue,
    };

    let mut extras: lash_rlm_types::RlmCreateExtras = if state.protocol_turn_options.is_empty() {
        lash_rlm_types::RlmCreateExtras::default()
    } else {
        state.protocol_turn_options.decode()?
    };
    extras.final_answer_format = Some(chosen_format);

    let encoded = ProtocolTurnOptions::typed(extras)?;
    for frame in &mut state.agent_frames {
        frame.protocol_turn_options = encoded.clone();
    }
    state.protocol_turn_options = encoded;
    Ok(())
}
448
#[cfg(all(test, feature = "rlm"))]
mod tests {
    use super::*;
    use lash_rlm_types::{RlmCreateExtras, RlmFinalAnswerFormat, RlmTermination};

    /// A root session with no explicit format must keep an already configured
    /// termination mode while receiving the `Markdown` default.
    #[test]
    fn apply_rlm_session_options_preserves_existing_termination() -> Result<()> {
        let initial_extras = RlmCreateExtras {
            termination: RlmTermination::ProseOrSubmit,
            final_answer_format: None,
        };
        let mut state = RuntimeSessionState {
            protocol_turn_options: ProtocolTurnOptions::typed(initial_extras)?,
            ..Default::default()
        };

        apply_rlm_session_options(true, None, &mut state)?;

        let decoded: RlmCreateExtras = state.protocol_turn_options.decode()?;
        assert_eq!(decoded.termination, RlmTermination::ProseOrSubmit);
        assert_eq!(
            decoded.final_answer_format,
            Some(RlmFinalAnswerFormat::Markdown)
        );
        Ok(())
    }
}
477
/// Handle to an opened session.
///
/// The handle is `Clone`; turns started from a handle and its clones are
/// tracked in the same turn-cancel registry.
#[derive(Clone)]
pub struct LashSession {
    /// Runtime handle used for observation, turns, queueing and admin access.
    pub(crate) runtime: RuntimeHandle,
    /// Effect host captured from the runtime environment at open time.
    pub(crate) effect_host: Arc<dyn EffectHost>,
    /// Parent session id configured on the builder, if any.
    pub(crate) parent_session_id: Option<String>,
    /// Plugin bindings registered on the builder; copied into each turn
    /// builder.
    pub(crate) active_plugins: Vec<ActivePluginBinding>,
    /// Probe slot obtained from the core work driver at open time, if it
    /// provides one.
    pub(crate) process_phase_probe_slot: Option<lash_core::runtime::RuntimeTurnPhaseProbeSlot>,
    /// Registry behind [`LashSession::cancel_running_turns`].
    pub(crate) turn_cancels: crate::turn::TurnCancelRegistry,
}
487
/// Set of optional configuration changes passed to
/// [`LashSession::configure`].
// NOTE(review): presumably a `None` field leaves that setting unchanged —
// confirm against the config admin's `update` implementation.
#[derive(Clone, Debug, Default)]
pub struct SessionConfigPatch {
    /// Replacement provider, if any.
    pub provider: Option<ProviderHandle>,
    /// Replacement model spec, if any.
    pub model: Option<ModelSpec>,
    /// Replacement prompt layer, if any.
    pub prompt: Option<PromptLayer>,
}
494
495impl LashSession {
496    pub async fn close(self) -> Result<()> {
497        let runtime = self.runtime.writer();
498        let runtime = runtime.lock().await;
499        runtime.unregister_plugin_session()?;
500        Ok(())
501    }
502
503    pub fn session_id(&self) -> String {
504        self.runtime.observe().session_id().to_string()
505    }
506
507    pub fn policy_snapshot(&self) -> SessionPolicy {
508        self.runtime.observe().policy.clone()
509    }
510
511    pub fn observe(&self) -> ObservableSession {
512        ObservableSession {
513            runtime: self.runtime.clone(),
514        }
515    }
516
517    pub fn parent_session_id(&self) -> Option<&str> {
518        self.parent_session_id.as_deref()
519    }
520
521    pub fn effect_host(&self) -> Arc<dyn EffectHost> {
522        Arc::clone(&self.effect_host)
523    }
524
525    pub fn turn(&self, input: TurnInput) -> TurnBuilder {
526        TurnBuilder {
527            runtime: self.runtime.clone(),
528            effect_host: Arc::clone(&self.effect_host),
529            active_plugins: self.active_plugins.clone(),
530            input,
531            cancel: CancellationToken::new(),
532            cancels: self.turn_cancels.clone(),
533            protocol_turn_options: None,
534            provider: None,
535            model: None,
536            turn_id: None,
537        }
538    }
539
540    pub fn queued_turn(&self) -> QueuedTurnBuilder {
541        QueuedTurnBuilder {
542            runtime: self.runtime.clone(),
543            effect_host: Arc::clone(&self.effect_host),
544            cancel: CancellationToken::new(),
545            cancels: self.turn_cancels.clone(),
546            batch_ids: Vec::new(),
547            drain_id: None,
548        }
549    }
550
551    /// Cancel every turn currently executing through this opened session
552    /// (including its clones) and report how many were signalled.
553    ///
554    /// This is the affordance behind a UI "stop" control: hold a clone of the
555    /// session wherever the stop arrives and call this, instead of threading a
556    /// [`CancellationToken`](crate::CancellationToken) into every turn call
557    /// ([`TurnBuilder::cancel`](crate::TurnBuilder::cancel) remains the
558    /// per-turn hook when you need one). A cancelled turn finishes with
559    /// `TurnOutcome::Stopped(TurnStop::Cancelled)` and commits like any other
560    /// turn; the session stays usable.
561    ///
562    /// Scope: turns started from this `LashSession` instance and its clones.
563    /// A handle opened separately for the same session id has its own
564    /// registry and is not reached.
565    pub fn cancel_running_turns(&self) -> usize {
566        self.turn_cancels.cancel_all()
567    }
568
569    pub fn admin(&self) -> SessionAdmin {
570        SessionAdmin {
571            runtime: self.runtime.clone(),
572        }
573    }
574
575    pub async fn configure(&self, patch: SessionConfigPatch) -> Result<()> {
576        self.admin().config().update(patch).await
577    }
578
579    pub fn tools(&self) -> ToolAdmin {
580        ToolAdmin::new(self.admin())
581    }
582
583    pub fn commands(&self) -> SessionCommandAdmin {
584        self.admin().commands()
585    }
586
587    pub fn triggers(&self) -> SessionTriggerAdmin {
588        self.admin().triggers()
589    }
590
591    pub fn processes(&self) -> SessionProcessAdmin {
592        SessionProcessAdmin::new(self.admin())
593    }
594
595    pub fn plugin_operations(&self) -> PluginOperations {
596        PluginOperations {
597            control: self.admin(),
598        }
599    }
600
601    pub fn enqueue(&self, input: TurnInput) -> EnqueueTurnBuilder<'_> {
602        EnqueueTurnBuilder {
603            session: self,
604            input,
605            id: None,
606            delivery_policy: DeliveryPolicy::AfterCurrentTurnCommit,
607            slot_policy: SlotPolicy::Exclusive,
608        }
609    }
610
611    pub async fn queued_work(&self) -> Result<Vec<QueuedWorkBatch>> {
612        let observation = self.runtime.observe();
613        let store = observation.queue_store.as_ref().ok_or_else(|| {
614            EmbedError::Runtime(lash_core::RuntimeError::new(
615                lash_core::RuntimeErrorCode::StoreCommitFailed,
616                "queued work inspection requires a persistent runtime store",
617            ))
618        })?;
619        store
620            .list_pending_queued_work(observation.session_id())
621            .await
622            .map_err(|err| {
623                EmbedError::Runtime(lash_core::RuntimeError::new(
624                    lash_core::RuntimeErrorCode::StoreCommitFailed,
625                    err.to_string(),
626                ))
627            })
628    }
629
630    pub async fn cancel_queued_work_batch(
631        &self,
632        batch_id: &str,
633    ) -> Result<Option<QueuedWorkBatch>> {
634        let session_id = self.session_id();
635        self.runtime
636            .cancel_queued_work_batch(&session_id, batch_id)
637            .await
638            .map_err(EmbedError::Runtime)
639    }
640
641    /// Resolve once `batch_id` is no longer pending in the queue store —
642    /// drained by whoever runs queued work (a queued-work runner, a durable
643    /// worker, or another handle's [`queued_turn`](Self::queued_turn)) or
644    /// cancelled. This is the enqueue-and-observe side of the queue: the
645    /// caller never claims the work itself.
646    ///
647    /// Completion is read from the persistent queue store, so it observes
648    /// drains performed by other session handles and other processes alike.
649    /// There is no built-in deadline — nothing resolves if nothing drains the
650    /// queue, so bound it with `tokio::time::timeout` when the worker may be
651    /// unavailable. A batch id the store has never seen resolves immediately.
652    pub async fn await_queued_work_batch(&self, batch_id: &str) -> Result<()> {
653        let observation = self.runtime.observe();
654        let store = observation.queue_store.clone().ok_or_else(|| {
655            EmbedError::Runtime(lash_core::RuntimeError::new(
656                lash_core::RuntimeErrorCode::StoreCommitFailed,
657                "queued work inspection requires a persistent runtime store",
658            ))
659        })?;
660        let session_id = observation.session_id().to_string();
661        drop(observation);
662        let mut delay = std::time::Duration::from_millis(25);
663        loop {
664            let pending = store
665                .list_pending_queued_work(&session_id)
666                .await
667                .map_err(|err| {
668                    EmbedError::Runtime(lash_core::RuntimeError::new(
669                        lash_core::RuntimeErrorCode::StoreCommitFailed,
670                        err.to_string(),
671                    ))
672                })?;
673            if !pending.iter().any(|batch| batch.batch_id == batch_id) {
674                return Ok(());
675            }
676            tokio::time::sleep(delay).await;
677            delay = (delay * 2).min(std::time::Duration::from_millis(400));
678        }
679    }
680
681    pub fn read_view(&self) -> SessionReadView {
682        self.runtime.observe().read_view.clone()
683    }
684
685    pub fn usage_report(&self) -> SessionUsageReport {
686        self.runtime.observe().usage_report.clone()
687    }
688
689    pub async fn set_turn_phase_probe(
690        &self,
691        probe: Arc<dyn lash_core::runtime::RuntimeTurnPhaseProbe>,
692    ) {
693        let writer = self.runtime.writer();
694        let mut runtime = writer.lock().await;
695        runtime.set_turn_phase_probe(Arc::clone(&probe));
696        self.runtime.publish_from(&runtime);
697        if let Some(slot) = &self.process_phase_probe_slot {
698            let observation = self.runtime.observe();
699            slot.set_for_session(observation.session_id(), Arc::clone(&probe));
700            let current_frame = observation.persisted_state.current_agent_frame_id.as_str();
701            if !current_frame.is_empty() {
702                let scope = lash_core::SessionScope::for_agent_frame(
703                    observation.session_id(),
704                    current_frame,
705                );
706                slot.set_for_scope(&scope, probe);
707            }
708        }
709    }
710}
711
/// Observation-side view of a session, obtained from
/// [`LashSession::observe`].
#[derive(Clone)]
pub struct ObservableSession {
    /// Runtime handle all observations and subscriptions are read from.
    pub(crate) runtime: RuntimeHandle,
}
716
717impl ObservableSession {
718    fn snapshot(&self) -> Arc<RuntimeObservation> {
719        self.runtime.observe()
720    }
721
722    pub fn current_observation(&self) -> SessionObservation {
723        self.runtime.current_session_observation()
724    }
725
726    pub fn resume_from_cursor(&self, cursor: &SessionCursor) -> Result<SessionResume> {
727        self.runtime
728            .resume_session_observation(cursor)
729            .map_err(live_replay_error)
730    }
731
732    pub fn subscribe_from_cursor(
733        &self,
734        cursor: &SessionCursor,
735    ) -> Result<SessionObservationSubscription> {
736        self.runtime
737            .subscribe_session_observation(cursor)
738            .map_err(live_replay_error)
739    }
740
741    /// Subscribe to session observation events and keep the subscription alive
742    /// across recoverable live-replay gaps.
743    ///
744    /// The returned stream yields [`SessionObservationStreamItem::Gap`] when
745    /// the cursor missed the bounded replay window. Callers should replace
746    /// their UI/projection from the included fresh observation, persist
747    /// `gap.latest_cursor`, and keep polling the same stream; it resubscribes
748    /// from that cursor internally.
749    pub fn subscribe_and_recover(&self, cursor: SessionCursor) -> SessionObservationStream {
750        SessionObservationStream {
751            observable: self.clone(),
752            cursor,
753            subscription: None,
754            done: false,
755        }
756    }
757
758    pub fn session_id(&self) -> String {
759        self.snapshot().session_id().to_string()
760    }
761
762    pub fn policy_snapshot(&self) -> SessionPolicy {
763        self.snapshot().policy.clone()
764    }
765
766    pub fn read_view(&self) -> SessionReadView {
767        self.snapshot().read_view.clone()
768    }
769
770    pub fn usage_report(&self) -> SessionUsageReport {
771        self.snapshot().usage_report.clone()
772    }
773
774    pub fn tool_state(&self) -> Option<ToolState> {
775        self.snapshot().tool_state.clone()
776    }
777
778    pub fn active_tool_manifests(&self) -> Vec<ToolManifest> {
779        self.snapshot()
780            .tool_state
781            .as_ref()
782            .map(ToolState::tool_manifests)
783            .unwrap_or_default()
784    }
785
786    pub async fn list_process_handles(&self) -> Vec<ProcessHandleSummary> {
787        self.snapshot().list_process_handles().await
788    }
789
790    pub async fn list_all_process_handles(&self) -> Vec<ProcessHandleSummary> {
791        self.snapshot().list_all_process_handles().await
792    }
793
794    pub fn process_scope(&self) -> SessionScope {
795        self.snapshot().process_scope()
796    }
797}
798
/// Item yielded by [`SessionObservationStream`].
#[derive(Clone, Debug)]
pub enum SessionObservationStreamItem {
    /// A replayed or live session observation event.
    Event(SessionObservationEvent),
    /// A recoverable replay gap with a fresh durable observation.
    ///
    /// The stream has already moved its cursor to `gap.latest_cursor` when
    /// this item is yielded.
    Gap {
        /// Observation delivered together with the gap.
        observation: SessionObservation,
        /// Gap description, including the cursor to continue from.
        gap: LiveReplayGap,
    },
}
809
/// Stream returned by [`ObservableSession::subscribe_and_recover`].
pub struct SessionObservationStream {
    /// Session view used to (re)subscribe.
    observable: ObservableSession,
    /// Cursor of the last yielded event or gap; the next subscription starts
    /// from here.
    cursor: SessionCursor,
    /// Active live-replay subscription; `None` until the first poll and after
    /// the subscriber lagged.
    subscription: Option<lash_core::LiveReplaySubscription>,
    /// Set once the stream has ended or yielded a terminal error.
    done: bool,
}
817
818impl SessionObservationStream {
819    pub fn cursor(&self) -> &SessionCursor {
820        &self.cursor
821    }
822}
823
impl Stream for SessionObservationStream {
    type Item = Result<SessionObservationStreamItem>;

    /// Yield the next event or gap, (re)subscribing from the current cursor
    /// whenever there is no active subscription.
    ///
    /// After a terminal error or the end of the underlying subscription the
    /// stream keeps returning `None`.
    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        loop {
            if self.done {
                return Poll::Ready(None);
            }
            // No active subscription (first poll, after a gap, or after the
            // subscriber lagged): subscribe from the last known cursor.
            if self.subscription.is_none() {
                match self.observable.subscribe_from_cursor(&self.cursor) {
                    Ok(SessionObservationSubscription::Subscribed(subscription)) => {
                        self.subscription = Some(subscription);
                    }
                    Ok(SessionObservationSubscription::Gap { observation, gap }) => {
                        // Advance past the gap; the subscription stays unset,
                        // so the next poll subscribes from the new cursor.
                        self.cursor = gap.latest_cursor.clone();
                        return Poll::Ready(Some(Ok(SessionObservationStreamItem::Gap {
                            observation,
                            gap,
                        })));
                    }
                    Err(err) => {
                        // Subscribing failed: yield the error once, then end.
                        self.done = true;
                        return Poll::Ready(Some(Err(err)));
                    }
                }
            }

            let Some(subscription) = self.subscription.as_mut() else {
                continue;
            };
            match Pin::new(subscription).poll_next(cx) {
                Poll::Pending => return Poll::Pending,
                Poll::Ready(Some(Ok(event))) => {
                    // Track the cursor so a later resubscribe resumes after
                    // this event.
                    self.cursor = event.cursor.clone();
                    return Poll::Ready(Some(Ok(SessionObservationStreamItem::Event(event))));
                }
                Poll::Ready(Some(Err(LiveReplayStoreError::SubscriberLagged(_)))) => {
                    // Recoverable: drop the subscription and loop back to
                    // resubscribe from the last cursor.
                    self.subscription = None;
                    continue;
                }
                Poll::Ready(Some(Err(err))) => {
                    // Any other store error is terminal.
                    self.done = true;
                    return Poll::Ready(Some(Err(live_replay_error(err))));
                }
                Poll::Ready(None) => {
                    self.done = true;
                    return Poll::Ready(None);
                }
            }
        }
    }
}
876
877fn live_replay_error(err: lash_core::LiveReplayStoreError) -> EmbedError {
878    EmbedError::Runtime(lash_core::RuntimeError::new(
879        RuntimeErrorCode::Other("live_replay".to_string()),
880        err.to_string(),
881    ))
882}
883
/// Builder for enqueueing a turn input, created by `LashSession::enqueue`.
pub struct EnqueueTurnBuilder<'a> {
    /// Session whose runtime receives the enqueued input.
    session: &'a LashSession,
    /// Turn input to enqueue.
    input: TurnInput,
    /// Optional host-chosen id; sent as the source key `host:{id}`.
    id: Option<String>,
    /// Delivery policy passed to the runtime.
    delivery_policy: DeliveryPolicy,
    /// Slot policy passed to the runtime.
    slot_policy: SlotPolicy,
}
891
892impl<'a> EnqueueTurnBuilder<'a> {
893    pub fn id(mut self, id: impl Into<String>) -> Self {
894        self.id = Some(id.into());
895        self
896    }
897
898    pub fn delivery_policy(mut self, policy: DeliveryPolicy) -> Self {
899        self.delivery_policy = policy;
900        self
901    }
902
903    pub fn slot_policy(mut self, policy: SlotPolicy) -> Self {
904        self.slot_policy = policy;
905        self
906    }
907
908    pub async fn send(self) -> Result<QueuedWorkBatch> {
909        let source_key = self.id.map(|id| format!("host:{id}"));
910        self.session
911            .runtime
912            .enqueue_turn_input(
913                self.input,
914                self.delivery_policy,
915                self.slot_policy,
916                source_key,
917            )
918            .await
919            .map_err(EmbedError::Runtime)
920    }
921}
922
923impl<'a> std::future::IntoFuture for EnqueueTurnBuilder<'a> {
924    type Output = Result<QueuedWorkBatch>;
925    type IntoFuture =
926        std::pin::Pin<Box<dyn std::future::Future<Output = Result<QueuedWorkBatch>> + 'a>>;
927
928    fn into_future(self) -> Self::IntoFuture {
929        Box::pin(self.send())
930    }
931}