1use crate::support::*;
2use lash_core::runtime::{DeliveryPolicy, QueuedWorkBatch, SlotPolicy};
3
4pub struct SessionBuilder {
5 pub(crate) core: LashCore,
6 pub(crate) session_id: String,
7 pub(crate) spec: SessionSpec,
8 pub(crate) parent_session_id: Option<String>,
9 pub(crate) session_execution_owner_id: Option<String>,
10 pub(crate) store: Option<Arc<dyn RuntimePersistence>>,
11 pub(crate) provider: Option<ProviderHandle>,
12 pub(crate) active_plugins: Vec<ActivePluginBinding>,
13 pub(crate) plugin_factories: Vec<Arc<dyn PluginFactory>>,
14}
15
/// Session builder variant for the `rlm` feature.
///
/// Wraps a plain [`SessionBuilder`] and additionally carries the final-answer
/// format that is written into the session's protocol turn options on open.
#[cfg(feature = "rlm")]
pub struct RlmSessionBuilder {
    /// Underlying builder that every setter forwards to.
    pub(crate) builder: SessionBuilder,
    /// Explicit final-answer format. When `None`, a default is chosen at open
    /// time depending on whether the session has a parent.
    pub(crate) rlm_final_answer_format: Option<lash_rlm_types::RlmFinalAnswerFormat>,
}
21
22impl SessionBuilder {
23 pub fn provider(mut self, provider: ProviderHandle) -> Self {
24 self.spec = self.spec.provider_id(provider.kind());
25 self.provider = Some(provider);
26 self
27 }
28
29 pub fn session_spec(mut self, spec: SessionSpec) -> Self {
30 self.spec = spec;
31 self
32 }
33
34 pub fn parent(mut self, parent_session_id: impl Into<String>) -> Self {
35 self.parent_session_id = Some(parent_session_id.into());
36 self
37 }
38
39 pub fn session_execution_owner_id(mut self, owner_id: impl Into<String>) -> Self {
47 self.session_execution_owner_id = Some(owner_id.into());
48 self
49 }
50
51 pub fn store(mut self, store: Arc<dyn RuntimePersistence>) -> Self {
58 self.store = Some(store);
59 self
60 }
61
62 pub fn plugin<P: PluginBinding>(mut self, config: P::SessionConfig) -> Self {
63 self.active_plugins.push(ActivePluginBinding {
64 id: P::ID,
65 requires_turn_input: P::requires_turn_input(&config),
66 });
67 self.plugin_factories.push(P::factory(&config));
68 self
69 }
70
71 pub async fn open(self) -> Result<LashSession> {
72 let policy = self.session_policy();
73 let store = self.create_store(&policy).await?;
74 let state = self
75 .load_or_default_state(&policy, store.as_deref())
76 .await?;
77 self.open_resolved(policy, state, store).await
78 }
79
80 pub async fn open_fresh(self) -> Result<LashSession> {
90 let policy = self.session_policy();
91 let store = self.create_store(&policy).await?;
92 let state = RuntimeSessionState {
93 session_id: self.session_id.clone(),
94 policy: policy.clone(),
95 graph_replace_required: true,
96 ..RuntimeSessionState::default()
97 };
98 self.open_resolved(policy, state, store).await
99 }
100
101 pub async fn open_with_state(self, mut state: RuntimeSessionState) -> Result<LashSession> {
108 let policy = self.session_policy();
109 let store = self.create_store(&policy).await?;
110 if state.session_id != self.session_id {
111 return Err(EmbedError::StoreSessionMismatch {
112 loaded: state.session_id,
113 requested: self.session_id,
114 });
115 }
116 let recorded_provider_id = state.policy.recorded_provider_id().to_string();
117 state.policy = policy.clone();
118 state.policy.provider_id = recorded_provider_id;
119 self.open_resolved(policy, state, store).await
120 }
121
122 fn session_policy(&self) -> SessionPolicy {
123 let mut policy = self.spec.resolve_against(&self.core.policy);
124 policy.session_id = Some(self.session_id.clone());
125 policy
126 }
127
128 async fn load_or_default_state(
129 &self,
130 policy: &SessionPolicy,
131 store: Option<&dyn RuntimePersistence>,
132 ) -> Result<RuntimeSessionState> {
133 let state = match store {
134 Some(store) => {
135 let loaded = self.load_persisted_state_for_residency(store).await?;
136 let mut state = loaded.unwrap_or_else(|| RuntimeSessionState {
137 session_id: self.session_id.clone(),
138 policy: policy.clone(),
139 ..RuntimeSessionState::default()
140 });
141 if state.session_id != self.session_id {
142 return Err(EmbedError::StoreSessionMismatch {
143 loaded: state.session_id,
144 requested: self.session_id.clone(),
145 });
146 }
147 let recorded_provider_id = state.policy.recorded_provider_id().to_string();
148 state.policy = policy.clone();
149 state.policy.provider_id = recorded_provider_id;
150 state
151 }
152 None => RuntimeSessionState {
153 session_id: self.session_id.clone(),
154 policy: policy.clone(),
155 ..RuntimeSessionState::default()
156 },
157 };
158 Ok(state)
159 }
160
161 async fn load_persisted_state_for_residency(
162 &self,
163 store: &dyn RuntimePersistence,
164 ) -> Result<Option<RuntimeSessionState>> {
165 load_persisted_state_for_residency(self.core.env.residency, store).await
166 }
167
168 async fn open_resolved(
169 self,
170 policy: SessionPolicy,
171 state: RuntimeSessionState,
172 store: Option<Arc<dyn RuntimePersistence>>,
173 ) -> Result<LashSession> {
174 let mut env = self.core.env.clone();
175 if let Some(provider) = self.provider.clone().or_else(|| self.core.provider.clone()) {
176 env.core.providers.provider_resolver =
177 Arc::new(lash_core::SingleProviderResolver::new(provider));
178 }
179 let plugin_host = build_plugin_host(
180 self.core.protocol_factory.as_ref(),
181 self.core.plugin_factories.as_ref(),
182 self.plugin_factories,
183 )?;
184 env.core = self
185 .core
186 .runtime_host_for_plugin_host(env.core.clone(), &plugin_host)?;
187 env.plugin_host = Some(Arc::new(plugin_host));
188 let effect_host = Arc::clone(&env.core.control.effect_host);
189 let drivers = self.core.work_driver.drivers().await;
190 env.process_work_driver = drivers.process.clone();
191 env.queued_work_driver = drivers.queued.clone();
192 let mut runtime = LashRuntime::from_environment(&env, policy, state, store).await?;
193 if let Some(owner_id) = self.session_execution_owner_id {
194 runtime.set_runtime_scope_id(owner_id);
195 }
196 if drivers.drive_process_on_open
197 && let Some(driver) = drivers.process.as_ref()
198 {
199 driver.claim_and_run_pending("session_open").await?;
200 }
201 let handle = RuntimeHandle::with_live_replay_store(
202 runtime,
203 Arc::clone(&self.core.live_replay_store),
204 );
205 Ok(LashSession {
206 runtime: handle,
207 effect_host,
208 parent_session_id: self.parent_session_id,
209 active_plugins: self.active_plugins,
210 process_phase_probe_slot: self.core.work_driver.phase_probe_slot(),
211 turn_cancels: crate::turn::TurnCancelRegistry::default(),
212 })
213 }
214
215 async fn create_store(
216 &self,
217 policy: &SessionPolicy,
218 ) -> Result<Option<Arc<dyn RuntimePersistence>>> {
219 if let Some(store) = self.store.as_ref() {
220 return Ok(Some(Arc::clone(store)));
221 }
222 let Some(factory) = self.core.store_factory.as_ref() else {
223 return Ok(None);
224 };
225 let request = SessionStoreCreateRequest {
226 session_id: self.session_id.clone(),
227 relation: self
228 .parent_session_id
229 .as_ref()
230 .map(|parent_session_id| lash_core::SessionRelation::Child {
231 parent_session_id: parent_session_id.clone(),
232 caused_by: None,
233 })
234 .unwrap_or_default(),
235 policy: policy.clone(),
236 };
237 factory
238 .create_store(&request)
239 .await
240 .map(Some)
241 .map_err(|message| EmbedError::StoreFactory {
242 session_id: self.session_id.clone(),
243 message,
244 })
245 }
246}
247
248pub(crate) async fn load_state_for_residency(
249 residency: Residency,
250 session_id: &str,
251 policy: &SessionPolicy,
252 store: &dyn RuntimePersistence,
253) -> Result<RuntimeSessionState> {
254 let mut state = load_persisted_state_for_residency(residency, store)
255 .await?
256 .unwrap_or_else(|| RuntimeSessionState {
257 session_id: session_id.to_string(),
258 policy: policy.clone(),
259 ..RuntimeSessionState::default()
260 });
261 if state.session_id != session_id {
262 return Err(EmbedError::StoreSessionMismatch {
263 loaded: state.session_id,
264 requested: session_id.to_string(),
265 });
266 }
267 let recorded_provider_id = state.policy.recorded_provider_id().to_string();
268 state.policy = policy.clone();
269 state.policy.provider_id = recorded_provider_id;
270 Ok(state)
271}
272
273async fn load_persisted_state_for_residency(
274 residency: Residency,
275 store: &dyn RuntimePersistence,
276) -> Result<Option<RuntimeSessionState>> {
277 match residency {
278 Residency::KeepAll => {
279 let loaded = lash_core::store::load_persisted_session_state(store)
280 .await
281 .map_err(|err| SessionError::Protocol(format!("failed to load store: {err}")))?;
282 Ok(loaded)
283 }
284 Residency::ActivePathOnly => {
285 let active = lash_core::store::load_persisted_session_state_active_path(store, None)
286 .await
287 .map_err(|err| {
288 SessionError::Protocol(format!("failed to load active-path store: {err}"))
289 })?;
290 if active
291 .as_ref()
292 .is_some_and(|state| state.session_graph.nodes.is_empty())
293 {
294 let mut full = lash_core::store::load_persisted_session_state(store)
295 .await
296 .map_err(|err| {
297 SessionError::Protocol(format!(
298 "failed to heal active-path store from full graph: {err}"
299 ))
300 })?;
301 if let Some(state) = full.as_mut() {
302 state.graph_replace_required = true;
303 }
304 return Ok(full);
305 }
306 Ok(active)
307 }
308 }
309}
310
311impl PromptLayerSink for SessionBuilder {
312 fn prompt_layer_mut(&mut self) -> &mut PromptLayer {
313 self.spec.prompt.get_or_insert_with(PromptLayer::new)
314 }
315}
316
#[cfg(feature = "rlm")]
impl RlmSessionBuilder {
    /// Selects the provider; forwards to [`SessionBuilder::provider`].
    pub fn provider(mut self, provider: ProviderHandle) -> Self {
        self.builder = self.builder.provider(provider);
        self
    }

