1use std::pin::Pin;
2use std::task::{Context, Poll};
3
4use crate::support::*;
5use futures_util::Stream;
6use lash_core::runtime::{DeliveryPolicy, QueuedWorkBatch, SlotPolicy};
7use lash_core::{LiveReplayGap, LiveReplayStoreError, SessionObservationEvent};
8use lash_remote_protocol::{
9 RemoteLiveReplayGap, RemoteSessionCursor, RemoteSessionObservation,
10 RemoteSessionObservationEvent,
11};
12
/// Builder that collects everything needed to open a [`LashSession`].
///
/// Values are set through the chained methods on this type and consumed by
/// `open`, `open_fresh`, or `open_with_state`.
pub struct SessionBuilder {
    /// Core handle the session is opened against; supplies the base policy,
    /// the environment, and the store/plugin factories used while opening.
    pub(crate) core: LashCore,
    /// Identifier of the session being opened.
    pub(crate) session_id: String,
    /// Session-level settings, resolved against the core policy at open time.
    pub(crate) spec: SessionSpec,
    /// Parent session id, if any; when set, a factory-created store is
    /// requested with a child relation to this parent.
    pub(crate) parent_session_id: Option<String>,
    /// Optional lease owner applied to the runtime after it is constructed.
    pub(crate) session_execution_owner: Option<lash_core::LeaseOwnerIdentity>,
    /// Explicit persistence store; takes precedence over the core's store factory.
    pub(crate) store: Option<Arc<dyn RuntimePersistence>>,
    /// Session-specific provider; the core's provider is used when this is unset.
    pub(crate) provider: Option<ProviderHandle>,
    /// Plugin bindings registered through `plugin`, carried into the opened session.
    pub(crate) active_plugins: Vec<ActivePluginBinding>,
    /// Factories for the registered plugins, consumed when the plugin host is built.
    pub(crate) plugin_factories: Vec<Arc<dyn PluginFactory>>,
}
24
#[cfg(feature = "rlm")]
/// Session builder variant for RLM sessions (only with the `rlm` feature).
///
/// Wraps a [`SessionBuilder`] and additionally carries the final-answer format
/// that is written into the session's protocol turn options when it is opened.
pub struct RlmSessionBuilder {
    /// Underlying builder that every chained setter is forwarded to.
    pub(crate) builder: SessionBuilder,
    /// Explicit final-answer format; when `None`, a default is chosen at open
    /// time depending on whether the session has a parent.
    pub(crate) rlm_final_answer_format: Option<lash_rlm_types::RlmFinalAnswerFormat>,
}
30
31impl SessionBuilder {
32 pub fn provider(mut self, provider: ProviderHandle) -> Self {
33 self.spec = self.spec.provider_id(provider.kind());
34 self.provider = Some(provider);
35 self
36 }
37
38 pub fn session_spec(mut self, spec: SessionSpec) -> Self {
39 self.spec = spec;
40 self
41 }
42
43 pub fn parent(mut self, parent_session_id: impl Into<String>) -> Self {
44 self.parent_session_id = Some(parent_session_id.into());
45 self
46 }
47
48 pub fn session_execution_owner(mut self, owner: lash_core::LeaseOwnerIdentity) -> Self {
54 self.session_execution_owner = Some(owner);
55 self
56 }
57
58 pub fn store(mut self, store: Arc<dyn RuntimePersistence>) -> Self {
65 self.store = Some(store);
66 self
67 }
68
69 pub fn plugin<P: PluginBinding>(mut self, config: P::SessionConfig) -> Self {
70 self.active_plugins.push(ActivePluginBinding {
71 id: P::ID,
72 requires_turn_input: P::requires_turn_input(&config),
73 });
74 self.plugin_factories.push(P::factory(&config));
75 self
76 }
77
78 pub async fn open(self) -> Result<LashSession> {
79 let policy = self.session_policy();
80 let store = self.create_store(&policy).await?;
81 let state = self
82 .load_or_default_state(&policy, store.as_deref())
83 .await?;
84 self.open_resolved(policy, state, store).await
85 }
86
87 pub async fn open_fresh(self) -> Result<LashSession> {
97 let policy = self.session_policy();
98 let store = self.create_store(&policy).await?;
99 let state = RuntimeSessionState {
100 session_id: self.session_id.clone(),
101 policy: policy.clone(),
102 graph_replace_required: true,
103 ..RuntimeSessionState::default()
104 };
105 self.open_resolved(policy, state, store).await
106 }
107
108 pub async fn open_with_state(self, mut state: RuntimeSessionState) -> Result<LashSession> {
115 let policy = self.session_policy();
116 let store = self.create_store(&policy).await?;
117 if state.session_id != self.session_id {
118 return Err(EmbedError::StoreSessionMismatch {
119 loaded: state.session_id,
120 requested: self.session_id,
121 });
122 }
123 let recorded_provider_id = state.policy.recorded_provider_id().to_string();
124 state.policy = policy.clone();
125 state.policy.provider_id = recorded_provider_id;
126 self.open_resolved(policy, state, store).await
127 }
128
129 fn session_policy(&self) -> SessionPolicy {
130 let mut policy = self.spec.resolve_against(&self.core.policy);
131 policy.session_id = Some(self.session_id.clone());
132 policy
133 }
134
135 async fn load_or_default_state(
136 &self,
137 policy: &SessionPolicy,
138 store: Option<&dyn RuntimePersistence>,
139 ) -> Result<RuntimeSessionState> {
140 let state = match store {
141 Some(store) => {
142 let loaded = self.load_persisted_state_for_residency(store).await?;
143 let mut state = loaded.unwrap_or_else(|| RuntimeSessionState {
144 session_id: self.session_id.clone(),
145 policy: policy.clone(),
146 ..RuntimeSessionState::default()
147 });
148 if state.session_id != self.session_id {
149 return Err(EmbedError::StoreSessionMismatch {
150 loaded: state.session_id,
151 requested: self.session_id.clone(),
152 });
153 }
154 let recorded_provider_id = state.policy.recorded_provider_id().to_string();
155 state.policy = policy.clone();
156 state.policy.provider_id = recorded_provider_id;
157 state
158 }
159 None => RuntimeSessionState {
160 session_id: self.session_id.clone(),
161 policy: policy.clone(),
162 ..RuntimeSessionState::default()
163 },
164 };
165 Ok(state)
166 }
167
168 async fn load_persisted_state_for_residency(
169 &self,
170 store: &dyn RuntimePersistence,
171 ) -> Result<Option<RuntimeSessionState>> {
172 load_persisted_state_for_residency(self.core.env.residency, store).await
173 }
174
175 async fn open_resolved(
176 self,
177 policy: SessionPolicy,
178 state: RuntimeSessionState,
179 store: Option<Arc<dyn RuntimePersistence>>,
180 ) -> Result<LashSession> {
181 let mut env = self.core.env.clone();
182 if let Some(provider) = self.provider.clone().or_else(|| self.core.provider.clone()) {
183 env.core.providers.provider_resolver =
184 Arc::new(lash_core::SingleProviderResolver::new(provider));
185 }
186 let plugin_host = build_plugin_host(
187 self.core.protocol_factory.as_ref(),
188 self.core.plugin_factories.as_ref(),
189 self.plugin_factories,
190 )?;
191 env.core = self
192 .core
193 .runtime_host_for_plugin_host(env.core.clone(), &plugin_host)?;
194 env.plugin_host = Some(Arc::new(plugin_host));
195 let effect_host = Arc::clone(&env.core.control.effect_host);
196 let drivers = self.core.work_driver.drivers().await;
197 env.process_work_driver = drivers.process.clone();
198 env.queued_work_driver = drivers.queued.clone();
199 let mut runtime = LashRuntime::from_environment(&env, policy, state, store).await?;
200 if let Some(owner) = self.session_execution_owner {
201 runtime.set_runtime_lease_owner(owner);
202 }
203 if drivers.drive_process_on_open
204 && let Some(driver) = drivers.process.as_ref()
205 {
206 driver.claim_and_run_pending("session_open").await?;
207 }
208 let handle = RuntimeHandle::with_live_replay_store(
209 runtime,
210 Arc::clone(&self.core.live_replay_store),
211 );
212 Ok(LashSession {
213 runtime: handle,
214 effect_host,
215 parent_session_id: self.parent_session_id,
216 active_plugins: self.active_plugins,
217 process_phase_probe_slot: self.core.work_driver.phase_probe_slot(),
218 turn_cancels: crate::turn::TurnCancelRegistry::default(),
219 })
220 }
221
222 async fn create_store(
223 &self,
224 policy: &SessionPolicy,
225 ) -> Result<Option<Arc<dyn RuntimePersistence>>> {
226 if let Some(store) = self.store.as_ref() {
227 return Ok(Some(Arc::clone(store)));
228 }
229 let Some(factory) = self.core.store_factory.as_ref() else {
230 return Ok(None);
231 };
232 let request = SessionStoreCreateRequest {
233 session_id: self.session_id.clone(),
234 relation: self
235 .parent_session_id
236 .as_ref()
237 .map(|parent_session_id| lash_core::SessionRelation::Child {
238 parent_session_id: parent_session_id.clone(),
239 caused_by: None,
240 })
241 .unwrap_or_default(),
242 policy: policy.clone(),
243 };
244 factory
245 .create_store(&request)
246 .await
247 .map(Some)
248 .map_err(|message| EmbedError::StoreFactory {
249 session_id: self.session_id.clone(),
250 message,
251 })
252 }
253}
254
255pub(crate) async fn load_state_for_residency(
256 residency: Residency,
257 session_id: &str,
258 policy: &SessionPolicy,
259 store: &dyn RuntimePersistence,
260) -> Result<RuntimeSessionState> {
261 let mut state = load_persisted_state_for_residency(residency, store)
262 .await?
263 .unwrap_or_else(|| RuntimeSessionState {
264 session_id: session_id.to_string(),
265 policy: policy.clone(),
266 ..RuntimeSessionState::default()
267 });
268 if state.session_id != session_id {
269 return Err(EmbedError::StoreSessionMismatch {
270 loaded: state.session_id,
271 requested: session_id.to_string(),
272 });
273 }
274 let recorded_provider_id = state.policy.recorded_provider_id().to_string();
275 state.policy = policy.clone();
276 state.policy.provider_id = recorded_provider_id;
277 Ok(state)
278}
279
280async fn load_persisted_state_for_residency(
281 residency: Residency,
282 store: &dyn RuntimePersistence,
283) -> Result<Option<RuntimeSessionState>> {
284 match residency {
285 Residency::KeepAll => {
286 let loaded = lash_core::store::load_persisted_session_state(store)
287 .await
288 .map_err(|err| SessionError::Protocol(format!("failed to load store: {err}")))?;
289 Ok(loaded)
290 }
291 Residency::ActivePathOnly => {
292 let active = lash_core::store::load_persisted_session_state_active_path(store, None)
293 .await
294 .map_err(|err| {
295 SessionError::Protocol(format!("failed to load active-path store: {err}"))
296 })?;
297 if active
298 .as_ref()
299 .is_some_and(|state| state.session_graph.nodes.is_empty())
300 {
301 let mut full = lash_core::store::load_persisted_session_state(store)
302 .await
303 .map_err(|err| {
304 SessionError::Protocol(format!(
305 "failed to heal active-path store from full graph: {err}"
306 ))
307 })?;
308 if let Some(state) = full.as_mut() {
309 state.graph_replace_required = true;
310 }
311 return Ok(full);
312 }
313 Ok(active)
314 }
315 }
316}
317
318impl PromptLayerSink for SessionBuilder {
319 fn prompt_layer_mut(&mut self) -> &mut PromptLayer {
320 self.spec.prompt.get_or_insert_with(PromptLayer::new)
321 }
322}
323
324#[cfg(feature = "rlm")]
325impl RlmSessionBuilder {
326 pub fn provider(mut self, provider: ProviderHandle) -> Self {
327 self.builder = self.builder.provider(provider);
328 self
329 }
330
331 pub fn session_spec(mut self, spec: SessionSpec) -> Self {
332 self.builder = self.builder.session_spec(spec);
333 self
334 }
335
336 pub fn parent(mut self, parent_session_id: impl Into<String>) -> Self {
337 self.builder = self.builder.parent(parent_session_id);
338 self
339 }
340
341 pub fn session_execution_owner(mut self, owner: lash_core::LeaseOwnerIdentity) -> Self {
342 self.builder = self.builder.session_execution_owner(owner);
343 self
344 }
345
346 pub fn store(mut self, store: Arc<dyn RuntimePersistence>) -> Self {
347 self.builder = self.builder.store(store);
348 self
349 }
350
351 pub fn plugin<P: PluginBinding>(mut self, config: P::SessionConfig) -> Self {
352 self.builder = self.builder.plugin::<P>(config);
353 self
354 }
355
356 pub async fn open(self) -> Result<LashSession> {
357 self.open_resolved(RlmOpenState::Resume).await
358 }
359
360 pub async fn open_fresh(self) -> Result<LashSession> {
361 self.open_resolved(RlmOpenState::Fresh).await
362 }
363
364 pub async fn open_with_state(self, state: RuntimeSessionState) -> Result<LashSession> {
365 self.open_resolved(RlmOpenState::Explicit(state)).await
366 }
367
368 async fn open_resolved(self, open_state: RlmOpenState) -> Result<LashSession> {
369 let Self {
370 builder,
371 rlm_final_answer_format,
372 } = self;
373 let policy = builder.session_policy();
374 let store = builder.create_store(&policy).await?;
375 let mut state = match open_state {
376 RlmOpenState::Resume => {
377 builder
378 .load_or_default_state(&policy, store.as_deref())
379 .await?
380 }
381 RlmOpenState::Fresh => RuntimeSessionState {
382 session_id: builder.session_id.clone(),
383 policy: policy.clone(),
384 graph_replace_required: true,
385 ..RuntimeSessionState::default()
386 },
387 RlmOpenState::Explicit(mut state) => {
388 if state.session_id != builder.session_id {
389 return Err(EmbedError::StoreSessionMismatch {
390 loaded: state.session_id,
391 requested: builder.session_id.clone(),
392 });
393 }
394 let recorded_provider_id = state.policy.recorded_provider_id().to_string();
395 state.policy = policy.clone();
396 state.policy.provider_id = recorded_provider_id;
397 state
398 }
399 };
400 apply_rlm_session_options(
401 builder.parent_session_id.is_none(),
402 rlm_final_answer_format,
403 &mut state,
404 )?;
405 builder.open_resolved(policy, state, store).await
406 }
407}
408
#[cfg(feature = "rlm")]
impl PromptLayerSink for RlmSessionBuilder {
    /// Exposes the prompt layer of the wrapped [`SessionBuilder`].
    fn prompt_layer_mut(&mut self) -> &mut PromptLayer {
        let inner = &mut self.builder;
        inner.prompt_layer_mut()
    }
}
415
#[cfg(feature = "rlm")]
// NOTE(review): the size lint is silenced rather than boxing `Explicit`;
// presumably because the value is built once and consumed immediately by
// `RlmSessionBuilder::open_resolved` — confirm.
#[allow(clippy::large_enum_variant)]
/// How `RlmSessionBuilder` should obtain the starting state when opening.
enum RlmOpenState {
    /// Load persisted state from the store, or start empty when there is none.
    Resume,
    /// Start from an empty state flagged with `graph_replace_required`.
    Fresh,
    /// Start from the supplied state.
    Explicit(RuntimeSessionState),
}
423
#[cfg(feature = "rlm")]
/// Writes the RLM final-answer format into `state`'s protocol turn options.
///
/// The format is `explicit_format` when given; otherwise `Markdown` for a root
/// session and `RawSubmitValue` for any other session. Existing options are
/// decoded first so their other fields are kept; empty options start from the
/// default extras. The resulting options are stored on the state and on every
/// agent frame.
///
/// # Errors
///
/// Fails when the existing options cannot be decoded or the updated extras
/// cannot be encoded.
fn apply_rlm_session_options(
    is_root_session: bool,
    explicit_format: Option<lash_rlm_types::RlmFinalAnswerFormat>,
    state: &mut RuntimeSessionState,
) -> Result<()> {
    let final_answer_format = match explicit_format {
        Some(format) => format,
        None if is_root_session => lash_rlm_types::RlmFinalAnswerFormat::Markdown,
        None => lash_rlm_types::RlmFinalAnswerFormat::RawSubmitValue,
    };

    let mut extras: lash_rlm_types::RlmCreateExtras = if state.protocol_turn_options.is_empty() {
        Default::default()
    } else {
        state.protocol_turn_options.decode()?
    };
    extras.final_answer_format = Some(final_answer_format);

    let options = ProtocolTurnOptions::typed(extras)?;
    for frame in &mut state.agent_frames {
        frame.protocol_turn_options = options.clone();
    }
    state.protocol_turn_options = options;
    Ok(())
}
450
#[cfg(all(test, feature = "rlm"))]
mod tests {
    use super::*;

