1use std::pin::Pin;
2use std::task::{Context, Poll};
3
4use crate::support::*;
5use futures_util::Stream;
6use lash_core::runtime::{
7 PendingTurnInput, PendingTurnInputCancelOutcome, PendingTurnInputCancelResult,
8 PendingTurnInputCancelTarget, PendingTurnInputSuffixCancelOutcome, QueuedWorkBatch,
9 TurnInputIngress,
10};
11use lash_core::{LiveReplayGap, LiveReplayStoreError, SessionObservationEvent};
12use lash_remote_protocol::{
13 RemoteLiveReplayGap, RemoteSessionCursor, RemoteSessionObservation,
14 RemoteSessionObservationEvent,
15};
16
/// Builder that gathers everything needed to open a [`LashSession`].
///
/// The chainable setters on this type fill in the optional fields; `open`,
/// `open_fresh`, and `open_with_state` consume the builder.
pub struct SessionBuilder {
    /// Core handle supplying the base policy, environment, factories, and work drivers.
    pub(crate) core: LashCore,
    /// Identifier of the session being opened.
    pub(crate) session_id: String,
    /// Session spec that is resolved against the core policy when the session opens.
    pub(crate) spec: SessionSpec,
    /// Parent session id, when this session is opened as a child of another session.
    pub(crate) parent_session_id: Option<String>,
    /// Lease owner applied to the runtime right after it is constructed, if set.
    pub(crate) session_execution_owner: Option<lash_core::LeaseOwnerIdentity>,
    /// Explicit persistence store; when absent the core's store factory (if any) is used.
    pub(crate) store: Option<Arc<dyn RuntimePersistence>>,
    /// Per-session provider; when absent the core's provider (if any) is used.
    pub(crate) provider: Option<ProviderHandle>,
    /// One binding per plugin registered through `plugin`.
    pub(crate) active_plugins: Vec<ActivePluginBinding>,
    /// Plugin factories registered through `plugin`, handed to plugin-host construction.
    pub(crate) plugin_factories: Vec<Arc<dyn PluginFactory>>,
}
28
/// RLM-flavoured wrapper around [`SessionBuilder`] (only built with the `rlm` feature).
///
/// It forwards every setter to the wrapped builder and additionally applies the RLM
/// final-answer format to the session state before the session is opened.
#[cfg(feature = "rlm")]
pub struct RlmSessionBuilder {
    /// The wrapped general-purpose builder.
    pub(crate) builder: SessionBuilder,
    /// Explicit final-answer format; when `None` a default is chosen at open time
    /// depending on whether the session has a parent.
    pub(crate) rlm_final_answer_format: Option<lash_rlm_types::RlmFinalAnswerFormat>,
}
34
35impl SessionBuilder {
36 pub fn provider(mut self, provider: ProviderHandle) -> Self {
37 self.spec = self.spec.provider_id(provider.kind());
38 self.provider = Some(provider);
39 self
40 }
41
42 pub fn session_spec(mut self, spec: SessionSpec) -> Self {
43 self.spec = spec;
44 self
45 }
46
47 pub fn parent(mut self, parent_session_id: impl Into<String>) -> Self {
48 self.parent_session_id = Some(parent_session_id.into());
49 self
50 }
51
52 pub fn session_execution_owner(mut self, owner: lash_core::LeaseOwnerIdentity) -> Self {
58 self.session_execution_owner = Some(owner);
59 self
60 }
61
62 pub fn store(mut self, store: Arc<dyn RuntimePersistence>) -> Self {
69 self.store = Some(store);
70 self
71 }
72
73 pub fn plugin<P: PluginBinding>(mut self, config: P::SessionConfig) -> Self {
74 self.active_plugins.push(ActivePluginBinding {
75 id: P::ID,
76 requires_turn_input: P::requires_turn_input(&config),
77 });
78 self.plugin_factories.push(P::factory(&config));
79 self
80 }
81
82 pub async fn open(self) -> Result<LashSession> {
83 let policy = self.session_policy();
84 let store = self.create_store(&policy).await?;
85 let state = self
86 .load_or_default_state(&policy, store.as_deref())
87 .await?;
88 self.open_resolved(policy, state, store).await
89 }
90
91 pub async fn open_fresh(self) -> Result<LashSession> {
101 let policy = self.session_policy();
102 let store = self.create_store(&policy).await?;
103 let state = RuntimeSessionState {
104 session_id: self.session_id.clone(),
105 policy: policy.clone(),
106 graph_replace_required: true,
107 ..RuntimeSessionState::default()
108 };
109 self.open_resolved(policy, state, store).await
110 }
111
112 pub async fn open_with_state(self, mut state: RuntimeSessionState) -> Result<LashSession> {
119 let policy = self.session_policy();
120 let store = self.create_store(&policy).await?;
121 if state.session_id != self.session_id {
122 return Err(EmbedError::StoreSessionMismatch {
123 loaded: state.session_id,
124 requested: self.session_id,
125 });
126 }
127 let recorded_provider_id = state.policy.recorded_provider_id().to_string();
128 state.policy = policy.clone();
129 state.policy.provider_id = recorded_provider_id;
130 self.open_resolved(policy, state, store).await
131 }
132
133 fn session_policy(&self) -> SessionPolicy {
134 let mut policy = self.spec.resolve_against(&self.core.policy);
135 policy.session_id = Some(self.session_id.clone());
136 policy
137 }
138
139 async fn load_or_default_state(
140 &self,
141 policy: &SessionPolicy,
142 store: Option<&dyn RuntimePersistence>,
143 ) -> Result<RuntimeSessionState> {
144 let state = match store {
145 Some(store) => {
146 let loaded = self.load_persisted_state_for_residency(store).await?;
147 let mut state = loaded.unwrap_or_else(|| RuntimeSessionState {
148 session_id: self.session_id.clone(),
149 policy: policy.clone(),
150 ..RuntimeSessionState::default()
151 });
152 if state.session_id != self.session_id {
153 return Err(EmbedError::StoreSessionMismatch {
154 loaded: state.session_id,
155 requested: self.session_id.clone(),
156 });
157 }
158 let recorded_provider_id = state.policy.recorded_provider_id().to_string();
159 state.policy = policy.clone();
160 state.policy.provider_id = recorded_provider_id;
161 state
162 }
163 None => RuntimeSessionState {
164 session_id: self.session_id.clone(),
165 policy: policy.clone(),
166 ..RuntimeSessionState::default()
167 },
168 };
169 Ok(state)
170 }
171
172 async fn load_persisted_state_for_residency(
173 &self,
174 store: &dyn RuntimePersistence,
175 ) -> Result<Option<RuntimeSessionState>> {
176 load_persisted_state_for_residency(self.core.env.residency, store).await
177 }
178
179 async fn open_resolved(
180 self,
181 policy: SessionPolicy,
182 state: RuntimeSessionState,
183 store: Option<Arc<dyn RuntimePersistence>>,
184 ) -> Result<LashSession> {
185 let mut env = self.core.env.clone();
186 if let Some(provider) = self.provider.clone().or_else(|| self.core.provider.clone()) {
187 env.core.providers.provider_resolver =
188 Arc::new(lash_core::SingleProviderResolver::new(provider));
189 }
190 let plugin_host = build_plugin_host(
191 self.core.protocol_factory.as_ref(),
192 self.core.plugin_factories.as_ref(),
193 self.plugin_factories,
194 )?;
195 env.core = self
196 .core
197 .runtime_host_for_plugin_host(env.core.clone(), &plugin_host)?;
198 env.plugin_host = Some(Arc::new(plugin_host));
199 let effect_host = Arc::clone(&env.core.control.effect_host);
200 let drivers = self.core.work_driver.drivers().await;
201 env.process_work_driver = drivers.process.clone();
202 env.queued_work_driver = drivers.queued.clone();
203 let mut runtime = LashRuntime::from_environment(&env, policy, state, store).await?;
204 if let Some(owner) = self.session_execution_owner {
205 runtime.set_runtime_lease_owner(owner);
206 }
207 if drivers.drive_process_on_open
208 && let Some(driver) = drivers.process.as_ref()
209 {
210 driver.claim_and_run_pending("session_open").await?;
211 }
212 let handle = RuntimeHandle::with_live_replay_store(
213 runtime,
214 Arc::clone(&self.core.live_replay_store),
215 );
216 Ok(LashSession {
217 runtime: handle,
218 effect_host,
219 parent_session_id: self.parent_session_id,
220 active_plugins: self.active_plugins,
221 process_phase_probe_slot: self.core.work_driver.phase_probe_slot(),
222 turn_cancels: crate::turn::TurnCancelRegistry::default(),
223 })
224 }
225
226 async fn create_store(
227 &self,
228 policy: &SessionPolicy,
229 ) -> Result<Option<Arc<dyn RuntimePersistence>>> {
230 if let Some(store) = self.store.as_ref() {
231 return Ok(Some(Arc::clone(store)));
232 }
233 let Some(factory) = self.core.store_factory.as_ref() else {
234 return Ok(None);
235 };
236 let request = SessionStoreCreateRequest {
237 session_id: self.session_id.clone(),
238 relation: self
239 .parent_session_id
240 .as_ref()
241 .map(|parent_session_id| lash_core::SessionRelation::Child {
242 parent_session_id: parent_session_id.clone(),
243 caused_by: None,
244 })
245 .unwrap_or_default(),
246 policy: policy.clone(),
247 };
248 factory
249 .create_store(&request)
250 .await
251 .map(Some)
252 .map_err(|message| EmbedError::StoreFactory {
253 session_id: self.session_id.clone(),
254 message,
255 })
256 }
257}
258
259pub(crate) async fn load_state_for_residency(
260 residency: Residency,
261 session_id: &str,
262 policy: &SessionPolicy,
263 store: &dyn RuntimePersistence,
264) -> Result<RuntimeSessionState> {
265 let mut state = load_persisted_state_for_residency(residency, store)
266 .await?
267 .unwrap_or_else(|| RuntimeSessionState {
268 session_id: session_id.to_string(),
269 policy: policy.clone(),
270 ..RuntimeSessionState::default()
271 });
272 if state.session_id != session_id {
273 return Err(EmbedError::StoreSessionMismatch {
274 loaded: state.session_id,
275 requested: session_id.to_string(),
276 });
277 }
278 let recorded_provider_id = state.policy.recorded_provider_id().to_string();
279 state.policy = policy.clone();
280 state.policy.provider_id = recorded_provider_id;
281 Ok(state)
282}
283
284async fn load_persisted_state_for_residency(
285 residency: Residency,
286 store: &dyn RuntimePersistence,
287) -> Result<Option<RuntimeSessionState>> {
288 match residency {
289 Residency::KeepAll => {
290 let loaded = lash_core::store::load_persisted_session_state(store)
291 .await
292 .map_err(|err| SessionError::Protocol(format!("failed to load store: {err}")))?;
293 Ok(loaded)
294 }
295 Residency::ActivePathOnly => {
296 let active = lash_core::store::load_persisted_session_state_active_path(store, None)
297 .await
298 .map_err(|err| {
299 SessionError::Protocol(format!("failed to load active-path store: {err}"))
300 })?;
301 if active
302 .as_ref()
303 .is_some_and(|state| state.session_graph.nodes.is_empty())
304 {
305 let mut full = lash_core::store::load_persisted_session_state(store)
306 .await
307 .map_err(|err| {
308 SessionError::Protocol(format!(
309 "failed to heal active-path store from full graph: {err}"
310 ))
311 })?;
312 if let Some(state) = full.as_mut() {
313 state.graph_replace_required = true;
314 }
315 return Ok(full);
316 }
317 Ok(active)
318 }
319 }
320}
321
322impl PromptLayerSink for SessionBuilder {
323 fn prompt_layer_mut(&mut self) -> &mut PromptLayer {
324 self.spec.prompt.get_or_insert_with(PromptLayer::new)
325 }
326}
327
#[cfg(feature = "rlm")]
impl RlmSessionBuilder {
    /// Forwards to [`SessionBuilder::provider`] on the wrapped builder.
    pub fn provider(self, provider: ProviderHandle) -> Self {
        Self {
            builder: self.builder.provider(provider),
            ..self
        }
    }

