Skip to main content

lash/
session.rs

1use std::pin::Pin;
2use std::task::{Context, Poll};
3
4use crate::support::*;
5use futures_util::Stream;
6use lash_core::runtime::{DeliveryPolicy, QueuedWorkBatch, SlotPolicy};
7use lash_core::{LiveReplayGap, LiveReplayStoreError, SessionObservationEvent};
8use lash_remote_protocol::{
9    RemoteLiveReplayGap, RemoteSessionCursor, RemoteSessionObservation,
10    RemoteSessionObservationEvent,
11};
12
13pub struct SessionBuilder {
14    pub(crate) core: LashCore,
15    pub(crate) session_id: String,
16    pub(crate) spec: SessionSpec,
17    pub(crate) parent_session_id: Option<String>,
18    pub(crate) session_execution_owner_id: Option<String>,
19    pub(crate) store: Option<Arc<dyn RuntimePersistence>>,
20    pub(crate) provider: Option<ProviderHandle>,
21    pub(crate) active_plugins: Vec<ActivePluginBinding>,
22    pub(crate) plugin_factories: Vec<Arc<dyn PluginFactory>>,
23}
24
#[cfg(feature = "rlm")]
/// [`SessionBuilder`] wrapper that also stamps RLM final-answer options into
/// the session state when opening.
pub struct RlmSessionBuilder {
    /// Underlying session builder that performs the actual open.
    pub(crate) builder: SessionBuilder,
    /// Explicit final-answer format; when `None`, a root/child default is used.
    pub(crate) rlm_final_answer_format: Option<lash_rlm_types::RlmFinalAnswerFormat>,
}
30
31impl SessionBuilder {
32    pub fn provider(mut self, provider: ProviderHandle) -> Self {
33        self.spec = self.spec.provider_id(provider.kind());
34        self.provider = Some(provider);
35        self
36    }
37
38    pub fn session_spec(mut self, spec: SessionSpec) -> Self {
39        self.spec = spec;
40        self
41    }
42
43    pub fn parent(mut self, parent_session_id: impl Into<String>) -> Self {
44        self.parent_session_id = Some(parent_session_id.into());
45        self
46    }
47
48    /// Use a stable owner id for durable session execution leases.
49    ///
50    /// This is only for hosts that already serialize one logical execution
51    /// lane, such as a durable workflow engine retrying the same invocation on
52    /// another process. Normal embedders should keep the default per-open UUID;
53    /// reusing an owner id across independently running handles makes them the
54    /// same lease owner.
55    pub fn session_execution_owner_id(mut self, owner_id: impl Into<String>) -> Self {
56        self.session_execution_owner_id = Some(owner_id.into());
57        self
58    }
59
60    /// Use a specific persistence store for this root session.
61    ///
62    /// This is the right API for a host-owned, pre-opened session database.
63    /// Managed child sessions never reuse this store; configure
64    /// `LashCoreBuilder::child_store_factory` when child sessions should also
65    /// persist.
66    pub fn store(mut self, store: Arc<dyn RuntimePersistence>) -> Self {
67        self.store = Some(store);
68        self
69    }
70
71    pub fn plugin<P: PluginBinding>(mut self, config: P::SessionConfig) -> Self {
72        self.active_plugins.push(ActivePluginBinding {
73            id: P::ID,
74            requires_turn_input: P::requires_turn_input(&config),
75        });
76        self.plugin_factories.push(P::factory(&config));
77        self
78    }
79
80    pub async fn open(self) -> Result<LashSession> {
81        let policy = self.session_policy();
82        let store = self.create_store(&policy).await?;
83        let state = self
84            .load_or_default_state(&policy, store.as_deref())
85            .await?;
86        self.open_resolved(policy, state, store).await
87    }
88
89    /// Open this session with a fresh resident graph, ignoring any persisted
90    /// session graph/checkpoint state that may already exist for the same
91    /// session id.
92    ///
93    /// The next successful commit writes a full replacement graph, so normal
94    /// embedders can use this to start over without manually calling
95    /// `load_persisted_session_state` or constructing a `RuntimeSessionState`.
96    /// Use [`Self::open`] for resume and [`Self::open_with_state`] only when
97    /// restoring explicit host-owned state.
98    pub async fn open_fresh(self) -> Result<LashSession> {
99        let policy = self.session_policy();
100        let store = self.create_store(&policy).await?;
101        let state = RuntimeSessionState {
102            session_id: self.session_id.clone(),
103            policy: policy.clone(),
104            graph_replace_required: true,
105            ..RuntimeSessionState::default()
106        };
107        self.open_resolved(policy, state, store).await
108    }
109
110    /// Open with an explicitly supplied runtime state.
111    ///
112    /// This is for advanced hosts that already own a complete state snapshot.
113    /// Normal embedders should use [`Self::open`] to resume according to Lash's
114    /// residency policy or [`Self::open_fresh`] to start over and replace prior
115    /// persisted state on the next commit.
116    pub async fn open_with_state(self, mut state: RuntimeSessionState) -> Result<LashSession> {
117        let policy = self.session_policy();
118        let store = self.create_store(&policy).await?;
119        if state.session_id != self.session_id {
120            return Err(EmbedError::StoreSessionMismatch {
121                loaded: state.session_id,
122                requested: self.session_id,
123            });
124        }
125        let recorded_provider_id = state.policy.recorded_provider_id().to_string();
126        state.policy = policy.clone();
127        state.policy.provider_id = recorded_provider_id;
128        self.open_resolved(policy, state, store).await
129    }
130
131    fn session_policy(&self) -> SessionPolicy {
132        let mut policy = self.spec.resolve_against(&self.core.policy);
133        policy.session_id = Some(self.session_id.clone());
134        policy
135    }
136
137    async fn load_or_default_state(
138        &self,
139        policy: &SessionPolicy,
140        store: Option<&dyn RuntimePersistence>,
141    ) -> Result<RuntimeSessionState> {
142        let state = match store {
143            Some(store) => {
144                let loaded = self.load_persisted_state_for_residency(store).await?;
145                let mut state = loaded.unwrap_or_else(|| RuntimeSessionState {
146                    session_id: self.session_id.clone(),
147                    policy: policy.clone(),
148                    ..RuntimeSessionState::default()
149                });
150                if state.session_id != self.session_id {
151                    return Err(EmbedError::StoreSessionMismatch {
152                        loaded: state.session_id,
153                        requested: self.session_id.clone(),
154                    });
155                }
156                let recorded_provider_id = state.policy.recorded_provider_id().to_string();
157                state.policy = policy.clone();
158                state.policy.provider_id = recorded_provider_id;
159                state
160            }
161            None => RuntimeSessionState {
162                session_id: self.session_id.clone(),
163                policy: policy.clone(),
164                ..RuntimeSessionState::default()
165            },
166        };
167        Ok(state)
168    }
169
170    async fn load_persisted_state_for_residency(
171        &self,
172        store: &dyn RuntimePersistence,
173    ) -> Result<Option<RuntimeSessionState>> {
174        load_persisted_state_for_residency(self.core.env.residency, store).await
175    }
176
177    async fn open_resolved(
178        self,
179        policy: SessionPolicy,
180        state: RuntimeSessionState,
181        store: Option<Arc<dyn RuntimePersistence>>,
182    ) -> Result<LashSession> {
183        let mut env = self.core.env.clone();
184        if let Some(provider) = self.provider.clone().or_else(|| self.core.provider.clone()) {
185            env.core.providers.provider_resolver =
186                Arc::new(lash_core::SingleProviderResolver::new(provider));
187        }
188        let plugin_host = build_plugin_host(
189            self.core.protocol_factory.as_ref(),
190            self.core.plugin_factories.as_ref(),
191            self.plugin_factories,
192        )?;
193        env.core = self
194            .core
195            .runtime_host_for_plugin_host(env.core.clone(), &plugin_host)?;
196        env.plugin_host = Some(Arc::new(plugin_host));
197        let effect_host = Arc::clone(&env.core.control.effect_host);
198        let drivers = self.core.work_driver.drivers().await;
199        env.process_work_driver = drivers.process.clone();
200        env.queued_work_driver = drivers.queued.clone();
201        let mut runtime = LashRuntime::from_environment(&env, policy, state, store).await?;
202        if let Some(owner_id) = self.session_execution_owner_id {
203            runtime.set_runtime_scope_id(owner_id);
204        }
205        if drivers.drive_process_on_open
206            && let Some(driver) = drivers.process.as_ref()
207        {
208            driver.claim_and_run_pending("session_open").await?;
209        }
210        let handle = RuntimeHandle::with_live_replay_store(
211            runtime,
212            Arc::clone(&self.core.live_replay_store),
213        );
214        Ok(LashSession {
215            runtime: handle,
216            effect_host,
217            parent_session_id: self.parent_session_id,
218            active_plugins: self.active_plugins,
219            process_phase_probe_slot: self.core.work_driver.phase_probe_slot(),
220            turn_cancels: crate::turn::TurnCancelRegistry::default(),
221        })
222    }
223
224    async fn create_store(
225        &self,
226        policy: &SessionPolicy,
227    ) -> Result<Option<Arc<dyn RuntimePersistence>>> {
228        if let Some(store) = self.store.as_ref() {
229            return Ok(Some(Arc::clone(store)));
230        }
231        let Some(factory) = self.core.store_factory.as_ref() else {
232            return Ok(None);
233        };
234        let request = SessionStoreCreateRequest {
235            session_id: self.session_id.clone(),
236            relation: self
237                .parent_session_id
238                .as_ref()
239                .map(|parent_session_id| lash_core::SessionRelation::Child {
240                    parent_session_id: parent_session_id.clone(),
241                    caused_by: None,
242                })
243                .unwrap_or_default(),
244            policy: policy.clone(),
245        };
246        factory
247            .create_store(&request)
248            .await
249            .map(Some)
250            .map_err(|message| EmbedError::StoreFactory {
251                session_id: self.session_id.clone(),
252                message,
253            })
254    }
255}
256
257pub(crate) async fn load_state_for_residency(
258    residency: Residency,
259    session_id: &str,
260    policy: &SessionPolicy,
261    store: &dyn RuntimePersistence,
262) -> Result<RuntimeSessionState> {
263    let mut state = load_persisted_state_for_residency(residency, store)
264        .await?
265        .unwrap_or_else(|| RuntimeSessionState {
266            session_id: session_id.to_string(),
267            policy: policy.clone(),
268            ..RuntimeSessionState::default()
269        });
270    if state.session_id != session_id {
271        return Err(EmbedError::StoreSessionMismatch {
272            loaded: state.session_id,
273            requested: session_id.to_string(),
274        });
275    }
276    let recorded_provider_id = state.policy.recorded_provider_id().to_string();
277    state.policy = policy.clone();
278    state.policy.provider_id = recorded_provider_id;
279    Ok(state)
280}
281
282async fn load_persisted_state_for_residency(
283    residency: Residency,
284    store: &dyn RuntimePersistence,
285) -> Result<Option<RuntimeSessionState>> {
286    match residency {
287        Residency::KeepAll => {
288            let loaded = lash_core::store::load_persisted_session_state(store)
289                .await
290                .map_err(|err| SessionError::Protocol(format!("failed to load store: {err}")))?;
291            Ok(loaded)
292        }
293        Residency::ActivePathOnly => {
294            let active = lash_core::store::load_persisted_session_state_active_path(store, None)
295                .await
296                .map_err(|err| {
297                    SessionError::Protocol(format!("failed to load active-path store: {err}"))
298                })?;
299            if active
300                .as_ref()
301                .is_some_and(|state| state.session_graph.nodes.is_empty())
302            {
303                let mut full = lash_core::store::load_persisted_session_state(store)
304                    .await
305                    .map_err(|err| {
306                        SessionError::Protocol(format!(
307                            "failed to heal active-path store from full graph: {err}"
308                        ))
309                    })?;
310                if let Some(state) = full.as_mut() {
311                    state.graph_replace_required = true;
312                }
313                return Ok(full);
314            }
315            Ok(active)
316        }
317    }
318}
319
320impl PromptLayerSink for SessionBuilder {
321    fn prompt_layer_mut(&mut self) -> &mut PromptLayer {
322        self.spec.prompt.get_or_insert_with(PromptLayer::new)
323    }
324}
325
#[cfg(feature = "rlm")]
impl RlmSessionBuilder {
    /// Forwards to [`SessionBuilder::provider`].
    pub fn provider(mut self, provider: ProviderHandle) -> Self {
        self.builder = self.builder.provider(provider);
        self
    }

