// lash/turn.rs

1use crate::support::*;
2
3pub use lash_core::{AssistantOutput, TurnIssue};
4
5/// The two internal event sinks threaded through the turn-execution helpers.
6///
7/// `events` is the raw lower-level runtime event stream, reachable from app
8/// code only via [`TurnBuilder::advanced`]; `turn_events` is the semantic
9/// [`TurnActivity`] stream used by the primary builder API.
10/// Bundling them keeps the internal turn fns to a single sink parameter.
11#[derive(Clone, Copy, Default)]
12pub(crate) struct TurnSinks<'a> {
13    events: Option<&'a dyn EventSink>,
14    turn_events: Option<&'a dyn TurnActivitySink>,
15}
16
17impl<'a> TurnSinks<'a> {
18    pub(crate) fn turn(events: &'a dyn TurnActivitySink) -> Self {
19        Self {
20            events: None,
21            turn_events: Some(events),
22        }
23    }
24
25    pub(crate) fn session(events: &'a dyn EventSink) -> Self {
26        Self {
27            events: Some(events),
28            turn_events: None,
29        }
30    }
31
32    fn events(&self) -> Option<&'a dyn EventSink> {
33        self.events
34    }
35
36    fn turn_events(&self) -> Option<&'a dyn TurnActivitySink> {
37        self.turn_events
38    }
39}
40
41/// Cancellation tokens of the turns currently executing through one opened
42/// [`LashSession`](crate::LashSession) (shared by its clones).
43/// [`LashSession::cancel_running_turns`](crate::LashSession::cancel_running_turns)
44/// cancels them without the caller having to thread a token around.
45#[derive(Clone, Default)]
46pub(crate) struct TurnCancelRegistry {
47    inner: Arc<StdMutex<TurnCancelRegistryInner>>,
48}
49
50#[derive(Default)]
51struct TurnCancelRegistryInner {
52    next_id: u64,
53    active: BTreeMap<u64, CancellationToken>,
54}
55
56impl TurnCancelRegistry {
57    /// Register the token of a turn that is about to execute. The guard
58    /// removes the entry when the turn finishes, however it finishes.
59    fn register(&self, token: CancellationToken) -> TurnCancelGuard {
60        let mut inner = self.inner.lock().expect("turn cancel registry");
61        let id = inner.next_id;
62        inner.next_id += 1;
63        inner.active.insert(id, token);
64        TurnCancelGuard {
65            registry: Arc::clone(&self.inner),
66            id,
67        }
68    }
69
70    pub(crate) fn cancel_all(&self) -> usize {
71        let inner = self.inner.lock().expect("turn cancel registry");
72        for token in inner.active.values() {
73            token.cancel();
74        }
75        inner.active.len()
76    }
77}
78
79pub(crate) struct TurnCancelGuard {
80    registry: Arc<StdMutex<TurnCancelRegistryInner>>,
81    id: u64,
82}
83
84impl Drop for TurnCancelGuard {
85    fn drop(&mut self) {
86        self.registry
87            .lock()
88            .expect("turn cancel registry")
89            .active
90            .remove(&self.id);
91    }
92}
93
/// Builder for a single turn.
///
/// Holds the turn input together with per-turn overrides (cancellation token,
/// protocol options, provider, model, turn id) and is consumed by one of the
/// run/stream methods.
pub struct TurnBuilder {
    pub(crate) runtime: RuntimeHandle,
    pub(crate) effect_host: Arc<dyn EffectHost>,
    /// Plugin bindings checked by `validate_required_plugin_inputs` in
    /// [`prepare`](Self::prepare).
    pub(crate) active_plugins: Vec<ActivePluginBinding>,
    pub(crate) input: TurnInput,
    pub(crate) cancel: CancellationToken,
    /// Registry the turn's token is registered with in
    /// [`prepare`](Self::prepare).
    pub(crate) cancels: TurnCancelRegistry,
    pub(crate) protocol_turn_options: Option<ProtocolTurnOptions>,
    pub(crate) provider: Option<ProviderHandle>,
    pub(crate) model: Option<ModelSpec>,
    pub(crate) turn_id: Option<String>,
}

impl TurnBuilder {
    /// Replace the cancellation token used for this turn.
    pub fn cancel(mut self, cancel: CancellationToken) -> Self {
        self.cancel = cancel;
        self
    }

    /// Set protocol turn options; they are copied onto the turn input in
    /// [`prepare`](Self::prepare).
    pub fn protocol_turn_options(mut self, options: ProtocolTurnOptions) -> Self {
        self.protocol_turn_options = Some(options);
        self
    }

    /// Set a provider override; it is applied to the turn context in
    /// [`prepare`](Self::prepare).
    pub fn provider(mut self, provider: ProviderHandle) -> Self {
        self.provider = Some(provider);
        self
    }

    /// Set a model override; it is applied to the turn context in
    /// [`prepare`](Self::prepare).
    pub fn model(mut self, model: ModelSpec) -> Self {
        self.model = Some(model);
        self
    }

