1use crate::support::*;
2use lash_core::runtime::{DeliveryPolicy, QueuedWorkBatch, SlotPolicy};
3
4pub struct SessionBuilder {
5 pub(crate) core: LashCore,
6 pub(crate) session_id: String,
7 pub(crate) spec: SessionSpec,
8 pub(crate) parent_session_id: Option<String>,
9 pub(crate) store: Option<Arc<dyn RuntimePersistence>>,
10 pub(crate) provider: Option<ProviderHandle>,
11 pub(crate) active_plugins: Vec<ActivePluginBinding>,
12 pub(crate) plugin_factories: Vec<Arc<dyn PluginFactory>>,
13}
14
15#[cfg(feature = "rlm")]
16pub struct RlmSessionBuilder {
17 pub(crate) builder: SessionBuilder,
18 pub(crate) rlm_final_answer_format: Option<lash_rlm_types::RlmFinalAnswerFormat>,
19}
20
21impl SessionBuilder {
22 pub fn provider(mut self, provider: ProviderHandle) -> Self {
23 self.spec = self.spec.provider_id(provider.kind());
24 self.provider = Some(provider);
25 self
26 }
27
28 pub fn session_spec(mut self, spec: SessionSpec) -> Self {
29 self.spec = spec;
30 self
31 }
32
33 pub fn parent(mut self, parent_session_id: impl Into<String>) -> Self {
34 self.parent_session_id = Some(parent_session_id.into());
35 self
36 }
37
38 pub fn store(mut self, store: Arc<dyn RuntimePersistence>) -> Self {
45 self.store = Some(store);
46 self
47 }
48
49 pub fn plugin<P: PluginBinding>(mut self, config: P::SessionConfig) -> Self {
50 self.active_plugins.push(ActivePluginBinding {
51 id: P::ID,
52 requires_turn_input: P::requires_turn_input(&config),
53 });
54 self.plugin_factories.push(P::factory(&config));
55 self
56 }
57
58 pub async fn open(self) -> Result<LashSession> {
59 let policy = self.session_policy();
60 let store = self.create_store(&policy).await?;
61 let state = self
62 .load_or_default_state(&policy, store.as_deref())
63 .await?;
64 self.open_resolved(policy, state, store).await
65 }
66
67 pub async fn open_fresh(self) -> Result<LashSession> {
77 let policy = self.session_policy();
78 let store = self.create_store(&policy).await?;
79 let state = RuntimeSessionState {
80 session_id: self.session_id.clone(),
81 policy: policy.clone(),
82 graph_replace_required: true,
83 ..RuntimeSessionState::default()
84 };
85 self.open_resolved(policy, state, store).await
86 }
87
88 pub async fn open_with_state(self, mut state: RuntimeSessionState) -> Result<LashSession> {
95 let policy = self.session_policy();
96 let store = self.create_store(&policy).await?;
97 if state.session_id != self.session_id {
98 return Err(EmbedError::StoreSessionMismatch {
99 loaded: state.session_id,
100 requested: self.session_id,
101 });
102 }
103 let recorded_provider_id = state.policy.recorded_provider_id().to_string();
104 state.policy = policy.clone();
105 state.policy.provider_id = recorded_provider_id;
106 self.open_resolved(policy, state, store).await
107 }
108
109 fn session_policy(&self) -> SessionPolicy {
110 let mut policy = self.spec.resolve_against(&self.core.policy);
111 policy.session_id = Some(self.session_id.clone());
112 policy
113 }
114
115 async fn load_or_default_state(
116 &self,
117 policy: &SessionPolicy,
118 store: Option<&dyn RuntimePersistence>,
119 ) -> Result<RuntimeSessionState> {
120 let state = match store {
121 Some(store) => {
122 let loaded = self.load_persisted_state_for_residency(store).await?;
123 let mut state = loaded.unwrap_or_else(|| RuntimeSessionState {
124 session_id: self.session_id.clone(),
125 policy: policy.clone(),
126 ..RuntimeSessionState::default()
127 });
128 if state.session_id != self.session_id {
129 return Err(EmbedError::StoreSessionMismatch {
130 loaded: state.session_id,
131 requested: self.session_id.clone(),
132 });
133 }
134 let recorded_provider_id = state.policy.recorded_provider_id().to_string();
135 state.policy = policy.clone();
136 state.policy.provider_id = recorded_provider_id;
137 state
138 }
139 None => RuntimeSessionState {
140 session_id: self.session_id.clone(),
141 policy: policy.clone(),
142 ..RuntimeSessionState::default()
143 },
144 };
145 Ok(state)
146 }
147
148 async fn load_persisted_state_for_residency(
149 &self,
150 store: &dyn RuntimePersistence,
151 ) -> Result<Option<RuntimeSessionState>> {
152 load_persisted_state_for_residency(self.core.env.residency, store).await
153 }
154
155 async fn open_resolved(
156 self,
157 policy: SessionPolicy,
158 state: RuntimeSessionState,
159 store: Option<Arc<dyn RuntimePersistence>>,
160 ) -> Result<LashSession> {
161 let mut env = self.core.env.clone();
162 if let Some(provider) = self.provider.clone().or_else(|| self.core.provider.clone()) {
163 env.core.providers.provider_resolver =
164 Arc::new(lash_core::SingleProviderResolver::new(provider));
165 }
166 let plugin_host = build_plugin_host(
167 self.core.protocol_factory.as_ref(),
168 self.core.plugin_factories.as_ref(),
169 self.plugin_factories,
170 )?;
171 env.core = self
172 .core
173 .runtime_host_for_plugin_host(env.core.clone(), &plugin_host)?;
174 env.plugin_host = Some(Arc::new(plugin_host));
175 let effect_host = Arc::clone(&env.core.control.effect_host);
176 let drivers = self.core.work_driver.drivers().await;
177 env.process_work_driver = drivers.process.clone();
178 env.queued_work_driver = drivers.queued.clone();
179 let runtime = LashRuntime::from_environment(&env, policy, state, store).await?;
180 if drivers.drive_process_on_open
181 && let Some(driver) = drivers.process.as_ref()
182 {
183 driver.claim_and_run_pending("session_open").await?;
184 }
185 let handle = RuntimeHandle::with_live_replay_store(
186 runtime,
187 Arc::clone(&self.core.live_replay_store),
188 );
189 Ok(LashSession {
190 runtime: handle,
191 effect_host,
192 parent_session_id: self.parent_session_id,
193 active_plugins: self.active_plugins,
194 process_phase_probe_slot: self.core.work_driver.phase_probe_slot(),
195 turn_cancels: crate::turn::TurnCancelRegistry::default(),
196 })
197 }
198
199 async fn create_store(
200 &self,
201 policy: &SessionPolicy,
202 ) -> Result<Option<Arc<dyn RuntimePersistence>>> {
203 if let Some(store) = self.store.as_ref() {
204 return Ok(Some(Arc::clone(store)));
205 }
206 let Some(factory) = self.core.store_factory.as_ref() else {
207 return Ok(None);
208 };
209 let request = SessionStoreCreateRequest {
210 session_id: self.session_id.clone(),
211 relation: self
212 .parent_session_id
213 .as_ref()
214 .map(|parent_session_id| lash_core::SessionRelation::Child {
215 parent_session_id: parent_session_id.clone(),
216 caused_by: None,
217 })
218 .unwrap_or_default(),
219 policy: policy.clone(),
220 };
221 factory
222 .create_store(&request)
223 .await
224 .map(Some)
225 .map_err(|message| EmbedError::StoreFactory {
226 session_id: self.session_id.clone(),
227 message,
228 })
229 }
230}
231
232pub(crate) async fn load_state_for_residency(
233 residency: Residency,
234 session_id: &str,
235 policy: &SessionPolicy,
236 store: &dyn RuntimePersistence,
237) -> Result<RuntimeSessionState> {
238 let mut state = load_persisted_state_for_residency(residency, store)
239 .await?
240 .unwrap_or_else(|| RuntimeSessionState {
241 session_id: session_id.to_string(),
242 policy: policy.clone(),
243 ..RuntimeSessionState::default()
244 });
245 if state.session_id != session_id {
246 return Err(EmbedError::StoreSessionMismatch {
247 loaded: state.session_id,
248 requested: session_id.to_string(),
249 });
250 }
251 let recorded_provider_id = state.policy.recorded_provider_id().to_string();
252 state.policy = policy.clone();
253 state.policy.provider_id = recorded_provider_id;
254 Ok(state)
255}
256
257async fn load_persisted_state_for_residency(
258 residency: Residency,
259 store: &dyn RuntimePersistence,
260) -> Result<Option<RuntimeSessionState>> {
261 match residency {
262 Residency::KeepAll => {
263 let loaded = lash_core::store::load_persisted_session_state(store)
264 .await
265 .map_err(|err| SessionError::Protocol(format!("failed to load store: {err}")))?;
266 Ok(loaded)
267 }
268 Residency::ActivePathOnly => {
269 let active = lash_core::store::load_persisted_session_state_active_path(store, None)
270 .await
271 .map_err(|err| {
272 SessionError::Protocol(format!("failed to load active-path store: {err}"))
273 })?;
274 if active
275 .as_ref()
276 .is_some_and(|state| state.session_graph.nodes.is_empty())
277 {
278 let mut full = lash_core::store::load_persisted_session_state(store)
279 .await
280 .map_err(|err| {
281 SessionError::Protocol(format!(
282 "failed to heal active-path store from full graph: {err}"
283 ))
284 })?;
285 if let Some(state) = full.as_mut() {
286 state.graph_replace_required = true;
287 }
288 return Ok(full);
289 }
290 Ok(active)
291 }
292 }
293}
294
295impl PromptLayerSink for SessionBuilder {
296 fn prompt_layer_mut(&mut self) -> &mut PromptLayer {
297 self.spec.prompt.get_or_insert_with(PromptLayer::new)
298 }
299}
300
301#[cfg(feature = "rlm")]
302impl RlmSessionBuilder {
303 pub fn provider(mut self, provider: ProviderHandle) -> Self {
304 self.builder = self.builder.provider(provider);
305 self
306 }
307
308 pub fn session_spec(mut self, spec: SessionSpec) -> Self {
309 self.builder = self.builder.session_spec(spec);
310 self
311 }
312
313 pub fn parent(mut self, parent_session_id: impl Into<String>) -> Self {
314 self.builder = self.builder.parent(parent_session_id);
315 self
316 }
317
318 pub fn store(mut self, store: Arc<dyn RuntimePersistence>) -> Self {
319 self.builder = self.builder.store(store);
320 self
321 }
322
323 pub fn plugin<P: PluginBinding>(mut self, config: P::SessionConfig) -> Self {
324 self.builder = self.builder.plugin::<P>(config);
325 self
326 }
327
328 pub async fn open(self) -> Result<LashSession> {
329 self.open_resolved(RlmOpenState::Resume).await
330 }
331
332 pub async fn open_fresh(self) -> Result<LashSession> {
333 self.open_resolved(RlmOpenState::Fresh).await
334 }
335
336 pub async fn open_with_state(self, state: RuntimeSessionState) -> Result<LashSession> {
337 self.open_resolved(RlmOpenState::Explicit(state)).await
338 }
339
340 async fn open_resolved(self, open_state: RlmOpenState) -> Result<LashSession> {
341 let Self {
342 builder,
343 rlm_final_answer_format,
344 } = self;
345 let policy = builder.session_policy();
346 let store = builder.create_store(&policy).await?;
347 let mut state = match open_state {
348 RlmOpenState::Resume => {
349 builder
350 .load_or_default_state(&policy, store.as_deref())
351 .await?
352 }
353 RlmOpenState::Fresh => RuntimeSessionState {
354 session_id: builder.session_id.clone(),
355 policy: policy.clone(),
356 graph_replace_required: true,
357 ..RuntimeSessionState::default()
358 },
359 RlmOpenState::Explicit(mut state) => {
360 if state.session_id != builder.session_id {
361 return Err(EmbedError::StoreSessionMismatch {
362 loaded: state.session_id,
363 requested: builder.session_id.clone(),
364 });
365 }
366 let recorded_provider_id = state.policy.recorded_provider_id().to_string();
367 state.policy = policy.clone();
368 state.policy.provider_id = recorded_provider_id;
369 state
370 }
371 };
372 apply_rlm_session_options(
373 builder.parent_session_id.is_none(),
374 rlm_final_answer_format,
375 &mut state,
376 )?;
377 builder.open_resolved(policy, state, store).await
378 }
379}
380
381#[cfg(feature = "rlm")]
382impl PromptLayerSink for RlmSessionBuilder {
383 fn prompt_layer_mut(&mut self) -> &mut PromptLayer {
384 self.builder.prompt_layer_mut()
385 }
386}
387
388#[cfg(feature = "rlm")]
389enum RlmOpenState {
390 Resume,
391 Fresh,
392 Explicit(RuntimeSessionState),
393}
394
395#[cfg(feature = "rlm")]
396fn apply_rlm_session_options(
397 is_root_session: bool,
398 explicit_format: Option<lash_rlm_types::RlmFinalAnswerFormat>,
399 state: &mut RuntimeSessionState,
400) -> Result<()> {
401 let final_answer_format = explicit_format.unwrap_or_else(|| {
402 if is_root_session {
403 lash_rlm_types::RlmFinalAnswerFormat::Markdown
404 } else {
405 lash_rlm_types::RlmFinalAnswerFormat::RawSubmitValue
406 }
407 });
408 let mut extras = if state.protocol_turn_options.is_empty() {
409 lash_rlm_types::RlmCreateExtras::default()
410 } else {
411 state.protocol_turn_options.decode()?
412 };
413 extras.final_answer_format = Some(final_answer_format);
414 let options = ProtocolTurnOptions::typed(extras)?;
415 state.protocol_turn_options = options.clone();
416 for frame in &mut state.agent_frames {
417 frame.protocol_turn_options = options.clone();
418 }
419 Ok(())
420}
421
422#[cfg(all(test, feature = "rlm"))]
423mod tests {
424 use super::*;
425
426 #[test]
427 fn apply_rlm_session_options_preserves_existing_termination() -> Result<()> {
428 let mut state = RuntimeSessionState::default();
429 state.protocol_turn_options =
430 ProtocolTurnOptions::typed(lash_rlm_types::RlmCreateExtras {
431 termination: lash_rlm_types::RlmTermination::ProseOrSubmit,
432 final_answer_format: None,
433 })?;
434
435 apply_rlm_session_options(true, None, &mut state)?;
436
437 let extras: lash_rlm_types::RlmCreateExtras = state.protocol_turn_options.decode()?;
438 assert_eq!(
439 extras.termination,
440 lash_rlm_types::RlmTermination::ProseOrSubmit
441 );
442 assert_eq!(
443 extras.final_answer_format,
444 Some(lash_rlm_types::RlmFinalAnswerFormat::Markdown)
445 );
446 Ok(())
447 }
448}
449
450#[derive(Clone)]
451pub struct LashSession {
452 pub(crate) runtime: RuntimeHandle,
453 pub(crate) effect_host: Arc<dyn EffectHost>,
454 pub(crate) parent_session_id: Option<String>,
455 pub(crate) active_plugins: Vec<ActivePluginBinding>,
456 pub(crate) process_phase_probe_slot: Option<lash_core::runtime::RuntimeTurnPhaseProbeSlot>,
457 pub(crate) turn_cancels: crate::turn::TurnCancelRegistry,
458}
459
460#[derive(Clone, Debug, Default)]
461pub struct SessionConfigPatch {
462 pub provider: Option<ProviderHandle>,
463 pub model: Option<ModelSpec>,
464 pub prompt: Option<PromptLayer>,
465}
466
467impl LashSession {
468 pub async fn close(self) -> Result<()> {
469 let runtime = self.runtime.writer();
470 let runtime = runtime.lock().await;
471 runtime.unregister_plugin_session()?;
472 Ok(())
473 }
474
475 pub fn session_id(&self) -> String {
476 self.runtime.observe().session_id().to_string()
477 }
478
479 pub fn policy_snapshot(&self) -> SessionPolicy {
480 self.runtime.observe().policy.clone()
481 }
482
483 pub fn observe(&self) -> ObservableSession {
484 ObservableSession {
485 runtime: self.runtime.clone(),
486 }
487 }
488
489 pub fn parent_session_id(&self) -> Option<&str> {
490 self.parent_session_id.as_deref()
491 }
492
493 pub fn effect_host(&self) -> Arc<dyn EffectHost> {
494 Arc::clone(&self.effect_host)
495 }
496
497 pub fn turn(&self, input: TurnInput) -> TurnBuilder {
498 TurnBuilder {
499 runtime: self.runtime.clone(),
500 effect_host: Arc::clone(&self.effect_host),
501 active_plugins: self.active_plugins.clone(),
502 input,
503 cancel: CancellationToken::new(),
504 cancels: self.turn_cancels.clone(),
505 protocol_turn_options: None,
506 provider: None,
507 model: None,
508 turn_id: None,
509 }
510 }
511
512 pub fn queued_turn(&self) -> QueuedTurnBuilder {
513 QueuedTurnBuilder {
514 runtime: self.runtime.clone(),
515 effect_host: Arc::clone(&self.effect_host),
516 cancel: CancellationToken::new(),
517 cancels: self.turn_cancels.clone(),
518 batch_ids: Vec::new(),
519 drain_id: None,
520 }
521 }
522
523 pub fn cancel_running_turns(&self) -> usize {
538 self.turn_cancels.cancel_all()
539 }
540
541 pub fn admin(&self) -> SessionAdmin {
542 SessionAdmin {
543 runtime: self.runtime.clone(),
544 }
545 }
546
547 pub async fn configure(&self, patch: SessionConfigPatch) -> Result<()> {
548 self.admin().config().update(patch).await
549 }
550
551 pub fn tools(&self) -> ToolAdmin {
552 ToolAdmin::new(self.admin())
553 }
554
555 pub fn commands(&self) -> SessionCommandAdmin {
556 self.admin().commands()
557 }
558
559 pub fn triggers(&self) -> SessionTriggerAdmin {
560 self.admin().triggers()
561 }
562
563 pub fn processes(&self) -> SessionProcessAdmin {
564 SessionProcessAdmin::new(self.admin())
565 }
566
567 pub fn plugin_operations(&self) -> PluginOperations {
568 PluginOperations {
569 control: self.admin(),
570 }
571 }
572
573 pub fn enqueue(&self, input: TurnInput) -> EnqueueTurnBuilder<'_> {
574 EnqueueTurnBuilder {
575 session: self,
576 input,
577 id: None,
578 delivery_policy: DeliveryPolicy::AfterCurrentTurnCommit,
579 slot_policy: SlotPolicy::Exclusive,
580 }
581 }
582
583 pub async fn queued_work(&self) -> Result<Vec<QueuedWorkBatch>> {
584 let observation = self.runtime.observe();
585 let store = observation.queue_store.as_ref().ok_or_else(|| {
586 EmbedError::Runtime(lash_core::RuntimeError::new(
587 lash_core::RuntimeErrorCode::StoreCommitFailed,
588 "queued work inspection requires a persistent runtime store",
589 ))
590 })?;
591 store
592 .list_pending_queued_work(observation.session_id())
593 .await
594 .map_err(|err| {
595 EmbedError::Runtime(lash_core::RuntimeError::new(
596 lash_core::RuntimeErrorCode::StoreCommitFailed,
597 err.to_string(),
598 ))
599 })
600 }
601
602 pub async fn cancel_queued_work_batch(
603 &self,
604 batch_id: &str,
605 ) -> Result<Option<QueuedWorkBatch>> {
606 let session_id = self.session_id();
607 self.runtime
608 .cancel_queued_work_batch(&session_id, batch_id)
609 .await
610 .map_err(EmbedError::Runtime)
611 }
612
613 pub async fn await_queued_work_batch(&self, batch_id: &str) -> Result<()> {
625 let observation = self.runtime.observe();
626 let store = observation.queue_store.clone().ok_or_else(|| {
627 EmbedError::Runtime(lash_core::RuntimeError::new(
628 lash_core::RuntimeErrorCode::StoreCommitFailed,
629 "queued work inspection requires a persistent runtime store",
630 ))
631 })?;
632 let session_id = observation.session_id().to_string();
633 drop(observation);
634 let mut delay = std::time::Duration::from_millis(25);
635 loop {
636 let pending = store
637 .list_pending_queued_work(&session_id)
638 .await
639 .map_err(|err| {
640 EmbedError::Runtime(lash_core::RuntimeError::new(
641 lash_core::RuntimeErrorCode::StoreCommitFailed,
642 err.to_string(),
643 ))
644 })?;
645 if !pending.iter().any(|batch| batch.batch_id == batch_id) {
646 return Ok(());
647 }
648 tokio::time::sleep(delay).await;
649 delay = (delay * 2).min(std::time::Duration::from_millis(400));
650 }
651 }
652
653 pub fn read_view(&self) -> SessionReadView {
654 self.runtime.observe().read_view.clone()
655 }
656
657 pub fn usage_report(&self) -> SessionUsageReport {
658 self.runtime.observe().usage_report.clone()
659 }
660
661 pub async fn set_turn_phase_probe(
662 &self,
663 probe: Arc<dyn lash_core::runtime::RuntimeTurnPhaseProbe>,
664 ) {
665 let writer = self.runtime.writer();
666 let mut runtime = writer.lock().await;
667 runtime.set_turn_phase_probe(Arc::clone(&probe));
668 self.runtime.publish_from(&runtime);
669 if let Some(slot) = &self.process_phase_probe_slot {
670 let observation = self.runtime.observe();
671 slot.set_for_session(observation.session_id(), Arc::clone(&probe));
672 let current_frame = observation.persisted_state.current_agent_frame_id.as_str();
673 if !current_frame.is_empty() {
674 let scope = lash_core::SessionScope::for_agent_frame(
675 observation.session_id(),
676 current_frame,
677 );
678 slot.set_for_scope(&scope, probe);
679 }
680 }
681 }
682}
683
684#[derive(Clone)]
685pub struct ObservableSession {
686 pub(crate) runtime: RuntimeHandle,
687}
688
689impl ObservableSession {
690 fn snapshot(&self) -> Arc<RuntimeObservation> {
691 self.runtime.observe()
692 }
693
694 pub fn current_observation(&self) -> SessionObservation {
695 self.runtime.current_session_observation()
696 }
697
698 pub fn resume_from_cursor(&self, cursor: &SessionCursor) -> Result<SessionResume> {
699 self.runtime
700 .resume_session_observation(cursor)
701 .map_err(live_replay_error)
702 }
703
704 pub fn subscribe_from_cursor(
705 &self,
706 cursor: &SessionCursor,
707 ) -> Result<SessionObservationSubscription> {
708 self.runtime
709 .subscribe_session_observation(cursor)
710 .map_err(live_replay_error)
711 }
712
713 pub fn session_id(&self) -> String {
714 self.snapshot().session_id().to_string()
715 }
716
717 pub fn policy_snapshot(&self) -> SessionPolicy {
718 self.snapshot().policy.clone()
719 }
720
721 pub fn read_view(&self) -> SessionReadView {
722 self.snapshot().read_view.clone()
723 }
724
725 pub fn usage_report(&self) -> SessionUsageReport {
726 self.snapshot().usage_report.clone()
727 }
728
729 pub fn tool_state(&self) -> Option<ToolState> {
730 self.snapshot().tool_state.clone()
731 }
732
733 pub fn active_tool_manifests(&self) -> Vec<ToolManifest> {
734 self.snapshot()
735 .tool_state
736 .as_ref()
737 .map(ToolState::tool_manifests)
738 .unwrap_or_default()
739 }
740
741 pub async fn list_process_handles(&self) -> Vec<ProcessHandleSummary> {
742 self.snapshot().list_process_handles().await
743 }
744
745 pub async fn list_all_process_handles(&self) -> Vec<ProcessHandleSummary> {
746 self.snapshot().list_all_process_handles().await
747 }
748
749 pub fn process_scope(&self) -> SessionScope {
750 self.snapshot().process_scope()
751 }
752}
753
754fn live_replay_error(err: lash_core::LiveReplayStoreError) -> EmbedError {
755 EmbedError::Runtime(lash_core::RuntimeError::new(
756 RuntimeErrorCode::Other("live_replay".to_string()),
757 err.to_string(),
758 ))
759}
760
761pub struct EnqueueTurnBuilder<'a> {
762 session: &'a LashSession,
763 input: TurnInput,
764 id: Option<String>,
765 delivery_policy: DeliveryPolicy,
766 slot_policy: SlotPolicy,
767}
768
769impl<'a> EnqueueTurnBuilder<'a> {
770 pub fn id(mut self, id: impl Into<String>) -> Self {
771 self.id = Some(id.into());
772 self
773 }
774
775 pub fn delivery_policy(mut self, policy: DeliveryPolicy) -> Self {
776 self.delivery_policy = policy;
777 self
778 }
779
780 pub fn slot_policy(mut self, policy: SlotPolicy) -> Self {
781 self.slot_policy = policy;
782 self
783 }
784
785 pub async fn send(self) -> Result<QueuedWorkBatch> {
786 let source_key = self.id.map(|id| format!("host:{id}"));
787 self.session
788 .runtime
789 .enqueue_turn_input(
790 self.input,
791 self.delivery_policy,
792 self.slot_policy,
793 source_key,
794 )
795 .await
796 .map_err(EmbedError::Runtime)
797 }
798}
799
800impl<'a> std::future::IntoFuture for EnqueueTurnBuilder<'a> {
801 type Output = Result<QueuedWorkBatch>;
802 type IntoFuture =
803 std::pin::Pin<Box<dyn std::future::Future<Output = Result<QueuedWorkBatch>> + 'a>>;
804
805 fn into_future(self) -> Self::IntoFuture {
806 Box::pin(self.send())
807 }
808}