    /// Forwards to [`SessionBuilder::session_spec`].
    pub fn session_spec(mut self, spec: SessionSpec) -> Self {
        self.builder = self.builder.session_spec(spec);
        self
    }

    /// Forwards to [`SessionBuilder::parent`].
    ///
    /// Also switches the default final-answer format from the root default to
    /// the child default.
    pub fn parent(mut self, parent_session_id: impl Into<String>) -> Self {
        self.builder = self.builder.parent(parent_session_id);
        self
    }

    /// Forwards to [`SessionBuilder::session_execution_owner_id`].
    pub fn session_execution_owner_id(mut self, owner_id: impl Into<String>) -> Self {
        self.builder = self.builder.session_execution_owner_id(owner_id);
        self
    }

    /// Forwards to [`SessionBuilder::store`].
    pub fn store(mut self, store: Arc<dyn RuntimePersistence>) -> Self {
        self.builder = self.builder.store(store);
        self
    }

    /// Forwards to [`SessionBuilder::plugin`].
    pub fn plugin<P: PluginBinding>(mut self, config: P::SessionConfig) -> Self {
        self.builder = self.builder.plugin::<P>(config);
        self
    }

    /// Like [`SessionBuilder::open`], then applies the RLM session options.
    pub async fn open(self) -> Result<LashSession> {
        self.open_resolved(RlmOpenState::Resume).await
    }

