// lash/session.rs

1use std::pin::Pin;
2use std::task::{Context, Poll};
3
4use crate::support::*;
5use futures_util::Stream;
6use lash_core::runtime::{DeliveryPolicy, QueuedWorkBatch, SlotPolicy};
7use lash_core::{LiveReplayGap, LiveReplayStoreError, SessionObservationEvent};
8use lash_remote_protocol::{
9    RemoteLiveReplayGap, RemoteSessionCursor, RemoteSessionObservation,
10    RemoteSessionObservationEvent,
11};
12
13pub struct SessionBuilder {
14    pub(crate) core: LashCore,
15    pub(crate) session_id: String,
16    pub(crate) spec: SessionSpec,
17    pub(crate) parent_session_id: Option<String>,
18    pub(crate) session_execution_owner: Option<lash_core::LeaseOwnerIdentity>,
19    pub(crate) store: Option<Arc<dyn RuntimePersistence>>,
20    pub(crate) provider: Option<ProviderHandle>,
21    pub(crate) active_plugins: Vec<ActivePluginBinding>,
22    pub(crate) plugin_factories: Vec<Arc<dyn PluginFactory>>,
23}
24
/// [`SessionBuilder`] wrapper that additionally applies RLM session options
/// to the runtime state before the session is opened.
#[cfg(feature = "rlm")]
pub struct RlmSessionBuilder {
    /// Underlying builder that every shared setter is forwarded to.
    pub(crate) builder: SessionBuilder,
    /// Explicit final-answer format; when `None` a default is picked at open
    /// time depending on whether the session has a parent.
    pub(crate) rlm_final_answer_format: Option<lash_rlm_types::RlmFinalAnswerFormat>,
}
30
31impl SessionBuilder {
32    pub fn provider(mut self, provider: ProviderHandle) -> Self {
33        self.spec = self.spec.provider_id(provider.kind());
34        self.provider = Some(provider);
35        self
36    }
37
38    pub fn session_spec(mut self, spec: SessionSpec) -> Self {
39        self.spec = spec;
40        self
41    }
42
43    pub fn parent(mut self, parent_session_id: impl Into<String>) -> Self {
44        self.parent_session_id = Some(parent_session_id.into());
45        self
46    }
47
48    /// Use an explicit owner identity for durable session execution leases.
49    ///
50    /// This is only for hosts that already serialize one logical execution lane
51    /// and intentionally choose stable owner + incarnation values. Normal
52    /// embedders should keep the default per-open identity.
53    pub fn session_execution_owner(mut self, owner: lash_core::LeaseOwnerIdentity) -> Self {
54        self.session_execution_owner = Some(owner);
55        self
56    }
57
58    /// Use a specific persistence store for this root session.
59    ///
60    /// This is the right API for a host-owned, pre-opened session database.
61    /// Managed child sessions never reuse this store; configure
62    /// `LashCoreBuilder::child_store_factory` when child sessions should also
63    /// persist.
64    pub fn store(mut self, store: Arc<dyn RuntimePersistence>) -> Self {
65        self.store = Some(store);
66        self
67    }
68
69    pub fn plugin<P: PluginBinding>(mut self, config: P::SessionConfig) -> Self {
70        self.active_plugins.push(ActivePluginBinding {
71            id: P::ID,
72            requires_turn_input: P::requires_turn_input(&config),
73        });
74        self.plugin_factories.push(P::factory(&config));
75        self
76    }
77
78    pub async fn open(self) -> Result<LashSession> {
79        let policy = self.session_policy();
80        let store = self.create_store(&policy).await?;
81        let state = self
82            .load_or_default_state(&policy, store.as_deref())
83            .await?;
84        self.open_resolved(policy, state, store).await
85    }
86
87    /// Open this session with a fresh resident graph, ignoring any persisted
88    /// session graph/checkpoint state that may already exist for the same
89    /// session id.
90    ///
91    /// The next successful commit writes a full replacement graph, so normal
92    /// embedders can use this to start over without manually calling
93    /// `load_persisted_session_state` or constructing a `RuntimeSessionState`.
94    /// Use [`Self::open`] for resume and [`Self::open_with_state`] only when
95    /// restoring explicit host-owned state.
96    pub async fn open_fresh(self) -> Result<LashSession> {
97        let policy = self.session_policy();
98        let store = self.create_store(&policy).await?;
99        let state = RuntimeSessionState {
100            session_id: self.session_id.clone(),
101            policy: policy.clone(),
102            graph_replace_required: true,
103            ..RuntimeSessionState::default()
104        };
105        self.open_resolved(policy, state, store).await
106    }
107
108    /// Open with an explicitly supplied runtime state.
109    ///
110    /// This is for advanced hosts that already own a complete state snapshot.
111    /// Normal embedders should use [`Self::open`] to resume according to Lash's
112    /// residency policy or [`Self::open_fresh`] to start over and replace prior
113    /// persisted state on the next commit.
114    pub async fn open_with_state(self, mut state: RuntimeSessionState) -> Result<LashSession> {
115        let policy = self.session_policy();
116        let store = self.create_store(&policy).await?;
117        if state.session_id != self.session_id {
118            return Err(EmbedError::StoreSessionMismatch {
119                loaded: state.session_id,
120                requested: self.session_id,
121            });
122        }
123        let recorded_provider_id = state.policy.recorded_provider_id().to_string();
124        state.policy = policy.clone();
125        state.policy.provider_id = recorded_provider_id;
126        self.open_resolved(policy, state, store).await
127    }
128
129    fn session_policy(&self) -> SessionPolicy {
130        let mut policy = self.spec.resolve_against(&self.core.policy);
131        policy.session_id = Some(self.session_id.clone());
132        policy
133    }
134
135    async fn load_or_default_state(
136        &self,
137        policy: &SessionPolicy,
138        store: Option<&dyn RuntimePersistence>,
139    ) -> Result<RuntimeSessionState> {
140        let state = match store {
141            Some(store) => {
142                let loaded = self.load_persisted_state_for_residency(store).await?;
143                let mut state = loaded.unwrap_or_else(|| RuntimeSessionState {
144                    session_id: self.session_id.clone(),
145                    policy: policy.clone(),
146                    ..RuntimeSessionState::default()
147                });
148                if state.session_id != self.session_id {
149                    return Err(EmbedError::StoreSessionMismatch {
150                        loaded: state.session_id,
151                        requested: self.session_id.clone(),
152                    });
153                }
154                let recorded_provider_id = state.policy.recorded_provider_id().to_string();
155                state.policy = policy.clone();
156                state.policy.provider_id = recorded_provider_id;
157                state
158            }
159            None => RuntimeSessionState {
160                session_id: self.session_id.clone(),
161                policy: policy.clone(),
162                ..RuntimeSessionState::default()
163            },
164        };
165        Ok(state)
166    }
167
168    async fn load_persisted_state_for_residency(
169        &self,
170        store: &dyn RuntimePersistence,
171    ) -> Result<Option<RuntimeSessionState>> {
172        load_persisted_state_for_residency(self.core.env.residency, store).await
173    }
174
175    async fn open_resolved(
176        self,
177        policy: SessionPolicy,
178        state: RuntimeSessionState,
179        store: Option<Arc<dyn RuntimePersistence>>,
180    ) -> Result<LashSession> {
181        let mut env = self.core.env.clone();
182        if let Some(provider) = self.provider.clone().or_else(|| self.core.provider.clone()) {
183            env.core.providers.provider_resolver =
184                Arc::new(lash_core::SingleProviderResolver::new(provider));
185        }
186        let plugin_host = build_plugin_host(
187            self.core.protocol_factory.as_ref(),
188            self.core.plugin_factories.as_ref(),
189            self.plugin_factories,
190        )?;
191        env.core = self
192            .core
193            .runtime_host_for_plugin_host(env.core.clone(), &plugin_host)?;
194        env.plugin_host = Some(Arc::new(plugin_host));
195        let effect_host = Arc::clone(&env.core.control.effect_host);
196        let drivers = self.core.work_driver.drivers().await;
197        env.process_work_driver = drivers.process.clone();
198        env.queued_work_driver = drivers.queued.clone();
199        let mut runtime = LashRuntime::from_environment(&env, policy, state, store).await?;
200        if let Some(owner) = self.session_execution_owner {
201            runtime.set_runtime_lease_owner(owner);
202        }
203        if drivers.drive_process_on_open
204            && let Some(driver) = drivers.process.as_ref()
205        {
206            driver.claim_and_run_pending("session_open").await?;
207        }
208        let handle = RuntimeHandle::with_live_replay_store(
209            runtime,
210            Arc::clone(&self.core.live_replay_store),
211        );
212        Ok(LashSession {
213            runtime: handle,
214            effect_host,
215            parent_session_id: self.parent_session_id,
216            active_plugins: self.active_plugins,
217            process_phase_probe_slot: self.core.work_driver.phase_probe_slot(),
218            turn_cancels: crate::turn::TurnCancelRegistry::default(),
219        })
220    }
221
222    async fn create_store(
223        &self,
224        policy: &SessionPolicy,
225    ) -> Result<Option<Arc<dyn RuntimePersistence>>> {
226        if let Some(store) = self.store.as_ref() {
227            return Ok(Some(Arc::clone(store)));
228        }
229        let Some(factory) = self.core.store_factory.as_ref() else {
230            return Ok(None);
231        };
232        let request = SessionStoreCreateRequest {
233            session_id: self.session_id.clone(),
234            relation: self
235                .parent_session_id
236                .as_ref()
237                .map(|parent_session_id| lash_core::SessionRelation::Child {
238                    parent_session_id: parent_session_id.clone(),
239                    caused_by: None,
240                })
241                .unwrap_or_default(),
242            policy: policy.clone(),
243        };
244        factory
245            .create_store(&request)
246            .await
247            .map(Some)
248            .map_err(|message| EmbedError::StoreFactory {
249                session_id: self.session_id.clone(),
250                message,
251            })
252    }
253}
254
255pub(crate) async fn load_state_for_residency(
256    residency: Residency,
257    session_id: &str,
258    policy: &SessionPolicy,
259    store: &dyn RuntimePersistence,
260) -> Result<RuntimeSessionState> {
261    let mut state = load_persisted_state_for_residency(residency, store)
262        .await?
263        .unwrap_or_else(|| RuntimeSessionState {
264            session_id: session_id.to_string(),
265            policy: policy.clone(),
266            ..RuntimeSessionState::default()
267        });
268    if state.session_id != session_id {
269        return Err(EmbedError::StoreSessionMismatch {
270            loaded: state.session_id,
271            requested: session_id.to_string(),
272        });
273    }
274    let recorded_provider_id = state.policy.recorded_provider_id().to_string();
275    state.policy = policy.clone();
276    state.policy.provider_id = recorded_provider_id;
277    Ok(state)
278}
279
280async fn load_persisted_state_for_residency(
281    residency: Residency,
282    store: &dyn RuntimePersistence,
283) -> Result<Option<RuntimeSessionState>> {
284    match residency {
285        Residency::KeepAll => {
286            let loaded = lash_core::store::load_persisted_session_state(store)
287                .await
288                .map_err(|err| SessionError::Protocol(format!("failed to load store: {err}")))?;
289            Ok(loaded)
290        }
291        Residency::ActivePathOnly => {
292            let active = lash_core::store::load_persisted_session_state_active_path(store, None)
293                .await
294                .map_err(|err| {
295                    SessionError::Protocol(format!("failed to load active-path store: {err}"))
296                })?;
297            if active
298                .as_ref()
299                .is_some_and(|state| state.session_graph.nodes.is_empty())
300            {
301                let mut full = lash_core::store::load_persisted_session_state(store)
302                    .await
303                    .map_err(|err| {
304                        SessionError::Protocol(format!(
305                            "failed to heal active-path store from full graph: {err}"
306                        ))
307                    })?;
308                if let Some(state) = full.as_mut() {
309                    state.graph_replace_required = true;
310                }
311                return Ok(full);
312            }
313            Ok(active)
314        }
315    }
316}
317
318impl PromptLayerSink for SessionBuilder {
319    fn prompt_layer_mut(&mut self) -> &mut PromptLayer {
320        self.spec.prompt.get_or_insert_with(PromptLayer::new)
321    }
322}
323
#[cfg(feature = "rlm")]
impl RlmSessionBuilder {
    /// Forwarded to [`SessionBuilder::provider`].
    pub fn provider(mut self, provider: ProviderHandle) -> Self {
        self.builder = self.builder.provider(provider);
        self
    }

