Skip to main content

lash/
session.rs

1use std::pin::Pin;
2use std::task::{Context, Poll};
3
4use crate::support::*;
5use futures_util::Stream;
6use lash_core::runtime::{
7    PendingTurnInput, PendingTurnInputCancelOutcome, PendingTurnInputCancelResult,
8    PendingTurnInputCancelTarget, PendingTurnInputSuffixCancelOutcome, QueuedWorkBatch,
9    TurnInputIngress,
10};
11use lash_core::{LiveReplayGap, LiveReplayStoreError, SessionObservationEvent};
12use lash_remote_protocol::{
13    RemoteLiveReplayGap, RemoteSessionCursor, RemoteSessionObservation,
14    RemoteSessionObservationEvent,
15};
16
/// Builder for opening a [`LashSession`] from a [`LashCore`].
///
/// Collects the session id, the per-session [`SessionSpec`] and optional
/// overrides (parent session, execution-lease owner, persistence store,
/// provider, plugins) until one of the `open*` methods resolves them.
pub struct SessionBuilder {
    /// Core the session is opened from; its policy is the base the spec is
    /// resolved against.
    pub(crate) core: LashCore,
    /// Identifier of the session being opened.
    pub(crate) session_id: String,
    /// Session-level spec, resolved against the core policy at open time.
    pub(crate) spec: SessionSpec,
    /// Parent session id; when set, store-factory requests describe a child
    /// relation.
    pub(crate) parent_session_id: Option<String>,
    /// Explicit execution-lease owner; `None` keeps the default per-open
    /// identity.
    pub(crate) session_execution_owner: Option<lash_core::LeaseOwnerIdentity>,
    /// Explicit persistence store; takes precedence over the core's store
    /// factory.
    pub(crate) store: Option<Arc<dyn RuntimePersistence>>,
    /// Explicit provider; the core's provider is used when this is `None`.
    pub(crate) provider: Option<ProviderHandle>,
    /// Plugins activated for this session (id + turn-input requirement).
    pub(crate) active_plugins: Vec<ActivePluginBinding>,
    /// Session-level plugin factories, combined with the core's factories
    /// when the plugin host is built.
    pub(crate) plugin_factories: Vec<Arc<dyn PluginFactory>>,
}
28
#[cfg(feature = "rlm")]
/// [`SessionBuilder`] wrapper that additionally writes the RLM final-answer
/// format into the opened session's protocol turn options.
pub struct RlmSessionBuilder {
    /// Underlying builder that performs the actual open.
    pub(crate) builder: SessionBuilder,
    /// Explicit final-answer format; when `None`, root sessions default to
    /// Markdown and child sessions to the raw final value.
    pub(crate) rlm_final_answer_format: Option<lash_rlm_types::RlmFinalAnswerFormat>,
}
34
35impl SessionBuilder {
36    pub fn provider(mut self, provider: ProviderHandle) -> Self {
37        self.spec = self.spec.provider_id(provider.kind());
38        self.provider = Some(provider);
39        self
40    }
41
42    pub fn session_spec(mut self, spec: SessionSpec) -> Self {
43        self.spec = spec;
44        self
45    }
46
47    pub fn parent(mut self, parent_session_id: impl Into<String>) -> Self {
48        self.parent_session_id = Some(parent_session_id.into());
49        self
50    }
51
52    /// Use an explicit owner identity for durable session execution leases.
53    ///
54    /// This is only for hosts that already serialize one logical execution lane
55    /// and intentionally choose stable owner + incarnation values. Normal
56    /// embedders should keep the default per-open identity.
57    pub fn session_execution_owner(mut self, owner: lash_core::LeaseOwnerIdentity) -> Self {
58        self.session_execution_owner = Some(owner);
59        self
60    }
61
62    /// Use a specific persistence store for this root session.
63    ///
64    /// This is the right API for a host-owned, pre-opened session database.
65    /// Managed child sessions never reuse this store; configure
66    /// `LashCoreBuilder::child_store_factory` when child sessions should also
67    /// persist.
68    pub fn store(mut self, store: Arc<dyn RuntimePersistence>) -> Self {
69        self.store = Some(store);
70        self
71    }
72
73    pub fn plugin<P: PluginBinding>(mut self, config: P::SessionConfig) -> Self {
74        self.active_plugins.push(ActivePluginBinding {
75            id: P::ID,
76            requires_turn_input: P::requires_turn_input(&config),
77        });
78        self.plugin_factories.push(P::factory(&config));
79        self
80    }
81
82    pub async fn open(self) -> Result<LashSession> {
83        let policy = self.session_policy();
84        let store = self.create_store(&policy).await?;
85        let state = self
86            .load_or_default_state(&policy, store.as_deref())
87            .await?;
88        self.open_resolved(policy, state, store).await
89    }
90
91    /// Open this session with a fresh resident graph, ignoring any persisted
92    /// session graph/checkpoint state that may already exist for the same
93    /// session id.
94    ///
95    /// The next successful commit writes a full replacement graph, so normal
96    /// embedders can use this to start over without manually calling
97    /// `load_persisted_session_state` or constructing a `RuntimeSessionState`.
98    /// Use [`Self::open`] for resume and [`Self::open_with_state`] only when
99    /// restoring explicit host-owned state.
100    pub async fn open_fresh(self) -> Result<LashSession> {
101        let policy = self.session_policy();
102        let store = self.create_store(&policy).await?;
103        let state = RuntimeSessionState {
104            session_id: self.session_id.clone(),
105            policy: policy.clone(),
106            graph_replace_required: true,
107            ..RuntimeSessionState::default()
108        };
109        self.open_resolved(policy, state, store).await
110    }
111
112    /// Open with an explicitly supplied runtime state.
113    ///
114    /// This is for advanced hosts that already own a complete state snapshot.
115    /// Normal embedders should use [`Self::open`] to resume according to Lash's
116    /// residency policy or [`Self::open_fresh`] to start over and replace prior
117    /// persisted state on the next commit.
118    pub async fn open_with_state(self, mut state: RuntimeSessionState) -> Result<LashSession> {
119        let policy = self.session_policy();
120        let store = self.create_store(&policy).await?;
121        if state.session_id != self.session_id {
122            return Err(EmbedError::StoreSessionMismatch {
123                loaded: state.session_id,
124                requested: self.session_id,
125            });
126        }
127        let recorded_provider_id = state.policy.recorded_provider_id().to_string();
128        state.policy = policy.clone();
129        state.policy.provider_id = recorded_provider_id;
130        self.open_resolved(policy, state, store).await
131    }
132
133    fn session_policy(&self) -> SessionPolicy {
134        let mut policy = self.spec.resolve_against(&self.core.policy);
135        policy.session_id = Some(self.session_id.clone());
136        policy
137    }
138
139    async fn load_or_default_state(
140        &self,
141        policy: &SessionPolicy,
142        store: Option<&dyn RuntimePersistence>,
143    ) -> Result<RuntimeSessionState> {
144        let state = match store {
145            Some(store) => {
146                let loaded = self.load_persisted_state_for_residency(store).await?;
147                let mut state = loaded.unwrap_or_else(|| RuntimeSessionState {
148                    session_id: self.session_id.clone(),
149                    policy: policy.clone(),
150                    ..RuntimeSessionState::default()
151                });
152                if state.session_id != self.session_id {
153                    return Err(EmbedError::StoreSessionMismatch {
154                        loaded: state.session_id,
155                        requested: self.session_id.clone(),
156                    });
157                }
158                let recorded_provider_id = state.policy.recorded_provider_id().to_string();
159                state.policy = policy.clone();
160                state.policy.provider_id = recorded_provider_id;
161                state
162            }
163            None => RuntimeSessionState {
164                session_id: self.session_id.clone(),
165                policy: policy.clone(),
166                ..RuntimeSessionState::default()
167            },
168        };
169        Ok(state)
170    }
171
172    async fn load_persisted_state_for_residency(
173        &self,
174        store: &dyn RuntimePersistence,
175    ) -> Result<Option<RuntimeSessionState>> {
176        load_persisted_state_for_residency(self.core.env.residency, store).await
177    }
178
179    async fn open_resolved(
180        self,
181        policy: SessionPolicy,
182        state: RuntimeSessionState,
183        store: Option<Arc<dyn RuntimePersistence>>,
184    ) -> Result<LashSession> {
185        let mut env = self.core.env.clone();
186        if let Some(provider) = self.provider.clone().or_else(|| self.core.provider.clone()) {
187            env.core.providers.provider_resolver =
188                Arc::new(lash_core::SingleProviderResolver::new(provider));
189        }
190        let plugin_host = build_plugin_host(
191            self.core.protocol_factory.as_ref(),
192            self.core.plugin_factories.as_ref(),
193            self.plugin_factories,
194        )?;
195        env.core = self
196            .core
197            .runtime_host_for_plugin_host(env.core.clone(), &plugin_host)?;
198        env.plugin_host = Some(Arc::new(plugin_host));
199        let effect_host = Arc::clone(&env.core.control.effect_host);
200        let drivers = self.core.work_driver.drivers().await;
201        env.process_work_driver = drivers.process.clone();
202        env.queued_work_driver = drivers.queued.clone();
203        let mut runtime = LashRuntime::from_environment(&env, policy, state, store).await?;
204        if let Some(owner) = self.session_execution_owner {
205            runtime.set_runtime_lease_owner(owner);
206        }
207        if drivers.drive_process_on_open
208            && let Some(driver) = drivers.process.as_ref()
209        {
210            driver.claim_and_run_pending("session_open").await?;
211        }
212        let handle = RuntimeHandle::with_live_replay_store(
213            runtime,
214            Arc::clone(&self.core.live_replay_store),
215        );
216        Ok(LashSession {
217            runtime: handle,
218            effect_host,
219            parent_session_id: self.parent_session_id,
220            active_plugins: self.active_plugins,
221            process_phase_probe_slot: self.core.work_driver.phase_probe_slot(),
222            turn_cancels: crate::turn::TurnCancelRegistry::default(),
223        })
224    }
225
226    async fn create_store(
227        &self,
228        policy: &SessionPolicy,
229    ) -> Result<Option<Arc<dyn RuntimePersistence>>> {
230        if let Some(store) = self.store.as_ref() {
231            return Ok(Some(Arc::clone(store)));
232        }
233        let Some(factory) = self.core.store_factory.as_ref() else {
234            return Ok(None);
235        };
236        let request = SessionStoreCreateRequest {
237            session_id: self.session_id.clone(),
238            relation: self
239                .parent_session_id
240                .as_ref()
241                .map(|parent_session_id| lash_core::SessionRelation::Child {
242                    parent_session_id: parent_session_id.clone(),
243                    caused_by: None,
244                })
245                .unwrap_or_default(),
246            policy: policy.clone(),
247        };
248        factory
249            .create_store(&request)
250            .await
251            .map(Some)
252            .map_err(|message| EmbedError::StoreFactory {
253                session_id: self.session_id.clone(),
254                message,
255            })
256    }
257}
258
259pub(crate) async fn load_state_for_residency(
260    residency: Residency,
261    session_id: &str,
262    policy: &SessionPolicy,
263    store: &dyn RuntimePersistence,
264) -> Result<RuntimeSessionState> {
265    let mut state = load_persisted_state_for_residency(residency, store)
266        .await?
267        .unwrap_or_else(|| RuntimeSessionState {
268            session_id: session_id.to_string(),
269            policy: policy.clone(),
270            ..RuntimeSessionState::default()
271        });
272    if state.session_id != session_id {
273        return Err(EmbedError::StoreSessionMismatch {
274            loaded: state.session_id,
275            requested: session_id.to_string(),
276        });
277    }
278    let recorded_provider_id = state.policy.recorded_provider_id().to_string();
279    state.policy = policy.clone();
280    state.policy.provider_id = recorded_provider_id;
281    Ok(state)
282}
283
284async fn load_persisted_state_for_residency(
285    residency: Residency,
286    store: &dyn RuntimePersistence,
287) -> Result<Option<RuntimeSessionState>> {
288    match residency {
289        Residency::KeepAll => {
290            let loaded = lash_core::store::load_persisted_session_state(store)
291                .await
292                .map_err(|err| SessionError::Protocol(format!("failed to load store: {err}")))?;
293            Ok(loaded)
294        }
295        Residency::ActivePathOnly => {
296            let active = lash_core::store::load_persisted_session_state_active_path(store, None)
297                .await
298                .map_err(|err| {
299                    SessionError::Protocol(format!("failed to load active-path store: {err}"))
300                })?;
301            if active
302                .as_ref()
303                .is_some_and(|state| state.session_graph.nodes.is_empty())
304            {
305                let mut full = lash_core::store::load_persisted_session_state(store)
306                    .await
307                    .map_err(|err| {
308                        SessionError::Protocol(format!(
309                            "failed to heal active-path store from full graph: {err}"
310                        ))
311                    })?;
312                if let Some(state) = full.as_mut() {
313                    state.graph_replace_required = true;
314                }
315                return Ok(full);
316            }
317            Ok(active)
318        }
319    }
320}
321
322impl PromptLayerSink for SessionBuilder {
323    fn prompt_layer_mut(&mut self) -> &mut PromptLayer {
324        self.spec.prompt.get_or_insert_with(PromptLayer::new)
325    }
326}
327
#[cfg(feature = "rlm")]
impl RlmSessionBuilder {
    /// See [`SessionBuilder::provider`].
    pub fn provider(mut self, provider: ProviderHandle) -> Self {
        self.builder = self.builder.provider(provider);
        self
    }

