1use std::pin::Pin;
2use std::task::{Context, Poll};
3
4use crate::support::*;
5use futures_util::Stream;
6use lash_core::runtime::{DeliveryPolicy, QueuedWorkBatch, SlotPolicy};
7use lash_core::{LiveReplayGap, LiveReplayStoreError, SessionObservationEvent};
8use lash_remote_protocol::{
9 RemoteLiveReplayGap, RemoteSessionCursor, RemoteSessionObservation,
10 RemoteSessionObservationEvent,
11};
12
13pub struct SessionBuilder {
14 pub(crate) core: LashCore,
15 pub(crate) session_id: String,
16 pub(crate) spec: SessionSpec,
17 pub(crate) parent_session_id: Option<String>,
18 pub(crate) session_execution_owner: Option<lash_core::LeaseOwnerIdentity>,
19 pub(crate) store: Option<Arc<dyn RuntimePersistence>>,
20 pub(crate) provider: Option<ProviderHandle>,
21 pub(crate) active_plugins: Vec<ActivePluginBinding>,
22 pub(crate) plugin_factories: Vec<Arc<dyn PluginFactory>>,
23}
24
25#[cfg(feature = "rlm")]
26pub struct RlmSessionBuilder {
27 pub(crate) builder: SessionBuilder,
28 pub(crate) rlm_final_answer_format: Option<lash_rlm_types::RlmFinalAnswerFormat>,
29}
30
31impl SessionBuilder {
32 pub fn provider(mut self, provider: ProviderHandle) -> Self {
33 self.spec = self.spec.provider_id(provider.kind());
34 self.provider = Some(provider);
35 self
36 }
37
38 pub fn session_spec(mut self, spec: SessionSpec) -> Self {
39 self.spec = spec;
40 self
41 }
42
43 pub fn parent(mut self, parent_session_id: impl Into<String>) -> Self {
44 self.parent_session_id = Some(parent_session_id.into());
45 self
46 }
47
48 pub fn session_execution_owner(mut self, owner: lash_core::LeaseOwnerIdentity) -> Self {
54 self.session_execution_owner = Some(owner);
55 self
56 }
57
58 pub fn store(mut self, store: Arc<dyn RuntimePersistence>) -> Self {
65 self.store = Some(store);
66 self
67 }
68
69 pub fn plugin<P: PluginBinding>(mut self, config: P::SessionConfig) -> Self {
70 self.active_plugins.push(ActivePluginBinding {
71 id: P::ID,
72 requires_turn_input: P::requires_turn_input(&config),
73 });
74 self.plugin_factories.push(P::factory(&config));
75 self
76 }
77
78 pub async fn open(self) -> Result<LashSession> {
79 let policy = self.session_policy();
80 let store = self.create_store(&policy).await?;
81 let state = self
82 .load_or_default_state(&policy, store.as_deref())
83 .await?;
84 self.open_resolved(policy, state, store).await
85 }
86
87 pub async fn open_fresh(self) -> Result<LashSession> {
97 let policy = self.session_policy();
98 let store = self.create_store(&policy).await?;
99 let state = RuntimeSessionState {
100 session_id: self.session_id.clone(),
101 policy: policy.clone(),
102 graph_replace_required: true,
103 ..RuntimeSessionState::default()
104 };
105 self.open_resolved(policy, state, store).await
106 }
107
108 pub async fn open_with_state(self, mut state: RuntimeSessionState) -> Result<LashSession> {
115 let policy = self.session_policy();
116 let store = self.create_store(&policy).await?;
117 if state.session_id != self.session_id {
118 return Err(EmbedError::StoreSessionMismatch {
119 loaded: state.session_id,
120 requested: self.session_id,
121 });
122 }
123 let recorded_provider_id = state.policy.recorded_provider_id().to_string();
124 state.policy = policy.clone();
125 state.policy.provider_id = recorded_provider_id;
126 self.open_resolved(policy, state, store).await
127 }
128
129 fn session_policy(&self) -> SessionPolicy {
130 let mut policy = self.spec.resolve_against(&self.core.policy);
131 policy.session_id = Some(self.session_id.clone());
132 policy
133 }
134
135 async fn load_or_default_state(
136 &self,
137 policy: &SessionPolicy,
138 store: Option<&dyn RuntimePersistence>,
139 ) -> Result<RuntimeSessionState> {
140 let state = match store {
141 Some(store) => {
142 let loaded = self.load_persisted_state_for_residency(store).await?;
143 let mut state = loaded.unwrap_or_else(|| RuntimeSessionState {
144 session_id: self.session_id.clone(),
145 policy: policy.clone(),
146 ..RuntimeSessionState::default()
147 });
148 if state.session_id != self.session_id {
149 return Err(EmbedError::StoreSessionMismatch {
150 loaded: state.session_id,
151 requested: self.session_id.clone(),
152 });
153 }
154 let recorded_provider_id = state.policy.recorded_provider_id().to_string();
155 state.policy = policy.clone();
156 state.policy.provider_id = recorded_provider_id;
157 state
158 }
159 None => RuntimeSessionState {
160 session_id: self.session_id.clone(),
161 policy: policy.clone(),
162 ..RuntimeSessionState::default()
163 },
164 };
165 Ok(state)
166 }
167
168 async fn load_persisted_state_for_residency(
169 &self,
170 store: &dyn RuntimePersistence,
171 ) -> Result<Option<RuntimeSessionState>> {
172 load_persisted_state_for_residency(self.core.env.residency, store).await
173 }
174
175 async fn open_resolved(
176 self,
177 policy: SessionPolicy,
178 state: RuntimeSessionState,
179 store: Option<Arc<dyn RuntimePersistence>>,
180 ) -> Result<LashSession> {
181 let mut env = self.core.env.clone();
182 if let Some(provider) = self.provider.clone().or_else(|| self.core.provider.clone()) {
183 env.core.providers.provider_resolver =
184 Arc::new(lash_core::SingleProviderResolver::new(provider));
185 }
186 let plugin_host = build_plugin_host(
187 self.core.protocol_factory.as_ref(),
188 self.core.plugin_factories.as_ref(),
189 self.plugin_factories,
190 )?;
191 env.core = self
192 .core
193 .runtime_host_for_plugin_host(env.core.clone(), &plugin_host)?;
194 env.plugin_host = Some(Arc::new(plugin_host));
195 let effect_host = Arc::clone(&env.core.control.effect_host);
196 let drivers = self.core.work_driver.drivers().await;
197 env.process_work_driver = drivers.process.clone();
198 env.queued_work_driver = drivers.queued.clone();
199 let mut runtime = LashRuntime::from_environment(&env, policy, state, store).await?;
200 if let Some(owner) = self.session_execution_owner {
201 runtime.set_runtime_lease_owner(owner);
202 }
203 if drivers.drive_process_on_open
204 && let Some(driver) = drivers.process.as_ref()
205 {
206 driver.claim_and_run_pending("session_open").await?;
207 }
208 let handle = RuntimeHandle::with_live_replay_store(
209 runtime,
210 Arc::clone(&self.core.live_replay_store),
211 );
212 Ok(LashSession {
213 runtime: handle,
214 effect_host,
215 parent_session_id: self.parent_session_id,
216 active_plugins: self.active_plugins,
217 process_phase_probe_slot: self.core.work_driver.phase_probe_slot(),
218 turn_cancels: crate::turn::TurnCancelRegistry::default(),
219 })
220 }
221
222 async fn create_store(
223 &self,
224 policy: &SessionPolicy,
225 ) -> Result<Option<Arc<dyn RuntimePersistence>>> {
226 if let Some(store) = self.store.as_ref() {
227 return Ok(Some(Arc::clone(store)));
228 }
229 let Some(factory) = self.core.store_factory.as_ref() else {
230 return Ok(None);
231 };
232 let request = SessionStoreCreateRequest {
233 session_id: self.session_id.clone(),
234 relation: self
235 .parent_session_id
236 .as_ref()
237 .map(|parent_session_id| lash_core::SessionRelation::Child {
238 parent_session_id: parent_session_id.clone(),
239 caused_by: None,
240 })
241 .unwrap_or_default(),
242 policy: policy.clone(),
243 };
244 factory
245 .create_store(&request)
246 .await
247 .map(Some)
248 .map_err(|message| EmbedError::StoreFactory {
249 session_id: self.session_id.clone(),
250 message,
251 })
252 }
253}
254
255pub(crate) async fn load_state_for_residency(
256 residency: Residency,
257 session_id: &str,
258 policy: &SessionPolicy,
259 store: &dyn RuntimePersistence,
260) -> Result<RuntimeSessionState> {
261 let mut state = load_persisted_state_for_residency(residency, store)
262 .await?
263 .unwrap_or_else(|| RuntimeSessionState {
264 session_id: session_id.to_string(),
265 policy: policy.clone(),
266 ..RuntimeSessionState::default()
267 });
268 if state.session_id != session_id {
269 return Err(EmbedError::StoreSessionMismatch {
270 loaded: state.session_id,
271 requested: session_id.to_string(),
272 });
273 }
274 let recorded_provider_id = state.policy.recorded_provider_id().to_string();
275 state.policy = policy.clone();
276 state.policy.provider_id = recorded_provider_id;
277 Ok(state)
278}
279
280async fn load_persisted_state_for_residency(
281 residency: Residency,
282 store: &dyn RuntimePersistence,
283) -> Result<Option<RuntimeSessionState>> {
284 match residency {
285 Residency::KeepAll => {
286 let loaded = lash_core::store::load_persisted_session_state(store)
287 .await
288 .map_err(|err| SessionError::Protocol(format!("failed to load store: {err}")))?;
289 Ok(loaded)
290 }
291 Residency::ActivePathOnly => {
292 let active = lash_core::store::load_persisted_session_state_active_path(store, None)
293 .await
294 .map_err(|err| {
295 SessionError::Protocol(format!("failed to load active-path store: {err}"))
296 })?;
297 if active
298 .as_ref()
299 .is_some_and(|state| state.session_graph.nodes.is_empty())
300 {
301 let mut full = lash_core::store::load_persisted_session_state(store)
302 .await
303 .map_err(|err| {
304 SessionError::Protocol(format!(
305 "failed to heal active-path store from full graph: {err}"
306 ))
307 })?;
308 if let Some(state) = full.as_mut() {
309 state.graph_replace_required = true;
310 }
311 return Ok(full);
312 }
313 Ok(active)
314 }
315 }
316}
317
318impl PromptLayerSink for SessionBuilder {
319 fn prompt_layer_mut(&mut self) -> &mut PromptLayer {
320 self.spec.prompt.get_or_insert_with(PromptLayer::new)
321 }
322}
323
324#[cfg(feature = "rlm")]
325impl RlmSessionBuilder {
326 pub fn provider(mut self, provider: ProviderHandle) -> Self {
327 self.builder = self.builder.provider(provider);
328 self
329 }
330
331 pub fn session_spec(mut self, spec: SessionSpec) -> Self {
332 self.builder = self.builder.session_spec(spec);
333 self
334 }
335
336 pub fn parent(mut self, parent_session_id: impl Into<String>) -> Self {
337 self.builder = self.builder.parent(parent_session_id);
338 self
339 }
340
341 pub fn session_execution_owner(mut self, owner: lash_core::LeaseOwnerIdentity) -> Self {
342 self.builder = self.builder.session_execution_owner(owner);
343 self
344 }
345
346 pub fn store(mut self, store: Arc<dyn RuntimePersistence>) -> Self {
347 self.builder = self.builder.store(store);
348 self
349 }
350
351 pub fn plugin<P: PluginBinding>(mut self, config: P::SessionConfig) -> Self {
352 self.builder = self.builder.plugin::<P>(config);
353 self
354 }
355
356 pub async fn open(self) -> Result<LashSession> {
357 self.open_resolved(RlmOpenState::Resume).await
358 }
359
360 pub async fn open_fresh(self) -> Result<LashSession> {
361 self.open_resolved(RlmOpenState::Fresh).await
362 }
363
364 pub async fn open_with_state(self, state: RuntimeSessionState) -> Result<LashSession> {
365 self.open_resolved(RlmOpenState::Explicit(state)).await
366 }
367
368 async fn open_resolved(self, open_state: RlmOpenState) -> Result<LashSession> {
369 let Self {
370 builder,
371 rlm_final_answer_format,
372 } = self;
373 let policy = builder.session_policy();
374 let store = builder.create_store(&policy).await?;
375 let mut state = match open_state {
376 RlmOpenState::Resume => {
377 builder
378 .load_or_default_state(&policy, store.as_deref())
379 .await?
380 }
381 RlmOpenState::Fresh => RuntimeSessionState {
382 session_id: builder.session_id.clone(),
383 policy: policy.clone(),
384 graph_replace_required: true,
385 ..RuntimeSessionState::default()
386 },
387 RlmOpenState::Explicit(mut state) => {
388 if state.session_id != builder.session_id {
389 return Err(EmbedError::StoreSessionMismatch {
390 loaded: state.session_id,
391 requested: builder.session_id.clone(),
392 });
393 }
394 let recorded_provider_id = state.policy.recorded_provider_id().to_string();
395 state.policy = policy.clone();
396 state.policy.provider_id = recorded_provider_id;
397 state
398 }
399 };
400 apply_rlm_session_options(
401 builder.parent_session_id.is_none(),
402 rlm_final_answer_format,
403 &mut state,
404 )?;
405 builder.open_resolved(policy, state, store).await
406 }
407}
408
409#[cfg(feature = "rlm")]
410impl PromptLayerSink for RlmSessionBuilder {
411 fn prompt_layer_mut(&mut self) -> &mut PromptLayer {
412 self.builder.prompt_layer_mut()
413 }
414}
415
416#[cfg(feature = "rlm")]
417#[allow(clippy::large_enum_variant)]
418enum RlmOpenState {
419 Resume,
420 Fresh,
421 Explicit(RuntimeSessionState),
422}
423
424#[cfg(feature = "rlm")]
425fn apply_rlm_session_options(
426 is_root_session: bool,
427 explicit_format: Option<lash_rlm_types::RlmFinalAnswerFormat>,
428 state: &mut RuntimeSessionState,
429) -> Result<()> {
430 let final_answer_format = explicit_format.unwrap_or({
431 if is_root_session {
432 lash_rlm_types::RlmFinalAnswerFormat::Markdown
433 } else {
434 lash_rlm_types::RlmFinalAnswerFormat::RawSubmitValue
435 }
436 });
437 let mut extras = if state.protocol_turn_options.is_empty() {
438 lash_rlm_types::RlmCreateExtras::default()
439 } else {
440 state.protocol_turn_options.decode()?
441 };
442 extras.final_answer_format = Some(final_answer_format);
443 let options = ProtocolTurnOptions::typed(extras)?;
444 state.protocol_turn_options = options.clone();
445 for frame in &mut state.agent_frames {
446 frame.protocol_turn_options = options.clone();
447 }
448 Ok(())
449}
450
451#[cfg(all(test, feature = "rlm"))]
452mod tests {
453 use super::*;
454
455 #[test]
456 fn apply_rlm_session_options_preserves_existing_termination() -> Result<()> {
457 let mut state = RuntimeSessionState {
458 protocol_turn_options: ProtocolTurnOptions::typed(lash_rlm_types::RlmCreateExtras {
459 termination: lash_rlm_types::RlmTermination::ProseOrSubmit,
460 final_answer_format: None,
461 })?,
462 ..Default::default()
463 };
464
465 apply_rlm_session_options(true, None, &mut state)?;
466
467 let extras: lash_rlm_types::RlmCreateExtras = state.protocol_turn_options.decode()?;
468 assert_eq!(
469 extras.termination,
470 lash_rlm_types::RlmTermination::ProseOrSubmit
471 );
472 assert_eq!(
473 extras.final_answer_format,
474 Some(lash_rlm_types::RlmFinalAnswerFormat::Markdown)
475 );
476 Ok(())
477 }
478}
479
480#[derive(Clone)]
481pub struct LashSession {
482 pub(crate) runtime: RuntimeHandle,
483 pub(crate) effect_host: Arc<dyn EffectHost>,
484 pub(crate) parent_session_id: Option<String>,
485 pub(crate) active_plugins: Vec<ActivePluginBinding>,
486 pub(crate) process_phase_probe_slot: Option<lash_core::runtime::RuntimeTurnPhaseProbeSlot>,
487 pub(crate) turn_cancels: crate::turn::TurnCancelRegistry,
488}
489
490#[derive(Clone, Debug, Default)]
491pub struct SessionConfigPatch {
492 pub provider: Option<ProviderHandle>,
493 pub model: Option<ModelSpec>,
494 pub prompt: Option<PromptLayer>,
495}
496
497impl LashSession {
498 pub async fn close(self) -> Result<()> {
499 let runtime = self.runtime.writer();
500 let runtime = runtime.lock().await;
501 runtime.unregister_plugin_session()?;
502 Ok(())
503 }
504
505 pub fn session_id(&self) -> String {
506 self.runtime.observe().session_id().to_string()
507 }
508
509 pub fn policy_snapshot(&self) -> SessionPolicy {
510 self.runtime.observe().policy.clone()
511 }
512
513 pub fn observe(&self) -> ObservableSession {
514 ObservableSession {
515 runtime: self.runtime.clone(),
516 }
517 }
518
519 pub fn parent_session_id(&self) -> Option<&str> {
520 self.parent_session_id.as_deref()
521 }
522
523 pub fn effect_host(&self) -> Arc<dyn EffectHost> {
524 Arc::clone(&self.effect_host)
525 }
526
527 pub fn turn(&self, input: TurnInput) -> TurnBuilder {
528 TurnBuilder {
529 runtime: self.runtime.clone(),
530 effect_host: Arc::clone(&self.effect_host),
531 active_plugins: self.active_plugins.clone(),
532 input,
533 cancel: CancellationToken::new(),
534 cancels: self.turn_cancels.clone(),
535 protocol_turn_options: None,
536 provider: None,
537 model: None,
538 turn_id: None,
539 }
540 }
541
542 pub fn queued_turn(&self) -> QueuedTurnBuilder {
543 QueuedTurnBuilder {
544 runtime: self.runtime.clone(),
545 effect_host: Arc::clone(&self.effect_host),
546 cancel: CancellationToken::new(),
547 cancels: self.turn_cancels.clone(),
548 batch_ids: Vec::new(),
549 drain_id: None,
550 }
551 }
552
553 pub fn cancel_running_turns(&self) -> usize {
568 self.turn_cancels.cancel_all()
569 }
570
571 pub fn admin(&self) -> SessionAdmin {
572 SessionAdmin {
573 runtime: self.runtime.clone(),
574 }
575 }
576
577 pub async fn configure(&self, patch: SessionConfigPatch) -> Result<()> {
578 self.admin().config().update(patch).await
579 }
580
581 pub fn tools(&self) -> ToolAdmin {
582 ToolAdmin::new(self.admin())
583 }
584
585 pub fn commands(&self) -> SessionCommandAdmin {
586 self.admin().commands()
587 }
588
589 pub fn triggers(&self) -> SessionTriggerAdmin {
590 self.admin().triggers()
591 }
592
593 pub fn processes(&self) -> SessionProcessAdmin {
594 SessionProcessAdmin::new(self.admin())
595 }
596
597 pub fn plugin_operations(&self) -> PluginOperations {
598 PluginOperations {
599 control: self.admin(),
600 }
601 }
602
603 pub fn enqueue(&self, input: TurnInput) -> EnqueueTurnBuilder<'_> {
604 EnqueueTurnBuilder {
605 session: self,
606 input,
607 id: None,
608 delivery_policy: DeliveryPolicy::AfterCurrentTurnCommit,
609 slot_policy: SlotPolicy::Exclusive,
610 }
611 }
612
613 pub async fn queued_work(&self) -> Result<Vec<QueuedWorkBatch>> {
614 let observation = self.runtime.observe();
615 let store = observation.queue_store.as_ref().ok_or_else(|| {
616 EmbedError::Runtime(lash_core::RuntimeError::new(
617 lash_core::RuntimeErrorCode::StoreCommitFailed,
618 "queued work inspection requires a persistent runtime store",
619 ))
620 })?;
621 store
622 .list_pending_queued_work(observation.session_id())
623 .await
624 .map_err(|err| {
625 EmbedError::Runtime(lash_core::RuntimeError::new(
626 lash_core::RuntimeErrorCode::StoreCommitFailed,
627 err.to_string(),
628 ))
629 })
630 }
631
632 pub async fn cancel_queued_work_batch(
633 &self,
634 batch_id: &str,
635 ) -> Result<Option<QueuedWorkBatch>> {
636 let session_id = self.session_id();
637 self.runtime
638 .cancel_queued_work_batch(&session_id, batch_id)
639 .await
640 .map_err(EmbedError::Runtime)
641 }
642
643 pub async fn await_queued_work_batch(&self, batch_id: &str) -> Result<()> {
655 let observation = self.runtime.observe();
656 let store = observation.queue_store.clone().ok_or_else(|| {
657 EmbedError::Runtime(lash_core::RuntimeError::new(
658 lash_core::RuntimeErrorCode::StoreCommitFailed,
659 "queued work inspection requires a persistent runtime store",
660 ))
661 })?;
662 let session_id = observation.session_id().to_string();
663 drop(observation);
664 let mut delay = std::time::Duration::from_millis(25);
665 loop {
666 let pending = store
667 .list_pending_queued_work(&session_id)
668 .await
669 .map_err(|err| {
670 EmbedError::Runtime(lash_core::RuntimeError::new(
671 lash_core::RuntimeErrorCode::StoreCommitFailed,
672 err.to_string(),
673 ))
674 })?;
675 if !pending.iter().any(|batch| batch.batch_id == batch_id) {
676 return Ok(());
677 }
678 tokio::time::sleep(delay).await;
679 delay = (delay * 2).min(std::time::Duration::from_millis(400));
680 }
681 }
682
683 pub fn read_view(&self) -> SessionReadView {
684 self.runtime.observe().read_view.clone()
685 }
686
687 pub fn usage_report(&self) -> SessionUsageReport {
688 self.runtime.observe().usage_report.clone()
689 }
690
691 pub async fn set_turn_phase_probe(
692 &self,
693 probe: Arc<dyn lash_core::runtime::RuntimeTurnPhaseProbe>,
694 ) {
695 let writer = self.runtime.writer();
696 let mut runtime = writer.lock().await;
697 runtime.set_turn_phase_probe(Arc::clone(&probe));
698 self.runtime.publish_from(&runtime);
699 if let Some(slot) = &self.process_phase_probe_slot {
700 let observation = self.runtime.observe();
701 slot.set_for_session(observation.session_id(), Arc::clone(&probe));
702 let current_frame = observation.persisted_state.current_agent_frame_id.as_str();
703 if !current_frame.is_empty() {
704 let scope = lash_core::SessionScope::for_agent_frame(
705 observation.session_id(),
706 current_frame,
707 );
708 slot.set_for_scope(&scope, probe);
709 }
710 }
711 }
712}
713
714#[derive(Clone)]
715pub struct ObservableSession {
716 pub(crate) runtime: RuntimeHandle,
717}
718
719impl ObservableSession {
720 fn snapshot(&self) -> Arc<RuntimeObservation> {
721 self.runtime.observe()
722 }
723
724 pub fn current_observation(&self) -> SessionObservation {
725 self.runtime.current_session_observation()
726 }
727
728 pub fn current_remote_observation(&self) -> RemoteSessionObservation {
729 RemoteSessionObservation::from_core(self.current_observation())
730 }
731
732 pub fn resume_from_cursor(&self, cursor: &SessionCursor) -> Result<SessionResume> {
733 self.runtime
734 .resume_session_observation(cursor)
735 .map_err(live_replay_error)
736 }
737
738 pub fn subscribe_from_cursor(
739 &self,
740 cursor: &SessionCursor,
741 ) -> Result<SessionObservationSubscription> {
742 self.runtime
743 .subscribe_session_observation(cursor)
744 .map_err(live_replay_error)
745 }
746
747 pub fn subscribe_from_remote_cursor(
748 &self,
749 cursor: &RemoteSessionCursor,
750 ) -> Result<RemoteSessionObservationSubscription> {
751 cursor.validate()?;
752 let cursor = lash_core::SessionCursor::try_from(cursor.clone())?;
753 match self.subscribe_from_cursor(&cursor)? {
754 SessionObservationSubscription::Subscribed(subscription) => {
755 Ok(RemoteSessionObservationSubscription::Subscribed(
756 RemoteSessionObservationEventStream::new(subscription),
757 ))
758 }
759 SessionObservationSubscription::Gap { observation, gap } => {
760 Ok(RemoteSessionObservationSubscription::Gap {
761 observation: observation.into(),
762 gap: gap.into(),
763 })
764 }
765 }
766 }
767
768 pub fn subscribe_and_recover(&self, cursor: SessionCursor) -> SessionObservationStream {
777 SessionObservationStream {
778 observable: self.clone(),
779 cursor,
780 subscription: None,
781 done: false,
782 }
783 }
784
785 pub fn subscribe_and_recover_remote(
788 &self,
789 cursor: RemoteSessionCursor,
790 ) -> Result<RemoteSessionObservationStream> {
791 cursor.validate()?;
792 let cursor = lash_core::SessionCursor::try_from(cursor)?;
793 Ok(RemoteSessionObservationStream {
794 inner: self.subscribe_and_recover(cursor),
795 next_sequence: 0,
796 })
797 }
798
799 pub fn session_id(&self) -> String {
800 self.snapshot().session_id().to_string()
801 }
802
803 pub fn policy_snapshot(&self) -> SessionPolicy {
804 self.snapshot().policy.clone()
805 }
806
807 pub fn read_view(&self) -> SessionReadView {
808 self.snapshot().read_view.clone()
809 }
810
811 pub fn usage_report(&self) -> SessionUsageReport {
812 self.snapshot().usage_report.clone()
813 }
814
815 pub fn tool_state(&self) -> Option<ToolState> {
816 self.snapshot().tool_state.clone()
817 }
818
819 pub fn active_tool_manifests(&self) -> Vec<ToolManifest> {
820 self.snapshot()
821 .tool_state
822 .as_ref()
823 .map(ToolState::tool_manifests)
824 .unwrap_or_default()
825 }
826
827 pub async fn list_process_handles(&self) -> Vec<ProcessHandleSummary> {
828 self.snapshot().list_process_handles().await
829 }
830
831 pub async fn list_all_process_handles(&self) -> Vec<ProcessHandleSummary> {
832 self.snapshot().list_all_process_handles().await
833 }
834
835 pub fn process_scope(&self) -> SessionScope {
836 self.snapshot().process_scope()
837 }
838}
839
840#[derive(Clone, Debug)]
841pub enum SessionObservationStreamItem {
842 Event(SessionObservationEvent),
844 Gap {
846 observation: SessionObservation,
847 gap: LiveReplayGap,
848 },
849}
850
851pub enum RemoteSessionObservationSubscription {
852 Subscribed(RemoteSessionObservationEventStream),
853 Gap {
854 observation: RemoteSessionObservation,
855 gap: RemoteLiveReplayGap,
856 },
857}
858
859#[derive(Clone, Debug)]
860pub enum RemoteSessionObservationStreamItem {
861 Event(RemoteSessionObservationEvent),
863 Gap {
865 observation: RemoteSessionObservation,
866 gap: RemoteLiveReplayGap,
867 },
868}
869
870pub struct RemoteSessionObservationEventStream {
871 inner: lash_core::LiveReplaySubscription,
872 next_sequence: u64,
873}
874
875impl RemoteSessionObservationEventStream {
876 fn new(inner: lash_core::LiveReplaySubscription) -> Self {
877 Self {
878 inner,
879 next_sequence: 0,
880 }
881 }
882
883 pub async fn next_event(&mut self) -> Result<RemoteSessionObservationEvent> {
884 futures_util::future::poll_fn(|cx| Pin::new(&mut *self).poll_next(cx))
885 .await
886 .transpose()?
887 .ok_or_else(|| live_replay_error(LiveReplayStoreError::Closed))
888 }
889}
890
891impl Stream for RemoteSessionObservationEventStream {
892 type Item = Result<RemoteSessionObservationEvent>;
893
894 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
895 match Pin::new(&mut self.inner).poll_next(cx) {
896 Poll::Pending => Poll::Pending,
897 Poll::Ready(Some(Ok(event))) => {
898 let remote = RemoteSessionObservationEvent::from_core(self.next_sequence, event);
899 self.next_sequence = self.next_sequence.saturating_add(1);
900 Poll::Ready(Some(Ok(remote)))
901 }
902 Poll::Ready(Some(Err(err))) => Poll::Ready(Some(Err(live_replay_error(err)))),
903 Poll::Ready(None) => Poll::Ready(None),
904 }
905 }
906}
907
908pub struct RemoteSessionObservationStream {
910 inner: SessionObservationStream,
911 next_sequence: u64,
912}
913
914impl RemoteSessionObservationStream {
915 pub fn cursor(&self) -> RemoteSessionCursor {
916 RemoteSessionCursor::from(self.inner.cursor())
917 }
918}
919
920impl Stream for RemoteSessionObservationStream {
921 type Item = Result<RemoteSessionObservationStreamItem>;
922
923 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
924 match Pin::new(&mut self.inner).poll_next(cx) {
925 Poll::Pending => Poll::Pending,
926 Poll::Ready(Some(Ok(SessionObservationStreamItem::Event(event)))) => {
927 let remote = RemoteSessionObservationEvent::from_core(self.next_sequence, event);
928 self.next_sequence = self.next_sequence.saturating_add(1);
929 Poll::Ready(Some(Ok(RemoteSessionObservationStreamItem::Event(remote))))
930 }
931 Poll::Ready(Some(Ok(SessionObservationStreamItem::Gap { observation, gap }))) => {
932 Poll::Ready(Some(Ok(RemoteSessionObservationStreamItem::Gap {
933 observation: observation.into(),
934 gap: gap.into(),
935 })))
936 }
937 Poll::Ready(Some(Err(err))) => Poll::Ready(Some(Err(err))),
938 Poll::Ready(None) => Poll::Ready(None),
939 }
940 }
941}
942
943pub struct SessionObservationStream {
945 observable: ObservableSession,
946 cursor: SessionCursor,
947 subscription: Option<lash_core::LiveReplaySubscription>,
948 done: bool,
949}
950
951impl SessionObservationStream {
952 pub fn cursor(&self) -> &SessionCursor {
953 &self.cursor
954 }
955}
956
957impl Stream for SessionObservationStream {
958 type Item = Result<SessionObservationStreamItem>;
959
960 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
961 loop {
962 if self.done {
963 return Poll::Ready(None);
964 }
965 if self.subscription.is_none() {
966 match self.observable.subscribe_from_cursor(&self.cursor) {
967 Ok(SessionObservationSubscription::Subscribed(subscription)) => {
968 self.subscription = Some(subscription);
969 }
970 Ok(SessionObservationSubscription::Gap { observation, gap }) => {
971 self.cursor = gap.latest_cursor.clone();
972 return Poll::Ready(Some(Ok(SessionObservationStreamItem::Gap {
973 observation,
974 gap,
975 })));
976 }
977 Err(err) => {
978 self.done = true;
979 return Poll::Ready(Some(Err(err)));
980 }
981 }
982 }
983
984 let Some(subscription) = self.subscription.as_mut() else {
985 continue;
986 };
987 match Pin::new(subscription).poll_next(cx) {
988 Poll::Pending => return Poll::Pending,
989 Poll::Ready(Some(Ok(event))) => {
990 self.cursor = event.cursor.clone();
991 return Poll::Ready(Some(Ok(SessionObservationStreamItem::Event(event))));
992 }
993 Poll::Ready(Some(Err(LiveReplayStoreError::SubscriberLagged(_)))) => {
994 self.subscription = None;
995 continue;
996 }
997 Poll::Ready(Some(Err(err))) => {
998 self.done = true;
999 return Poll::Ready(Some(Err(live_replay_error(err))));
1000 }
1001 Poll::Ready(None) => {
1002 self.done = true;
1003 return Poll::Ready(None);
1004 }
1005 }
1006 }
1007 }
1008}
1009
1010fn live_replay_error(err: lash_core::LiveReplayStoreError) -> EmbedError {
1011 EmbedError::Runtime(lash_core::RuntimeError::new(
1012 RuntimeErrorCode::Other("live_replay".to_string()),
1013 err.to_string(),
1014 ))
1015}
1016
1017pub struct EnqueueTurnBuilder<'a> {
1018 session: &'a LashSession,
1019 input: TurnInput,
1020 id: Option<String>,
1021 delivery_policy: DeliveryPolicy,
1022 slot_policy: SlotPolicy,
1023}
1024
1025impl<'a> EnqueueTurnBuilder<'a> {
1026 pub fn id(mut self, id: impl Into<String>) -> Self {
1027 self.id = Some(id.into());
1028 self
1029 }
1030
1031 pub fn delivery_policy(mut self, policy: DeliveryPolicy) -> Self {
1032 self.delivery_policy = policy;
1033 self
1034 }
1035
1036 pub fn slot_policy(mut self, policy: SlotPolicy) -> Self {
1037 self.slot_policy = policy;
1038 self
1039 }
1040
1041 pub async fn send(self) -> Result<QueuedWorkBatch> {
1042 let source_key = self.id.map(|id| format!("host:{id}"));
1043 self.session
1044 .runtime
1045 .enqueue_turn_input(
1046 self.input,
1047 self.delivery_policy,
1048 self.slot_policy,
1049 source_key,
1050 )
1051 .await
1052 .map_err(EmbedError::Runtime)
1053 }
1054}
1055
1056impl<'a> std::future::IntoFuture for EnqueueTurnBuilder<'a> {
1057 type Output = Result<QueuedWorkBatch>;
1058 type IntoFuture =
1059 std::pin::Pin<Box<dyn std::future::Future<Output = Result<QueuedWorkBatch>> + 'a>>;
1060
1061 fn into_future(self) -> Self::IntoFuture {
1062 Box::pin(self.send())
1063 }
1064}