    /// Forwarded to [`SessionBuilder::session_spec`].
    pub fn session_spec(mut self, spec: SessionSpec) -> Self {
        self.builder = self.builder.session_spec(spec);
        self
    }

    /// Forwarded to [`SessionBuilder::parent`].
    pub fn parent(mut self, parent_session_id: impl Into<String>) -> Self {
        self.builder = self.builder.parent(parent_session_id);
        self
    }

    /// Forwarded to [`SessionBuilder::session_execution_owner`].
    pub fn session_execution_owner(mut self, owner: lash_core::LeaseOwnerIdentity) -> Self {
        self.builder = self.builder.session_execution_owner(owner);
        self
    }

    /// Forwarded to [`SessionBuilder::store`].
    pub fn store(mut self, store: Arc<dyn RuntimePersistence>) -> Self {
        self.builder = self.builder.store(store);
        self
    }

    /// Forwarded to [`SessionBuilder::plugin`].
    pub fn plugin<P: PluginBinding>(mut self, config: P::SessionConfig) -> Self {
        self.builder = self.builder.plugin::<P>(config);
        self
    }

    /// Open the session, resuming persisted state when available, with the RLM
    /// session options applied.
    pub async fn open(self) -> Result<LashSession> {
        self.open_resolved(RlmOpenState::Resume).await
    }