    /// Forwards to [`SessionBuilder::session_spec`] on the wrapped builder.
    pub fn session_spec(self, spec: SessionSpec) -> Self {
        Self {
            builder: self.builder.session_spec(spec),
            ..self
        }
    }

    /// Forwards to [`SessionBuilder::parent`] on the wrapped builder.
    pub fn parent(self, parent_session_id: impl Into<String>) -> Self {
        Self {
            builder: self.builder.parent(parent_session_id),
            ..self
        }
    }

    /// Forwards to [`SessionBuilder::session_execution_owner`] on the wrapped builder.
    pub fn session_execution_owner(self, owner: lash_core::LeaseOwnerIdentity) -> Self {
        Self {
            builder: self.builder.session_execution_owner(owner),
            ..self
        }
    }

    /// Forwards to [`SessionBuilder::store`] on the wrapped builder.
    pub fn store(self, store: Arc<dyn RuntimePersistence>) -> Self {
        Self {
            builder: self.builder.store(store),
            ..self
        }
    }

    /// Forwards to [`SessionBuilder::plugin`] on the wrapped builder.
    pub fn plugin<P: PluginBinding>(self, config: P::SessionConfig) -> Self {
        Self {
            builder: self.builder.plugin::<P>(config),
            ..self
        }
    }

