Skip to main content

lash/
session.rs

1use std::pin::Pin;
2use std::task::{Context, Poll};
3
4use crate::support::*;
5use futures_util::Stream;
6use lash_core::runtime::{
7    PendingTurnInput, PendingTurnInputCancelOutcome, PendingTurnInputCancelResult,
8    PendingTurnInputCancelTarget, PendingTurnInputSuffixCancelOutcome, QueuedWorkBatch,
9    QueuedWorkClaim, TurnInputClaim, TurnInputIngress,
10};
11use lash_core::{LiveReplayGap, LiveReplayStoreError, SessionObservationEvent};
12use lash_remote_protocol::{
13    RemoteLiveReplayGap, RemoteSessionCursor, RemoteSessionObservation,
14    RemoteSessionObservationEvent,
15};
16
/// Builder that gathers per-session configuration and then opens a
/// [`LashSession`] through [`SessionBuilder::open`],
/// [`SessionBuilder::open_fresh`], or [`SessionBuilder::open_with_state`].
pub struct SessionBuilder {
    /// Core the session is opened against; its policy, environment, store
    /// factory, provider and work drivers are read when the session opens.
    pub(crate) core: LashCore,
    /// Id of the session being opened. State carrying a different id is
    /// rejected with `EmbedError::StoreSessionMismatch`.
    pub(crate) session_id: String,
    /// Per-session spec, resolved against the core policy at open time.
    pub(crate) spec: SessionSpec,
    /// Parent session id, if any. When set, store creation requests a
    /// `SessionRelation::Child` relation for this session.
    pub(crate) parent_session_id: Option<String>,
    /// Explicit lease owner applied to the runtime after it is built.
    pub(crate) session_execution_owner: Option<lash_core::LeaseOwnerIdentity>,
    /// Explicit persistence store; takes precedence over the core's store
    /// factory.
    pub(crate) store: Option<Arc<dyn RuntimePersistence>>,
    /// Explicit provider; the core's provider is used when this is unset.
    pub(crate) provider: Option<ProviderHandle>,
    /// Plugin bindings registered through [`SessionBuilder::plugin`].
    pub(crate) active_plugins: Vec<ActivePluginBinding>,
    /// Session-level plugin factories handed to the plugin host at open time.
    pub(crate) plugin_factories: Vec<Arc<dyn PluginFactory>>,
    /// Plugin-keyed, serializable open-time options. They ride the protocol
    /// materialization seam (the same `PluginOptions` bag a child
    /// `SessionCreateRequest` carries) so every plugin gets open-time options
    /// through one hook.
    pub(crate) plugin_options: PluginOptions,
}
33
34impl SessionBuilder {
35    /// Set the plugin-keyed open-time options bag wholesale.
36    pub fn plugin_options(mut self, plugin_options: PluginOptions) -> Self {
37        self.plugin_options = plugin_options;
38        self
39    }
40
41    /// Merge a single plugin's typed options into the open-time options bag,
42    /// preserving any options already set for other plugin keys.
43    pub fn plugin_option<T: serde::Serialize>(
44        mut self,
45        plugin_id: impl Into<String>,
46        extras: T,
47    ) -> Result<Self> {
48        self.plugin_options
49            .insert_typed(plugin_id, extras)
50            .map_err(EmbedError::ProtocolTurnOptions)?;
51        Ok(self)
52    }
53
54    pub fn provider(mut self, provider: ProviderHandle) -> Self {
55        self.spec = self.spec.provider_id(provider.kind());
56        self.provider = Some(provider);
57        self
58    }
59
60    pub fn session_spec(mut self, spec: SessionSpec) -> Self {
61        self.spec = spec;
62        self
63    }
64
65    pub fn parent(mut self, parent_session_id: impl Into<String>) -> Self {
66        self.parent_session_id = Some(parent_session_id.into());
67        self
68    }
69
70    /// Use an explicit owner identity for durable session execution leases.
71    ///
72    /// This is only for hosts that already serialize one logical execution lane
73    /// and intentionally choose stable owner + incarnation values. Normal
74    /// embedders should keep the default per-open identity.
75    pub fn session_execution_owner(mut self, owner: lash_core::LeaseOwnerIdentity) -> Self {
76        self.session_execution_owner = Some(owner);
77        self
78    }
79
80    /// Use a specific persistence store for this root session.
81    ///
82    /// This is the right API for a host-owned, pre-opened session database.
83    /// Managed child sessions never reuse this store; configure
84    /// `LashCoreBuilder::child_store_factory` when child sessions should also
85    /// persist.
86    pub fn store(mut self, store: Arc<dyn RuntimePersistence>) -> Self {
87        self.store = Some(store);
88        self
89    }
90
91    pub fn plugin<P: PluginBinding>(mut self, config: P::SessionConfig) -> Self {
92        self.active_plugins.push(ActivePluginBinding {
93            id: P::ID,
94            requires_turn_input: P::requires_turn_input(&config),
95        });
96        self.plugin_factories.push(P::factory(&config));
97        self
98    }
99
100    pub async fn open(self) -> Result<LashSession> {
101        let policy = self.session_policy();
102        let store = self.create_store(&policy).await?;
103        let state = self
104            .load_or_default_state(&policy, store.as_deref())
105            .await?;
106        self.open_resolved(policy, state, store).await
107    }
108
109    /// Open this session with a fresh resident graph, ignoring any persisted
110    /// session graph/checkpoint state that may already exist for the same
111    /// session id.
112    ///
113    /// The next successful commit writes a full replacement graph, so normal
114    /// embedders can use this to start over without manually calling
115    /// `load_persisted_session_state` or constructing a `RuntimeSessionState`.
116    /// Use [`Self::open`] for resume and [`Self::open_with_state`] only when
117    /// restoring explicit host-owned state.
118    pub async fn open_fresh(self) -> Result<LashSession> {
119        let policy = self.session_policy();
120        let store = self.create_store(&policy).await?;
121        let state = RuntimeSessionState {
122            session_id: self.session_id.clone(),
123            policy: policy.clone(),
124            graph_replace_required: true,
125            ..RuntimeSessionState::default()
126        };
127        self.open_resolved(policy, state, store).await
128    }
129
130    /// Open with an explicitly supplied runtime state.
131    ///
132    /// This is for advanced hosts that already own a complete state snapshot.
133    /// Normal embedders should use [`Self::open`] to resume according to Lash's
134    /// residency policy or [`Self::open_fresh`] to start over and replace prior
135    /// persisted state on the next commit.
136    pub async fn open_with_state(self, mut state: RuntimeSessionState) -> Result<LashSession> {
137        let policy = self.session_policy();
138        let store = self.create_store(&policy).await?;
139        if state.session_id != self.session_id {
140            return Err(EmbedError::StoreSessionMismatch {
141                loaded: state.session_id,
142                requested: self.session_id,
143            });
144        }
145        let recorded_provider_id = state.policy.recorded_provider_id().to_string();
146        state.policy = policy.clone();
147        state.policy.provider_id = recorded_provider_id;
148        self.open_resolved(policy, state, store).await
149    }
150
151    fn session_policy(&self) -> SessionPolicy {
152        let mut policy = self.spec.resolve_against(&self.core.policy);
153        policy.session_id = Some(self.session_id.clone());
154        policy
155    }
156
157    async fn load_or_default_state(
158        &self,
159        policy: &SessionPolicy,
160        store: Option<&dyn RuntimePersistence>,
161    ) -> Result<RuntimeSessionState> {
162        let state = match store {
163            Some(store) => {
164                let loaded = self.load_persisted_state_for_residency(store).await?;
165                let mut state = loaded.unwrap_or_else(|| RuntimeSessionState {
166                    session_id: self.session_id.clone(),
167                    policy: policy.clone(),
168                    ..RuntimeSessionState::default()
169                });
170                if state.session_id != self.session_id {
171                    return Err(EmbedError::StoreSessionMismatch {
172                        loaded: state.session_id,
173                        requested: self.session_id.clone(),
174                    });
175                }
176                let recorded_provider_id = state.policy.recorded_provider_id().to_string();
177                state.policy = policy.clone();
178                state.policy.provider_id = recorded_provider_id;
179                state
180            }
181            None => RuntimeSessionState {
182                session_id: self.session_id.clone(),
183                policy: policy.clone(),
184                ..RuntimeSessionState::default()
185            },
186        };
187        Ok(state)
188    }
189
190    async fn load_persisted_state_for_residency(
191        &self,
192        store: &dyn RuntimePersistence,
193    ) -> Result<Option<RuntimeSessionState>> {
194        load_persisted_state_for_residency(self.core.env.residency, store).await
195    }
196
    /// Build the runtime for this session and wrap it in a [`LashSession`].
    ///
    /// Shared tail of [`Self::open`], [`Self::open_fresh`] and
    /// [`Self::open_with_state`]: the caller supplies the resolved `policy`,
    /// the initial `state`, and the optional `store`.
    ///
    /// # Errors
    ///
    /// Propagates failures from plugin-host construction, installation of
    /// plugin process-engine contributions, runtime construction, protocol
    /// materialization, and the optional on-open process drive.
    async fn open_resolved(
        self,
        policy: SessionPolicy,
        state: RuntimeSessionState,
        store: Option<Arc<dyn RuntimePersistence>>,
    ) -> Result<LashSession> {
        // Work on a private copy of the core environment so per-session
        // wiring below does not modify the shared core.
        let mut env = self.core.env.clone();
        // A provider set on the builder wins over the core's provider; either
        // one replaces the resolver with a single-provider resolver.
        if let Some(provider) = self.provider.clone().or_else(|| self.core.provider.clone()) {
            env.core.providers.provider_resolver =
                Arc::new(lash_core::SingleProviderResolver::new(provider));
        }
        let plugin_host = build_plugin_host(
            self.core.protocol_factory.as_ref(),
            self.core.plugin_factories.as_ref(),
            self.plugin_factories,
        )?;
        env.core = plugin_host.install_process_engine_contributions(
            env.core.clone(),
            self.core.process_lifecycle_available,
        )?;
        env.plugin_host = Some(Arc::new(plugin_host));
        // Taken after `env.core` has been replaced above, so the session keeps
        // the effect host of the final environment.
        let effect_host = Arc::clone(&env.core.control.effect_host);
        let drivers = self.core.work_driver.drivers().await;
        env.process_work_driver = drivers.process.clone();
        env.queued_work_driver = drivers.queued.clone();
        let mut runtime = LashRuntime::from_environment(&env, policy, state, store).await?;
        // Fire the protocol materialization hook for this root/builder open
        // (including resume): the protocol plugin applies and defaults its
        // per-session options at open time.
        runtime.configure_protocol_on_materialize(
            &self.plugin_options,
            self.parent_session_id.is_none(),
        )?;
        if let Some(owner) = self.session_execution_owner {
            runtime.set_runtime_lease_owner(owner);
        }
        // Only when the work driver asks for it and a process driver exists:
        // claim and run pending work as part of opening.
        if drivers.drive_process_on_open
            && let Some(driver) = drivers.process.as_ref()
        {
            driver.claim_and_run_pending("session_open").await?;
        }
        let handle = RuntimeHandle::with_live_replay_store(
            runtime,
            Arc::clone(&self.core.live_replay_store),
        );
        Ok(LashSession {
            runtime: handle,
            effect_host,
            parent_session_id: self.parent_session_id,
            active_plugins: self.active_plugins,
            process_phase_probe_slot: self.core.work_driver.phase_probe_slot(),
            turn_cancels: crate::turn::TurnCancelRegistry::default(),
        })
    }
251
252    async fn create_store(
253        &self,
254        policy: &SessionPolicy,
255    ) -> Result<Option<Arc<dyn RuntimePersistence>>> {
256        if let Some(store) = self.store.as_ref() {
257            return Ok(Some(Arc::clone(store)));
258        }
259        let Some(factory) = self.core.store_factory.as_ref() else {
260            return Ok(None);
261        };
262        let request = SessionStoreCreateRequest {
263            session_id: self.session_id.clone(),
264            relation: self
265                .parent_session_id
266                .as_ref()
267                .map(|parent_session_id| lash_core::SessionRelation::Child {
268                    parent_session_id: parent_session_id.clone(),
269                    caused_by: None,
270                })
271                .unwrap_or_default(),
272            policy: policy.clone(),
273        };
274        factory
275            .create_store(&request)
276            .await
277            .map(Some)
278            .map_err(|message| EmbedError::StoreFactory {
279                session_id: self.session_id.clone(),
280                message,
281            })
282    }
283}
284
285pub(crate) async fn load_state_for_residency(
286    residency: Residency,
287    session_id: &str,
288    policy: &SessionPolicy,
289    store: &dyn RuntimePersistence,
290) -> Result<RuntimeSessionState> {
291    let mut state = load_persisted_state_for_residency(residency, store)
292        .await?
293        .unwrap_or_else(|| RuntimeSessionState {
294            session_id: session_id.to_string(),
295            policy: policy.clone(),
296            ..RuntimeSessionState::default()
297        });
298    if state.session_id != session_id {
299        return Err(EmbedError::StoreSessionMismatch {
300            loaded: state.session_id,
301            requested: session_id.to_string(),
302        });
303    }
304    let recorded_provider_id = state.policy.recorded_provider_id().to_string();
305    state.policy = policy.clone();
306    state.policy.provider_id = recorded_provider_id;
307    Ok(state)
308}
309
310async fn load_persisted_state_for_residency(
311    residency: Residency,
312    store: &dyn RuntimePersistence,
313) -> Result<Option<RuntimeSessionState>> {
314    match residency {
315        Residency::KeepAll => {
316            let loaded = lash_core::store::load_persisted_session_state(store)
317                .await
318                .map_err(|err| SessionError::Protocol(format!("failed to load store: {err}")))?;
319            Ok(loaded)
320        }
321        Residency::ActivePathOnly => {
322            let active = lash_core::store::load_persisted_session_state_active_path(store, None)
323                .await
324                .map_err(|err| {
325                    SessionError::Protocol(format!("failed to load active-path store: {err}"))
326                })?;
327            if active
328                .as_ref()
329                .is_some_and(|state| state.session_graph.nodes.is_empty())
330            {
331                let mut full = lash_core::store::load_persisted_session_state(store)
332                    .await
333                    .map_err(|err| {
334                        SessionError::Protocol(format!(
335                            "failed to heal active-path store from full graph: {err}"
336                        ))
337                    })?;
338                if let Some(state) = full.as_mut() {
339                    state.graph_replace_required = true;
340                }
341                return Ok(full);
342            }
343            Ok(active)
344        }
345    }
346}
347
348impl PromptLayerSink for SessionBuilder {
349    fn prompt_layer_mut(&mut self) -> &mut PromptLayer {
350        self.spec.prompt.get_or_insert_with(PromptLayer::new)
351    }
352}
353
/// Handle to an opened session.
///
/// Clones refer to the same underlying runtime, which is why consuming
/// operations such as `close` and `park` require that no other clone is alive.
#[derive(Clone)]
pub struct LashSession {
    /// Handle to the runtime backing this session.
    pub(crate) runtime: RuntimeHandle,
    /// Effect host captured from the session environment at open time.
    pub(crate) effect_host: Arc<dyn EffectHost>,
    /// Parent session id supplied to the builder, if any.
    pub(crate) parent_session_id: Option<String>,
    /// Plugin bindings registered on the builder; copied into each turn
    /// builder.
    pub(crate) active_plugins: Vec<ActivePluginBinding>,
    /// Phase probe slot obtained from the core's work driver at open time.
    pub(crate) process_phase_probe_slot: Option<lash_core::runtime::RuntimeTurnPhaseProbeSlot>,
    /// Cancellation registry handed to turn builders and used by
    /// `cancel_running_turns`.
    pub(crate) turn_cancels: crate::turn::TurnCancelRegistry,
}
363
/// Set of configuration changes passed to `LashSession::configure`.
///
/// NOTE(review): a `None` field presumably leaves that setting unchanged —
/// confirm against the admin config `update` implementation.
#[derive(Clone, Debug, Default)]
pub struct SessionConfigPatch {
    /// Provider handle to apply, if any.
    pub provider: Option<ProviderHandle>,
    /// Model spec to apply, if any.
    pub model: Option<ModelSpec>,
    /// Prompt layer to apply, if any.
    pub prompt: Option<PromptLayer>,
}
370
/// Lightweight, consuming handle returned by [`LashSession::park`].
///
/// Parking flushes a session's dirty state to its store and drops the live
/// in-memory runtime, keeping only enough to rebuild it: the session id, its
/// policy, and the store reference. This is the webserver-embedder quiesce /
/// handoff primitive — cache one of these per idle session at bounded memory
/// cost regardless of transcript size, then rebuild with
/// [`LashCore::resume`](crate::LashCore::resume).
///
/// The facade owns this vocabulary; it wraps the core parking handle so the
/// resume path stays a facade capability rather than exposing `lash-core`
/// environment plumbing to hosts.
pub struct ParkedSession {
    /// The wrapped core parking handle.
    pub(crate) inner: lash_core::ParkedSession,
}
386
387impl ParkedSession {
388    /// The parked session's id. Use it to key a per-session cache of parked
389    /// handles on the host.
390    pub fn session_id(&self) -> &str {
391        self.inner.session_id()
392    }
393}
394
395impl LashSession {
396    /// Durably close this session, then release its in-memory runtime.
397    ///
398    /// `close` is the honest teardown verb: a persistent session flushes its
399    /// dirty state (via a fresh-lease commit) so the store reflects the final
400    /// transcript, its in-memory plugin session is unregistered, and the live
401    /// runtime is dropped. A store-less (ephemeral) session has nothing to
402    /// persist, so closing it is exactly the plugin-session unregister plus
403    /// dropping the runtime.
404    ///
405    /// This consumes the session and requires exclusive ownership: any cloned
406    /// [`LashSession`] handle or in-flight turn keeps a live reference to the
407    /// same runtime, so `close` returns [`EmbedError::SessionStillInUse`] until
408    /// those are dropped or finished. Cancel running turns first with
409    /// [`cancel_running_turns`](Self::cancel_running_turns) if needed.
410    ///
411    /// To keep a handle for later resumption instead of discarding the session,
412    /// use [`park`](Self::park).
413    pub async fn close(self) -> Result<()> {
414        // Persistence is decided before we consume `self`; the observation's
415        // queue store is the facade's canonical "is this session persistent"
416        // signal (the same source `park`'s commit uses).
417        let persistent = self.runtime.observe().queue_store.is_some();
418        let runtime = self.into_owned_runtime()?;
419        runtime.unregister_plugin_session()?;
420        if persistent {
421            // Reuse the core parking primitive to flush + release the lease,
422            // discarding the returned handle: close does not resume.
423            runtime.park().await?;
424        }
425        // Ephemeral sessions: `runtime` drops here, releasing in-memory state.
426        Ok(())
427    }
428
429    /// Quiesce this session for later resumption, returning a lightweight
430    /// [`ParkedSession`] handle.
431    ///
432    /// Parking flushes dirty state to the store (a fresh-lease commit), drops
433    /// the live runtime and its plugin session, and hands back a cheap handle
434    /// the host can cache and later rebuild with
435    /// [`LashCore::resume`](crate::LashCore::resume). This is the
436    /// quiesce/handoff lever for webserver embedders that hold many idle
437    /// sessions: it bounds resident memory per session without deleting durable
438    /// state.
439    ///
440    /// Contract:
441    /// - **Persistent runtime required.** Parking flushes to the store, so a
442    ///   store-less session cannot be parked and returns an error. Use
443    ///   [`close`](Self::close) to tear down an ephemeral session.
444    /// - **Exclusive ownership required.** `park` consumes the session and drops
445    ///   the in-memory runtime, so it needs the sole live reference. A cloned
446    ///   [`LashSession`] or an in-flight turn holds another reference and makes
447    ///   `park` return [`EmbedError::SessionStillInUse`]. Because an executing
448    ///   turn holds such a reference, parking is effectively an *idle-session*
449    ///   operation: finish or [`cancel_running_turns`](Self::cancel_running_turns)
450    ///   first. The store commit itself does not observe an active turn; the
451    ///   exclusive-ownership guard is what makes mid-turn parking an explicit
452    ///   error rather than a silent partial flush.
453    pub async fn park(self) -> Result<ParkedSession> {
454        let runtime = self.into_owned_runtime()?;
455        // We now own the runtime exclusively; release the in-memory plugin
456        // session registration before flushing and dropping it.
457        runtime.unregister_plugin_session()?;
458        let parked = runtime.park().await?;
459        Ok(ParkedSession { inner: parked })
460    }
461
462    /// Consume the session and take sole ownership of the underlying runtime.
463    ///
464    /// Fails with [`EmbedError::SessionStillInUse`] when another live handle
465    /// (a cloned session or an in-flight turn) shares the runtime, so
466    /// consuming operations never proceed on a still-shared runtime.
467    fn into_owned_runtime(self) -> Result<LashRuntime> {
468        let LashSession { runtime, .. } = self;
469        // `writer()` clones the shared `Arc<Mutex<LashRuntime>>`; dropping the
470        // handle then leaves this clone as the sole strong reference iff no
471        // other handle exists, so `try_unwrap` doubles as the exclusive-owner
472        // check.
473        let writer = runtime.writer();
474        drop(runtime);
475        Arc::try_unwrap(writer)
476            .map(|mutex| mutex.into_inner())
477            .map_err(|_| EmbedError::SessionStillInUse)
478    }
479
480    pub fn session_id(&self) -> String {
481        self.runtime.observe().session_id().to_string()
482    }
483
484    pub fn policy_snapshot(&self) -> SessionPolicy {
485        self.runtime.observe().policy.clone()
486    }
487
488    pub fn observe(&self) -> ObservableSession {
489        ObservableSession {
490            runtime: self.runtime.clone(),
491        }
492    }
493
494    pub fn parent_session_id(&self) -> Option<&str> {
495        self.parent_session_id.as_deref()
496    }
497
498    pub fn effect_host(&self) -> Arc<dyn EffectHost> {
499        Arc::clone(&self.effect_host)
500    }
501
502    pub fn turn(&self, input: TurnInput) -> TurnBuilder {
503        TurnBuilder {
504            runtime: self.runtime.clone(),
505            effect_host: Arc::clone(&self.effect_host),
506            active_plugins: self.active_plugins.clone(),
507            input,
508            cancel: CancellationToken::new(),
509            cancels: self.turn_cancels.clone(),
510            protocol_turn_options: None,
511            provider: None,
512            model: None,
513            turn_id: None,
514        }
515    }
516
517    pub fn queued_turn(&self) -> QueuedTurnBuilder {
518        QueuedTurnBuilder {
519            runtime: self.runtime.clone(),
520            effect_host: Arc::clone(&self.effect_host),
521            cancel: CancellationToken::new(),
522            cancels: self.turn_cancels.clone(),
523            batch_ids: Vec::new(),
524            drain_id: None,
525        }
526    }
527
528    /// Cancel every turn currently executing through this opened session
529    /// (including its clones) and report how many were signalled.
530    ///
531    /// This is the affordance behind a UI "stop" control: hold a clone of the
532    /// session wherever the stop arrives and call this, instead of threading a
533    /// [`CancellationToken`](crate::CancellationToken) into every turn call
534    /// ([`TurnBuilder::cancel`](crate::TurnBuilder::cancel) remains the
535    /// per-turn hook when you need one). A cancelled turn finishes with
536    /// `TurnOutcome::Stopped(TurnStop::Cancelled)` and commits like any other
537    /// turn; the session stays usable.
538    ///
539    /// Scope: turns started from this `LashSession` instance and its clones.
540    /// A handle opened separately for the same session id has its own
541    /// registry and is not reached.
542    pub fn cancel_running_turns(&self) -> usize {
543        self.turn_cancels.cancel_all()
544    }
545
546    pub fn admin(&self) -> SessionAdmin {
547        SessionAdmin {
548            runtime: self.runtime.clone(),
549        }
550    }
551
552    pub async fn configure(&self, patch: SessionConfigPatch) -> Result<()> {
553        self.admin().config().update(patch).await
554    }
555
556    pub fn tools(&self) -> ToolAdmin {
557        ToolAdmin::new(self.admin())
558    }
559
560    pub fn commands(&self) -> SessionCommandAdmin {
561        self.admin().commands()
562    }
563
564    pub fn triggers(&self) -> SessionTriggerAdmin {
565        self.admin().triggers()
566    }
567
568    pub fn processes(&self) -> SessionProcessAdmin {
569        SessionProcessAdmin::new(self.admin())
570    }
571
572    /// Refresh the session graph from any background process that signalled it
573    /// changed. This is the honest name for the former
574    /// `processes().await_all()` misnomer (ADR 0019 grill): a session-graph
575    /// resync, not a terminal wait on background work — wait on a process with
576    /// [`SessionProcessAdmin::await_output`]. It lives on the session surface
577    /// because it refreshes the session graph, not the global process registry.
578    pub async fn refresh_background_graph(&self) -> Result<()> {
579        self.admin().refresh_background_graph().await
580    }
581
582    pub fn plugin_operations(&self) -> PluginOperations {
583        PluginOperations {
584            control: self.admin(),
585        }
586    }
587
588    pub fn enqueue(&self, input: TurnInput) -> EnqueueTurnBuilder<'_> {
589        EnqueueTurnBuilder {
590            session: self,
591            input,
592            id: None,
593            ingress: TurnInputIngress::NextTurn,
594        }
595    }
596
597    /// Return all pending durable queued-work batches for this session.
598    ///
599    /// This is an admin/introspection view for non-user queued work such as
600    /// process wakes and session commands. User-visible model input is stored
601    /// separately as pending turn input and is exposed by
602    /// [`pending_turn_inputs`](Self::pending_turn_inputs).
603    pub async fn queued_work(&self) -> Result<Vec<QueuedWorkBatch>> {
604        let observation = self.runtime.observe();
605        let store = observation.queue_store.as_ref().ok_or_else(|| {
606            EmbedError::Runtime(lash_core::RuntimeError::new(
607                lash_core::RuntimeErrorCode::StoreCommitFailed,
608                "queued work inspection requires a persistent runtime store",
609            ))
610        })?;
611        store
612            .list_pending_queued_work(observation.session_id())
613            .await
614            .map_err(|err| {
615                EmbedError::Runtime(lash_core::RuntimeError::new(
616                    lash_core::RuntimeErrorCode::StoreCommitFailed,
617                    err.to_string(),
618                ))
619            })
620    }
621
622    pub async fn pending_turn_inputs(&self) -> Result<Vec<PendingTurnInput>> {
623        let observation = self.runtime.observe();
624        let store = observation.queue_store.as_ref().ok_or_else(|| {
625            EmbedError::Runtime(lash_core::RuntimeError::new(
626                lash_core::RuntimeErrorCode::StoreCommitFailed,
627                "pending turn input inspection requires a persistent runtime store",
628            ))
629        })?;
630        store
631            .list_pending_turn_inputs(observation.session_id())
632            .await
633            .map_err(|err| {
634                EmbedError::Runtime(lash_core::RuntimeError::new(
635                    lash_core::RuntimeErrorCode::StoreCommitFailed,
636                    err.to_string(),
637                ))
638            })
639    }
640
641    pub async fn cancel_pending_turn_input(
642        &self,
643        input_id: &str,
644    ) -> Result<PendingTurnInputCancelOutcome> {
645        let session_id = self.session_id();
646        self.runtime
647            .cancel_pending_turn_input(&session_id, input_id)
648            .await
649            .map_err(EmbedError::Runtime)
650    }
651
652    /// Atomically cancel a set of pending user inputs by runtime input id or
653    /// app source key.
654    ///
655    /// This is the app reconciliation path for explicit selections such as
656    /// "remove these pending drafts". Returned outcomes distinguish newly
657    /// cancelled input from input that was already claimed, completed,
658    /// cancelled, or missing.
659    pub async fn cancel_pending_turn_inputs(
660        &self,
661        targets: impl IntoIterator<Item = PendingTurnInputCancelTarget>,
662    ) -> Result<Vec<PendingTurnInputCancelResult>> {
663        let session_id = self.session_id();
664        let targets = targets.into_iter().collect::<Vec<_>>();
665        self.runtime
666            .cancel_pending_turn_inputs(&session_id, &targets)
667            .await
668            .map_err(EmbedError::Runtime)
669    }
670
671    /// Atomically cancel the same-session pending-input suffix from `anchor`.
672    ///
673    /// Apps that let users edit previously submitted product messages should
674    /// map the edited message to the stored pending-input `input_id` or
675    /// `source_key`, call this method, and only restore/edit drafts that return
676    /// [`PendingTurnInputCancelOutcome::Cancelled`]. Claimed or completed
677    /// inputs have already crossed the runtime boundary and should be treated
678    /// as reconciliation state, not local editable drafts.
679    pub async fn cancel_pending_turn_input_suffix(
680        &self,
681        anchor: PendingTurnInputCancelTarget,
682    ) -> Result<PendingTurnInputSuffixCancelOutcome> {
683        let session_id = self.session_id();
684        self.runtime
685            .cancel_pending_turn_input_suffix(&session_id, &anchor)
686            .await
687            .map_err(EmbedError::Runtime)
688    }
689
690    pub async fn cancel_queued_work_batch(
691        &self,
692        batch_id: &str,
693    ) -> Result<Option<QueuedWorkBatch>> {
694        let session_id = self.session_id();
695        self.runtime
696            .cancel_queued_work_batch(&session_id, batch_id)
697            .await
698            .map_err(EmbedError::Runtime)
699    }
700
701    /// Release a held queued-work claim without completing it, returning its
702    /// batches to the pending queue immediately.
703    ///
704    /// A host stopping an external queued-work driver mid-claim calls this
705    /// with the claims that driver still holds so the work becomes claimable
706    /// again at once instead of waiting out the claim's lease TTL.
707    pub async fn abandon_queued_work_claim(&self, claim: &QueuedWorkClaim) -> Result<()> {
708        self.runtime
709            .abandon_queued_work_claim(claim)
710            .await
711            .map_err(EmbedError::Runtime)
712    }
713
714    /// Release a held pending-turn-input claim without completing it, returning
715    /// its inputs to the pending queue immediately. The turn-input counterpart
716    /// of [`abandon_queued_work_claim`](Self::abandon_queued_work_claim).
717    pub async fn abandon_turn_input_claim(&self, claim: &TurnInputClaim) -> Result<()> {
718        self.runtime
719            .abandon_turn_input_claim(claim)
720            .await
721            .map_err(EmbedError::Runtime)
722    }
723
724    /// Cancel every outstanding durable wait for this session without deleting
725    /// the session.
726    ///
727    /// Each waiter receives a terminal [`Resolution::Cancelled`](crate::Resolution)
728    /// instead of hanging until an external completion arrives, and late
729    /// resolves observe that terminal. The session itself stays usable: new
730    /// durable waits registered afterwards behave normally, unlike the
731    /// tombstoning revocation [`LashCore::delete_session`](crate::LashCore::delete_session)
732    /// performs.
733    pub async fn revoke_durable_waits(&self) -> Result<()> {
734        let session_id = self.session_id();
735        self.effect_host
736            .cancel_await_events_for_session(&session_id)
737            .await
738            .map_err(EmbedError::Runtime)
739    }
740
741    /// Resolve once `batch_id` is no longer pending in the queue store —
742    /// drained by whoever runs queued work (a queued-work runner, a durable
743    /// worker, or another handle's [`queued_turn`](Self::queued_turn)) or
744    /// cancelled. This is the enqueue-and-observe side of the queue: the
745    /// caller never claims the work itself.
746    ///
747    /// Completion is read from the persistent queue store, so it observes
748    /// drains performed by other session handles and other processes alike.
749    /// There is no built-in deadline — nothing resolves if nothing drains the
750    /// queue, so bound it with `tokio::time::timeout` when the worker may be
751    /// unavailable. A batch id the store has never seen resolves immediately.
752    pub async fn await_queued_work_batch(&self, batch_id: &str) -> Result<()> {
753        let observation = self.runtime.observe();
754        let store = observation.queue_store.clone().ok_or_else(|| {
755            EmbedError::Runtime(lash_core::RuntimeError::new(
756                lash_core::RuntimeErrorCode::StoreCommitFailed,
757                "queued work inspection requires a persistent runtime store",
758            ))
759        })?;
760        let session_id = observation.session_id().to_string();
761        drop(observation);
762        let mut delay = std::time::Duration::from_millis(25);
763        loop {
764            let pending = store
765                .list_pending_queued_work(&session_id)
766                .await
767                .map_err(|err| {
768                    EmbedError::Runtime(lash_core::RuntimeError::new(
769                        lash_core::RuntimeErrorCode::StoreCommitFailed,
770                        err.to_string(),
771                    ))
772                })?;
773            if !pending.iter().any(|batch| batch.batch_id == batch_id) {
774                return Ok(());
775            }
776            tokio::time::sleep(delay).await;
777            delay = (delay * 2).min(std::time::Duration::from_millis(400));
778        }
779    }
780
781    pub fn read_view(&self) -> SessionReadView {
782        self.runtime.observe().read_view.clone()
783    }
784
785    pub fn usage_report(&self) -> SessionUsageReport {
786        self.runtime.observe().usage_report.clone()
787    }
788
789    pub async fn set_turn_phase_probe(
790        &self,
791        probe: Arc<dyn lash_core::runtime::RuntimeTurnPhaseProbe>,
792    ) {
793        let writer = self.runtime.writer();
794        let mut runtime = writer.lock().await;
795        runtime.set_turn_phase_probe(Arc::clone(&probe));
796        self.runtime.publish_from(&runtime);
797        if let Some(slot) = &self.process_phase_probe_slot {
798            let observation = self.runtime.observe();
799            slot.set_for_session(observation.session_id(), Arc::clone(&probe));
800            let current_frame = observation.persisted_state.current_agent_frame_id.as_str();
801            if !current_frame.is_empty() {
802                let scope = lash_core::SessionScope::for_agent_frame(
803                    observation.session_id(),
804                    current_frame,
805                );
806                slot.set_for_scope(&scope, probe);
807            }
808        }
809    }
810}
811
812#[derive(Clone)]
813pub struct ObservableSession {
814    pub(crate) runtime: RuntimeHandle,
815}
816
817impl ObservableSession {
818    fn snapshot(&self) -> Arc<RuntimeObservation> {
819        self.runtime.observe()
820    }
821
822    pub fn current_observation(&self) -> SessionObservation {
823        self.runtime.current_session_observation()
824    }
825
826    pub fn current_remote_observation(&self) -> RemoteSessionObservation {
827        RemoteSessionObservation::from_core(self.current_observation())
828    }
829
830    pub fn resume_from_cursor(&self, cursor: &SessionCursor) -> Result<SessionResume> {
831        self.runtime
832            .resume_session_observation(cursor)
833            .map_err(live_replay_error)
834    }
835
836    pub fn subscribe_from_cursor(
837        &self,
838        cursor: &SessionCursor,
839    ) -> Result<SessionObservationSubscription> {
840        self.runtime
841            .subscribe_session_observation(cursor)
842            .map_err(live_replay_error)
843    }
844
845    pub fn subscribe_from_remote_cursor(
846        &self,
847        cursor: &RemoteSessionCursor,
848    ) -> Result<RemoteSessionObservationSubscription> {
849        cursor.validate()?;
850        let cursor = lash_core::SessionCursor::try_from(cursor.clone())?;
851        match self.subscribe_from_cursor(&cursor)? {
852            SessionObservationSubscription::Subscribed(subscription) => {
853                Ok(RemoteSessionObservationSubscription::Subscribed(
854                    RemoteSessionObservationEventStream::new(subscription),
855                ))
856            }
857            SessionObservationSubscription::Gap { observation, gap } => {
858                Ok(RemoteSessionObservationSubscription::Gap {
859                    observation: observation.into(),
860                    gap: gap.into(),
861                })
862            }
863        }
864    }
865
866    /// Subscribe to session observation events and keep the subscription alive
867    /// across recoverable live-replay gaps.
868    ///
869    /// The returned stream yields [`SessionObservationStreamItem::Gap`] when
870    /// the cursor missed the bounded replay window. Callers should replace
871    /// their UI/projection from the included fresh observation, persist
872    /// `gap.latest_cursor`, and keep polling the same stream; it resubscribes
873    /// from that cursor internally.
874    pub fn subscribe_and_recover(&self, cursor: SessionCursor) -> SessionObservationStream {
875        SessionObservationStream {
876            observable: self.clone(),
877            cursor,
878            subscription: None,
879            done: false,
880        }
881    }
882
883    /// Subscribe to remote DTO session observation events and keep the
884    /// subscription alive across recoverable live-replay gaps.
885    pub fn subscribe_and_recover_remote(
886        &self,
887        cursor: RemoteSessionCursor,
888    ) -> Result<RemoteSessionObservationStream> {
889        cursor.validate()?;
890        let cursor = lash_core::SessionCursor::try_from(cursor)?;
891        Ok(RemoteSessionObservationStream {
892            inner: self.subscribe_and_recover(cursor),
893            next_sequence: 0,
894        })
895    }
896
897    pub fn session_id(&self) -> String {
898        self.snapshot().session_id().to_string()
899    }
900
901    pub fn policy_snapshot(&self) -> SessionPolicy {
902        self.snapshot().policy.clone()
903    }
904
905    pub fn read_view(&self) -> SessionReadView {
906        self.snapshot().read_view.clone()
907    }
908
909    pub fn usage_report(&self) -> SessionUsageReport {
910        self.snapshot().usage_report.clone()
911    }
912
913    pub fn tool_state(&self) -> Option<ToolState> {
914        self.snapshot().tool_state.clone()
915    }
916
917    pub fn active_tool_manifests(&self) -> Vec<ToolManifest> {
918        self.snapshot()
919            .tool_state
920            .as_ref()
921            .map(ToolState::tool_manifests)
922            .unwrap_or_default()
923    }
924
925    pub async fn list_process_handles(&self) -> Vec<ProcessHandleSummary> {
926        self.snapshot().list_process_handles().await
927    }
928
929    pub async fn list_all_process_handles(&self) -> Vec<ProcessHandleSummary> {
930        self.snapshot().list_all_process_handles().await
931    }
932
933    pub fn process_scope(&self) -> SessionScope {
934        self.snapshot().process_scope()
935    }
936}
937
938// A public streaming yield produced one item at a time by `Stream::poll_next`;
939// the variant-size spread is transient (never accumulated in a collection), so
940// boxing would only add a per-event heap allocation on the observation hot path
941// and force `*`-deref churn on every SDK consumer. The sibling
942// `RemoteSessionObservationStreamItem` keeps the same inline shape.
943#[allow(clippy::large_enum_variant)]
944#[derive(Clone, Debug)]
945pub enum SessionObservationStreamItem {
946    /// A replayed or live session observation event.
947    Event(SessionObservationEvent),
948    /// A recoverable replay gap with a fresh durable observation.
949    Gap {
950        observation: SessionObservation,
951        gap: LiveReplayGap,
952    },
953}
954
955pub enum RemoteSessionObservationSubscription {
956    Subscribed(RemoteSessionObservationEventStream),
957    Gap {
958        observation: RemoteSessionObservation,
959        gap: RemoteLiveReplayGap,
960    },
961}
962
963#[derive(Clone, Debug)]
964pub enum RemoteSessionObservationStreamItem {
965    /// A replayed or live session observation event encoded as remote DTOs.
966    Event(RemoteSessionObservationEvent),
967    /// A recoverable replay gap with a fresh remote observation snapshot.
968    Gap {
969        observation: RemoteSessionObservation,
970        gap: RemoteLiveReplayGap,
971    },
972}
973
974pub struct RemoteSessionObservationEventStream {
975    inner: lash_core::LiveReplaySubscription,
976    next_sequence: u64,
977}
978
979impl RemoteSessionObservationEventStream {
980    fn new(inner: lash_core::LiveReplaySubscription) -> Self {
981        Self {
982            inner,
983            next_sequence: 0,
984        }
985    }
986
987    pub async fn next_event(&mut self) -> Result<RemoteSessionObservationEvent> {
988        futures_util::future::poll_fn(|cx| Pin::new(&mut *self).poll_next(cx))
989            .await
990            .transpose()?
991            .ok_or_else(|| live_replay_error(LiveReplayStoreError::Closed))
992    }
993}
994
995impl Stream for RemoteSessionObservationEventStream {
996    type Item = Result<RemoteSessionObservationEvent>;
997
998    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
999        match Pin::new(&mut self.inner).poll_next(cx) {
1000            Poll::Pending => Poll::Pending,
1001            Poll::Ready(Some(Ok(event))) => {
1002                let remote = RemoteSessionObservationEvent::from_core(self.next_sequence, event);
1003                self.next_sequence = self.next_sequence.saturating_add(1);
1004                Poll::Ready(Some(Ok(remote)))
1005            }
1006            Poll::Ready(Some(Err(err))) => Poll::Ready(Some(Err(live_replay_error(err)))),
1007            Poll::Ready(None) => Poll::Ready(None),
1008        }
1009    }
1010}
1011
1012/// Remote DTO stream returned by [`ObservableSession::subscribe_and_recover_remote`].
1013pub struct RemoteSessionObservationStream {
1014    inner: SessionObservationStream,
1015    next_sequence: u64,
1016}
1017
1018impl RemoteSessionObservationStream {
1019    pub fn cursor(&self) -> RemoteSessionCursor {
1020        RemoteSessionCursor::from(self.inner.cursor())
1021    }
1022}
1023
1024impl Stream for RemoteSessionObservationStream {
1025    type Item = Result<RemoteSessionObservationStreamItem>;
1026
1027    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
1028        match Pin::new(&mut self.inner).poll_next(cx) {
1029            Poll::Pending => Poll::Pending,
1030            Poll::Ready(Some(Ok(SessionObservationStreamItem::Event(event)))) => {
1031                let remote = RemoteSessionObservationEvent::from_core(self.next_sequence, event);
1032                self.next_sequence = self.next_sequence.saturating_add(1);
1033                Poll::Ready(Some(Ok(RemoteSessionObservationStreamItem::Event(remote))))
1034            }
1035            Poll::Ready(Some(Ok(SessionObservationStreamItem::Gap { observation, gap }))) => {
1036                Poll::Ready(Some(Ok(RemoteSessionObservationStreamItem::Gap {
1037                    observation: observation.into(),
1038                    gap: gap.into(),
1039                })))
1040            }
1041            Poll::Ready(Some(Err(err))) => Poll::Ready(Some(Err(err))),
1042            Poll::Ready(None) => Poll::Ready(None),
1043        }
1044    }
1045}
1046
1047/// Stream returned by [`ObservableSession::subscribe_and_recover`].
1048pub struct SessionObservationStream {
1049    observable: ObservableSession,
1050    cursor: SessionCursor,
1051    subscription: Option<lash_core::LiveReplaySubscription>,
1052    done: bool,
1053}
1054
1055impl SessionObservationStream {
1056    pub fn cursor(&self) -> &SessionCursor {
1057        &self.cursor
1058    }
1059}
1060
1061impl Stream for SessionObservationStream {
1062    type Item = Result<SessionObservationStreamItem>;
1063
1064    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
1065        loop {
1066            if self.done {
1067                return Poll::Ready(None);
1068            }
1069            if self.subscription.is_none() {
1070                match self.observable.subscribe_from_cursor(&self.cursor) {
1071                    Ok(SessionObservationSubscription::Subscribed(subscription)) => {
1072                        self.subscription = Some(subscription);
1073                    }
1074                    Ok(SessionObservationSubscription::Gap { observation, gap }) => {
1075                        self.cursor = gap.latest_cursor.clone();
1076                        return Poll::Ready(Some(Ok(SessionObservationStreamItem::Gap {
1077                            observation,
1078                            gap,
1079                        })));
1080                    }
1081                    Err(err) => {
1082                        self.done = true;
1083                        return Poll::Ready(Some(Err(err)));
1084                    }
1085                }
1086            }
1087
1088            let Some(subscription) = self.subscription.as_mut() else {
1089                continue;
1090            };
1091            match Pin::new(subscription).poll_next(cx) {
1092                Poll::Pending => return Poll::Pending,
1093                Poll::Ready(Some(Ok(event))) => {
1094                    self.cursor = event.cursor.clone();
1095                    return Poll::Ready(Some(Ok(SessionObservationStreamItem::Event(event))));
1096                }
1097                Poll::Ready(Some(Err(LiveReplayStoreError::SubscriberLagged(_)))) => {
1098                    self.subscription = None;
1099                    continue;
1100                }
1101                Poll::Ready(Some(Err(err))) => {
1102                    self.done = true;
1103                    return Poll::Ready(Some(Err(live_replay_error(err))));
1104                }
1105                Poll::Ready(None) => {
1106                    self.done = true;
1107                    return Poll::Ready(None);
1108                }
1109            }
1110        }
1111    }
1112}
1113
1114fn live_replay_error(err: lash_core::LiveReplayStoreError) -> EmbedError {
1115    EmbedError::Runtime(lash_core::RuntimeError::new(
1116        RuntimeErrorCode::Other("live_replay".to_string()),
1117        err.to_string(),
1118    ))
1119}
1120
1121pub struct EnqueueTurnBuilder<'a> {
1122    session: &'a LashSession,
1123    input: TurnInput,
1124    id: Option<String>,
1125    ingress: TurnInputIngress,
1126}
1127
1128impl<'a> EnqueueTurnBuilder<'a> {
1129    pub fn id(mut self, id: impl Into<String>) -> Self {
1130        self.id = Some(id.into());
1131        self
1132    }
1133
1134    pub fn ingress(mut self, ingress: TurnInputIngress) -> Self {
1135        self.ingress = ingress;
1136        self
1137    }
1138
1139    pub async fn send(self) -> Result<PendingTurnInput> {
1140        let source_key = self.id.map(|id| format!("host:{id}"));
1141        self.session
1142            .runtime
1143            .enqueue_turn_input(self.input, self.ingress, source_key)
1144            .await
1145            .map_err(EmbedError::Runtime)
1146    }
1147}
1148
1149impl<'a> std::future::IntoFuture for EnqueueTurnBuilder<'a> {
1150    type Output = Result<PendingTurnInput>;
1151    type IntoFuture =
1152        std::pin::Pin<Box<dyn std::future::Future<Output = Result<PendingTurnInput>> + 'a>>;
1153
1154    fn into_future(self) -> Self::IntoFuture {
1155        Box::pin(self.send())
1156    }
1157}