    /// Replaces the session spec; forwards to [`SessionBuilder::session_spec`].
    pub fn session_spec(mut self, spec: SessionSpec) -> Self {
        self.builder = self.builder.session_spec(spec);
        self
    }

    /// Sets the parent session; forwards to [`SessionBuilder::parent`].
    ///
    /// A session with a parent is treated as a non-root session when the
    /// default final-answer format is chosen.
    pub fn parent(mut self, parent_session_id: impl Into<String>) -> Self {
        self.builder = self.builder.parent(parent_session_id);
        self
    }

    /// Sets the execution owner id; forwards to
    /// [`SessionBuilder::session_execution_owner_id`].
    pub fn session_execution_owner_id(mut self, owner_id: impl Into<String>) -> Self {
        self.builder = self.builder.session_execution_owner_id(owner_id);
        self
    }

    /// Sets an explicit store; forwards to [`SessionBuilder::store`].
    pub fn store(mut self, store: Arc<dyn RuntimePersistence>) -> Self {
        self.builder = self.builder.store(store);
        self
    }

    /// Registers plugin `P`; forwards to [`SessionBuilder::plugin`].
    pub fn plugin<P: PluginBinding>(mut self, config: P::SessionConfig) -> Self {
        self.builder = self.builder.plugin::<P>(config);
        self
    }