    /// Like [`SessionBuilder::open_fresh`], then applies the RLM session options.
    pub async fn open_fresh(self) -> Result<LashSession> {
        self.open_resolved(RlmOpenState::Fresh).await
    }

    /// Like [`SessionBuilder::open_with_state`], then applies the RLM session
    /// options.
    ///
    /// # Errors
    ///
    /// `EmbedError::StoreSessionMismatch` when `state.session_id` differs from
    /// the builder's session id. The check runs before any store is created.
    pub async fn open_with_state(self, state: RuntimeSessionState) -> Result<LashSession> {
        self.open_resolved(RlmOpenState::Explicit(state)).await
    }

    /// Shared open path.
    ///
    /// Resolves the policy and obtains the store and initial state for the
    /// requested mode. It then applies the RLM final-answer options and
    /// delegates to the inner builder.
    async fn open_resolved(self, open_state: RlmOpenState) -> Result<LashSession> {
        let Self {
            builder,
            rlm_final_answer_format,
        } = self;
        let policy = builder.session_policy();
        let (store, mut state) = match open_state {
            RlmOpenState::Resume => {
                let store = builder.create_store(&policy).await?;
                let state = builder
                    .load_or_default_state(&policy, store.as_deref())
                    .await?;
                (store, state)
            }
            RlmOpenState::Fresh => {
                let store = builder.create_store(&policy).await?;
                let state = RuntimeSessionState {
                    session_id: builder.session_id.clone(),
                    policy: policy.clone(),
                    graph_replace_required: true,
                    ..RuntimeSessionState::default()
                };
                (store, state)
            }
            RlmOpenState::Explicit(mut state) => {
                // Reject a snapshot for another session before the store
                // factory is asked to create anything.
                if state.session_id != builder.session_id {
                    return Err(EmbedError::StoreSessionMismatch {
                        loaded: state.session_id,
                        requested: builder.session_id.clone(),
                    });
                }
                let recorded_provider_id = state.policy.recorded_provider_id().to_string();
                state.policy = policy.clone();
                state.policy.provider_id = recorded_provider_id;
                let store = builder.create_store(&policy).await?;
                (store, state)
            }
        };
        apply_rlm_session_options(
            builder.parent_session_id.is_none(),
            rlm_final_answer_format,
            &mut state,
        )?;
        builder.open_resolved(policy, state, store).await
    }
}
410
#[cfg(feature = "rlm")]
impl PromptLayerSink for RlmSessionBuilder {
    /// Delegates to the wrapped [`SessionBuilder`]'s prompt layer.
    fn prompt_layer_mut(&mut self) -> &mut PromptLayer {
        PromptLayerSink::prompt_layer_mut(&mut self.builder)
    }
}
417
#[cfg(feature = "rlm")]
/// How `RlmSessionBuilder::open_resolved` obtains the initial session state.
// Allowed because the value is created once per open and matched
// immediately. The size gap between the inline `RuntimeSessionState` and
// the unit variants is not worth a heap allocation.
#[allow(clippy::large_enum_variant)]
enum RlmOpenState {
    /// Resume from persisted state, or start empty when nothing is stored.
    Resume,
    /// Start empty and replace the persisted graph on the next commit.
    Fresh,
    /// Use a caller-supplied snapshot.
    Explicit(RuntimeSessionState),
}
425
#[cfg(feature = "rlm")]
/// Stamp the RLM final-answer format into the session's protocol turn options
/// and into every agent frame.
///
/// Format selection:
/// - An `explicit_format` wins.
/// - Otherwise root sessions default to `Markdown` and child sessions to
///   `RawSubmitValue`.
///
/// Other fields of existing `RlmCreateExtras` (such as `termination`) are
/// preserved. An empty option set starts from `RlmCreateExtras::default()`.
///
/// # Errors
///
/// Propagates failures decoding the existing options or encoding the new ones.
fn apply_rlm_session_options(
    is_root_session: bool,
    explicit_format: Option<lash_rlm_types::RlmFinalAnswerFormat>,
    state: &mut RuntimeSessionState,
) -> Result<()> {
    let chosen_format = match explicit_format {
        Some(format) => format,
        None if is_root_session => lash_rlm_types::RlmFinalAnswerFormat::Markdown,
        None => lash_rlm_types::RlmFinalAnswerFormat::RawSubmitValue,
    };
    let mut rlm_extras: lash_rlm_types::RlmCreateExtras =
        if state.protocol_turn_options.is_empty() {
            Default::default()
        } else {
            state.protocol_turn_options.decode()?
        };
    rlm_extras.final_answer_format = Some(chosen_format);
    let encoded = ProtocolTurnOptions::typed(rlm_extras)?;
    for frame in &mut state.agent_frames {
        frame.protocol_turn_options = encoded.clone();
    }
    state.protocol_turn_options = encoded;
    Ok(())
}
452
#[cfg(all(test, feature = "rlm"))]
mod tests {
    use super::*;