    /// Opens the session from persisted state when available, otherwise from a
    /// default state, after applying the RLM session options.
    pub async fn open(self) -> Result<LashSession> {
        self.open_resolved(RlmOpenState::Resume).await
    }

    /// Opens the session from a new default state, after applying the RLM session
    /// options.
    pub async fn open_fresh(self) -> Result<LashSession> {
        self.open_resolved(RlmOpenState::Fresh).await
    }

    /// Opens the session from the caller-supplied `state`, after applying the RLM
    /// session options.
    pub async fn open_with_state(self, state: RuntimeSessionState) -> Result<LashSession> {
        self.open_resolved(RlmOpenState::Explicit(state)).await
    }

    /// Shared open path: resolves policy and store, obtains the starting state as
    /// requested by `open_state`, applies the RLM options, and opens the session
    /// through the wrapped builder.
    ///
    /// # Errors
    ///
    /// Returns [`EmbedError::StoreSessionMismatch`] when an explicit state carries a
    /// different session id; propagates store, load, option-encoding, and runtime
    /// construction failures.
    async fn open_resolved(self, open_state: RlmOpenState) -> Result<LashSession> {
        let policy = self.builder.session_policy();
        let store = self.builder.create_store(&policy).await?;

        let mut state = match open_state {
            RlmOpenState::Resume => {
                self.builder
                    .load_or_default_state(&policy, store.as_deref())
                    .await?
            }
            RlmOpenState::Fresh => RuntimeSessionState {
                session_id: self.builder.session_id.clone(),
                policy: policy.clone(),
                graph_replace_required: true,
                ..RuntimeSessionState::default()
            },
            RlmOpenState::Explicit(explicit) => {
                if explicit.session_id != self.builder.session_id {
                    return Err(EmbedError::StoreSessionMismatch {
                        loaded: explicit.session_id,
                        requested: self.builder.session_id.clone(),
                    });
                }
                // Adopt the resolved policy but keep the provider id the state recorded.
                let mut adopted = explicit;
                let recorded_provider_id = adopted.policy.recorded_provider_id().to_string();
                adopted.policy = policy.clone();
                adopted.policy.provider_id = recorded_provider_id;
                adopted
            }
        };

        // A session without a parent is treated as the root session.
        let is_root_session = self.builder.parent_session_id.is_none();
        apply_rlm_session_options(is_root_session, self.rlm_final_answer_format, &mut state)?;
        self.builder.open_resolved(policy, state, store).await
    }
}
412
#[cfg(feature = "rlm")]
impl PromptLayerSink for RlmSessionBuilder {
    /// Exposes the wrapped builder's prompt layer.
    fn prompt_layer_mut(&mut self) -> &mut PromptLayer {
        let inner = &mut self.builder;
        inner.prompt_layer_mut()
    }
}
419
/// How `RlmSessionBuilder::open_resolved` obtains the starting session state.
#[cfg(feature = "rlm")]
// NOTE(review): `Explicit` carries a whole `RuntimeSessionState` by value, which is
// what the lint below is silenced for; presumably boxing was judged unnecessary for
// this short-lived value — confirm.
#[allow(clippy::large_enum_variant)]
enum RlmOpenState {
    /// Load persisted state when a store has it, otherwise start from a default state.
    Resume,
    /// Start from a new default state with `graph_replace_required` set.
    Fresh,
    /// Start from the state supplied by the caller.
    Explicit(RuntimeSessionState),
}
427
/// Writes the RLM final-answer format into the protocol turn options of `state` and
/// of every agent frame in it.
///
/// The format is `explicit_format` when given; otherwise `Markdown` for a root
/// session and `RawFinalValue` for a non-root one. Existing turn options are decoded
/// first so their other settings are kept; empty options start from the default
/// extras.
///
/// # Errors
///
/// Propagates failures from decoding the existing options or encoding the new ones.
#[cfg(feature = "rlm")]
fn apply_rlm_session_options(
    is_root_session: bool,
    explicit_format: Option<lash_rlm_types::RlmFinalAnswerFormat>,
    state: &mut RuntimeSessionState,
) -> Result<()> {
    let final_answer_format = match (explicit_format, is_root_session) {
        (Some(format), _) => format,
        (None, true) => lash_rlm_types::RlmFinalAnswerFormat::Markdown,
        (None, false) => lash_rlm_types::RlmFinalAnswerFormat::RawFinalValue,
    };

    let current = &state.protocol_turn_options;
    let mut extras = if current.is_empty() {
        lash_rlm_types::RlmCreateExtras::default()
    } else {
        current.decode()?
    };
    extras.final_answer_format = Some(final_answer_format);

    let options = ProtocolTurnOptions::typed(extras)?;
    for frame in &mut state.agent_frames {
        frame.protocol_turn_options = options.clone();
    }
    state.protocol_turn_options = options;
    Ok(())
}
454
// Unit tests for the RLM session-option handling; compiled only for test builds with
// the `rlm` feature enabled.
#[cfg(all(test, feature = "rlm"))]
mod tests {
    use super::*;