    /// See [`SessionBuilder::session_spec`].
    pub fn session_spec(mut self, spec: SessionSpec) -> Self {
        self.builder = self.builder.session_spec(spec);
        self
    }

    /// See [`SessionBuilder::parent`]. A parent also switches the default
    /// final-answer format from Markdown to the raw final value.
    pub fn parent(mut self, parent_session_id: impl Into<String>) -> Self {
        self.builder = self.builder.parent(parent_session_id);
        self
    }

    /// See [`SessionBuilder::session_execution_owner`].
    pub fn session_execution_owner(mut self, owner: lash_core::LeaseOwnerIdentity) -> Self {
        self.builder = self.builder.session_execution_owner(owner);
        self
    }

    /// See [`SessionBuilder::store`].
    pub fn store(mut self, store: Arc<dyn RuntimePersistence>) -> Self {
        self.builder = self.builder.store(store);
        self
    }

    /// See [`SessionBuilder::plugin`].
    pub fn plugin<P: PluginBinding>(mut self, config: P::SessionConfig) -> Self {
        self.builder = self.builder.plugin::<P>(config);
        self
    }

    /// Resume (or start) the session like [`SessionBuilder::open`], then
    /// apply the RLM session options.
    pub async fn open(self) -> Result<LashSession> {
        self.open_resolved(RlmOpenState::Resume).await
    }