    /// State seeded with RLM extras carrying `ProseOrSubmit` termination and
    /// the given final-answer format.
    fn seeded_state(
        final_answer_format: Option<lash_rlm_types::RlmFinalAnswerFormat>,
    ) -> Result<RuntimeSessionState> {
        Ok(RuntimeSessionState {
            protocol_turn_options: ProtocolTurnOptions::typed(lash_rlm_types::RlmCreateExtras {
                termination: lash_rlm_types::RlmTermination::ProseOrSubmit,
                final_answer_format,
            })?,
            ..Default::default()
        })
    }

    #[test]
    fn apply_rlm_session_options_preserves_existing_termination() -> Result<()> {
        let mut state = seeded_state(None)?;

        apply_rlm_session_options(true, None, &mut state)?;

        let extras: lash_rlm_types::RlmCreateExtras = state.protocol_turn_options.decode()?;
        assert_eq!(
            extras.termination,
            lash_rlm_types::RlmTermination::ProseOrSubmit
        );
        assert_eq!(
            extras.final_answer_format,
            Some(lash_rlm_types::RlmFinalAnswerFormat::Markdown)
        );
        Ok(())
    }

    #[test]
    fn apply_rlm_session_options_defaults_child_sessions_to_raw_submit_value() -> Result<()> {
        let mut state = seeded_state(None)?;

        apply_rlm_session_options(false, None, &mut state)?;

        let extras: lash_rlm_types::RlmCreateExtras = state.protocol_turn_options.decode()?;
        assert_eq!(
            extras.final_answer_format,
            Some(lash_rlm_types::RlmFinalAnswerFormat::RawSubmitValue)
        );
        Ok(())
    }

