1use crate::support::*;
2
3pub use lash_core::{AssistantOutput, TurnIssue};
4
5#[derive(Clone, Copy, Default)]
12pub(crate) struct TurnSinks<'a> {
13 events: Option<&'a dyn EventSink>,
14 turn_events: Option<&'a dyn TurnActivitySink>,
15}
16
17impl<'a> TurnSinks<'a> {
18 pub(crate) fn turn(events: &'a dyn TurnActivitySink) -> Self {
19 Self {
20 events: None,
21 turn_events: Some(events),
22 }
23 }
24
25 pub(crate) fn session(events: &'a dyn EventSink) -> Self {
26 Self {
27 events: Some(events),
28 turn_events: None,
29 }
30 }
31
32 fn events(&self) -> Option<&'a dyn EventSink> {
33 self.events
34 }
35
36 fn turn_events(&self) -> Option<&'a dyn TurnActivitySink> {
37 self.turn_events
38 }
39}
40
41#[derive(Clone, Default)]
46pub(crate) struct TurnCancelRegistry {
47 inner: Arc<StdMutex<TurnCancelRegistryInner>>,
48}
49
50#[derive(Default)]
51struct TurnCancelRegistryInner {
52 next_id: u64,
53 active: BTreeMap<u64, CancellationToken>,
54}
55
56impl TurnCancelRegistry {
57 fn register(&self, token: CancellationToken) -> TurnCancelGuard {
60 let mut inner = self.inner.lock().expect("turn cancel registry");
61 let id = inner.next_id;
62 inner.next_id += 1;
63 inner.active.insert(id, token);
64 TurnCancelGuard {
65 registry: Arc::clone(&self.inner),
66 id,
67 }
68 }
69
70 pub(crate) fn cancel_all(&self) -> usize {
71 let inner = self.inner.lock().expect("turn cancel registry");
72 for token in inner.active.values() {
73 token.cancel();
74 }
75 inner.active.len()
76 }
77}
78
79pub(crate) struct TurnCancelGuard {
80 registry: Arc<StdMutex<TurnCancelRegistryInner>>,
81 id: u64,
82}
83
84impl Drop for TurnCancelGuard {
85 fn drop(&mut self) {
86 self.registry
87 .lock()
88 .expect("turn cancel registry")
89 .active
90 .remove(&self.id);
91 }
92}
93
94pub struct TurnBuilder {
95 pub(crate) runtime: RuntimeHandle,
96 pub(crate) effect_host: Arc<dyn EffectHost>,
97 pub(crate) active_plugins: Vec<ActivePluginBinding>,
98 pub(crate) input: TurnInput,
99 pub(crate) cancel: CancellationToken,
100 pub(crate) cancels: TurnCancelRegistry,
101 pub(crate) protocol_turn_options: Option<ProtocolTurnOptions>,
102 pub(crate) provider: Option<ProviderHandle>,
103 pub(crate) model: Option<ModelSpec>,
104 pub(crate) turn_id: Option<String>,
105}
106
107impl TurnBuilder {
108 pub fn cancel(mut self, cancel: CancellationToken) -> Self {
109 self.cancel = cancel;
110 self
111 }
112
113 pub fn protocol_turn_options(mut self, options: ProtocolTurnOptions) -> Self {
114 self.protocol_turn_options = Some(options);
115 self
116 }
117
118 pub fn provider(mut self, provider: ProviderHandle) -> Self {
119 self.provider = Some(provider);
120 self
121 }
122
123 pub fn model(mut self, model: ModelSpec) -> Self {
124 self.model = Some(model);
125 self
126 }
127
128 pub fn turn_id(mut self, id: impl Into<String>) -> Self {
129 self.turn_id = Some(id.into());
130 self
131 }
132
133 pub fn prompt_template(mut self, template: PromptTemplate) -> Self {
134 self.input.turn_context.set_prompt_template(template);
135 self
136 }
137
138 pub fn prompt_contribution(mut self, contribution: PromptContribution) -> Self {
139 self.input
140 .turn_context
141 .add_prompt_contribution(contribution);
142 self
143 }
144
145 pub fn replace_prompt_slot(
146 mut self,
147 slot: PromptSlot,
148 contributions: impl IntoIterator<Item = PromptContribution>,
149 ) -> Self {
150 self.input
151 .turn_context
152 .replace_prompt_slot(slot, contributions);
153 self
154 }
155
156 pub fn clear_prompt_slot(mut self, slot: PromptSlot) -> Self {
157 self.input.turn_context.clear_prompt_slot(slot);
158 self
159 }
160
161 pub fn prompt_layer(mut self, layer: PromptLayer) -> Self {
162 self.input.turn_context.set_prompt_layer(layer);
163 self
164 }
165
166 pub fn with_plugin_input<P: PluginBinding>(mut self, input: P::Input) -> Self {
172 self.input.turn_context.insert_plugin_input(P::ID, input);
173 self
174 }
175
176 pub fn effects(self, controller: &dyn RuntimeEffectController) -> ScopedTurnBuilder<'_> {
177 ScopedTurnBuilder {
178 builder: self,
179 controller,
180 }
181 }
182
183 pub async fn run(self) -> Result<TurnOutput> {
184 let collector = RunActivityCollector::default();
185 let result = self.stream_to(&collector).await?;
186 Ok(TurnOutput {
187 result,
188 activities: collector.into_activities(),
189 })
190 }
191
192 pub async fn stream_to(self, events: &dyn TurnActivitySink) -> Result<TurnResult> {
193 let effect_host = Arc::clone(&self.effect_host);
194 reject_configured_durable_effect_host(effect_host.as_ref(), "turn")?;
195 self.stream_to_with_effect_host(events, effect_host.as_ref())
196 .await
197 }
198
199 pub fn stream(self) -> Result<TurnStream> {
200 let effect_host = Arc::clone(&self.effect_host);
201 reject_configured_durable_effect_host(effect_host.as_ref(), "turn stream")?;
202 self.stream_with_effect_host(effect_host.as_ref())
203 }
204
205 pub fn advanced(self) -> AdvancedTurn {
208 AdvancedTurn { builder: self }
209 }
210
211 fn resolved_turn_id(&self) -> String {
212 self.turn_id
213 .clone()
214 .or_else(|| self.input.trace_turn_id.clone())
215 .unwrap_or_else(fresh_turn_id)
216 }
217
218 fn turn_scope(&self, turn_id: &str) -> lash_core::EffectScope {
219 lash_core::EffectScope::turn(self.runtime.observe().session_id(), turn_id)
220 }
221
222 pub(crate) fn prepare(
223 mut self,
224 trace_turn_id: Option<String>,
225 ) -> Result<(RuntimeHandle, TurnInput, CancellationToken, TurnCancelGuard)> {
226 if let Some(options) = self.protocol_turn_options {
227 self.input.protocol_turn_options = Some(options);
228 }
229 if let Some(provider) = self.provider {
230 self.input.turn_context.set_provider(provider);
231 }
232 if let Some(model) = self.model {
233 self.input.turn_context.set_model(model);
234 }
235 if let Some(trace_turn_id) = trace_turn_id {
236 self.input.trace_turn_id = Some(trace_turn_id);
237 }
238 validate_required_plugin_inputs(&self.active_plugins, &self.input)?;
239 let cancel_guard = self.cancels.register(self.cancel.clone());
240 Ok((self.runtime, self.input, self.cancel, cancel_guard))
241 }
242
243 async fn stream_to_with_effect_host(
244 self,
245 events: &dyn TurnActivitySink,
246 effect_host: &dyn EffectHost,
247 ) -> Result<TurnResult> {
248 let turn_id = self.resolved_turn_id();
249 let scoped_effect_controller = effect_host.scoped(self.turn_scope(&turn_id))?;
250 self.stream_to_with_scope(events, scoped_effect_controller, Some(turn_id))
251 .await
252 }
253
254 async fn stream_to_with_effect_controller(
255 self,
256 events: &dyn TurnActivitySink,
257 controller: &dyn RuntimeEffectController,
258 ) -> Result<TurnResult> {
259 let turn_id = self.resolved_turn_id();
260 let scoped_effect_controller =
261 ScopedEffectController::borrowed(controller, self.turn_scope(&turn_id))?;
262 self.stream_to_with_scope(events, scoped_effect_controller, Some(turn_id))
263 .await
264 }
265
266 async fn stream_to_with_scope(
267 self,
268 events: &dyn TurnActivitySink,
269 scoped_effect_controller: ScopedEffectController<'_>,
270 trace_turn_id: Option<String>,
271 ) -> Result<TurnResult> {
272 let (runtime, input, cancel, _cancel_guard) = self.prepare(trace_turn_id)?;
273 stream_prepared_turn(
274 &runtime,
275 input,
276 TurnSinks::turn(events),
277 scoped_effect_controller,
278 cancel,
279 )
280 .await
281 }
282
283 fn stream_with_effect_host(self, effect_host: &dyn EffectHost) -> Result<TurnStream> {
284 let turn_id = self.resolved_turn_id();
285 let scoped_effect_controller = effect_host
286 .scoped_static(self.turn_scope(&turn_id))?
287 .ok_or(EmbedError::StaticTurnStreamRequiresStaticEffectHost)?;
288 self.stream_with_scope(scoped_effect_controller, Some(turn_id))
289 }
290
291 fn stream_with_scope(
292 self,
293 scoped_effect_controller: ScopedEffectController<'static>,
294 trace_turn_id: Option<String>,
295 ) -> Result<TurnStream> {
296 let (runtime, input, cancel, cancel_guard) = self.prepare(trace_turn_id)?;
297 let (tx, rx) = mpsc::channel(64);
298 let sink = ChannelTurnActivitySink { tx };
299 let completion = tokio::spawn(async move {
300 let _cancel_guard = cancel_guard;
301 stream_prepared_turn(
302 &runtime,
303 input,
304 TurnSinks::turn(&sink),
305 scoped_effect_controller,
306 cancel,
307 )
308 .await
309 });
310 Ok(TurnStream {
311 activities: rx,
312 completion,
313 })
314 }
315}
316
317pub struct ScopedTurnBuilder<'run> {
318 builder: TurnBuilder,
319 controller: &'run dyn RuntimeEffectController,
320}
321
322impl<'run> ScopedTurnBuilder<'run> {
323 pub fn cancel(mut self, cancel: CancellationToken) -> Self {
324 self.builder = self.builder.cancel(cancel);
325 self
326 }
327
328 pub fn protocol_turn_options(mut self, options: ProtocolTurnOptions) -> Self {
329 self.builder = self.builder.protocol_turn_options(options);
330 self
331 }
332
333 pub fn provider(mut self, provider: ProviderHandle) -> Self {
334 self.builder = self.builder.provider(provider);
335 self
336 }
337
338 pub fn model(mut self, model: ModelSpec) -> Self {
339 self.builder = self.builder.model(model);
340 self
341 }
342
343 pub fn turn_id(mut self, id: impl Into<String>) -> Self {
344 self.builder = self.builder.turn_id(id);
345 self
346 }
347
348 pub fn prompt_template(mut self, template: PromptTemplate) -> Self {
349 self.builder = self.builder.prompt_template(template);
350 self
351 }
352
353 pub fn prompt_contribution(mut self, contribution: PromptContribution) -> Self {
354 self.builder = self.builder.prompt_contribution(contribution);
355 self
356 }
357
358 pub fn replace_prompt_slot(
359 mut self,
360 slot: PromptSlot,
361 contributions: impl IntoIterator<Item = PromptContribution>,
362 ) -> Self {
363 self.builder = self.builder.replace_prompt_slot(slot, contributions);
364 self
365 }
366
367 pub fn clear_prompt_slot(mut self, slot: PromptSlot) -> Self {
368 self.builder = self.builder.clear_prompt_slot(slot);
369 self
370 }
371
372 pub fn prompt_layer(mut self, layer: PromptLayer) -> Self {
373 self.builder = self.builder.prompt_layer(layer);
374 self
375 }
376
377 pub fn with_plugin_input<P: PluginBinding>(mut self, input: P::Input) -> Self {
378 self.builder = self.builder.with_plugin_input::<P>(input);
379 self
380 }
381
382 pub async fn run(self) -> Result<TurnOutput> {
383 let collector = RunActivityCollector::default();
384 let result = self.stream_to(&collector).await?;
385 Ok(TurnOutput {
386 result,
387 activities: collector.into_activities(),
388 })
389 }
390
391 pub async fn stream_to(self, events: &dyn TurnActivitySink) -> Result<TurnResult> {
392 self.builder
393 .stream_to_with_effect_controller(events, self.controller)
394 .await
395 }
396}
397
398pub struct AdvancedTurn {
405 builder: TurnBuilder,
406}
407
408impl AdvancedTurn {
409 pub async fn run_with_scope(
410 self,
411 scoped_effect_controller: ScopedEffectController<'_>,
412 ) -> Result<TurnOutput> {
413 let collector = RunActivityCollector::default();
414 let result = self
415 .stream_to_with_scope(&collector, scoped_effect_controller)
416 .await?;
417 Ok(TurnOutput {
418 result,
419 activities: collector.into_activities(),
420 })
421 }
422
423 pub async fn collect_with_scope(
424 self,
425 events: &dyn TurnActivitySink,
426 scoped_effect_controller: ScopedEffectController<'_>,
427 ) -> Result<TurnOutput> {
428 let collector = RunActivityCollector::default();
429 let fanout = BorrowedTurnActivityFanout {
430 live: events,
431 collector: &collector,
432 };
433 let result = self
434 .stream_to_with_scope(&fanout, scoped_effect_controller)
435 .await?;
436 Ok(TurnOutput {
437 result,
438 activities: collector.into_activities(),
439 })
440 }
441
442 pub async fn stream_to_with_scope(
443 self,
444 events: &dyn TurnActivitySink,
445 scoped_effect_controller: ScopedEffectController<'_>,
446 ) -> Result<TurnResult> {
447 let trace_turn_id = trace_turn_id_for_scope(&self.builder, &scoped_effect_controller);
448 self.builder
449 .stream_to_with_scope(events, scoped_effect_controller, trace_turn_id)
450 .await
451 }
452
453 pub fn stream_with_scope(
454 self,
455 scoped_effect_controller: ScopedEffectController<'static>,
456 ) -> Result<TurnStream> {
457 let trace_turn_id = trace_turn_id_for_scope(&self.builder, &scoped_effect_controller);
458 self.builder
459 .stream_with_scope(scoped_effect_controller, trace_turn_id)
460 }
461
462 pub async fn collect_session_events_with_scope(
464 self,
465 events: &dyn EventSink,
466 scoped_effect_controller: ScopedEffectController<'_>,
467 ) -> Result<TurnResult> {
468 let trace_turn_id = trace_turn_id_for_scope(&self.builder, &scoped_effect_controller);
469 let (runtime, input, cancel, _cancel_guard) = self.builder.prepare(trace_turn_id)?;
470 stream_prepared_turn(
471 &runtime,
472 input,
473 TurnSinks::session(events),
474 scoped_effect_controller,
475 cancel,
476 )
477 .await
478 }
479}
480
481pub struct TurnStream {
482 activities: mpsc::Receiver<Result<TurnActivity>>,
483 completion: JoinHandle<Result<TurnResult>>,
484}
485
486impl TurnStream {
487 pub async fn next_activity(&mut self) -> Option<Result<TurnActivity>> {
488 self.activities.recv().await
489 }
490
491 pub async fn finish(self) -> Result<TurnResult> {
492 self.completion.await.map_err(|err| {
493 EmbedError::Runtime(lash_core::RuntimeError::new(
494 RuntimeErrorCode::TurnStreamJoin,
495 format!("turn stream task failed: {err}"),
496 ))
497 })?
498 }
499}
500
501pub struct QueuedTurnBuilder {
502 pub(crate) runtime: RuntimeHandle,
503 pub(crate) effect_host: Arc<dyn EffectHost>,
504 pub(crate) cancel: CancellationToken,
505 pub(crate) cancels: TurnCancelRegistry,
506 pub(crate) batch_ids: Vec<String>,
507 pub(crate) drain_id: Option<String>,
508}
509
510impl QueuedTurnBuilder {
511 pub fn cancel(mut self, cancel: CancellationToken) -> Self {
512 self.cancel = cancel;
513 self
514 }
515
516 pub fn batch_ids(mut self, batch_ids: impl IntoIterator<Item = impl Into<String>>) -> Self {
517 self.batch_ids = batch_ids.into_iter().map(Into::into).collect();
518 self
519 }
520
521 pub fn drain_id(mut self, drain_id: impl Into<String>) -> Self {
522 self.drain_id = Some(drain_id.into());
523 self
524 }
525
526 pub fn effects(self, controller: &dyn RuntimeEffectController) -> ScopedQueuedTurnBuilder<'_> {
527 ScopedQueuedTurnBuilder {
528 builder: self,
529 controller,
530 }
531 }
532
533 pub async fn run(self) -> Result<Option<TurnOutput>> {
534 let collector = RunActivityCollector::default();
535 let Some(result) = self.stream_to(&collector).await? else {
536 return Ok(None);
537 };
538 Ok(Some(TurnOutput {
539 result,
540 activities: collector.into_activities(),
541 }))
542 }
543
544 pub async fn stream_to(self, events: &dyn TurnActivitySink) -> Result<Option<TurnResult>> {
545 let effect_host = Arc::clone(&self.effect_host);
546 reject_configured_durable_effect_host(effect_host.as_ref(), "queued turn")?;
547 self.stream_to_with_effect_host(events, effect_host.as_ref())
548 .await
549 }
550
551 fn resolved_drain_id(&self) -> String {
552 self.drain_id
553 .clone()
554 .or_else(|| self.batch_ids.first().cloned())
555 .unwrap_or_else(fresh_queue_drain_id)
556 }
557
558 async fn stream_to_with_effect_host(
559 self,
560 events: &dyn TurnActivitySink,
561 effect_host: &dyn EffectHost,
562 ) -> Result<Option<TurnResult>> {
563 let drain_id = self.resolved_drain_id();
564 let scope =
565 lash_core::EffectScope::queue_drain(self.runtime.observe().session_id(), drain_id);
566 let scoped_effect_controller = effect_host.scoped(scope)?;
567 self.stream_to_with_scope(events, scoped_effect_controller)
568 .await
569 }
570
571 async fn stream_to_with_effect_controller(
572 self,
573 events: &dyn TurnActivitySink,
574 controller: &dyn RuntimeEffectController,
575 ) -> Result<Option<TurnResult>> {
576 let drain_id = self.resolved_drain_id();
577 let scope =
578 lash_core::EffectScope::queue_drain(self.runtime.observe().session_id(), drain_id);
579 let scoped_effect_controller = ScopedEffectController::borrowed(controller, scope)?;
580 self.stream_to_with_scope(events, scoped_effect_controller)
581 .await
582 }
583
584 async fn stream_to_with_scope(
585 self,
586 events: &dyn TurnActivitySink,
587 scoped_effect_controller: ScopedEffectController<'_>,
588 ) -> Result<Option<TurnResult>> {
589 let Self {
590 runtime,
591 effect_host: _,
592 cancel,
593 cancels,
594 batch_ids,
595 drain_id: _,
596 } = self;
597 let _cancel_guard = cancels.register(cancel.clone());
598 stream_next_queued_prepared_turn(
599 &runtime,
600 TurnSinks::turn(events),
601 scoped_effect_controller,
602 cancel,
603 &batch_ids,
604 )
605 .await
606 }
607}
608
609pub struct ScopedQueuedTurnBuilder<'run> {
610 builder: QueuedTurnBuilder,
611 controller: &'run dyn RuntimeEffectController,
612}
613
614impl<'run> ScopedQueuedTurnBuilder<'run> {
615 pub fn cancel(mut self, cancel: CancellationToken) -> Self {
616 self.builder = self.builder.cancel(cancel);
617 self
618 }
619
620 pub fn batch_ids(mut self, batch_ids: impl IntoIterator<Item = impl Into<String>>) -> Self {
621 self.builder = self.builder.batch_ids(batch_ids);
622 self
623 }
624
625 pub fn drain_id(mut self, drain_id: impl Into<String>) -> Self {
626 self.builder = self.builder.drain_id(drain_id);
627 self
628 }
629
630 pub async fn run(self) -> Result<Option<TurnOutput>> {
631 let collector = RunActivityCollector::default();
632 let Some(result) = self.stream_to(&collector).await? else {
633 return Ok(None);
634 };
635 Ok(Some(TurnOutput {
636 result,
637 activities: collector.into_activities(),
638 }))
639 }
640
641 pub async fn stream_to(self, events: &dyn TurnActivitySink) -> Result<Option<TurnResult>> {
642 self.builder
643 .stream_to_with_effect_controller(events, self.controller)
644 .await
645 }
646}
647
648fn fresh_turn_id() -> String {
649 lash_core::TurnActivityId::fresh().0
650}
651
652fn fresh_queue_drain_id() -> String {
653 format!("queue-drain:{}", fresh_turn_id())
654}
655
656fn trace_turn_id_for_scope(
657 builder: &TurnBuilder,
658 scoped_effect_controller: &ScopedEffectController<'_>,
659) -> Option<String> {
660 if scoped_effect_controller
661 .effect_scope()
662 .validates_turn_trace_id()
663 {
664 Some(
665 builder
666 .turn_id
667 .clone()
668 .unwrap_or_else(|| scoped_effect_controller.scope_id().to_string()),
669 )
670 } else {
671 builder
672 .turn_id
673 .clone()
674 .or_else(|| builder.input.trace_turn_id.clone())
675 }
676}
677
678fn reject_configured_durable_effect_host(
679 effect_host: &dyn EffectHost,
680 operation: &'static str,
681) -> Result<()> {
682 if effect_host.durability_tier() == DurabilityTier::Durable {
683 return Err(EmbedError::DurableEffectHostRequiresHandlerContext { operation });
684 }
685 Ok(())
686}
687
688pub(crate) async fn stream_next_queued_prepared_turn(
689 runtime: &RuntimeHandle,
690 sinks: TurnSinks<'_>,
691 scoped_effect_controller: ScopedEffectController<'_>,
692 cancel: CancellationToken,
693 batch_ids: &[String],
694) -> Result<Option<TurnResult>> {
695 let turn = Box::pin(stream_next_queued_prepared_assembled(
696 runtime,
697 sinks,
698 scoped_effect_controller,
699 cancel,
700 batch_ids,
701 ))
702 .await?;
703 Ok(turn.map(TurnResult::from_assembled))
704}
705
706pub(crate) async fn stream_next_queued_prepared_assembled(
707 runtime: &RuntimeHandle,
708 sinks: TurnSinks<'_>,
709 scoped_effect_controller: ScopedEffectController<'_>,
710 cancel: CancellationToken,
711 batch_ids: &[String],
712) -> Result<Option<AssembledTurn>> {
713 let writer_handle = runtime.writer();
714 let mut writer = writer_handle.lock().await;
715 let observation_sink = SessionObservationTurnActivitySink {
716 runtime: runtime.clone(),
717 live: sinks.turn_events(),
718 };
719 let opts = turn_options(
720 sinks.events(),
721 &observation_sink,
722 scoped_effect_controller,
723 cancel,
724 );
725 let turn = if batch_ids.is_empty() {
726 writer.stream_next_queued_work(opts).await?
727 } else {
728 writer.stream_selected_queued_work(opts, batch_ids).await?
729 };
730 runtime.publish_from(&writer);
731 Ok(turn)
732}
733
734fn turn_options<'a>(
735 events: Option<&'a dyn EventSink>,
736 turn_events: &'a dyn TurnActivitySink,
737 scoped_effect_controller: ScopedEffectController<'a>,
738 cancel: CancellationToken,
739) -> lash_core::TurnOptions<'a> {
740 let mut opts = lash_core::TurnOptions::new(cancel, scoped_effect_controller);
741 if let Some(events) = events {
742 opts = opts.with_events(events);
743 }
744 opts.with_turn_events(turn_events)
745}
746
747struct SessionObservationTurnActivitySink<'a> {
748 runtime: RuntimeHandle,
749 live: Option<&'a dyn TurnActivitySink>,
750}
751
752#[async_trait]
753impl TurnActivitySink for SessionObservationTurnActivitySink<'_> {
754 fn is_noop(&self) -> bool {
755 false
756 }
757
758 async fn emit(&self, activity: TurnActivity) {
759 self.runtime.record_turn_activity(activity.clone());
760 if let Some(live) = self.live {
761 live.emit(activity).await;
762 }
763 }
764}
765
766struct ChannelTurnActivitySink {
767 tx: mpsc::Sender<Result<TurnActivity>>,
768}
769
770#[async_trait]
771impl TurnActivitySink for ChannelTurnActivitySink {
772 async fn emit(&self, activity: TurnActivity) {
773 let _ = self.tx.send(Ok(activity)).await;
774 }
775}
776fn validate_required_plugin_inputs(
777 active_plugins: &[ActivePluginBinding],
778 input: &TurnInput,
779) -> Result<()> {
780 for plugin in active_plugins {
781 if plugin.requires_turn_input && !input.turn_context.has_plugin_input(plugin.id) {
782 return Err(EmbedError::MissingPluginTurnInput {
783 plugin_id: plugin.id,
784 });
785 }
786 }
787 Ok(())
788}
789
790pub(crate) async fn stream_prepared_turn(
791 runtime: &RuntimeHandle,
792 input: TurnInput,
793 sinks: TurnSinks<'_>,
794 scoped_effect_controller: ScopedEffectController<'_>,
795 cancel: CancellationToken,
796) -> Result<TurnResult> {
797 let turn = Box::pin(stream_prepared_assembled(
798 runtime,
799 input,
800 sinks,
801 scoped_effect_controller,
802 cancel,
803 ))
804 .await?;
805 Ok(TurnResult::from_assembled(turn))
806}
807
808pub(crate) async fn stream_prepared_assembled(
809 runtime: &RuntimeHandle,
810 input: TurnInput,
811 sinks: TurnSinks<'_>,
812 scoped_effect_controller: ScopedEffectController<'_>,
813 cancel: CancellationToken,
814) -> Result<AssembledTurn> {
815 let turn = Box::pin(stream_prepared_agent_frame_run(
816 runtime,
817 input,
818 sinks,
819 scoped_effect_controller,
820 cancel,
821 ))
822 .await?;
823 turn.into_final_turn().ok_or_else(|| {
824 EmbedError::Runtime(lash_core::RuntimeError::new(
825 RuntimeErrorCode::EmptyAgentFrameRun,
826 "runtime completed without an assembled turn",
827 ))
828 })
829}
830
831pub(crate) async fn stream_prepared_agent_frame_run(
832 runtime: &RuntimeHandle,
833 input: TurnInput,
834 sinks: TurnSinks<'_>,
835 scoped_effect_controller: ScopedEffectController<'_>,
836 cancel: CancellationToken,
837) -> Result<lash_core::AgentFrameRun> {
838 let writer_handle = runtime.writer();
839 let mut writer = writer_handle.lock().await;
840 if let Some(extension) = input.protocol_extension.as_ref() {
841 writer
842 .validate_protocol_turn_extension(extension)
843 .await
844 .map_err(EmbedError::Session)?;
845 }
846 let observation_sink = SessionObservationTurnActivitySink {
847 runtime: runtime.clone(),
848 live: sinks.turn_events(),
849 };
850 let turn = Box::pin(writer.stream_turn_with_agent_frames(
851 input,
852 turn_options(
853 sinks.events(),
854 &observation_sink,
855 scoped_effect_controller,
856 cancel,
857 ),
858 ))
859 .await?;
860 runtime.publish_from(&writer);
861 Ok(turn)
862}
863
864#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
865pub struct TurnResult {
866 pub state: SessionSnapshot,
867 pub outcome: TurnOutcome,
868 pub assistant_output: AssistantOutput,
869 pub usage: TokenUsage,
873 #[serde(default)]
877 pub children_usage: Vec<TokenLedgerEntry>,
878 pub tool_calls: Vec<ToolCallRecord>,
879 pub execution: ExecutionSummary,
880 pub errors: Vec<TurnIssue>,
881}
882
883impl TurnResult {
884 fn from_assembled(turn: lash_core::AssembledTurn) -> Self {
885 Self {
886 state: turn.state,
887 outcome: turn.outcome,
888 assistant_output: turn.assistant_output,
889 usage: turn.token_usage,
890 children_usage: turn.children_usage,
891 tool_calls: turn.tool_calls,
892 execution: turn.execution,
893 errors: turn.errors,
894 }
895 }
896
897 pub fn total_usage(&self) -> TokenUsage {
900 let mut total = self.usage.clone();
901 for entry in &self.children_usage {
902 total.add(&entry.usage);
903 }
904 total
905 }
906
907 pub fn assistant_message(&self) -> Option<&str> {
908 match &self.outcome {
909 TurnOutcome::Finished(lash_core::TurnFinish::AssistantMessage { text }) => Some(text),
910 _ => None,
911 }
912 }
913
914 pub fn submitted_value(&self) -> Option<&serde_json::Value> {
915 match &self.outcome {
916 TurnOutcome::Finished(lash_core::TurnFinish::SubmittedValue { value }) => Some(value),
917 _ => None,
918 }
919 }
920
921 pub fn tool_value(&self) -> Option<(&str, &serde_json::Value)> {
922 match &self.outcome {
923 TurnOutcome::Finished(lash_core::TurnFinish::ToolValue { tool_name, value }) => {
924 Some((tool_name.as_str(), value))
925 }
926 _ => None,
927 }
928 }
929
930 pub fn is_success(&self) -> bool {
931 matches!(
932 self.outcome,
933 TurnOutcome::Finished(_) | TurnOutcome::AgentFrameSwitch { .. }
934 )
935 }
936}
937
938#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
939pub struct TurnOutput {
940 pub result: TurnResult,
941 pub activities: Vec<TurnActivity>,
942}
943
944impl TurnOutput {
945 pub fn assistant_message(&self) -> Option<&str> {
946 self.result.assistant_message()
947 }
948
949 pub fn submitted_value(&self) -> Option<&serde_json::Value> {
950 self.result.submitted_value()
951 }
952
953 pub fn tool_value(&self) -> Option<(&str, &serde_json::Value)> {
954 self.result.tool_value()
955 }
956
957 pub fn is_success(&self) -> bool {
958 self.result.is_success()
959 }
960}
961
962struct BorrowedTurnActivityFanout<'a> {
963 live: &'a dyn TurnActivitySink,
964 collector: &'a RunActivityCollector,
965}
966
967#[async_trait]
968impl TurnActivitySink for BorrowedTurnActivityFanout<'_> {
969 async fn emit(&self, activity: TurnActivity) {
970 self.live.emit(activity.clone()).await;
971 self.collector.emit(activity).await;
972 }
973}
974
975#[derive(Default)]
976pub(crate) struct RunActivityCollector {
977 activities: Arc<StdMutex<Vec<TurnActivity>>>,
978}
979
980impl RunActivityCollector {
981 fn into_activities(self) -> Vec<TurnActivity> {
982 self.activities
983 .lock()
984 .expect("run activity collector lock")
985 .clone()
986 }
987
988 #[cfg(test)]
989 pub(crate) fn snapshot(&self) -> Vec<TurnActivity> {
990 self.activities
991 .lock()
992 .expect("run activity collector lock")
993 .clone()
994 }
995}
996
997#[async_trait]
998impl TurnActivitySink for RunActivityCollector {
999 async fn emit(&self, activity: TurnActivity) {
1000 self.activities
1001 .lock()
1002 .expect("run activity collector lock")
1003 .push(activity);
1004 }
1005}
1006
1007pub struct TurnActivityFanout {
1008 sinks: Vec<Arc<dyn TurnActivitySink>>,
1009}
1010
1011impl TurnActivityFanout {
1012 pub fn new(sinks: impl IntoIterator<Item = Arc<dyn TurnActivitySink>>) -> Self {
1013 Self {
1014 sinks: sinks.into_iter().collect(),
1015 }
1016 }
1017}
1018
1019#[async_trait]
1020impl TurnActivitySink for TurnActivityFanout {
1021 async fn emit(&self, activity: TurnActivity) {
1022 for sink in &self.sinks {
1023 sink.emit(activity.clone()).await;
1024 }
1025 }
1026}
1027
1028pub fn message_text(message: &Message) -> String {
1029 message
1030 .parts
1031 .iter()
1032 .map(|part| part.content.as_str())
1033 .collect::<Vec<_>>()
1034 .join("\n")
1035}
1036
1037pub fn message_role(message: &Message) -> &'static str {
1038 match message.role {
1039 MessageRole::User => "user",
1040 MessageRole::Assistant => "assistant",
1041 MessageRole::System => "system",
1042 MessageRole::Event => "event",
1043 }
1044}