    /// Start over like [`SessionBuilder::open_fresh`], then apply the RLM
    /// session options.
    pub async fn open_fresh(self) -> Result<LashSession> {
        self.open_resolved(RlmOpenState::Fresh).await
    }

    /// Open with an explicit snapshot like [`SessionBuilder::open_with_state`],
    /// then apply the RLM session options.
    pub async fn open_with_state(self, state: RuntimeSessionState) -> Result<LashSession> {
        self.open_resolved(RlmOpenState::Explicit(state)).await
    }

    /// Resolve the initial state for `open_state`, apply the RLM options to
    /// it, and open the underlying builder.
    async fn open_resolved(self, open_state: RlmOpenState) -> Result<LashSession> {
        let Self {
            builder,
            rlm_final_answer_format,
        } = self;
        let policy = builder.session_policy();
        // Reject an explicit snapshot for another session before the store
        // factory gets a chance to create a store for it.
        let open_state = match open_state {
            RlmOpenState::Explicit(state) if state.session_id != builder.session_id => {
                return Err(EmbedError::StoreSessionMismatch {
                    loaded: state.session_id,
                    requested: builder.session_id.clone(),
                });
            }
            other => other,
        };
        let store = builder.create_store(&policy).await?;
        let mut state = match open_state {
            RlmOpenState::Resume => {
                builder
                    .load_or_default_state(&policy, store.as_deref())
                    .await?
            }
            RlmOpenState::Fresh => RuntimeSessionState {
                session_id: builder.session_id.clone(),
                policy: policy.clone(),
                graph_replace_required: true,
                ..RuntimeSessionState::default()
            },
            RlmOpenState::Explicit(mut state) => {
                // Session id was validated above; overlay the resolved policy
                // but keep the provider the snapshot was recorded with.
                let recorded_provider_id = state.policy.recorded_provider_id().to_string();
                state.policy = policy.clone();
                state.policy.provider_id = recorded_provider_id;
                state
            }
        };
        apply_rlm_session_options(
            builder.parent_session_id.is_none(),
            rlm_final_answer_format,
            &mut state,
        )?;
        builder.open_resolved(policy, state, store).await
    }
}
412
#[cfg(feature = "rlm")]
impl PromptLayerSink for RlmSessionBuilder {
    /// Forward to the wrapped [`SessionBuilder`]'s prompt layer.
    fn prompt_layer_mut(&mut self) -> &mut PromptLayer {
        let inner = &mut self.builder;
        inner.prompt_layer_mut()
    }
}
419
#[cfg(feature = "rlm")]
// A value of this enum only lives for the duration of a single open call
// (built by `RlmSessionBuilder::open*`, consumed by its `open_resolved`), so
// keeping the `RuntimeSessionState` inline is not worth boxing it.
#[allow(clippy::large_enum_variant)]
/// Which initial state an [`RlmSessionBuilder`] open should start from.
enum RlmOpenState {
    /// Load persisted state (or start empty), as `SessionBuilder::open` does.
    Resume,
    /// Start from an empty state that replaces any persisted graph on the
    /// next commit.
    Fresh,
    /// Use a caller-supplied state snapshot.
    Explicit(RuntimeSessionState),
}
427
#[cfg(feature = "rlm")]
/// Write the RLM final-answer format into `state`'s protocol turn options.
///
/// The format is `explicit_format` when given; otherwise root sessions get
/// `Markdown` and child sessions `RawFinalValue`. Other RLM extras already
/// stored at the session level (e.g. the termination mode) are kept. The
/// resulting options replace both the session-level options and those of
/// every agent frame.
///
/// # Errors
/// Fails if non-empty session-level options cannot be decoded as RLM extras,
/// or if the updated extras cannot be encoded.
fn apply_rlm_session_options(
    is_root_session: bool,
    explicit_format: Option<lash_rlm_types::RlmFinalAnswerFormat>,
    state: &mut RuntimeSessionState,
) -> Result<()> {
    let chosen_format = match explicit_format {
        Some(format) => format,
        None if is_root_session => lash_rlm_types::RlmFinalAnswerFormat::Markdown,
        None => lash_rlm_types::RlmFinalAnswerFormat::RawFinalValue,
    };
    let mut extras: lash_rlm_types::RlmCreateExtras = if state.protocol_turn_options.is_empty() {
        Default::default()
    } else {
        state.protocol_turn_options.decode()?
    };
    extras.final_answer_format = Some(chosen_format);
    let updated = ProtocolTurnOptions::typed(extras)?;
    for frame in &mut state.agent_frames {
        frame.protocol_turn_options = updated.clone();
    }
    state.protocol_turn_options = updated;
    Ok(())
}
454
#[cfg(all(test, feature = "rlm"))]
mod tests {
    use super::*;

