Skip to main content

lash/
turn.rs

1use crate::support::*;
2
3pub use lash_core::{AssistantOutput, TurnIssue};
4
5/// The two internal event sinks threaded through the turn-execution helpers.
6///
7/// `events` is the raw lower-level runtime event stream, reachable from app
8/// code only via [`TurnBuilder::advanced`]; `turn_events` is the semantic
9/// [`TurnActivity`] stream used by the primary builder API.
10/// Bundling them keeps the internal turn fns to a single sink parameter.
11#[derive(Clone, Copy, Default)]
12pub(crate) struct TurnSinks<'a> {
13    events: Option<&'a dyn EventSink>,
14    turn_events: Option<&'a dyn TurnActivitySink>,
15}
16
17impl<'a> TurnSinks<'a> {
18    pub(crate) fn turn(events: &'a dyn TurnActivitySink) -> Self {
19        Self {
20            events: None,
21            turn_events: Some(events),
22        }
23    }
24
25    pub(crate) fn session(events: &'a dyn EventSink) -> Self {
26        Self {
27            events: Some(events),
28            turn_events: None,
29        }
30    }
31
32    fn events(&self) -> Option<&'a dyn EventSink> {
33        self.events
34    }
35
36    fn turn_events(&self) -> Option<&'a dyn TurnActivitySink> {
37        self.turn_events
38    }
39}
40
41/// Cancellation tokens of the turns currently executing through one opened
42/// [`LashSession`](crate::LashSession) (shared by its clones).
43/// [`LashSession::cancel_running_turns`](crate::LashSession::cancel_running_turns)
44/// cancels them without the caller having to thread a token around.
45#[derive(Clone, Default)]
46pub(crate) struct TurnCancelRegistry {
47    inner: Arc<StdMutex<TurnCancelRegistryInner>>,
48}
49
50#[derive(Default)]
51struct TurnCancelRegistryInner {
52    next_id: u64,
53    active: BTreeMap<u64, CancellationToken>,
54}
55
56impl TurnCancelRegistry {
57    /// Register the token of a turn that is about to execute. The guard
58    /// removes the entry when the turn finishes, however it finishes.
59    fn register(&self, token: CancellationToken) -> TurnCancelGuard {
60        let mut inner = self.inner.lock().expect("turn cancel registry");
61        let id = inner.next_id;
62        inner.next_id += 1;
63        inner.active.insert(id, token);
64        TurnCancelGuard {
65            registry: Arc::clone(&self.inner),
66            id,
67        }
68    }
69
70    pub(crate) fn cancel_all(&self) -> usize {
71        let inner = self.inner.lock().expect("turn cancel registry");
72        for token in inner.active.values() {
73            token.cancel();
74        }
75        inner.active.len()
76    }
77}
78
79pub(crate) struct TurnCancelGuard {
80    registry: Arc<StdMutex<TurnCancelRegistryInner>>,
81    id: u64,
82}
83
84impl Drop for TurnCancelGuard {
85    fn drop(&mut self) {
86        self.registry
87            .lock()
88            .expect("turn cancel registry")
89            .active
90            .remove(&self.id);
91    }
92}
93
94pub struct TurnBuilder {
95    pub(crate) runtime: RuntimeHandle,
96    pub(crate) effect_host: Arc<dyn EffectHost>,
97    pub(crate) active_plugins: Vec<ActivePluginBinding>,
98    pub(crate) input: TurnInput,
99    pub(crate) cancel: CancellationToken,
100    pub(crate) cancels: TurnCancelRegistry,
101    pub(crate) protocol_turn_options: Option<ProtocolTurnOptions>,
102    pub(crate) provider: Option<ProviderHandle>,
103    pub(crate) model: Option<ModelSpec>,
104    pub(crate) turn_id: Option<String>,
105}
106
107impl TurnBuilder {
108    pub fn cancel(mut self, cancel: CancellationToken) -> Self {
109        self.cancel = cancel;
110        self
111    }
112
113    pub fn protocol_turn_options(mut self, options: ProtocolTurnOptions) -> Self {
114        self.protocol_turn_options = Some(options);
115        self
116    }
117
118    pub fn provider(mut self, provider: ProviderHandle) -> Self {
119        self.provider = Some(provider);
120        self
121    }
122
123    pub fn model(mut self, model: ModelSpec) -> Self {
124        self.model = Some(model);
125        self
126    }
127
128    pub fn turn_id(mut self, id: impl Into<String>) -> Self {
129        self.turn_id = Some(id.into());
130        self
131    }
132
133    pub fn prompt_template(mut self, template: PromptTemplate) -> Self {
134        self.input.turn_context.set_prompt_template(template);
135        self
136    }
137
138    pub fn prompt_contribution(mut self, contribution: PromptContribution) -> Self {
139        self.input
140            .turn_context
141            .add_prompt_contribution(contribution);
142        self
143    }
144
145    pub fn replace_prompt_slot(
146        mut self,
147        slot: PromptSlot,
148        contributions: impl IntoIterator<Item = PromptContribution>,
149    ) -> Self {
150        self.input
151            .turn_context
152            .replace_prompt_slot(slot, contributions);
153        self
154    }
155
156    pub fn clear_prompt_slot(mut self, slot: PromptSlot) -> Self {
157        self.input.turn_context.clear_prompt_slot(slot);
158        self
159    }
160
161    pub fn prompt_layer(mut self, layer: PromptLayer) -> Self {
162        self.input.turn_context.set_prompt_layer(layer);
163        self
164    }
165
166    /// Attach typed per-turn input for an activated plugin binding.
167    ///
168    /// This is the generic primitive. Plugin crates should usually wrap it in a
169    /// domain extension trait such as `.with_tone(tone)` or `.with_board(board)`
170    /// so application code stays typed in its own vocabulary.
171    pub fn with_plugin_input<P: PluginBinding>(mut self, input: P::Input) -> Self {
172        self.input.turn_context.insert_plugin_input(P::ID, input);
173        self
174    }
175
176    pub fn effects(self, controller: &dyn RuntimeEffectController) -> ScopedTurnBuilder<'_> {
177        ScopedTurnBuilder {
178            builder: self,
179            controller,
180        }
181    }
182
183    pub async fn run(self) -> Result<TurnOutput> {
184        let collector = RunActivityCollector::default();
185        let result = self.stream_to(&collector).await?;
186        Ok(TurnOutput {
187            result,
188            activities: collector.into_activities(),
189        })
190    }
191
192    pub async fn stream_to(self, events: &dyn TurnActivitySink) -> Result<TurnResult> {
193        let effect_host = Arc::clone(&self.effect_host);
194        reject_configured_durable_effect_host(effect_host.as_ref(), "turn")?;
195        self.stream_to_with_effect_host(events, effect_host.as_ref())
196            .await
197    }
198
199    pub fn stream(self) -> Result<TurnStream> {
200        let effect_host = Arc::clone(&self.effect_host);
201        reject_configured_durable_effect_host(effect_host.as_ref(), "turn stream")?;
202        self.stream_with_effect_host(effect_host.as_ref())
203    }
204
205    /// Access lower-level turn execution that bypasses the semantic
206    /// [`TurnActivity`] tier.
207    pub fn advanced(self) -> AdvancedTurn {
208        AdvancedTurn { builder: self }
209    }
210
211    fn resolved_turn_id(&self) -> String {
212        self.turn_id
213            .clone()
214            .or_else(|| self.input.trace_turn_id.clone())
215            .unwrap_or_else(fresh_turn_id)
216    }
217
218    fn turn_scope(&self, turn_id: &str) -> lash_core::ExecutionScope {
219        lash_core::ExecutionScope::turn(self.runtime.observe().session_id(), turn_id)
220    }
221
222    pub(crate) fn prepare(
223        mut self,
224        trace_turn_id: Option<String>,
225    ) -> Result<(RuntimeHandle, TurnInput, CancellationToken, TurnCancelGuard)> {
226        if let Some(options) = self.protocol_turn_options {
227            self.input.protocol_turn_options = Some(options);
228        }
229        if let Some(provider) = self.provider {
230            self.input.turn_context.set_provider(provider);
231        }
232        if let Some(model) = self.model {
233            self.input.turn_context.set_model(model);
234        }
235        if let Some(trace_turn_id) = trace_turn_id {
236            self.input.trace_turn_id = Some(trace_turn_id);
237        }
238        validate_required_plugin_inputs(&self.active_plugins, &self.input)?;
239        let cancel_guard = self.cancels.register(self.cancel.clone());
240        Ok((self.runtime, self.input, self.cancel, cancel_guard))
241    }
242
243    async fn stream_to_with_effect_host(
244        self,
245        events: &dyn TurnActivitySink,
246        effect_host: &dyn EffectHost,
247    ) -> Result<TurnResult> {
248        let turn_id = self.resolved_turn_id();
249        let scoped_effect_controller = effect_host.scoped(self.turn_scope(&turn_id))?;
250        self.stream_to_with_scope(events, scoped_effect_controller, Some(turn_id))
251            .await
252    }
253
254    async fn stream_to_with_effect_controller(
255        self,
256        events: &dyn TurnActivitySink,
257        controller: &dyn RuntimeEffectController,
258    ) -> Result<TurnResult> {
259        let turn_id = self.resolved_turn_id();
260        let scoped_effect_controller =
261            ScopedEffectController::borrowed(controller, self.turn_scope(&turn_id))?;
262        self.stream_to_with_scope(events, scoped_effect_controller, Some(turn_id))
263            .await
264    }
265
266    async fn stream_to_with_scope(
267        self,
268        events: &dyn TurnActivitySink,
269        scoped_effect_controller: ScopedEffectController<'_>,
270        trace_turn_id: Option<String>,
271    ) -> Result<TurnResult> {
272        let (runtime, input, cancel, _cancel_guard) = self.prepare(trace_turn_id)?;
273        stream_prepared_turn(
274            &runtime,
275            input,
276            TurnSinks::turn(events),
277            scoped_effect_controller,
278            cancel,
279        )
280        .await
281    }
282
283    fn stream_with_effect_host(self, effect_host: &dyn EffectHost) -> Result<TurnStream> {
284        let turn_id = self.resolved_turn_id();
285        let scoped_effect_controller = effect_host
286            .scoped_static(self.turn_scope(&turn_id))?
287            .ok_or(EmbedError::StaticTurnStreamRequiresStaticEffectHost)?;
288        self.stream_with_scope(scoped_effect_controller, Some(turn_id))
289    }
290
291    fn stream_with_scope(
292        self,
293        scoped_effect_controller: ScopedEffectController<'static>,
294        trace_turn_id: Option<String>,
295    ) -> Result<TurnStream> {
296        let (runtime, input, cancel, cancel_guard) = self.prepare(trace_turn_id)?;
297        let (tx, rx) = mpsc::channel(64);
298        let sink = ChannelTurnActivitySink { tx };
299        let completion = tokio::spawn(async move {
300            let _cancel_guard = cancel_guard;
301            stream_prepared_turn(
302                &runtime,
303                input,
304                TurnSinks::turn(&sink),
305                scoped_effect_controller,
306                cancel,
307            )
308            .await
309        });
310        Ok(TurnStream {
311            activities: rx,
312            completion,
313        })
314    }
315}
316
317pub struct ScopedTurnBuilder<'run> {
318    builder: TurnBuilder,
319    controller: &'run dyn RuntimeEffectController,
320}
321
322impl<'run> ScopedTurnBuilder<'run> {
323    pub fn cancel(mut self, cancel: CancellationToken) -> Self {
324        self.builder = self.builder.cancel(cancel);
325        self
326    }
327
328    pub fn protocol_turn_options(mut self, options: ProtocolTurnOptions) -> Self {
329        self.builder = self.builder.protocol_turn_options(options);
330        self
331    }
332
333    pub fn provider(mut self, provider: ProviderHandle) -> Self {
334        self.builder = self.builder.provider(provider);
335        self
336    }
337
338    pub fn model(mut self, model: ModelSpec) -> Self {
339        self.builder = self.builder.model(model);
340        self
341    }
342
343    pub fn turn_id(mut self, id: impl Into<String>) -> Self {
344        self.builder = self.builder.turn_id(id);
345        self
346    }
347
348    pub fn prompt_template(mut self, template: PromptTemplate) -> Self {
349        self.builder = self.builder.prompt_template(template);
350        self
351    }
352
353    pub fn prompt_contribution(mut self, contribution: PromptContribution) -> Self {
354        self.builder = self.builder.prompt_contribution(contribution);
355        self
356    }
357
358    pub fn replace_prompt_slot(
359        mut self,
360        slot: PromptSlot,
361        contributions: impl IntoIterator<Item = PromptContribution>,
362    ) -> Self {
363        self.builder = self.builder.replace_prompt_slot(slot, contributions);
364        self
365    }
366
367    pub fn clear_prompt_slot(mut self, slot: PromptSlot) -> Self {
368        self.builder = self.builder.clear_prompt_slot(slot);
369        self
370    }
371
372    pub fn prompt_layer(mut self, layer: PromptLayer) -> Self {
373        self.builder = self.builder.prompt_layer(layer);
374        self
375    }
376
377    pub fn with_plugin_input<P: PluginBinding>(mut self, input: P::Input) -> Self {
378        self.builder = self.builder.with_plugin_input::<P>(input);
379        self
380    }
381
382    pub async fn run(self) -> Result<TurnOutput> {
383        let collector = RunActivityCollector::default();
384        let result = self.stream_to(&collector).await?;
385        Ok(TurnOutput {
386            result,
387            activities: collector.into_activities(),
388        })
389    }
390
391    pub async fn stream_to(self, events: &dyn TurnActivitySink) -> Result<TurnResult> {
392        self.builder
393            .stream_to_with_effect_controller(events, self.controller)
394            .await
395    }
396}
397
398/// Lower-level turn execution that exposes the raw runtime event stream.
399///
400/// Reachable via [`TurnBuilder::advanced`]. Most applications should use
401/// [`TurnBuilder::stream_to`] for semantic turn activity; benchmarks and
402/// diagnostics use this when they need the same low-level event stream as the
403/// runtime trace.
404pub struct AdvancedTurn {
405    builder: TurnBuilder,
406}
407
408impl AdvancedTurn {
409    pub async fn run_with_scope(
410        self,
411        scoped_effect_controller: ScopedEffectController<'_>,
412    ) -> Result<TurnOutput> {
413        let collector = RunActivityCollector::default();
414        let result = self
415            .stream_to_with_scope(&collector, scoped_effect_controller)
416            .await?;
417        Ok(TurnOutput {
418            result,
419            activities: collector.into_activities(),
420        })
421    }
422
423    pub async fn collect_with_scope(
424        self,
425        events: &dyn TurnActivitySink,
426        scoped_effect_controller: ScopedEffectController<'_>,
427    ) -> Result<TurnOutput> {
428        let collector = RunActivityCollector::default();
429        let fanout = BorrowedTurnActivityFanout {
430            live: events,
431            collector: &collector,
432        };
433        let result = self
434            .stream_to_with_scope(&fanout, scoped_effect_controller)
435            .await?;
436        Ok(TurnOutput {
437            result,
438            activities: collector.into_activities(),
439        })
440    }
441
442    pub async fn stream_to_with_scope(
443        self,
444        events: &dyn TurnActivitySink,
445        scoped_effect_controller: ScopedEffectController<'_>,
446    ) -> Result<TurnResult> {
447        let trace_turn_id = trace_turn_id_for_scope(&self.builder, &scoped_effect_controller);
448        self.builder
449            .stream_to_with_scope(events, scoped_effect_controller, trace_turn_id)
450            .await
451    }
452
453    pub fn stream_with_scope(
454        self,
455        scoped_effect_controller: ScopedEffectController<'static>,
456    ) -> Result<TurnStream> {
457        let trace_turn_id = trace_turn_id_for_scope(&self.builder, &scoped_effect_controller);
458        self.builder
459            .stream_with_scope(scoped_effect_controller, trace_turn_id)
460    }
461
462    /// Run the turn while sending raw lower-level runtime events to `events`.
463    pub async fn collect_session_events_with_scope(
464        self,
465        events: &dyn EventSink,
466        scoped_effect_controller: ScopedEffectController<'_>,
467    ) -> Result<TurnResult> {
468        let trace_turn_id = trace_turn_id_for_scope(&self.builder, &scoped_effect_controller);
469        let (runtime, input, cancel, _cancel_guard) = self.builder.prepare(trace_turn_id)?;
470        stream_prepared_turn(
471            &runtime,
472            input,
473            TurnSinks::session(events),
474            scoped_effect_controller,
475            cancel,
476        )
477        .await
478    }
479}
480
481pub struct TurnStream {
482    activities: mpsc::Receiver<Result<TurnActivity>>,
483    completion: JoinHandle<Result<TurnResult>>,
484}
485
486impl TurnStream {
487    pub async fn next_activity(&mut self) -> Option<Result<TurnActivity>> {
488        self.activities.recv().await
489    }
490
491    pub async fn finish(self) -> Result<TurnResult> {
492        self.completion.await.map_err(|err| {
493            EmbedError::Runtime(lash_core::RuntimeError::new(
494                RuntimeErrorCode::TurnStreamJoin,
495                format!("turn stream task failed: {err}"),
496            ))
497        })?
498    }
499}
500
501pub struct QueuedTurnBuilder {
502    pub(crate) runtime: RuntimeHandle,
503    pub(crate) effect_host: Arc<dyn EffectHost>,
504    pub(crate) cancel: CancellationToken,
505    pub(crate) cancels: TurnCancelRegistry,
506    pub(crate) batch_ids: Vec<String>,
507    pub(crate) drain_id: Option<String>,
508}
509
510impl QueuedTurnBuilder {
511    pub fn cancel(mut self, cancel: CancellationToken) -> Self {
512        self.cancel = cancel;
513        self
514    }
515
516    pub fn batch_ids(mut self, batch_ids: impl IntoIterator<Item = impl Into<String>>) -> Self {
517        self.batch_ids = batch_ids.into_iter().map(Into::into).collect();
518        self
519    }
520
521    pub fn drain_id(mut self, drain_id: impl Into<String>) -> Self {
522        self.drain_id = Some(drain_id.into());
523        self
524    }
525
526    pub fn effects(self, controller: &dyn RuntimeEffectController) -> ScopedQueuedTurnBuilder<'_> {
527        ScopedQueuedTurnBuilder {
528            builder: self,
529            controller,
530        }
531    }
532
533    pub async fn run(self) -> Result<Option<TurnOutput>> {
534        let collector = RunActivityCollector::default();
535        let Some(result) = self.stream_to(&collector).await? else {
536            return Ok(None);
537        };
538        Ok(Some(TurnOutput {
539            result,
540            activities: collector.into_activities(),
541        }))
542    }
543
544    pub async fn stream_to(self, events: &dyn TurnActivitySink) -> Result<Option<TurnResult>> {
545        let effect_host = Arc::clone(&self.effect_host);
546        reject_configured_durable_effect_host(effect_host.as_ref(), "queued turn")?;
547        self.stream_to_with_effect_host(events, effect_host.as_ref())
548            .await
549    }
550
551    fn resolved_drain_id(&self) -> String {
552        self.drain_id
553            .clone()
554            .or_else(|| self.batch_ids.first().cloned())
555            .unwrap_or_else(fresh_queue_drain_id)
556    }
557
558    async fn stream_to_with_effect_host(
559        self,
560        events: &dyn TurnActivitySink,
561        effect_host: &dyn EffectHost,
562    ) -> Result<Option<TurnResult>> {
563        let drain_id = self.resolved_drain_id();
564        let scope =
565            lash_core::ExecutionScope::queue_drain(self.runtime.observe().session_id(), drain_id);
566        let scoped_effect_controller = effect_host.scoped(scope)?;
567        self.stream_to_with_scope(events, scoped_effect_controller)
568            .await
569    }
570
571    async fn stream_to_with_effect_controller(
572        self,
573        events: &dyn TurnActivitySink,
574        controller: &dyn RuntimeEffectController,
575    ) -> Result<Option<TurnResult>> {
576        let drain_id = self.resolved_drain_id();
577        let scope =
578            lash_core::ExecutionScope::queue_drain(self.runtime.observe().session_id(), drain_id);
579        let scoped_effect_controller = ScopedEffectController::borrowed(controller, scope)?;
580        self.stream_to_with_scope(events, scoped_effect_controller)
581            .await
582    }
583
584    async fn stream_to_with_scope(
585        self,
586        events: &dyn TurnActivitySink,
587        scoped_effect_controller: ScopedEffectController<'_>,
588    ) -> Result<Option<TurnResult>> {
589        let Self {
590            runtime,
591            effect_host: _,
592            cancel,
593            cancels,
594            batch_ids,
595            drain_id: _,
596        } = self;
597        let _cancel_guard = cancels.register(cancel.clone());
598        stream_next_queued_prepared_turn(
599            &runtime,
600            TurnSinks::turn(events),
601            scoped_effect_controller,
602            cancel,
603            &batch_ids,
604        )
605        .await
606    }
607}
608
609pub struct ScopedQueuedTurnBuilder<'run> {
610    builder: QueuedTurnBuilder,
611    controller: &'run dyn RuntimeEffectController,
612}
613
614impl<'run> ScopedQueuedTurnBuilder<'run> {
615    pub fn cancel(mut self, cancel: CancellationToken) -> Self {
616        self.builder = self.builder.cancel(cancel);
617        self
618    }
619
620    pub fn batch_ids(mut self, batch_ids: impl IntoIterator<Item = impl Into<String>>) -> Self {
621        self.builder = self.builder.batch_ids(batch_ids);
622        self
623    }
624
625    pub fn drain_id(mut self, drain_id: impl Into<String>) -> Self {
626        self.builder = self.builder.drain_id(drain_id);
627        self
628    }
629
630    pub async fn run(self) -> Result<Option<TurnOutput>> {
631        let collector = RunActivityCollector::default();
632        let Some(result) = self.stream_to(&collector).await? else {
633            return Ok(None);
634        };
635        Ok(Some(TurnOutput {
636            result,
637            activities: collector.into_activities(),
638        }))
639    }
640
641    pub async fn stream_to(self, events: &dyn TurnActivitySink) -> Result<Option<TurnResult>> {
642        self.builder
643            .stream_to_with_effect_controller(events, self.controller)
644            .await
645    }
646}
647
648fn fresh_turn_id() -> String {
649    lash_core::TurnActivityId::fresh().0
650}
651
652fn fresh_queue_drain_id() -> String {
653    format!("queue-drain:{}", fresh_turn_id())
654}
655
656fn trace_turn_id_for_scope(
657    builder: &TurnBuilder,
658    scoped_effect_controller: &ScopedEffectController<'_>,
659) -> Option<String> {
660    if scoped_effect_controller
661        .execution_scope()
662        .validates_turn_trace_id()
663    {
664        Some(
665            builder
666                .turn_id
667                .clone()
668                .unwrap_or_else(|| scoped_effect_controller.scope_id().to_string()),
669        )
670    } else {
671        builder
672            .turn_id
673            .clone()
674            .or_else(|| builder.input.trace_turn_id.clone())
675    }
676}
677
678fn reject_configured_durable_effect_host(
679    effect_host: &dyn EffectHost,
680    operation: &'static str,
681) -> Result<()> {
682    if effect_host.durability_tier() == DurabilityTier::Durable {
683        return Err(EmbedError::DurableEffectHostRequiresHandlerContext { operation });
684    }
685    Ok(())
686}
687
688pub(crate) async fn stream_next_queued_prepared_turn(
689    runtime: &RuntimeHandle,
690    sinks: TurnSinks<'_>,
691    scoped_effect_controller: ScopedEffectController<'_>,
692    cancel: CancellationToken,
693    batch_ids: &[String],
694) -> Result<Option<TurnResult>> {
695    let turn = Box::pin(stream_next_queued_prepared_assembled(
696        runtime,
697        sinks,
698        scoped_effect_controller,
699        cancel,
700        batch_ids,
701    ))
702    .await?;
703    Ok(turn.map(TurnResult::from_assembled))
704}
705
706pub(crate) async fn stream_next_queued_prepared_assembled(
707    runtime: &RuntimeHandle,
708    sinks: TurnSinks<'_>,
709    scoped_effect_controller: ScopedEffectController<'_>,
710    cancel: CancellationToken,
711    batch_ids: &[String],
712) -> Result<Option<AssembledTurn>> {
713    let writer_handle = runtime.writer();
714    let mut writer = writer_handle.lock().await;
715    let observation_sink = SessionObservationTurnActivitySink {
716        runtime: runtime.clone(),
717        live: sinks.turn_events(),
718    };
719    let opts = turn_options(
720        sinks.events(),
721        &observation_sink,
722        scoped_effect_controller,
723        cancel,
724    );
725    let turn = if batch_ids.is_empty() {
726        writer.stream_next_queued_work(opts).await?
727    } else {
728        writer.stream_selected_queued_work(opts, batch_ids).await?
729    };
730    runtime.publish_from(&writer);
731    Ok(turn)
732}
733
734fn turn_options<'a>(
735    events: Option<&'a dyn EventSink>,
736    turn_events: &'a dyn TurnActivitySink,
737    scoped_effect_controller: ScopedEffectController<'a>,
738    cancel: CancellationToken,
739) -> lash_core::TurnOptions<'a> {
740    let mut opts = lash_core::TurnOptions::new(cancel, scoped_effect_controller);
741    if let Some(events) = events {
742        opts = opts.with_events(events);
743    }
744    opts.with_turn_events(turn_events)
745}
746
747struct SessionObservationTurnActivitySink<'a> {
748    runtime: RuntimeHandle,
749    live: Option<&'a dyn TurnActivitySink>,
750}
751
752#[async_trait]
753impl TurnActivitySink for SessionObservationTurnActivitySink<'_> {
754    fn is_noop(&self) -> bool {
755        false
756    }
757
758    async fn emit(&self, activity: TurnActivity) {
759        self.runtime.record_turn_activity(activity.clone());
760        if let Some(live) = self.live {
761            live.emit(activity).await;
762        }
763    }
764}
765
766struct ChannelTurnActivitySink {
767    tx: mpsc::Sender<Result<TurnActivity>>,
768}
769
770#[async_trait]
771impl TurnActivitySink for ChannelTurnActivitySink {
772    async fn emit(&self, activity: TurnActivity) {
773        let _ = self.tx.send(Ok(activity)).await;
774    }
775}
776fn validate_required_plugin_inputs(
777    active_plugins: &[ActivePluginBinding],
778    input: &TurnInput,
779) -> Result<()> {
780    for plugin in active_plugins {
781        if plugin.requires_turn_input && !input.turn_context.has_plugin_input(plugin.id) {
782            return Err(EmbedError::MissingPluginTurnInput {
783                plugin_id: plugin.id,
784            });
785        }
786    }
787    Ok(())
788}
789
790pub(crate) async fn stream_prepared_turn(
791    runtime: &RuntimeHandle,
792    input: TurnInput,
793    sinks: TurnSinks<'_>,
794    scoped_effect_controller: ScopedEffectController<'_>,
795    cancel: CancellationToken,
796) -> Result<TurnResult> {
797    let turn = Box::pin(stream_prepared_assembled(
798        runtime,
799        input,
800        sinks,
801        scoped_effect_controller,
802        cancel,
803    ))
804    .await?;
805    Ok(TurnResult::from_assembled(turn))
806}
807
808pub(crate) async fn stream_prepared_assembled(
809    runtime: &RuntimeHandle,
810    input: TurnInput,
811    sinks: TurnSinks<'_>,
812    scoped_effect_controller: ScopedEffectController<'_>,
813    cancel: CancellationToken,
814) -> Result<AssembledTurn> {
815    let turn = Box::pin(stream_prepared_agent_frame_run(
816        runtime,
817        input,
818        sinks,
819        scoped_effect_controller,
820        cancel,
821    ))
822    .await?;
823    turn.into_final_turn().ok_or_else(|| {
824        EmbedError::Runtime(lash_core::RuntimeError::new(
825            RuntimeErrorCode::EmptyAgentFrameRun,
826            "runtime completed without an assembled turn",
827        ))
828    })
829}
830
831pub(crate) async fn stream_prepared_agent_frame_run(
832    runtime: &RuntimeHandle,
833    input: TurnInput,
834    sinks: TurnSinks<'_>,
835    scoped_effect_controller: ScopedEffectController<'_>,
836    cancel: CancellationToken,
837) -> Result<lash_core::AgentFrameRun> {
838    let writer_handle = runtime.writer();
839    let mut writer = writer_handle.lock().await;
840    if let Some(extension) = input.protocol_extension.as_ref() {
841        writer
842            .validate_protocol_turn_extension(extension)
843            .await
844            .map_err(EmbedError::Session)?;
845    }
846    let observation_sink = SessionObservationTurnActivitySink {
847        runtime: runtime.clone(),
848        live: sinks.turn_events(),
849    };
850    let turn = Box::pin(writer.stream_turn_with_agent_frames(
851        input,
852        turn_options(
853            sinks.events(),
854            &observation_sink,
855            scoped_effect_controller,
856            cancel,
857        ),
858    ))
859    .await?;
860    runtime.publish_from(&writer);
861    Ok(turn)
862}
863
864#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
865pub struct TurnResult {
866    pub state: SessionSnapshot,
867    pub outcome: TurnOutcome,
868    pub assistant_output: AssistantOutput,
869    /// Parent's own LLM tokens for this turn. Does **not** include child
870    /// sessions; see [`children_usage`](Self::children_usage) and
871    /// [`total_usage`](Self::total_usage).
872    pub usage: TokenUsage,
873    /// Per-`(source, model)` ledger entries for child sessions whose LLM
874    /// calls completed during this turn (subagents, compaction, observers,
875    /// etc.). Empty unless the turn spawned children.
876    #[serde(default)]
877    pub children_usage: Vec<TokenLedgerEntry>,
878    pub tool_calls: Vec<ToolCallRecord>,
879    pub execution: ExecutionSummary,
880    pub errors: Vec<TurnIssue>,
881}
882
883impl TurnResult {
884    fn from_assembled(turn: lash_core::AssembledTurn) -> Self {
885        Self {
886            state: turn.state,
887            outcome: turn.outcome,
888            assistant_output: turn.assistant_output,
889            usage: turn.token_usage,
890            children_usage: turn.children_usage,
891            tool_calls: turn.tool_calls,
892            execution: turn.execution,
893            errors: turn.errors,
894        }
895    }
896
897    /// Sum of parent's own LLM tokens and every child session's LLM tokens
898    /// for this turn.
899    pub fn total_usage(&self) -> TokenUsage {
900        let mut total = self.usage.clone();
901        for entry in &self.children_usage {
902            total.add(&entry.usage);
903        }
904        total
905    }
906
907    pub fn assistant_message(&self) -> Option<&str> {
908        match &self.outcome {
909            TurnOutcome::Finished(lash_core::TurnFinish::AssistantMessage { text }) => Some(text),
910            _ => None,
911        }
912    }
913
914    pub fn submitted_value(&self) -> Option<&serde_json::Value> {
915        match &self.outcome {
916            TurnOutcome::Finished(lash_core::TurnFinish::SubmittedValue { value }) => Some(value),
917            _ => None,
918        }
919    }
920
921    pub fn tool_value(&self) -> Option<(&str, &serde_json::Value)> {
922        match &self.outcome {
923            TurnOutcome::Finished(lash_core::TurnFinish::ToolValue { tool_name, value }) => {
924                Some((tool_name.as_str(), value))
925            }
926            _ => None,
927        }
928    }
929
930    pub fn is_success(&self) -> bool {
931        matches!(
932            self.outcome,
933            TurnOutcome::Finished(_) | TurnOutcome::AgentFrameSwitch { .. }
934        )
935    }
936}
937
938#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
939pub struct TurnOutput {
940    pub result: TurnResult,
941    pub activities: Vec<TurnActivity>,
942}
943
944impl TurnOutput {
945    pub fn assistant_message(&self) -> Option<&str> {
946        self.result.assistant_message()
947    }
948
949    pub fn submitted_value(&self) -> Option<&serde_json::Value> {
950        self.result.submitted_value()
951    }
952
953    pub fn tool_value(&self) -> Option<(&str, &serde_json::Value)> {
954        self.result.tool_value()
955    }
956
957    pub fn is_success(&self) -> bool {
958        self.result.is_success()
959    }
960}
961
962struct BorrowedTurnActivityFanout<'a> {
963    live: &'a dyn TurnActivitySink,
964    collector: &'a RunActivityCollector,
965}
966
967#[async_trait]
968impl TurnActivitySink for BorrowedTurnActivityFanout<'_> {
969    async fn emit(&self, activity: TurnActivity) {
970        self.live.emit(activity.clone()).await;
971        self.collector.emit(activity).await;
972    }
973}
974
975#[derive(Default)]
976pub(crate) struct RunActivityCollector {
977    activities: Arc<StdMutex<Vec<TurnActivity>>>,
978}
979
980impl RunActivityCollector {
981    fn into_activities(self) -> Vec<TurnActivity> {
982        self.activities
983            .lock()
984            .expect("run activity collector lock")
985            .clone()
986    }
987
988    #[cfg(test)]
989    pub(crate) fn snapshot(&self) -> Vec<TurnActivity> {
990        self.activities
991            .lock()
992            .expect("run activity collector lock")
993            .clone()
994    }
995}
996
997#[async_trait]
998impl TurnActivitySink for RunActivityCollector {
999    async fn emit(&self, activity: TurnActivity) {
1000        self.activities
1001            .lock()
1002            .expect("run activity collector lock")
1003            .push(activity);
1004    }
1005}
1006
1007pub struct TurnActivityFanout {
1008    sinks: Vec<Arc<dyn TurnActivitySink>>,
1009}
1010
1011impl TurnActivityFanout {
1012    pub fn new(sinks: impl IntoIterator<Item = Arc<dyn TurnActivitySink>>) -> Self {
1013        Self {
1014            sinks: sinks.into_iter().collect(),
1015        }
1016    }
1017}
1018
1019#[async_trait]
1020impl TurnActivitySink for TurnActivityFanout {
1021    async fn emit(&self, activity: TurnActivity) {
1022        for sink in &self.sinks {
1023            sink.emit(activity.clone()).await;
1024        }
1025    }
1026}
1027
1028pub fn message_text(message: &Message) -> String {
1029    message
1030        .parts
1031        .iter()
1032        .map(|part| part.content.as_str())
1033        .collect::<Vec<_>>()
1034        .join("\n")
1035}
1036
1037pub fn message_role(message: &Message) -> &'static str {
1038    match message.role {
1039        MessageRole::User => "user",
1040        MessageRole::Assistant => "assistant",
1041        MessageRole::System => "system",
1042        MessageRole::Event => "event",
1043    }
1044}