    /// Opens the session, resuming persisted state when a store is available.
    pub async fn open(self) -> Result<LashSession> {
        self.open_resolved(RlmOpenState::Resume).await
    }

    /// Opens the session from a default state with `graph_replace_required`
    /// set, without loading from the store.
    pub async fn open_fresh(self) -> Result<LashSession> {
        self.open_resolved(RlmOpenState::Fresh).await
    }

    /// Opens the session from caller-supplied `state`.
    ///
    /// # Errors
    ///
    /// Returns [`EmbedError::StoreSessionMismatch`] when `state` belongs to a
    /// different session.
    pub async fn open_with_state(self, state: RuntimeSessionState) -> Result<LashSession> {
        self.open_resolved(RlmOpenState::Explicit(state)).await
    }

    /// Shared open path: resolves the starting state, applies the RLM session
    /// options to it and hands over to the wrapped builder.
    async fn open_resolved(self, open_state: RlmOpenState) -> Result<LashSession> {
        let Self {
            builder,
            rlm_final_answer_format,
        } = self;

        // Reject a mismatched explicit state before any store is created, so
        // an invalid request never triggers the store factory.
        let open_state = match open_state {
            RlmOpenState::Explicit(state) if state.session_id != builder.session_id => {
                return Err(EmbedError::StoreSessionMismatch {
                    loaded: state.session_id,
                    requested: builder.session_id,
                });
            }
            validated => validated,
        };

        let policy = builder.session_policy();
        let store = builder.create_store(&policy).await?;
        let mut state = match open_state {
            RlmOpenState::Resume => {
                builder
                    .load_or_default_state(&policy, store.as_deref())
                    .await?
            }
            RlmOpenState::Fresh => RuntimeSessionState {
                session_id: builder.session_id.clone(),
                policy: policy.clone(),
                graph_replace_required: true,
                ..RuntimeSessionState::default()
            },
            RlmOpenState::Explicit(mut state) => {
                // Adopt the resolved policy but keep the provider id that the
                // supplied state had recorded.
                let recorded_provider_id = state.policy.recorded_provider_id().to_string();
                state.policy = policy.clone();
                state.policy.provider_id = recorded_provider_id;
                state
            }
        };
        apply_rlm_session_options(
            builder.parent_session_id.is_none(),
            rlm_final_answer_format,
            &mut state,
        )?;
        builder.open_resolved(policy, state, store).await
    }
}
401
#[cfg(feature = "rlm")]
impl PromptLayerSink for RlmSessionBuilder {
    /// Exposes the prompt layer of the wrapped [`SessionBuilder`].
    fn prompt_layer_mut(&mut self) -> &mut PromptLayer {
        let inner = &mut self.builder;
        inner.prompt_layer_mut()
    }
}
408
/// How an [`RlmSessionBuilder`] obtains the state it opens the session with.
#[cfg(feature = "rlm")]
// `Explicit` holds a whole `RuntimeSessionState` inline while the other
// variants are empty; the size lint is silenced instead of boxing the state.
#[allow(clippy::large_enum_variant)]
enum RlmOpenState {
    /// Load the state from the store, or start from a default state.
    Resume,
    /// Start from a default state flagged with `graph_replace_required`.
    Fresh,
    /// Use the state supplied by the caller.
    Explicit(RuntimeSessionState),
}
416
/// Writes the final-answer format into the state's protocol turn options.
///
/// The format is `explicit_format` when given; otherwise `Markdown` for a
/// root session and `RawSubmitValue` for any other session. Existing options
/// are decoded first so their remaining fields survive; empty options start
/// from the default extras. The resulting options are stored on the state
/// and on every agent frame, replacing whatever the frames held before.
///
/// # Errors
///
/// Fails when the existing options cannot be decoded or the updated extras
/// cannot be encoded.
#[cfg(feature = "rlm")]
fn apply_rlm_session_options(
    is_root_session: bool,
    explicit_format: Option<lash_rlm_types::RlmFinalAnswerFormat>,
    state: &mut RuntimeSessionState,
) -> Result<()> {
    let chosen_format = match explicit_format {
        Some(format) => format,
        None if is_root_session => lash_rlm_types::RlmFinalAnswerFormat::Markdown,
        None => lash_rlm_types::RlmFinalAnswerFormat::RawSubmitValue,
    };

    let mut extras: lash_rlm_types::RlmCreateExtras = if state.protocol_turn_options.is_empty() {
        Default::default()
    } else {
        state.protocol_turn_options.decode()?
    };
    extras.final_answer_format = Some(chosen_format);

    let encoded = ProtocolTurnOptions::typed(extras)?;
    for frame in &mut state.agent_frames {
        frame.protocol_turn_options = encoded.clone();
    }
    state.protocol_turn_options = encoded;
    Ok(())
}
443
#[cfg(all(test, feature = "rlm"))]
mod tests {
    use super::*;

