Skip to main content

lash/
turn.rs

1use std::pin::Pin;
2use std::task::{Context, Poll};
3
4use crate::support::*;
5use futures_util::Stream;
6
7pub use lash_core::{AssistantOutput, TurnIssue};
8
9/// The two internal event sinks threaded through the turn-execution helpers.
10///
11/// `events` is the raw lower-level runtime event stream, reachable from app
12/// code only via [`TurnBuilder::advanced`]; `turn_events` is the semantic
13/// [`TurnActivity`] stream used by the primary builder API.
14/// Bundling them keeps the internal turn fns to a single sink parameter.
15#[derive(Clone, Copy, Default)]
16pub(crate) struct TurnSinks<'a> {
17    events: Option<&'a dyn EventSink>,
18    turn_events: Option<&'a dyn TurnActivitySink>,
19}
20
21impl<'a> TurnSinks<'a> {
22    pub(crate) fn turn(events: &'a dyn TurnActivitySink) -> Self {
23        Self {
24            events: None,
25            turn_events: Some(events),
26        }
27    }
28
29    pub(crate) fn session(events: &'a dyn EventSink) -> Self {
30        Self {
31            events: Some(events),
32            turn_events: None,
33        }
34    }
35
36    fn events(&self) -> Option<&'a dyn EventSink> {
37        self.events
38    }
39
40    fn turn_events(&self) -> Option<&'a dyn TurnActivitySink> {
41        self.turn_events
42    }
43}
44
45/// Cancellation tokens of the turns currently executing through one opened
46/// [`LashSession`](crate::LashSession) (shared by its clones).
47/// [`LashSession::cancel_running_turns`](crate::LashSession::cancel_running_turns)
48/// cancels them without the caller having to thread a token around.
49#[derive(Clone, Default)]
50pub(crate) struct TurnCancelRegistry {
51    inner: Arc<StdMutex<TurnCancelRegistryInner>>,
52}
53
54#[derive(Default)]
55struct TurnCancelRegistryInner {
56    next_id: u64,
57    active: BTreeMap<u64, CancellationToken>,
58}
59
60impl TurnCancelRegistry {
61    /// Register the token of a turn that is about to execute. The guard
62    /// removes the entry when the turn finishes, however it finishes.
63    fn register(&self, token: CancellationToken) -> TurnCancelGuard {
64        let mut inner = self.inner.lock().expect("turn cancel registry");
65        let id = inner.next_id;
66        inner.next_id += 1;
67        inner.active.insert(id, token);
68        TurnCancelGuard {
69            registry: Arc::clone(&self.inner),
70            id,
71        }
72    }
73
74    pub(crate) fn cancel_all(&self) -> usize {
75        let inner = self.inner.lock().expect("turn cancel registry");
76        for token in inner.active.values() {
77            token.cancel();
78        }
79        inner.active.len()
80    }
81}
82
83pub(crate) struct TurnCancelGuard {
84    registry: Arc<StdMutex<TurnCancelRegistryInner>>,
85    id: u64,
86}
87
88impl Drop for TurnCancelGuard {
89    fn drop(&mut self) {
90        self.registry
91            .lock()
92            .expect("turn cancel registry")
93            .active
94            .remove(&self.id);
95    }
96}
97
98pub struct TurnBuilder {
99    pub(crate) runtime: RuntimeHandle,
100    pub(crate) effect_host: Arc<dyn EffectHost>,
101    pub(crate) active_plugins: Vec<ActivePluginBinding>,
102    pub(crate) input: TurnInput,
103    pub(crate) cancel: CancellationToken,
104    pub(crate) cancels: TurnCancelRegistry,
105    pub(crate) protocol_turn_options: Option<ProtocolTurnOptions>,
106    pub(crate) provider: Option<ProviderHandle>,
107    pub(crate) model: Option<ModelSpec>,
108    pub(crate) turn_id: Option<String>,
109}
110
111impl TurnBuilder {
112    pub fn cancel(mut self, cancel: CancellationToken) -> Self {
113        self.cancel = cancel;
114        self
115    }
116
117    pub fn protocol_turn_options(mut self, options: ProtocolTurnOptions) -> Self {
118        self.protocol_turn_options = Some(options);
119        self
120    }
121
122    pub fn provider(mut self, provider: ProviderHandle) -> Self {
123        self.provider = Some(provider);
124        self
125    }
126
127    pub fn model(mut self, model: ModelSpec) -> Self {
128        self.model = Some(model);
129        self
130    }
131
132    pub fn turn_id(mut self, id: impl Into<String>) -> Self {
133        self.turn_id = Some(id.into());
134        self
135    }
136
137    pub fn prompt_template(mut self, template: PromptTemplate) -> Self {
138        self.input.turn_context.set_prompt_template(template);
139        self
140    }
141
142    pub fn prompt_contribution(mut self, contribution: PromptContribution) -> Self {
143        self.input
144            .turn_context
145            .add_prompt_contribution(contribution);
146        self
147    }
148
149    pub fn replace_prompt_slot(
150        mut self,
151        slot: PromptSlot,
152        contributions: impl IntoIterator<Item = PromptContribution>,
153    ) -> Self {
154        self.input
155            .turn_context
156            .replace_prompt_slot(slot, contributions);
157        self
158    }
159
160    pub fn clear_prompt_slot(mut self, slot: PromptSlot) -> Self {
161        self.input.turn_context.clear_prompt_slot(slot);
162        self
163    }
164
165    pub fn prompt_layer(mut self, layer: PromptLayer) -> Self {
166        self.input.turn_context.set_prompt_layer(layer);
167        self
168    }
169
170    /// Attach typed per-turn input for an activated plugin binding.
171    ///
172    /// This is the generic primitive. Plugin crates should usually wrap it in a
173    /// domain extension trait such as `.with_tone(tone)` or `.with_board(board)`
174    /// so application code stays typed in its own vocabulary.
175    pub fn with_plugin_input<P: PluginBinding>(mut self, input: P::Input) -> Self {
176        self.input.turn_context.insert_plugin_input(P::ID, input);
177        self
178    }
179
180    pub fn effects(self, controller: &dyn RuntimeEffectController) -> ScopedTurnBuilder<'_> {
181        ScopedTurnBuilder {
182            builder: self,
183            controller,
184        }
185    }
186
187    pub async fn run(self) -> Result<TurnOutput> {
188        let collector = RunActivityCollector::default();
189        let result = self.stream_to(&collector).await?;
190        Ok(TurnOutput {
191            result,
192            activities: collector.into_activities(),
193        })
194    }
195
196    pub async fn stream_to(self, events: &dyn TurnActivitySink) -> Result<TurnResult> {
197        let effect_host = Arc::clone(&self.effect_host);
198        reject_configured_durable_effect_host(effect_host.as_ref(), "turn")?;
199        self.stream_to_with_effect_host(events, effect_host.as_ref())
200            .await
201    }
202
203    pub fn stream(self) -> Result<TurnStream> {
204        let effect_host = Arc::clone(&self.effect_host);
205        reject_configured_durable_effect_host(effect_host.as_ref(), "turn stream")?;
206        self.stream_with_effect_host(effect_host.as_ref())
207    }
208
209    /// Access lower-level turn execution that bypasses the semantic
210    /// [`TurnActivity`] tier.
211    pub fn advanced(self) -> AdvancedTurn {
212        AdvancedTurn { builder: self }
213    }
214
215    fn resolved_turn_id(&self) -> String {
216        self.turn_id
217            .clone()
218            .or_else(|| self.input.trace_turn_id.clone())
219            .unwrap_or_else(fresh_turn_id)
220    }
221
222    fn turn_scope(&self, turn_id: &str) -> lash_core::ExecutionScope {
223        lash_core::ExecutionScope::turn(self.runtime.observe().session_id(), turn_id)
224    }
225
226    pub(crate) fn prepare(
227        mut self,
228        trace_turn_id: Option<String>,
229    ) -> Result<(RuntimeHandle, TurnInput, CancellationToken, TurnCancelGuard)> {
230        if let Some(options) = self.protocol_turn_options {
231            self.input.protocol_turn_options = Some(options);
232        }
233        if let Some(provider) = self.provider {
234            self.input.turn_context.set_provider(provider);
235        }
236        if let Some(model) = self.model {
237            self.input.turn_context.set_model(model);
238        }
239        if let Some(trace_turn_id) = trace_turn_id {
240            self.input.trace_turn_id = Some(trace_turn_id);
241        }
242        validate_required_plugin_inputs(&self.active_plugins, &self.input)?;
243        let cancel_guard = self.cancels.register(self.cancel.clone());
244        Ok((self.runtime, self.input, self.cancel, cancel_guard))
245    }
246
247    async fn stream_to_with_effect_host(
248        self,
249        events: &dyn TurnActivitySink,
250        effect_host: &dyn EffectHost,
251    ) -> Result<TurnResult> {
252        let turn_id = self.resolved_turn_id();
253        let scoped_effect_controller = effect_host.scoped(self.turn_scope(&turn_id))?;
254        self.stream_to_with_scope(events, scoped_effect_controller, Some(turn_id))
255            .await
256    }
257
258    async fn stream_to_with_effect_controller(
259        self,
260        events: &dyn TurnActivitySink,
261        controller: &dyn RuntimeEffectController,
262    ) -> Result<TurnResult> {
263        let turn_id = self.resolved_turn_id();
264        let scoped_effect_controller =
265            ScopedEffectController::borrowed(controller, self.turn_scope(&turn_id))?;
266        self.stream_to_with_scope(events, scoped_effect_controller, Some(turn_id))
267            .await
268    }
269
270    async fn stream_to_with_scope(
271        self,
272        events: &dyn TurnActivitySink,
273        scoped_effect_controller: ScopedEffectController<'_>,
274        trace_turn_id: Option<String>,
275    ) -> Result<TurnResult> {
276        let (runtime, input, cancel, _cancel_guard) = self.prepare(trace_turn_id)?;
277        stream_prepared_turn(
278            &runtime,
279            input,
280            TurnSinks::turn(events),
281            scoped_effect_controller,
282            cancel,
283        )
284        .await
285    }
286
287    fn stream_with_effect_host(self, effect_host: &dyn EffectHost) -> Result<TurnStream> {
288        let turn_id = self.resolved_turn_id();
289        let scoped_effect_controller = effect_host
290            .scoped_static(self.turn_scope(&turn_id))?
291            .ok_or(EmbedError::StaticTurnStreamRequiresStaticEffectHost)?;
292        self.stream_with_scope(scoped_effect_controller, Some(turn_id))
293    }
294
295    fn stream_with_scope(
296        self,
297        scoped_effect_controller: ScopedEffectController<'static>,
298        trace_turn_id: Option<String>,
299    ) -> Result<TurnStream> {
300        let (runtime, input, cancel, cancel_guard) = self.prepare(trace_turn_id)?;
301        let (tx, rx) = mpsc::channel(64);
302        let sink = ChannelTurnActivitySink { tx };
303        let completion = tokio::spawn(async move {
304            let _cancel_guard = cancel_guard;
305            stream_prepared_turn(
306                &runtime,
307                input,
308                TurnSinks::turn(&sink),
309                scoped_effect_controller,
310                cancel,
311            )
312            .await
313        });
314        Ok(TurnStream {
315            activities: rx,
316            completion,
317        })
318    }
319}
320
321pub struct ScopedTurnBuilder<'run> {
322    builder: TurnBuilder,
323    controller: &'run dyn RuntimeEffectController,
324}
325
326impl<'run> ScopedTurnBuilder<'run> {
327    pub fn cancel(mut self, cancel: CancellationToken) -> Self {
328        self.builder = self.builder.cancel(cancel);
329        self
330    }
331
332    pub fn protocol_turn_options(mut self, options: ProtocolTurnOptions) -> Self {
333        self.builder = self.builder.protocol_turn_options(options);
334        self
335    }
336
337    pub fn provider(mut self, provider: ProviderHandle) -> Self {
338        self.builder = self.builder.provider(provider);
339        self
340    }
341
342    pub fn model(mut self, model: ModelSpec) -> Self {
343        self.builder = self.builder.model(model);
344        self
345    }
346
347    pub fn turn_id(mut self, id: impl Into<String>) -> Self {
348        self.builder = self.builder.turn_id(id);
349        self
350    }
351
352    pub fn prompt_template(mut self, template: PromptTemplate) -> Self {
353        self.builder = self.builder.prompt_template(template);
354        self
355    }
356
357    pub fn prompt_contribution(mut self, contribution: PromptContribution) -> Self {
358        self.builder = self.builder.prompt_contribution(contribution);
359        self
360    }
361
362    pub fn replace_prompt_slot(
363        mut self,
364        slot: PromptSlot,
365        contributions: impl IntoIterator<Item = PromptContribution>,
366    ) -> Self {
367        self.builder = self.builder.replace_prompt_slot(slot, contributions);
368        self
369    }
370
371    pub fn clear_prompt_slot(mut self, slot: PromptSlot) -> Self {
372        self.builder = self.builder.clear_prompt_slot(slot);
373        self
374    }
375
376    pub fn prompt_layer(mut self, layer: PromptLayer) -> Self {
377        self.builder = self.builder.prompt_layer(layer);
378        self
379    }
380
381    pub fn with_plugin_input<P: PluginBinding>(mut self, input: P::Input) -> Self {
382        self.builder = self.builder.with_plugin_input::<P>(input);
383        self
384    }
385
386    pub async fn run(self) -> Result<TurnOutput> {
387        let collector = RunActivityCollector::default();
388        let result = self.stream_to(&collector).await?;
389        Ok(TurnOutput {
390            result,
391            activities: collector.into_activities(),
392        })
393    }
394
395    pub async fn stream_to(self, events: &dyn TurnActivitySink) -> Result<TurnResult> {
396        self.builder
397            .stream_to_with_effect_controller(events, self.controller)
398            .await
399    }
400}
401
402/// Lower-level turn execution that exposes the raw runtime event stream.
403///
404/// Reachable via [`TurnBuilder::advanced`]. Most applications should use
405/// [`TurnBuilder::stream_to`] for semantic turn activity; benchmarks and
406/// diagnostics use this when they need the same low-level event stream as the
407/// runtime trace.
408pub struct AdvancedTurn {
409    builder: TurnBuilder,
410}
411
412impl AdvancedTurn {
413    pub async fn run_with_scope(
414        self,
415        scoped_effect_controller: ScopedEffectController<'_>,
416    ) -> Result<TurnOutput> {
417        let collector = RunActivityCollector::default();
418        let result = self
419            .stream_to_with_scope(&collector, scoped_effect_controller)
420            .await?;
421        Ok(TurnOutput {
422            result,
423            activities: collector.into_activities(),
424        })
425    }
426
427    pub async fn collect_with_scope(
428        self,
429        events: &dyn TurnActivitySink,
430        scoped_effect_controller: ScopedEffectController<'_>,
431    ) -> Result<TurnOutput> {
432        let collector = RunActivityCollector::default();
433        let fanout = BorrowedTurnActivityFanout {
434            live: events,
435            collector: &collector,
436        };
437        let result = self
438            .stream_to_with_scope(&fanout, scoped_effect_controller)
439            .await?;
440        Ok(TurnOutput {
441            result,
442            activities: collector.into_activities(),
443        })
444    }
445
446    pub async fn stream_to_with_scope(
447        self,
448        events: &dyn TurnActivitySink,
449        scoped_effect_controller: ScopedEffectController<'_>,
450    ) -> Result<TurnResult> {
451        let trace_turn_id = trace_turn_id_for_scope(&self.builder, &scoped_effect_controller);
452        self.builder
453            .stream_to_with_scope(events, scoped_effect_controller, trace_turn_id)
454            .await
455    }
456
457    pub fn stream_with_scope(
458        self,
459        scoped_effect_controller: ScopedEffectController<'static>,
460    ) -> Result<TurnStream> {
461        let trace_turn_id = trace_turn_id_for_scope(&self.builder, &scoped_effect_controller);
462        self.builder
463            .stream_with_scope(scoped_effect_controller, trace_turn_id)
464    }
465
466    /// Run the turn while sending raw lower-level runtime events to `events`.
467    pub async fn collect_session_events_with_scope(
468        self,
469        events: &dyn EventSink,
470        scoped_effect_controller: ScopedEffectController<'_>,
471    ) -> Result<TurnResult> {
472        let trace_turn_id = trace_turn_id_for_scope(&self.builder, &scoped_effect_controller);
473        let (runtime, input, cancel, _cancel_guard) = self.builder.prepare(trace_turn_id)?;
474        stream_prepared_turn(
475            &runtime,
476            input,
477            TurnSinks::session(events),
478            scoped_effect_controller,
479            cancel,
480        )
481        .await
482    }
483}
484
485pub struct TurnStream {
486    activities: mpsc::Receiver<Result<TurnActivity>>,
487    completion: JoinHandle<Result<TurnResult>>,
488}
489
490impl TurnStream {
491    pub async fn next_activity(&mut self) -> Option<Result<TurnActivity>> {
492        self.activities.recv().await
493    }
494
495    pub async fn finish(self) -> Result<TurnResult> {
496        self.completion.await.map_err(|err| {
497            EmbedError::Runtime(lash_core::RuntimeError::new(
498                RuntimeErrorCode::TurnStreamJoin,
499                format!("turn stream task failed: {err}"),
500            ))
501        })?
502    }
503}
504
505impl Stream for TurnStream {
506    type Item = Result<TurnActivity>;
507
508    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
509        self.activities.poll_recv(cx)
510    }
511}
512
513pub struct QueuedTurnBuilder {
514    pub(crate) runtime: RuntimeHandle,
515    pub(crate) effect_host: Arc<dyn EffectHost>,
516    pub(crate) cancel: CancellationToken,
517    pub(crate) cancels: TurnCancelRegistry,
518    pub(crate) batch_ids: Vec<String>,
519    pub(crate) drain_id: Option<String>,
520}
521
522impl QueuedTurnBuilder {
523    pub fn cancel(mut self, cancel: CancellationToken) -> Self {
524        self.cancel = cancel;
525        self
526    }
527
528    pub fn batch_ids(mut self, batch_ids: impl IntoIterator<Item = impl Into<String>>) -> Self {
529        self.batch_ids = batch_ids.into_iter().map(Into::into).collect();
530        self
531    }
532
533    pub fn drain_id(mut self, drain_id: impl Into<String>) -> Self {
534        self.drain_id = Some(drain_id.into());
535        self
536    }
537
538    pub fn effects(self, controller: &dyn RuntimeEffectController) -> ScopedQueuedTurnBuilder<'_> {
539        ScopedQueuedTurnBuilder {
540            builder: self,
541            controller,
542        }
543    }
544
545    pub async fn run(self) -> Result<Option<TurnOutput>> {
546        let collector = RunActivityCollector::default();
547        let Some(result) = self.stream_to(&collector).await? else {
548            return Ok(None);
549        };
550        Ok(Some(TurnOutput {
551            result,
552            activities: collector.into_activities(),
553        }))
554    }
555
556    pub async fn stream_to(self, events: &dyn TurnActivitySink) -> Result<Option<TurnResult>> {
557        let effect_host = Arc::clone(&self.effect_host);
558        reject_configured_durable_effect_host(effect_host.as_ref(), "queued turn")?;
559        self.stream_to_with_effect_host(events, effect_host.as_ref())
560            .await
561    }
562
563    pub fn advanced(self) -> AdvancedQueuedTurn {
564        AdvancedQueuedTurn { builder: self }
565    }
566
567    fn resolved_drain_id(&self) -> String {
568        self.drain_id
569            .clone()
570            .or_else(|| self.batch_ids.first().cloned())
571            .unwrap_or_else(fresh_queue_drain_id)
572    }
573
574    async fn stream_to_with_effect_host(
575        self,
576        events: &dyn TurnActivitySink,
577        effect_host: &dyn EffectHost,
578    ) -> Result<Option<TurnResult>> {
579        let drain_id = self.resolved_drain_id();
580        let scope =
581            lash_core::ExecutionScope::queue_drain(self.runtime.observe().session_id(), drain_id);
582        let scoped_effect_controller = effect_host.scoped(scope)?;
583        self.stream_to_with_scope(events, scoped_effect_controller)
584            .await
585    }
586
587    async fn stream_to_with_effect_controller(
588        self,
589        events: &dyn TurnActivitySink,
590        controller: &dyn RuntimeEffectController,
591    ) -> Result<Option<TurnResult>> {
592        let drain_id = self.resolved_drain_id();
593        let scope =
594            lash_core::ExecutionScope::queue_drain(self.runtime.observe().session_id(), drain_id);
595        let scoped_effect_controller = ScopedEffectController::borrowed(controller, scope)?;
596        self.stream_to_with_scope(events, scoped_effect_controller)
597            .await
598    }
599
600    async fn stream_to_with_scope(
601        self,
602        events: &dyn TurnActivitySink,
603        scoped_effect_controller: ScopedEffectController<'_>,
604    ) -> Result<Option<TurnResult>> {
605        let Self {
606            runtime,
607            effect_host: _,
608            cancel,
609            cancels,
610            batch_ids,
611            drain_id: _,
612        } = self;
613        let _cancel_guard = cancels.register(cancel.clone());
614        stream_next_queued_prepared_turn(
615            &runtime,
616            TurnSinks::turn(events),
617            scoped_effect_controller,
618            cancel,
619            &batch_ids,
620        )
621        .await
622    }
623}
624
625pub struct ScopedQueuedTurnBuilder<'run> {
626    builder: QueuedTurnBuilder,
627    controller: &'run dyn RuntimeEffectController,
628}
629
630impl<'run> ScopedQueuedTurnBuilder<'run> {
631    pub fn cancel(mut self, cancel: CancellationToken) -> Self {
632        self.builder = self.builder.cancel(cancel);
633        self
634    }
635
636    pub fn batch_ids(mut self, batch_ids: impl IntoIterator<Item = impl Into<String>>) -> Self {
637        self.builder = self.builder.batch_ids(batch_ids);
638        self
639    }
640
641    pub fn drain_id(mut self, drain_id: impl Into<String>) -> Self {
642        self.builder = self.builder.drain_id(drain_id);
643        self
644    }
645
646    pub async fn run(self) -> Result<Option<TurnOutput>> {
647        let collector = RunActivityCollector::default();
648        let Some(result) = self.stream_to(&collector).await? else {
649            return Ok(None);
650        };
651        Ok(Some(TurnOutput {
652            result,
653            activities: collector.into_activities(),
654        }))
655    }
656
657    pub async fn stream_to(self, events: &dyn TurnActivitySink) -> Result<Option<TurnResult>> {
658        self.builder
659            .stream_to_with_effect_controller(events, self.controller)
660            .await
661    }
662}
663
664pub struct AdvancedQueuedTurn {
665    builder: QueuedTurnBuilder,
666}
667
668impl AdvancedQueuedTurn {
669    pub async fn stream_to_with_scope(
670        self,
671        events: &dyn TurnActivitySink,
672        scoped_effect_controller: ScopedEffectController<'_>,
673    ) -> Result<Option<TurnResult>> {
674        self.builder
675            .stream_to_with_scope(events, scoped_effect_controller)
676            .await
677    }
678}
679
680fn fresh_turn_id() -> String {
681    lash_core::TurnActivityId::fresh().0
682}
683
684fn fresh_queue_drain_id() -> String {
685    format!("queue-drain:{}", fresh_turn_id())
686}
687
688fn trace_turn_id_for_scope(
689    builder: &TurnBuilder,
690    scoped_effect_controller: &ScopedEffectController<'_>,
691) -> Option<String> {
692    if scoped_effect_controller
693        .execution_scope()
694        .validates_turn_trace_id()
695    {
696        Some(
697            builder
698                .turn_id
699                .clone()
700                .unwrap_or_else(|| scoped_effect_controller.scope_id().to_string()),
701        )
702    } else {
703        builder
704            .turn_id
705            .clone()
706            .or_else(|| builder.input.trace_turn_id.clone())
707    }
708}
709
710fn reject_configured_durable_effect_host(
711    effect_host: &dyn EffectHost,
712    operation: &'static str,
713) -> Result<()> {
714    if effect_host.durability_tier() == DurabilityTier::Durable {
715        return Err(EmbedError::DurableEffectHostRequiresHandlerContext { operation });
716    }
717    Ok(())
718}
719
720pub(crate) async fn stream_next_queued_prepared_turn(
721    runtime: &RuntimeHandle,
722    sinks: TurnSinks<'_>,
723    scoped_effect_controller: ScopedEffectController<'_>,
724    cancel: CancellationToken,
725    batch_ids: &[String],
726) -> Result<Option<TurnResult>> {
727    let turn = Box::pin(stream_next_queued_prepared_assembled(
728        runtime,
729        sinks,
730        scoped_effect_controller,
731        cancel,
732        batch_ids,
733    ))
734    .await?;
735    Ok(turn.map(TurnResult::from_assembled))
736}
737
738pub(crate) async fn stream_next_queued_prepared_assembled(
739    runtime: &RuntimeHandle,
740    sinks: TurnSinks<'_>,
741    scoped_effect_controller: ScopedEffectController<'_>,
742    cancel: CancellationToken,
743    batch_ids: &[String],
744) -> Result<Option<AssembledTurn>> {
745    let writer_handle = runtime.writer();
746    let mut writer = writer_handle.lock().await;
747    let observation_sink = SessionObservationTurnActivitySink {
748        runtime: runtime.clone(),
749        live: sinks.turn_events(),
750    };
751    let opts = turn_options(
752        sinks.events(),
753        &observation_sink,
754        scoped_effect_controller,
755        cancel,
756    );
757    let turn = if batch_ids.is_empty() {
758        writer.stream_next_queued_work(opts).await?
759    } else {
760        writer.stream_selected_queued_work(opts, batch_ids).await?
761    };
762    runtime.publish_from(&writer);
763    Ok(turn)
764}
765
766fn turn_options<'a>(
767    events: Option<&'a dyn EventSink>,
768    turn_events: &'a dyn TurnActivitySink,
769    scoped_effect_controller: ScopedEffectController<'a>,
770    cancel: CancellationToken,
771) -> lash_core::TurnOptions<'a> {
772    let mut opts = lash_core::TurnOptions::new(cancel, scoped_effect_controller);
773    if let Some(events) = events {
774        opts = opts.with_events(events);
775    }
776    opts.with_turn_events(turn_events)
777}
778
779struct SessionObservationTurnActivitySink<'a> {
780    runtime: RuntimeHandle,
781    live: Option<&'a dyn TurnActivitySink>,
782}
783
784#[async_trait]
785impl TurnActivitySink for SessionObservationTurnActivitySink<'_> {
786    fn is_noop(&self) -> bool {
787        false
788    }
789
790    async fn emit(&self, activity: TurnActivity) {
791        self.runtime.record_turn_activity(activity.clone());
792        if let Some(live) = self.live {
793            live.emit(activity).await;
794        }
795    }
796}
797
798struct ChannelTurnActivitySink {
799    tx: mpsc::Sender<Result<TurnActivity>>,
800}
801
802#[async_trait]
803impl TurnActivitySink for ChannelTurnActivitySink {
804    async fn emit(&self, activity: TurnActivity) {
805        let _ = self.tx.send(Ok(activity)).await;
806    }
807}
808fn validate_required_plugin_inputs(
809    active_plugins: &[ActivePluginBinding],
810    input: &TurnInput,
811) -> Result<()> {
812    for plugin in active_plugins {
813        if plugin.requires_turn_input && !input.turn_context.has_plugin_input(plugin.id) {
814            return Err(EmbedError::MissingPluginTurnInput {
815                plugin_id: plugin.id,
816            });
817        }
818    }
819    Ok(())
820}
821
822pub(crate) async fn stream_prepared_turn(
823    runtime: &RuntimeHandle,
824    input: TurnInput,
825    sinks: TurnSinks<'_>,
826    scoped_effect_controller: ScopedEffectController<'_>,
827    cancel: CancellationToken,
828) -> Result<TurnResult> {
829    let turn = Box::pin(stream_prepared_assembled(
830        runtime,
831        input,
832        sinks,
833        scoped_effect_controller,
834        cancel,
835    ))
836    .await?;
837    Ok(TurnResult::from_assembled(turn))
838}
839
840pub(crate) async fn stream_prepared_assembled(
841    runtime: &RuntimeHandle,
842    input: TurnInput,
843    sinks: TurnSinks<'_>,
844    scoped_effect_controller: ScopedEffectController<'_>,
845    cancel: CancellationToken,
846) -> Result<AssembledTurn> {
847    let turn = Box::pin(stream_prepared_agent_frame_run(
848        runtime,
849        input,
850        sinks,
851        scoped_effect_controller,
852        cancel,
853    ))
854    .await?;
855    turn.into_final_turn().ok_or_else(|| {
856        EmbedError::Runtime(lash_core::RuntimeError::new(
857            RuntimeErrorCode::EmptyAgentFrameRun,
858            "runtime completed without an assembled turn",
859        ))
860    })
861}
862
863pub(crate) async fn stream_prepared_agent_frame_run(
864    runtime: &RuntimeHandle,
865    input: TurnInput,
866    sinks: TurnSinks<'_>,
867    scoped_effect_controller: ScopedEffectController<'_>,
868    cancel: CancellationToken,
869) -> Result<lash_core::AgentFrameRun> {
870    let writer_handle = runtime.writer();
871    let mut writer = writer_handle.lock().await;
872    if let Some(extension) = input.protocol_extension.as_ref() {
873        writer
874            .validate_protocol_turn_extension(extension)
875            .await
876            .map_err(EmbedError::Session)?;
877    }
878    let observation_sink = SessionObservationTurnActivitySink {
879        runtime: runtime.clone(),
880        live: sinks.turn_events(),
881    };
882    let turn = Box::pin(writer.stream_turn_with_agent_frames(
883        input,
884        turn_options(
885            sinks.events(),
886            &observation_sink,
887            scoped_effect_controller,
888            cancel,
889        ),
890    ))
891    .await?;
892    runtime.publish_from(&writer);
893    Ok(turn)
894}
895
896#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
897pub struct TurnResult {
898    pub state: SessionSnapshot,
899    pub outcome: TurnOutcome,
900    pub assistant_output: AssistantOutput,
901    /// Parent's own LLM tokens for this turn. Does **not** include child
902    /// sessions; see [`children_usage`](Self::children_usage) and
903    /// [`total_usage`](Self::total_usage).
904    pub usage: TokenUsage,
905    /// Per-`(source, model)` ledger entries for child sessions whose LLM
906    /// calls completed during this turn (subagents, compaction, observers,
907    /// etc.). Empty unless the turn spawned children.
908    #[serde(default)]
909    pub children_usage: Vec<TokenLedgerEntry>,
910    pub tool_calls: Vec<ToolCallRecord>,
911    pub execution: ExecutionSummary,
912    pub errors: Vec<TurnIssue>,
913}
914
915impl TurnResult {
916    fn from_assembled(turn: lash_core::AssembledTurn) -> Self {
917        Self {
918            state: turn.state,
919            outcome: turn.outcome,
920            assistant_output: turn.assistant_output,
921            usage: turn.token_usage,
922            children_usage: turn.children_usage,
923            tool_calls: turn.tool_calls,
924            execution: turn.execution,
925            errors: turn.errors,
926        }
927    }
928
929    /// Sum of parent's own LLM tokens and every child session's LLM tokens
930    /// for this turn.
931    pub fn total_usage(&self) -> TokenUsage {
932        let mut total = self.usage.clone();
933        for entry in &self.children_usage {
934            total.add(&entry.usage);
935        }
936        total
937    }
938
939    pub fn assistant_message(&self) -> Option<&str> {
940        match &self.outcome {
941            TurnOutcome::Finished(lash_core::TurnFinish::AssistantMessage { text }) => Some(text),
942            _ => None,
943        }
944    }
945
946    pub fn submitted_value(&self) -> Option<&serde_json::Value> {
947        match &self.outcome {
948            TurnOutcome::Finished(lash_core::TurnFinish::SubmittedValue { value }) => Some(value),
949            _ => None,
950        }
951    }
952
953    pub fn tool_value(&self) -> Option<(&str, &serde_json::Value)> {
954        match &self.outcome {
955            TurnOutcome::Finished(lash_core::TurnFinish::ToolValue { tool_name, value }) => {
956                Some((tool_name.as_str(), value))
957            }
958            _ => None,
959        }
960    }
961
962    pub fn is_success(&self) -> bool {
963        matches!(
964            self.outcome,
965            TurnOutcome::Finished(_) | TurnOutcome::AgentFrameSwitch { .. }
966        )
967    }
968}
969
970#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
971pub struct TurnOutput {
972    pub result: TurnResult,
973    pub activities: Vec<TurnActivity>,
974}
975
976impl TurnOutput {
977    pub fn assistant_message(&self) -> Option<&str> {
978        self.result.assistant_message()
979    }
980
981    pub fn submitted_value(&self) -> Option<&serde_json::Value> {
982        self.result.submitted_value()
983    }
984
985    pub fn tool_value(&self) -> Option<(&str, &serde_json::Value)> {
986        self.result.tool_value()
987    }
988
989    pub fn is_success(&self) -> bool {
990        self.result.is_success()
991    }
992}
993
994struct BorrowedTurnActivityFanout<'a> {
995    live: &'a dyn TurnActivitySink,
996    collector: &'a RunActivityCollector,
997}
998
999#[async_trait]
1000impl TurnActivitySink for BorrowedTurnActivityFanout<'_> {
1001    async fn emit(&self, activity: TurnActivity) {
1002        self.live.emit(activity.clone()).await;
1003        self.collector.emit(activity).await;
1004    }
1005}
1006
1007#[derive(Default)]
1008pub(crate) struct RunActivityCollector {
1009    activities: Arc<StdMutex<Vec<TurnActivity>>>,
1010}
1011
1012impl RunActivityCollector {
1013    fn into_activities(self) -> Vec<TurnActivity> {
1014        self.activities
1015            .lock()
1016            .expect("run activity collector lock")
1017            .clone()
1018    }
1019
1020    #[cfg(test)]
1021    pub(crate) fn snapshot(&self) -> Vec<TurnActivity> {
1022        self.activities
1023            .lock()
1024            .expect("run activity collector lock")
1025            .clone()
1026    }
1027}
1028
1029#[async_trait]
1030impl TurnActivitySink for RunActivityCollector {
1031    async fn emit(&self, activity: TurnActivity) {
1032        self.activities
1033            .lock()
1034            .expect("run activity collector lock")
1035            .push(activity);
1036    }
1037}
1038
1039pub struct TurnActivityFanout {
1040    sinks: Vec<Arc<dyn TurnActivitySink>>,
1041}
1042
1043impl TurnActivityFanout {
1044    pub fn new(sinks: impl IntoIterator<Item = Arc<dyn TurnActivitySink>>) -> Self {
1045        Self {
1046            sinks: sinks.into_iter().collect(),
1047        }
1048    }
1049}
1050
1051#[async_trait]
1052impl TurnActivitySink for TurnActivityFanout {
1053    async fn emit(&self, activity: TurnActivity) {
1054        for sink in &self.sinks {
1055            sink.emit(activity.clone()).await;
1056        }
1057    }
1058}
1059
1060pub fn message_text(message: &Message) -> String {
1061    message
1062        .parts
1063        .iter()
1064        .map(|part| part.content.as_str())
1065        .collect::<Vec<_>>()
1066        .join("\n")
1067}
1068
1069pub fn message_role(message: &Message) -> &'static str {
1070    match message.role {
1071        MessageRole::User => "user",
1072        MessageRole::Assistant => "assistant",
1073        MessageRole::System => "system",
1074        MessageRole::Event => "event",
1075    }
1076}