    /// Applying the RLM options to a state that already has a `termination` setting
    /// must keep that setting and fill in the root-session default format.
    #[test]
    fn apply_rlm_session_options_preserves_existing_termination() -> Result<()> {
        let mut state = RuntimeSessionState {
            protocol_turn_options: ProtocolTurnOptions::typed(lash_rlm_types::RlmCreateExtras {
                termination: lash_rlm_types::RlmTermination::Natural,
                final_answer_format: None,
            })?,
            ..Default::default()
        };

        // Root session, no explicit format: the default (Markdown) is expected.
        apply_rlm_session_options(true, None, &mut state)?;

        let extras: lash_rlm_types::RlmCreateExtras = state.protocol_turn_options.decode()?;
        assert_eq!(extras.termination, lash_rlm_types::RlmTermination::Natural);
        assert_eq!(
            extras.final_answer_format,
            Some(lash_rlm_types::RlmFinalAnswerFormat::Markdown)
        );
        Ok(())
    }
}
480
/// Handle to an opened session.
///
/// Cloning the handle clones its fields; the runtime handle and effect host are
/// shared handles rather than copies of the runtime itself.
#[derive(Clone)]
pub struct LashSession {
    /// Handle used to observe and mutate the session runtime.
    pub(crate) runtime: RuntimeHandle,
    /// Effect host handed to turn builders created from this session.
    pub(crate) effect_host: Arc<dyn EffectHost>,
    /// Parent session id, when this session was opened as a child.
    pub(crate) parent_session_id: Option<String>,
    /// Plugin bindings copied into every [`TurnBuilder`] created by `turn`.
    pub(crate) active_plugins: Vec<ActivePluginBinding>,
    /// Optional slot that `set_turn_phase_probe` also registers the probe with.
    pub(crate) process_phase_probe_slot: Option<lash_core::runtime::RuntimeTurnPhaseProbeSlot>,
    /// Registry handed to turn builders and used by `cancel_running_turns`.
    pub(crate) turn_cancels: crate::turn::TurnCancelRegistry,
}
490
/// Set of optional configuration changes passed to `LashSession::configure`.
///
/// NOTE(review): presumably a `None` field leaves the corresponding setting
/// untouched — confirm against the config admin's `update`.
#[derive(Clone, Debug, Default)]
pub struct SessionConfigPatch {
    /// New provider handle, if any.
    pub provider: Option<ProviderHandle>,
    /// New model spec, if any.
    pub model: Option<ModelSpec>,
    /// New prompt layer, if any.
    pub prompt: Option<PromptLayer>,
}
497
498impl LashSession {
499 pub async fn close(self) -> Result<()> {
500 let runtime = self.runtime.writer();
501 let runtime = runtime.lock().await;
502 runtime.unregister_plugin_session()?;
503 Ok(())
504 }
505
506 pub fn session_id(&self) -> String {
507 self.runtime.observe().session_id().to_string()
508 }
509
510 pub fn policy_snapshot(&self) -> SessionPolicy {
511 self.runtime.observe().policy.clone()
512 }
513
514 pub fn observe(&self) -> ObservableSession {
515 ObservableSession {
516 runtime: self.runtime.clone(),
517 }
518 }
519
520 pub fn parent_session_id(&self) -> Option<&str> {
521 self.parent_session_id.as_deref()
522 }
523
524 pub fn effect_host(&self) -> Arc<dyn EffectHost> {
525 Arc::clone(&self.effect_host)
526 }
527
528 pub fn turn(&self, input: TurnInput) -> TurnBuilder {
529 TurnBuilder {
530 runtime: self.runtime.clone(),
531 effect_host: Arc::clone(&self.effect_host),
532 active_plugins: self.active_plugins.clone(),
533 input,
534 cancel: CancellationToken::new(),
535 cancels: self.turn_cancels.clone(),
536 protocol_turn_options: None,
537 provider: None,
538 model: None,
539 turn_id: None,
540 }
541 }
542
543 pub fn queued_turn(&self) -> QueuedTurnBuilder {
544 QueuedTurnBuilder {
545 runtime: self.runtime.clone(),
546 effect_host: Arc::clone(&self.effect_host),
547 cancel: CancellationToken::new(),
548 cancels: self.turn_cancels.clone(),
549 batch_ids: Vec::new(),
550 drain_id: None,
551 }
552 }
553
554 pub fn cancel_running_turns(&self) -> usize {
569 self.turn_cancels.cancel_all()
570 }
571
572 pub fn admin(&self) -> SessionAdmin {
573 SessionAdmin {
574 runtime: self.runtime.clone(),
575 }
576 }
577
578 pub async fn configure(&self, patch: SessionConfigPatch) -> Result<()> {
579 self.admin().config().update(patch).await
580 }
581
582 pub fn tools(&self) -> ToolAdmin {
583 ToolAdmin::new(self.admin())
584 }
585
586 pub fn commands(&self) -> SessionCommandAdmin {
587 self.admin().commands()
588 }
589
590 pub fn triggers(&self) -> SessionTriggerAdmin {
591 self.admin().triggers()
592 }
593
594 pub fn processes(&self) -> SessionProcessAdmin {
595 SessionProcessAdmin::new(self.admin())
596 }
597
598 pub fn plugin_operations(&self) -> PluginOperations {
599 PluginOperations {
600 control: self.admin(),
601 }
602 }
603
604 pub fn enqueue(&self, input: TurnInput) -> EnqueueTurnBuilder<'_> {
605 EnqueueTurnBuilder {
606 session: self,
607 input,
608 id: None,
609 ingress: TurnInputIngress::NextTurn,
610 }
611 }
612
613 pub async fn queued_work(&self) -> Result<Vec<QueuedWorkBatch>> {
620 let observation = self.runtime.observe();
621 let store = observation.queue_store.as_ref().ok_or_else(|| {
622 EmbedError::Runtime(lash_core::RuntimeError::new(
623 lash_core::RuntimeErrorCode::StoreCommitFailed,
624 "queued work inspection requires a persistent runtime store",
625 ))
626 })?;
627 store
628 .list_pending_queued_work(observation.session_id())
629 .await
630 .map_err(|err| {
631 EmbedError::Runtime(lash_core::RuntimeError::new(
632 lash_core::RuntimeErrorCode::StoreCommitFailed,
633 err.to_string(),
634 ))
635 })
636 }
637
638 pub async fn pending_turn_inputs(&self) -> Result<Vec<PendingTurnInput>> {
639 let observation = self.runtime.observe();
640 let store = observation.queue_store.as_ref().ok_or_else(|| {
641 EmbedError::Runtime(lash_core::RuntimeError::new(
642 lash_core::RuntimeErrorCode::StoreCommitFailed,
643 "pending turn input inspection requires a persistent runtime store",
644 ))
645 })?;
646 store
647 .list_pending_turn_inputs(observation.session_id())
648 .await
649 .map_err(|err| {
650 EmbedError::Runtime(lash_core::RuntimeError::new(
651 lash_core::RuntimeErrorCode::StoreCommitFailed,
652 err.to_string(),
653 ))
654 })
655 }
656
657 pub async fn cancel_pending_turn_input(
658 &self,
659 input_id: &str,
660 ) -> Result<PendingTurnInputCancelOutcome> {
661 let session_id = self.session_id();
662 self.runtime
663 .cancel_pending_turn_input(&session_id, input_id)
664 .await
665 .map_err(EmbedError::Runtime)
666 }
667
668 pub async fn cancel_pending_turn_inputs(
676 &self,
677 targets: impl IntoIterator<Item = PendingTurnInputCancelTarget>,
678 ) -> Result<Vec<PendingTurnInputCancelResult>> {
679 let session_id = self.session_id();
680 let targets = targets.into_iter().collect::<Vec<_>>();
681 self.runtime
682 .cancel_pending_turn_inputs(&session_id, &targets)
683 .await
684 .map_err(EmbedError::Runtime)
685 }
686
687 pub async fn cancel_pending_turn_input_suffix(
696 &self,
697 anchor: PendingTurnInputCancelTarget,
698 ) -> Result<PendingTurnInputSuffixCancelOutcome> {
699 let session_id = self.session_id();
700 self.runtime
701 .cancel_pending_turn_input_suffix(&session_id, &anchor)
702 .await
703 .map_err(EmbedError::Runtime)
704 }
705
706 pub async fn cancel_queued_work_batch(
707 &self,
708 batch_id: &str,
709 ) -> Result<Option<QueuedWorkBatch>> {
710 let session_id = self.session_id();
711 self.runtime
712 .cancel_queued_work_batch(&session_id, batch_id)
713 .await
714 .map_err(EmbedError::Runtime)
715 }
716
717 pub async fn await_queued_work_batch(&self, batch_id: &str) -> Result<()> {
729 let observation = self.runtime.observe();
730 let store = observation.queue_store.clone().ok_or_else(|| {
731 EmbedError::Runtime(lash_core::RuntimeError::new(
732 lash_core::RuntimeErrorCode::StoreCommitFailed,
733 "queued work inspection requires a persistent runtime store",
734 ))
735 })?;
736 let session_id = observation.session_id().to_string();
737 drop(observation);
738 let mut delay = std::time::Duration::from_millis(25);
739 loop {
740 let pending = store
741 .list_pending_queued_work(&session_id)
742 .await
743 .map_err(|err| {
744 EmbedError::Runtime(lash_core::RuntimeError::new(
745 lash_core::RuntimeErrorCode::StoreCommitFailed,
746 err.to_string(),
747 ))
748 })?;
749 if !pending.iter().any(|batch| batch.batch_id == batch_id) {
750 return Ok(());
751 }
752 tokio::time::sleep(delay).await;
753 delay = (delay * 2).min(std::time::Duration::from_millis(400));
754 }
755 }
756
757 pub fn read_view(&self) -> SessionReadView {
758 self.runtime.observe().read_view.clone()
759 }
760
761 pub fn usage_report(&self) -> SessionUsageReport {
762 self.runtime.observe().usage_report.clone()
763 }
764
765 pub async fn set_turn_phase_probe(
766 &self,
767 probe: Arc<dyn lash_core::runtime::RuntimeTurnPhaseProbe>,
768 ) {
769 let writer = self.runtime.writer();
770 let mut runtime = writer.lock().await;
771 runtime.set_turn_phase_probe(Arc::clone(&probe));
772 self.runtime.publish_from(&runtime);
773 if let Some(slot) = &self.process_phase_probe_slot {
774 let observation = self.runtime.observe();
775 slot.set_for_session(observation.session_id(), Arc::clone(&probe));
776 let current_frame = observation.persisted_state.current_agent_frame_id.as_str();
777 if !current_frame.is_empty() {
778 let scope = lash_core::SessionScope::for_agent_frame(
779 observation.session_id(),
780 current_frame,
781 );
782 slot.set_for_scope(&scope, probe);
783 }
784 }
785 }
786}
787
/// Observation-side view of a session: snapshot accessors plus observation resume
/// and subscription entry points.
#[derive(Clone)]
pub struct ObservableSession {
    /// Runtime handle the observations are read from.
    pub(crate) runtime: RuntimeHandle,
}
792
793impl ObservableSession {
794 fn snapshot(&self) -> Arc<RuntimeObservation> {
795 self.runtime.observe()
796 }
797
798 pub fn current_observation(&self) -> SessionObservation {
799 self.runtime.current_session_observation()
800 }
801
802 pub fn current_remote_observation(&self) -> RemoteSessionObservation {
803 RemoteSessionObservation::from_core(self.current_observation())
804 }
805
806 pub fn resume_from_cursor(&self, cursor: &SessionCursor) -> Result<SessionResume> {
807 self.runtime
808 .resume_session_observation(cursor)
809 .map_err(live_replay_error)
810 }
811
812 pub fn subscribe_from_cursor(
813 &self,
814 cursor: &SessionCursor,
815 ) -> Result<SessionObservationSubscription> {
816 self.runtime
817 .subscribe_session_observation(cursor)
818 .map_err(live_replay_error)
819 }
820
821 pub fn subscribe_from_remote_cursor(
822 &self,
823 cursor: &RemoteSessionCursor,
824 ) -> Result<RemoteSessionObservationSubscription> {
825 cursor.validate()?;
826 let cursor = lash_core::SessionCursor::try_from(cursor.clone())?;
827 match self.subscribe_from_cursor(&cursor)? {
828 SessionObservationSubscription::Subscribed(subscription) => {
829 Ok(RemoteSessionObservationSubscription::Subscribed(
830 RemoteSessionObservationEventStream::new(subscription),
831 ))
832 }
833 SessionObservationSubscription::Gap { observation, gap } => {
834 Ok(RemoteSessionObservationSubscription::Gap {
835 observation: observation.into(),
836 gap: gap.into(),
837 })
838 }
839 }
840 }
841
842 pub fn subscribe_and_recover(&self, cursor: SessionCursor) -> SessionObservationStream {
851 SessionObservationStream {
852 observable: self.clone(),
853 cursor,
854 subscription: None,
855 done: false,
856 }
857 }
858
859 pub fn subscribe_and_recover_remote(
862 &self,
863 cursor: RemoteSessionCursor,
864 ) -> Result<RemoteSessionObservationStream> {
865 cursor.validate()?;
866 let cursor = lash_core::SessionCursor::try_from(cursor)?;
867 Ok(RemoteSessionObservationStream {
868 inner: self.subscribe_and_recover(cursor),
869 next_sequence: 0,
870 })
871 }
872
873 pub fn session_id(&self) -> String {
874 self.snapshot().session_id().to_string()
875 }
876
877 pub fn policy_snapshot(&self) -> SessionPolicy {
878 self.snapshot().policy.clone()
879 }
880
881 pub fn read_view(&self) -> SessionReadView {
882 self.snapshot().read_view.clone()
883 }
884
885 pub fn usage_report(&self) -> SessionUsageReport {
886 self.snapshot().usage_report.clone()
887 }
888
889 pub fn tool_state(&self) -> Option<ToolState> {
890 self.snapshot().tool_state.clone()
891 }
892
893 pub fn active_tool_manifests(&self) -> Vec<ToolManifest> {
894 self.snapshot()
895 .tool_state
896 .as_ref()
897 .map(ToolState::tool_manifests)
898 .unwrap_or_default()
899 }
900
901 pub async fn list_process_handles(&self) -> Vec<ProcessHandleSummary> {
902 self.snapshot().list_process_handles().await
903 }
904
905 pub async fn list_all_process_handles(&self) -> Vec<ProcessHandleSummary> {
906 self.snapshot().list_all_process_handles().await
907 }
908
909 pub fn process_scope(&self) -> SessionScope {
910 self.snapshot().process_scope()
911 }
912}
913
/// Item yielded by [`SessionObservationStream`].
#[derive(Clone, Debug)]
pub enum SessionObservationStreamItem {
    /// An observation event delivered by the live subscription.
    Event(SessionObservationEvent),
    /// Subscribing from the stream's cursor produced a gap instead of a subscription.
    Gap {
        /// Observation returned together with the gap.
        observation: SessionObservation,
        /// Gap description; the stream continues from its `latest_cursor`.
        gap: LiveReplayGap,
    },
}
924
/// Remote-protocol result of `ObservableSession::subscribe_from_remote_cursor`.
pub enum RemoteSessionObservationSubscription {
    /// A live subscription, exposed as a stream of remote observation events.
    Subscribed(RemoteSessionObservationEventStream),
    /// Subscribing produced a gap instead of a subscription.
    Gap {
        /// Observation returned together with the gap, in remote-protocol form.
        observation: RemoteSessionObservation,
        /// Gap description in remote-protocol form.
        gap: RemoteLiveReplayGap,
    },
}
932
/// Item yielded by [`RemoteSessionObservationStream`].
#[derive(Clone, Debug)]
pub enum RemoteSessionObservationStreamItem {
    /// An observation event in remote-protocol form, carrying a stream-local sequence.
    Event(RemoteSessionObservationEvent),
    /// The underlying stream reported a gap.
    Gap {
        /// Observation returned together with the gap, in remote-protocol form.
        observation: RemoteSessionObservation,
        /// Gap description in remote-protocol form.
        gap: RemoteLiveReplayGap,
    },
}
943
/// Stream adapter that turns a core live-replay subscription into remote-protocol
/// observation events.
pub struct RemoteSessionObservationEventStream {
    /// Underlying core subscription.
    inner: lash_core::LiveReplaySubscription,
    /// Sequence number given to the next event; starts at 0 and saturates on overflow.
    next_sequence: u64,
}
948
949impl RemoteSessionObservationEventStream {
950 fn new(inner: lash_core::LiveReplaySubscription) -> Self {
951 Self {
952 inner,
953 next_sequence: 0,
954 }
955 }
956
957 pub async fn next_event(&mut self) -> Result<RemoteSessionObservationEvent> {
958 futures_util::future::poll_fn(|cx| Pin::new(&mut *self).poll_next(cx))
959 .await
960 .transpose()?
961 .ok_or_else(|| live_replay_error(LiveReplayStoreError::Closed))
962 }
963}
964
965impl Stream for RemoteSessionObservationEventStream {
966 type Item = Result<RemoteSessionObservationEvent>;
967
968 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
969 match Pin::new(&mut self.inner).poll_next(cx) {
970 Poll::Pending => Poll::Pending,
971 Poll::Ready(Some(Ok(event))) => {
972 let remote = RemoteSessionObservationEvent::from_core(self.next_sequence, event);
973 self.next_sequence = self.next_sequence.saturating_add(1);
974 Poll::Ready(Some(Ok(remote)))
975 }
976 Poll::Ready(Some(Err(err))) => Poll::Ready(Some(Err(live_replay_error(err)))),
977 Poll::Ready(None) => Poll::Ready(None),
978 }
979 }
980}
981
/// Remote-protocol wrapper around [`SessionObservationStream`].
pub struct RemoteSessionObservationStream {
    /// Underlying core observation stream.
    inner: SessionObservationStream,
    /// Sequence number given to the next event; gap items do not advance it.
    next_sequence: u64,
}
987
988impl RemoteSessionObservationStream {
989 pub fn cursor(&self) -> RemoteSessionCursor {
990 RemoteSessionCursor::from(self.inner.cursor())
991 }
992}
993
994impl Stream for RemoteSessionObservationStream {
995 type Item = Result<RemoteSessionObservationStreamItem>;
996
997 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
998 match Pin::new(&mut self.inner).poll_next(cx) {
999 Poll::Pending => Poll::Pending,
1000 Poll::Ready(Some(Ok(SessionObservationStreamItem::Event(event)))) => {
1001 let remote = RemoteSessionObservationEvent::from_core(self.next_sequence, event);
1002 self.next_sequence = self.next_sequence.saturating_add(1);
1003 Poll::Ready(Some(Ok(RemoteSessionObservationStreamItem::Event(remote))))
1004 }
1005 Poll::Ready(Some(Ok(SessionObservationStreamItem::Gap { observation, gap }))) => {
1006 Poll::Ready(Some(Ok(RemoteSessionObservationStreamItem::Gap {
1007 observation: observation.into(),
1008 gap: gap.into(),
1009 })))
1010 }
1011 Poll::Ready(Some(Err(err))) => Poll::Ready(Some(Err(err))),
1012 Poll::Ready(None) => Poll::Ready(None),
1013 }
1014 }
1015}
1016
/// Observation stream that subscribes on demand and re-subscribes from its own cursor
/// after a gap or a lagged subscriber.
pub struct SessionObservationStream {
    /// Session used to (re)subscribe.
    observable: ObservableSession,
    /// Position to subscribe from; updated on every event and every gap.
    cursor: SessionCursor,
    /// Active live subscription, or `None` when one must be (re)established.
    subscription: Option<lash_core::LiveReplaySubscription>,
    /// Set once the stream has ended or failed; later polls yield `None`.
    done: bool,
}
1024
1025impl SessionObservationStream {
1026 pub fn cursor(&self) -> &SessionCursor {
1027 &self.cursor
1028 }
1029}
1030
impl Stream for SessionObservationStream {
    type Item = Result<SessionObservationStreamItem>;