    /// Open the session from a fresh state flagged for graph replacement, with
    /// the RLM session options applied.
    pub async fn open_fresh(self) -> Result<LashSession> {
        self.open_resolved(RlmOpenState::Fresh).await
    }

    /// Open the session from an explicitly supplied state, with the RLM
    /// session options applied.
    ///
    /// # Errors
    ///
    /// Returns `EmbedError::StoreSessionMismatch` when `state.session_id`
    /// differs from the builder's session id. That check runs before any
    /// store is created.
    pub async fn open_with_state(self, state: RuntimeSessionState) -> Result<LashSession> {
        self.open_resolved(RlmOpenState::Explicit(state)).await
    }

    /// Shared open path: resolve policy, store, and state for the requested
    /// mode, apply the RLM options, then delegate to the inner builder.
    async fn open_resolved(self, open_state: RlmOpenState) -> Result<LashSession> {
        let Self {
            builder,
            rlm_final_answer_format,
        } = self;
        let policy = builder.session_policy();
        // Reject an explicit state that belongs to another session before the
        // store is created, so a bad request never reaches the store factory.
        let open_state = match open_state {
            RlmOpenState::Explicit(state) if state.session_id != builder.session_id => {
                return Err(EmbedError::StoreSessionMismatch {
                    loaded: state.session_id,
                    requested: builder.session_id.clone(),
                });
            }
            other => other,
        };
        let store = builder.create_store(&policy).await?;
        let mut state = match open_state {
            RlmOpenState::Resume => {
                builder
                    .load_or_default_state(&policy, store.as_deref())
                    .await?
            }
            RlmOpenState::Fresh => RuntimeSessionState {
                session_id: builder.session_id.clone(),
                policy: policy.clone(),
                graph_replace_required: true,
                ..RuntimeSessionState::default()
            },
            RlmOpenState::Explicit(mut state) => {
                // Keep the provider id the state recorded; the rest of the
                // policy comes from the freshly resolved one.
                let recorded_provider_id = state.policy.recorded_provider_id().to_string();
                state.policy = policy.clone();
                state.policy.provider_id = recorded_provider_id;
                state
            }
        };
        apply_rlm_session_options(
            builder.parent_session_id.is_none(),
            rlm_final_answer_format,
            &mut state,
        )?;
        builder.open_resolved(policy, state, store).await
    }
}
408
#[cfg(feature = "rlm")]
impl PromptLayerSink for RlmSessionBuilder {
    /// Delegates to the wrapped [`SessionBuilder`]'s prompt layer.
    fn prompt_layer_mut(&mut self) -> &mut PromptLayer {
        let inner = &mut self.builder;
        inner.prompt_layer_mut()
    }
}
415
/// How `RlmSessionBuilder::open_resolved` should obtain the initial runtime
/// state.
// NOTE(review): the size lint is presumably silenced because this enum is
// built and consumed within a single open call — confirm before boxing.
#[cfg(feature = "rlm")]
#[allow(clippy::large_enum_variant)]
enum RlmOpenState {
    /// Load persisted state, or start from default state when there is none.
    Resume,
    /// Start from default state flagged for graph replacement.
    Fresh,
    /// Use the state supplied by the caller.
    Explicit(RuntimeSessionState),
}
423
/// Write the RLM final-answer format into `state`'s protocol turn options.
///
/// The format is `explicit_format` when given; otherwise `Markdown` for a root
/// session and `RawSubmitValue` for a non-root one. Existing turn options are
/// decoded and kept, with only `final_answer_format` overwritten. The resulting
/// options are stored on the state and on every agent frame.
///
/// # Errors
///
/// Fails when the existing options cannot be decoded or the updated options
/// cannot be encoded.
#[cfg(feature = "rlm")]
fn apply_rlm_session_options(
    is_root_session: bool,
    explicit_format: Option<lash_rlm_types::RlmFinalAnswerFormat>,
    state: &mut RuntimeSessionState,
) -> Result<()> {
    let final_answer_format = match explicit_format {
        Some(format) => format,
        None if is_root_session => lash_rlm_types::RlmFinalAnswerFormat::Markdown,
        None => lash_rlm_types::RlmFinalAnswerFormat::RawSubmitValue,
    };
    let mut extras: lash_rlm_types::RlmCreateExtras = if state.protocol_turn_options.is_empty() {
        lash_rlm_types::RlmCreateExtras::default()
    } else {
        state.protocol_turn_options.decode()?
    };
    extras.final_answer_format = Some(final_answer_format);
    let options = ProtocolTurnOptions::typed(extras)?;
    for frame in &mut state.agent_frames {
        frame.protocol_turn_options = options.clone();
    }
    state.protocol_turn_options = options;
    Ok(())
}
450
#[cfg(all(test, feature = "rlm"))]
mod tests {
    use super::*;