    /// State whose session-level options carry `Natural` termination and no
    /// final-answer format yet.
    fn state_with_natural_termination() -> Result<RuntimeSessionState> {
        Ok(RuntimeSessionState {
            protocol_turn_options: ProtocolTurnOptions::typed(lash_rlm_types::RlmCreateExtras {
                termination: lash_rlm_types::RlmTermination::Natural,
                final_answer_format: None,
            })?,
            ..Default::default()
        })
    }

    #[test]
    fn apply_rlm_session_options_preserves_existing_termination() -> Result<()> {
        let mut state = state_with_natural_termination()?;

        apply_rlm_session_options(true, None, &mut state)?;

        let extras: lash_rlm_types::RlmCreateExtras = state.protocol_turn_options.decode()?;
        assert_eq!(extras.termination, lash_rlm_types::RlmTermination::Natural);
        assert_eq!(
            extras.final_answer_format,
            Some(lash_rlm_types::RlmFinalAnswerFormat::Markdown)
        );
        Ok(())
    }

    #[test]
    fn apply_rlm_session_options_defaults_child_sessions_to_raw_final_value() -> Result<()> {
        let mut state = state_with_natural_termination()?;

        apply_rlm_session_options(false, None, &mut state)?;

        let extras: lash_rlm_types::RlmCreateExtras = state.protocol_turn_options.decode()?;
        assert_eq!(extras.termination, lash_rlm_types::RlmTermination::Natural);
        assert_eq!(
            extras.final_answer_format,
            Some(lash_rlm_types::RlmFinalAnswerFormat::RawFinalValue)
        );
        Ok(())
    }

