Skip to main content

lash/
turn.rs

1use crate::support::*;
2
3pub use lash_core::{AssistantOutput, TurnIssue};
4
5/// The two internal event sinks threaded through the turn-execution helpers.
6///
7/// `events` is the raw `SessionEvent` stream (the lower-level escape hatch,
8/// reachable from app code only via [`TurnBuilder::advanced`]); `turn_events`
9/// is the semantic [`TurnActivity`] stream used by the primary builder API.
10/// Bundling them keeps the internal turn fns to a single sink parameter.
11#[derive(Clone, Copy, Default)]
12pub(crate) struct TurnSinks<'a> {
13    events: Option<&'a dyn EventSink>,
14    turn_events: Option<&'a dyn TurnActivitySink>,
15}
16
17impl<'a> TurnSinks<'a> {
18    pub(crate) fn turn(events: &'a dyn TurnActivitySink) -> Self {
19        Self {
20            events: None,
21            turn_events: Some(events),
22        }
23    }
24
25    pub(crate) fn session(events: &'a dyn EventSink) -> Self {
26        Self {
27            events: Some(events),
28            turn_events: None,
29        }
30    }
31
32    fn events(&self) -> Option<&'a dyn EventSink> {
33        self.events
34    }
35
36    fn turn_events(&self) -> Option<&'a dyn TurnActivitySink> {
37        self.turn_events
38    }
39}
40
41pub struct TurnBuilder {
42    pub(crate) runtime: RuntimeHandle,
43    pub(crate) effect_host: Arc<dyn EffectHost>,
44    pub(crate) active_plugins: Vec<ActivePluginBinding>,
45    pub(crate) input: TurnInput,
46    pub(crate) cancel: CancellationToken,
47    pub(crate) protocol_turn_options: Option<ProtocolTurnOptions>,
48    pub(crate) provider: Option<ProviderHandle>,
49    pub(crate) model: Option<ModelSpec>,
50    pub(crate) turn_id: Option<String>,
51}
52
53impl TurnBuilder {
54    pub fn cancel(mut self, cancel: CancellationToken) -> Self {
55        self.cancel = cancel;
56        self
57    }
58
59    pub fn protocol_turn_options(mut self, options: ProtocolTurnOptions) -> Self {
60        self.protocol_turn_options = Some(options);
61        self
62    }
63
64    pub fn provider(mut self, provider: ProviderHandle) -> Self {
65        self.provider = Some(provider);
66        self
67    }
68
69    pub fn model(mut self, model: ModelSpec) -> Self {
70        self.model = Some(model);
71        self
72    }
73
74    pub fn turn_id(mut self, id: impl Into<String>) -> Self {
75        self.turn_id = Some(id.into());
76        self
77    }
78
79    pub fn prompt_template(mut self, template: PromptTemplate) -> Self {
80        self.input.turn_context.set_prompt_template(template);
81        self
82    }
83
84    pub fn prompt_contribution(mut self, contribution: PromptContribution) -> Self {
85        self.input
86            .turn_context
87            .add_prompt_contribution(contribution);
88        self
89    }
90
91    pub fn replace_prompt_slot(
92        mut self,
93        slot: PromptSlot,
94        contributions: impl IntoIterator<Item = PromptContribution>,
95    ) -> Self {
96        self.input
97            .turn_context
98            .replace_prompt_slot(slot, contributions);
99        self
100    }
101
102    pub fn clear_prompt_slot(mut self, slot: PromptSlot) -> Self {
103        self.input.turn_context.clear_prompt_slot(slot);
104        self
105    }
106
107    pub fn prompt_layer(mut self, layer: PromptLayer) -> Self {
108        self.input.turn_context.set_prompt_layer(layer);
109        self
110    }
111
112    /// Attach typed per-turn input for an activated plugin binding.
113    ///
114    /// This is the generic primitive. Plugin crates should usually wrap it in a
115    /// domain extension trait such as `.with_tone(tone)` or `.with_board(board)`
116    /// so application code stays typed in its own vocabulary.
117    pub fn with_plugin_input<P: PluginBinding>(mut self, input: P::Input) -> Self {
118        self.input.turn_context.insert_plugin_input(P::ID, input);
119        self
120    }
121
122    pub fn effects(self, controller: &dyn RuntimeEffectController) -> ScopedTurnBuilder<'_> {
123        ScopedTurnBuilder {
124            builder: self,
125            controller,
126        }
127    }
128
129    pub async fn run(self) -> Result<TurnOutput> {
130        let collector = RunActivityCollector::default();
131        let result = self.stream_to(&collector).await?;
132        Ok(TurnOutput {
133            result,
134            activities: collector.into_activities(),
135        })
136    }
137
138    pub async fn stream_to(self, events: &dyn TurnActivitySink) -> Result<TurnResult> {
139        let effect_host = Arc::clone(&self.effect_host);
140        reject_configured_durable_effect_host(effect_host.as_ref(), "turn")?;
141        self.stream_to_with_effect_host(events, effect_host.as_ref())
142            .await
143    }
144
145    pub fn stream(self) -> Result<TurnStream> {
146        let effect_host = Arc::clone(&self.effect_host);
147        reject_configured_durable_effect_host(effect_host.as_ref(), "turn stream")?;
148        self.stream_with_effect_host(effect_host.as_ref())
149    }
150
151    /// Access lower-level turn execution that bypasses the semantic
152    /// [`TurnActivity`] tier.
153    pub fn advanced(self) -> AdvancedTurn {
154        AdvancedTurn { builder: self }
155    }
156
157    fn resolved_turn_id(&self) -> String {
158        self.turn_id
159            .clone()
160            .or_else(|| self.input.trace_turn_id.clone())
161            .unwrap_or_else(fresh_turn_id)
162    }
163
164    fn turn_scope(&self, turn_id: &str) -> lash_core::EffectScope {
165        lash_core::EffectScope::turn(self.runtime.observe().session_id(), turn_id)
166    }
167
168    pub(crate) fn prepare(
169        mut self,
170        trace_turn_id: Option<String>,
171    ) -> Result<(RuntimeHandle, TurnInput, CancellationToken)> {
172        if let Some(options) = self.protocol_turn_options {
173            self.input.protocol_turn_options = Some(options);
174        }
175        if let Some(provider) = self.provider {
176            self.input.turn_context.set_provider(provider);
177        }
178        if let Some(model) = self.model {
179            self.input.turn_context.set_model(model);
180        }
181        if let Some(trace_turn_id) = trace_turn_id {
182            self.input.trace_turn_id = Some(trace_turn_id);
183        }
184        validate_required_plugin_inputs(&self.active_plugins, &self.input)?;
185        Ok((self.runtime, self.input, self.cancel))
186    }
187
188    async fn stream_to_with_effect_host(
189        self,
190        events: &dyn TurnActivitySink,
191        effect_host: &dyn EffectHost,
192    ) -> Result<TurnResult> {
193        let turn_id = self.resolved_turn_id();
194        let scoped_effect_controller = effect_host.scoped(self.turn_scope(&turn_id))?;
195        self.stream_to_with_scope(events, scoped_effect_controller, Some(turn_id))
196            .await
197    }
198
199    async fn stream_to_with_effect_controller(
200        self,
201        events: &dyn TurnActivitySink,
202        controller: &dyn RuntimeEffectController,
203    ) -> Result<TurnResult> {
204        let turn_id = self.resolved_turn_id();
205        let scoped_effect_controller =
206            ScopedEffectController::borrowed(controller, self.turn_scope(&turn_id))?;
207        self.stream_to_with_scope(events, scoped_effect_controller, Some(turn_id))
208            .await
209    }
210
211    async fn stream_to_with_scope(
212        self,
213        events: &dyn TurnActivitySink,
214        scoped_effect_controller: ScopedEffectController<'_>,
215        trace_turn_id: Option<String>,
216    ) -> Result<TurnResult> {
217        let (runtime, input, cancel) = self.prepare(trace_turn_id)?;
218        stream_prepared_turn(
219            &runtime,
220            input,
221            TurnSinks::turn(events),
222            scoped_effect_controller,
223            cancel,
224        )
225        .await
226    }
227
228    fn stream_with_effect_host(self, effect_host: &dyn EffectHost) -> Result<TurnStream> {
229        let turn_id = self.resolved_turn_id();
230        let scoped_effect_controller = effect_host
231            .scoped_static(self.turn_scope(&turn_id))?
232            .ok_or(EmbedError::StaticTurnStreamRequiresStaticEffectHost)?;
233        self.stream_with_scope(scoped_effect_controller, Some(turn_id))
234    }
235
236    fn stream_with_scope(
237        self,
238        scoped_effect_controller: ScopedEffectController<'static>,
239        trace_turn_id: Option<String>,
240    ) -> Result<TurnStream> {
241        let (runtime, input, cancel) = self.prepare(trace_turn_id)?;
242        let (tx, rx) = mpsc::channel(64);
243        let sink = ChannelTurnActivitySink { tx };
244        let completion = tokio::spawn(async move {
245            stream_prepared_turn(
246                &runtime,
247                input,
248                TurnSinks::turn(&sink),
249                scoped_effect_controller,
250                cancel,
251            )
252            .await
253        });
254        Ok(TurnStream {
255            activities: rx,
256            completion,
257        })
258    }
259}
260
261pub struct ScopedTurnBuilder<'run> {
262    builder: TurnBuilder,
263    controller: &'run dyn RuntimeEffectController,
264}
265
266impl<'run> ScopedTurnBuilder<'run> {
267    pub fn cancel(mut self, cancel: CancellationToken) -> Self {
268        self.builder = self.builder.cancel(cancel);
269        self
270    }
271
272    pub fn protocol_turn_options(mut self, options: ProtocolTurnOptions) -> Self {
273        self.builder = self.builder.protocol_turn_options(options);
274        self
275    }
276
277    pub fn provider(mut self, provider: ProviderHandle) -> Self {
278        self.builder = self.builder.provider(provider);
279        self
280    }
281
282    pub fn model(mut self, model: ModelSpec) -> Self {
283        self.builder = self.builder.model(model);
284        self
285    }
286
287    pub fn turn_id(mut self, id: impl Into<String>) -> Self {
288        self.builder = self.builder.turn_id(id);
289        self
290    }
291
292    pub fn prompt_template(mut self, template: PromptTemplate) -> Self {
293        self.builder = self.builder.prompt_template(template);
294        self
295    }
296
297    pub fn prompt_contribution(mut self, contribution: PromptContribution) -> Self {
298        self.builder = self.builder.prompt_contribution(contribution);
299        self
300    }
301
302    pub fn replace_prompt_slot(
303        mut self,
304        slot: PromptSlot,
305        contributions: impl IntoIterator<Item = PromptContribution>,
306    ) -> Self {
307        self.builder = self.builder.replace_prompt_slot(slot, contributions);
308        self
309    }
310
311    pub fn clear_prompt_slot(mut self, slot: PromptSlot) -> Self {
312        self.builder = self.builder.clear_prompt_slot(slot);
313        self
314    }
315
316    pub fn prompt_layer(mut self, layer: PromptLayer) -> Self {
317        self.builder = self.builder.prompt_layer(layer);
318        self
319    }
320
321    pub fn with_plugin_input<P: PluginBinding>(mut self, input: P::Input) -> Self {
322        self.builder = self.builder.with_plugin_input::<P>(input);
323        self
324    }
325
326    pub async fn run(self) -> Result<TurnOutput> {
327        let collector = RunActivityCollector::default();
328        let result = self.stream_to(&collector).await?;
329        Ok(TurnOutput {
330            result,
331            activities: collector.into_activities(),
332        })
333    }
334
335    pub async fn stream_to(self, events: &dyn TurnActivitySink) -> Result<TurnResult> {
336        self.builder
337            .stream_to_with_effect_controller(events, self.controller)
338            .await
339    }
340
341    pub fn advanced(self) -> AdvancedTurn {
342        self.builder.advanced()
343    }
344}
345
346/// Lower-level turn execution that exposes the raw `SessionEvent` stream.
347///
348/// Reachable via [`TurnBuilder::advanced`]. Most applications should use
349/// [`TurnBuilder::stream_to`] for semantic turn activity; benchmarks and
350/// diagnostics use this when they need the same session-event stream as the
351/// lower-level runtime trace.
352pub struct AdvancedTurn {
353    builder: TurnBuilder,
354}
355
356impl AdvancedTurn {
357    pub async fn run_with_scope(
358        self,
359        scoped_effect_controller: ScopedEffectController<'_>,
360    ) -> Result<TurnOutput> {
361        let collector = RunActivityCollector::default();
362        let result = self
363            .stream_to_with_scope(&collector, scoped_effect_controller)
364            .await?;
365        Ok(TurnOutput {
366            result,
367            activities: collector.into_activities(),
368        })
369    }
370
371    pub async fn collect_with_scope(
372        self,
373        events: &dyn TurnActivitySink,
374        scoped_effect_controller: ScopedEffectController<'_>,
375    ) -> Result<TurnOutput> {
376        let collector = RunActivityCollector::default();
377        let fanout = BorrowedTurnActivityFanout {
378            live: events,
379            collector: &collector,
380        };
381        let result = self
382            .stream_to_with_scope(&fanout, scoped_effect_controller)
383            .await?;
384        Ok(TurnOutput {
385            result,
386            activities: collector.into_activities(),
387        })
388    }
389
390    pub async fn stream_to_with_scope(
391        self,
392        events: &dyn TurnActivitySink,
393        scoped_effect_controller: ScopedEffectController<'_>,
394    ) -> Result<TurnResult> {
395        let trace_turn_id = trace_turn_id_for_scope(&self.builder, &scoped_effect_controller);
396        self.builder
397            .stream_to_with_scope(events, scoped_effect_controller, trace_turn_id)
398            .await
399    }
400
401    pub fn stream_with_scope(
402        self,
403        scoped_effect_controller: ScopedEffectController<'static>,
404    ) -> Result<TurnStream> {
405        let trace_turn_id = trace_turn_id_for_scope(&self.builder, &scoped_effect_controller);
406        self.builder
407            .stream_with_scope(scoped_effect_controller, trace_turn_id)
408    }
409
410    /// Run the turn while sending raw session events to `events`.
411    pub async fn collect_session_events_with_scope(
412        self,
413        events: &dyn EventSink,
414        scoped_effect_controller: ScopedEffectController<'_>,
415    ) -> Result<TurnResult> {
416        let trace_turn_id = trace_turn_id_for_scope(&self.builder, &scoped_effect_controller);
417        let (runtime, input, cancel) = self.builder.prepare(trace_turn_id)?;
418        stream_prepared_turn(
419            &runtime,
420            input,
421            TurnSinks::session(events),
422            scoped_effect_controller,
423            cancel,
424        )
425        .await
426    }
427}
428
429pub struct TurnStream {
430    activities: mpsc::Receiver<Result<TurnActivity>>,
431    completion: JoinHandle<Result<TurnResult>>,
432}
433
434impl TurnStream {
435    pub async fn next_activity(&mut self) -> Option<Result<TurnActivity>> {
436        self.activities.recv().await
437    }
438
439    pub async fn finish(self) -> Result<TurnResult> {
440        self.completion.await.map_err(|err| {
441            EmbedError::Runtime(lash_core::RuntimeError::new(
442                RuntimeErrorCode::TurnStreamJoin,
443                format!("turn stream task failed: {err}"),
444            ))
445        })?
446    }
447}
448
449pub struct QueuedTurnBuilder {
450    pub(crate) runtime: RuntimeHandle,
451    pub(crate) effect_host: Arc<dyn EffectHost>,
452    pub(crate) cancel: CancellationToken,
453    pub(crate) batch_ids: Vec<String>,
454    pub(crate) drain_id: Option<String>,
455}
456
457impl QueuedTurnBuilder {
458    pub fn cancel(mut self, cancel: CancellationToken) -> Self {
459        self.cancel = cancel;
460        self
461    }
462
463    pub fn batch_ids(mut self, batch_ids: impl IntoIterator<Item = impl Into<String>>) -> Self {
464        self.batch_ids = batch_ids.into_iter().map(Into::into).collect();
465        self
466    }
467
468    pub fn drain_id(mut self, drain_id: impl Into<String>) -> Self {
469        self.drain_id = Some(drain_id.into());
470        self
471    }
472
473    pub fn effects(self, controller: &dyn RuntimeEffectController) -> ScopedQueuedTurnBuilder<'_> {
474        ScopedQueuedTurnBuilder {
475            builder: self,
476            controller,
477        }
478    }
479
480    pub async fn run(self) -> Result<Option<TurnOutput>> {
481        let collector = RunActivityCollector::default();
482        let Some(result) = self.stream_to(&collector).await? else {
483            return Ok(None);
484        };
485        Ok(Some(TurnOutput {
486            result,
487            activities: collector.into_activities(),
488        }))
489    }
490
491    pub async fn stream_to(self, events: &dyn TurnActivitySink) -> Result<Option<TurnResult>> {
492        let effect_host = Arc::clone(&self.effect_host);
493        reject_configured_durable_effect_host(effect_host.as_ref(), "queued turn")?;
494        self.stream_to_with_effect_host(events, effect_host.as_ref())
495            .await
496    }
497
498    fn resolved_drain_id(&self) -> String {
499        self.drain_id
500            .clone()
501            .or_else(|| self.batch_ids.first().cloned())
502            .unwrap_or_else(fresh_queue_drain_id)
503    }
504
505    async fn stream_to_with_effect_host(
506        self,
507        events: &dyn TurnActivitySink,
508        effect_host: &dyn EffectHost,
509    ) -> Result<Option<TurnResult>> {
510        let drain_id = self.resolved_drain_id();
511        let scope =
512            lash_core::EffectScope::queue_drain(self.runtime.observe().session_id(), drain_id);
513        let scoped_effect_controller = effect_host.scoped(scope)?;
514        self.stream_to_with_scope(events, scoped_effect_controller)
515            .await
516    }
517
518    async fn stream_to_with_effect_controller(
519        self,
520        events: &dyn TurnActivitySink,
521        controller: &dyn RuntimeEffectController,
522    ) -> Result<Option<TurnResult>> {
523        let drain_id = self.resolved_drain_id();
524        let scope =
525            lash_core::EffectScope::queue_drain(self.runtime.observe().session_id(), drain_id);
526        let scoped_effect_controller = ScopedEffectController::borrowed(controller, scope)?;
527        self.stream_to_with_scope(events, scoped_effect_controller)
528            .await
529    }
530
531    async fn stream_to_with_scope(
532        self,
533        events: &dyn TurnActivitySink,
534        scoped_effect_controller: ScopedEffectController<'_>,
535    ) -> Result<Option<TurnResult>> {
536        let Self {
537            runtime,
538            effect_host: _,
539            cancel,
540            batch_ids,
541            drain_id: _,
542        } = self;
543        stream_next_queued_prepared_turn(
544            &runtime,
545            TurnSinks::turn(events),
546            scoped_effect_controller,
547            cancel,
548            &batch_ids,
549        )
550        .await
551    }
552}
553
554pub struct ScopedQueuedTurnBuilder<'run> {
555    builder: QueuedTurnBuilder,
556    controller: &'run dyn RuntimeEffectController,
557}
558
559impl<'run> ScopedQueuedTurnBuilder<'run> {
560    pub fn cancel(mut self, cancel: CancellationToken) -> Self {
561        self.builder = self.builder.cancel(cancel);
562        self
563    }
564
565    pub fn batch_ids(mut self, batch_ids: impl IntoIterator<Item = impl Into<String>>) -> Self {
566        self.builder = self.builder.batch_ids(batch_ids);
567        self
568    }
569
570    pub fn drain_id(mut self, drain_id: impl Into<String>) -> Self {
571        self.builder = self.builder.drain_id(drain_id);
572        self
573    }
574
575    pub async fn run(self) -> Result<Option<TurnOutput>> {
576        let collector = RunActivityCollector::default();
577        let Some(result) = self.stream_to(&collector).await? else {
578            return Ok(None);
579        };
580        Ok(Some(TurnOutput {
581            result,
582            activities: collector.into_activities(),
583        }))
584    }
585
586    pub async fn stream_to(self, events: &dyn TurnActivitySink) -> Result<Option<TurnResult>> {
587        self.builder
588            .stream_to_with_effect_controller(events, self.controller)
589            .await
590    }
591}
592
593fn fresh_turn_id() -> String {
594    lash_core::TurnActivityId::fresh().0
595}
596
597fn fresh_queue_drain_id() -> String {
598    format!("queue-drain:{}", fresh_turn_id())
599}
600
601fn trace_turn_id_for_scope(
602    builder: &TurnBuilder,
603    scoped_effect_controller: &ScopedEffectController<'_>,
604) -> Option<String> {
605    if scoped_effect_controller
606        .effect_scope()
607        .validates_turn_trace_id()
608    {
609        Some(
610            builder
611                .turn_id
612                .clone()
613                .unwrap_or_else(|| scoped_effect_controller.scope_id().to_string()),
614        )
615    } else {
616        builder
617            .turn_id
618            .clone()
619            .or_else(|| builder.input.trace_turn_id.clone())
620    }
621}
622
623fn reject_configured_durable_effect_host(
624    effect_host: &dyn EffectHost,
625    operation: &'static str,
626) -> Result<()> {
627    if effect_host.durability_tier() == DurabilityTier::Durable {
628        return Err(EmbedError::DurableEffectHostRequiresHandlerContext { operation });
629    }
630    Ok(())
631}
632
633pub(crate) async fn stream_next_queued_prepared_turn(
634    runtime: &RuntimeHandle,
635    sinks: TurnSinks<'_>,
636    scoped_effect_controller: ScopedEffectController<'_>,
637    cancel: CancellationToken,
638    batch_ids: &[String],
639) -> Result<Option<TurnResult>> {
640    let turn = Box::pin(stream_next_queued_prepared_assembled(
641        runtime,
642        sinks,
643        scoped_effect_controller,
644        cancel,
645        batch_ids,
646    ))
647    .await?;
648    Ok(turn.map(TurnResult::from_assembled))
649}
650
651pub(crate) async fn stream_next_queued_prepared_assembled(
652    runtime: &RuntimeHandle,
653    sinks: TurnSinks<'_>,
654    scoped_effect_controller: ScopedEffectController<'_>,
655    cancel: CancellationToken,
656    batch_ids: &[String],
657) -> Result<Option<AssembledTurn>> {
658    let writer_handle = runtime.writer();
659    let mut writer = writer_handle.lock().await;
660    let observation_sink = SessionObservationTurnActivitySink {
661        runtime: runtime.clone(),
662        live: sinks.turn_events(),
663    };
664    let opts = turn_options(
665        sinks.events(),
666        &observation_sink,
667        scoped_effect_controller,
668        cancel,
669    );
670    let turn = if batch_ids.is_empty() {
671        writer.stream_next_queued_work(opts).await?
672    } else {
673        writer.stream_selected_queued_work(opts, batch_ids).await?
674    };
675    runtime.publish_from(&writer);
676    Ok(turn)
677}
678
679fn turn_options<'a>(
680    events: Option<&'a dyn EventSink>,
681    turn_events: &'a dyn TurnActivitySink,
682    scoped_effect_controller: ScopedEffectController<'a>,
683    cancel: CancellationToken,
684) -> lash_core::TurnOptions<'a> {
685    let mut opts = lash_core::TurnOptions::new(cancel, scoped_effect_controller);
686    if let Some(events) = events {
687        opts = opts.with_events(events);
688    }
689    opts.with_turn_events(turn_events)
690}
691
692struct SessionObservationTurnActivitySink<'a> {
693    runtime: RuntimeHandle,
694    live: Option<&'a dyn TurnActivitySink>,
695}
696
697#[async_trait]
698impl TurnActivitySink for SessionObservationTurnActivitySink<'_> {
699    fn is_noop(&self) -> bool {
700        false
701    }
702
703    async fn emit(&self, activity: TurnActivity) {
704        self.runtime.record_turn_activity(activity.clone());
705        if let Some(live) = self.live {
706            live.emit(activity).await;
707        }
708    }
709}
710
711struct ChannelTurnActivitySink {
712    tx: mpsc::Sender<Result<TurnActivity>>,
713}
714
715#[async_trait]
716impl TurnActivitySink for ChannelTurnActivitySink {
717    async fn emit(&self, activity: TurnActivity) {
718        let _ = self.tx.send(Ok(activity)).await;
719    }
720}
721fn validate_required_plugin_inputs(
722    active_plugins: &[ActivePluginBinding],
723    input: &TurnInput,
724) -> Result<()> {
725    for plugin in active_plugins {
726        if plugin.requires_turn_input && !input.turn_context.has_plugin_input(plugin.id) {
727            return Err(EmbedError::MissingPluginTurnInput {
728                plugin_id: plugin.id,
729            });
730        }
731    }
732    Ok(())
733}
734
735pub(crate) async fn stream_prepared_turn(
736    runtime: &RuntimeHandle,
737    input: TurnInput,
738    sinks: TurnSinks<'_>,
739    scoped_effect_controller: ScopedEffectController<'_>,
740    cancel: CancellationToken,
741) -> Result<TurnResult> {
742    let turn = Box::pin(stream_prepared_assembled(
743        runtime,
744        input,
745        sinks,
746        scoped_effect_controller,
747        cancel,
748    ))
749    .await?;
750    Ok(TurnResult::from_assembled(turn))
751}
752
753pub(crate) async fn stream_prepared_assembled(
754    runtime: &RuntimeHandle,
755    input: TurnInput,
756    sinks: TurnSinks<'_>,
757    scoped_effect_controller: ScopedEffectController<'_>,
758    cancel: CancellationToken,
759) -> Result<AssembledTurn> {
760    let turn = Box::pin(stream_prepared_agent_frame_run(
761        runtime,
762        input,
763        sinks,
764        scoped_effect_controller,
765        cancel,
766    ))
767    .await?;
768    turn.into_final_turn().ok_or_else(|| {
769        EmbedError::Runtime(lash_core::RuntimeError::new(
770            RuntimeErrorCode::EmptyAgentFrameRun,
771            "runtime completed without an assembled turn",
772        ))
773    })
774}
775
776pub(crate) async fn stream_prepared_agent_frame_run(
777    runtime: &RuntimeHandle,
778    input: TurnInput,
779    sinks: TurnSinks<'_>,
780    scoped_effect_controller: ScopedEffectController<'_>,
781    cancel: CancellationToken,
782) -> Result<lash_core::AgentFrameRun> {
783    let writer_handle = runtime.writer();
784    let mut writer = writer_handle.lock().await;
785    if let Some(extension) = input.protocol_extension.as_ref() {
786        writer
787            .validate_protocol_turn_extension(extension)
788            .await
789            .map_err(EmbedError::Session)?;
790    }
791    let observation_sink = SessionObservationTurnActivitySink {
792        runtime: runtime.clone(),
793        live: sinks.turn_events(),
794    };
795    let turn = Box::pin(writer.stream_turn_with_agent_frames(
796        input,
797        turn_options(
798            sinks.events(),
799            &observation_sink,
800            scoped_effect_controller,
801            cancel,
802        ),
803    ))
804    .await?;
805    runtime.publish_from(&writer);
806    Ok(turn)
807}
808
809#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
810pub struct TurnResult {
811    pub state: SessionSnapshot,
812    pub outcome: TurnOutcome,
813    pub assistant_output: AssistantOutput,
814    /// Parent's own LLM tokens for this turn. Does **not** include child
815    /// sessions; see [`children_usage`](Self::children_usage) and
816    /// [`total_usage`](Self::total_usage).
817    pub usage: TokenUsage,
818    /// Per-`(source, model)` ledger entries for child sessions whose LLM
819    /// calls completed during this turn (subagents, compaction, observers,
820    /// etc.). Empty unless the turn spawned children.
821    #[serde(default)]
822    pub children_usage: Vec<TokenLedgerEntry>,
823    pub tool_calls: Vec<ToolCallRecord>,
824    pub execution: ExecutionSummary,
825    pub errors: Vec<TurnIssue>,
826}
827
828impl TurnResult {
829    fn from_assembled(turn: lash_core::AssembledTurn) -> Self {
830        Self {
831            state: turn.state,
832            outcome: turn.outcome,
833            assistant_output: turn.assistant_output,
834            usage: turn.token_usage,
835            children_usage: turn.children_usage,
836            tool_calls: turn.tool_calls,
837            execution: turn.execution,
838            errors: turn.errors,
839        }
840    }
841
842    /// Sum of parent's own LLM tokens and every child session's LLM tokens
843    /// for this turn.
844    pub fn total_usage(&self) -> TokenUsage {
845        let mut total = self.usage.clone();
846        for entry in &self.children_usage {
847            total.add(&entry.usage);
848        }
849        total
850    }
851
852    pub fn assistant_message(&self) -> Option<&str> {
853        match &self.outcome {
854            TurnOutcome::Finished(lash_core::TurnFinish::AssistantMessage { text }) => Some(text),
855            _ => None,
856        }
857    }
858
859    pub fn submitted_value(&self) -> Option<&serde_json::Value> {
860        match &self.outcome {
861            TurnOutcome::Finished(lash_core::TurnFinish::SubmittedValue { value }) => Some(value),
862            _ => None,
863        }
864    }
865
866    pub fn tool_value(&self) -> Option<(&str, &serde_json::Value)> {
867        match &self.outcome {
868            TurnOutcome::Finished(lash_core::TurnFinish::ToolValue { tool_name, value }) => {
869                Some((tool_name.as_str(), value))
870            }
871            _ => None,
872        }
873    }
874
875    pub fn is_success(&self) -> bool {
876        matches!(
877            self.outcome,
878            TurnOutcome::Finished(_) | TurnOutcome::AgentFrameSwitch { .. }
879        )
880    }
881}
882
883#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
884pub struct TurnOutput {
885    pub result: TurnResult,
886    pub activities: Vec<TurnActivity>,
887}
888
889impl TurnOutput {
890    pub fn assistant_message(&self) -> Option<&str> {
891        self.result.assistant_message()
892    }
893
894    pub fn submitted_value(&self) -> Option<&serde_json::Value> {
895        self.result.submitted_value()
896    }
897
898    pub fn tool_value(&self) -> Option<(&str, &serde_json::Value)> {
899        self.result.tool_value()
900    }
901
902    pub fn is_success(&self) -> bool {
903        self.result.is_success()
904    }
905}
906
907struct BorrowedTurnActivityFanout<'a> {
908    live: &'a dyn TurnActivitySink,
909    collector: &'a RunActivityCollector,
910}
911
912#[async_trait]
913impl TurnActivitySink for BorrowedTurnActivityFanout<'_> {
914    async fn emit(&self, activity: TurnActivity) {
915        self.live.emit(activity.clone()).await;
916        self.collector.emit(activity).await;
917    }
918}
919
920#[derive(Default)]
921pub(crate) struct RunActivityCollector {
922    activities: Arc<StdMutex<Vec<TurnActivity>>>,
923}
924
925impl RunActivityCollector {
926    fn into_activities(self) -> Vec<TurnActivity> {
927        self.activities
928            .lock()
929            .expect("run activity collector lock")
930            .clone()
931    }
932
933    #[cfg(test)]
934    pub(crate) fn snapshot(&self) -> Vec<TurnActivity> {
935        self.activities
936            .lock()
937            .expect("run activity collector lock")
938            .clone()
939    }
940}
941
942#[async_trait]
943impl TurnActivitySink for RunActivityCollector {
944    async fn emit(&self, activity: TurnActivity) {
945        self.activities
946            .lock()
947            .expect("run activity collector lock")
948            .push(activity);
949    }
950}
951
952pub struct TurnActivityFanout {
953    sinks: Vec<Arc<dyn TurnActivitySink>>,
954}
955
956impl TurnActivityFanout {
957    pub fn new(sinks: impl IntoIterator<Item = Arc<dyn TurnActivitySink>>) -> Self {
958        Self {
959            sinks: sinks.into_iter().collect(),
960        }
961    }
962}
963
964#[async_trait]
965impl TurnActivitySink for TurnActivityFanout {
966    async fn emit(&self, activity: TurnActivity) {
967        for sink in &self.sinks {
968            sink.emit(activity.clone()).await;
969        }
970    }
971}
972
973pub fn message_text(message: &Message) -> String {
974    message
975        .parts
976        .iter()
977        .map(|part| part.content.as_str())
978        .collect::<Vec<_>>()
979        .join("\n")
980}
981
982pub fn message_role(message: &Message) -> &'static str {
983    match message.role {
984        MessageRole::User => "user",
985        MessageRole::Assistant => "assistant",
986        MessageRole::System => "system",
987        MessageRole::Event => "event",
988    }
989}