    /// Applying the session options must fill in the final-answer format
    /// without disturbing a termination mode that was already stored.
    #[test]
    fn apply_rlm_session_options_preserves_existing_termination() -> Result<()> {
        let seeded = lash_rlm_types::RlmCreateExtras {
            termination: lash_rlm_types::RlmTermination::ProseOrSubmit,
            final_answer_format: None,
        };
        let mut state = RuntimeSessionState {
            protocol_turn_options: ProtocolTurnOptions::typed(seeded)?,
            ..Default::default()
        };

        apply_rlm_session_options(true, None, &mut state)?;

        let decoded: lash_rlm_types::RlmCreateExtras = state.protocol_turn_options.decode()?;
        assert_eq!(
            decoded.termination,
            lash_rlm_types::RlmTermination::ProseOrSubmit
        );
        assert_eq!(
            decoded.final_answer_format,
            Some(lash_rlm_types::RlmFinalAnswerFormat::Markdown)
        );
        Ok(())
    }
}
479
480#[derive(Clone)]
481pub struct LashSession {
482    pub(crate) runtime: RuntimeHandle,
483    pub(crate) effect_host: Arc<dyn EffectHost>,
484    pub(crate) parent_session_id: Option<String>,
485    pub(crate) active_plugins: Vec<ActivePluginBinding>,
486    pub(crate) process_phase_probe_slot: Option<lash_core::runtime::RuntimeTurnPhaseProbeSlot>,
487    pub(crate) turn_cancels: crate::turn::TurnCancelRegistry,
488}
489
490#[derive(Clone, Debug, Default)]
491pub struct SessionConfigPatch {
492    pub provider: Option<ProviderHandle>,
493    pub model: Option<ModelSpec>,
494    pub prompt: Option<PromptLayer>,
495}
496
497impl LashSession {
498    pub async fn close(self) -> Result<()> {
499        let runtime = self.runtime.writer();
500        let runtime = runtime.lock().await;
501        runtime.unregister_plugin_session()?;
502        Ok(())
503    }
504
505    pub fn session_id(&self) -> String {
506        self.runtime.observe().session_id().to_string()
507    }
508
509    pub fn policy_snapshot(&self) -> SessionPolicy {
510        self.runtime.observe().policy.clone()
511    }
512
513    pub fn observe(&self) -> ObservableSession {
514        ObservableSession {
515            runtime: self.runtime.clone(),
516        }
517    }
518
519    pub fn parent_session_id(&self) -> Option<&str> {
520        self.parent_session_id.as_deref()
521    }
522
523    pub fn effect_host(&self) -> Arc<dyn EffectHost> {
524        Arc::clone(&self.effect_host)
525    }
526
527    pub fn turn(&self, input: TurnInput) -> TurnBuilder {
528        TurnBuilder {
529            runtime: self.runtime.clone(),
530            effect_host: Arc::clone(&self.effect_host),
531            active_plugins: self.active_plugins.clone(),
532            input,
533            cancel: CancellationToken::new(),
534            cancels: self.turn_cancels.clone(),
535            protocol_turn_options: None,
536            provider: None,
537            model: None,
538            turn_id: None,
539        }
540    }
541
542    pub fn queued_turn(&self) -> QueuedTurnBuilder {
543        QueuedTurnBuilder {
544            runtime: self.runtime.clone(),
545            effect_host: Arc::clone(&self.effect_host),
546            cancel: CancellationToken::new(),
547            cancels: self.turn_cancels.clone(),
548            batch_ids: Vec::new(),
549            drain_id: None,
550        }
551    }
552
553    /// Cancel every turn currently executing through this opened session
554    /// (including its clones) and report how many were signalled.
555    ///
556    /// This is the affordance behind a UI "stop" control: hold a clone of the
557    /// session wherever the stop arrives and call this, instead of threading a
558    /// [`CancellationToken`](crate::CancellationToken) into every turn call
559    /// ([`TurnBuilder::cancel`](crate::TurnBuilder::cancel) remains the
560    /// per-turn hook when you need one). A cancelled turn finishes with
561    /// `TurnOutcome::Stopped(TurnStop::Cancelled)` and commits like any other
562    /// turn; the session stays usable.
563    ///
564    /// Scope: turns started from this `LashSession` instance and its clones.
565    /// A handle opened separately for the same session id has its own
566    /// registry and is not reached.
567    pub fn cancel_running_turns(&self) -> usize {
568        self.turn_cancels.cancel_all()
569    }
570
571    pub fn admin(&self) -> SessionAdmin {
572        SessionAdmin {
573            runtime: self.runtime.clone(),
574        }
575    }
576
577    pub async fn configure(&self, patch: SessionConfigPatch) -> Result<()> {
578        self.admin().config().update(patch).await
579    }
580
581    pub fn tools(&self) -> ToolAdmin {
582        ToolAdmin::new(self.admin())
583    }
584
585    pub fn commands(&self) -> SessionCommandAdmin {
586        self.admin().commands()
587    }
588
589    pub fn triggers(&self) -> SessionTriggerAdmin {
590        self.admin().triggers()
591    }
592
593    pub fn processes(&self) -> SessionProcessAdmin {
594        SessionProcessAdmin::new(self.admin())
595    }
596
597    pub fn plugin_operations(&self) -> PluginOperations {
598        PluginOperations {
599            control: self.admin(),
600        }
601    }
602
603    pub fn enqueue(&self, input: TurnInput) -> EnqueueTurnBuilder<'_> {
604        EnqueueTurnBuilder {
605            session: self,
606            input,
607            id: None,
608            delivery_policy: DeliveryPolicy::AfterCurrentTurnCommit,
609            slot_policy: SlotPolicy::Exclusive,
610        }
611    }
612
613    /// Return all pending durable queued-work batches for this session.
614    ///
615    /// This is an admin/introspection view: it includes visible turn input,
616    /// process wakes, and session commands. UI queue previews should filter it
617    /// to visible `TurnInput` batches before rendering or selecting work.
618    pub async fn queued_work(&self) -> Result<Vec<QueuedWorkBatch>> {
619        let observation = self.runtime.observe();
620        let store = observation.queue_store.as_ref().ok_or_else(|| {
621            EmbedError::Runtime(lash_core::RuntimeError::new(
622                lash_core::RuntimeErrorCode::StoreCommitFailed,
623                "queued work inspection requires a persistent runtime store",
624            ))
625        })?;
626        store
627            .list_pending_queued_work(observation.session_id())
628            .await
629            .map_err(|err| {
630                EmbedError::Runtime(lash_core::RuntimeError::new(
631                    lash_core::RuntimeErrorCode::StoreCommitFailed,
632                    err.to_string(),
633                ))
634            })
635    }
636
637    pub async fn cancel_queued_work_batch(
638        &self,
639        batch_id: &str,
640    ) -> Result<Option<QueuedWorkBatch>> {
641        let session_id = self.session_id();
642        self.runtime
643            .cancel_queued_work_batch(&session_id, batch_id)
644            .await
645            .map_err(EmbedError::Runtime)
646    }
647
648    /// Resolve once `batch_id` is no longer pending in the queue store —
649    /// drained by whoever runs queued work (a queued-work runner, a durable
650    /// worker, or another handle's [`queued_turn`](Self::queued_turn)) or
651    /// cancelled. This is the enqueue-and-observe side of the queue: the
652    /// caller never claims the work itself.
653    ///
654    /// Completion is read from the persistent queue store, so it observes
655    /// drains performed by other session handles and other processes alike.
656    /// There is no built-in deadline — nothing resolves if nothing drains the
657    /// queue, so bound it with `tokio::time::timeout` when the worker may be
658    /// unavailable. A batch id the store has never seen resolves immediately.
659    pub async fn await_queued_work_batch(&self, batch_id: &str) -> Result<()> {
660        let observation = self.runtime.observe();
661        let store = observation.queue_store.clone().ok_or_else(|| {
662            EmbedError::Runtime(lash_core::RuntimeError::new(
663                lash_core::RuntimeErrorCode::StoreCommitFailed,
664                "queued work inspection requires a persistent runtime store",
665            ))
666        })?;
667        let session_id = observation.session_id().to_string();
668        drop(observation);
669        let mut delay = std::time::Duration::from_millis(25);
670        loop {
671            let pending = store
672                .list_pending_queued_work(&session_id)
673                .await
674                .map_err(|err| {
675                    EmbedError::Runtime(lash_core::RuntimeError::new(
676                        lash_core::RuntimeErrorCode::StoreCommitFailed,
677                        err.to_string(),
678                    ))
679                })?;
680            if !pending.iter().any(|batch| batch.batch_id == batch_id) {
681                return Ok(());
682            }
683            tokio::time::sleep(delay).await;
684            delay = (delay * 2).min(std::time::Duration::from_millis(400));
685        }
686    }
687
688    pub fn read_view(&self) -> SessionReadView {
689        self.runtime.observe().read_view.clone()
690    }
691
692    pub fn usage_report(&self) -> SessionUsageReport {
693        self.runtime.observe().usage_report.clone()
694    }
695
696    pub async fn set_turn_phase_probe(
697        &self,
698        probe: Arc<dyn lash_core::runtime::RuntimeTurnPhaseProbe>,
699    ) {
700        let writer = self.runtime.writer();
701        let mut runtime = writer.lock().await;
702        runtime.set_turn_phase_probe(Arc::clone(&probe));
703        self.runtime.publish_from(&runtime);
704        if let Some(slot) = &self.process_phase_probe_slot {
705            let observation = self.runtime.observe();
706            slot.set_for_session(observation.session_id(), Arc::clone(&probe));
707            let current_frame = observation.persisted_state.current_agent_frame_id.as_str();
708            if !current_frame.is_empty() {
709                let scope = lash_core::SessionScope::for_agent_frame(
710                    observation.session_id(),
711                    current_frame,
712                );
713                slot.set_for_scope(&scope, probe);
714            }
715        }
716    }
717}
718
719#[derive(Clone)]
720pub struct ObservableSession {
721    pub(crate) runtime: RuntimeHandle,
722}
723
724impl ObservableSession {
725    fn snapshot(&self) -> Arc<RuntimeObservation> {
726        self.runtime.observe()
727    }
728
729    pub fn current_observation(&self) -> SessionObservation {
730        self.runtime.current_session_observation()
731    }
732
733    pub fn current_remote_observation(&self) -> RemoteSessionObservation {
734        RemoteSessionObservation::from_core(self.current_observation())
735    }
736
737    pub fn resume_from_cursor(&self, cursor: &SessionCursor) -> Result<SessionResume> {
738        self.runtime
739            .resume_session_observation(cursor)
740            .map_err(live_replay_error)
741    }
742
743    pub fn subscribe_from_cursor(
744        &self,
745        cursor: &SessionCursor,
746    ) -> Result<SessionObservationSubscription> {
747        self.runtime
748            .subscribe_session_observation(cursor)
749            .map_err(live_replay_error)
750    }
751
752    pub fn subscribe_from_remote_cursor(
753        &self,
754        cursor: &RemoteSessionCursor,
755    ) -> Result<RemoteSessionObservationSubscription> {
756        cursor.validate()?;
757        let cursor = lash_core::SessionCursor::try_from(cursor.clone())?;
758        match self.subscribe_from_cursor(&cursor)? {
759            SessionObservationSubscription::Subscribed(subscription) => {
760                Ok(RemoteSessionObservationSubscription::Subscribed(
761                    RemoteSessionObservationEventStream::new(subscription),
762                ))
763            }
764            SessionObservationSubscription::Gap { observation, gap } => {
765                Ok(RemoteSessionObservationSubscription::Gap {
766                    observation: observation.into(),
767                    gap: gap.into(),
768                })
769            }
770        }
771    }
772
773    /// Subscribe to session observation events and keep the subscription alive
774    /// across recoverable live-replay gaps.
775    ///
776    /// The returned stream yields [`SessionObservationStreamItem::Gap`] when
777    /// the cursor missed the bounded replay window. Callers should replace
778    /// their UI/projection from the included fresh observation, persist
779    /// `gap.latest_cursor`, and keep polling the same stream; it resubscribes
780    /// from that cursor internally.
781    pub fn subscribe_and_recover(&self, cursor: SessionCursor) -> SessionObservationStream {
782        SessionObservationStream {
783            observable: self.clone(),
784            cursor,
785            subscription: None,
786            done: false,
787        }
788    }
789
790    /// Subscribe to remote DTO session observation events and keep the
791    /// subscription alive across recoverable live-replay gaps.
792    pub fn subscribe_and_recover_remote(
793        &self,
794        cursor: RemoteSessionCursor,
795    ) -> Result<RemoteSessionObservationStream> {
796        cursor.validate()?;
797        let cursor = lash_core::SessionCursor::try_from(cursor)?;
798        Ok(RemoteSessionObservationStream {
799            inner: self.subscribe_and_recover(cursor),
800            next_sequence: 0,
801        })
802    }
803
804    pub fn session_id(&self) -> String {
805        self.snapshot().session_id().to_string()
806    }
807
808    pub fn policy_snapshot(&self) -> SessionPolicy {
809        self.snapshot().policy.clone()
810    }
811
812    pub fn read_view(&self) -> SessionReadView {
813        self.snapshot().read_view.clone()
814    }
815
816    pub fn usage_report(&self) -> SessionUsageReport {
817        self.snapshot().usage_report.clone()
818    }
819
820    pub fn tool_state(&self) -> Option<ToolState> {
821        self.snapshot().tool_state.clone()
822    }
823
824    pub fn active_tool_manifests(&self) -> Vec<ToolManifest> {
825        self.snapshot()
826            .tool_state
827            .as_ref()
828            .map(ToolState::tool_manifests)
829            .unwrap_or_default()
830    }
831
832    pub async fn list_process_handles(&self) -> Vec<ProcessHandleSummary> {
833        self.snapshot().list_process_handles().await
834    }
835
836    pub async fn list_all_process_handles(&self) -> Vec<ProcessHandleSummary> {
837        self.snapshot().list_all_process_handles().await
838    }
839
840    pub fn process_scope(&self) -> SessionScope {
841        self.snapshot().process_scope()
842    }
843}
844
845#[derive(Clone, Debug)]
846pub enum SessionObservationStreamItem {
847    /// A replayed or live session observation event.
848    Event(SessionObservationEvent),
849    /// A recoverable replay gap with a fresh durable observation.
850    Gap {
851        observation: SessionObservation,
852        gap: LiveReplayGap,
853    },
854}
855
856pub enum RemoteSessionObservationSubscription {
857    Subscribed(RemoteSessionObservationEventStream),
858    Gap {
859        observation: RemoteSessionObservation,
860        gap: RemoteLiveReplayGap,
861    },
862}
863
864#[derive(Clone, Debug)]
865pub enum RemoteSessionObservationStreamItem {
866    /// A replayed or live session observation event encoded as remote DTOs.
867    Event(RemoteSessionObservationEvent),
868    /// A recoverable replay gap with a fresh remote observation snapshot.
869    Gap {
870        observation: RemoteSessionObservation,
871        gap: RemoteLiveReplayGap,
872    },
873}
874
875pub struct RemoteSessionObservationEventStream {
876    inner: lash_core::LiveReplaySubscription,
877    next_sequence: u64,
878}
879
880impl RemoteSessionObservationEventStream {
881    fn new(inner: lash_core::LiveReplaySubscription) -> Self {
882        Self {
883            inner,
884            next_sequence: 0,
885        }
886    }
887
888    pub async fn next_event(&mut self) -> Result<RemoteSessionObservationEvent> {
889        futures_util::future::poll_fn(|cx| Pin::new(&mut *self).poll_next(cx))
890            .await
891            .transpose()?
892            .ok_or_else(|| live_replay_error(LiveReplayStoreError::Closed))
893    }
894}
895
896impl Stream for RemoteSessionObservationEventStream {
897    type Item = Result<RemoteSessionObservationEvent>;
898
899    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
900        match Pin::new(&mut self.inner).poll_next(cx) {
901            Poll::Pending => Poll::Pending,
902            Poll::Ready(Some(Ok(event))) => {
903                let remote = RemoteSessionObservationEvent::from_core(self.next_sequence, event);
904                self.next_sequence = self.next_sequence.saturating_add(1);
905                Poll::Ready(Some(Ok(remote)))
906            }
907            Poll::Ready(Some(Err(err))) => Poll::Ready(Some(Err(live_replay_error(err)))),
908            Poll::Ready(None) => Poll::Ready(None),
909        }
910    }
911}
912
913/// Remote DTO stream returned by [`ObservableSession::subscribe_and_recover_remote`].
914pub struct RemoteSessionObservationStream {
915    inner: SessionObservationStream,
916    next_sequence: u64,
917}
918
919impl RemoteSessionObservationStream {
920    pub fn cursor(&self) -> RemoteSessionCursor {
921        RemoteSessionCursor::from(self.inner.cursor())
922    }
923}
924
925impl Stream for RemoteSessionObservationStream {
926    type Item = Result<RemoteSessionObservationStreamItem>;
927
928    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
929        match Pin::new(&mut self.inner).poll_next(cx) {
930            Poll::Pending => Poll::Pending,
931            Poll::Ready(Some(Ok(SessionObservationStreamItem::Event(event)))) => {
932                let remote = RemoteSessionObservationEvent::from_core(self.next_sequence, event);
933                self.next_sequence = self.next_sequence.saturating_add(1);
934                Poll::Ready(Some(Ok(RemoteSessionObservationStreamItem::Event(remote))))
935            }
936            Poll::Ready(Some(Ok(SessionObservationStreamItem::Gap { observation, gap }))) => {
937                Poll::Ready(Some(Ok(RemoteSessionObservationStreamItem::Gap {
938                    observation: observation.into(),
939                    gap: gap.into(),
940                })))
941            }
942            Poll::Ready(Some(Err(err))) => Poll::Ready(Some(Err(err))),
943            Poll::Ready(None) => Poll::Ready(None),
944        }
945    }
946}
947
948/// Stream returned by [`ObservableSession::subscribe_and_recover`].
949pub struct SessionObservationStream {
950    observable: ObservableSession,
951    cursor: SessionCursor,
952    subscription: Option<lash_core::LiveReplaySubscription>,
953    done: bool,
954}
955
956impl SessionObservationStream {
957    pub fn cursor(&self) -> &SessionCursor {
958        &self.cursor
959    }
960}
961
962impl Stream for SessionObservationStream {
963    type Item = Result<SessionObservationStreamItem>;
964
965    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
966        loop {
967            if self.done {
968                return Poll::Ready(None);
969            }
970            if self.subscription.is_none() {
971                match self.observable.subscribe_from_cursor(&self.cursor) {
972                    Ok(SessionObservationSubscription::Subscribed(subscription)) => {
973                        self.subscription = Some(subscription);
974                    }
975                    Ok(SessionObservationSubscription::Gap { observation, gap }) => {
976                        self.cursor = gap.latest_cursor.clone();
977                        return Poll::Ready(Some(Ok(SessionObservationStreamItem::Gap {
978                            observation,
979                            gap,
980                        })));
981                    }
982                    Err(err) => {
983                        self.done = true;
984                        return Poll::Ready(Some(Err(err)));
985                    }
986                }
987            }
988
989            let Some(subscription) = self.subscription.as_mut() else {
990                continue;
991            };
992            match Pin::new(subscription).poll_next(cx) {
993                Poll::Pending => return Poll::Pending,
994                Poll::Ready(Some(Ok(event))) => {
995                    self.cursor = event.cursor.clone();
996                    return Poll::Ready(Some(Ok(SessionObservationStreamItem::Event(event))));
997                }
998                Poll::Ready(Some(Err(LiveReplayStoreError::SubscriberLagged(_)))) => {
999                    self.subscription = None;
1000                    continue;
1001                }
1002                Poll::Ready(Some(Err(err))) => {
1003                    self.done = true;
1004                    return Poll::Ready(Some(Err(live_replay_error(err))));
1005                }
1006                Poll::Ready(None) => {
1007                    self.done = true;
1008                    return Poll::Ready(None);
1009                }
1010            }
1011        }
1012    }
1013}
1014
1015fn live_replay_error(err: lash_core::LiveReplayStoreError) -> EmbedError {
1016    EmbedError::Runtime(lash_core::RuntimeError::new(
1017        RuntimeErrorCode::Other("live_replay".to_string()),
1018        err.to_string(),
1019    ))
1020}
1021
1022pub struct EnqueueTurnBuilder<'a> {
1023    session: &'a LashSession,
1024    input: TurnInput,
1025    id: Option<String>,
1026    delivery_policy: DeliveryPolicy,
1027    slot_policy: SlotPolicy,
1028}
1029
1030impl<'a> EnqueueTurnBuilder<'a> {
1031    pub fn id(mut self, id: impl Into<String>) -> Self {
1032        self.id = Some(id.into());
1033        self
1034    }
1035
1036    pub fn delivery_policy(mut self, policy: DeliveryPolicy) -> Self {
1037        self.delivery_policy = policy;
1038        self
1039    }
1040
1041    pub fn slot_policy(mut self, policy: SlotPolicy) -> Self {
1042        self.slot_policy = policy;
1043        self
1044    }
1045
1046    pub async fn send(self) -> Result<QueuedWorkBatch> {
1047        let source_key = self.id.map(|id| format!("host:{id}"));
1048        self.session
1049            .runtime
1050            .enqueue_turn_input(
1051                self.input,
1052                self.delivery_policy,
1053                self.slot_policy,
1054                source_key,
1055            )
1056            .await
1057            .map_err(EmbedError::Runtime)
1058    }
1059}
1060
1061impl<'a> std::future::IntoFuture for EnqueueTurnBuilder<'a> {
1062    type Output = Result<QueuedWorkBatch>;
1063    type IntoFuture =
1064        std::pin::Pin<Box<dyn std::future::Future<Output = Result<QueuedWorkBatch>> + 'a>>;
1065
1066    fn into_future(self) -> Self::IntoFuture {
1067        Box::pin(self.send())
1068    }
1069}