    #[test]
    fn apply_rlm_session_options_prefers_explicit_format() -> Result<()> {
        let mut state = state_with_natural_termination()?;

        // An explicit format wins over the root-session Markdown default.
        apply_rlm_session_options(
            true,
            Some(lash_rlm_types::RlmFinalAnswerFormat::RawFinalValue),
            &mut state,
        )?;

        let extras: lash_rlm_types::RlmCreateExtras = state.protocol_turn_options.decode()?;
        assert_eq!(extras.termination, lash_rlm_types::RlmTermination::Natural);
        assert_eq!(
            extras.final_answer_format,
            Some(lash_rlm_types::RlmFinalAnswerFormat::RawFinalValue)
        );
        Ok(())
    }
}
480
#[derive(Clone)]
/// An opened session.
///
/// Holds the runtime handle plus what turn builders need (effect host,
/// active plugins, cancellation registry). Clones share the running-turn
/// cancellation registry, so [`LashSession::cancel_running_turns`] on any
/// clone reaches turns started from the others.
pub struct LashSession {
    /// Handle to the session runtime (observation snapshots and writer lock).
    pub(crate) runtime: RuntimeHandle,
    /// Effect host taken from the runtime host when the session was opened.
    pub(crate) effect_host: Arc<dyn EffectHost>,
    /// Parent session id given to the builder, if any.
    pub(crate) parent_session_id: Option<String>,
    /// Plugins activated on the builder; handed to every turn builder.
    pub(crate) active_plugins: Vec<ActivePluginBinding>,
    /// Phase-probe slot from the core's work driver, if it exposes one;
    /// updated by `set_turn_phase_probe`.
    pub(crate) process_phase_probe_slot: Option<lash_core::runtime::RuntimeTurnPhaseProbeSlot>,
    /// Cancellation registry for turns running through this session and its
    /// clones.
    pub(crate) turn_cancels: crate::turn::TurnCancelRegistry,
}
490
#[derive(Clone, Debug, Default)]
/// Partial session configuration passed to [`LashSession::configure`].
///
/// Every field is optional. NOTE(review): presumably `None` leaves the
/// corresponding setting unchanged; confirm against the config admin's
/// `update` implementation.
pub struct SessionConfigPatch {
    /// Provider to switch the session to.
    pub provider: Option<ProviderHandle>,
    /// Model to switch the session to.
    pub model: Option<ModelSpec>,
    /// Prompt layer to apply to the session.
    pub prompt: Option<PromptLayer>,
}
497
498impl LashSession {
499    pub async fn close(self) -> Result<()> {
500        let runtime = self.runtime.writer();
501        let runtime = runtime.lock().await;
502        runtime.unregister_plugin_session()?;
503        Ok(())
504    }
505
506    pub fn session_id(&self) -> String {
507        self.runtime.observe().session_id().to_string()
508    }
509
510    pub fn policy_snapshot(&self) -> SessionPolicy {
511        self.runtime.observe().policy.clone()
512    }
513
514    pub fn observe(&self) -> ObservableSession {
515        ObservableSession {
516            runtime: self.runtime.clone(),
517        }
518    }
519
520    pub fn parent_session_id(&self) -> Option<&str> {
521        self.parent_session_id.as_deref()
522    }
523
524    pub fn effect_host(&self) -> Arc<dyn EffectHost> {
525        Arc::clone(&self.effect_host)
526    }
527
528    pub fn turn(&self, input: TurnInput) -> TurnBuilder {
529        TurnBuilder {
530            runtime: self.runtime.clone(),
531            effect_host: Arc::clone(&self.effect_host),
532            active_plugins: self.active_plugins.clone(),
533            input,
534            cancel: CancellationToken::new(),
535            cancels: self.turn_cancels.clone(),
536            protocol_turn_options: None,
537            provider: None,
538            model: None,
539            turn_id: None,
540        }
541    }
542
543    pub fn queued_turn(&self) -> QueuedTurnBuilder {
544        QueuedTurnBuilder {
545            runtime: self.runtime.clone(),
546            effect_host: Arc::clone(&self.effect_host),
547            cancel: CancellationToken::new(),
548            cancels: self.turn_cancels.clone(),
549            batch_ids: Vec::new(),
550            drain_id: None,
551        }
552    }
553
554    /// Cancel every turn currently executing through this opened session
555    /// (including its clones) and report how many were signalled.
556    ///
557    /// This is the affordance behind a UI "stop" control: hold a clone of the
558    /// session wherever the stop arrives and call this, instead of threading a
559    /// [`CancellationToken`](crate::CancellationToken) into every turn call
560    /// ([`TurnBuilder::cancel`](crate::TurnBuilder::cancel) remains the
561    /// per-turn hook when you need one). A cancelled turn finishes with
562    /// `TurnOutcome::Stopped(TurnStop::Cancelled)` and commits like any other
563    /// turn; the session stays usable.
564    ///
565    /// Scope: turns started from this `LashSession` instance and its clones.
566    /// A handle opened separately for the same session id has its own
567    /// registry and is not reached.
568    pub fn cancel_running_turns(&self) -> usize {
569        self.turn_cancels.cancel_all()
570    }
571
572    pub fn admin(&self) -> SessionAdmin {
573        SessionAdmin {
574            runtime: self.runtime.clone(),
575        }
576    }
577
578    pub async fn configure(&self, patch: SessionConfigPatch) -> Result<()> {
579        self.admin().config().update(patch).await
580    }
581
582    pub fn tools(&self) -> ToolAdmin {
583        ToolAdmin::new(self.admin())
584    }
585
586    pub fn commands(&self) -> SessionCommandAdmin {
587        self.admin().commands()
588    }
589
590    pub fn triggers(&self) -> SessionTriggerAdmin {
591        self.admin().triggers()
592    }
593
594    pub fn processes(&self) -> SessionProcessAdmin {
595        SessionProcessAdmin::new(self.admin())
596    }
597
598    pub fn plugin_operations(&self) -> PluginOperations {
599        PluginOperations {
600            control: self.admin(),
601        }
602    }
603
604    pub fn enqueue(&self, input: TurnInput) -> EnqueueTurnBuilder<'_> {
605        EnqueueTurnBuilder {
606            session: self,
607            input,
608            id: None,
609            ingress: TurnInputIngress::NextTurn,
610        }
611    }
612
613    /// Return all pending durable queued-work batches for this session.
614    ///
615    /// This is an admin/introspection view for non-user queued work such as
616    /// process wakes and session commands. User-visible model input is stored
617    /// separately as pending turn input and is exposed by
618    /// [`pending_turn_inputs`](Self::pending_turn_inputs).
619    pub async fn queued_work(&self) -> Result<Vec<QueuedWorkBatch>> {
620        let observation = self.runtime.observe();
621        let store = observation.queue_store.as_ref().ok_or_else(|| {
622            EmbedError::Runtime(lash_core::RuntimeError::new(
623                lash_core::RuntimeErrorCode::StoreCommitFailed,
624                "queued work inspection requires a persistent runtime store",
625            ))
626        })?;
627        store
628            .list_pending_queued_work(observation.session_id())
629            .await
630            .map_err(|err| {
631                EmbedError::Runtime(lash_core::RuntimeError::new(
632                    lash_core::RuntimeErrorCode::StoreCommitFailed,
633                    err.to_string(),
634                ))
635            })
636    }
637
638    pub async fn pending_turn_inputs(&self) -> Result<Vec<PendingTurnInput>> {
639        let observation = self.runtime.observe();
640        let store = observation.queue_store.as_ref().ok_or_else(|| {
641            EmbedError::Runtime(lash_core::RuntimeError::new(
642                lash_core::RuntimeErrorCode::StoreCommitFailed,
643                "pending turn input inspection requires a persistent runtime store",
644            ))
645        })?;
646        store
647            .list_pending_turn_inputs(observation.session_id())
648            .await
649            .map_err(|err| {
650                EmbedError::Runtime(lash_core::RuntimeError::new(
651                    lash_core::RuntimeErrorCode::StoreCommitFailed,
652                    err.to_string(),
653                ))
654            })
655    }
656
657    pub async fn cancel_pending_turn_input(
658        &self,
659        input_id: &str,
660    ) -> Result<PendingTurnInputCancelOutcome> {
661        let session_id = self.session_id();
662        self.runtime
663            .cancel_pending_turn_input(&session_id, input_id)
664            .await
665            .map_err(EmbedError::Runtime)
666    }
667
668    /// Atomically cancel a set of pending user inputs by runtime input id or
669    /// app source key.
670    ///
671    /// This is the app reconciliation path for explicit selections such as
672    /// "remove these pending drafts". Returned outcomes distinguish newly
673    /// cancelled input from input that was already claimed, completed,
674    /// cancelled, or missing.
675    pub async fn cancel_pending_turn_inputs(
676        &self,
677        targets: impl IntoIterator<Item = PendingTurnInputCancelTarget>,
678    ) -> Result<Vec<PendingTurnInputCancelResult>> {
679        let session_id = self.session_id();
680        let targets = targets.into_iter().collect::<Vec<_>>();
681        self.runtime
682            .cancel_pending_turn_inputs(&session_id, &targets)
683            .await
684            .map_err(EmbedError::Runtime)
685    }
686
687    /// Atomically cancel the same-session pending-input suffix from `anchor`.
688    ///
689    /// Apps that let users edit previously submitted product messages should
690    /// map the edited message to the stored pending-input `input_id` or
691    /// `source_key`, call this method, and only restore/edit drafts that return
692    /// [`PendingTurnInputCancelOutcome::Cancelled`]. Claimed or completed
693    /// inputs have already crossed the runtime boundary and should be treated
694    /// as reconciliation state, not local editable drafts.
695    pub async fn cancel_pending_turn_input_suffix(
696        &self,
697        anchor: PendingTurnInputCancelTarget,
698    ) -> Result<PendingTurnInputSuffixCancelOutcome> {
699        let session_id = self.session_id();
700        self.runtime
701            .cancel_pending_turn_input_suffix(&session_id, &anchor)
702            .await
703            .map_err(EmbedError::Runtime)
704    }
705
706    pub async fn cancel_queued_work_batch(
707        &self,
708        batch_id: &str,
709    ) -> Result<Option<QueuedWorkBatch>> {
710        let session_id = self.session_id();
711        self.runtime
712            .cancel_queued_work_batch(&session_id, batch_id)
713            .await
714            .map_err(EmbedError::Runtime)
715    }
716
717    /// Resolve once `batch_id` is no longer pending in the queue store —
718    /// drained by whoever runs queued work (a queued-work runner, a durable
719    /// worker, or another handle's [`queued_turn`](Self::queued_turn)) or
720    /// cancelled. This is the enqueue-and-observe side of the queue: the
721    /// caller never claims the work itself.
722    ///
723    /// Completion is read from the persistent queue store, so it observes
724    /// drains performed by other session handles and other processes alike.
725    /// There is no built-in deadline — nothing resolves if nothing drains the
726    /// queue, so bound it with `tokio::time::timeout` when the worker may be
727    /// unavailable. A batch id the store has never seen resolves immediately.
728    pub async fn await_queued_work_batch(&self, batch_id: &str) -> Result<()> {
729        let observation = self.runtime.observe();
730        let store = observation.queue_store.clone().ok_or_else(|| {
731            EmbedError::Runtime(lash_core::RuntimeError::new(
732                lash_core::RuntimeErrorCode::StoreCommitFailed,
733                "queued work inspection requires a persistent runtime store",
734            ))
735        })?;
736        let session_id = observation.session_id().to_string();
737        drop(observation);
738        let mut delay = std::time::Duration::from_millis(25);
739        loop {
740            let pending = store
741                .list_pending_queued_work(&session_id)
742                .await
743                .map_err(|err| {
744                    EmbedError::Runtime(lash_core::RuntimeError::new(
745                        lash_core::RuntimeErrorCode::StoreCommitFailed,
746                        err.to_string(),
747                    ))
748                })?;
749            if !pending.iter().any(|batch| batch.batch_id == batch_id) {
750                return Ok(());
751            }
752            tokio::time::sleep(delay).await;
753            delay = (delay * 2).min(std::time::Duration::from_millis(400));
754        }
755    }
756
757    pub fn read_view(&self) -> SessionReadView {
758        self.runtime.observe().read_view.clone()
759    }
760
761    pub fn usage_report(&self) -> SessionUsageReport {
762        self.runtime.observe().usage_report.clone()
763    }
764
765    pub async fn set_turn_phase_probe(
766        &self,
767        probe: Arc<dyn lash_core::runtime::RuntimeTurnPhaseProbe>,
768    ) {
769        let writer = self.runtime.writer();
770        let mut runtime = writer.lock().await;
771        runtime.set_turn_phase_probe(Arc::clone(&probe));
772        self.runtime.publish_from(&runtime);
773        if let Some(slot) = &self.process_phase_probe_slot {
774            let observation = self.runtime.observe();
775            slot.set_for_session(observation.session_id(), Arc::clone(&probe));
776            let current_frame = observation.persisted_state.current_agent_frame_id.as_str();
777            if !current_frame.is_empty() {
778                let scope = lash_core::SessionScope::for_agent_frame(
779                    observation.session_id(),
780                    current_frame,
781                );
782                slot.set_for_scope(&scope, probe);
783            }
784        }
785    }
786}
787
#[derive(Clone)]
/// Read-only observation handle for a session, obtained from
/// [`LashSession::observe`].
pub struct ObservableSession {
    /// Runtime handle observations and subscriptions are read from.
    pub(crate) runtime: RuntimeHandle,
}
792
793impl ObservableSession {
    /// Current runtime observation snapshot, as returned by the runtime
    /// handle's `observe`.
    fn snapshot(&self) -> Arc<RuntimeObservation> {
        self.runtime.observe()
    }
797
    /// Current session observation, as reported by the runtime handle.
    pub fn current_observation(&self) -> SessionObservation {
        self.runtime.current_session_observation()
    }
801
802    pub fn current_remote_observation(&self) -> RemoteSessionObservation {
803        RemoteSessionObservation::from_core(self.current_observation())
804    }
805
806    pub fn resume_from_cursor(&self, cursor: &SessionCursor) -> Result<SessionResume> {
807        self.runtime
808            .resume_session_observation(cursor)
809            .map_err(live_replay_error)
810    }
811
812    pub fn subscribe_from_cursor(
813        &self,
814        cursor: &SessionCursor,
815    ) -> Result<SessionObservationSubscription> {
816        self.runtime
817            .subscribe_session_observation(cursor)
818            .map_err(live_replay_error)
819    }
820
821    pub fn subscribe_from_remote_cursor(
822        &self,
823        cursor: &RemoteSessionCursor,
824    ) -> Result<RemoteSessionObservationSubscription> {
825        cursor.validate()?;
826        let cursor = lash_core::SessionCursor::try_from(cursor.clone())?;
827        match self.subscribe_from_cursor(&cursor)? {
828            SessionObservationSubscription::Subscribed(subscription) => {
829                Ok(RemoteSessionObservationSubscription::Subscribed(
830                    RemoteSessionObservationEventStream::new(subscription),
831                ))
832            }
833            SessionObservationSubscription::Gap { observation, gap } => {
834                Ok(RemoteSessionObservationSubscription::Gap {
835                    observation: observation.into(),
836                    gap: gap.into(),
837                })
838            }
839        }
840    }
841
842    /// Subscribe to session observation events and keep the subscription alive
843    /// across recoverable live-replay gaps.
844    ///
845    /// The returned stream yields [`SessionObservationStreamItem::Gap`] when
846    /// the cursor missed the bounded replay window. Callers should replace
847    /// their UI/projection from the included fresh observation, persist
848    /// `gap.latest_cursor`, and keep polling the same stream; it resubscribes
849    /// from that cursor internally.
850    pub fn subscribe_and_recover(&self, cursor: SessionCursor) -> SessionObservationStream {
851        SessionObservationStream {
852            observable: self.clone(),
853            cursor,
854            subscription: None,
855            done: false,
856        }
857    }
858
859    /// Subscribe to remote DTO session observation events and keep the
860    /// subscription alive across recoverable live-replay gaps.
861    pub fn subscribe_and_recover_remote(
862        &self,
863        cursor: RemoteSessionCursor,
864    ) -> Result<RemoteSessionObservationStream> {
865        cursor.validate()?;
866        let cursor = lash_core::SessionCursor::try_from(cursor)?;
867        Ok(RemoteSessionObservationStream {
868            inner: self.subscribe_and_recover(cursor),
869            next_sequence: 0,
870        })
871    }
872
873    pub fn session_id(&self) -> String {
874        self.snapshot().session_id().to_string()
875    }
876
877    pub fn policy_snapshot(&self) -> SessionPolicy {
878        self.snapshot().policy.clone()
879    }
880
881    pub fn read_view(&self) -> SessionReadView {
882        self.snapshot().read_view.clone()
883    }
884
885    pub fn usage_report(&self) -> SessionUsageReport {
886        self.snapshot().usage_report.clone()
887    }
888
889    pub fn tool_state(&self) -> Option<ToolState> {
890        self.snapshot().tool_state.clone()
891    }
892
893    pub fn active_tool_manifests(&self) -> Vec<ToolManifest> {
894        self.snapshot()
895            .tool_state
896            .as_ref()
897            .map(ToolState::tool_manifests)
898            .unwrap_or_default()
899    }
900
901    pub async fn list_process_handles(&self) -> Vec<ProcessHandleSummary> {
902        self.snapshot().list_process_handles().await
903    }
904
905    pub async fn list_all_process_handles(&self) -> Vec<ProcessHandleSummary> {
906        self.snapshot().list_all_process_handles().await
907    }
908
909    pub fn process_scope(&self) -> SessionScope {
910        self.snapshot().process_scope()
911    }
912}
913
/// Item yielded by [`SessionObservationStream`].
#[derive(Clone, Debug)]
pub enum SessionObservationStreamItem {
    /// A replayed or live session observation event.
    Event(SessionObservationEvent),
    /// A recoverable replay gap with a fresh durable observation.
    ///
    /// After yielding this, the stream resumes from `gap.latest_cursor`.
    Gap {
        observation: SessionObservation,
        gap: LiveReplayGap,
    },
}
924
/// Result of [`ObservableSession::subscribe_from_remote_cursor`].
pub enum RemoteSessionObservationSubscription {
    /// The subscription succeeded; events arrive on the wrapped stream.
    Subscribed(RemoteSessionObservationEventStream),
    /// The cursor could not be subscribed from. Carries a fresh observation
    /// and the gap description, both converted to remote DTOs.
    Gap {
        observation: RemoteSessionObservation,
        gap: RemoteLiveReplayGap,
    },
}
932
/// Item yielded by [`RemoteSessionObservationStream`]: the remote-DTO
/// counterpart of [`SessionObservationStreamItem`].
#[derive(Clone, Debug)]
pub enum RemoteSessionObservationStreamItem {
    /// A replayed or live session observation event encoded as remote DTOs.
    Event(RemoteSessionObservationEvent),
    /// A recoverable replay gap with a fresh remote observation snapshot.
    Gap {
        observation: RemoteSessionObservation,
        gap: RemoteLiveReplayGap,
    },
}
943
/// Remote-DTO event stream over a single live-replay subscription.
///
/// Returned inside [`RemoteSessionObservationSubscription::Subscribed`].
/// Unlike [`RemoteSessionObservationStream`], it never resubscribes: every
/// store error, including subscriber lag, is yielded as an `Err` item.
pub struct RemoteSessionObservationEventStream {
    // Underlying core live-replay subscription.
    inner: lash_core::LiveReplaySubscription,
    // Sequence number passed to `RemoteSessionObservationEvent::from_core` for
    // the next event. `new` starts it at 0; it saturates instead of wrapping.
    next_sequence: u64,
}
948
949impl RemoteSessionObservationEventStream {
950    fn new(inner: lash_core::LiveReplaySubscription) -> Self {
951        Self {
952            inner,
953            next_sequence: 0,
954        }
955    }
956
957    pub async fn next_event(&mut self) -> Result<RemoteSessionObservationEvent> {
958        futures_util::future::poll_fn(|cx| Pin::new(&mut *self).poll_next(cx))
959            .await
960            .transpose()?
961            .ok_or_else(|| live_replay_error(LiveReplayStoreError::Closed))
962    }
963}
964
965impl Stream for RemoteSessionObservationEventStream {
966    type Item = Result<RemoteSessionObservationEvent>;
967
968    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
969        match Pin::new(&mut self.inner).poll_next(cx) {
970            Poll::Pending => Poll::Pending,
971            Poll::Ready(Some(Ok(event))) => {
972                let remote = RemoteSessionObservationEvent::from_core(self.next_sequence, event);
973                self.next_sequence = self.next_sequence.saturating_add(1);
974                Poll::Ready(Some(Ok(remote)))
975            }
976            Poll::Ready(Some(Err(err))) => Poll::Ready(Some(Err(live_replay_error(err)))),
977            Poll::Ready(None) => Poll::Ready(None),
978        }
979    }
980}
981
982/// Remote DTO stream returned by [`ObservableSession::subscribe_and_recover_remote`].
/// Remote DTO stream returned by [`ObservableSession::subscribe_and_recover_remote`].
///
/// Wraps a [`SessionObservationStream`], so it recovers across replay gaps in
/// the same way, and converts every item into remote DTOs.
pub struct RemoteSessionObservationStream {
    // Recovering core stream that performs the (re)subscription work.
    inner: SessionObservationStream,
    // Sequence number for the next converted event. Gap items do not reset
    // or advance it.
    next_sequence: u64,
}
987
988impl RemoteSessionObservationStream {
989    pub fn cursor(&self) -> RemoteSessionCursor {
990        RemoteSessionCursor::from(self.inner.cursor())
991    }
992}
993
994impl Stream for RemoteSessionObservationStream {
995    type Item = Result<RemoteSessionObservationStreamItem>;
996
997    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
998        match Pin::new(&mut self.inner).poll_next(cx) {
999            Poll::Pending => Poll::Pending,
1000            Poll::Ready(Some(Ok(SessionObservationStreamItem::Event(event)))) => {
1001                let remote = RemoteSessionObservationEvent::from_core(self.next_sequence, event);
1002                self.next_sequence = self.next_sequence.saturating_add(1);
1003                Poll::Ready(Some(Ok(RemoteSessionObservationStreamItem::Event(remote))))
1004            }
1005            Poll::Ready(Some(Ok(SessionObservationStreamItem::Gap { observation, gap }))) => {
1006                Poll::Ready(Some(Ok(RemoteSessionObservationStreamItem::Gap {
1007                    observation: observation.into(),
1008                    gap: gap.into(),
1009                })))
1010            }
1011            Poll::Ready(Some(Err(err))) => Poll::Ready(Some(Err(err))),
1012            Poll::Ready(None) => Poll::Ready(None),
1013        }
1014    }
1015}
1016
1017/// Stream returned by [`ObservableSession::subscribe_and_recover`].
/// Stream returned by [`ObservableSession::subscribe_and_recover`].
///
/// It resubscribes internally after replay gaps and after subscriber lag.
/// The `Stream` impl below defines the exact state transitions.
pub struct SessionObservationStream {
    // Session to (re)subscribe against.
    observable: ObservableSession,
    // Resume point. Starts as the caller's cursor, then becomes the cursor of
    // the last delivered event or the `latest_cursor` of the last gap.
    cursor: SessionCursor,
    // Live subscription. `None` before the first poll and after a gap or lag.
    subscription: Option<lash_core::LiveReplaySubscription>,
    // Set once a terminal error or end-of-stream has been reported.
    done: bool,
}
1024
impl SessionObservationStream {
    /// Current resume cursor. It is the caller's starting cursor until an
    /// event or gap advances it.
    pub fn cursor(&self) -> &SessionCursor {
        &self.cursor
    }
}
1030
impl Stream for SessionObservationStream {
    type Item = Result<SessionObservationStreamItem>;