    /// Applying the session options must only fill in the final-answer
    /// format and leave an already configured termination mode untouched.
    #[test]
    fn apply_rlm_session_options_preserves_existing_termination() -> Result<()> {
        let seeded = lash_rlm_types::RlmCreateExtras {
            termination: lash_rlm_types::RlmTermination::ProseOrSubmit,
            final_answer_format: None,
        };
        let mut state = RuntimeSessionState {
            protocol_turn_options: ProtocolTurnOptions::typed(seeded)?,
            ..Default::default()
        };

        // Root session without an explicit format: Markdown is expected.
        apply_rlm_session_options(true, None, &mut state)?;

        let decoded: lash_rlm_types::RlmCreateExtras = state.protocol_turn_options.decode()?;
        assert_eq!(
            decoded.termination,
            lash_rlm_types::RlmTermination::ProseOrSubmit
        );
        assert_eq!(
            decoded.final_answer_format,
            Some(lash_rlm_types::RlmFinalAnswerFormat::Markdown)
        );
        Ok(())
    }
}
472
473#[derive(Clone)]
474pub struct LashSession {
475 pub(crate) runtime: RuntimeHandle,
476 pub(crate) effect_host: Arc<dyn EffectHost>,
477 pub(crate) parent_session_id: Option<String>,
478 pub(crate) active_plugins: Vec<ActivePluginBinding>,
479 pub(crate) process_phase_probe_slot: Option<lash_core::runtime::RuntimeTurnPhaseProbeSlot>,
480 pub(crate) turn_cancels: crate::turn::TurnCancelRegistry,
481}
482
483#[derive(Clone, Debug, Default)]
484pub struct SessionConfigPatch {
485 pub provider: Option<ProviderHandle>,
486 pub model: Option<ModelSpec>,
487 pub prompt: Option<PromptLayer>,
488}
489
490impl LashSession {
491 pub async fn close(self) -> Result<()> {
492 let runtime = self.runtime.writer();
493 let runtime = runtime.lock().await;
494 runtime.unregister_plugin_session()?;
495 Ok(())
496 }
497
498 pub fn session_id(&self) -> String {
499 self.runtime.observe().session_id().to_string()
500 }
501
502 pub fn policy_snapshot(&self) -> SessionPolicy {
503 self.runtime.observe().policy.clone()
504 }
505
506 pub fn observe(&self) -> ObservableSession {
507 ObservableSession {
508 runtime: self.runtime.clone(),
509 }
510 }
511
512 pub fn parent_session_id(&self) -> Option<&str> {
513 self.parent_session_id.as_deref()
514 }
515
516 pub fn effect_host(&self) -> Arc<dyn EffectHost> {
517 Arc::clone(&self.effect_host)
518 }
519
520 pub fn turn(&self, input: TurnInput) -> TurnBuilder {
521 TurnBuilder {
522 runtime: self.runtime.clone(),
523 effect_host: Arc::clone(&self.effect_host),
524 active_plugins: self.active_plugins.clone(),
525 input,
526 cancel: CancellationToken::new(),
527 cancels: self.turn_cancels.clone(),
528 protocol_turn_options: None,
529 provider: None,
530 model: None,
531 turn_id: None,
532 }
533 }
534
535 pub fn queued_turn(&self) -> QueuedTurnBuilder {
536 QueuedTurnBuilder {
537 runtime: self.runtime.clone(),
538 effect_host: Arc::clone(&self.effect_host),
539 cancel: CancellationToken::new(),
540 cancels: self.turn_cancels.clone(),
541 batch_ids: Vec::new(),
542 drain_id: None,
543 }
544 }
545
546 pub fn cancel_running_turns(&self) -> usize {
561 self.turn_cancels.cancel_all()
562 }
563
564 pub fn admin(&self) -> SessionAdmin {
565 SessionAdmin {
566 runtime: self.runtime.clone(),
567 }
568 }
569
570 pub async fn configure(&self, patch: SessionConfigPatch) -> Result<()> {
571 self.admin().config().update(patch).await
572 }
573
574 pub fn tools(&self) -> ToolAdmin {
575 ToolAdmin::new(self.admin())
576 }
577
578 pub fn commands(&self) -> SessionCommandAdmin {
579 self.admin().commands()
580 }
581
582 pub fn triggers(&self) -> SessionTriggerAdmin {
583 self.admin().triggers()
584 }
585
586 pub fn processes(&self) -> SessionProcessAdmin {
587 SessionProcessAdmin::new(self.admin())
588 }
589
590 pub fn plugin_operations(&self) -> PluginOperations {
591 PluginOperations {
592 control: self.admin(),
593 }
594 }
595
596 pub fn enqueue(&self, input: TurnInput) -> EnqueueTurnBuilder<'_> {
597 EnqueueTurnBuilder {
598 session: self,
599 input,
600 id: None,
601 delivery_policy: DeliveryPolicy::AfterCurrentTurnCommit,
602 slot_policy: SlotPolicy::Exclusive,
603 }
604 }
605
606 pub async fn queued_work(&self) -> Result<Vec<QueuedWorkBatch>> {
607 let observation = self.runtime.observe();
608 let store = observation.queue_store.as_ref().ok_or_else(|| {
609 EmbedError::Runtime(lash_core::RuntimeError::new(
610 lash_core::RuntimeErrorCode::StoreCommitFailed,
611 "queued work inspection requires a persistent runtime store",
612 ))
613 })?;
614 store
615 .list_pending_queued_work(observation.session_id())
616 .await
617 .map_err(|err| {
618 EmbedError::Runtime(lash_core::RuntimeError::new(
619 lash_core::RuntimeErrorCode::StoreCommitFailed,
620 err.to_string(),
621 ))
622 })
623 }
624
625 pub async fn cancel_queued_work_batch(
626 &self,
627 batch_id: &str,
628 ) -> Result<Option<QueuedWorkBatch>> {
629 let session_id = self.session_id();
630 self.runtime
631 .cancel_queued_work_batch(&session_id, batch_id)
632 .await
633 .map_err(EmbedError::Runtime)
634 }
635
636 pub async fn await_queued_work_batch(&self, batch_id: &str) -> Result<()> {
648 let observation = self.runtime.observe();
649 let store = observation.queue_store.clone().ok_or_else(|| {
650 EmbedError::Runtime(lash_core::RuntimeError::new(
651 lash_core::RuntimeErrorCode::StoreCommitFailed,
652 "queued work inspection requires a persistent runtime store",
653 ))
654 })?;
655 let session_id = observation.session_id().to_string();
656 drop(observation);
657 let mut delay = std::time::Duration::from_millis(25);
658 loop {
659 let pending = store
660 .list_pending_queued_work(&session_id)
661 .await
662 .map_err(|err| {
663 EmbedError::Runtime(lash_core::RuntimeError::new(
664 lash_core::RuntimeErrorCode::StoreCommitFailed,
665 err.to_string(),
666 ))
667 })?;
668 if !pending.iter().any(|batch| batch.batch_id == batch_id) {
669 return Ok(());
670 }
671 tokio::time::sleep(delay).await;
672 delay = (delay * 2).min(std::time::Duration::from_millis(400));
673 }
674 }
675
676 pub fn read_view(&self) -> SessionReadView {
677 self.runtime.observe().read_view.clone()
678 }
679
680 pub fn usage_report(&self) -> SessionUsageReport {
681 self.runtime.observe().usage_report.clone()
682 }
683
684 pub async fn set_turn_phase_probe(
685 &self,
686 probe: Arc<dyn lash_core::runtime::RuntimeTurnPhaseProbe>,
687 ) {
688 let writer = self.runtime.writer();
689 let mut runtime = writer.lock().await;
690 runtime.set_turn_phase_probe(Arc::clone(&probe));
691 self.runtime.publish_from(&runtime);
692 if let Some(slot) = &self.process_phase_probe_slot {
693 let observation = self.runtime.observe();
694 slot.set_for_session(observation.session_id(), Arc::clone(&probe));
695 let current_frame = observation.persisted_state.current_agent_frame_id.as_str();
696 if !current_frame.is_empty() {
697 let scope = lash_core::SessionScope::for_agent_frame(
698 observation.session_id(),
699 current_frame,
700 );
701 slot.set_for_scope(&scope, probe);
702 }
703 }
704 }
705}
706
707#[derive(Clone)]
708pub struct ObservableSession {
709 pub(crate) runtime: RuntimeHandle,
710}
711
712impl ObservableSession {
713 fn snapshot(&self) -> Arc<RuntimeObservation> {
714 self.runtime.observe()
715 }
716
717 pub fn current_observation(&self) -> SessionObservation {
718 self.runtime.current_session_observation()
719 }
720
721 pub fn resume_from_cursor(&self, cursor: &SessionCursor) -> Result<SessionResume> {
722 self.runtime
723 .resume_session_observation(cursor)
724 .map_err(live_replay_error)
725 }
726
727 pub fn subscribe_from_cursor(
728 &self,
729 cursor: &SessionCursor,
730 ) -> Result<SessionObservationSubscription> {
731 self.runtime
732 .subscribe_session_observation(cursor)
733 .map_err(live_replay_error)
734 }
735
736 pub fn session_id(&self) -> String {
737 self.snapshot().session_id().to_string()
738 }
739
740 pub fn policy_snapshot(&self) -> SessionPolicy {
741 self.snapshot().policy.clone()
742 }
743
744 pub fn read_view(&self) -> SessionReadView {
745 self.snapshot().read_view.clone()
746 }
747
748 pub fn usage_report(&self) -> SessionUsageReport {
749 self.snapshot().usage_report.clone()
750 }
751
752 pub fn tool_state(&self) -> Option<ToolState> {
753 self.snapshot().tool_state.clone()
754 }
755
756 pub fn active_tool_manifests(&self) -> Vec<ToolManifest> {
757 self.snapshot()
758 .tool_state
759 .as_ref()
760 .map(ToolState::tool_manifests)
761 .unwrap_or_default()
762 }
763
764 pub async fn list_process_handles(&self) -> Vec<ProcessHandleSummary> {
765 self.snapshot().list_process_handles().await
766 }
767
768 pub async fn list_all_process_handles(&self) -> Vec<ProcessHandleSummary> {
769 self.snapshot().list_all_process_handles().await
770 }
771
772 pub fn process_scope(&self) -> SessionScope {
773 self.snapshot().process_scope()
774 }
775}
776
777fn live_replay_error(err: lash_core::LiveReplayStoreError) -> EmbedError {
778 EmbedError::Runtime(lash_core::RuntimeError::new(
779 RuntimeErrorCode::Other("live_replay".to_string()),
780 err.to_string(),
781 ))
782}
783
784pub struct EnqueueTurnBuilder<'a> {
785 session: &'a LashSession,
786 input: TurnInput,
787 id: Option<String>,
788 delivery_policy: DeliveryPolicy,
789 slot_policy: SlotPolicy,
790}
791
792impl<'a> EnqueueTurnBuilder<'a> {
793 pub fn id(mut self, id: impl Into<String>) -> Self {
794 self.id = Some(id.into());
795 self
796 }
797
798 pub fn delivery_policy(mut self, policy: DeliveryPolicy) -> Self {
799 self.delivery_policy = policy;
800 self
801 }
802
803 pub fn slot_policy(mut self, policy: SlotPolicy) -> Self {
804 self.slot_policy = policy;
805 self
806 }
807
808 pub async fn send(self) -> Result<QueuedWorkBatch> {
809 let source_key = self.id.map(|id| format!("host:{id}"));
810 self.session
811 .runtime
812 .enqueue_turn_input(
813 self.input,
814 self.delivery_policy,
815 self.slot_policy,
816 source_key,
817 )
818 .await
819 .map_err(EmbedError::Runtime)
820 }
821}
822
823impl<'a> std::future::IntoFuture for EnqueueTurnBuilder<'a> {
824 type Output = Result<QueuedWorkBatch>;
825 type IntoFuture =
826 std::pin::Pin<Box<dyn std::future::Future<Output = Result<QueuedWorkBatch>> + 'a>>;
827
828 fn into_future(self) -> Self::IntoFuture {
829 Box::pin(self.send())
830 }
831}