Skip to main content

lash/
session.rs

1use crate::support::*;
2use lash_core::runtime::{DeliveryPolicy, QueuedWorkBatch, SlotPolicy};
3
/// Builder that collects everything needed to open one session.
///
/// The chained setters fill these fields in; the `open*` methods consume the
/// builder and produce a [`LashSession`].
pub struct SessionBuilder {
    /// Core instance whose policy, environment and factories the session uses.
    pub(crate) core: LashCore,
    /// Id of the session being opened.
    pub(crate) session_id: String,
    /// Per-session spec; resolved against the core policy when the session opens.
    pub(crate) spec: SessionSpec,
    /// Parent session id; when set, a factory-created store is requested with a
    /// child relation to that parent.
    pub(crate) parent_session_id: Option<String>,
    /// Explicit persistence store; when `None` the core's store factory (if
    /// any) is asked for one.
    pub(crate) store: Option<Arc<dyn RuntimePersistence>>,
    /// Session-level provider; when `None` the core's provider (if any) is used.
    pub(crate) provider: Option<ProviderHandle>,
    /// Bindings recorded by [`SessionBuilder::plugin`], in registration order.
    pub(crate) active_plugins: Vec<ActivePluginBinding>,
    /// Factories recorded by [`SessionBuilder::plugin`], in registration order.
    pub(crate) plugin_factories: Vec<Arc<dyn PluginFactory>>,
}
14
/// Wrapper around [`SessionBuilder`] that additionally carries the RLM
/// final-answer format written into the session state at open time.
#[cfg(feature = "rlm")]
pub struct RlmSessionBuilder {
    /// Underlying builder that every setter delegates to.
    pub(crate) builder: SessionBuilder,
    /// Explicit final-answer format. When `None`, a default is chosen at open
    /// time depending on whether the session has a parent.
    pub(crate) rlm_final_answer_format: Option<lash_rlm_types::RlmFinalAnswerFormat>,
}
20
21impl SessionBuilder {
22    pub fn provider(mut self, provider: ProviderHandle) -> Self {
23        self.spec = self.spec.provider_id(provider.kind());
24        self.provider = Some(provider);
25        self
26    }
27
28    pub fn session_spec(mut self, spec: SessionSpec) -> Self {
29        self.spec = spec;
30        self
31    }
32
33    pub fn parent(mut self, parent_session_id: impl Into<String>) -> Self {
34        self.parent_session_id = Some(parent_session_id.into());
35        self
36    }
37
38    /// Use a specific persistence store for this root session.
39    ///
40    /// This is the right API for a host-owned, pre-opened session database.
41    /// Managed child sessions never reuse this store; configure
42    /// `LashCoreBuilder::child_store_factory` when child sessions should also
43    /// persist.
44    pub fn store(mut self, store: Arc<dyn RuntimePersistence>) -> Self {
45        self.store = Some(store);
46        self
47    }
48
49    pub fn plugin<P: PluginBinding>(mut self, config: P::SessionConfig) -> Self {
50        self.active_plugins.push(ActivePluginBinding {
51            id: P::ID,
52            requires_turn_input: P::requires_turn_input(&config),
53        });
54        self.plugin_factories.push(P::factory(&config));
55        self
56    }
57
58    pub async fn open(self) -> Result<LashSession> {
59        let policy = self.session_policy();
60        let store = self.create_store(&policy).await?;
61        let state = self
62            .load_or_default_state(&policy, store.as_deref())
63            .await?;
64        self.open_resolved(policy, state, store).await
65    }
66
67    /// Open this session with a fresh resident graph, ignoring any persisted
68    /// session graph/checkpoint state that may already exist for the same
69    /// session id.
70    ///
71    /// The next successful commit writes a full replacement graph, so normal
72    /// embedders can use this to start over without manually calling
73    /// `load_persisted_session_state` or constructing a `RuntimeSessionState`.
74    /// Use [`Self::open`] for resume and [`Self::open_with_state`] only when
75    /// restoring explicit host-owned state.
76    pub async fn open_fresh(self) -> Result<LashSession> {
77        let policy = self.session_policy();
78        let store = self.create_store(&policy).await?;
79        let state = RuntimeSessionState {
80            session_id: self.session_id.clone(),
81            policy: policy.clone(),
82            graph_replace_required: true,
83            ..RuntimeSessionState::default()
84        };
85        self.open_resolved(policy, state, store).await
86    }
87
88    /// Open with an explicitly supplied runtime state.
89    ///
90    /// This is for advanced hosts that already own a complete state snapshot.
91    /// Normal embedders should use [`Self::open`] to resume according to Lash's
92    /// residency policy or [`Self::open_fresh`] to start over and replace prior
93    /// persisted state on the next commit.
94    pub async fn open_with_state(self, mut state: RuntimeSessionState) -> Result<LashSession> {
95        let policy = self.session_policy();
96        let store = self.create_store(&policy).await?;
97        if state.session_id != self.session_id {
98            return Err(EmbedError::StoreSessionMismatch {
99                loaded: state.session_id,
100                requested: self.session_id,
101            });
102        }
103        let recorded_provider_id = state.policy.recorded_provider_id().to_string();
104        state.policy = policy.clone();
105        state.policy.provider_id = recorded_provider_id;
106        self.open_resolved(policy, state, store).await
107    }
108
109    fn session_policy(&self) -> SessionPolicy {
110        let mut policy = self.spec.resolve_against(&self.core.policy);
111        policy.session_id = Some(self.session_id.clone());
112        policy
113    }
114
115    async fn load_or_default_state(
116        &self,
117        policy: &SessionPolicy,
118        store: Option<&dyn RuntimePersistence>,
119    ) -> Result<RuntimeSessionState> {
120        let state = match store {
121            Some(store) => {
122                let loaded = self.load_persisted_state_for_residency(store).await?;
123                let mut state = loaded.unwrap_or_else(|| RuntimeSessionState {
124                    session_id: self.session_id.clone(),
125                    policy: policy.clone(),
126                    ..RuntimeSessionState::default()
127                });
128                if state.session_id != self.session_id {
129                    return Err(EmbedError::StoreSessionMismatch {
130                        loaded: state.session_id,
131                        requested: self.session_id.clone(),
132                    });
133                }
134                let recorded_provider_id = state.policy.recorded_provider_id().to_string();
135                state.policy = policy.clone();
136                state.policy.provider_id = recorded_provider_id;
137                state
138            }
139            None => RuntimeSessionState {
140                session_id: self.session_id.clone(),
141                policy: policy.clone(),
142                ..RuntimeSessionState::default()
143            },
144        };
145        Ok(state)
146    }
147
148    async fn load_persisted_state_for_residency(
149        &self,
150        store: &dyn RuntimePersistence,
151    ) -> Result<Option<RuntimeSessionState>> {
152        match self.core.env.residency {
153            Residency::KeepAll => {
154                let loaded = lash_core::store::load_persisted_session_state(store)
155                    .await
156                    .map_err(|err| {
157                        SessionError::Protocol(format!("failed to load store: {err}"))
158                    })?;
159                Ok(loaded)
160            }
161            Residency::ActivePathOnly => {
162                let active =
163                    lash_core::store::load_persisted_session_state_active_path(store, None)
164                        .await
165                        .map_err(|err| {
166                            SessionError::Protocol(format!(
167                                "failed to load active-path store: {err}"
168                            ))
169                        })?;
170                if active
171                    .as_ref()
172                    .is_some_and(|state| state.session_graph.nodes.is_empty())
173                {
174                    let mut full = lash_core::store::load_persisted_session_state(store)
175                        .await
176                        .map_err(|err| {
177                            SessionError::Protocol(format!(
178                                "failed to heal active-path store from full graph: {err}"
179                            ))
180                        })?;
181                    if let Some(state) = full.as_mut() {
182                        state.graph_replace_required = true;
183                    }
184                    return Ok(full);
185                }
186                Ok(active)
187            }
188        }
189    }
190
191    async fn open_resolved(
192        self,
193        policy: SessionPolicy,
194        state: RuntimeSessionState,
195        store: Option<Arc<dyn RuntimePersistence>>,
196    ) -> Result<LashSession> {
197        let mut env = self.core.env.clone();
198        if let Some(provider) = self.provider.clone().or_else(|| self.core.provider.clone()) {
199            env.core.providers.provider_resolver =
200                Arc::new(lash_core::SingleProviderResolver::new(provider));
201        }
202        let plugin_host = build_plugin_host(
203            self.core.protocol_factory.as_ref(),
204            self.core.plugin_factories.as_ref(),
205            self.plugin_factories,
206        )?;
207        env.core = self
208            .core
209            .runtime_host_for_plugin_host(env.core.clone(), &plugin_host)?;
210        env.plugin_host = Some(Arc::new(plugin_host));
211        let effect_host = Arc::clone(&env.core.control.effect_host);
212        // Lazily spawn the default process work runner (Decision 3: deferred to
213        // the first open so a tokio runtime is guaranteed; idempotent via the
214        // shared once-guard) and thread its poke onto this session's host so the
215        // process admin seam can wake the runner after a successful start.
216        env.process_work_poke = self.core.process_work_runner.poke().await;
217        let runtime = LashRuntime::from_environment(&env, policy, state, store).await?;
218        let handle = RuntimeHandle::with_live_replay_store(
219            runtime,
220            Arc::clone(&self.core.live_replay_store),
221        );
222        Ok(LashSession {
223            runtime: handle,
224            effect_host,
225            parent_session_id: self.parent_session_id,
226            active_plugins: self.active_plugins,
227            process_phase_probe_slot: self.core.process_work_runner.phase_probe_slot(),
228            turn_cancels: crate::turn::TurnCancelRegistry::default(),
229        })
230    }
231
232    async fn create_store(
233        &self,
234        policy: &SessionPolicy,
235    ) -> Result<Option<Arc<dyn RuntimePersistence>>> {
236        if let Some(store) = self.store.as_ref() {
237            return Ok(Some(Arc::clone(store)));
238        }
239        let Some(factory) = self.core.store_factory.as_ref() else {
240            return Ok(None);
241        };
242        let request = SessionStoreCreateRequest {
243            session_id: self.session_id.clone(),
244            relation: self
245                .parent_session_id
246                .as_ref()
247                .map(|parent_session_id| lash_core::SessionRelation::Child {
248                    parent_session_id: parent_session_id.clone(),
249                    caused_by: None,
250                })
251                .unwrap_or_default(),
252            policy: policy.clone(),
253        };
254        factory
255            .create_store(&request)
256            .await
257            .map(Some)
258            .map_err(|message| EmbedError::StoreFactory {
259                session_id: self.session_id.clone(),
260                message,
261            })
262    }
263}
264
265impl PromptLayerSink for SessionBuilder {
266    fn prompt_layer_mut(&mut self) -> &mut PromptLayer {
267        self.spec.prompt.get_or_insert_with(PromptLayer::new)
268    }
269}
270
#[cfg(feature = "rlm")]
impl RlmSessionBuilder {
    /// Select the provider; see [`SessionBuilder::provider`].
    pub fn provider(mut self, provider: ProviderHandle) -> Self {
        self.builder = self.builder.provider(provider);
        self
    }