    /// Applying the session options must fill in the final-answer format while
    /// leaving an already-configured termination mode untouched.
    #[test]
    fn apply_rlm_session_options_preserves_existing_termination() -> Result<()> {
        let seeded_extras = lash_rlm_types::RlmCreateExtras {
            termination: lash_rlm_types::RlmTermination::ProseOrSubmit,
            final_answer_format: None,
        };
        let mut state = RuntimeSessionState {
            protocol_turn_options: ProtocolTurnOptions::typed(seeded_extras)?,
            ..Default::default()
        };

        // Root session, no explicit format: the Markdown default applies.
        apply_rlm_session_options(true, None, &mut state)?;

        let decoded: lash_rlm_types::RlmCreateExtras = state.protocol_turn_options.decode()?;
        assert_eq!(
            decoded.termination,
            lash_rlm_types::RlmTermination::ProseOrSubmit
        );
        assert_eq!(
            decoded.final_answer_format,
            Some(lash_rlm_types::RlmFinalAnswerFormat::Markdown)
        );
        Ok(())
    }
}
479
/// An opened session: the entry point for running turns, enqueueing work,
/// observing state, and administering the session.
#[derive(Clone)]
pub struct LashSession {
    /// Handle to the session runtime, used for both observation and writes.
    pub(crate) runtime: RuntimeHandle,
    /// Effect host handed to every turn builder created from this session.
    pub(crate) effect_host: Arc<dyn EffectHost>,
    /// Parent session id supplied when the session was built, if any.
    pub(crate) parent_session_id: Option<String>,
    /// Plugin bindings registered on the builder; cloned into each turn builder.
    pub(crate) active_plugins: Vec<ActivePluginBinding>,
    /// Probe slot obtained from the work driver at open time, if it provides
    /// one; `set_turn_phase_probe` also registers the probe there.
    pub(crate) process_phase_probe_slot: Option<lash_core::runtime::RuntimeTurnPhaseProbeSlot>,
    /// Registry handed to turn builders; `cancel_running_turns` cancels through it.
    pub(crate) turn_cancels: crate::turn::TurnCancelRegistry,
}
489
/// Set of optional configuration changes passed to `LashSession::configure`.
///
/// NOTE(review): a `None` field presumably leaves that setting unchanged —
/// confirm against the config admin's `update` implementation.
#[derive(Clone, Debug, Default)]
pub struct SessionConfigPatch {
    /// Replacement provider, if any.
    pub provider: Option<ProviderHandle>,
    /// Replacement model specification, if any.
    pub model: Option<ModelSpec>,
    /// Replacement prompt layer, if any.
    pub prompt: Option<PromptLayer>,
}
496
497impl LashSession {
498 pub async fn close(self) -> Result<()> {
499 let runtime = self.runtime.writer();
500 let runtime = runtime.lock().await;
501 runtime.unregister_plugin_session()?;
502 Ok(())
503 }
504
505 pub fn session_id(&self) -> String {
506 self.runtime.observe().session_id().to_string()
507 }
508
509 pub fn policy_snapshot(&self) -> SessionPolicy {
510 self.runtime.observe().policy.clone()
511 }
512
513 pub fn observe(&self) -> ObservableSession {
514 ObservableSession {
515 runtime: self.runtime.clone(),
516 }
517 }
518
519 pub fn parent_session_id(&self) -> Option<&str> {
520 self.parent_session_id.as_deref()
521 }
522
523 pub fn effect_host(&self) -> Arc<dyn EffectHost> {
524 Arc::clone(&self.effect_host)
525 }
526
527 pub fn turn(&self, input: TurnInput) -> TurnBuilder {
528 TurnBuilder {
529 runtime: self.runtime.clone(),
530 effect_host: Arc::clone(&self.effect_host),
531 active_plugins: self.active_plugins.clone(),
532 input,
533 cancel: CancellationToken::new(),
534 cancels: self.turn_cancels.clone(),
535 protocol_turn_options: None,
536 provider: None,
537 model: None,
538 turn_id: None,
539 }
540 }
541
542 pub fn queued_turn(&self) -> QueuedTurnBuilder {
543 QueuedTurnBuilder {
544 runtime: self.runtime.clone(),
545 effect_host: Arc::clone(&self.effect_host),
546 cancel: CancellationToken::new(),
547 cancels: self.turn_cancels.clone(),
548 batch_ids: Vec::new(),
549 drain_id: None,
550 }
551 }
552
553 pub fn cancel_running_turns(&self) -> usize {
568 self.turn_cancels.cancel_all()
569 }
570
571 pub fn admin(&self) -> SessionAdmin {
572 SessionAdmin {
573 runtime: self.runtime.clone(),
574 }
575 }
576
577 pub async fn configure(&self, patch: SessionConfigPatch) -> Result<()> {
578 self.admin().config().update(patch).await
579 }
580
581 pub fn tools(&self) -> ToolAdmin {
582 ToolAdmin::new(self.admin())
583 }
584
585 pub fn commands(&self) -> SessionCommandAdmin {
586 self.admin().commands()
587 }
588
589 pub fn triggers(&self) -> SessionTriggerAdmin {
590 self.admin().triggers()
591 }
592
593 pub fn processes(&self) -> SessionProcessAdmin {
594 SessionProcessAdmin::new(self.admin())
595 }
596
597 pub fn plugin_operations(&self) -> PluginOperations {
598 PluginOperations {
599 control: self.admin(),
600 }
601 }
602
603 pub fn enqueue(&self, input: TurnInput) -> EnqueueTurnBuilder<'_> {
604 EnqueueTurnBuilder {
605 session: self,
606 input,
607 id: None,
608 delivery_policy: DeliveryPolicy::AfterCurrentTurnCommit,
609 slot_policy: SlotPolicy::Exclusive,
610 }
611 }
612
613 pub async fn queued_work(&self) -> Result<Vec<QueuedWorkBatch>> {
619 let observation = self.runtime.observe();
620 let store = observation.queue_store.as_ref().ok_or_else(|| {
621 EmbedError::Runtime(lash_core::RuntimeError::new(
622 lash_core::RuntimeErrorCode::StoreCommitFailed,
623 "queued work inspection requires a persistent runtime store",
624 ))
625 })?;
626 store
627 .list_pending_queued_work(observation.session_id())
628 .await
629 .map_err(|err| {
630 EmbedError::Runtime(lash_core::RuntimeError::new(
631 lash_core::RuntimeErrorCode::StoreCommitFailed,
632 err.to_string(),
633 ))
634 })
635 }
636
637 pub async fn cancel_queued_work_batch(
638 &self,
639 batch_id: &str,
640 ) -> Result<Option<QueuedWorkBatch>> {
641 let session_id = self.session_id();
642 self.runtime
643 .cancel_queued_work_batch(&session_id, batch_id)
644 .await
645 .map_err(EmbedError::Runtime)
646 }
647
648 pub async fn await_queued_work_batch(&self, batch_id: &str) -> Result<()> {
660 let observation = self.runtime.observe();
661 let store = observation.queue_store.clone().ok_or_else(|| {
662 EmbedError::Runtime(lash_core::RuntimeError::new(
663 lash_core::RuntimeErrorCode::StoreCommitFailed,
664 "queued work inspection requires a persistent runtime store",
665 ))
666 })?;
667 let session_id = observation.session_id().to_string();
668 drop(observation);
669 let mut delay = std::time::Duration::from_millis(25);
670 loop {
671 let pending = store
672 .list_pending_queued_work(&session_id)
673 .await
674 .map_err(|err| {
675 EmbedError::Runtime(lash_core::RuntimeError::new(
676 lash_core::RuntimeErrorCode::StoreCommitFailed,
677 err.to_string(),
678 ))
679 })?;
680 if !pending.iter().any(|batch| batch.batch_id == batch_id) {
681 return Ok(());
682 }
683 tokio::time::sleep(delay).await;
684 delay = (delay * 2).min(std::time::Duration::from_millis(400));
685 }
686 }
687
688 pub fn read_view(&self) -> SessionReadView {
689 self.runtime.observe().read_view.clone()
690 }
691
692 pub fn usage_report(&self) -> SessionUsageReport {
693 self.runtime.observe().usage_report.clone()
694 }
695
696 pub async fn set_turn_phase_probe(
697 &self,
698 probe: Arc<dyn lash_core::runtime::RuntimeTurnPhaseProbe>,
699 ) {
700 let writer = self.runtime.writer();
701 let mut runtime = writer.lock().await;
702 runtime.set_turn_phase_probe(Arc::clone(&probe));
703 self.runtime.publish_from(&runtime);
704 if let Some(slot) = &self.process_phase_probe_slot {
705 let observation = self.runtime.observe();
706 slot.set_for_session(observation.session_id(), Arc::clone(&probe));
707 let current_frame = observation.persisted_state.current_agent_frame_id.as_str();
708 if !current_frame.is_empty() {
709 let scope = lash_core::SessionScope::for_agent_frame(
710 observation.session_id(),
711 current_frame,
712 );
713 slot.set_for_scope(&scope, probe);
714 }
715 }
716 }
717}
718
/// Observation-only handle onto a session: exposes snapshots of its state and
/// subscriptions to its observation events.
#[derive(Clone)]
pub struct ObservableSession {
    /// Handle to the runtime being observed.
    pub(crate) runtime: RuntimeHandle,
}
723
724impl ObservableSession {
725 fn snapshot(&self) -> Arc<RuntimeObservation> {
726 self.runtime.observe()
727 }
728
729 pub fn current_observation(&self) -> SessionObservation {
730 self.runtime.current_session_observation()
731 }
732
733 pub fn current_remote_observation(&self) -> RemoteSessionObservation {
734 RemoteSessionObservation::from_core(self.current_observation())
735 }
736
737 pub fn resume_from_cursor(&self, cursor: &SessionCursor) -> Result<SessionResume> {
738 self.runtime
739 .resume_session_observation(cursor)
740 .map_err(live_replay_error)
741 }
742
743 pub fn subscribe_from_cursor(
744 &self,
745 cursor: &SessionCursor,
746 ) -> Result<SessionObservationSubscription> {
747 self.runtime
748 .subscribe_session_observation(cursor)
749 .map_err(live_replay_error)
750 }
751
752 pub fn subscribe_from_remote_cursor(
753 &self,
754 cursor: &RemoteSessionCursor,
755 ) -> Result<RemoteSessionObservationSubscription> {
756 cursor.validate()?;
757 let cursor = lash_core::SessionCursor::try_from(cursor.clone())?;
758 match self.subscribe_from_cursor(&cursor)? {
759 SessionObservationSubscription::Subscribed(subscription) => {
760 Ok(RemoteSessionObservationSubscription::Subscribed(
761 RemoteSessionObservationEventStream::new(subscription),
762 ))
763 }
764 SessionObservationSubscription::Gap { observation, gap } => {
765 Ok(RemoteSessionObservationSubscription::Gap {
766 observation: observation.into(),
767 gap: gap.into(),
768 })
769 }
770 }
771 }
772
773 pub fn subscribe_and_recover(&self, cursor: SessionCursor) -> SessionObservationStream {
782 SessionObservationStream {
783 observable: self.clone(),
784 cursor,
785 subscription: None,
786 done: false,
787 }
788 }
789
790 pub fn subscribe_and_recover_remote(
793 &self,
794 cursor: RemoteSessionCursor,
795 ) -> Result<RemoteSessionObservationStream> {
796 cursor.validate()?;
797 let cursor = lash_core::SessionCursor::try_from(cursor)?;
798 Ok(RemoteSessionObservationStream {
799 inner: self.subscribe_and_recover(cursor),
800 next_sequence: 0,
801 })
802 }
803
804 pub fn session_id(&self) -> String {
805 self.snapshot().session_id().to_string()
806 }
807
808 pub fn policy_snapshot(&self) -> SessionPolicy {
809 self.snapshot().policy.clone()
810 }
811
812 pub fn read_view(&self) -> SessionReadView {
813 self.snapshot().read_view.clone()
814 }
815
816 pub fn usage_report(&self) -> SessionUsageReport {
817 self.snapshot().usage_report.clone()
818 }
819
820 pub fn tool_state(&self) -> Option<ToolState> {
821 self.snapshot().tool_state.clone()
822 }
823
824 pub fn active_tool_manifests(&self) -> Vec<ToolManifest> {
825 self.snapshot()
826 .tool_state
827 .as_ref()
828 .map(ToolState::tool_manifests)
829 .unwrap_or_default()
830 }
831
832 pub async fn list_process_handles(&self) -> Vec<ProcessHandleSummary> {
833 self.snapshot().list_process_handles().await
834 }
835
836 pub async fn list_all_process_handles(&self) -> Vec<ProcessHandleSummary> {
837 self.snapshot().list_all_process_handles().await
838 }
839
840 pub fn process_scope(&self) -> SessionScope {
841 self.snapshot().process_scope()
842 }
843}
844
/// Item yielded by [`SessionObservationStream`].
#[derive(Clone, Debug)]
pub enum SessionObservationStreamItem {
    /// An observation event delivered by the live subscription.
    Event(SessionObservationEvent),
    /// Subscribing from the stream's cursor reported a gap. The stream moves
    /// its cursor to the gap's latest cursor before yielding this item.
    Gap {
        /// Observation supplied together with the gap.
        observation: SessionObservation,
        /// Description of the gap, including the latest cursor.
        gap: LiveReplayGap,
    },
}
855
/// Outcome of `ObservableSession::subscribe_from_remote_cursor`, expressed in
/// remote-protocol types.
pub enum RemoteSessionObservationSubscription {
    /// The subscription was established; events arrive on the wrapped stream.
    Subscribed(RemoteSessionObservationEventStream),
    /// Subscribing from the requested cursor reported a gap.
    Gap {
        /// Observation supplied together with the gap.
        observation: RemoteSessionObservation,
        /// Description of the gap.
        gap: RemoteLiveReplayGap,
    },
}
863
/// Item yielded by [`RemoteSessionObservationStream`]; the remote-protocol
/// counterpart of [`SessionObservationStreamItem`].
#[derive(Clone, Debug)]
pub enum RemoteSessionObservationStreamItem {
    /// An observation event, numbered with the stream's running sequence.
    Event(RemoteSessionObservationEvent),
    /// A gap reported by the underlying stream.
    Gap {
        /// Observation supplied together with the gap.
        observation: RemoteSessionObservation,
        /// Description of the gap.
        gap: RemoteLiveReplayGap,
    },
}
874
/// Stream of remote-protocol observation events backed by a single live
/// replay subscription.
pub struct RemoteSessionObservationEventStream {
    /// Underlying subscription the events are read from.
    inner: lash_core::LiveReplaySubscription,
    /// Sequence number given to the next converted event; starts at 0 and is
    /// advanced by one (saturating) per event.
    next_sequence: u64,
}
879
880impl RemoteSessionObservationEventStream {
881 fn new(inner: lash_core::LiveReplaySubscription) -> Self {
882 Self {
883 inner,
884 next_sequence: 0,
885 }
886 }
887
888 pub async fn next_event(&mut self) -> Result<RemoteSessionObservationEvent> {
889 futures_util::future::poll_fn(|cx| Pin::new(&mut *self).poll_next(cx))
890 .await
891 .transpose()?
892 .ok_or_else(|| live_replay_error(LiveReplayStoreError::Closed))
893 }
894}
895
896impl Stream for RemoteSessionObservationEventStream {
897 type Item = Result<RemoteSessionObservationEvent>;
898
899 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
900 match Pin::new(&mut self.inner).poll_next(cx) {
901 Poll::Pending => Poll::Pending,
902 Poll::Ready(Some(Ok(event))) => {
903 let remote = RemoteSessionObservationEvent::from_core(self.next_sequence, event);
904 self.next_sequence = self.next_sequence.saturating_add(1);
905 Poll::Ready(Some(Ok(remote)))
906 }
907 Poll::Ready(Some(Err(err))) => Poll::Ready(Some(Err(live_replay_error(err)))),
908 Poll::Ready(None) => Poll::Ready(None),
909 }
910 }
911}
912
/// Remote-protocol wrapper around [`SessionObservationStream`].
pub struct RemoteSessionObservationStream {
    /// Underlying stream that handles subscribing and re-subscribing.
    inner: SessionObservationStream,
    /// Sequence number given to the next converted event; advanced by one
    /// (saturating) per event. Gap items do not consume a sequence number.
    next_sequence: u64,
}
918
919impl RemoteSessionObservationStream {
920 pub fn cursor(&self) -> RemoteSessionCursor {
921 RemoteSessionCursor::from(self.inner.cursor())
922 }
923}
924
925impl Stream for RemoteSessionObservationStream {
926 type Item = Result<RemoteSessionObservationStreamItem>;
927
928 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
929 match Pin::new(&mut self.inner).poll_next(cx) {
930 Poll::Pending => Poll::Pending,
931 Poll::Ready(Some(Ok(SessionObservationStreamItem::Event(event)))) => {
932 let remote = RemoteSessionObservationEvent::from_core(self.next_sequence, event);
933 self.next_sequence = self.next_sequence.saturating_add(1);
934 Poll::Ready(Some(Ok(RemoteSessionObservationStreamItem::Event(remote))))
935 }
936 Poll::Ready(Some(Ok(SessionObservationStreamItem::Gap { observation, gap }))) => {
937 Poll::Ready(Some(Ok(RemoteSessionObservationStreamItem::Gap {
938 observation: observation.into(),
939 gap: gap.into(),
940 })))
941 }
942 Poll::Ready(Some(Err(err))) => Poll::Ready(Some(Err(err))),
943 Poll::Ready(None) => Poll::Ready(None),
944 }
945 }
946}
947
/// Stream of session observation items that subscribes on first poll and
/// re-subscribes from its own cursor after a gap or a lagged subscriber.
pub struct SessionObservationStream {
    /// Session used to (re)establish the subscription.
    observable: ObservableSession,
    /// Position to subscribe from next: the cursor of the last yielded event,
    /// or the latest cursor of the last reported gap.
    cursor: SessionCursor,
    /// Active subscription; `None` before the first poll and whenever the
    /// stream has to subscribe again.
    subscription: Option<lash_core::LiveReplaySubscription>,
    /// Set once the stream has ended or reported a terminal error; every
    /// later poll yields `None`.
    done: bool,
}
955
impl SessionObservationStream {
    /// Cursor the stream would subscribe from next: the cursor of the last
    /// yielded event, or the latest cursor of the last reported gap.
    pub fn cursor(&self) -> &SessionCursor {
        &self.cursor
    }
}
961
impl Stream for SessionObservationStream {
    type Item = Result<SessionObservationStreamItem>;