    /// Drives the subscribe / deliver / recover loop.
    ///
    /// * Without a subscription, subscribes from the current cursor. A gap is yielded
    ///   as an item and the cursor jumps to the gap's latest cursor; a subscribe error
    ///   is yielded and ends the stream.
    /// * With a subscription, events are yielded and advance the cursor. A lagged
    ///   subscriber drops the subscription so the next loop iteration re-subscribes;
    ///   any other error is yielded and ends the stream, as does end-of-subscription.
    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        loop {
            if self.done {
                return Poll::Ready(None);
            }
            if self.subscription.is_none() {
                match self.observable.subscribe_from_cursor(&self.cursor) {
                    Ok(SessionObservationSubscription::Subscribed(subscription)) => {
                        self.subscription = Some(subscription);
                    }
                    Ok(SessionObservationSubscription::Gap { observation, gap }) => {
                        // Continue from the gap's latest cursor on the next poll.
                        self.cursor = gap.latest_cursor.clone();
                        return Poll::Ready(Some(Ok(SessionObservationStreamItem::Gap {
                            observation,
                            gap,
                        })));
                    }
                    Err(err) => {
                        self.done = true;
                        return Poll::Ready(Some(Err(err)));
                    }
                }
            }

            let Some(subscription) = self.subscription.as_mut() else {
                continue;
            };
            match Pin::new(subscription).poll_next(cx) {
                Poll::Pending => return Poll::Pending,
                Poll::Ready(Some(Ok(event))) => {
                    // Remember the position so a later re-subscribe starts here.
                    self.cursor = event.cursor.clone();
                    return Poll::Ready(Some(Ok(SessionObservationStreamItem::Event(event))));
                }
                Poll::Ready(Some(Err(LiveReplayStoreError::SubscriberLagged(_)))) => {
                    // Drop the lagged subscription and re-subscribe from the cursor.
                    self.subscription = None;
                    continue;
                }
                Poll::Ready(Some(Err(err))) => {
                    self.done = true;
                    return Poll::Ready(Some(Err(live_replay_error(err))));
                }
                Poll::Ready(None) => {
                    self.done = true;
                    return Poll::Ready(None);
                }
            }
        }
    }
}
1083
1084fn live_replay_error(err: lash_core::LiveReplayStoreError) -> EmbedError {
1085 EmbedError::Runtime(lash_core::RuntimeError::new(
1086 RuntimeErrorCode::Other("live_replay".to_string()),
1087 err.to_string(),
1088 ))
1089}
1090
/// Builder for enqueueing a turn input on a session; created by `LashSession::enqueue`.
pub struct EnqueueTurnBuilder<'a> {
    /// Session the input is enqueued on.
    session: &'a LashSession,
    /// Input to enqueue.
    input: TurnInput,
    /// Optional caller-chosen id; sent to the runtime as the source key `host:{id}`.
    id: Option<String>,
    /// Ingress mode passed to the runtime.
    ingress: TurnInputIngress,
}
1097
1098impl<'a> EnqueueTurnBuilder<'a> {
1099 pub fn id(mut self, id: impl Into<String>) -> Self {
1100 self.id = Some(id.into());
1101 self
1102 }
1103
1104 pub fn ingress(mut self, ingress: TurnInputIngress) -> Self {
1105 self.ingress = ingress;
1106 self
1107 }
1108
1109 pub async fn send(self) -> Result<PendingTurnInput> {
1110 let source_key = self.id.map(|id| format!("host:{id}"));
1111 self.session
1112 .runtime
1113 .enqueue_turn_input(self.input, self.ingress, source_key)
1114 .await
1115 .map_err(EmbedError::Runtime)
1116 }
1117}
1118
1119impl<'a> std::future::IntoFuture for EnqueueTurnBuilder<'a> {
1120 type Output = Result<PendingTurnInput>;
1121 type IntoFuture =
1122 std::pin::Pin<Box<dyn std::future::Future<Output = Result<PendingTurnInput>> + 'a>>;
1123
1124 fn into_future(self) -> Self::IntoFuture {
1125 Box::pin(self.send())
1126 }
1127}