Skip to main content

lash/
session.rs

1use crate::support::*;
2use lash_core::runtime::{DeliveryPolicy, QueuedWorkBatch, SlotPolicy};
3
4pub struct SessionBuilder {
5    pub(crate) core: LashCore,
6    pub(crate) session_id: String,
7    pub(crate) spec: SessionSpec,
8    pub(crate) mode: Option<ModeId>,
9    pub(crate) parent_session_id: Option<String>,
10    pub(crate) store: Option<Arc<dyn RuntimePersistence>>,
11    pub(crate) provider: Option<ProviderHandle>,
12    pub(crate) active_plugins: Vec<ActivePluginBinding>,
13    pub(crate) plugin_factories: Vec<Arc<dyn PluginFactory>>,
14    pub(crate) rlm_final_answer_format: Option<lash_rlm_types::RlmFinalAnswerFormat>,
15}
16
17impl SessionBuilder {
18    pub fn standard(mut self) -> Self {
19        self.mode = Some(ModeId::standard());
20        self
21    }
22
23    pub fn rlm(mut self) -> Self {
24        self.mode = Some(ModeId::rlm());
25        self
26    }
27
28    pub fn mode(mut self, mode: ModeId) -> Self {
29        self.mode = Some(mode);
30        self
31    }
32
33    pub fn provider(mut self, provider: ProviderHandle) -> Self {
34        self.spec = self.spec.provider_id(provider.kind());
35        self.provider = Some(provider);
36        self
37    }
38
39    pub fn session_spec(mut self, spec: SessionSpec) -> Self {
40        self.spec = spec;
41        self
42    }
43
44    pub fn parent(mut self, parent_session_id: impl Into<String>) -> Self {
45        self.parent_session_id = Some(parent_session_id.into());
46        self
47    }
48
49    /// Use a specific persistence store for this root session.
50    ///
51    /// This is the right API for a host-owned, pre-opened session database.
52    /// Managed child sessions never reuse this store; configure
53    /// `LashCoreBuilder::child_store_factory` when child sessions should also
54    /// persist.
55    pub fn store(mut self, store: Arc<dyn RuntimePersistence>) -> Self {
56        self.store = Some(store);
57        self
58    }
59
60    pub fn plugin<P: PluginBinding>(mut self, config: P::SessionConfig) -> Self {
61        self.active_plugins.push(ActivePluginBinding {
62            id: P::ID,
63            requires_turn_input: P::requires_turn_input(&config),
64        });
65        self.plugin_factories.push(P::factory(&config));
66        self
67    }
68
69    pub async fn open(self) -> Result<LashSession> {
70        let (policy, mode) = self.session_policy()?;
71        let store = self.create_store(&policy).await?;
72        let mut state = self
73            .load_or_default_state(&policy, store.as_deref())
74            .await?;
75        self.apply_rlm_session_options(&mode, &mut state)?;
76        self.open_resolved(policy, mode, state, store).await
77    }
78
79    /// Open this session with a fresh resident graph, ignoring any persisted
80    /// session graph/checkpoint state that may already exist for the same
81    /// session id.
82    ///
83    /// The next successful commit writes a full replacement graph, so normal
84    /// embedders can use this to start over without manually calling
85    /// `load_persisted_session_state` or constructing a `RuntimeSessionState`.
86    /// Use [`Self::open`] for resume and [`Self::open_with_state`] only when
87    /// restoring explicit host-owned state.
88    pub async fn open_fresh(self) -> Result<LashSession> {
89        let (policy, mode) = self.session_policy()?;
90        let store = self.create_store(&policy).await?;
91        let mut state = RuntimeSessionState {
92            session_id: self.session_id.clone(),
93            policy: policy.clone(),
94            graph_replace_required: true,
95            ..RuntimeSessionState::default()
96        };
97        self.apply_rlm_session_options(&mode, &mut state)?;
98        self.open_resolved(policy, mode, state, store).await
99    }
100
101    /// Open with an explicitly supplied runtime state.
102    ///
103    /// This is for advanced hosts that already own a complete state snapshot.
104    /// Normal embedders should use [`Self::open`] to resume according to Lash's
105    /// residency policy or [`Self::open_fresh`] to start over and replace prior
106    /// persisted state on the next commit.
107    pub async fn open_with_state(self, mut state: RuntimeSessionState) -> Result<LashSession> {
108        let (policy, mode) = self.session_policy()?;
109        let store = self.create_store(&policy).await?;
110        if state.session_id != self.session_id {
111            return Err(EmbedError::StoreSessionMismatch {
112                loaded: state.session_id,
113                requested: self.session_id,
114            });
115        }
116        let recorded_provider_id = state.policy.recorded_provider_id().to_string();
117        state.policy = policy.clone();
118        state.policy.provider_id = recorded_provider_id;
119        self.apply_rlm_session_options(&mode, &mut state)?;
120        self.open_resolved(policy, mode, state, store).await
121    }
122
123    fn session_policy(&self) -> Result<(SessionPolicy, ModeId)> {
124        let mode = self
125            .mode
126            .clone()
127            .unwrap_or_else(|| self.core.default_mode.clone());
128        if !self.core.modes.contains_key(&mode) {
129            return Err(EmbedError::ModeNotInstalled { mode });
130        }
131        let mut policy = self.spec.resolve_against(&self.core.policy);
132        policy.session_id = Some(self.session_id.clone());
133        Ok((policy, mode))
134    }
135
136    async fn load_or_default_state(
137        &self,
138        policy: &SessionPolicy,
139        store: Option<&dyn RuntimePersistence>,
140    ) -> Result<RuntimeSessionState> {
141        let state = match store {
142            Some(store) => {
143                let loaded = self.load_persisted_state_for_residency(store).await?;
144                let mut state = loaded.unwrap_or_else(|| RuntimeSessionState {
145                    session_id: self.session_id.clone(),
146                    policy: policy.clone(),
147                    ..RuntimeSessionState::default()
148                });
149                if state.session_id != self.session_id {
150                    return Err(EmbedError::StoreSessionMismatch {
151                        loaded: state.session_id,
152                        requested: self.session_id.clone(),
153                    });
154                }
155                let recorded_provider_id = state.policy.recorded_provider_id().to_string();
156                state.policy = policy.clone();
157                state.policy.provider_id = recorded_provider_id;
158                state
159            }
160            None => RuntimeSessionState {
161                session_id: self.session_id.clone(),
162                policy: policy.clone(),
163                ..RuntimeSessionState::default()
164            },
165        };
166        Ok(state)
167    }
168
169    async fn load_persisted_state_for_residency(
170        &self,
171        store: &dyn RuntimePersistence,
172    ) -> Result<Option<RuntimeSessionState>> {
173        match self.core.env.residency {
174            Residency::KeepAll => {
175                let loaded = lash_core::store::load_persisted_session_state(store)
176                    .await
177                    .map_err(|err| {
178                        SessionError::Protocol(format!("failed to load store: {err}"))
179                    })?;
180                Ok(loaded)
181            }
182            Residency::ActivePathOnly => {
183                let active =
184                    lash_core::store::load_persisted_session_state_active_path(store, None)
185                        .await
186                        .map_err(|err| {
187                            SessionError::Protocol(format!(
188                                "failed to load active-path store: {err}"
189                            ))
190                        })?;
191                if active
192                    .as_ref()
193                    .is_some_and(|state| state.session_graph.nodes.is_empty())
194                {
195                    let mut full = lash_core::store::load_persisted_session_state(store)
196                        .await
197                        .map_err(|err| {
198                            SessionError::Protocol(format!(
199                                "failed to heal active-path store from full graph: {err}"
200                            ))
201                        })?;
202                    if let Some(state) = full.as_mut() {
203                        state.graph_replace_required = true;
204                    }
205                    return Ok(full);
206                }
207                Ok(active)
208            }
209        }
210    }
211
212    fn apply_rlm_session_options(
213        &self,
214        mode: &ModeId,
215        state: &mut RuntimeSessionState,
216    ) -> Result<()> {
217        let Some(final_answer_format) = self.rlm_session_final_answer_format(mode) else {
218            return Ok(());
219        };
220        let mut extras = if state.protocol_turn_options.is_empty() {
221            lash_rlm_types::RlmCreateExtras::default()
222        } else {
223            state.protocol_turn_options.decode()?
224        };
225        extras.final_answer_format = Some(final_answer_format);
226        let options = ProtocolTurnOptions::typed(extras)?;
227        state.protocol_turn_options = options.clone();
228        for frame in &mut state.agent_frames {
229            frame.protocol_turn_options = options.clone();
230        }
231        Ok(())
232    }
233
234    fn rlm_session_final_answer_format(
235        &self,
236        mode: &ModeId,
237    ) -> Option<lash_rlm_types::RlmFinalAnswerFormat> {
238        if mode != &ModeId::rlm() {
239            return None;
240        }
241        self.rlm_final_answer_format.clone().or_else(|| {
242            if self.parent_session_id.is_none() {
243                Some(lash_rlm_types::RlmFinalAnswerFormat::Markdown)
244            } else {
245                Some(lash_rlm_types::RlmFinalAnswerFormat::RawSubmitValue)
246            }
247        })
248    }
249
250    async fn open_resolved(
251        self,
252        policy: SessionPolicy,
253        mode: ModeId,
254        state: RuntimeSessionState,
255        store: Option<Arc<dyn RuntimePersistence>>,
256    ) -> Result<LashSession> {
257        let mut env = self.core.env.clone();
258        if let Some(provider) = self.provider.clone().or_else(|| self.core.provider.clone()) {
259            env.core.providers.provider_resolver =
260                Arc::new(lash_core::SingleProviderResolver::new(provider));
261        }
262        let plugin_host = build_plugin_host_for_mode(
263            &self.core.modes,
264            &mode,
265            self.core.plugin_factories.as_ref(),
266            self.plugin_factories,
267            env.process_registry.is_some(),
268        )?;
269        env.plugin_host = Some(Arc::new(plugin_host));
270        let effect_host = Arc::clone(&env.core.control.effect_host);
271        // Lazily spawn the default process work runner (Decision 3: deferred to
272        // the first open so a tokio runtime is guaranteed; idempotent via the
273        // shared once-guard) and thread its poke onto this session's host so the
274        // process admin seam can wake the runner after a successful start.
275        env.process_work_poke = self.core.process_work_runner.poke().await;
276        let runtime = LashRuntime::from_environment(&env, policy, state, store).await?;
277        let handle = RuntimeHandle::with_live_replay_store(
278            runtime,
279            Arc::clone(&self.core.live_replay_store),
280        );
281        Ok(LashSession {
282            runtime: handle,
283            effect_host,
284            mode,
285            parent_session_id: self.parent_session_id,
286            active_plugins: self.active_plugins,
287            process_phase_probe_slot: self.core.process_work_runner.phase_probe_slot(),
288            turn_cancels: crate::turn::TurnCancelRegistry::default(),
289        })
290    }
291
292    async fn create_store(
293        &self,
294        policy: &SessionPolicy,
295    ) -> Result<Option<Arc<dyn RuntimePersistence>>> {
296        if let Some(store) = self.store.as_ref() {
297            return Ok(Some(Arc::clone(store)));
298        }
299        let Some(factory) = self.core.store_factory.as_ref() else {
300            return Ok(None);
301        };
302        let request = SessionStoreCreateRequest {
303            session_id: self.session_id.clone(),
304            relation: self
305                .parent_session_id
306                .as_ref()
307                .map(|parent_session_id| lash_core::SessionRelation::Child {
308                    parent_session_id: parent_session_id.clone(),
309                    caused_by: None,
310                })
311                .unwrap_or_default(),
312            policy: policy.clone(),
313        };
314        factory
315            .create_store(&request)
316            .await
317            .map(Some)
318            .map_err(|message| EmbedError::StoreFactory {
319                session_id: self.session_id.clone(),
320                message,
321            })
322    }
323}
324
325impl PromptLayerSink for SessionBuilder {
326    fn prompt_layer_mut(&mut self) -> &mut PromptLayer {
327        self.spec.prompt.get_or_insert_with(PromptLayer::new)
328    }
329}
330
331#[derive(Clone)]
332pub struct LashSession {
333    pub(crate) runtime: RuntimeHandle,
334    pub(crate) effect_host: Arc<dyn EffectHost>,
335    pub(crate) mode: ModeId,
336    pub(crate) parent_session_id: Option<String>,
337    pub(crate) active_plugins: Vec<ActivePluginBinding>,
338    pub(crate) process_phase_probe_slot: Option<lash_core::runtime::RuntimeTurnPhaseProbeSlot>,
339    pub(crate) turn_cancels: crate::turn::TurnCancelRegistry,
340}
341
342#[derive(Clone, Debug, Default)]
343pub struct SessionConfigPatch {
344    pub provider: Option<ProviderHandle>,
345    pub model: Option<ModelSpec>,
346    pub prompt: Option<PromptLayer>,
347}
348
349impl LashSession {
350    pub async fn close(self) -> Result<()> {
351        let runtime = self.runtime.writer();
352        let runtime = runtime.lock().await;
353        runtime.unregister_plugin_session()?;
354        Ok(())
355    }
356
357    pub fn session_id(&self) -> String {
358        self.runtime.observe().session_id().to_string()
359    }
360
361    pub fn policy_snapshot(&self) -> SessionPolicy {
362        self.runtime.observe().policy.clone()
363    }
364
365    pub fn observe(&self) -> ObservableSession {
366        ObservableSession {
367            runtime: self.runtime.clone(),
368        }
369    }
370
371    pub fn mode(&self) -> &ModeId {
372        &self.mode
373    }
374
375    pub fn parent_session_id(&self) -> Option<&str> {
376        self.parent_session_id.as_deref()
377    }
378
379    pub fn effect_host(&self) -> Arc<dyn EffectHost> {
380        Arc::clone(&self.effect_host)
381    }
382
383    pub fn turn(&self, input: TurnInput) -> TurnBuilder {
384        TurnBuilder {
385            runtime: self.runtime.clone(),
386            effect_host: Arc::clone(&self.effect_host),
387            active_plugins: self.active_plugins.clone(),
388            input,
389            cancel: CancellationToken::new(),
390            cancels: self.turn_cancels.clone(),
391            protocol_turn_options: None,
392            provider: None,
393            model: None,
394            turn_id: None,
395        }
396    }
397
398    pub fn queued_turn(&self) -> QueuedTurnBuilder {
399        QueuedTurnBuilder {
400            runtime: self.runtime.clone(),
401            effect_host: Arc::clone(&self.effect_host),
402            cancel: CancellationToken::new(),
403            cancels: self.turn_cancels.clone(),
404            batch_ids: Vec::new(),
405            drain_id: None,
406        }
407    }
408
409    /// Cancel every turn currently executing through this opened session
410    /// (including its clones) and report how many were signalled.
411    ///
412    /// This is the affordance behind a UI "stop" control: hold a clone of the
413    /// session wherever the stop arrives and call this, instead of threading a
414    /// [`CancellationToken`](crate::CancellationToken) into every turn call
415    /// ([`TurnBuilder::cancel`](crate::TurnBuilder::cancel) remains the
416    /// per-turn hook when you need one). A cancelled turn finishes with
417    /// `TurnOutcome::Stopped(TurnStop::Cancelled)` and commits like any other
418    /// turn; the session stays usable.
419    ///
420    /// Scope: turns started from this `LashSession` instance and its clones.
421    /// A handle opened separately for the same session id has its own
422    /// registry and is not reached.
423    pub fn cancel_running_turns(&self) -> usize {
424        self.turn_cancels.cancel_all()
425    }
426
427    pub fn admin(&self) -> SessionAdmin {
428        SessionAdmin {
429            runtime: self.runtime.clone(),
430        }
431    }
432
433    pub async fn configure(&self, patch: SessionConfigPatch) -> Result<()> {
434        self.admin().config().update(patch).await
435    }
436
437    pub fn tools(&self) -> ToolAdmin {
438        ToolAdmin::new(self.admin())
439    }
440
441    pub fn commands(&self) -> SessionCommandAdmin {
442        self.admin().commands()
443    }
444
445    pub fn triggers(&self) -> SessionTriggerAdmin {
446        self.admin().triggers()
447    }
448
449    pub fn processes(&self) -> SessionProcessAdmin {
450        SessionProcessAdmin::new(self.admin())
451    }
452
453    pub fn plugin_actions(&self) -> PluginActions {
454        PluginActions {
455            control: self.admin(),
456        }
457    }
458
459    pub fn enqueue(&self, input: TurnInput) -> EnqueueTurnBuilder<'_> {
460        EnqueueTurnBuilder {
461            session: self,
462            input,
463            id: None,
464            delivery_policy: DeliveryPolicy::AfterCurrentTurnCommit,
465            slot_policy: SlotPolicy::Exclusive,
466        }
467    }
468
469    pub async fn queued_work(&self) -> Result<Vec<QueuedWorkBatch>> {
470        let observation = self.runtime.observe();
471        let store = observation.queue_store.as_ref().ok_or_else(|| {
472            EmbedError::Runtime(lash_core::RuntimeError::new(
473                lash_core::RuntimeErrorCode::StoreCommitFailed,
474                "queued work inspection requires a persistent runtime store",
475            ))
476        })?;
477        store
478            .list_pending_queued_work(observation.session_id())
479            .await
480            .map_err(|err| {
481                EmbedError::Runtime(lash_core::RuntimeError::new(
482                    lash_core::RuntimeErrorCode::StoreCommitFailed,
483                    err.to_string(),
484                ))
485            })
486    }
487
488    pub async fn cancel_queued_work_batch(
489        &self,
490        batch_id: &str,
491    ) -> Result<Option<QueuedWorkBatch>> {
492        let session_id = self.session_id();
493        self.runtime
494            .cancel_queued_work_batch(&session_id, batch_id)
495            .await
496            .map_err(EmbedError::Runtime)
497    }
498
499    /// Resolve once `batch_id` is no longer pending in the queue store —
500    /// drained by whoever runs queued work (a queued-work runner, a durable
501    /// worker, or another handle's [`queued_turn`](Self::queued_turn)) or
502    /// cancelled. This is the enqueue-and-observe side of the queue: the
503    /// caller never claims the work itself.
504    ///
505    /// Completion is read from the persistent queue store, so it observes
506    /// drains performed by other session handles and other processes alike.
507    /// There is no built-in deadline — nothing resolves if nothing drains the
508    /// queue, so bound it with `tokio::time::timeout` when the worker may be
509    /// unavailable. A batch id the store has never seen resolves immediately.
510    pub async fn await_queued_work_batch(&self, batch_id: &str) -> Result<()> {
511        let observation = self.runtime.observe();
512        let store = observation.queue_store.clone().ok_or_else(|| {
513            EmbedError::Runtime(lash_core::RuntimeError::new(
514                lash_core::RuntimeErrorCode::StoreCommitFailed,
515                "queued work inspection requires a persistent runtime store",
516            ))
517        })?;
518        let session_id = observation.session_id().to_string();
519        drop(observation);
520        let mut delay = std::time::Duration::from_millis(25);
521        loop {
522            let pending = store
523                .list_pending_queued_work(&session_id)
524                .await
525                .map_err(|err| {
526                    EmbedError::Runtime(lash_core::RuntimeError::new(
527                        lash_core::RuntimeErrorCode::StoreCommitFailed,
528                        err.to_string(),
529                    ))
530                })?;
531            if !pending.iter().any(|batch| batch.batch_id == batch_id) {
532                return Ok(());
533            }
534            tokio::time::sleep(delay).await;
535            delay = (delay * 2).min(std::time::Duration::from_millis(400));
536        }
537    }
538
539    pub fn read_view(&self) -> SessionReadView {
540        self.runtime.observe().read_view.clone()
541    }
542
543    pub fn usage_report(&self) -> SessionUsageReport {
544        self.runtime.observe().usage_report.clone()
545    }
546
547    pub async fn set_turn_phase_probe(
548        &self,
549        probe: Arc<dyn lash_core::runtime::RuntimeTurnPhaseProbe>,
550    ) {
551        let writer = self.runtime.writer();
552        let mut runtime = writer.lock().await;
553        runtime.set_turn_phase_probe(Arc::clone(&probe));
554        self.runtime.publish_from(&runtime);
555        if let Some(slot) = &self.process_phase_probe_slot {
556            let observation = self.runtime.observe();
557            slot.set_for_session(observation.session_id(), Arc::clone(&probe));
558            let current_frame = observation.persisted_state.current_agent_frame_id.as_str();
559            if !current_frame.is_empty() {
560                let scope = lash_core::SessionScope::for_agent_frame(
561                    observation.session_id(),
562                    current_frame,
563                );
564                slot.set_for_scope(&scope, probe);
565            }
566        }
567    }
568}
569
570#[derive(Clone)]
571pub struct ObservableSession {
572    pub(crate) runtime: RuntimeHandle,
573}
574
575impl ObservableSession {
576    fn snapshot(&self) -> Arc<RuntimeObservation> {
577        self.runtime.observe()
578    }
579
580    pub fn current_observation(&self) -> SessionObservation {
581        self.runtime.current_session_observation()
582    }
583
584    pub fn resume_from_cursor(&self, cursor: &SessionCursor) -> Result<SessionResume> {
585        self.runtime
586            .resume_session_observation(cursor)
587            .map_err(live_replay_error)
588    }
589
590    pub fn subscribe_from_cursor(
591        &self,
592        cursor: &SessionCursor,
593    ) -> Result<SessionObservationSubscription> {
594        self.runtime
595            .subscribe_session_observation(cursor)
596            .map_err(live_replay_error)
597    }
598
599    pub fn session_id(&self) -> String {
600        self.snapshot().session_id().to_string()
601    }
602
603    pub fn policy_snapshot(&self) -> SessionPolicy {
604        self.snapshot().policy.clone()
605    }
606
607    pub fn read_view(&self) -> SessionReadView {
608        self.snapshot().read_view.clone()
609    }
610
611    pub fn usage_report(&self) -> SessionUsageReport {
612        self.snapshot().usage_report.clone()
613    }
614
615    pub fn tool_state(&self) -> Option<ToolState> {
616        self.snapshot().tool_state.clone()
617    }
618
619    pub fn active_tool_manifests(&self) -> Vec<ToolManifest> {
620        self.snapshot()
621            .tool_state
622            .as_ref()
623            .map(ToolState::tool_manifests)
624            .unwrap_or_default()
625    }
626
627    pub async fn list_process_handles(&self) -> Vec<ProcessHandleSummary> {
628        self.snapshot().list_process_handles().await
629    }
630
631    pub async fn list_all_process_handles(&self) -> Vec<ProcessHandleSummary> {
632        self.snapshot().list_all_process_handles().await
633    }
634
635    pub fn process_scope(&self) -> SessionScope {
636        self.snapshot().process_scope()
637    }
638}
639
640fn live_replay_error(err: lash_core::LiveReplayStoreError) -> EmbedError {
641    EmbedError::Runtime(lash_core::RuntimeError::new(
642        RuntimeErrorCode::Other("live_replay".to_string()),
643        err.to_string(),
644    ))
645}
646
647pub struct EnqueueTurnBuilder<'a> {
648    session: &'a LashSession,
649    input: TurnInput,
650    id: Option<String>,
651    delivery_policy: DeliveryPolicy,
652    slot_policy: SlotPolicy,
653}
654
655impl<'a> EnqueueTurnBuilder<'a> {
656    pub fn id(mut self, id: impl Into<String>) -> Self {
657        self.id = Some(id.into());
658        self
659    }
660
661    pub fn delivery_policy(mut self, policy: DeliveryPolicy) -> Self {
662        self.delivery_policy = policy;
663        self
664    }
665
666    pub fn slot_policy(mut self, policy: SlotPolicy) -> Self {
667        self.slot_policy = policy;
668        self
669    }
670
671    pub async fn send(self) -> Result<QueuedWorkBatch> {
672        let source_key = self.id.map(|id| format!("host:{id}"));
673        self.session
674            .runtime
675            .enqueue_turn_input(
676                self.input,
677                self.delivery_policy,
678                self.slot_policy,
679                source_key,
680            )
681            .await
682            .map_err(EmbedError::Runtime)
683    }
684}
685
686impl<'a> std::future::IntoFuture for EnqueueTurnBuilder<'a> {
687    type Output = Result<QueuedWorkBatch>;
688    type IntoFuture =
689        std::pin::Pin<Box<dyn std::future::Future<Output = Result<QueuedWorkBatch>> + 'a>>;
690
691    fn into_future(self) -> Self::IntoFuture {
692        Box::pin(self.send())
693    }
694}