use std::pin::Pin;
use std::task::{Context, Poll};

use crate::support::*;
use futures_util::Stream;
use lash_core::runtime::{DeliveryPolicy, QueuedWorkBatch, SlotPolicy};
use lash_core::{LiveReplayGap, LiveReplayStoreError, SessionObservationEvent};
use lash_remote_protocol::{
    RemoteLiveReplayGap, RemoteSessionCursor, RemoteSessionObservation,
    RemoteSessionObservationEvent,
};
12
13pub struct SessionBuilder {
14 pub(crate) core: LashCore,
15 pub(crate) session_id: String,
16 pub(crate) spec: SessionSpec,
17 pub(crate) parent_session_id: Option<String>,
18 pub(crate) session_execution_owner_id: Option<String>,
19 pub(crate) store: Option<Arc<dyn RuntimePersistence>>,
20 pub(crate) provider: Option<ProviderHandle>,
21 pub(crate) active_plugins: Vec<ActivePluginBinding>,
22 pub(crate) plugin_factories: Vec<Arc<dyn PluginFactory>>,
23}
24
/// [`SessionBuilder`] wrapper that additionally applies RLM session options
/// (the final-answer format) to the session state before opening.
#[cfg(feature = "rlm")]
pub struct RlmSessionBuilder {
    /// The wrapped builder every setter delegates to.
    pub(crate) builder: SessionBuilder,
    /// Explicit final-answer format; when `None` a default is chosen at open
    /// time depending on whether the session has a parent.
    pub(crate) rlm_final_answer_format: Option<lash_rlm_types::RlmFinalAnswerFormat>,
}
30
31impl SessionBuilder {
32 pub fn provider(mut self, provider: ProviderHandle) -> Self {
33 self.spec = self.spec.provider_id(provider.kind());
34 self.provider = Some(provider);
35 self
36 }
37
38 pub fn session_spec(mut self, spec: SessionSpec) -> Self {
39 self.spec = spec;
40 self
41 }
42
43 pub fn parent(mut self, parent_session_id: impl Into<String>) -> Self {
44 self.parent_session_id = Some(parent_session_id.into());
45 self
46 }
47
48 pub fn session_execution_owner_id(mut self, owner_id: impl Into<String>) -> Self {
56 self.session_execution_owner_id = Some(owner_id.into());
57 self
58 }
59
60 pub fn store(mut self, store: Arc<dyn RuntimePersistence>) -> Self {
67 self.store = Some(store);
68 self
69 }
70
71 pub fn plugin<P: PluginBinding>(mut self, config: P::SessionConfig) -> Self {
72 self.active_plugins.push(ActivePluginBinding {
73 id: P::ID,
74 requires_turn_input: P::requires_turn_input(&config),
75 });
76 self.plugin_factories.push(P::factory(&config));
77 self
78 }
79
80 pub async fn open(self) -> Result<LashSession> {
81 let policy = self.session_policy();
82 let store = self.create_store(&policy).await?;
83 let state = self
84 .load_or_default_state(&policy, store.as_deref())
85 .await?;
86 self.open_resolved(policy, state, store).await
87 }
88
89 pub async fn open_fresh(self) -> Result<LashSession> {
99 let policy = self.session_policy();
100 let store = self.create_store(&policy).await?;
101 let state = RuntimeSessionState {
102 session_id: self.session_id.clone(),
103 policy: policy.clone(),
104 graph_replace_required: true,
105 ..RuntimeSessionState::default()
106 };
107 self.open_resolved(policy, state, store).await
108 }
109
110 pub async fn open_with_state(self, mut state: RuntimeSessionState) -> Result<LashSession> {
117 let policy = self.session_policy();
118 let store = self.create_store(&policy).await?;
119 if state.session_id != self.session_id {
120 return Err(EmbedError::StoreSessionMismatch {
121 loaded: state.session_id,
122 requested: self.session_id,
123 });
124 }
125 let recorded_provider_id = state.policy.recorded_provider_id().to_string();
126 state.policy = policy.clone();
127 state.policy.provider_id = recorded_provider_id;
128 self.open_resolved(policy, state, store).await
129 }
130
131 fn session_policy(&self) -> SessionPolicy {
132 let mut policy = self.spec.resolve_against(&self.core.policy);
133 policy.session_id = Some(self.session_id.clone());
134 policy
135 }
136
137 async fn load_or_default_state(
138 &self,
139 policy: &SessionPolicy,
140 store: Option<&dyn RuntimePersistence>,
141 ) -> Result<RuntimeSessionState> {
142 let state = match store {
143 Some(store) => {
144 let loaded = self.load_persisted_state_for_residency(store).await?;
145 let mut state = loaded.unwrap_or_else(|| RuntimeSessionState {
146 session_id: self.session_id.clone(),
147 policy: policy.clone(),
148 ..RuntimeSessionState::default()
149 });
150 if state.session_id != self.session_id {
151 return Err(EmbedError::StoreSessionMismatch {
152 loaded: state.session_id,
153 requested: self.session_id.clone(),
154 });
155 }
156 let recorded_provider_id = state.policy.recorded_provider_id().to_string();
157 state.policy = policy.clone();
158 state.policy.provider_id = recorded_provider_id;
159 state
160 }
161 None => RuntimeSessionState {
162 session_id: self.session_id.clone(),
163 policy: policy.clone(),
164 ..RuntimeSessionState::default()
165 },
166 };
167 Ok(state)
168 }
169
170 async fn load_persisted_state_for_residency(
171 &self,
172 store: &dyn RuntimePersistence,
173 ) -> Result<Option<RuntimeSessionState>> {
174 load_persisted_state_for_residency(self.core.env.residency, store).await
175 }
176
177 async fn open_resolved(
178 self,
179 policy: SessionPolicy,
180 state: RuntimeSessionState,
181 store: Option<Arc<dyn RuntimePersistence>>,
182 ) -> Result<LashSession> {
183 let mut env = self.core.env.clone();
184 if let Some(provider) = self.provider.clone().or_else(|| self.core.provider.clone()) {
185 env.core.providers.provider_resolver =
186 Arc::new(lash_core::SingleProviderResolver::new(provider));
187 }
188 let plugin_host = build_plugin_host(
189 self.core.protocol_factory.as_ref(),
190 self.core.plugin_factories.as_ref(),
191 self.plugin_factories,
192 )?;
193 env.core = self
194 .core
195 .runtime_host_for_plugin_host(env.core.clone(), &plugin_host)?;
196 env.plugin_host = Some(Arc::new(plugin_host));
197 let effect_host = Arc::clone(&env.core.control.effect_host);
198 let drivers = self.core.work_driver.drivers().await;
199 env.process_work_driver = drivers.process.clone();
200 env.queued_work_driver = drivers.queued.clone();
201 let mut runtime = LashRuntime::from_environment(&env, policy, state, store).await?;
202 if let Some(owner_id) = self.session_execution_owner_id {
203 runtime.set_runtime_scope_id(owner_id);
204 }
205 if drivers.drive_process_on_open
206 && let Some(driver) = drivers.process.as_ref()
207 {
208 driver.claim_and_run_pending("session_open").await?;
209 }
210 let handle = RuntimeHandle::with_live_replay_store(
211 runtime,
212 Arc::clone(&self.core.live_replay_store),
213 );
214 Ok(LashSession {
215 runtime: handle,
216 effect_host,
217 parent_session_id: self.parent_session_id,
218 active_plugins: self.active_plugins,
219 process_phase_probe_slot: self.core.work_driver.phase_probe_slot(),
220 turn_cancels: crate::turn::TurnCancelRegistry::default(),
221 })
222 }
223
224 async fn create_store(
225 &self,
226 policy: &SessionPolicy,
227 ) -> Result<Option<Arc<dyn RuntimePersistence>>> {
228 if let Some(store) = self.store.as_ref() {
229 return Ok(Some(Arc::clone(store)));
230 }
231 let Some(factory) = self.core.store_factory.as_ref() else {
232 return Ok(None);
233 };
234 let request = SessionStoreCreateRequest {
235 session_id: self.session_id.clone(),
236 relation: self
237 .parent_session_id
238 .as_ref()
239 .map(|parent_session_id| lash_core::SessionRelation::Child {
240 parent_session_id: parent_session_id.clone(),
241 caused_by: None,
242 })
243 .unwrap_or_default(),
244 policy: policy.clone(),
245 };
246 factory
247 .create_store(&request)
248 .await
249 .map(Some)
250 .map_err(|message| EmbedError::StoreFactory {
251 session_id: self.session_id.clone(),
252 message,
253 })
254 }
255}
256
257pub(crate) async fn load_state_for_residency(
258 residency: Residency,
259 session_id: &str,
260 policy: &SessionPolicy,
261 store: &dyn RuntimePersistence,
262) -> Result<RuntimeSessionState> {
263 let mut state = load_persisted_state_for_residency(residency, store)
264 .await?
265 .unwrap_or_else(|| RuntimeSessionState {
266 session_id: session_id.to_string(),
267 policy: policy.clone(),
268 ..RuntimeSessionState::default()
269 });
270 if state.session_id != session_id {
271 return Err(EmbedError::StoreSessionMismatch {
272 loaded: state.session_id,
273 requested: session_id.to_string(),
274 });
275 }
276 let recorded_provider_id = state.policy.recorded_provider_id().to_string();
277 state.policy = policy.clone();
278 state.policy.provider_id = recorded_provider_id;
279 Ok(state)
280}
281
282async fn load_persisted_state_for_residency(
283 residency: Residency,
284 store: &dyn RuntimePersistence,
285) -> Result<Option<RuntimeSessionState>> {
286 match residency {
287 Residency::KeepAll => {
288 let loaded = lash_core::store::load_persisted_session_state(store)
289 .await
290 .map_err(|err| SessionError::Protocol(format!("failed to load store: {err}")))?;
291 Ok(loaded)
292 }
293 Residency::ActivePathOnly => {
294 let active = lash_core::store::load_persisted_session_state_active_path(store, None)
295 .await
296 .map_err(|err| {
297 SessionError::Protocol(format!("failed to load active-path store: {err}"))
298 })?;
299 if active
300 .as_ref()
301 .is_some_and(|state| state.session_graph.nodes.is_empty())
302 {
303 let mut full = lash_core::store::load_persisted_session_state(store)
304 .await
305 .map_err(|err| {
306 SessionError::Protocol(format!(
307 "failed to heal active-path store from full graph: {err}"
308 ))
309 })?;
310 if let Some(state) = full.as_mut() {
311 state.graph_replace_required = true;
312 }
313 return Ok(full);
314 }
315 Ok(active)
316 }
317 }
318}
319
320impl PromptLayerSink for SessionBuilder {
321 fn prompt_layer_mut(&mut self) -> &mut PromptLayer {
322 self.spec.prompt.get_or_insert_with(PromptLayer::new)
323 }
324}
325
326#[cfg(feature = "rlm")]
327impl RlmSessionBuilder {
328 pub fn provider(mut self, provider: ProviderHandle) -> Self {
329 self.builder = self.builder.provider(provider);
330 self
331 }
332
333 pub fn session_spec(mut self, spec: SessionSpec) -> Self {
334 self.builder = self.builder.session_spec(spec);
335 self
336 }
337
338 pub fn parent(mut self, parent_session_id: impl Into<String>) -> Self {
339 self.builder = self.builder.parent(parent_session_id);
340 self
341 }
342
343 pub fn session_execution_owner_id(mut self, owner_id: impl Into<String>) -> Self {
344 self.builder = self.builder.session_execution_owner_id(owner_id);
345 self
346 }
347
348 pub fn store(mut self, store: Arc<dyn RuntimePersistence>) -> Self {
349 self.builder = self.builder.store(store);
350 self
351 }
352
353 pub fn plugin<P: PluginBinding>(mut self, config: P::SessionConfig) -> Self {
354 self.builder = self.builder.plugin::<P>(config);
355 self
356 }
357
358 pub async fn open(self) -> Result<LashSession> {
359 self.open_resolved(RlmOpenState::Resume).await
360 }
361
362 pub async fn open_fresh(self) -> Result<LashSession> {
363 self.open_resolved(RlmOpenState::Fresh).await
364 }
365
366 pub async fn open_with_state(self, state: RuntimeSessionState) -> Result<LashSession> {
367 self.open_resolved(RlmOpenState::Explicit(state)).await
368 }
369
370 async fn open_resolved(self, open_state: RlmOpenState) -> Result<LashSession> {
371 let Self {
372 builder,
373 rlm_final_answer_format,
374 } = self;
375 let policy = builder.session_policy();
376 let store = builder.create_store(&policy).await?;
377 let mut state = match open_state {
378 RlmOpenState::Resume => {
379 builder
380 .load_or_default_state(&policy, store.as_deref())
381 .await?
382 }
383 RlmOpenState::Fresh => RuntimeSessionState {
384 session_id: builder.session_id.clone(),
385 policy: policy.clone(),
386 graph_replace_required: true,
387 ..RuntimeSessionState::default()
388 },
389 RlmOpenState::Explicit(mut state) => {
390 if state.session_id != builder.session_id {
391 return Err(EmbedError::StoreSessionMismatch {
392 loaded: state.session_id,
393 requested: builder.session_id.clone(),
394 });
395 }
396 let recorded_provider_id = state.policy.recorded_provider_id().to_string();
397 state.policy = policy.clone();
398 state.policy.provider_id = recorded_provider_id;
399 state
400 }
401 };
402 apply_rlm_session_options(
403 builder.parent_session_id.is_none(),
404 rlm_final_answer_format,
405 &mut state,
406 )?;
407 builder.open_resolved(policy, state, store).await
408 }
409}
410
#[cfg(feature = "rlm")]
impl PromptLayerSink for RlmSessionBuilder {
    /// Forwards to the wrapped builder's prompt layer.
    fn prompt_layer_mut(&mut self) -> &mut PromptLayer {
        self.builder.prompt_layer_mut()
    }
}
417
/// How [`RlmSessionBuilder`] should obtain the starting session state.
#[cfg(feature = "rlm")]
// NOTE(review): `Explicit` carries a whole `RuntimeSessionState` inline;
// presumably left unboxed because the value is consumed right away — confirm.
#[allow(clippy::large_enum_variant)]
enum RlmOpenState {
    /// Load persisted state from the store, or start from a default state.
    Resume,
    /// Start from a blank state flagged with `graph_replace_required`.
    Fresh,
    /// Start from the caller-supplied state.
    Explicit(RuntimeSessionState),
}
425
/// Writes the RLM final-answer format into `state`'s protocol turn options.
///
/// The format is `explicit_format` when given; otherwise `Markdown` for a root
/// session and `RawSubmitValue` for a non-root session. Existing turn options
/// are decoded and kept (only `final_answer_format` is overwritten); empty
/// options start from `RlmCreateExtras::default()`.
///
/// The resulting options are stored on the state and copied onto every agent
/// frame, replacing whatever options those frames held.
///
/// # Errors
///
/// Fails when the existing options cannot be decoded or the updated extras
/// cannot be encoded.
#[cfg(feature = "rlm")]
fn apply_rlm_session_options(
    is_root_session: bool,
    explicit_format: Option<lash_rlm_types::RlmFinalAnswerFormat>,
    state: &mut RuntimeSessionState,
) -> Result<()> {
    let final_answer_format = explicit_format.unwrap_or(if is_root_session {
        lash_rlm_types::RlmFinalAnswerFormat::Markdown
    } else {
        lash_rlm_types::RlmFinalAnswerFormat::RawSubmitValue
    });
    let mut extras = if state.protocol_turn_options.is_empty() {
        lash_rlm_types::RlmCreateExtras::default()
    } else {
        state.protocol_turn_options.decode()?
    };
    extras.final_answer_format = Some(final_answer_format);
    let options = ProtocolTurnOptions::typed(extras)?;
    state.protocol_turn_options = options.clone();
    for frame in &mut state.agent_frames {
        frame.protocol_turn_options = options.clone();
    }
    Ok(())
}
452
#[cfg(all(test, feature = "rlm"))]
mod tests {
    use super::*;