    /// Drives the subscribe / deliver / re-subscribe cycle.
    ///
    /// * With no active subscription, subscribes from the current cursor. A gap
    ///   is yielded as an item after moving the cursor to the gap's latest
    ///   cursor; a subscribe error is yielded once and ends the stream.
    /// * With an active subscription, events are yielded and advance the
    ///   cursor. A lagged-subscriber error drops the subscription and
    ///   re-subscribes from the cursor; any other error is yielded once and
    ///   ends the stream, as does the end of the subscription.
    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        loop {
            // A terminal error or end-of-stream has already been reported.
            if self.done {
                return Poll::Ready(None);
            }
            // (Re)subscribe from the last position handed to the consumer.
            if self.subscription.is_none() {
                match self.observable.subscribe_from_cursor(&self.cursor) {
                    Ok(SessionObservationSubscription::Subscribed(subscription)) => {
                        self.subscription = Some(subscription);
                    }
                    Ok(SessionObservationSubscription::Gap { observation, gap }) => {
                        // Skip ahead so the next poll subscribes past the gap;
                        // the subscription stays `None` until then.
                        self.cursor = gap.latest_cursor.clone();
                        return Poll::Ready(Some(Ok(SessionObservationStreamItem::Gap {
                            observation,
                            gap,
                        })));
                    }
                    Err(err) => {
                        self.done = true;
                        return Poll::Ready(Some(Err(err)));
                    }
                }
            }

            let Some(subscription) = self.subscription.as_mut() else {
                continue;
            };
            match Pin::new(subscription).poll_next(cx) {
                Poll::Pending => return Poll::Pending,
                Poll::Ready(Some(Ok(event))) => {
                    // Remember how far the consumer has got, for re-subscribing.
                    self.cursor = event.cursor.clone();
                    return Poll::Ready(Some(Ok(SessionObservationStreamItem::Event(event))));
                }
                Poll::Ready(Some(Err(LiveReplayStoreError::SubscriberLagged(_)))) => {
                    // Recoverable: drop the subscription and subscribe again
                    // from the cursor on the next loop iteration.
                    self.subscription = None;
                    continue;
                }
                Poll::Ready(Some(Err(err))) => {
                    self.done = true;
                    return Poll::Ready(Some(Err(live_replay_error(err))));
                }
                Poll::Ready(None) => {
                    self.done = true;
                    return Poll::Ready(None);
                }
            }
        }
    }
}
1014
1015fn live_replay_error(err: lash_core::LiveReplayStoreError) -> EmbedError {
1016 EmbedError::Runtime(lash_core::RuntimeError::new(
1017 RuntimeErrorCode::Other("live_replay".to_string()),
1018 err.to_string(),
1019 ))
1020}
1021
/// Builder for enqueueing turn input as queued work; created by
/// `LashSession::enqueue` and completed with `send` or by awaiting it.
pub struct EnqueueTurnBuilder<'a> {
    /// Session the work is enqueued on.
    session: &'a LashSession,
    /// Turn input to enqueue.
    input: TurnInput,
    /// Optional host-chosen id; `send` passes it on as the source key `host:{id}`.
    id: Option<String>,
    /// Delivery policy; `LashSession::enqueue` starts with `AfterCurrentTurnCommit`.
    delivery_policy: DeliveryPolicy,
    /// Slot policy; `LashSession::enqueue` starts with `Exclusive`.
    slot_policy: SlotPolicy,
}
1029
1030impl<'a> EnqueueTurnBuilder<'a> {
1031 pub fn id(mut self, id: impl Into<String>) -> Self {
1032 self.id = Some(id.into());
1033 self
1034 }
1035
1036 pub fn delivery_policy(mut self, policy: DeliveryPolicy) -> Self {
1037 self.delivery_policy = policy;
1038 self
1039 }
1040
1041 pub fn slot_policy(mut self, policy: SlotPolicy) -> Self {
1042 self.slot_policy = policy;
1043 self
1044 }
1045
1046 pub async fn send(self) -> Result<QueuedWorkBatch> {
1047 let source_key = self.id.map(|id| format!("host:{id}"));
1048 self.session
1049 .runtime
1050 .enqueue_turn_input(
1051 self.input,
1052 self.delivery_policy,
1053 self.slot_policy,
1054 source_key,
1055 )
1056 .await
1057 .map_err(EmbedError::Runtime)
1058 }
1059}
1060
1061impl<'a> std::future::IntoFuture for EnqueueTurnBuilder<'a> {
1062 type Output = Result<QueuedWorkBatch>;
1063 type IntoFuture =
1064 std::pin::Pin<Box<dyn std::future::Future<Output = Result<QueuedWorkBatch>> + 'a>>;
1065
1066 fn into_future(self) -> Self::IntoFuture {
1067 Box::pin(self.send())
1068 }
1069}