    /// Replace the whole session spec; see [`SessionBuilder::session_spec`].
    pub fn session_spec(mut self, spec: SessionSpec) -> Self {
        self.builder = self.builder.session_spec(spec);
        self
    }

    /// Mark this session as a child; see [`SessionBuilder::parent`].
    pub fn parent(mut self, parent_session_id: impl Into<String>) -> Self {
        self.builder = self.builder.parent(parent_session_id);
        self
    }

    /// Use a specific persistence store; see [`SessionBuilder::store`].
    pub fn store(mut self, store: Arc<dyn RuntimePersistence>) -> Self {
        self.builder = self.builder.store(store);
        self
    }

    /// Register plugin `P`; see [`SessionBuilder::plugin`].
    pub fn plugin<P: PluginBinding>(mut self, config: P::SessionConfig) -> Self {
        self.builder = self.builder.plugin::<P>(config);
        self
    }

    /// Open the session, resuming from persisted state when there is any.
    pub async fn open(self) -> Result<LashSession> {
        self.open_resolved(RlmOpenState::Resume).await
    }

    /// Open the session with a fresh state flagged for a full graph
    /// replacement, ignoring persisted state.
    pub async fn open_fresh(self) -> Result<LashSession> {
        self.open_resolved(RlmOpenState::Fresh).await
    }