    #[test]
    fn apply_rlm_session_options_explicit_format_overrides_stored_format() -> Result<()> {
        let mut state = seeded_state(Some(lash_rlm_types::RlmFinalAnswerFormat::Markdown))?;

        apply_rlm_session_options(
            true,
            Some(lash_rlm_types::RlmFinalAnswerFormat::RawSubmitValue),
            &mut state,
        )?;

        let extras: lash_rlm_types::RlmCreateExtras = state.protocol_turn_options.decode()?;
        assert_eq!(
            extras.final_answer_format,
            Some(lash_rlm_types::RlmFinalAnswerFormat::RawSubmitValue)
        );
        assert_eq!(
            extras.termination,
            lash_rlm_types::RlmTermination::ProseOrSubmit
        );
        Ok(())
    }
}
481
482#[derive(Clone)]
483pub struct LashSession {
484    pub(crate) runtime: RuntimeHandle,
485    pub(crate) effect_host: Arc<dyn EffectHost>,
486    pub(crate) parent_session_id: Option<String>,
487    pub(crate) active_plugins: Vec<ActivePluginBinding>,
488    pub(crate) process_phase_probe_slot: Option<lash_core::runtime::RuntimeTurnPhaseProbeSlot>,
489    pub(crate) turn_cancels: crate::turn::TurnCancelRegistry,
490}
491
492#[derive(Clone, Debug, Default)]
493pub struct SessionConfigPatch {
494    pub provider: Option<ProviderHandle>,
495    pub model: Option<ModelSpec>,
496    pub prompt: Option<PromptLayer>,
497}
498
499impl LashSession {
500    pub async fn close(self) -> Result<()> {
501        let runtime = self.runtime.writer();
502        let runtime = runtime.lock().await;
503        runtime.unregister_plugin_session()?;
504        Ok(())
505    }
506
507    pub fn session_id(&self) -> String {
508        self.runtime.observe().session_id().to_string()
509    }
510
511    pub fn policy_snapshot(&self) -> SessionPolicy {
512        self.runtime.observe().policy.clone()
513    }
514
515    pub fn observe(&self) -> ObservableSession {
516        ObservableSession {
517            runtime: self.runtime.clone(),
518        }
519    }
520
521    pub fn parent_session_id(&self) -> Option<&str> {
522        self.parent_session_id.as_deref()
523    }
524
525    pub fn effect_host(&self) -> Arc<dyn EffectHost> {
526        Arc::clone(&self.effect_host)
527    }
528
529    pub fn turn(&self, input: TurnInput) -> TurnBuilder {
530        TurnBuilder {
531            runtime: self.runtime.clone(),
532            effect_host: Arc::clone(&self.effect_host),
533            active_plugins: self.active_plugins.clone(),
534            input,
535            cancel: CancellationToken::new(),
536            cancels: self.turn_cancels.clone(),
537            protocol_turn_options: None,
538            provider: None,
539            model: None,
540            turn_id: None,
541        }
542    }
543
544    pub fn queued_turn(&self) -> QueuedTurnBuilder {
545        QueuedTurnBuilder {
546            runtime: self.runtime.clone(),
547            effect_host: Arc::clone(&self.effect_host),
548            cancel: CancellationToken::new(),
549            cancels: self.turn_cancels.clone(),
550            batch_ids: Vec::new(),
551            drain_id: None,
552        }
553    }
554
555    /// Cancel every turn currently executing through this opened session
556    /// (including its clones) and report how many were signalled.
557    ///
558    /// This is the affordance behind a UI "stop" control: hold a clone of the
559    /// session wherever the stop arrives and call this, instead of threading a
560    /// [`CancellationToken`](crate::CancellationToken) into every turn call
561    /// ([`TurnBuilder::cancel`](crate::TurnBuilder::cancel) remains the
562    /// per-turn hook when you need one). A cancelled turn finishes with
563    /// `TurnOutcome::Stopped(TurnStop::Cancelled)` and commits like any other
564    /// turn; the session stays usable.
565    ///
566    /// Scope: turns started from this `LashSession` instance and its clones.
567    /// A handle opened separately for the same session id has its own
568    /// registry and is not reached.
569    pub fn cancel_running_turns(&self) -> usize {
570        self.turn_cancels.cancel_all()
571    }
572
573    pub fn admin(&self) -> SessionAdmin {
574        SessionAdmin {
575            runtime: self.runtime.clone(),
576        }
577    }
578
579    pub async fn configure(&self, patch: SessionConfigPatch) -> Result<()> {
580        self.admin().config().update(patch).await
581    }
582
583    pub fn tools(&self) -> ToolAdmin {
584        ToolAdmin::new(self.admin())
585    }
586
587    pub fn commands(&self) -> SessionCommandAdmin {
588        self.admin().commands()
589    }
590
591    pub fn triggers(&self) -> SessionTriggerAdmin {
592        self.admin().triggers()
593    }
594
595    pub fn processes(&self) -> SessionProcessAdmin {
596        SessionProcessAdmin::new(self.admin())
597    }
598
599    pub fn plugin_operations(&self) -> PluginOperations {
600        PluginOperations {
601            control: self.admin(),
602        }
603    }
604
605    pub fn enqueue(&self, input: TurnInput) -> EnqueueTurnBuilder<'_> {
606        EnqueueTurnBuilder {
607            session: self,
608            input,
609            id: None,
610            delivery_policy: DeliveryPolicy::AfterCurrentTurnCommit,
611            slot_policy: SlotPolicy::Exclusive,
612        }
613    }
614
615    pub async fn queued_work(&self) -> Result<Vec<QueuedWorkBatch>> {
616        let observation = self.runtime.observe();
617        let store = observation.queue_store.as_ref().ok_or_else(|| {
618            EmbedError::Runtime(lash_core::RuntimeError::new(
619                lash_core::RuntimeErrorCode::StoreCommitFailed,
620                "queued work inspection requires a persistent runtime store",
621            ))
622        })?;
623        store
624            .list_pending_queued_work(observation.session_id())
625            .await
626            .map_err(|err| {
627                EmbedError::Runtime(lash_core::RuntimeError::new(
628                    lash_core::RuntimeErrorCode::StoreCommitFailed,
629                    err.to_string(),
630                ))
631            })
632    }
633
634    pub async fn cancel_queued_work_batch(
635        &self,
636        batch_id: &str,
637    ) -> Result<Option<QueuedWorkBatch>> {
638        let session_id = self.session_id();
639        self.runtime
640            .cancel_queued_work_batch(&session_id, batch_id)
641            .await
642            .map_err(EmbedError::Runtime)
643    }
644
645    /// Resolve once `batch_id` is no longer pending in the queue store —
646    /// drained by whoever runs queued work (a queued-work runner, a durable
647    /// worker, or another handle's [`queued_turn`](Self::queued_turn)) or
648    /// cancelled. This is the enqueue-and-observe side of the queue: the
649    /// caller never claims the work itself.
650    ///
651    /// Completion is read from the persistent queue store, so it observes
652    /// drains performed by other session handles and other processes alike.
653    /// There is no built-in deadline — nothing resolves if nothing drains the
654    /// queue, so bound it with `tokio::time::timeout` when the worker may be
655    /// unavailable. A batch id the store has never seen resolves immediately.
656    pub async fn await_queued_work_batch(&self, batch_id: &str) -> Result<()> {
657        let observation = self.runtime.observe();
658        let store = observation.queue_store.clone().ok_or_else(|| {
659            EmbedError::Runtime(lash_core::RuntimeError::new(
660                lash_core::RuntimeErrorCode::StoreCommitFailed,
661                "queued work inspection requires a persistent runtime store",
662            ))
663        })?;
664        let session_id = observation.session_id().to_string();
665        drop(observation);
666        let mut delay = std::time::Duration::from_millis(25);
667        loop {
668            let pending = store
669                .list_pending_queued_work(&session_id)
670                .await
671                .map_err(|err| {
672                    EmbedError::Runtime(lash_core::RuntimeError::new(
673                        lash_core::RuntimeErrorCode::StoreCommitFailed,
674                        err.to_string(),
675                    ))
676                })?;
677            if !pending.iter().any(|batch| batch.batch_id == batch_id) {
678                return Ok(());
679            }
680            tokio::time::sleep(delay).await;
681            delay = (delay * 2).min(std::time::Duration::from_millis(400));
682        }
683    }
684
685    pub fn read_view(&self) -> SessionReadView {
686        self.runtime.observe().read_view.clone()
687    }
688
689    pub fn usage_report(&self) -> SessionUsageReport {
690        self.runtime.observe().usage_report.clone()
691    }
692
693    pub async fn set_turn_phase_probe(
694        &self,
695        probe: Arc<dyn lash_core::runtime::RuntimeTurnPhaseProbe>,
696    ) {
697        let writer = self.runtime.writer();
698        let mut runtime = writer.lock().await;
699        runtime.set_turn_phase_probe(Arc::clone(&probe));
700        self.runtime.publish_from(&runtime);
701        if let Some(slot) = &self.process_phase_probe_slot {
702            let observation = self.runtime.observe();
703            slot.set_for_session(observation.session_id(), Arc::clone(&probe));
704            let current_frame = observation.persisted_state.current_agent_frame_id.as_str();
705            if !current_frame.is_empty() {
706                let scope = lash_core::SessionScope::for_agent_frame(
707                    observation.session_id(),
708                    current_frame,
709                );
710                slot.set_for_scope(&scope, probe);
711            }
712        }
713    }
714}
715
716#[derive(Clone)]
717pub struct ObservableSession {
718    pub(crate) runtime: RuntimeHandle,
719}
720
721impl ObservableSession {
722    fn snapshot(&self) -> Arc<RuntimeObservation> {
723        self.runtime.observe()
724    }
725
726    pub fn current_observation(&self) -> SessionObservation {
727        self.runtime.current_session_observation()
728    }
729
730    pub fn current_remote_observation(&self) -> RemoteSessionObservation {
731        RemoteSessionObservation::from_core(self.current_observation())
732    }
733
734    pub fn resume_from_cursor(&self, cursor: &SessionCursor) -> Result<SessionResume> {
735        self.runtime
736            .resume_session_observation(cursor)
737            .map_err(live_replay_error)
738    }
739
740    pub fn subscribe_from_cursor(
741        &self,
742        cursor: &SessionCursor,
743    ) -> Result<SessionObservationSubscription> {
744        self.runtime
745            .subscribe_session_observation(cursor)
746            .map_err(live_replay_error)
747    }
748
749    pub fn subscribe_from_remote_cursor(
750        &self,
751        cursor: &RemoteSessionCursor,
752    ) -> Result<RemoteSessionObservationSubscription> {
753        cursor.validate()?;
754        let cursor = lash_core::SessionCursor::try_from(cursor.clone())?;
755        match self.subscribe_from_cursor(&cursor)? {
756            SessionObservationSubscription::Subscribed(subscription) => {
757                Ok(RemoteSessionObservationSubscription::Subscribed(
758                    RemoteSessionObservationEventStream::new(subscription),
759                ))
760            }
761            SessionObservationSubscription::Gap { observation, gap } => {
762                Ok(RemoteSessionObservationSubscription::Gap {
763                    observation: observation.into(),
764                    gap: gap.into(),
765                })
766            }
767        }
768    }
769
770    /// Subscribe to session observation events and keep the subscription alive
771    /// across recoverable live-replay gaps.
772    ///
773    /// The returned stream yields [`SessionObservationStreamItem::Gap`] when
774    /// the cursor missed the bounded replay window. Callers should replace
775    /// their UI/projection from the included fresh observation, persist
776    /// `gap.latest_cursor`, and keep polling the same stream; it resubscribes
777    /// from that cursor internally.
778    pub fn subscribe_and_recover(&self, cursor: SessionCursor) -> SessionObservationStream {
779        SessionObservationStream {
780            observable: self.clone(),
781            cursor,
782            subscription: None,
783            done: false,
784        }
785    }
786
787    /// Subscribe to remote DTO session observation events and keep the
788    /// subscription alive across recoverable live-replay gaps.
789    pub fn subscribe_and_recover_remote(
790        &self,
791        cursor: RemoteSessionCursor,
792    ) -> Result<RemoteSessionObservationStream> {
793        cursor.validate()?;
794        let cursor = lash_core::SessionCursor::try_from(cursor)?;
795        Ok(RemoteSessionObservationStream {
796            inner: self.subscribe_and_recover(cursor),
797            next_sequence: 0,
798        })
799    }
800
801    pub fn session_id(&self) -> String {
802        self.snapshot().session_id().to_string()
803    }
804
805    pub fn policy_snapshot(&self) -> SessionPolicy {
806        self.snapshot().policy.clone()
807    }
808
809    pub fn read_view(&self) -> SessionReadView {
810        self.snapshot().read_view.clone()
811    }
812
813    pub fn usage_report(&self) -> SessionUsageReport {
814        self.snapshot().usage_report.clone()
815    }
816
817    pub fn tool_state(&self) -> Option<ToolState> {
818        self.snapshot().tool_state.clone()
819    }
820
821    pub fn active_tool_manifests(&self) -> Vec<ToolManifest> {
822        self.snapshot()
823            .tool_state
824            .as_ref()
825            .map(ToolState::tool_manifests)
826            .unwrap_or_default()
827    }
828
829    pub async fn list_process_handles(&self) -> Vec<ProcessHandleSummary> {
830        self.snapshot().list_process_handles().await
831    }
832
833    pub async fn list_all_process_handles(&self) -> Vec<ProcessHandleSummary> {
834        self.snapshot().list_all_process_handles().await
835    }
836
837    pub fn process_scope(&self) -> SessionScope {
838        self.snapshot().process_scope()
839    }
840}
841
842#[derive(Clone, Debug)]
843pub enum SessionObservationStreamItem {
844    /// A replayed or live session observation event.
845    Event(SessionObservationEvent),
846    /// A recoverable replay gap with a fresh durable observation.
847    Gap {
848        observation: SessionObservation,
849        gap: LiveReplayGap,
850    },
851}
852
853pub enum RemoteSessionObservationSubscription {
854    Subscribed(RemoteSessionObservationEventStream),
855    Gap {
856        observation: RemoteSessionObservation,
857        gap: RemoteLiveReplayGap,
858    },
859}
860
861#[derive(Clone, Debug)]
862pub enum RemoteSessionObservationStreamItem {
863    /// A replayed or live session observation event encoded as remote DTOs.
864    Event(RemoteSessionObservationEvent),
865    /// A recoverable replay gap with a fresh remote observation snapshot.
866    Gap {
867        observation: RemoteSessionObservation,
868        gap: RemoteLiveReplayGap,
869    },
870}
871
872pub struct RemoteSessionObservationEventStream {
873    inner: lash_core::LiveReplaySubscription,
874    next_sequence: u64,
875}
876
877impl RemoteSessionObservationEventStream {
878    fn new(inner: lash_core::LiveReplaySubscription) -> Self {
879        Self {
880            inner,
881            next_sequence: 0,
882        }
883    }
884
885    pub async fn next_event(&mut self) -> Result<RemoteSessionObservationEvent> {
886        futures_util::future::poll_fn(|cx| Pin::new(&mut *self).poll_next(cx))
887            .await
888            .transpose()?
889            .ok_or_else(|| live_replay_error(LiveReplayStoreError::Closed))
890    }
891}
892
893impl Stream for RemoteSessionObservationEventStream {
894    type Item = Result<RemoteSessionObservationEvent>;
895
896    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
897        match Pin::new(&mut self.inner).poll_next(cx) {
898            Poll::Pending => Poll::Pending,
899            Poll::Ready(Some(Ok(event))) => {
900                let remote = RemoteSessionObservationEvent::from_core(self.next_sequence, event);
901                self.next_sequence = self.next_sequence.saturating_add(1);
902                Poll::Ready(Some(Ok(remote)))
903            }
904            Poll::Ready(Some(Err(err))) => Poll::Ready(Some(Err(live_replay_error(err)))),
905            Poll::Ready(None) => Poll::Ready(None),
906        }
907    }
908}
909
910/// Remote DTO stream returned by [`ObservableSession::subscribe_and_recover_remote`].
911pub struct RemoteSessionObservationStream {
912    inner: SessionObservationStream,
913    next_sequence: u64,
914}
915
916impl RemoteSessionObservationStream {
917    pub fn cursor(&self) -> RemoteSessionCursor {
918        RemoteSessionCursor::from(self.inner.cursor())
919    }
920}
921
922impl Stream for RemoteSessionObservationStream {
923    type Item = Result<RemoteSessionObservationStreamItem>;
924
925    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
926        match Pin::new(&mut self.inner).poll_next(cx) {
927            Poll::Pending => Poll::Pending,
928            Poll::Ready(Some(Ok(SessionObservationStreamItem::Event(event)))) => {
929                let remote = RemoteSessionObservationEvent::from_core(self.next_sequence, event);
930                self.next_sequence = self.next_sequence.saturating_add(1);
931                Poll::Ready(Some(Ok(RemoteSessionObservationStreamItem::Event(remote))))
932            }
933            Poll::Ready(Some(Ok(SessionObservationStreamItem::Gap { observation, gap }))) => {
934                Poll::Ready(Some(Ok(RemoteSessionObservationStreamItem::Gap {
935                    observation: observation.into(),
936                    gap: gap.into(),
937                })))
938            }
939            Poll::Ready(Some(Err(err))) => Poll::Ready(Some(Err(err))),
940            Poll::Ready(None) => Poll::Ready(None),
941        }
942    }
943}
944
945/// Stream returned by [`ObservableSession::subscribe_and_recover`].
946pub struct SessionObservationStream {
947    observable: ObservableSession,
948    cursor: SessionCursor,
949    subscription: Option<lash_core::LiveReplaySubscription>,
950    done: bool,
951}
952
953impl SessionObservationStream {
954    pub fn cursor(&self) -> &SessionCursor {
955        &self.cursor
956    }
957}
958
959impl Stream for SessionObservationStream {
960    type Item = Result<SessionObservationStreamItem>;
961
962    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
963        loop {
964            if self.done {
965                return Poll::Ready(None);
966            }
967            if self.subscription.is_none() {
968                match self.observable.subscribe_from_cursor(&self.cursor) {
969                    Ok(SessionObservationSubscription::Subscribed(subscription)) => {
970                        self.subscription = Some(subscription);
971                    }
972                    Ok(SessionObservationSubscription::Gap { observation, gap }) => {
973                        self.cursor = gap.latest_cursor.clone();
974                        return Poll::Ready(Some(Ok(SessionObservationStreamItem::Gap {
975                            observation,
976                            gap,
977                        })));
978                    }
979                    Err(err) => {
980                        self.done = true;
981                        return Poll::Ready(Some(Err(err)));
982                    }
983                }
984            }
985
986            let Some(subscription) = self.subscription.as_mut() else {
987                continue;
988            };
989            match Pin::new(subscription).poll_next(cx) {
990                Poll::Pending => return Poll::Pending,
991                Poll::Ready(Some(Ok(event))) => {
992                    self.cursor = event.cursor.clone();
993                    return Poll::Ready(Some(Ok(SessionObservationStreamItem::Event(event))));
994                }
995                Poll::Ready(Some(Err(LiveReplayStoreError::SubscriberLagged(_)))) => {
996                    self.subscription = None;
997                    continue;
998                }
999                Poll::Ready(Some(Err(err))) => {
1000                    self.done = true;
1001                    return Poll::Ready(Some(Err(live_replay_error(err))));
1002                }
1003                Poll::Ready(None) => {
1004                    self.done = true;
1005                    return Poll::Ready(None);
1006                }
1007            }
1008        }
1009    }
1010}
1011
1012fn live_replay_error(err: lash_core::LiveReplayStoreError) -> EmbedError {
1013    EmbedError::Runtime(lash_core::RuntimeError::new(
1014        RuntimeErrorCode::Other("live_replay".to_string()),
1015        err.to_string(),
1016    ))
1017}
1018
1019pub struct EnqueueTurnBuilder<'a> {
1020    session: &'a LashSession,
1021    input: TurnInput,
1022    id: Option<String>,
1023    delivery_policy: DeliveryPolicy,
1024    slot_policy: SlotPolicy,
1025}
1026
1027impl<'a> EnqueueTurnBuilder<'a> {
1028    pub fn id(mut self, id: impl Into<String>) -> Self {
1029        self.id = Some(id.into());
1030        self
1031    }
1032
1033    pub fn delivery_policy(mut self, policy: DeliveryPolicy) -> Self {
1034        self.delivery_policy = policy;
1035        self
1036    }
1037
1038    pub fn slot_policy(mut self, policy: SlotPolicy) -> Self {
1039        self.slot_policy = policy;
1040        self
1041    }
1042
1043    pub async fn send(self) -> Result<QueuedWorkBatch> {
1044        let source_key = self.id.map(|id| format!("host:{id}"));
1045        self.session
1046            .runtime
1047            .enqueue_turn_input(
1048                self.input,
1049                self.delivery_policy,
1050                self.slot_policy,
1051                source_key,
1052            )
1053            .await
1054            .map_err(EmbedError::Runtime)
1055    }
1056}
1057
1058impl<'a> std::future::IntoFuture for EnqueueTurnBuilder<'a> {
1059    type Output = Result<QueuedWorkBatch>;
1060    type IntoFuture =
1061        std::pin::Pin<Box<dyn std::future::Future<Output = Result<QueuedWorkBatch>> + 'a>>;
1062
1063    fn into_future(self) -> Self::IntoFuture {
1064        Box::pin(self.send())
1065    }
1066}