    /// Set an explicit turn id. When the id is resolved it takes precedence
    /// over the input's `trace_turn_id`.
    pub fn turn_id(mut self, id: impl Into<String>) -> Self {
        self.turn_id = Some(id.into());
        self
    }

    /// Forward `template` to the turn context's `set_prompt_template`.
    pub fn prompt_template(mut self, template: PromptTemplate) -> Self {
        self.input.turn_context.set_prompt_template(template);
        self
    }

    /// Forward `contribution` to the turn context's `add_prompt_contribution`.
    pub fn prompt_contribution(mut self, contribution: PromptContribution) -> Self {
        self.input
            .turn_context
            .add_prompt_contribution(contribution);
        self
    }

    /// Forward `slot` and `contributions` to the turn context's
    /// `replace_prompt_slot`.
    pub fn replace_prompt_slot(
        mut self,
        slot: PromptSlot,
        contributions: impl IntoIterator<Item = PromptContribution>,
    ) -> Self {
        self.input
            .turn_context
            .replace_prompt_slot(slot, contributions);
        self
    }

    /// Forward `slot` to the turn context's `clear_prompt_slot`.
    pub fn clear_prompt_slot(mut self, slot: PromptSlot) -> Self {
        self.input.turn_context.clear_prompt_slot(slot);
        self
    }

    /// Forward `layer` to the turn context's `set_prompt_layer`.
    pub fn prompt_layer(mut self, layer: PromptLayer) -> Self {
        self.input.turn_context.set_prompt_layer(layer);
        self
    }

    /// Attach typed per-turn input for an activated plugin binding.
    ///
    /// This is the generic primitive. Plugin crates should usually wrap it in a
    /// domain extension trait such as `.with_tone(tone)` or `.with_board(board)`
    /// so application code stays typed in its own vocabulary.
    pub fn with_plugin_input<P: PluginBinding>(mut self, input: P::Input) -> Self {
        self.input.turn_context.insert_plugin_input(P::ID, input);
        self
    }