    /// Open the session from an explicitly supplied state snapshot.
    ///
    /// # Errors
    ///
    /// Returns `EmbedError::StoreSessionMismatch` when the snapshot belongs
    /// to a different session id.
    pub async fn open_with_state(self, state: RuntimeSessionState) -> Result<LashSession> {
        self.open_resolved(RlmOpenState::Explicit(state)).await
    }

    /// Shared open path: pick the starting state for `open_state`, write the
    /// RLM session options into it, and hand over to
    /// [`SessionBuilder::open_resolved`].
    ///
    /// An explicit snapshot is validated against the builder's session id
    /// before the store is obtained, so a rejected snapshot never invokes the
    /// store factory.
    async fn open_resolved(self, open_state: RlmOpenState) -> Result<LashSession> {
        let Self {
            builder,
            rlm_final_answer_format,
        } = self;
        // Fail fast on a foreign snapshot, before any store is created.
        let open_state = match open_state {
            RlmOpenState::Explicit(state) if state.session_id != builder.session_id => {
                return Err(EmbedError::StoreSessionMismatch {
                    loaded: state.session_id,
                    requested: builder.session_id.clone(),
                });
            }
            validated => validated,
        };
        let policy = builder.session_policy();
        let store = builder.create_store(&policy).await?;
        let mut state = match open_state {
            RlmOpenState::Resume => {
                builder
                    .load_or_default_state(&policy, store.as_deref())
                    .await?
            }
            RlmOpenState::Fresh => RuntimeSessionState {
                session_id: builder.session_id.clone(),
                policy: policy.clone(),
                graph_replace_required: true,
                ..RuntimeSessionState::default()
            },
            RlmOpenState::Explicit(mut state) => {
                // Adopt the resolved policy but keep the recorded provider id.
                let recorded_provider_id = state.policy.recorded_provider_id().to_string();
                state.policy = policy.clone();
                state.policy.provider_id = recorded_provider_id;
                state
            }
        };
        // A session without a parent is treated as the root session.
        apply_rlm_session_options(
            builder.parent_session_id.is_none(),
            rlm_final_answer_format,
            &mut state,
        )?;
        builder.open_resolved(policy, state, store).await
    }
}
350
#[cfg(feature = "rlm")]
impl PromptLayerSink for RlmSessionBuilder {
    /// Forward to the wrapped builder's prompt layer.
    fn prompt_layer_mut(&mut self) -> &mut PromptLayer {
        let inner = &mut self.builder;
        inner.prompt_layer_mut()
    }
}
357
/// Which starting state `RlmSessionBuilder::open_resolved` should use.
#[cfg(feature = "rlm")]
enum RlmOpenState {
    /// Load persisted state from the store, or fall back to a default state.
    Resume,
    /// Start from a default state flagged for a full graph replacement.
    Fresh,
    /// Use the state snapshot supplied by the caller.
    Explicit(RuntimeSessionState),
}
364
/// Write the RLM final-answer format into `state`'s protocol turn options.
///
/// The format is `explicit_format` when given; otherwise `Markdown` for a
/// root session and `RawSubmitValue` for any other. Existing turn options are
/// decoded and updated rather than discarded, and the resulting options are
/// stored both on the state and on every agent frame.
///
/// # Errors
///
/// Propagates failures from decoding the existing options or encoding the
/// updated ones.
#[cfg(feature = "rlm")]
fn apply_rlm_session_options(
    is_root_session: bool,
    explicit_format: Option<lash_rlm_types::RlmFinalAnswerFormat>,
    state: &mut RuntimeSessionState,
) -> Result<()> {
    let final_answer_format = match explicit_format {
        Some(format) => format,
        None if is_root_session => lash_rlm_types::RlmFinalAnswerFormat::Markdown,
        None => lash_rlm_types::RlmFinalAnswerFormat::RawSubmitValue,
    };
    let mut extras: lash_rlm_types::RlmCreateExtras = if state.protocol_turn_options.is_empty() {
        Default::default()
    } else {
        state.protocol_turn_options.decode()?
    };
    extras.final_answer_format = Some(final_answer_format);
    let options = ProtocolTurnOptions::typed(extras)?;
    for frame in &mut state.agent_frames {
        frame.protocol_turn_options = options.clone();
    }
    state.protocol_turn_options = options;
    Ok(())
}
391
/// Handle to an opened session.
///
/// Cloning yields another handle to the same runtime; clones also share the
/// turn-cancel registry.
#[derive(Clone)]
pub struct LashSession {
    /// Handle to the session's runtime.
    pub(crate) runtime: RuntimeHandle,
    /// Effect host taken from the runtime environment when the session opened.
    pub(crate) effect_host: Arc<dyn EffectHost>,
    /// Parent session id, when this session was opened as a child.
    pub(crate) parent_session_id: Option<String>,
    /// Plugin bindings registered on the builder.
    pub(crate) active_plugins: Vec<ActivePluginBinding>,
    /// Phase-probe slot of the core's process work runner, if it has one.
    pub(crate) process_phase_probe_slot: Option<lash_core::runtime::RuntimeTurnPhaseProbeSlot>,
    /// Registry used by `cancel_running_turns` to signal turns started from
    /// this handle and its clones.
    pub(crate) turn_cancels: crate::turn::TurnCancelRegistry,
}
401
/// Set of optional configuration changes passed to `LashSession::configure`.
///
/// NOTE(review): presumably a `None` field leaves that setting untouched —
/// confirm against the config admin's `update`.
#[derive(Clone, Debug, Default)]
pub struct SessionConfigPatch {
    /// Provider handle to apply, if any.
    pub provider: Option<ProviderHandle>,
    /// Model spec to apply, if any.
    pub model: Option<ModelSpec>,
    /// Prompt layer to apply, if any.
    pub prompt: Option<PromptLayer>,
}
408
409impl LashSession {
410    pub async fn close(self) -> Result<()> {
411        let runtime = self.runtime.writer();
412        let runtime = runtime.lock().await;
413        runtime.unregister_plugin_session()?;
414        Ok(())
415    }
416
417    pub fn session_id(&self) -> String {
418        self.runtime.observe().session_id().to_string()
419    }
420
421    pub fn policy_snapshot(&self) -> SessionPolicy {
422        self.runtime.observe().policy.clone()
423    }
424
425    pub fn observe(&self) -> ObservableSession {
426        ObservableSession {
427            runtime: self.runtime.clone(),
428        }
429    }
430
431    pub fn parent_session_id(&self) -> Option<&str> {
432        self.parent_session_id.as_deref()
433    }
434
435    pub fn effect_host(&self) -> Arc<dyn EffectHost> {
436        Arc::clone(&self.effect_host)
437    }
438
439    pub fn turn(&self, input: TurnInput) -> TurnBuilder {
440        TurnBuilder {
441            runtime: self.runtime.clone(),
442            effect_host: Arc::clone(&self.effect_host),
443            active_plugins: self.active_plugins.clone(),
444            input,
445            cancel: CancellationToken::new(),
446            cancels: self.turn_cancels.clone(),
447            protocol_turn_options: None,
448            provider: None,
449            model: None,
450            turn_id: None,
451        }
452    }
453
454    pub fn queued_turn(&self) -> QueuedTurnBuilder {
455        QueuedTurnBuilder {
456            runtime: self.runtime.clone(),
457            effect_host: Arc::clone(&self.effect_host),
458            cancel: CancellationToken::new(),
459            cancels: self.turn_cancels.clone(),
460            batch_ids: Vec::new(),
461            drain_id: None,
462        }
463    }
464
465    /// Cancel every turn currently executing through this opened session
466    /// (including its clones) and report how many were signalled.
467    ///
468    /// This is the affordance behind a UI "stop" control: hold a clone of the
469    /// session wherever the stop arrives and call this, instead of threading a
470    /// [`CancellationToken`](crate::CancellationToken) into every turn call
471    /// ([`TurnBuilder::cancel`](crate::TurnBuilder::cancel) remains the
472    /// per-turn hook when you need one). A cancelled turn finishes with
473    /// `TurnOutcome::Stopped(TurnStop::Cancelled)` and commits like any other
474    /// turn; the session stays usable.
475    ///
476    /// Scope: turns started from this `LashSession` instance and its clones.
477    /// A handle opened separately for the same session id has its own
478    /// registry and is not reached.
479    pub fn cancel_running_turns(&self) -> usize {
480        self.turn_cancels.cancel_all()
481    }
482
483    pub fn admin(&self) -> SessionAdmin {
484        SessionAdmin {
485            runtime: self.runtime.clone(),
486        }
487    }
488
489    pub async fn configure(&self, patch: SessionConfigPatch) -> Result<()> {
490        self.admin().config().update(patch).await
491    }
492
493    pub fn tools(&self) -> ToolAdmin {
494        ToolAdmin::new(self.admin())
495    }
496
497    pub fn commands(&self) -> SessionCommandAdmin {
498        self.admin().commands()
499    }
500
501    pub fn triggers(&self) -> SessionTriggerAdmin {
502        self.admin().triggers()
503    }
504
505    pub fn processes(&self) -> SessionProcessAdmin {
506        SessionProcessAdmin::new(self.admin())
507    }
508
509    pub fn plugin_actions(&self) -> PluginActions {
510        PluginActions {
511            control: self.admin(),
512        }
513    }
514
515    pub fn enqueue(&self, input: TurnInput) -> EnqueueTurnBuilder<'_> {
516        EnqueueTurnBuilder {
517            session: self,
518            input,
519            id: None,
520            delivery_policy: DeliveryPolicy::AfterCurrentTurnCommit,
521            slot_policy: SlotPolicy::Exclusive,
522        }
523    }
524
525    pub async fn queued_work(&self) -> Result<Vec<QueuedWorkBatch>> {
526        let observation = self.runtime.observe();
527        let store = observation.queue_store.as_ref().ok_or_else(|| {
528            EmbedError::Runtime(lash_core::RuntimeError::new(
529                lash_core::RuntimeErrorCode::StoreCommitFailed,
530                "queued work inspection requires a persistent runtime store",
531            ))
532        })?;
533        store
534            .list_pending_queued_work(observation.session_id())
535            .await
536            .map_err(|err| {
537                EmbedError::Runtime(lash_core::RuntimeError::new(
538                    lash_core::RuntimeErrorCode::StoreCommitFailed,
539                    err.to_string(),
540                ))
541            })
542    }
543
544    pub async fn cancel_queued_work_batch(
545        &self,
546        batch_id: &str,
547    ) -> Result<Option<QueuedWorkBatch>> {
548        let session_id = self.session_id();
549        self.runtime
550            .cancel_queued_work_batch(&session_id, batch_id)
551            .await
552            .map_err(EmbedError::Runtime)
553    }
554
555    /// Resolve once `batch_id` is no longer pending in the queue store —
556    /// drained by whoever runs queued work (a queued-work runner, a durable
557    /// worker, or another handle's [`queued_turn`](Self::queued_turn)) or
558    /// cancelled. This is the enqueue-and-observe side of the queue: the
559    /// caller never claims the work itself.
560    ///
561    /// Completion is read from the persistent queue store, so it observes
562    /// drains performed by other session handles and other processes alike.
563    /// There is no built-in deadline — nothing resolves if nothing drains the
564    /// queue, so bound it with `tokio::time::timeout` when the worker may be
565    /// unavailable. A batch id the store has never seen resolves immediately.
566    pub async fn await_queued_work_batch(&self, batch_id: &str) -> Result<()> {
567        let observation = self.runtime.observe();
568        let store = observation.queue_store.clone().ok_or_else(|| {
569            EmbedError::Runtime(lash_core::RuntimeError::new(
570                lash_core::RuntimeErrorCode::StoreCommitFailed,
571                "queued work inspection requires a persistent runtime store",
572            ))
573        })?;
574        let session_id = observation.session_id().to_string();
575        drop(observation);
576        let mut delay = std::time::Duration::from_millis(25);
577        loop {
578            let pending = store
579                .list_pending_queued_work(&session_id)
580                .await
581                .map_err(|err| {
582                    EmbedError::Runtime(lash_core::RuntimeError::new(
583                        lash_core::RuntimeErrorCode::StoreCommitFailed,
584                        err.to_string(),
585                    ))
586                })?;
587            if !pending.iter().any(|batch| batch.batch_id == batch_id) {
588                return Ok(());
589            }
590            tokio::time::sleep(delay).await;
591            delay = (delay * 2).min(std::time::Duration::from_millis(400));
592        }
593    }
594
595    pub fn read_view(&self) -> SessionReadView {
596        self.runtime.observe().read_view.clone()
597    }
598
599    pub fn usage_report(&self) -> SessionUsageReport {
600        self.runtime.observe().usage_report.clone()
601    }
602
603    pub async fn set_turn_phase_probe(
604        &self,
605        probe: Arc<dyn lash_core::runtime::RuntimeTurnPhaseProbe>,
606    ) {
607        let writer = self.runtime.writer();
608        let mut runtime = writer.lock().await;
609        runtime.set_turn_phase_probe(Arc::clone(&probe));
610        self.runtime.publish_from(&runtime);
611        if let Some(slot) = &self.process_phase_probe_slot {
612            let observation = self.runtime.observe();
613            slot.set_for_session(observation.session_id(), Arc::clone(&probe));
614            let current_frame = observation.persisted_state.current_agent_frame_id.as_str();
615            if !current_frame.is_empty() {
616                let scope = lash_core::SessionScope::for_agent_frame(
617                    observation.session_id(),
618                    current_frame,
619                );
620                slot.set_for_scope(&scope, probe);
621            }
622        }
623    }
624}
625
/// Read-only view onto a session, obtained from `LashSession::observe`.
#[derive(Clone)]
pub struct ObservableSession {
    /// Handle to the observed session's runtime.
    pub(crate) runtime: RuntimeHandle,
}
630
631impl ObservableSession {
632    fn snapshot(&self) -> Arc<RuntimeObservation> {
633        self.runtime.observe()
634    }
635
636    pub fn current_observation(&self) -> SessionObservation {
637        self.runtime.current_session_observation()
638    }
639
640    pub fn resume_from_cursor(&self, cursor: &SessionCursor) -> Result<SessionResume> {
641        self.runtime
642            .resume_session_observation(cursor)
643            .map_err(live_replay_error)
644    }
645
646    pub fn subscribe_from_cursor(
647        &self,
648        cursor: &SessionCursor,
649    ) -> Result<SessionObservationSubscription> {
650        self.runtime
651            .subscribe_session_observation(cursor)
652            .map_err(live_replay_error)
653    }
654
655    pub fn session_id(&self) -> String {
656        self.snapshot().session_id().to_string()
657    }
658
659    pub fn policy_snapshot(&self) -> SessionPolicy {
660        self.snapshot().policy.clone()
661    }
662
663    pub fn read_view(&self) -> SessionReadView {
664        self.snapshot().read_view.clone()
665    }
666
667    pub fn usage_report(&self) -> SessionUsageReport {
668        self.snapshot().usage_report.clone()
669    }
670
671    pub fn tool_state(&self) -> Option<ToolState> {
672        self.snapshot().tool_state.clone()
673    }
674
675    pub fn active_tool_manifests(&self) -> Vec<ToolManifest> {
676        self.snapshot()
677            .tool_state
678            .as_ref()
679            .map(ToolState::tool_manifests)
680            .unwrap_or_default()
681    }
682
683    pub async fn list_process_handles(&self) -> Vec<ProcessHandleSummary> {
684        self.snapshot().list_process_handles().await
685    }
686
687    pub async fn list_all_process_handles(&self) -> Vec<ProcessHandleSummary> {
688        self.snapshot().list_all_process_handles().await
689    }
690
691    pub fn process_scope(&self) -> SessionScope {
692        self.snapshot().process_scope()
693    }
694}
695
696fn live_replay_error(err: lash_core::LiveReplayStoreError) -> EmbedError {
697    EmbedError::Runtime(lash_core::RuntimeError::new(
698        RuntimeErrorCode::Other("live_replay".to_string()),
699        err.to_string(),
700    ))
701}
702
/// Builder for enqueueing a turn input on a session; created by
/// `LashSession::enqueue`.
pub struct EnqueueTurnBuilder<'a> {
    /// Session whose runtime receives the enqueue request.
    session: &'a LashSession,
    /// Turn input to enqueue.
    input: TurnInput,
    /// Optional host-supplied id; sent as the source key `host:<id>`.
    id: Option<String>,
    /// Delivery policy passed to the runtime.
    delivery_policy: DeliveryPolicy,
    /// Slot policy passed to the runtime.
    slot_policy: SlotPolicy,
}
710
711impl<'a> EnqueueTurnBuilder<'a> {
712    pub fn id(mut self, id: impl Into<String>) -> Self {
713        self.id = Some(id.into());
714        self
715    }
716
717    pub fn delivery_policy(mut self, policy: DeliveryPolicy) -> Self {
718        self.delivery_policy = policy;
719        self
720    }
721
722    pub fn slot_policy(mut self, policy: SlotPolicy) -> Self {
723        self.slot_policy = policy;
724        self
725    }
726
727    pub async fn send(self) -> Result<QueuedWorkBatch> {
728        let source_key = self.id.map(|id| format!("host:{id}"));
729        self.session
730            .runtime
731            .enqueue_turn_input(
732                self.input,
733                self.delivery_policy,
734                self.slot_policy,
735                source_key,
736            )
737            .await
738            .map_err(EmbedError::Runtime)
739    }
740}
741
742impl<'a> std::future::IntoFuture for EnqueueTurnBuilder<'a> {
743    type Output = Result<QueuedWorkBatch>;
744    type IntoFuture =
745        std::pin::Pin<Box<dyn std::future::Future<Output = Result<QueuedWorkBatch>> + 'a>>;
746
747    fn into_future(self) -> Self::IntoFuture {
748        Box::pin(self.send())
749    }
750}