1use std::pin::Pin;
2use std::task::{Context, Poll};
3
4use crate::support::*;
5use futures_util::Stream;
6use lash_core::runtime::{
7 PendingTurnInput, PendingTurnInputCancelOutcome, PendingTurnInputCancelResult,
8 PendingTurnInputCancelTarget, PendingTurnInputSuffixCancelOutcome, QueuedWorkBatch,
9 QueuedWorkClaim, TurnInputClaim, TurnInputIngress,
10};
11use lash_core::{LiveReplayGap, LiveReplayStoreError, SessionObservationEvent};
12use lash_remote_protocol::{
13 RemoteLiveReplayGap, RemoteSessionCursor, RemoteSessionObservation,
14 RemoteSessionObservationEvent,
15};
16
/// Builder that gathers everything needed to open a [`LashSession`].
///
/// Values are collected through the chained setter methods and consumed by
/// `open`, `open_fresh`, or `open_with_state`.
pub struct SessionBuilder {
    /// Core handle supplying the base policy, environment, drivers and defaults.
    pub(crate) core: LashCore,
    /// Identifier of the session being opened.
    pub(crate) session_id: String,
    /// Session spec; resolved against the core policy to produce the session policy.
    pub(crate) spec: SessionSpec,
    /// Parent session id; when set, a factory-created store is requested as a child.
    pub(crate) parent_session_id: Option<String>,
    /// Lease owner installed on the runtime at open time, when provided.
    pub(crate) session_execution_owner: Option<lash_core::LeaseOwnerIdentity>,
    /// Explicit persistence store; takes precedence over the core's store factory.
    pub(crate) store: Option<Arc<dyn RuntimePersistence>>,
    /// Session-specific provider; the core's provider is used when this is absent.
    pub(crate) provider: Option<ProviderHandle>,
    /// Plugin bindings registered through `plugin`, carried into the opened session.
    pub(crate) active_plugins: Vec<ActivePluginBinding>,
    /// Factories for the plugins registered through `plugin`.
    pub(crate) plugin_factories: Vec<Arc<dyn PluginFactory>>,
    /// Plugin options handed to the runtime's protocol configuration on open.
    pub(crate) plugin_options: PluginOptions,
}
33
34impl SessionBuilder {
35 pub fn plugin_options(mut self, plugin_options: PluginOptions) -> Self {
37 self.plugin_options = plugin_options;
38 self
39 }
40
41 pub fn plugin_option<T: serde::Serialize>(
44 mut self,
45 plugin_id: impl Into<String>,
46 extras: T,
47 ) -> Result<Self> {
48 self.plugin_options
49 .insert_typed(plugin_id, extras)
50 .map_err(EmbedError::ProtocolTurnOptions)?;
51 Ok(self)
52 }
53
54 pub fn provider(mut self, provider: ProviderHandle) -> Self {
55 self.spec = self.spec.provider_id(provider.kind());
56 self.provider = Some(provider);
57 self
58 }
59
60 pub fn session_spec(mut self, spec: SessionSpec) -> Self {
61 self.spec = spec;
62 self
63 }
64
65 pub fn parent(mut self, parent_session_id: impl Into<String>) -> Self {
66 self.parent_session_id = Some(parent_session_id.into());
67 self
68 }
69
70 pub fn session_execution_owner(mut self, owner: lash_core::LeaseOwnerIdentity) -> Self {
76 self.session_execution_owner = Some(owner);
77 self
78 }
79
80 pub fn store(mut self, store: Arc<dyn RuntimePersistence>) -> Self {
87 self.store = Some(store);
88 self
89 }
90
91 pub fn plugin<P: PluginBinding>(mut self, config: P::SessionConfig) -> Self {
92 self.active_plugins.push(ActivePluginBinding {
93 id: P::ID,
94 requires_turn_input: P::requires_turn_input(&config),
95 });
96 self.plugin_factories.push(P::factory(&config));
97 self
98 }
99
100 pub async fn open(self) -> Result<LashSession> {
101 let policy = self.session_policy();
102 let store = self.create_store(&policy).await?;
103 let state = self
104 .load_or_default_state(&policy, store.as_deref())
105 .await?;
106 self.open_resolved(policy, state, store).await
107 }
108
109 pub async fn open_fresh(self) -> Result<LashSession> {
119 let policy = self.session_policy();
120 let store = self.create_store(&policy).await?;
121 let state = RuntimeSessionState {
122 session_id: self.session_id.clone(),
123 policy: policy.clone(),
124 graph_replace_required: true,
125 ..RuntimeSessionState::default()
126 };
127 self.open_resolved(policy, state, store).await
128 }
129
130 pub async fn open_with_state(self, mut state: RuntimeSessionState) -> Result<LashSession> {
137 let policy = self.session_policy();
138 let store = self.create_store(&policy).await?;
139 if state.session_id != self.session_id {
140 return Err(EmbedError::StoreSessionMismatch {
141 loaded: state.session_id,
142 requested: self.session_id,
143 });
144 }
145 let recorded_provider_id = state.policy.recorded_provider_id().to_string();
146 state.policy = policy.clone();
147 state.policy.provider_id = recorded_provider_id;
148 self.open_resolved(policy, state, store).await
149 }
150
151 fn session_policy(&self) -> SessionPolicy {
152 let mut policy = self.spec.resolve_against(&self.core.policy);
153 policy.session_id = Some(self.session_id.clone());
154 policy
155 }
156
157 async fn load_or_default_state(
158 &self,
159 policy: &SessionPolicy,
160 store: Option<&dyn RuntimePersistence>,
161 ) -> Result<RuntimeSessionState> {
162 let state = match store {
163 Some(store) => {
164 let loaded = self.load_persisted_state_for_residency(store).await?;
165 let mut state = loaded.unwrap_or_else(|| RuntimeSessionState {
166 session_id: self.session_id.clone(),
167 policy: policy.clone(),
168 ..RuntimeSessionState::default()
169 });
170 if state.session_id != self.session_id {
171 return Err(EmbedError::StoreSessionMismatch {
172 loaded: state.session_id,
173 requested: self.session_id.clone(),
174 });
175 }
176 let recorded_provider_id = state.policy.recorded_provider_id().to_string();
177 state.policy = policy.clone();
178 state.policy.provider_id = recorded_provider_id;
179 state
180 }
181 None => RuntimeSessionState {
182 session_id: self.session_id.clone(),
183 policy: policy.clone(),
184 ..RuntimeSessionState::default()
185 },
186 };
187 Ok(state)
188 }
189
190 async fn load_persisted_state_for_residency(
191 &self,
192 store: &dyn RuntimePersistence,
193 ) -> Result<Option<RuntimeSessionState>> {
194 load_persisted_state_for_residency(self.core.env.residency, store).await
195 }
196
197 async fn open_resolved(
198 self,
199 policy: SessionPolicy,
200 state: RuntimeSessionState,
201 store: Option<Arc<dyn RuntimePersistence>>,
202 ) -> Result<LashSession> {
203 let mut env = self.core.env.clone();
204 if let Some(provider) = self.provider.clone().or_else(|| self.core.provider.clone()) {
205 env.core.providers.provider_resolver =
206 Arc::new(lash_core::SingleProviderResolver::new(provider));
207 }
208 let plugin_host = build_plugin_host(
209 self.core.protocol_factory.as_ref(),
210 self.core.plugin_factories.as_ref(),
211 self.plugin_factories,
212 )?;
213 env.core = plugin_host.install_process_engine_contributions(
214 env.core.clone(),
215 self.core.process_lifecycle_available,
216 )?;
217 env.plugin_host = Some(Arc::new(plugin_host));
218 let effect_host = Arc::clone(&env.core.control.effect_host);
219 let drivers = self.core.work_driver.drivers().await;
220 env.process_work_driver = drivers.process.clone();
221 env.queued_work_driver = drivers.queued.clone();
222 let mut runtime = LashRuntime::from_environment(&env, policy, state, store).await?;
223 runtime.configure_protocol_on_materialize(
227 &self.plugin_options,
228 self.parent_session_id.is_none(),
229 )?;
230 if let Some(owner) = self.session_execution_owner {
231 runtime.set_runtime_lease_owner(owner);
232 }
233 if drivers.drive_process_on_open
234 && let Some(driver) = drivers.process.as_ref()
235 {
236 driver.claim_and_run_pending("session_open").await?;
237 }
238 let handle = RuntimeHandle::with_live_replay_store(
239 runtime,
240 Arc::clone(&self.core.live_replay_store),
241 );
242 Ok(LashSession {
243 runtime: handle,
244 effect_host,
245 parent_session_id: self.parent_session_id,
246 active_plugins: self.active_plugins,
247 process_phase_probe_slot: self.core.work_driver.phase_probe_slot(),
248 turn_cancels: crate::turn::TurnCancelRegistry::default(),
249 })
250 }
251
252 async fn create_store(
253 &self,
254 policy: &SessionPolicy,
255 ) -> Result<Option<Arc<dyn RuntimePersistence>>> {
256 if let Some(store) = self.store.as_ref() {
257 return Ok(Some(Arc::clone(store)));
258 }
259 let Some(factory) = self.core.store_factory.as_ref() else {
260 return Ok(None);
261 };
262 let request = SessionStoreCreateRequest {
263 session_id: self.session_id.clone(),
264 relation: self
265 .parent_session_id
266 .as_ref()
267 .map(|parent_session_id| lash_core::SessionRelation::Child {
268 parent_session_id: parent_session_id.clone(),
269 caused_by: None,
270 })
271 .unwrap_or_default(),
272 policy: policy.clone(),
273 };
274 factory
275 .create_store(&request)
276 .await
277 .map(Some)
278 .map_err(|message| EmbedError::StoreFactory {
279 session_id: self.session_id.clone(),
280 message,
281 })
282 }
283}
284
285pub(crate) async fn load_state_for_residency(
286 residency: Residency,
287 session_id: &str,
288 policy: &SessionPolicy,
289 store: &dyn RuntimePersistence,
290) -> Result<RuntimeSessionState> {
291 let mut state = load_persisted_state_for_residency(residency, store)
292 .await?
293 .unwrap_or_else(|| RuntimeSessionState {
294 session_id: session_id.to_string(),
295 policy: policy.clone(),
296 ..RuntimeSessionState::default()
297 });
298 if state.session_id != session_id {
299 return Err(EmbedError::StoreSessionMismatch {
300 loaded: state.session_id,
301 requested: session_id.to_string(),
302 });
303 }
304 let recorded_provider_id = state.policy.recorded_provider_id().to_string();
305 state.policy = policy.clone();
306 state.policy.provider_id = recorded_provider_id;
307 Ok(state)
308}
309
310async fn load_persisted_state_for_residency(
311 residency: Residency,
312 store: &dyn RuntimePersistence,
313) -> Result<Option<RuntimeSessionState>> {
314 match residency {
315 Residency::KeepAll => {
316 let loaded = lash_core::store::load_persisted_session_state(store)
317 .await
318 .map_err(|err| SessionError::Protocol(format!("failed to load store: {err}")))?;
319 Ok(loaded)
320 }
321 Residency::ActivePathOnly => {
322 let active = lash_core::store::load_persisted_session_state_active_path(store, None)
323 .await
324 .map_err(|err| {
325 SessionError::Protocol(format!("failed to load active-path store: {err}"))
326 })?;
327 if active
328 .as_ref()
329 .is_some_and(|state| state.session_graph.nodes.is_empty())
330 {
331 let mut full = lash_core::store::load_persisted_session_state(store)
332 .await
333 .map_err(|err| {
334 SessionError::Protocol(format!(
335 "failed to heal active-path store from full graph: {err}"
336 ))
337 })?;
338 if let Some(state) = full.as_mut() {
339 state.graph_replace_required = true;
340 }
341 return Ok(full);
342 }
343 Ok(active)
344 }
345 }
346}
347
348impl PromptLayerSink for SessionBuilder {
349 fn prompt_layer_mut(&mut self) -> &mut PromptLayer {
350 self.spec.prompt.get_or_insert_with(PromptLayer::new)
351 }
352}
353
/// An opened session: a runtime handle plus the context needed to build turns.
///
/// Cloning yields another handle to the same runtime; `close` and `park` only
/// succeed once the runtime is no longer shared.
#[derive(Clone)]
pub struct LashSession {
    /// Handle used to observe the runtime and to reach its writer.
    pub(crate) runtime: RuntimeHandle,
    /// Effect host captured from the environment the runtime was built with.
    pub(crate) effect_host: Arc<dyn EffectHost>,
    /// Parent session id, when this session was opened as a child.
    pub(crate) parent_session_id: Option<String>,
    /// Plugin bindings registered on the builder; copied into each turn builder.
    pub(crate) active_plugins: Vec<ActivePluginBinding>,
    /// Slot where `set_turn_phase_probe` also registers the probe, when present.
    pub(crate) process_phase_probe_slot: Option<lash_core::runtime::RuntimeTurnPhaseProbeSlot>,
    /// Cancel registry shared with the turn builders created from this session.
    pub(crate) turn_cancels: crate::turn::TurnCancelRegistry,
}
363
/// Optional configuration changes passed to `LashSession::configure`.
///
/// NOTE(review): a `None` field presumably leaves that setting unchanged —
/// confirm against the config admin's `update`.
#[derive(Clone, Debug, Default)]
pub struct SessionConfigPatch {
    /// Provider to apply, if any.
    pub provider: Option<ProviderHandle>,
    /// Model spec to apply, if any.
    pub model: Option<ModelSpec>,
    /// Prompt layer to apply, if any.
    pub prompt: Option<PromptLayer>,
}
370
/// Result of parking a session via `LashSession::park`.
pub struct ParkedSession {
    /// The core parked-session value returned by the runtime's `park`.
    pub(crate) inner: lash_core::ParkedSession,
}
386
387impl ParkedSession {
388 pub fn session_id(&self) -> &str {
391 self.inner.session_id()
392 }
393}
394
395impl LashSession {
396 pub async fn close(self) -> Result<()> {
414 let persistent = self.runtime.observe().queue_store.is_some();
418 let runtime = self.into_owned_runtime()?;
419 runtime.unregister_plugin_session()?;
420 if persistent {
421 runtime.park().await?;
424 }
425 Ok(())
427 }
428
429 pub async fn park(self) -> Result<ParkedSession> {
454 let runtime = self.into_owned_runtime()?;
455 runtime.unregister_plugin_session()?;
458 let parked = runtime.park().await?;
459 Ok(ParkedSession { inner: parked })
460 }
461
462 fn into_owned_runtime(self) -> Result<LashRuntime> {
468 let LashSession { runtime, .. } = self;
469 let writer = runtime.writer();
474 drop(runtime);
475 Arc::try_unwrap(writer)
476 .map(|mutex| mutex.into_inner())
477 .map_err(|_| EmbedError::SessionStillInUse)
478 }
479
480 pub fn session_id(&self) -> String {
481 self.runtime.observe().session_id().to_string()
482 }
483
484 pub fn policy_snapshot(&self) -> SessionPolicy {
485 self.runtime.observe().policy.clone()
486 }
487
488 pub fn observe(&self) -> ObservableSession {
489 ObservableSession {
490 runtime: self.runtime.clone(),
491 }
492 }
493
494 pub fn parent_session_id(&self) -> Option<&str> {
495 self.parent_session_id.as_deref()
496 }
497
498 pub fn effect_host(&self) -> Arc<dyn EffectHost> {
499 Arc::clone(&self.effect_host)
500 }
501
502 pub fn turn(&self, input: TurnInput) -> TurnBuilder {
503 TurnBuilder {
504 runtime: self.runtime.clone(),
505 effect_host: Arc::clone(&self.effect_host),
506 active_plugins: self.active_plugins.clone(),
507 input,
508 cancel: CancellationToken::new(),
509 cancels: self.turn_cancels.clone(),
510 protocol_turn_options: None,
511 provider: None,
512 model: None,
513 turn_id: None,
514 }
515 }
516
517 pub fn queued_turn(&self) -> QueuedTurnBuilder {
518 QueuedTurnBuilder {
519 runtime: self.runtime.clone(),
520 effect_host: Arc::clone(&self.effect_host),
521 cancel: CancellationToken::new(),
522 cancels: self.turn_cancels.clone(),
523 batch_ids: Vec::new(),
524 drain_id: None,
525 }
526 }
527
528 pub fn cancel_running_turns(&self) -> usize {
543 self.turn_cancels.cancel_all()
544 }
545
546 pub fn admin(&self) -> SessionAdmin {
547 SessionAdmin {
548 runtime: self.runtime.clone(),
549 }
550 }
551
552 pub async fn configure(&self, patch: SessionConfigPatch) -> Result<()> {
553 self.admin().config().update(patch).await
554 }
555
556 pub fn tools(&self) -> ToolAdmin {
557 ToolAdmin::new(self.admin())
558 }
559
560 pub fn commands(&self) -> SessionCommandAdmin {
561 self.admin().commands()
562 }
563
564 pub fn triggers(&self) -> SessionTriggerAdmin {
565 self.admin().triggers()
566 }
567
568 pub fn processes(&self) -> SessionProcessAdmin {
569 SessionProcessAdmin::new(self.admin())
570 }
571
572 pub async fn refresh_background_graph(&self) -> Result<()> {
579 self.admin().refresh_background_graph().await
580 }
581
582 pub fn plugin_operations(&self) -> PluginOperations {
583 PluginOperations {
584 control: self.admin(),
585 }
586 }
587
588 pub fn enqueue(&self, input: TurnInput) -> EnqueueTurnBuilder<'_> {
589 EnqueueTurnBuilder {
590 session: self,
591 input,
592 id: None,
593 ingress: TurnInputIngress::NextTurn,
594 }
595 }
596
597 pub async fn queued_work(&self) -> Result<Vec<QueuedWorkBatch>> {
604 let observation = self.runtime.observe();
605 let store = observation.queue_store.as_ref().ok_or_else(|| {
606 EmbedError::Runtime(lash_core::RuntimeError::new(
607 lash_core::RuntimeErrorCode::StoreCommitFailed,
608 "queued work inspection requires a persistent runtime store",
609 ))
610 })?;
611 store
612 .list_pending_queued_work(observation.session_id())
613 .await
614 .map_err(|err| {
615 EmbedError::Runtime(lash_core::RuntimeError::new(
616 lash_core::RuntimeErrorCode::StoreCommitFailed,
617 err.to_string(),
618 ))
619 })
620 }
621
622 pub async fn pending_turn_inputs(&self) -> Result<Vec<PendingTurnInput>> {
623 let observation = self.runtime.observe();
624 let store = observation.queue_store.as_ref().ok_or_else(|| {
625 EmbedError::Runtime(lash_core::RuntimeError::new(
626 lash_core::RuntimeErrorCode::StoreCommitFailed,
627 "pending turn input inspection requires a persistent runtime store",
628 ))
629 })?;
630 store
631 .list_pending_turn_inputs(observation.session_id())
632 .await
633 .map_err(|err| {
634 EmbedError::Runtime(lash_core::RuntimeError::new(
635 lash_core::RuntimeErrorCode::StoreCommitFailed,
636 err.to_string(),
637 ))
638 })
639 }
640
641 pub async fn cancel_pending_turn_input(
642 &self,
643 input_id: &str,
644 ) -> Result<PendingTurnInputCancelOutcome> {
645 let session_id = self.session_id();
646 self.runtime
647 .cancel_pending_turn_input(&session_id, input_id)
648 .await
649 .map_err(EmbedError::Runtime)
650 }
651
652 pub async fn cancel_pending_turn_inputs(
660 &self,
661 targets: impl IntoIterator<Item = PendingTurnInputCancelTarget>,
662 ) -> Result<Vec<PendingTurnInputCancelResult>> {
663 let session_id = self.session_id();
664 let targets = targets.into_iter().collect::<Vec<_>>();
665 self.runtime
666 .cancel_pending_turn_inputs(&session_id, &targets)
667 .await
668 .map_err(EmbedError::Runtime)
669 }
670
671 pub async fn cancel_pending_turn_input_suffix(
680 &self,
681 anchor: PendingTurnInputCancelTarget,
682 ) -> Result<PendingTurnInputSuffixCancelOutcome> {
683 let session_id = self.session_id();
684 self.runtime
685 .cancel_pending_turn_input_suffix(&session_id, &anchor)
686 .await
687 .map_err(EmbedError::Runtime)
688 }
689
690 pub async fn cancel_queued_work_batch(
691 &self,
692 batch_id: &str,
693 ) -> Result<Option<QueuedWorkBatch>> {
694 let session_id = self.session_id();
695 self.runtime
696 .cancel_queued_work_batch(&session_id, batch_id)
697 .await
698 .map_err(EmbedError::Runtime)
699 }
700
701 pub async fn abandon_queued_work_claim(&self, claim: &QueuedWorkClaim) -> Result<()> {
708 self.runtime
709 .abandon_queued_work_claim(claim)
710 .await
711 .map_err(EmbedError::Runtime)
712 }
713
714 pub async fn abandon_turn_input_claim(&self, claim: &TurnInputClaim) -> Result<()> {
718 self.runtime
719 .abandon_turn_input_claim(claim)
720 .await
721 .map_err(EmbedError::Runtime)
722 }
723
724 pub async fn revoke_durable_waits(&self) -> Result<()> {
734 let session_id = self.session_id();
735 self.effect_host
736 .cancel_await_events_for_session(&session_id)
737 .await
738 .map_err(EmbedError::Runtime)
739 }
740
741 pub async fn await_queued_work_batch(&self, batch_id: &str) -> Result<()> {
753 let observation = self.runtime.observe();
754 let store = observation.queue_store.clone().ok_or_else(|| {
755 EmbedError::Runtime(lash_core::RuntimeError::new(
756 lash_core::RuntimeErrorCode::StoreCommitFailed,
757 "queued work inspection requires a persistent runtime store",
758 ))
759 })?;
760 let session_id = observation.session_id().to_string();
761 drop(observation);
762 let mut delay = std::time::Duration::from_millis(25);
763 loop {
764 let pending = store
765 .list_pending_queued_work(&session_id)
766 .await
767 .map_err(|err| {
768 EmbedError::Runtime(lash_core::RuntimeError::new(
769 lash_core::RuntimeErrorCode::StoreCommitFailed,
770 err.to_string(),
771 ))
772 })?;
773 if !pending.iter().any(|batch| batch.batch_id == batch_id) {
774 return Ok(());
775 }
776 tokio::time::sleep(delay).await;
777 delay = (delay * 2).min(std::time::Duration::from_millis(400));
778 }
779 }
780
781 pub fn read_view(&self) -> SessionReadView {
782 self.runtime.observe().read_view.clone()
783 }
784
785 pub fn usage_report(&self) -> SessionUsageReport {
786 self.runtime.observe().usage_report.clone()
787 }
788
789 pub async fn set_turn_phase_probe(
790 &self,
791 probe: Arc<dyn lash_core::runtime::RuntimeTurnPhaseProbe>,
792 ) {
793 let writer = self.runtime.writer();
794 let mut runtime = writer.lock().await;
795 runtime.set_turn_phase_probe(Arc::clone(&probe));
796 self.runtime.publish_from(&runtime);
797 if let Some(slot) = &self.process_phase_probe_slot {
798 let observation = self.runtime.observe();
799 slot.set_for_session(observation.session_id(), Arc::clone(&probe));
800 let current_frame = observation.persisted_state.current_agent_frame_id.as_str();
801 if !current_frame.is_empty() {
802 let scope = lash_core::SessionScope::for_agent_frame(
803 observation.session_id(),
804 current_frame,
805 );
806 slot.set_for_scope(&scope, probe);
807 }
808 }
809 }
810}
811
/// Read-side view of a session: snapshots and observation subscriptions.
#[derive(Clone)]
pub struct ObservableSession {
    /// Handle to the runtime being observed.
    pub(crate) runtime: RuntimeHandle,
}
816
817impl ObservableSession {
818 fn snapshot(&self) -> Arc<RuntimeObservation> {
819 self.runtime.observe()
820 }
821
822 pub fn current_observation(&self) -> SessionObservation {
823 self.runtime.current_session_observation()
824 }
825
826 pub fn current_remote_observation(&self) -> RemoteSessionObservation {
827 RemoteSessionObservation::from_core(self.current_observation())
828 }
829
830 pub fn resume_from_cursor(&self, cursor: &SessionCursor) -> Result<SessionResume> {
831 self.runtime
832 .resume_session_observation(cursor)
833 .map_err(live_replay_error)
834 }
835
836 pub fn subscribe_from_cursor(
837 &self,
838 cursor: &SessionCursor,
839 ) -> Result<SessionObservationSubscription> {
840 self.runtime
841 .subscribe_session_observation(cursor)
842 .map_err(live_replay_error)
843 }
844
845 pub fn subscribe_from_remote_cursor(
846 &self,
847 cursor: &RemoteSessionCursor,
848 ) -> Result<RemoteSessionObservationSubscription> {
849 cursor.validate()?;
850 let cursor = lash_core::SessionCursor::try_from(cursor.clone())?;
851 match self.subscribe_from_cursor(&cursor)? {
852 SessionObservationSubscription::Subscribed(subscription) => {
853 Ok(RemoteSessionObservationSubscription::Subscribed(
854 RemoteSessionObservationEventStream::new(subscription),
855 ))
856 }
857 SessionObservationSubscription::Gap { observation, gap } => {
858 Ok(RemoteSessionObservationSubscription::Gap {
859 observation: observation.into(),
860 gap: gap.into(),
861 })
862 }
863 }
864 }
865
866 pub fn subscribe_and_recover(&self, cursor: SessionCursor) -> SessionObservationStream {
875 SessionObservationStream {
876 observable: self.clone(),
877 cursor,
878 subscription: None,
879 done: false,
880 }
881 }
882
883 pub fn subscribe_and_recover_remote(
886 &self,
887 cursor: RemoteSessionCursor,
888 ) -> Result<RemoteSessionObservationStream> {
889 cursor.validate()?;
890 let cursor = lash_core::SessionCursor::try_from(cursor)?;
891 Ok(RemoteSessionObservationStream {
892 inner: self.subscribe_and_recover(cursor),
893 next_sequence: 0,
894 })
895 }
896
897 pub fn session_id(&self) -> String {
898 self.snapshot().session_id().to_string()
899 }
900
901 pub fn policy_snapshot(&self) -> SessionPolicy {
902 self.snapshot().policy.clone()
903 }
904
905 pub fn read_view(&self) -> SessionReadView {
906 self.snapshot().read_view.clone()
907 }
908
909 pub fn usage_report(&self) -> SessionUsageReport {
910 self.snapshot().usage_report.clone()
911 }
912
913 pub fn tool_state(&self) -> Option<ToolState> {
914 self.snapshot().tool_state.clone()
915 }
916
917 pub fn active_tool_manifests(&self) -> Vec<ToolManifest> {
918 self.snapshot()
919 .tool_state
920 .as_ref()
921 .map(ToolState::tool_manifests)
922 .unwrap_or_default()
923 }
924
925 pub async fn list_process_handles(&self) -> Vec<ProcessHandleSummary> {
926 self.snapshot().list_process_handles().await
927 }
928
929 pub async fn list_all_process_handles(&self) -> Vec<ProcessHandleSummary> {
930 self.snapshot().list_all_process_handles().await
931 }
932
933 pub fn process_scope(&self) -> SessionScope {
934 self.snapshot().process_scope()
935 }
936}
937
/// Item yielded by [`SessionObservationStream`].
// NOTE(review): the lint allowance suggests the variants differ a lot in size
// and boxing was deliberately avoided — confirm the rationale.
#[allow(clippy::large_enum_variant)]
#[derive(Clone, Debug)]
pub enum SessionObservationStreamItem {
    /// An observation event delivered by the live subscription.
    Event(SessionObservationEvent),
    /// Subscribing from the stream's cursor reported a gap instead of a subscription.
    Gap {
        /// Observation returned together with the gap.
        observation: SessionObservation,
        /// Gap description; the stream continues from its `latest_cursor`.
        gap: LiveReplayGap,
    },
}
954
/// Outcome of `ObservableSession::subscribe_from_remote_cursor`.
pub enum RemoteSessionObservationSubscription {
    /// The subscription was established; events are read from the stream.
    Subscribed(RemoteSessionObservationEventStream),
    /// The subscription attempt reported a gap instead of a stream.
    Gap {
        /// Observation returned with the gap, in remote form.
        observation: RemoteSessionObservation,
        /// Gap description, in remote form.
        gap: RemoteLiveReplayGap,
    },
}
962
/// Item yielded by [`RemoteSessionObservationStream`].
#[derive(Clone, Debug)]
pub enum RemoteSessionObservationStreamItem {
    /// An observation event in remote form, carrying a stream-assigned sequence number.
    Event(RemoteSessionObservationEvent),
    /// A gap reported by the underlying stream, in remote form.
    Gap {
        /// Observation returned with the gap.
        observation: RemoteSessionObservation,
        /// Gap description.
        gap: RemoteLiveReplayGap,
    },
}
973
/// Adapts a core live replay subscription into a stream of remote events.
pub struct RemoteSessionObservationEventStream {
    /// Underlying core subscription.
    inner: lash_core::LiveReplaySubscription,
    /// Sequence number given to the next event; starts at 0 and saturates at `u64::MAX`.
    next_sequence: u64,
}
978
979impl RemoteSessionObservationEventStream {
980 fn new(inner: lash_core::LiveReplaySubscription) -> Self {
981 Self {
982 inner,
983 next_sequence: 0,
984 }
985 }
986
987 pub async fn next_event(&mut self) -> Result<RemoteSessionObservationEvent> {
988 futures_util::future::poll_fn(|cx| Pin::new(&mut *self).poll_next(cx))
989 .await
990 .transpose()?
991 .ok_or_else(|| live_replay_error(LiveReplayStoreError::Closed))
992 }
993}
994
995impl Stream for RemoteSessionObservationEventStream {
996 type Item = Result<RemoteSessionObservationEvent>;
997
998 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
999 match Pin::new(&mut self.inner).poll_next(cx) {
1000 Poll::Pending => Poll::Pending,
1001 Poll::Ready(Some(Ok(event))) => {
1002 let remote = RemoteSessionObservationEvent::from_core(self.next_sequence, event);
1003 self.next_sequence = self.next_sequence.saturating_add(1);
1004 Poll::Ready(Some(Ok(remote)))
1005 }
1006 Poll::Ready(Some(Err(err))) => Poll::Ready(Some(Err(live_replay_error(err)))),
1007 Poll::Ready(None) => Poll::Ready(None),
1008 }
1009 }
1010}
1011
/// Remote-protocol wrapper around [`SessionObservationStream`].
pub struct RemoteSessionObservationStream {
    /// Underlying recovering stream of core items.
    inner: SessionObservationStream,
    /// Sequence number given to the next event; gaps do not advance it.
    next_sequence: u64,
}
1017
1018impl RemoteSessionObservationStream {
1019 pub fn cursor(&self) -> RemoteSessionCursor {
1020 RemoteSessionCursor::from(self.inner.cursor())
1021 }
1022}
1023
1024impl Stream for RemoteSessionObservationStream {
1025 type Item = Result<RemoteSessionObservationStreamItem>;
1026
1027 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
1028 match Pin::new(&mut self.inner).poll_next(cx) {
1029 Poll::Pending => Poll::Pending,
1030 Poll::Ready(Some(Ok(SessionObservationStreamItem::Event(event)))) => {
1031 let remote = RemoteSessionObservationEvent::from_core(self.next_sequence, event);
1032 self.next_sequence = self.next_sequence.saturating_add(1);
1033 Poll::Ready(Some(Ok(RemoteSessionObservationStreamItem::Event(remote))))
1034 }
1035 Poll::Ready(Some(Ok(SessionObservationStreamItem::Gap { observation, gap }))) => {
1036 Poll::Ready(Some(Ok(RemoteSessionObservationStreamItem::Gap {
1037 observation: observation.into(),
1038 gap: gap.into(),
1039 })))
1040 }
1041 Poll::Ready(Some(Err(err))) => Poll::Ready(Some(Err(err))),
1042 Poll::Ready(None) => Poll::Ready(None),
1043 }
1044 }
1045}
1046
/// Stream of session observation items that resubscribes from its cursor when needed.
pub struct SessionObservationStream {
    /// Session used to (re)subscribe.
    observable: ObservableSession,
    /// Position to subscribe from; advanced on every event and on every gap.
    cursor: SessionCursor,
    /// Active subscription, or `None` when a (re)subscription is needed.
    subscription: Option<lash_core::LiveReplaySubscription>,
    /// Set once the stream has ended or failed; later polls yield `None`.
    done: bool,
}
1054
1055impl SessionObservationStream {
1056 pub fn cursor(&self) -> &SessionCursor {
1057 &self.cursor
1058 }
1059}
1060
impl Stream for SessionObservationStream {
    type Item = Result<SessionObservationStreamItem>;