    /// Drives the recover-on-gap state machine.
    ///
    /// Each loop iteration does three things:
    /// 1. Returns `None` once `done` is set.
    /// 2. If there is no live subscription, (re)subscribes from `self.cursor`.
    /// 3. Polls the live subscription and maps its result.
    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        loop {
            // A terminal error or end-of-stream has already been reported.
            if self.done {
                return Poll::Ready(None);
            }
            // No live subscription (first poll, after a gap, or after lag):
            // subscribe from the current resume cursor.
            if self.subscription.is_none() {
                match self.observable.subscribe_from_cursor(&self.cursor) {
                    Ok(SessionObservationSubscription::Subscribed(subscription)) => {
                        self.subscription = Some(subscription);
                    }
                    Ok(SessionObservationSubscription::Gap { observation, gap }) => {
                        // Advance to the gap's latest cursor and leave
                        // `subscription` as `None`, so the next poll
                        // resubscribes from there.
                        self.cursor = gap.latest_cursor.clone();
                        return Poll::Ready(Some(Ok(SessionObservationStreamItem::Gap {
                            observation,
                            gap,
                        })));
                    }
                    Err(err) => {
                        // Subscribe failures end the stream.
                        self.done = true;
                        return Poll::Ready(Some(Err(err)));
                    }
                }
            }

            // The branch above either set `subscription` or returned, so this
            // `else` is defensive only.
            let Some(subscription) = self.subscription.as_mut() else {
                continue;
            };
            match Pin::new(subscription).poll_next(cx) {
                Poll::Pending => return Poll::Pending,
                Poll::Ready(Some(Ok(event))) => {
                    // Record the event's cursor so a later resubscription
                    // resumes after it.
                    self.cursor = event.cursor.clone();
                    return Poll::Ready(Some(Ok(SessionObservationStreamItem::Event(event))));
                }
                Poll::Ready(Some(Err(LiveReplayStoreError::SubscriberLagged(_)))) => {
                    // This subscriber fell behind. Drop it and resubscribe from
                    // the last delivered cursor on the next iteration, which
                    // may yield a `Gap`.
                    self.subscription = None;
                    continue;
                }
                Poll::Ready(Some(Err(err))) => {
                    // Any other store error is terminal.
                    self.done = true;
                    return Poll::Ready(Some(Err(live_replay_error(err))));
                }
                Poll::Ready(None) => {
                    self.done = true;
                    return Poll::Ready(None);
                }
            }
        }
    }
}
1083
1084fn live_replay_error(err: lash_core::LiveReplayStoreError) -> EmbedError {
1085    EmbedError::Runtime(lash_core::RuntimeError::new(
1086        RuntimeErrorCode::Other("live_replay".to_string()),
1087        err.to_string(),
1088    ))
1089}
1090
/// Builder for enqueueing a [`TurnInput`] on a [`LashSession`].
///
/// Finish it with `send()`, or `.await` it directly (it implements
/// `IntoFuture`).
pub struct EnqueueTurnBuilder<'a> {
    // Session whose runtime receives the input.
    session: &'a LashSession,
    // Turn input to enqueue.
    input: TurnInput,
    // Optional host-chosen id. When set, `send` passes `host:{id}` as the
    // source key.
    id: Option<String>,
    // Ingress value forwarded to the runtime; overridable via `ingress()`.
    ingress: TurnInputIngress,
}
1097
1098impl<'a> EnqueueTurnBuilder<'a> {
1099    pub fn id(mut self, id: impl Into<String>) -> Self {
1100        self.id = Some(id.into());
1101        self
1102    }
1103
1104    pub fn ingress(mut self, ingress: TurnInputIngress) -> Self {
1105        self.ingress = ingress;
1106        self
1107    }
1108
1109    pub async fn send(self) -> Result<PendingTurnInput> {
1110        let source_key = self.id.map(|id| format!("host:{id}"));
1111        self.session
1112            .runtime
1113            .enqueue_turn_input(self.input, self.ingress, source_key)
1114            .await
1115            .map_err(EmbedError::Runtime)
1116    }
1117}
1118
1119impl<'a> std::future::IntoFuture for EnqueueTurnBuilder<'a> {
1120    type Output = Result<PendingTurnInput>;
1121    type IntoFuture =
1122        std::pin::Pin<Box<dyn std::future::Future<Output = Result<PendingTurnInput>> + 'a>>;
1123
1124    fn into_future(self) -> Self::IntoFuture {
1125        Box::pin(self.send())
1126    }
1127}