    /// Applying the session options must only fill in the final-answer format
    /// and leave an already-configured termination mode untouched.
    #[test]
    fn apply_rlm_session_options_preserves_existing_termination() -> Result<()> {
        let mut state = RuntimeSessionState {
            protocol_turn_options: ProtocolTurnOptions::typed(lash_rlm_types::RlmCreateExtras {
                termination: lash_rlm_types::RlmTermination::ProseOrSubmit,
                final_answer_format: None,
            })?,
            ..Default::default()
        };

        // Root session, no explicit format: the default format is Markdown.
        apply_rlm_session_options(true, None, &mut state)?;

        let extras: lash_rlm_types::RlmCreateExtras = state.protocol_turn_options.decode()?;
        assert_eq!(
            extras.termination,
            lash_rlm_types::RlmTermination::ProseOrSubmit
        );
        assert_eq!(
            extras.final_answer_format,
            Some(lash_rlm_types::RlmFinalAnswerFormat::Markdown)
        );
        Ok(())
    }
}
481
482#[derive(Clone)]
483pub struct LashSession {
484 pub(crate) runtime: RuntimeHandle,
485 pub(crate) effect_host: Arc<dyn EffectHost>,
486 pub(crate) parent_session_id: Option<String>,
487 pub(crate) active_plugins: Vec<ActivePluginBinding>,
488 pub(crate) process_phase_probe_slot: Option<lash_core::runtime::RuntimeTurnPhaseProbeSlot>,
489 pub(crate) turn_cancels: crate::turn::TurnCancelRegistry,
490}
491
492#[derive(Clone, Debug, Default)]
493pub struct SessionConfigPatch {
494 pub provider: Option<ProviderHandle>,
495 pub model: Option<ModelSpec>,
496 pub prompt: Option<PromptLayer>,
497}
498
499impl LashSession {
500 pub async fn close(self) -> Result<()> {
501 let runtime = self.runtime.writer();
502 let runtime = runtime.lock().await;
503 runtime.unregister_plugin_session()?;
504 Ok(())
505 }
506
507 pub fn session_id(&self) -> String {
508 self.runtime.observe().session_id().to_string()
509 }
510
511 pub fn policy_snapshot(&self) -> SessionPolicy {
512 self.runtime.observe().policy.clone()
513 }
514
515 pub fn observe(&self) -> ObservableSession {
516 ObservableSession {
517 runtime: self.runtime.clone(),
518 }
519 }
520
521 pub fn parent_session_id(&self) -> Option<&str> {
522 self.parent_session_id.as_deref()
523 }
524
525 pub fn effect_host(&self) -> Arc<dyn EffectHost> {
526 Arc::clone(&self.effect_host)
527 }
528
529 pub fn turn(&self, input: TurnInput) -> TurnBuilder {
530 TurnBuilder {
531 runtime: self.runtime.clone(),
532 effect_host: Arc::clone(&self.effect_host),
533 active_plugins: self.active_plugins.clone(),
534 input,
535 cancel: CancellationToken::new(),
536 cancels: self.turn_cancels.clone(),
537 protocol_turn_options: None,
538 provider: None,
539 model: None,
540 turn_id: None,
541 }
542 }
543
544 pub fn queued_turn(&self) -> QueuedTurnBuilder {
545 QueuedTurnBuilder {
546 runtime: self.runtime.clone(),
547 effect_host: Arc::clone(&self.effect_host),
548 cancel: CancellationToken::new(),
549 cancels: self.turn_cancels.clone(),
550 batch_ids: Vec::new(),
551 drain_id: None,
552 }
553 }
554
555 pub fn cancel_running_turns(&self) -> usize {
570 self.turn_cancels.cancel_all()
571 }
572
573 pub fn admin(&self) -> SessionAdmin {
574 SessionAdmin {
575 runtime: self.runtime.clone(),
576 }
577 }
578
579 pub async fn configure(&self, patch: SessionConfigPatch) -> Result<()> {
580 self.admin().config().update(patch).await
581 }
582
583 pub fn tools(&self) -> ToolAdmin {
584 ToolAdmin::new(self.admin())
585 }
586
587 pub fn commands(&self) -> SessionCommandAdmin {
588 self.admin().commands()
589 }
590
591 pub fn triggers(&self) -> SessionTriggerAdmin {
592 self.admin().triggers()
593 }
594
595 pub fn processes(&self) -> SessionProcessAdmin {
596 SessionProcessAdmin::new(self.admin())
597 }
598
599 pub fn plugin_operations(&self) -> PluginOperations {
600 PluginOperations {
601 control: self.admin(),
602 }
603 }
604
605 pub fn enqueue(&self, input: TurnInput) -> EnqueueTurnBuilder<'_> {
606 EnqueueTurnBuilder {
607 session: self,
608 input,
609 id: None,
610 delivery_policy: DeliveryPolicy::AfterCurrentTurnCommit,
611 slot_policy: SlotPolicy::Exclusive,
612 }
613 }
614
615 pub async fn queued_work(&self) -> Result<Vec<QueuedWorkBatch>> {
616 let observation = self.runtime.observe();
617 let store = observation.queue_store.as_ref().ok_or_else(|| {
618 EmbedError::Runtime(lash_core::RuntimeError::new(
619 lash_core::RuntimeErrorCode::StoreCommitFailed,
620 "queued work inspection requires a persistent runtime store",
621 ))
622 })?;
623 store
624 .list_pending_queued_work(observation.session_id())
625 .await
626 .map_err(|err| {
627 EmbedError::Runtime(lash_core::RuntimeError::new(
628 lash_core::RuntimeErrorCode::StoreCommitFailed,
629 err.to_string(),
630 ))
631 })
632 }
633
634 pub async fn cancel_queued_work_batch(
635 &self,
636 batch_id: &str,
637 ) -> Result<Option<QueuedWorkBatch>> {
638 let session_id = self.session_id();
639 self.runtime
640 .cancel_queued_work_batch(&session_id, batch_id)
641 .await
642 .map_err(EmbedError::Runtime)
643 }
644
645 pub async fn await_queued_work_batch(&self, batch_id: &str) -> Result<()> {
657 let observation = self.runtime.observe();
658 let store = observation.queue_store.clone().ok_or_else(|| {
659 EmbedError::Runtime(lash_core::RuntimeError::new(
660 lash_core::RuntimeErrorCode::StoreCommitFailed,
661 "queued work inspection requires a persistent runtime store",
662 ))
663 })?;
664 let session_id = observation.session_id().to_string();
665 drop(observation);
666 let mut delay = std::time::Duration::from_millis(25);
667 loop {
668 let pending = store
669 .list_pending_queued_work(&session_id)
670 .await
671 .map_err(|err| {
672 EmbedError::Runtime(lash_core::RuntimeError::new(
673 lash_core::RuntimeErrorCode::StoreCommitFailed,
674 err.to_string(),
675 ))
676 })?;
677 if !pending.iter().any(|batch| batch.batch_id == batch_id) {
678 return Ok(());
679 }
680 tokio::time::sleep(delay).await;
681 delay = (delay * 2).min(std::time::Duration::from_millis(400));
682 }
683 }
684
685 pub fn read_view(&self) -> SessionReadView {
686 self.runtime.observe().read_view.clone()
687 }
688
689 pub fn usage_report(&self) -> SessionUsageReport {
690 self.runtime.observe().usage_report.clone()
691 }
692
693 pub async fn set_turn_phase_probe(
694 &self,
695 probe: Arc<dyn lash_core::runtime::RuntimeTurnPhaseProbe>,
696 ) {
697 let writer = self.runtime.writer();
698 let mut runtime = writer.lock().await;
699 runtime.set_turn_phase_probe(Arc::clone(&probe));
700 self.runtime.publish_from(&runtime);
701 if let Some(slot) = &self.process_phase_probe_slot {
702 let observation = self.runtime.observe();
703 slot.set_for_session(observation.session_id(), Arc::clone(&probe));
704 let current_frame = observation.persisted_state.current_agent_frame_id.as_str();
705 if !current_frame.is_empty() {
706 let scope = lash_core::SessionScope::for_agent_frame(
707 observation.session_id(),
708 current_frame,
709 );
710 slot.set_for_scope(&scope, probe);
711 }
712 }
713 }
714}
715
716#[derive(Clone)]
717pub struct ObservableSession {
718 pub(crate) runtime: RuntimeHandle,
719}
720
721impl ObservableSession {
722 fn snapshot(&self) -> Arc<RuntimeObservation> {
723 self.runtime.observe()
724 }
725
726 pub fn current_observation(&self) -> SessionObservation {
727 self.runtime.current_session_observation()
728 }
729
730 pub fn current_remote_observation(&self) -> RemoteSessionObservation {
731 RemoteSessionObservation::from_core(self.current_observation())
732 }
733
734 pub fn resume_from_cursor(&self, cursor: &SessionCursor) -> Result<SessionResume> {
735 self.runtime
736 .resume_session_observation(cursor)
737 .map_err(live_replay_error)
738 }
739
740 pub fn subscribe_from_cursor(
741 &self,
742 cursor: &SessionCursor,
743 ) -> Result<SessionObservationSubscription> {
744 self.runtime
745 .subscribe_session_observation(cursor)
746 .map_err(live_replay_error)
747 }
748
749 pub fn subscribe_from_remote_cursor(
750 &self,
751 cursor: &RemoteSessionCursor,
752 ) -> Result<RemoteSessionObservationSubscription> {
753 cursor.validate()?;
754 let cursor = lash_core::SessionCursor::try_from(cursor.clone())?;
755 match self.subscribe_from_cursor(&cursor)? {
756 SessionObservationSubscription::Subscribed(subscription) => {
757 Ok(RemoteSessionObservationSubscription::Subscribed(
758 RemoteSessionObservationEventStream::new(subscription),
759 ))
760 }
761 SessionObservationSubscription::Gap { observation, gap } => {
762 Ok(RemoteSessionObservationSubscription::Gap {
763 observation: observation.into(),
764 gap: gap.into(),
765 })
766 }
767 }
768 }
769
770 pub fn subscribe_and_recover(&self, cursor: SessionCursor) -> SessionObservationStream {
779 SessionObservationStream {
780 observable: self.clone(),
781 cursor,
782 subscription: None,
783 done: false,
784 }
785 }
786
787 pub fn subscribe_and_recover_remote(
790 &self,
791 cursor: RemoteSessionCursor,
792 ) -> Result<RemoteSessionObservationStream> {
793 cursor.validate()?;
794 let cursor = lash_core::SessionCursor::try_from(cursor)?;
795 Ok(RemoteSessionObservationStream {
796 inner: self.subscribe_and_recover(cursor),
797 next_sequence: 0,
798 })
799 }
800
801 pub fn session_id(&self) -> String {
802 self.snapshot().session_id().to_string()
803 }
804
805 pub fn policy_snapshot(&self) -> SessionPolicy {
806 self.snapshot().policy.clone()
807 }
808
809 pub fn read_view(&self) -> SessionReadView {
810 self.snapshot().read_view.clone()
811 }
812
813 pub fn usage_report(&self) -> SessionUsageReport {
814 self.snapshot().usage_report.clone()
815 }
816
817 pub fn tool_state(&self) -> Option<ToolState> {
818 self.snapshot().tool_state.clone()
819 }
820
821 pub fn active_tool_manifests(&self) -> Vec<ToolManifest> {
822 self.snapshot()
823 .tool_state
824 .as_ref()
825 .map(ToolState::tool_manifests)
826 .unwrap_or_default()
827 }
828
829 pub async fn list_process_handles(&self) -> Vec<ProcessHandleSummary> {
830 self.snapshot().list_process_handles().await
831 }
832
833 pub async fn list_all_process_handles(&self) -> Vec<ProcessHandleSummary> {
834 self.snapshot().list_all_process_handles().await
835 }
836
837 pub fn process_scope(&self) -> SessionScope {
838 self.snapshot().process_scope()
839 }
840}
841
842#[derive(Clone, Debug)]
843pub enum SessionObservationStreamItem {
844 Event(SessionObservationEvent),
846 Gap {
848 observation: SessionObservation,
849 gap: LiveReplayGap,
850 },
851}
852
853pub enum RemoteSessionObservationSubscription {
854 Subscribed(RemoteSessionObservationEventStream),
855 Gap {
856 observation: RemoteSessionObservation,
857 gap: RemoteLiveReplayGap,
858 },
859}
860
861#[derive(Clone, Debug)]
862pub enum RemoteSessionObservationStreamItem {
863 Event(RemoteSessionObservationEvent),
865 Gap {
867 observation: RemoteSessionObservation,
868 gap: RemoteLiveReplayGap,
869 },
870}
871
872pub struct RemoteSessionObservationEventStream {
873 inner: lash_core::LiveReplaySubscription,
874 next_sequence: u64,
875}
876
877impl RemoteSessionObservationEventStream {
878 fn new(inner: lash_core::LiveReplaySubscription) -> Self {
879 Self {
880 inner,
881 next_sequence: 0,
882 }
883 }
884
885 pub async fn next_event(&mut self) -> Result<RemoteSessionObservationEvent> {
886 futures_util::future::poll_fn(|cx| Pin::new(&mut *self).poll_next(cx))
887 .await
888 .transpose()?
889 .ok_or_else(|| live_replay_error(LiveReplayStoreError::Closed))
890 }
891}
892
893impl Stream for RemoteSessionObservationEventStream {
894 type Item = Result<RemoteSessionObservationEvent>;
895
896 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
897 match Pin::new(&mut self.inner).poll_next(cx) {
898 Poll::Pending => Poll::Pending,
899 Poll::Ready(Some(Ok(event))) => {
900 let remote = RemoteSessionObservationEvent::from_core(self.next_sequence, event);
901 self.next_sequence = self.next_sequence.saturating_add(1);
902 Poll::Ready(Some(Ok(remote)))
903 }
904 Poll::Ready(Some(Err(err))) => Poll::Ready(Some(Err(live_replay_error(err)))),
905 Poll::Ready(None) => Poll::Ready(None),
906 }
907 }
908}
909
910pub struct RemoteSessionObservationStream {
912 inner: SessionObservationStream,
913 next_sequence: u64,
914}
915
916impl RemoteSessionObservationStream {
917 pub fn cursor(&self) -> RemoteSessionCursor {
918 RemoteSessionCursor::from(self.inner.cursor())
919 }
920}
921
922impl Stream for RemoteSessionObservationStream {
923 type Item = Result<RemoteSessionObservationStreamItem>;
924
925 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
926 match Pin::new(&mut self.inner).poll_next(cx) {
927 Poll::Pending => Poll::Pending,
928 Poll::Ready(Some(Ok(SessionObservationStreamItem::Event(event)))) => {
929 let remote = RemoteSessionObservationEvent::from_core(self.next_sequence, event);
930 self.next_sequence = self.next_sequence.saturating_add(1);
931 Poll::Ready(Some(Ok(RemoteSessionObservationStreamItem::Event(remote))))
932 }
933 Poll::Ready(Some(Ok(SessionObservationStreamItem::Gap { observation, gap }))) => {
934 Poll::Ready(Some(Ok(RemoteSessionObservationStreamItem::Gap {
935 observation: observation.into(),
936 gap: gap.into(),
937 })))
938 }
939 Poll::Ready(Some(Err(err))) => Poll::Ready(Some(Err(err))),
940 Poll::Ready(None) => Poll::Ready(None),
941 }
942 }
943}
944
945pub struct SessionObservationStream {
947 observable: ObservableSession,
948 cursor: SessionCursor,
949 subscription: Option<lash_core::LiveReplaySubscription>,
950 done: bool,
951}
952
953impl SessionObservationStream {
954 pub fn cursor(&self) -> &SessionCursor {
955 &self.cursor
956 }
957}
958
959impl Stream for SessionObservationStream {
960 type Item = Result<SessionObservationStreamItem>;
961
962 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
963 loop {
964 if self.done {
965 return Poll::Ready(None);
966 }
967 if self.subscription.is_none() {
968 match self.observable.subscribe_from_cursor(&self.cursor) {
969 Ok(SessionObservationSubscription::Subscribed(subscription)) => {
970 self.subscription = Some(subscription);
971 }
972 Ok(SessionObservationSubscription::Gap { observation, gap }) => {
973 self.cursor = gap.latest_cursor.clone();
974 return Poll::Ready(Some(Ok(SessionObservationStreamItem::Gap {
975 observation,
976 gap,
977 })));
978 }
979 Err(err) => {
980 self.done = true;
981 return Poll::Ready(Some(Err(err)));
982 }
983 }
984 }
985
986 let Some(subscription) = self.subscription.as_mut() else {
987 continue;
988 };
989 match Pin::new(subscription).poll_next(cx) {
990 Poll::Pending => return Poll::Pending,
991 Poll::Ready(Some(Ok(event))) => {
992 self.cursor = event.cursor.clone();
993 return Poll::Ready(Some(Ok(SessionObservationStreamItem::Event(event))));
994 }
995 Poll::Ready(Some(Err(LiveReplayStoreError::SubscriberLagged(_)))) => {
996 self.subscription = None;
997 continue;
998 }
999 Poll::Ready(Some(Err(err))) => {
1000 self.done = true;
1001 return Poll::Ready(Some(Err(live_replay_error(err))));
1002 }
1003 Poll::Ready(None) => {
1004 self.done = true;
1005 return Poll::Ready(None);
1006 }
1007 }
1008 }
1009 }
1010}
1011
1012fn live_replay_error(err: lash_core::LiveReplayStoreError) -> EmbedError {
1013 EmbedError::Runtime(lash_core::RuntimeError::new(
1014 RuntimeErrorCode::Other("live_replay".to_string()),
1015 err.to_string(),
1016 ))
1017}
1018
1019pub struct EnqueueTurnBuilder<'a> {
1020 session: &'a LashSession,
1021 input: TurnInput,
1022 id: Option<String>,
1023 delivery_policy: DeliveryPolicy,
1024 slot_policy: SlotPolicy,
1025}
1026
1027impl<'a> EnqueueTurnBuilder<'a> {
1028 pub fn id(mut self, id: impl Into<String>) -> Self {
1029 self.id = Some(id.into());
1030 self
1031 }
1032
1033 pub fn delivery_policy(mut self, policy: DeliveryPolicy) -> Self {
1034 self.delivery_policy = policy;
1035 self
1036 }
1037
1038 pub fn slot_policy(mut self, policy: SlotPolicy) -> Self {
1039 self.slot_policy = policy;
1040 self
1041 }
1042
1043 pub async fn send(self) -> Result<QueuedWorkBatch> {
1044 let source_key = self.id.map(|id| format!("host:{id}"));
1045 self.session
1046 .runtime
1047 .enqueue_turn_input(
1048 self.input,
1049 self.delivery_policy,
1050 self.slot_policy,
1051 source_key,
1052 )
1053 .await
1054 .map_err(EmbedError::Runtime)
1055 }
1056}
1057
1058impl<'a> std::future::IntoFuture for EnqueueTurnBuilder<'a> {
1059 type Output = Result<QueuedWorkBatch>;
1060 type IntoFuture =
1061 std::pin::Pin<Box<dyn std::future::Future<Output = Result<QueuedWorkBatch>> + 'a>>;
1062
1063 fn into_future(self) -> Self::IntoFuture {
1064 Box::pin(self.send())
1065 }
1066}