    /// Yields events from the live subscription, subscribing on demand.
    ///
    /// Without an active subscription the stream subscribes from its cursor.
    /// A gap is yielded as an item and moves the cursor to the gap's latest
    /// cursor. A lagged subscriber is dropped and resubscribed from the last
    /// cursor. Any other error is yielded once and ends the stream.
    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        loop {
            if self.done {
                return Poll::Ready(None);
            }
            if self.subscription.is_none() {
                match self.observable.subscribe_from_cursor(&self.cursor) {
                    Ok(SessionObservationSubscription::Subscribed(subscription)) => {
                        self.subscription = Some(subscription);
                    }
                    Ok(SessionObservationSubscription::Gap { observation, gap }) => {
                        // Continue from the gap's latest cursor on the next poll;
                        // the subscription stays empty so that poll resubscribes.
                        self.cursor = gap.latest_cursor.clone();
                        return Poll::Ready(Some(Ok(SessionObservationStreamItem::Gap {
                            observation,
                            gap,
                        })));
                    }
                    Err(err) => {
                        self.done = true;
                        return Poll::Ready(Some(Err(err)));
                    }
                }
            }

            // A subscription was stored just above or on an earlier poll.
            let Some(subscription) = self.subscription.as_mut() else {
                continue;
            };
            match Pin::new(subscription).poll_next(cx) {
                Poll::Pending => return Poll::Pending,
                Poll::Ready(Some(Ok(event))) => {
                    // Track the event's cursor so a resubscription starts from it.
                    self.cursor = event.cursor.clone();
                    return Poll::Ready(Some(Ok(SessionObservationStreamItem::Event(event))));
                }
                Poll::Ready(Some(Err(LiveReplayStoreError::SubscriberLagged(_)))) => {
                    // Lagging is not surfaced: drop the subscription and retry
                    // from the last cursor on the next loop iteration.
                    self.subscription = None;
                    continue;
                }
                Poll::Ready(Some(Err(err))) => {
                    self.done = true;
                    return Poll::Ready(Some(Err(live_replay_error(err))));
                }
                Poll::Ready(None) => {
                    self.done = true;
                    return Poll::Ready(None);
                }
            }
        }
    }
}
1113
1114fn live_replay_error(err: lash_core::LiveReplayStoreError) -> EmbedError {
1115 EmbedError::Runtime(lash_core::RuntimeError::new(
1116 RuntimeErrorCode::Other("live_replay".to_string()),
1117 err.to_string(),
1118 ))
1119}
1120
/// Builder for enqueueing a turn input on a session; created by `LashSession::enqueue`.
pub struct EnqueueTurnBuilder<'a> {
    /// Session whose runtime receives the input.
    session: &'a LashSession,
    /// Input to enqueue.
    input: TurnInput,
    /// Optional caller-supplied id; sent as the source key `host:{id}`.
    id: Option<String>,
    /// Ingress mode passed to the runtime.
    ingress: TurnInputIngress,
}
1127
1128impl<'a> EnqueueTurnBuilder<'a> {
1129 pub fn id(mut self, id: impl Into<String>) -> Self {
1130 self.id = Some(id.into());
1131 self
1132 }
1133
1134 pub fn ingress(mut self, ingress: TurnInputIngress) -> Self {
1135 self.ingress = ingress;
1136 self
1137 }
1138
1139 pub async fn send(self) -> Result<PendingTurnInput> {
1140 let source_key = self.id.map(|id| format!("host:{id}"));
1141 self.session
1142 .runtime
1143 .enqueue_turn_input(self.input, self.ingress, source_key)
1144 .await
1145 .map_err(EmbedError::Runtime)
1146 }
1147}
1148
1149impl<'a> std::future::IntoFuture for EnqueueTurnBuilder<'a> {
1150 type Output = Result<PendingTurnInput>;
1151 type IntoFuture =
1152 std::pin::Pin<Box<dyn std::future::Future<Output = Result<PendingTurnInput>> + 'a>>;
1153
1154 fn into_future(self) -> Self::IntoFuture {
1155 Box::pin(self.send())
1156 }
1157}