    /// Bind a caller-supplied effect controller, producing a
    /// [`ScopedTurnBuilder`] that runs the turn through it.
    pub fn effects(self, controller: &dyn RuntimeEffectController) -> ScopedTurnBuilder<'_> {
        ScopedTurnBuilder {
            builder: self,
            controller,
        }
    }

    /// Run the turn to completion and return its result together with every
    /// activity emitted while it ran.
    pub async fn run(self) -> Result<TurnOutput> {
        let collector = RunActivityCollector::default();
        let result = self.stream_to(&collector).await?;
        Ok(TurnOutput {
            result,
            activities: collector.into_activities(),
        })
    }

    /// Run the turn, emitting semantic turn activity to `events`.
    ///
    /// # Errors
    ///
    /// Returns an error before the turn starts when the configured effect
    /// host reports `DurabilityTier::Durable`.
    pub async fn stream_to(self, events: &dyn TurnActivitySink) -> Result<TurnResult> {
        let effect_host = Arc::clone(&self.effect_host);
        reject_configured_durable_effect_host(effect_host.as_ref(), "turn")?;
        self.stream_to_with_effect_host(events, effect_host.as_ref())
            .await
    }

    /// Start the turn on a spawned Tokio task and return a [`TurnStream`]
    /// for receiving its activity and final result.
    ///
    /// # Errors
    ///
    /// Returns an error when the configured effect host reports
    /// `DurabilityTier::Durable`, or when it cannot provide a `'static`
    /// scoped controller
    /// (`EmbedError::StaticTurnStreamRequiresStaticEffectHost`).
    pub fn stream(self) -> Result<TurnStream> {
        let effect_host = Arc::clone(&self.effect_host);
        reject_configured_durable_effect_host(effect_host.as_ref(), "turn stream")?;
        self.stream_with_effect_host(effect_host.as_ref())
    }

    /// Access lower-level turn execution that bypasses the semantic
    /// [`TurnActivity`] tier.
    pub fn advanced(self) -> AdvancedTurn {
        AdvancedTurn { builder: self }
    }

    /// Turn id used for the execution scope: the explicit `turn_id`, else the
    /// input's `trace_turn_id`, else a freshly generated id.
    fn resolved_turn_id(&self) -> String {
        self.turn_id
            .clone()
            .or_else(|| self.input.trace_turn_id.clone())
            .unwrap_or_else(fresh_turn_id)
    }

    /// Turn-level execution scope for this runtime's session id and `turn_id`.
    fn turn_scope(&self, turn_id: &str) -> lash_core::ExecutionScope {
        lash_core::ExecutionScope::turn(self.runtime.observe().session_id(), turn_id)
    }

    /// Fold the builder's overrides into the turn input, validate required
    /// plugin inputs, and register the cancellation token.
    ///
    /// A `Some` `trace_turn_id` overwrites the input's `trace_turn_id`. The
    /// returned guard keeps the token registered until it is dropped.
    ///
    /// # Errors
    ///
    /// Returns the error from `validate_required_plugin_inputs`; in that case
    /// the token is never registered.
    pub(crate) fn prepare(
        mut self,
        trace_turn_id: Option<String>,
    ) -> Result<(RuntimeHandle, TurnInput, CancellationToken, TurnCancelGuard)> {
        if let Some(options) = self.protocol_turn_options {
            self.input.protocol_turn_options = Some(options);
        }
        if let Some(provider) = self.provider {
            self.input.turn_context.set_provider(provider);
        }
        if let Some(model) = self.model {
            self.input.turn_context.set_model(model);
        }
        if let Some(trace_turn_id) = trace_turn_id {
            self.input.trace_turn_id = Some(trace_turn_id);
        }
        validate_required_plugin_inputs(&self.active_plugins, &self.input)?;
        let cancel_guard = self.cancels.register(self.cancel.clone());
        Ok((self.runtime, self.input, self.cancel, cancel_guard))
    }

    /// Run the turn with a controller scoped from `effect_host` to this
    /// turn's execution scope.
    async fn stream_to_with_effect_host(
        self,
        events: &dyn TurnActivitySink,
        effect_host: &dyn EffectHost,
    ) -> Result<TurnResult> {
        let turn_id = self.resolved_turn_id();
        let scoped_effect_controller = effect_host.scoped(self.turn_scope(&turn_id))?;
        self.stream_to_with_scope(events, scoped_effect_controller, Some(turn_id))
            .await
    }

    /// Run the turn with a borrowed `controller` scoped to this turn's
    /// execution scope.
    async fn stream_to_with_effect_controller(
        self,
        events: &dyn TurnActivitySink,
        controller: &dyn RuntimeEffectController,
    ) -> Result<TurnResult> {
        let turn_id = self.resolved_turn_id();
        let scoped_effect_controller =
            ScopedEffectController::borrowed(controller, self.turn_scope(&turn_id))?;
        self.stream_to_with_scope(events, scoped_effect_controller, Some(turn_id))
            .await
    }

    /// Prepare the turn and run it inline with an already scoped controller.
    async fn stream_to_with_scope(
        self,
        events: &dyn TurnActivitySink,
        scoped_effect_controller: ScopedEffectController<'_>,
        trace_turn_id: Option<String>,
    ) -> Result<TurnResult> {
        // `_cancel_guard` stays alive until this function returns, so the
        // token remains registered for the whole turn.
        let (runtime, input, cancel, _cancel_guard) = self.prepare(trace_turn_id)?;
        stream_prepared_turn(
            &runtime,
            input,
            TurnSinks::turn(events),
            scoped_effect_controller,
            cancel,
        )
        .await
    }

    /// Obtain a `'static` scoped controller from `effect_host` and start the
    /// turn on a spawned task.
    fn stream_with_effect_host(self, effect_host: &dyn EffectHost) -> Result<TurnStream> {
        let turn_id = self.resolved_turn_id();
        let scoped_effect_controller = effect_host
            .scoped_static(self.turn_scope(&turn_id))?
            .ok_or(EmbedError::StaticTurnStreamRequiresStaticEffectHost)?;
        self.stream_with_scope(scoped_effect_controller, Some(turn_id))
    }

    /// Prepare the turn and run it on a spawned Tokio task, delivering
    /// activity through a bounded channel of capacity 64.
    fn stream_with_scope(
        self,
        scoped_effect_controller: ScopedEffectController<'static>,
        trace_turn_id: Option<String>,
    ) -> Result<TurnStream> {
        let (runtime, input, cancel, cancel_guard) = self.prepare(trace_turn_id)?;
        let (tx, rx) = mpsc::channel(64);
        let sink = ChannelTurnActivitySink { tx };
        let completion = tokio::spawn(async move {
            // Move the guard into the task so the token stays registered
            // until the task itself finishes.
            let _cancel_guard = cancel_guard;
            stream_prepared_turn(
                &runtime,
                input,
                TurnSinks::turn(&sink),
                scoped_effect_controller,
                cancel,
            )
            .await
        });
        Ok(TurnStream {
            activities: rx,
            completion,
        })
    }
}
316
317pub struct ScopedTurnBuilder<'run> {
318    builder: TurnBuilder,
319    controller: &'run dyn RuntimeEffectController,
320}
321
322impl<'run> ScopedTurnBuilder<'run> {
323    pub fn cancel(mut self, cancel: CancellationToken) -> Self {
324        self.builder = self.builder.cancel(cancel);
325        self
326    }
327
328    pub fn protocol_turn_options(mut self, options: ProtocolTurnOptions) -> Self {
329        self.builder = self.builder.protocol_turn_options(options);
330        self
331    }
332
333    pub fn provider(mut self, provider: ProviderHandle) -> Self {
334        self.builder = self.builder.provider(provider);
335        self
336    }
337
338    pub fn model(mut self, model: ModelSpec) -> Self {
339        self.builder = self.builder.model(model);
340        self
341    }
342
343    pub fn turn_id(mut self, id: impl Into<String>) -> Self {
344        self.builder = self.builder.turn_id(id);
345        self
346    }
347
348    pub fn prompt_template(mut self, template: PromptTemplate) -> Self {
349        self.builder = self.builder.prompt_template(template);
350        self
351    }
352
353    pub fn prompt_contribution(mut self, contribution: PromptContribution) -> Self {
354        self.builder = self.builder.prompt_contribution(contribution);
355        self
356    }
357
358    pub fn replace_prompt_slot(
359        mut self,
360        slot: PromptSlot,
361        contributions: impl IntoIterator<Item = PromptContribution>,
362    ) -> Self {
363        self.builder = self.builder.replace_prompt_slot(slot, contributions);
364        self
365    }
366
367    pub fn clear_prompt_slot(mut self, slot: PromptSlot) -> Self {
368        self.builder = self.builder.clear_prompt_slot(slot);
369        self
370    }
371
372    pub fn prompt_layer(mut self, layer: PromptLayer) -> Self {
373        self.builder = self.builder.prompt_layer(layer);
374        self
375    }
376
377    pub fn with_plugin_input<P: PluginBinding>(mut self, input: P::Input) -> Self {
378        self.builder = self.builder.with_plugin_input::<P>(input);
379        self
380    }
381
382    pub async fn run(self) -> Result<TurnOutput> {
383        let collector = RunActivityCollector::default();
384        let result = self.stream_to(&collector).await?;
385        Ok(TurnOutput {
386            result,
387            activities: collector.into_activities(),
388        })
389    }
390
391    pub async fn stream_to(self, events: &dyn TurnActivitySink) -> Result<TurnResult> {
392        self.builder
393            .stream_to_with_effect_controller(events, self.controller)
394            .await
395    }
396}
397
398/// Lower-level turn execution that exposes the raw runtime event stream.
399///
400/// Reachable via [`TurnBuilder::advanced`]. Most applications should use
401/// [`TurnBuilder::stream_to`] for semantic turn activity; benchmarks and
402/// diagnostics use this when they need the same low-level event stream as the
403/// runtime trace.
404pub struct AdvancedTurn {
405    builder: TurnBuilder,
406}
407
408impl AdvancedTurn {
409    pub async fn run_with_scope(
410        self,
411        scoped_effect_controller: ScopedEffectController<'_>,
412    ) -> Result<TurnOutput> {
413        let collector = RunActivityCollector::default();
414        let result = self
415            .stream_to_with_scope(&collector, scoped_effect_controller)
416            .await?;
417        Ok(TurnOutput {
418            result,
419            activities: collector.into_activities(),
420        })
421    }
422
423    pub async fn collect_with_scope(
424        self,
425        events: &dyn TurnActivitySink,
426        scoped_effect_controller: ScopedEffectController<'_>,
427    ) -> Result<TurnOutput> {
428        let collector = RunActivityCollector::default();
429        let fanout = BorrowedTurnActivityFanout {
430            live: events,
431            collector: &collector,
432        };
433        let result = self
434            .stream_to_with_scope(&fanout, scoped_effect_controller)
435            .await?;
436        Ok(TurnOutput {
437            result,
438            activities: collector.into_activities(),
439        })
440    }
441
442    pub async fn stream_to_with_scope(
443        self,
444        events: &dyn TurnActivitySink,
445        scoped_effect_controller: ScopedEffectController<'_>,
446    ) -> Result<TurnResult> {
447        let trace_turn_id = trace_turn_id_for_scope(&self.builder, &scoped_effect_controller);
448        self.builder
449            .stream_to_with_scope(events, scoped_effect_controller, trace_turn_id)
450            .await
451    }
452
453    pub fn stream_with_scope(
454        self,
455        scoped_effect_controller: ScopedEffectController<'static>,
456    ) -> Result<TurnStream> {
457        let trace_turn_id = trace_turn_id_for_scope(&self.builder, &scoped_effect_controller);
458        self.builder
459            .stream_with_scope(scoped_effect_controller, trace_turn_id)
460    }
461
462    /// Run the turn while sending raw lower-level runtime events to `events`.
463    pub async fn collect_session_events_with_scope(
464        self,
465        events: &dyn EventSink,
466        scoped_effect_controller: ScopedEffectController<'_>,
467    ) -> Result<TurnResult> {
468        let trace_turn_id = trace_turn_id_for_scope(&self.builder, &scoped_effect_controller);
469        let (runtime, input, cancel, _cancel_guard) = self.builder.prepare(trace_turn_id)?;
470        stream_prepared_turn(
471            &runtime,
472            input,
473            TurnSinks::session(events),
474            scoped_effect_controller,
475            cancel,
476        )
477        .await
478    }
479}
480
481pub struct TurnStream {
482    activities: mpsc::Receiver<Result<TurnActivity>>,
483    completion: JoinHandle<Result<TurnResult>>,
484}
485
486impl TurnStream {
487    pub async fn next_activity(&mut self) -> Option<Result<TurnActivity>> {
488        self.activities.recv().await
489    }
490
491    pub async fn finish(self) -> Result<TurnResult> {
492        self.completion.await.map_err(|err| {
493            EmbedError::Runtime(lash_core::RuntimeError::new(
494                RuntimeErrorCode::TurnStreamJoin,
495                format!("turn stream task failed: {err}"),
496            ))
497        })?
498    }
499}
500
501pub struct QueuedTurnBuilder {
502    pub(crate) runtime: RuntimeHandle,
503    pub(crate) effect_host: Arc<dyn EffectHost>,
504    pub(crate) cancel: CancellationToken,
505    pub(crate) cancels: TurnCancelRegistry,
506    pub(crate) batch_ids: Vec<String>,
507    pub(crate) drain_id: Option<String>,
508}
509
510impl QueuedTurnBuilder {
511    pub fn cancel(mut self, cancel: CancellationToken) -> Self {
512        self.cancel = cancel;
513        self
514    }
515
516    pub fn batch_ids(mut self, batch_ids: impl IntoIterator<Item = impl Into<String>>) -> Self {
517        self.batch_ids = batch_ids.into_iter().map(Into::into).collect();
518        self
519    }
520
521    pub fn drain_id(mut self, drain_id: impl Into<String>) -> Self {
522        self.drain_id = Some(drain_id.into());
523        self
524    }
525
526    pub fn effects(self, controller: &dyn RuntimeEffectController) -> ScopedQueuedTurnBuilder<'_> {
527        ScopedQueuedTurnBuilder {
528            builder: self,
529            controller,
530        }
531    }
532
533    pub async fn run(self) -> Result<Option<TurnOutput>> {
534        let collector = RunActivityCollector::default();
535        let Some(result) = self.stream_to(&collector).await? else {
536            return Ok(None);
537        };
538        Ok(Some(TurnOutput {
539            result,
540            activities: collector.into_activities(),
541        }))
542    }
543
544    pub async fn stream_to(self, events: &dyn TurnActivitySink) -> Result<Option<TurnResult>> {
545        let effect_host = Arc::clone(&self.effect_host);
546        reject_configured_durable_effect_host(effect_host.as_ref(), "queued turn")?;
547        self.stream_to_with_effect_host(events, effect_host.as_ref())
548            .await
549    }
550
551    pub fn advanced(self) -> AdvancedQueuedTurn {
552        AdvancedQueuedTurn { builder: self }
553    }
554
555    fn resolved_drain_id(&self) -> String {
556        self.drain_id
557            .clone()
558            .or_else(|| self.batch_ids.first().cloned())
559            .unwrap_or_else(fresh_queue_drain_id)
560    }
561
562    async fn stream_to_with_effect_host(
563        self,
564        events: &dyn TurnActivitySink,
565        effect_host: &dyn EffectHost,
566    ) -> Result<Option<TurnResult>> {
567        let drain_id = self.resolved_drain_id();
568        let scope =
569            lash_core::ExecutionScope::queue_drain(self.runtime.observe().session_id(), drain_id);
570        let scoped_effect_controller = effect_host.scoped(scope)?;
571        self.stream_to_with_scope(events, scoped_effect_controller)
572            .await
573    }
574
575    async fn stream_to_with_effect_controller(
576        self,
577        events: &dyn TurnActivitySink,
578        controller: &dyn RuntimeEffectController,
579    ) -> Result<Option<TurnResult>> {
580        let drain_id = self.resolved_drain_id();
581        let scope =
582            lash_core::ExecutionScope::queue_drain(self.runtime.observe().session_id(), drain_id);
583        let scoped_effect_controller = ScopedEffectController::borrowed(controller, scope)?;
584        self.stream_to_with_scope(events, scoped_effect_controller)
585            .await
586    }
587
588    async fn stream_to_with_scope(
589        self,
590        events: &dyn TurnActivitySink,
591        scoped_effect_controller: ScopedEffectController<'_>,
592    ) -> Result<Option<TurnResult>> {
593        let Self {
594            runtime,
595            effect_host: _,
596            cancel,
597            cancels,
598            batch_ids,
599            drain_id: _,
600        } = self;
601        let _cancel_guard = cancels.register(cancel.clone());
602        stream_next_queued_prepared_turn(
603            &runtime,
604            TurnSinks::turn(events),
605            scoped_effect_controller,
606            cancel,
607            &batch_ids,
608        )
609        .await
610    }
611}
612
613pub struct ScopedQueuedTurnBuilder<'run> {
614    builder: QueuedTurnBuilder,
615    controller: &'run dyn RuntimeEffectController,
616}
617
618impl<'run> ScopedQueuedTurnBuilder<'run> {
619    pub fn cancel(mut self, cancel: CancellationToken) -> Self {
620        self.builder = self.builder.cancel(cancel);
621        self
622    }
623
624    pub fn batch_ids(mut self, batch_ids: impl IntoIterator<Item = impl Into<String>>) -> Self {
625        self.builder = self.builder.batch_ids(batch_ids);
626        self
627    }
628
629    pub fn drain_id(mut self, drain_id: impl Into<String>) -> Self {
630        self.builder = self.builder.drain_id(drain_id);
631        self
632    }
633
634    pub async fn run(self) -> Result<Option<TurnOutput>> {
635        let collector = RunActivityCollector::default();
636        let Some(result) = self.stream_to(&collector).await? else {
637            return Ok(None);
638        };
639        Ok(Some(TurnOutput {
640            result,
641            activities: collector.into_activities(),
642        }))
643    }
644
645    pub async fn stream_to(self, events: &dyn TurnActivitySink) -> Result<Option<TurnResult>> {
646        self.builder
647            .stream_to_with_effect_controller(events, self.controller)
648            .await
649    }
650}
651
652pub struct AdvancedQueuedTurn {
653    builder: QueuedTurnBuilder,
654}
655
656impl AdvancedQueuedTurn {
657    pub async fn stream_to_with_scope(
658        self,
659        events: &dyn TurnActivitySink,
660        scoped_effect_controller: ScopedEffectController<'_>,
661    ) -> Result<Option<TurnResult>> {
662        self.builder
663            .stream_to_with_scope(events, scoped_effect_controller)
664            .await
665    }
666}
667
668fn fresh_turn_id() -> String {
669    lash_core::TurnActivityId::fresh().0
670}
671
672fn fresh_queue_drain_id() -> String {
673    format!("queue-drain:{}", fresh_turn_id())
674}
675
676fn trace_turn_id_for_scope(
677    builder: &TurnBuilder,
678    scoped_effect_controller: &ScopedEffectController<'_>,
679) -> Option<String> {
680    if scoped_effect_controller
681        .execution_scope()
682        .validates_turn_trace_id()
683    {
684        Some(
685            builder
686                .turn_id
687                .clone()
688                .unwrap_or_else(|| scoped_effect_controller.scope_id().to_string()),
689        )
690    } else {
691        builder
692            .turn_id
693            .clone()
694            .or_else(|| builder.input.trace_turn_id.clone())
695    }
696}
697
698fn reject_configured_durable_effect_host(
699    effect_host: &dyn EffectHost,
700    operation: &'static str,
701) -> Result<()> {
702    if effect_host.durability_tier() == DurabilityTier::Durable {
703        return Err(EmbedError::DurableEffectHostRequiresHandlerContext { operation });
704    }
705    Ok(())
706}
707
708pub(crate) async fn stream_next_queued_prepared_turn(
709    runtime: &RuntimeHandle,
710    sinks: TurnSinks<'_>,
711    scoped_effect_controller: ScopedEffectController<'_>,
712    cancel: CancellationToken,
713    batch_ids: &[String],
714) -> Result<Option<TurnResult>> {
715    let turn = Box::pin(stream_next_queued_prepared_assembled(
716        runtime,
717        sinks,
718        scoped_effect_controller,
719        cancel,
720        batch_ids,
721    ))
722    .await?;
723    Ok(turn.map(TurnResult::from_assembled))
724}
725
/// Stream queued work through the runtime's writer and return the assembled
/// turn, if one was produced.
///
/// With an empty `batch_ids` the writer's `stream_next_queued_work` is used;
/// otherwise `stream_selected_queued_work` is called with the given ids.
/// The writer lock is held from acquisition until this function returns.
///
/// # Errors
///
/// Propagates the writer's error. In that case `publish_from` is not called.
pub(crate) async fn stream_next_queued_prepared_assembled(
    runtime: &RuntimeHandle,
    sinks: TurnSinks<'_>,
    scoped_effect_controller: ScopedEffectController<'_>,
    cancel: CancellationToken,
    batch_ids: &[String],
) -> Result<Option<AssembledTurn>> {
    let writer_handle = runtime.writer();
    let mut writer = writer_handle.lock().await;
    // Every activity is recorded on the runtime and, when a live sink was
    // supplied, forwarded to it as well.
    let observation_sink = SessionObservationTurnActivitySink {
        runtime: runtime.clone(),
        live: sinks.turn_events(),
    };
    let opts = turn_options(
        sinks.events(),
        &observation_sink,
        scoped_effect_controller,
        cancel,
    );
    let turn = if batch_ids.is_empty() {
        writer.stream_next_queued_work(opts).await?
    } else {
        writer.stream_selected_queued_work(opts, batch_ids).await?
    };
    // NOTE(review): only reached on success; confirm that skipping the
    // publish on the error path is intended.
    runtime.publish_from(&writer);
    Ok(turn)
}
753
754fn turn_options<'a>(
755    events: Option<&'a dyn EventSink>,
756    turn_events: &'a dyn TurnActivitySink,
757    scoped_effect_controller: ScopedEffectController<'a>,
758    cancel: CancellationToken,
759) -> lash_core::TurnOptions<'a> {
760    let mut opts = lash_core::TurnOptions::new(cancel, scoped_effect_controller);
761    if let Some(events) = events {
762        opts = opts.with_events(events);
763    }
764    opts.with_turn_events(turn_events)
765}
766
767struct SessionObservationTurnActivitySink<'a> {
768    runtime: RuntimeHandle,
769    live: Option<&'a dyn TurnActivitySink>,
770}
771
772#[async_trait]
773impl TurnActivitySink for SessionObservationTurnActivitySink<'_> {
774    fn is_noop(&self) -> bool {
775        false
776    }
777
778    async fn emit(&self, activity: TurnActivity) {
779        self.runtime.record_turn_activity(activity.clone());
780        if let Some(live) = self.live {
781            live.emit(activity).await;
782        }
783    }
784}
785
786struct ChannelTurnActivitySink {
787    tx: mpsc::Sender<Result<TurnActivity>>,
788}
789
790#[async_trait]
791impl TurnActivitySink for ChannelTurnActivitySink {
792    async fn emit(&self, activity: TurnActivity) {
793        let _ = self.tx.send(Ok(activity)).await;
794    }
795}
796fn validate_required_plugin_inputs(
797    active_plugins: &[ActivePluginBinding],
798    input: &TurnInput,
799) -> Result<()> {
800    for plugin in active_plugins {
801        if plugin.requires_turn_input && !input.turn_context.has_plugin_input(plugin.id) {
802            return Err(EmbedError::MissingPluginTurnInput {
803                plugin_id: plugin.id,
804            });
805        }
806    }
807    Ok(())
808}
809
810pub(crate) async fn stream_prepared_turn(
811    runtime: &RuntimeHandle,
812    input: TurnInput,
813    sinks: TurnSinks<'_>,
814    scoped_effect_controller: ScopedEffectController<'_>,
815    cancel: CancellationToken,
816) -> Result<TurnResult> {
817    let turn = Box::pin(stream_prepared_assembled(
818        runtime,
819        input,
820        sinks,
821        scoped_effect_controller,
822        cancel,
823    ))
824    .await?;
825    Ok(TurnResult::from_assembled(turn))
826}
827
828pub(crate) async fn stream_prepared_assembled(
829    runtime: &RuntimeHandle,
830    input: TurnInput,
831    sinks: TurnSinks<'_>,
832    scoped_effect_controller: ScopedEffectController<'_>,
833    cancel: CancellationToken,
834) -> Result<AssembledTurn> {
835    let turn = Box::pin(stream_prepared_agent_frame_run(
836        runtime,
837        input,
838        sinks,
839        scoped_effect_controller,
840        cancel,
841    ))
842    .await?;
843    turn.into_final_turn().ok_or_else(|| {
844        EmbedError::Runtime(lash_core::RuntimeError::new(
845            RuntimeErrorCode::EmptyAgentFrameRun,
846            "runtime completed without an assembled turn",
847        ))
848    })
849}
850
/// Run a prepared turn through the runtime's writer and return the resulting
/// agent frame run.
///
/// The writer lock is held from acquisition until this function returns. A
/// protocol extension on the input, if present, is validated before the turn
/// is streamed.
///
/// # Errors
///
/// Returns `EmbedError::Session` when the protocol extension fails
/// validation, or propagates the writer's error from streaming the turn. In
/// both cases `publish_from` is not called.
pub(crate) async fn stream_prepared_agent_frame_run(
    runtime: &RuntimeHandle,
    input: TurnInput,
    sinks: TurnSinks<'_>,
    scoped_effect_controller: ScopedEffectController<'_>,
    cancel: CancellationToken,
) -> Result<lash_core::AgentFrameRun> {
    let writer_handle = runtime.writer();
    let mut writer = writer_handle.lock().await;
    if let Some(extension) = input.protocol_extension.as_ref() {
        writer
            .validate_protocol_turn_extension(extension)
            .await
            .map_err(EmbedError::Session)?;
    }
    // Every activity is recorded on the runtime and, when a live sink was
    // supplied, forwarded to it as well.
    let observation_sink = SessionObservationTurnActivitySink {
        runtime: runtime.clone(),
        live: sinks.turn_events(),
    };
    let turn = Box::pin(writer.stream_turn_with_agent_frames(
        input,
        turn_options(
            sinks.events(),
            &observation_sink,
            scoped_effect_controller,
            cancel,
        ),
    ))
    .await?;
    // NOTE(review): only reached on success; confirm that skipping the
    // publish on the error path is intended.
    runtime.publish_from(&writer);
    Ok(turn)
}
883
/// Result of one executed turn, built from the runtime's assembled turn.
#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
pub struct TurnResult {
    pub state: SessionSnapshot,
    /// How the turn ended; the accessor methods on this type match on it.
    pub outcome: TurnOutcome,
    pub assistant_output: AssistantOutput,
    /// Parent's own LLM tokens for this turn. Does **not** include child
    /// sessions; see [`children_usage`](Self::children_usage) and
    /// [`total_usage`](Self::total_usage).
    pub usage: TokenUsage,
    /// Per-`(source, model)` ledger entries for child sessions whose LLM
    /// calls completed during this turn (subagents, compaction, observers,
    /// etc.). Empty unless the turn spawned children.
    // `serde(default)`: a missing field deserializes as an empty vector.
    #[serde(default)]
    pub children_usage: Vec<TokenLedgerEntry>,
    pub tool_calls: Vec<ToolCallRecord>,
    pub execution: ExecutionSummary,
    pub errors: Vec<TurnIssue>,
}
902
903impl TurnResult {
904    fn from_assembled(turn: lash_core::AssembledTurn) -> Self {
905        Self {
906            state: turn.state,
907            outcome: turn.outcome,
908            assistant_output: turn.assistant_output,
909            usage: turn.token_usage,
910            children_usage: turn.children_usage,
911            tool_calls: turn.tool_calls,
912            execution: turn.execution,
913            errors: turn.errors,
914        }
915    }
916
917    /// Sum of parent's own LLM tokens and every child session's LLM tokens
918    /// for this turn.
919    pub fn total_usage(&self) -> TokenUsage {
920        let mut total = self.usage.clone();
921        for entry in &self.children_usage {
922            total.add(&entry.usage);
923        }
924        total
925    }
926
927    pub fn assistant_message(&self) -> Option<&str> {
928        match &self.outcome {
929            TurnOutcome::Finished(lash_core::TurnFinish::AssistantMessage { text }) => Some(text),
930            _ => None,
931        }
932    }
933
934    pub fn submitted_value(&self) -> Option<&serde_json::Value> {
935        match &self.outcome {
936            TurnOutcome::Finished(lash_core::TurnFinish::SubmittedValue { value }) => Some(value),
937            _ => None,
938        }
939    }
940
941    pub fn tool_value(&self) -> Option<(&str, &serde_json::Value)> {
942        match &self.outcome {
943            TurnOutcome::Finished(lash_core::TurnFinish::ToolValue { tool_name, value }) => {
944                Some((tool_name.as_str(), value))
945            }
946            _ => None,
947        }
948    }
949
950    pub fn is_success(&self) -> bool {
951        matches!(
952            self.outcome,
953            TurnOutcome::Finished(_) | TurnOutcome::AgentFrameSwitch { .. }
954        )
955    }
956}
957
958#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
959pub struct TurnOutput {
960    pub result: TurnResult,
961    pub activities: Vec<TurnActivity>,
962}
963
964impl TurnOutput {
965    pub fn assistant_message(&self) -> Option<&str> {
966        self.result.assistant_message()
967    }
968
969    pub fn submitted_value(&self) -> Option<&serde_json::Value> {
970        self.result.submitted_value()
971    }
972
973    pub fn tool_value(&self) -> Option<(&str, &serde_json::Value)> {
974        self.result.tool_value()
975    }
976
977    pub fn is_success(&self) -> bool {
978        self.result.is_success()
979    }
980}
981
982struct BorrowedTurnActivityFanout<'a> {
983    live: &'a dyn TurnActivitySink,
984    collector: &'a RunActivityCollector,
985}
986
987#[async_trait]
988impl TurnActivitySink for BorrowedTurnActivityFanout<'_> {
989    async fn emit(&self, activity: TurnActivity) {
990        self.live.emit(activity.clone()).await;
991        self.collector.emit(activity).await;
992    }
993}
994
995#[derive(Default)]
996pub(crate) struct RunActivityCollector {
997    activities: Arc<StdMutex<Vec<TurnActivity>>>,
998}
999
1000impl RunActivityCollector {
1001    fn into_activities(self) -> Vec<TurnActivity> {
1002        self.activities
1003            .lock()
1004            .expect("run activity collector lock")
1005            .clone()
1006    }
1007
1008    #[cfg(test)]
1009    pub(crate) fn snapshot(&self) -> Vec<TurnActivity> {
1010        self.activities
1011            .lock()
1012            .expect("run activity collector lock")
1013            .clone()
1014    }
1015}
1016
1017#[async_trait]
1018impl TurnActivitySink for RunActivityCollector {
1019    async fn emit(&self, activity: TurnActivity) {
1020        self.activities
1021            .lock()
1022            .expect("run activity collector lock")
1023            .push(activity);
1024    }
1025}
1026
1027pub struct TurnActivityFanout {
1028    sinks: Vec<Arc<dyn TurnActivitySink>>,
1029}
1030
1031impl TurnActivityFanout {
1032    pub fn new(sinks: impl IntoIterator<Item = Arc<dyn TurnActivitySink>>) -> Self {
1033        Self {
1034            sinks: sinks.into_iter().collect(),
1035        }
1036    }
1037}
1038
1039#[async_trait]
1040impl TurnActivitySink for TurnActivityFanout {
1041    async fn emit(&self, activity: TurnActivity) {
1042        for sink in &self.sinks {
1043            sink.emit(activity.clone()).await;
1044        }
1045    }
1046}
1047
1048pub fn message_text(message: &Message) -> String {
1049    message
1050        .parts
1051        .iter()
1052        .map(|part| part.content.as_str())
1053        .collect::<Vec<_>>()
1054        .join("\n")
1055}
1056
1057pub fn message_role(message: &Message) -> &'static str {
1058    match message.role {
1059        MessageRole::User => "user",
1060        MessageRole::Assistant => "assistant",
1061        MessageRole::System => "system",
1062        MessageRole::Event => "event",
1063    }
1064}