1use crate::support::*;
2
3pub use lash_core::{AssistantOutput, TurnIssue};
4
5#[derive(Clone, Copy, Default)]
12pub(crate) struct TurnSinks<'a> {
13 events: Option<&'a dyn EventSink>,
14 turn_events: Option<&'a dyn TurnActivitySink>,
15}
16
17impl<'a> TurnSinks<'a> {
18 pub(crate) fn turn(events: &'a dyn TurnActivitySink) -> Self {
19 Self {
20 events: None,
21 turn_events: Some(events),
22 }
23 }
24
25 pub(crate) fn session(events: &'a dyn EventSink) -> Self {
26 Self {
27 events: Some(events),
28 turn_events: None,
29 }
30 }
31
32 fn events(&self) -> Option<&'a dyn EventSink> {
33 self.events
34 }
35
36 fn turn_events(&self) -> Option<&'a dyn TurnActivitySink> {
37 self.turn_events
38 }
39}
40
41#[derive(Clone, Default)]
46pub(crate) struct TurnCancelRegistry {
47 inner: Arc<StdMutex<TurnCancelRegistryInner>>,
48}
49
50#[derive(Default)]
51struct TurnCancelRegistryInner {
52 next_id: u64,
53 active: BTreeMap<u64, CancellationToken>,
54}
55
56impl TurnCancelRegistry {
57 fn register(&self, token: CancellationToken) -> TurnCancelGuard {
60 let mut inner = self.inner.lock().expect("turn cancel registry");
61 let id = inner.next_id;
62 inner.next_id += 1;
63 inner.active.insert(id, token);
64 TurnCancelGuard {
65 registry: Arc::clone(&self.inner),
66 id,
67 }
68 }
69
70 pub(crate) fn cancel_all(&self) -> usize {
71 let inner = self.inner.lock().expect("turn cancel registry");
72 for token in inner.active.values() {
73 token.cancel();
74 }
75 inner.active.len()
76 }
77}
78
79pub(crate) struct TurnCancelGuard {
80 registry: Arc<StdMutex<TurnCancelRegistryInner>>,
81 id: u64,
82}
83
84impl Drop for TurnCancelGuard {
85 fn drop(&mut self) {
86 self.registry
87 .lock()
88 .expect("turn cancel registry")
89 .active
90 .remove(&self.id);
91 }
92}
93
94pub struct TurnBuilder {
95 pub(crate) runtime: RuntimeHandle,
96 pub(crate) effect_host: Arc<dyn EffectHost>,
97 pub(crate) active_plugins: Vec<ActivePluginBinding>,
98 pub(crate) input: TurnInput,
99 pub(crate) cancel: CancellationToken,
100 pub(crate) cancels: TurnCancelRegistry,
101 pub(crate) protocol_turn_options: Option<ProtocolTurnOptions>,
102 pub(crate) provider: Option<ProviderHandle>,
103 pub(crate) model: Option<ModelSpec>,
104 pub(crate) turn_id: Option<String>,
105}
106
107impl TurnBuilder {
108 pub fn cancel(mut self, cancel: CancellationToken) -> Self {
109 self.cancel = cancel;
110 self
111 }
112
113 pub fn protocol_turn_options(mut self, options: ProtocolTurnOptions) -> Self {
114 self.protocol_turn_options = Some(options);
115 self
116 }
117
118 pub fn provider(mut self, provider: ProviderHandle) -> Self {
119 self.provider = Some(provider);
120 self
121 }
122
123 pub fn model(mut self, model: ModelSpec) -> Self {
124 self.model = Some(model);
125 self
126 }
127
128 pub fn turn_id(mut self, id: impl Into<String>) -> Self {
129 self.turn_id = Some(id.into());
130 self
131 }
132
133 pub fn prompt_template(mut self, template: PromptTemplate) -> Self {
134 self.input.turn_context.set_prompt_template(template);
135 self
136 }
137
138 pub fn prompt_contribution(mut self, contribution: PromptContribution) -> Self {
139 self.input
140 .turn_context
141 .add_prompt_contribution(contribution);
142 self
143 }
144
145 pub fn replace_prompt_slot(
146 mut self,
147 slot: PromptSlot,
148 contributions: impl IntoIterator<Item = PromptContribution>,
149 ) -> Self {
150 self.input
151 .turn_context
152 .replace_prompt_slot(slot, contributions);
153 self
154 }
155
156 pub fn clear_prompt_slot(mut self, slot: PromptSlot) -> Self {
157 self.input.turn_context.clear_prompt_slot(slot);
158 self
159 }
160
161 pub fn prompt_layer(mut self, layer: PromptLayer) -> Self {
162 self.input.turn_context.set_prompt_layer(layer);
163 self
164 }
165
166 pub fn with_plugin_input<P: PluginBinding>(mut self, input: P::Input) -> Self {
172 self.input.turn_context.insert_plugin_input(P::ID, input);
173 self
174 }
175
176 pub fn effects(self, controller: &dyn RuntimeEffectController) -> ScopedTurnBuilder<'_> {
177 ScopedTurnBuilder {
178 builder: self,
179 controller,
180 }
181 }
182
183 pub async fn run(self) -> Result<TurnOutput> {
184 let collector = RunActivityCollector::default();
185 let result = self.stream_to(&collector).await?;
186 Ok(TurnOutput {
187 result,
188 activities: collector.into_activities(),
189 })
190 }
191
192 pub async fn stream_to(self, events: &dyn TurnActivitySink) -> Result<TurnResult> {
193 let effect_host = Arc::clone(&self.effect_host);
194 reject_configured_durable_effect_host(effect_host.as_ref(), "turn")?;
195 self.stream_to_with_effect_host(events, effect_host.as_ref())
196 .await
197 }
198
199 pub fn stream(self) -> Result<TurnStream> {
200 let effect_host = Arc::clone(&self.effect_host);
201 reject_configured_durable_effect_host(effect_host.as_ref(), "turn stream")?;
202 self.stream_with_effect_host(effect_host.as_ref())
203 }
204
205 pub fn advanced(self) -> AdvancedTurn {
208 AdvancedTurn { builder: self }
209 }
210
211 fn resolved_turn_id(&self) -> String {
212 self.turn_id
213 .clone()
214 .or_else(|| self.input.trace_turn_id.clone())
215 .unwrap_or_else(fresh_turn_id)
216 }
217
218 fn turn_scope(&self, turn_id: &str) -> lash_core::ExecutionScope {
219 lash_core::ExecutionScope::turn(self.runtime.observe().session_id(), turn_id)
220 }
221
222 pub(crate) fn prepare(
223 mut self,
224 trace_turn_id: Option<String>,
225 ) -> Result<(RuntimeHandle, TurnInput, CancellationToken, TurnCancelGuard)> {
226 if let Some(options) = self.protocol_turn_options {
227 self.input.protocol_turn_options = Some(options);
228 }
229 if let Some(provider) = self.provider {
230 self.input.turn_context.set_provider(provider);
231 }
232 if let Some(model) = self.model {
233 self.input.turn_context.set_model(model);
234 }
235 if let Some(trace_turn_id) = trace_turn_id {
236 self.input.trace_turn_id = Some(trace_turn_id);
237 }
238 validate_required_plugin_inputs(&self.active_plugins, &self.input)?;
239 let cancel_guard = self.cancels.register(self.cancel.clone());
240 Ok((self.runtime, self.input, self.cancel, cancel_guard))
241 }
242
243 async fn stream_to_with_effect_host(
244 self,
245 events: &dyn TurnActivitySink,
246 effect_host: &dyn EffectHost,
247 ) -> Result<TurnResult> {
248 let turn_id = self.resolved_turn_id();
249 let scoped_effect_controller = effect_host.scoped(self.turn_scope(&turn_id))?;
250 self.stream_to_with_scope(events, scoped_effect_controller, Some(turn_id))
251 .await
252 }
253
254 async fn stream_to_with_effect_controller(
255 self,
256 events: &dyn TurnActivitySink,
257 controller: &dyn RuntimeEffectController,
258 ) -> Result<TurnResult> {
259 let turn_id = self.resolved_turn_id();
260 let scoped_effect_controller =
261 ScopedEffectController::borrowed(controller, self.turn_scope(&turn_id))?;
262 self.stream_to_with_scope(events, scoped_effect_controller, Some(turn_id))
263 .await
264 }
265
266 async fn stream_to_with_scope(
267 self,
268 events: &dyn TurnActivitySink,
269 scoped_effect_controller: ScopedEffectController<'_>,
270 trace_turn_id: Option<String>,
271 ) -> Result<TurnResult> {
272 let (runtime, input, cancel, _cancel_guard) = self.prepare(trace_turn_id)?;
273 stream_prepared_turn(
274 &runtime,
275 input,
276 TurnSinks::turn(events),
277 scoped_effect_controller,
278 cancel,
279 )
280 .await
281 }
282
283 fn stream_with_effect_host(self, effect_host: &dyn EffectHost) -> Result<TurnStream> {
284 let turn_id = self.resolved_turn_id();
285 let scoped_effect_controller = effect_host
286 .scoped_static(self.turn_scope(&turn_id))?
287 .ok_or(EmbedError::StaticTurnStreamRequiresStaticEffectHost)?;
288 self.stream_with_scope(scoped_effect_controller, Some(turn_id))
289 }
290
291 fn stream_with_scope(
292 self,
293 scoped_effect_controller: ScopedEffectController<'static>,
294 trace_turn_id: Option<String>,
295 ) -> Result<TurnStream> {
296 let (runtime, input, cancel, cancel_guard) = self.prepare(trace_turn_id)?;
297 let (tx, rx) = mpsc::channel(64);
298 let sink = ChannelTurnActivitySink { tx };
299 let completion = tokio::spawn(async move {
300 let _cancel_guard = cancel_guard;
301 stream_prepared_turn(
302 &runtime,
303 input,
304 TurnSinks::turn(&sink),
305 scoped_effect_controller,
306 cancel,
307 )
308 .await
309 });
310 Ok(TurnStream {
311 activities: rx,
312 completion,
313 })
314 }
315}
316
317pub struct ScopedTurnBuilder<'run> {
318 builder: TurnBuilder,
319 controller: &'run dyn RuntimeEffectController,
320}
321
322impl<'run> ScopedTurnBuilder<'run> {
323 pub fn cancel(mut self, cancel: CancellationToken) -> Self {
324 self.builder = self.builder.cancel(cancel);
325 self
326 }
327
328 pub fn protocol_turn_options(mut self, options: ProtocolTurnOptions) -> Self {
329 self.builder = self.builder.protocol_turn_options(options);
330 self
331 }
332
333 pub fn provider(mut self, provider: ProviderHandle) -> Self {
334 self.builder = self.builder.provider(provider);
335 self
336 }
337
338 pub fn model(mut self, model: ModelSpec) -> Self {
339 self.builder = self.builder.model(model);
340 self
341 }
342
343 pub fn turn_id(mut self, id: impl Into<String>) -> Self {
344 self.builder = self.builder.turn_id(id);
345 self
346 }
347
348 pub fn prompt_template(mut self, template: PromptTemplate) -> Self {
349 self.builder = self.builder.prompt_template(template);
350 self
351 }
352
353 pub fn prompt_contribution(mut self, contribution: PromptContribution) -> Self {
354 self.builder = self.builder.prompt_contribution(contribution);
355 self
356 }
357
358 pub fn replace_prompt_slot(
359 mut self,
360 slot: PromptSlot,
361 contributions: impl IntoIterator<Item = PromptContribution>,
362 ) -> Self {
363 self.builder = self.builder.replace_prompt_slot(slot, contributions);
364 self
365 }
366
367 pub fn clear_prompt_slot(mut self, slot: PromptSlot) -> Self {
368 self.builder = self.builder.clear_prompt_slot(slot);
369 self
370 }
371
372 pub fn prompt_layer(mut self, layer: PromptLayer) -> Self {
373 self.builder = self.builder.prompt_layer(layer);
374 self
375 }
376
377 pub fn with_plugin_input<P: PluginBinding>(mut self, input: P::Input) -> Self {
378 self.builder = self.builder.with_plugin_input::<P>(input);
379 self
380 }
381
382 pub async fn run(self) -> Result<TurnOutput> {
383 let collector = RunActivityCollector::default();
384 let result = self.stream_to(&collector).await?;
385 Ok(TurnOutput {
386 result,
387 activities: collector.into_activities(),
388 })
389 }
390
391 pub async fn stream_to(self, events: &dyn TurnActivitySink) -> Result<TurnResult> {
392 self.builder
393 .stream_to_with_effect_controller(events, self.controller)
394 .await
395 }
396}
397
398pub struct AdvancedTurn {
405 builder: TurnBuilder,
406}
407
408impl AdvancedTurn {
409 pub async fn run_with_scope(
410 self,
411 scoped_effect_controller: ScopedEffectController<'_>,
412 ) -> Result<TurnOutput> {
413 let collector = RunActivityCollector::default();
414 let result = self
415 .stream_to_with_scope(&collector, scoped_effect_controller)
416 .await?;
417 Ok(TurnOutput {
418 result,
419 activities: collector.into_activities(),
420 })
421 }
422
423 pub async fn collect_with_scope(
424 self,
425 events: &dyn TurnActivitySink,
426 scoped_effect_controller: ScopedEffectController<'_>,
427 ) -> Result<TurnOutput> {
428 let collector = RunActivityCollector::default();
429 let fanout = BorrowedTurnActivityFanout {
430 live: events,
431 collector: &collector,
432 };
433 let result = self
434 .stream_to_with_scope(&fanout, scoped_effect_controller)
435 .await?;
436 Ok(TurnOutput {
437 result,
438 activities: collector.into_activities(),
439 })
440 }
441
442 pub async fn stream_to_with_scope(
443 self,
444 events: &dyn TurnActivitySink,
445 scoped_effect_controller: ScopedEffectController<'_>,
446 ) -> Result<TurnResult> {
447 let trace_turn_id = trace_turn_id_for_scope(&self.builder, &scoped_effect_controller);
448 self.builder
449 .stream_to_with_scope(events, scoped_effect_controller, trace_turn_id)
450 .await
451 }
452
453 pub fn stream_with_scope(
454 self,
455 scoped_effect_controller: ScopedEffectController<'static>,
456 ) -> Result<TurnStream> {
457 let trace_turn_id = trace_turn_id_for_scope(&self.builder, &scoped_effect_controller);
458 self.builder
459 .stream_with_scope(scoped_effect_controller, trace_turn_id)
460 }
461
462 pub async fn collect_session_events_with_scope(
464 self,
465 events: &dyn EventSink,
466 scoped_effect_controller: ScopedEffectController<'_>,
467 ) -> Result<TurnResult> {
468 let trace_turn_id = trace_turn_id_for_scope(&self.builder, &scoped_effect_controller);
469 let (runtime, input, cancel, _cancel_guard) = self.builder.prepare(trace_turn_id)?;
470 stream_prepared_turn(
471 &runtime,
472 input,
473 TurnSinks::session(events),
474 scoped_effect_controller,
475 cancel,
476 )
477 .await
478 }
479}
480
481pub struct TurnStream {
482 activities: mpsc::Receiver<Result<TurnActivity>>,
483 completion: JoinHandle<Result<TurnResult>>,
484}
485
486impl TurnStream {
487 pub async fn next_activity(&mut self) -> Option<Result<TurnActivity>> {
488 self.activities.recv().await
489 }
490
491 pub async fn finish(self) -> Result<TurnResult> {
492 self.completion.await.map_err(|err| {
493 EmbedError::Runtime(lash_core::RuntimeError::new(
494 RuntimeErrorCode::TurnStreamJoin,
495 format!("turn stream task failed: {err}"),
496 ))
497 })?
498 }
499}
500
501pub struct QueuedTurnBuilder {
502 pub(crate) runtime: RuntimeHandle,
503 pub(crate) effect_host: Arc<dyn EffectHost>,
504 pub(crate) cancel: CancellationToken,
505 pub(crate) cancels: TurnCancelRegistry,
506 pub(crate) batch_ids: Vec<String>,
507 pub(crate) drain_id: Option<String>,
508}
509
510impl QueuedTurnBuilder {
511 pub fn cancel(mut self, cancel: CancellationToken) -> Self {
512 self.cancel = cancel;
513 self
514 }
515
516 pub fn batch_ids(mut self, batch_ids: impl IntoIterator<Item = impl Into<String>>) -> Self {
517 self.batch_ids = batch_ids.into_iter().map(Into::into).collect();
518 self
519 }
520
521 pub fn drain_id(mut self, drain_id: impl Into<String>) -> Self {
522 self.drain_id = Some(drain_id.into());
523 self
524 }
525
526 pub fn effects(self, controller: &dyn RuntimeEffectController) -> ScopedQueuedTurnBuilder<'_> {
527 ScopedQueuedTurnBuilder {
528 builder: self,
529 controller,
530 }
531 }
532
533 pub async fn run(self) -> Result<Option<TurnOutput>> {
534 let collector = RunActivityCollector::default();
535 let Some(result) = self.stream_to(&collector).await? else {
536 return Ok(None);
537 };
538 Ok(Some(TurnOutput {
539 result,
540 activities: collector.into_activities(),
541 }))
542 }
543
544 pub async fn stream_to(self, events: &dyn TurnActivitySink) -> Result<Option<TurnResult>> {
545 let effect_host = Arc::clone(&self.effect_host);
546 reject_configured_durable_effect_host(effect_host.as_ref(), "queued turn")?;
547 self.stream_to_with_effect_host(events, effect_host.as_ref())
548 .await
549 }
550
551 pub fn advanced(self) -> AdvancedQueuedTurn {
552 AdvancedQueuedTurn { builder: self }
553 }
554
555 fn resolved_drain_id(&self) -> String {
556 self.drain_id
557 .clone()
558 .or_else(|| self.batch_ids.first().cloned())
559 .unwrap_or_else(fresh_queue_drain_id)
560 }
561
562 async fn stream_to_with_effect_host(
563 self,
564 events: &dyn TurnActivitySink,
565 effect_host: &dyn EffectHost,
566 ) -> Result<Option<TurnResult>> {
567 let drain_id = self.resolved_drain_id();
568 let scope =
569 lash_core::ExecutionScope::queue_drain(self.runtime.observe().session_id(), drain_id);
570 let scoped_effect_controller = effect_host.scoped(scope)?;
571 self.stream_to_with_scope(events, scoped_effect_controller)
572 .await
573 }
574
575 async fn stream_to_with_effect_controller(
576 self,
577 events: &dyn TurnActivitySink,
578 controller: &dyn RuntimeEffectController,
579 ) -> Result<Option<TurnResult>> {
580 let drain_id = self.resolved_drain_id();
581 let scope =
582 lash_core::ExecutionScope::queue_drain(self.runtime.observe().session_id(), drain_id);
583 let scoped_effect_controller = ScopedEffectController::borrowed(controller, scope)?;
584 self.stream_to_with_scope(events, scoped_effect_controller)
585 .await
586 }
587
588 async fn stream_to_with_scope(
589 self,
590 events: &dyn TurnActivitySink,
591 scoped_effect_controller: ScopedEffectController<'_>,
592 ) -> Result<Option<TurnResult>> {
593 let Self {
594 runtime,
595 effect_host: _,
596 cancel,
597 cancels,
598 batch_ids,
599 drain_id: _,
600 } = self;
601 let _cancel_guard = cancels.register(cancel.clone());
602 stream_next_queued_prepared_turn(
603 &runtime,
604 TurnSinks::turn(events),
605 scoped_effect_controller,
606 cancel,
607 &batch_ids,
608 )
609 .await
610 }
611}
612
613pub struct ScopedQueuedTurnBuilder<'run> {
614 builder: QueuedTurnBuilder,
615 controller: &'run dyn RuntimeEffectController,
616}
617
618impl<'run> ScopedQueuedTurnBuilder<'run> {
619 pub fn cancel(mut self, cancel: CancellationToken) -> Self {
620 self.builder = self.builder.cancel(cancel);
621 self
622 }
623
624 pub fn batch_ids(mut self, batch_ids: impl IntoIterator<Item = impl Into<String>>) -> Self {
625 self.builder = self.builder.batch_ids(batch_ids);
626 self
627 }
628
629 pub fn drain_id(mut self, drain_id: impl Into<String>) -> Self {
630 self.builder = self.builder.drain_id(drain_id);
631 self
632 }
633
634 pub async fn run(self) -> Result<Option<TurnOutput>> {
635 let collector = RunActivityCollector::default();
636 let Some(result) = self.stream_to(&collector).await? else {
637 return Ok(None);
638 };
639 Ok(Some(TurnOutput {
640 result,
641 activities: collector.into_activities(),
642 }))
643 }
644
645 pub async fn stream_to(self, events: &dyn TurnActivitySink) -> Result<Option<TurnResult>> {
646 self.builder
647 .stream_to_with_effect_controller(events, self.controller)
648 .await
649 }
650}
651
652pub struct AdvancedQueuedTurn {
653 builder: QueuedTurnBuilder,
654}
655
656impl AdvancedQueuedTurn {
657 pub async fn stream_to_with_scope(
658 self,
659 events: &dyn TurnActivitySink,
660 scoped_effect_controller: ScopedEffectController<'_>,
661 ) -> Result<Option<TurnResult>> {
662 self.builder
663 .stream_to_with_scope(events, scoped_effect_controller)
664 .await
665 }
666}
667
668fn fresh_turn_id() -> String {
669 lash_core::TurnActivityId::fresh().0
670}
671
672fn fresh_queue_drain_id() -> String {
673 format!("queue-drain:{}", fresh_turn_id())
674}
675
676fn trace_turn_id_for_scope(
677 builder: &TurnBuilder,
678 scoped_effect_controller: &ScopedEffectController<'_>,
679) -> Option<String> {
680 if scoped_effect_controller
681 .execution_scope()
682 .validates_turn_trace_id()
683 {
684 Some(
685 builder
686 .turn_id
687 .clone()
688 .unwrap_or_else(|| scoped_effect_controller.scope_id().to_string()),
689 )
690 } else {
691 builder
692 .turn_id
693 .clone()
694 .or_else(|| builder.input.trace_turn_id.clone())
695 }
696}
697
698fn reject_configured_durable_effect_host(
699 effect_host: &dyn EffectHost,
700 operation: &'static str,
701) -> Result<()> {
702 if effect_host.durability_tier() == DurabilityTier::Durable {
703 return Err(EmbedError::DurableEffectHostRequiresHandlerContext { operation });
704 }
705 Ok(())
706}
707
708pub(crate) async fn stream_next_queued_prepared_turn(
709 runtime: &RuntimeHandle,
710 sinks: TurnSinks<'_>,
711 scoped_effect_controller: ScopedEffectController<'_>,
712 cancel: CancellationToken,
713 batch_ids: &[String],
714) -> Result<Option<TurnResult>> {
715 let turn = Box::pin(stream_next_queued_prepared_assembled(
716 runtime,
717 sinks,
718 scoped_effect_controller,
719 cancel,
720 batch_ids,
721 ))
722 .await?;
723 Ok(turn.map(TurnResult::from_assembled))
724}
725
726pub(crate) async fn stream_next_queued_prepared_assembled(
727 runtime: &RuntimeHandle,
728 sinks: TurnSinks<'_>,
729 scoped_effect_controller: ScopedEffectController<'_>,
730 cancel: CancellationToken,
731 batch_ids: &[String],
732) -> Result<Option<AssembledTurn>> {
733 let writer_handle = runtime.writer();
734 let mut writer = writer_handle.lock().await;
735 let observation_sink = SessionObservationTurnActivitySink {
736 runtime: runtime.clone(),
737 live: sinks.turn_events(),
738 };
739 let opts = turn_options(
740 sinks.events(),
741 &observation_sink,
742 scoped_effect_controller,
743 cancel,
744 );
745 let turn = if batch_ids.is_empty() {
746 writer.stream_next_queued_work(opts).await?
747 } else {
748 writer.stream_selected_queued_work(opts, batch_ids).await?
749 };
750 runtime.publish_from(&writer);
751 Ok(turn)
752}
753
754fn turn_options<'a>(
755 events: Option<&'a dyn EventSink>,
756 turn_events: &'a dyn TurnActivitySink,
757 scoped_effect_controller: ScopedEffectController<'a>,
758 cancel: CancellationToken,
759) -> lash_core::TurnOptions<'a> {
760 let mut opts = lash_core::TurnOptions::new(cancel, scoped_effect_controller);
761 if let Some(events) = events {
762 opts = opts.with_events(events);
763 }
764 opts.with_turn_events(turn_events)
765}
766
767struct SessionObservationTurnActivitySink<'a> {
768 runtime: RuntimeHandle,
769 live: Option<&'a dyn TurnActivitySink>,
770}
771
772#[async_trait]
773impl TurnActivitySink for SessionObservationTurnActivitySink<'_> {
774 fn is_noop(&self) -> bool {
775 false
776 }
777
778 async fn emit(&self, activity: TurnActivity) {
779 self.runtime.record_turn_activity(activity.clone());
780 if let Some(live) = self.live {
781 live.emit(activity).await;
782 }
783 }
784}
785
786struct ChannelTurnActivitySink {
787 tx: mpsc::Sender<Result<TurnActivity>>,
788}
789
790#[async_trait]
791impl TurnActivitySink for ChannelTurnActivitySink {
792 async fn emit(&self, activity: TurnActivity) {
793 let _ = self.tx.send(Ok(activity)).await;
794 }
795}
796fn validate_required_plugin_inputs(
797 active_plugins: &[ActivePluginBinding],
798 input: &TurnInput,
799) -> Result<()> {
800 for plugin in active_plugins {
801 if plugin.requires_turn_input && !input.turn_context.has_plugin_input(plugin.id) {
802 return Err(EmbedError::MissingPluginTurnInput {
803 plugin_id: plugin.id,
804 });
805 }
806 }
807 Ok(())
808}
809
810pub(crate) async fn stream_prepared_turn(
811 runtime: &RuntimeHandle,
812 input: TurnInput,
813 sinks: TurnSinks<'_>,
814 scoped_effect_controller: ScopedEffectController<'_>,
815 cancel: CancellationToken,
816) -> Result<TurnResult> {
817 let turn = Box::pin(stream_prepared_assembled(
818 runtime,
819 input,
820 sinks,
821 scoped_effect_controller,
822 cancel,
823 ))
824 .await?;
825 Ok(TurnResult::from_assembled(turn))
826}
827
828pub(crate) async fn stream_prepared_assembled(
829 runtime: &RuntimeHandle,
830 input: TurnInput,
831 sinks: TurnSinks<'_>,
832 scoped_effect_controller: ScopedEffectController<'_>,
833 cancel: CancellationToken,
834) -> Result<AssembledTurn> {
835 let turn = Box::pin(stream_prepared_agent_frame_run(
836 runtime,
837 input,
838 sinks,
839 scoped_effect_controller,
840 cancel,
841 ))
842 .await?;
843 turn.into_final_turn().ok_or_else(|| {
844 EmbedError::Runtime(lash_core::RuntimeError::new(
845 RuntimeErrorCode::EmptyAgentFrameRun,
846 "runtime completed without an assembled turn",
847 ))
848 })
849}
850
851pub(crate) async fn stream_prepared_agent_frame_run(
852 runtime: &RuntimeHandle,
853 input: TurnInput,
854 sinks: TurnSinks<'_>,
855 scoped_effect_controller: ScopedEffectController<'_>,
856 cancel: CancellationToken,
857) -> Result<lash_core::AgentFrameRun> {
858 let writer_handle = runtime.writer();
859 let mut writer = writer_handle.lock().await;
860 if let Some(extension) = input.protocol_extension.as_ref() {
861 writer
862 .validate_protocol_turn_extension(extension)
863 .await
864 .map_err(EmbedError::Session)?;
865 }
866 let observation_sink = SessionObservationTurnActivitySink {
867 runtime: runtime.clone(),
868 live: sinks.turn_events(),
869 };
870 let turn = Box::pin(writer.stream_turn_with_agent_frames(
871 input,
872 turn_options(
873 sinks.events(),
874 &observation_sink,
875 scoped_effect_controller,
876 cancel,
877 ),
878 ))
879 .await?;
880 runtime.publish_from(&writer);
881 Ok(turn)
882}
883
884#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
885pub struct TurnResult {
886 pub state: SessionSnapshot,
887 pub outcome: TurnOutcome,
888 pub assistant_output: AssistantOutput,
889 pub usage: TokenUsage,
893 #[serde(default)]
897 pub children_usage: Vec<TokenLedgerEntry>,
898 pub tool_calls: Vec<ToolCallRecord>,
899 pub execution: ExecutionSummary,
900 pub errors: Vec<TurnIssue>,
901}
902
903impl TurnResult {
904 fn from_assembled(turn: lash_core::AssembledTurn) -> Self {
905 Self {
906 state: turn.state,
907 outcome: turn.outcome,
908 assistant_output: turn.assistant_output,
909 usage: turn.token_usage,
910 children_usage: turn.children_usage,
911 tool_calls: turn.tool_calls,
912 execution: turn.execution,
913 errors: turn.errors,
914 }
915 }
916
917 pub fn total_usage(&self) -> TokenUsage {
920 let mut total = self.usage.clone();
921 for entry in &self.children_usage {
922 total.add(&entry.usage);
923 }
924 total
925 }
926
927 pub fn assistant_message(&self) -> Option<&str> {
928 match &self.outcome {
929 TurnOutcome::Finished(lash_core::TurnFinish::AssistantMessage { text }) => Some(text),
930 _ => None,
931 }
932 }
933
934 pub fn submitted_value(&self) -> Option<&serde_json::Value> {
935 match &self.outcome {
936 TurnOutcome::Finished(lash_core::TurnFinish::SubmittedValue { value }) => Some(value),
937 _ => None,
938 }
939 }
940
941 pub fn tool_value(&self) -> Option<(&str, &serde_json::Value)> {
942 match &self.outcome {
943 TurnOutcome::Finished(lash_core::TurnFinish::ToolValue { tool_name, value }) => {
944 Some((tool_name.as_str(), value))
945 }
946 _ => None,
947 }
948 }
949
950 pub fn is_success(&self) -> bool {
951 matches!(
952 self.outcome,
953 TurnOutcome::Finished(_) | TurnOutcome::AgentFrameSwitch { .. }
954 )
955 }
956}
957
958#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
959pub struct TurnOutput {
960 pub result: TurnResult,
961 pub activities: Vec<TurnActivity>,
962}
963
964impl TurnOutput {
965 pub fn assistant_message(&self) -> Option<&str> {
966 self.result.assistant_message()
967 }
968
969 pub fn submitted_value(&self) -> Option<&serde_json::Value> {
970 self.result.submitted_value()
971 }
972
973 pub fn tool_value(&self) -> Option<(&str, &serde_json::Value)> {
974 self.result.tool_value()
975 }
976
977 pub fn is_success(&self) -> bool {
978 self.result.is_success()
979 }
980}
981
982struct BorrowedTurnActivityFanout<'a> {
983 live: &'a dyn TurnActivitySink,
984 collector: &'a RunActivityCollector,
985}
986
987#[async_trait]
988impl TurnActivitySink for BorrowedTurnActivityFanout<'_> {
989 async fn emit(&self, activity: TurnActivity) {
990 self.live.emit(activity.clone()).await;
991 self.collector.emit(activity).await;
992 }
993}
994
995#[derive(Default)]
996pub(crate) struct RunActivityCollector {
997 activities: Arc<StdMutex<Vec<TurnActivity>>>,
998}
999
1000impl RunActivityCollector {
1001 fn into_activities(self) -> Vec<TurnActivity> {
1002 self.activities
1003 .lock()
1004 .expect("run activity collector lock")
1005 .clone()
1006 }
1007
1008 #[cfg(test)]
1009 pub(crate) fn snapshot(&self) -> Vec<TurnActivity> {
1010 self.activities
1011 .lock()
1012 .expect("run activity collector lock")
1013 .clone()
1014 }
1015}
1016
1017#[async_trait]
1018impl TurnActivitySink for RunActivityCollector {
1019 async fn emit(&self, activity: TurnActivity) {
1020 self.activities
1021 .lock()
1022 .expect("run activity collector lock")
1023 .push(activity);
1024 }
1025}
1026
1027pub struct TurnActivityFanout {
1028 sinks: Vec<Arc<dyn TurnActivitySink>>,
1029}
1030
1031impl TurnActivityFanout {
1032 pub fn new(sinks: impl IntoIterator<Item = Arc<dyn TurnActivitySink>>) -> Self {
1033 Self {
1034 sinks: sinks.into_iter().collect(),
1035 }
1036 }
1037}
1038
1039#[async_trait]
1040impl TurnActivitySink for TurnActivityFanout {
1041 async fn emit(&self, activity: TurnActivity) {
1042 for sink in &self.sinks {
1043 sink.emit(activity.clone()).await;
1044 }
1045 }
1046}
1047
1048pub fn message_text(message: &Message) -> String {
1049 message
1050 .parts
1051 .iter()
1052 .map(|part| part.content.as_str())
1053 .collect::<Vec<_>>()
1054 .join("\n")
1055}
1056
1057pub fn message_role(message: &Message) -> &'static str {
1058 match message.role {
1059 MessageRole::User => "user",
1060 MessageRole::Assistant => "assistant",
1061 MessageRole::System => "system",
1062 MessageRole::Event => "event",
1063 }
1064}