1use crate::support::*;
2
3pub use lash_core::{AssistantOutput, TurnIssue};
4
5#[derive(Clone, Copy, Default)]
12pub(crate) struct TurnSinks<'a> {
13 events: Option<&'a dyn EventSink>,
14 turn_events: Option<&'a dyn TurnActivitySink>,
15}
16
17impl<'a> TurnSinks<'a> {
18 pub(crate) fn turn(events: &'a dyn TurnActivitySink) -> Self {
19 Self {
20 events: None,
21 turn_events: Some(events),
22 }
23 }
24
25 pub(crate) fn session(events: &'a dyn EventSink) -> Self {
26 Self {
27 events: Some(events),
28 turn_events: None,
29 }
30 }
31
32 fn events(&self) -> Option<&'a dyn EventSink> {
33 self.events
34 }
35
36 fn turn_events(&self) -> Option<&'a dyn TurnActivitySink> {
37 self.turn_events
38 }
39}
40
41pub struct TurnBuilder {
42 pub(crate) runtime: RuntimeHandle,
43 pub(crate) effect_host: Arc<dyn EffectHost>,
44 pub(crate) active_plugins: Vec<ActivePluginBinding>,
45 pub(crate) input: TurnInput,
46 pub(crate) cancel: CancellationToken,
47 pub(crate) protocol_turn_options: Option<ProtocolTurnOptions>,
48 pub(crate) provider: Option<ProviderHandle>,
49 pub(crate) model: Option<ModelSpec>,
50 pub(crate) turn_id: Option<String>,
51}
52
53impl TurnBuilder {
54 pub fn cancel(mut self, cancel: CancellationToken) -> Self {
55 self.cancel = cancel;
56 self
57 }
58
59 pub fn protocol_turn_options(mut self, options: ProtocolTurnOptions) -> Self {
60 self.protocol_turn_options = Some(options);
61 self
62 }
63
64 pub fn provider(mut self, provider: ProviderHandle) -> Self {
65 self.provider = Some(provider);
66 self
67 }
68
69 pub fn model(mut self, model: ModelSpec) -> Self {
70 self.model = Some(model);
71 self
72 }
73
74 pub fn turn_id(mut self, id: impl Into<String>) -> Self {
75 self.turn_id = Some(id.into());
76 self
77 }
78
79 pub fn prompt_template(mut self, template: PromptTemplate) -> Self {
80 self.input.turn_context.set_prompt_template(template);
81 self
82 }
83
84 pub fn prompt_contribution(mut self, contribution: PromptContribution) -> Self {
85 self.input
86 .turn_context
87 .add_prompt_contribution(contribution);
88 self
89 }
90
91 pub fn replace_prompt_slot(
92 mut self,
93 slot: PromptSlot,
94 contributions: impl IntoIterator<Item = PromptContribution>,
95 ) -> Self {
96 self.input
97 .turn_context
98 .replace_prompt_slot(slot, contributions);
99 self
100 }
101
102 pub fn clear_prompt_slot(mut self, slot: PromptSlot) -> Self {
103 self.input.turn_context.clear_prompt_slot(slot);
104 self
105 }
106
107 pub fn prompt_layer(mut self, layer: PromptLayer) -> Self {
108 self.input.turn_context.set_prompt_layer(layer);
109 self
110 }
111
112 pub fn with_plugin_input<P: PluginBinding>(mut self, input: P::Input) -> Self {
118 self.input.turn_context.insert_plugin_input(P::ID, input);
119 self
120 }
121
122 pub fn effects(self, controller: &dyn RuntimeEffectController) -> ScopedTurnBuilder<'_> {
123 ScopedTurnBuilder {
124 builder: self,
125 controller,
126 }
127 }
128
129 pub async fn run(self) -> Result<TurnOutput> {
130 let collector = RunActivityCollector::default();
131 let result = self.stream_to(&collector).await?;
132 Ok(TurnOutput {
133 result,
134 activities: collector.into_activities(),
135 })
136 }
137
138 pub async fn stream_to(self, events: &dyn TurnActivitySink) -> Result<TurnResult> {
139 let effect_host = Arc::clone(&self.effect_host);
140 reject_configured_durable_effect_host(effect_host.as_ref(), "turn")?;
141 self.stream_to_with_effect_host(events, effect_host.as_ref())
142 .await
143 }
144
145 pub fn stream(self) -> Result<TurnStream> {
146 let effect_host = Arc::clone(&self.effect_host);
147 reject_configured_durable_effect_host(effect_host.as_ref(), "turn stream")?;
148 self.stream_with_effect_host(effect_host.as_ref())
149 }
150
151 pub fn advanced(self) -> AdvancedTurn {
154 AdvancedTurn { builder: self }
155 }
156
157 fn resolved_turn_id(&self) -> String {
158 self.turn_id
159 .clone()
160 .or_else(|| self.input.trace_turn_id.clone())
161 .unwrap_or_else(fresh_turn_id)
162 }
163
164 fn turn_scope(&self, turn_id: &str) -> lash_core::EffectScope {
165 lash_core::EffectScope::turn(self.runtime.observe().session_id(), turn_id)
166 }
167
168 pub(crate) fn prepare(
169 mut self,
170 trace_turn_id: Option<String>,
171 ) -> Result<(RuntimeHandle, TurnInput, CancellationToken)> {
172 if let Some(options) = self.protocol_turn_options {
173 self.input.protocol_turn_options = Some(options);
174 }
175 if let Some(provider) = self.provider {
176 self.input.turn_context.set_provider(provider);
177 }
178 if let Some(model) = self.model {
179 self.input.turn_context.set_model(model);
180 }
181 if let Some(trace_turn_id) = trace_turn_id {
182 self.input.trace_turn_id = Some(trace_turn_id);
183 }
184 validate_required_plugin_inputs(&self.active_plugins, &self.input)?;
185 Ok((self.runtime, self.input, self.cancel))
186 }
187
188 async fn stream_to_with_effect_host(
189 self,
190 events: &dyn TurnActivitySink,
191 effect_host: &dyn EffectHost,
192 ) -> Result<TurnResult> {
193 let turn_id = self.resolved_turn_id();
194 let scoped_effect_controller = effect_host.scoped(self.turn_scope(&turn_id))?;
195 self.stream_to_with_scope(events, scoped_effect_controller, Some(turn_id))
196 .await
197 }
198
199 async fn stream_to_with_effect_controller(
200 self,
201 events: &dyn TurnActivitySink,
202 controller: &dyn RuntimeEffectController,
203 ) -> Result<TurnResult> {
204 let turn_id = self.resolved_turn_id();
205 let scoped_effect_controller =
206 ScopedEffectController::borrowed(controller, self.turn_scope(&turn_id))?;
207 self.stream_to_with_scope(events, scoped_effect_controller, Some(turn_id))
208 .await
209 }
210
211 async fn stream_to_with_scope(
212 self,
213 events: &dyn TurnActivitySink,
214 scoped_effect_controller: ScopedEffectController<'_>,
215 trace_turn_id: Option<String>,
216 ) -> Result<TurnResult> {
217 let (runtime, input, cancel) = self.prepare(trace_turn_id)?;
218 stream_prepared_turn(
219 &runtime,
220 input,
221 TurnSinks::turn(events),
222 scoped_effect_controller,
223 cancel,
224 )
225 .await
226 }
227
228 fn stream_with_effect_host(self, effect_host: &dyn EffectHost) -> Result<TurnStream> {
229 let turn_id = self.resolved_turn_id();
230 let scoped_effect_controller = effect_host
231 .scoped_static(self.turn_scope(&turn_id))?
232 .ok_or(EmbedError::StaticTurnStreamRequiresStaticEffectHost)?;
233 self.stream_with_scope(scoped_effect_controller, Some(turn_id))
234 }
235
236 fn stream_with_scope(
237 self,
238 scoped_effect_controller: ScopedEffectController<'static>,
239 trace_turn_id: Option<String>,
240 ) -> Result<TurnStream> {
241 let (runtime, input, cancel) = self.prepare(trace_turn_id)?;
242 let (tx, rx) = mpsc::channel(64);
243 let sink = ChannelTurnActivitySink { tx };
244 let completion = tokio::spawn(async move {
245 stream_prepared_turn(
246 &runtime,
247 input,
248 TurnSinks::turn(&sink),
249 scoped_effect_controller,
250 cancel,
251 )
252 .await
253 });
254 Ok(TurnStream {
255 activities: rx,
256 completion,
257 })
258 }
259}
260
261pub struct ScopedTurnBuilder<'run> {
262 builder: TurnBuilder,
263 controller: &'run dyn RuntimeEffectController,
264}
265
266impl<'run> ScopedTurnBuilder<'run> {
267 pub fn cancel(mut self, cancel: CancellationToken) -> Self {
268 self.builder = self.builder.cancel(cancel);
269 self
270 }
271
272 pub fn protocol_turn_options(mut self, options: ProtocolTurnOptions) -> Self {
273 self.builder = self.builder.protocol_turn_options(options);
274 self
275 }
276
277 pub fn provider(mut self, provider: ProviderHandle) -> Self {
278 self.builder = self.builder.provider(provider);
279 self
280 }
281
282 pub fn model(mut self, model: ModelSpec) -> Self {
283 self.builder = self.builder.model(model);
284 self
285 }
286
287 pub fn turn_id(mut self, id: impl Into<String>) -> Self {
288 self.builder = self.builder.turn_id(id);
289 self
290 }
291
292 pub fn prompt_template(mut self, template: PromptTemplate) -> Self {
293 self.builder = self.builder.prompt_template(template);
294 self
295 }
296
297 pub fn prompt_contribution(mut self, contribution: PromptContribution) -> Self {
298 self.builder = self.builder.prompt_contribution(contribution);
299 self
300 }
301
302 pub fn replace_prompt_slot(
303 mut self,
304 slot: PromptSlot,
305 contributions: impl IntoIterator<Item = PromptContribution>,
306 ) -> Self {
307 self.builder = self.builder.replace_prompt_slot(slot, contributions);
308 self
309 }
310
311 pub fn clear_prompt_slot(mut self, slot: PromptSlot) -> Self {
312 self.builder = self.builder.clear_prompt_slot(slot);
313 self
314 }
315
316 pub fn prompt_layer(mut self, layer: PromptLayer) -> Self {
317 self.builder = self.builder.prompt_layer(layer);
318 self
319 }
320
321 pub fn with_plugin_input<P: PluginBinding>(mut self, input: P::Input) -> Self {
322 self.builder = self.builder.with_plugin_input::<P>(input);
323 self
324 }
325
326 pub async fn run(self) -> Result<TurnOutput> {
327 let collector = RunActivityCollector::default();
328 let result = self.stream_to(&collector).await?;
329 Ok(TurnOutput {
330 result,
331 activities: collector.into_activities(),
332 })
333 }
334
335 pub async fn stream_to(self, events: &dyn TurnActivitySink) -> Result<TurnResult> {
336 self.builder
337 .stream_to_with_effect_controller(events, self.controller)
338 .await
339 }
340
341 pub fn advanced(self) -> AdvancedTurn {
342 self.builder.advanced()
343 }
344}
345
346pub struct AdvancedTurn {
353 builder: TurnBuilder,
354}
355
356impl AdvancedTurn {
357 pub async fn run_with_scope(
358 self,
359 scoped_effect_controller: ScopedEffectController<'_>,
360 ) -> Result<TurnOutput> {
361 let collector = RunActivityCollector::default();
362 let result = self
363 .stream_to_with_scope(&collector, scoped_effect_controller)
364 .await?;
365 Ok(TurnOutput {
366 result,
367 activities: collector.into_activities(),
368 })
369 }
370
371 pub async fn collect_with_scope(
372 self,
373 events: &dyn TurnActivitySink,
374 scoped_effect_controller: ScopedEffectController<'_>,
375 ) -> Result<TurnOutput> {
376 let collector = RunActivityCollector::default();
377 let fanout = BorrowedTurnActivityFanout {
378 live: events,
379 collector: &collector,
380 };
381 let result = self
382 .stream_to_with_scope(&fanout, scoped_effect_controller)
383 .await?;
384 Ok(TurnOutput {
385 result,
386 activities: collector.into_activities(),
387 })
388 }
389
390 pub async fn stream_to_with_scope(
391 self,
392 events: &dyn TurnActivitySink,
393 scoped_effect_controller: ScopedEffectController<'_>,
394 ) -> Result<TurnResult> {
395 let trace_turn_id = trace_turn_id_for_scope(&self.builder, &scoped_effect_controller);
396 self.builder
397 .stream_to_with_scope(events, scoped_effect_controller, trace_turn_id)
398 .await
399 }
400
401 pub fn stream_with_scope(
402 self,
403 scoped_effect_controller: ScopedEffectController<'static>,
404 ) -> Result<TurnStream> {
405 let trace_turn_id = trace_turn_id_for_scope(&self.builder, &scoped_effect_controller);
406 self.builder
407 .stream_with_scope(scoped_effect_controller, trace_turn_id)
408 }
409
410 pub async fn collect_session_events_with_scope(
412 self,
413 events: &dyn EventSink,
414 scoped_effect_controller: ScopedEffectController<'_>,
415 ) -> Result<TurnResult> {
416 let trace_turn_id = trace_turn_id_for_scope(&self.builder, &scoped_effect_controller);
417 let (runtime, input, cancel) = self.builder.prepare(trace_turn_id)?;
418 stream_prepared_turn(
419 &runtime,
420 input,
421 TurnSinks::session(events),
422 scoped_effect_controller,
423 cancel,
424 )
425 .await
426 }
427}
428
429pub struct TurnStream {
430 activities: mpsc::Receiver<Result<TurnActivity>>,
431 completion: JoinHandle<Result<TurnResult>>,
432}
433
434impl TurnStream {
435 pub async fn next_activity(&mut self) -> Option<Result<TurnActivity>> {
436 self.activities.recv().await
437 }
438
439 pub async fn finish(self) -> Result<TurnResult> {
440 self.completion.await.map_err(|err| {
441 EmbedError::Runtime(lash_core::RuntimeError::new(
442 RuntimeErrorCode::TurnStreamJoin,
443 format!("turn stream task failed: {err}"),
444 ))
445 })?
446 }
447}
448
449pub struct QueuedTurnBuilder {
450 pub(crate) runtime: RuntimeHandle,
451 pub(crate) effect_host: Arc<dyn EffectHost>,
452 pub(crate) cancel: CancellationToken,
453 pub(crate) batch_ids: Vec<String>,
454 pub(crate) drain_id: Option<String>,
455}
456
457impl QueuedTurnBuilder {
458 pub fn cancel(mut self, cancel: CancellationToken) -> Self {
459 self.cancel = cancel;
460 self
461 }
462
463 pub fn batch_ids(mut self, batch_ids: impl IntoIterator<Item = impl Into<String>>) -> Self {
464 self.batch_ids = batch_ids.into_iter().map(Into::into).collect();
465 self
466 }
467
468 pub fn drain_id(mut self, drain_id: impl Into<String>) -> Self {
469 self.drain_id = Some(drain_id.into());
470 self
471 }
472
473 pub fn effects(self, controller: &dyn RuntimeEffectController) -> ScopedQueuedTurnBuilder<'_> {
474 ScopedQueuedTurnBuilder {
475 builder: self,
476 controller,
477 }
478 }
479
480 pub async fn run(self) -> Result<Option<TurnOutput>> {
481 let collector = RunActivityCollector::default();
482 let Some(result) = self.stream_to(&collector).await? else {
483 return Ok(None);
484 };
485 Ok(Some(TurnOutput {
486 result,
487 activities: collector.into_activities(),
488 }))
489 }
490
491 pub async fn stream_to(self, events: &dyn TurnActivitySink) -> Result<Option<TurnResult>> {
492 let effect_host = Arc::clone(&self.effect_host);
493 reject_configured_durable_effect_host(effect_host.as_ref(), "queued turn")?;
494 self.stream_to_with_effect_host(events, effect_host.as_ref())
495 .await
496 }
497
498 fn resolved_drain_id(&self) -> String {
499 self.drain_id
500 .clone()
501 .or_else(|| self.batch_ids.first().cloned())
502 .unwrap_or_else(fresh_queue_drain_id)
503 }
504
505 async fn stream_to_with_effect_host(
506 self,
507 events: &dyn TurnActivitySink,
508 effect_host: &dyn EffectHost,
509 ) -> Result<Option<TurnResult>> {
510 let drain_id = self.resolved_drain_id();
511 let scope =
512 lash_core::EffectScope::queue_drain(self.runtime.observe().session_id(), drain_id);
513 let scoped_effect_controller = effect_host.scoped(scope)?;
514 self.stream_to_with_scope(events, scoped_effect_controller)
515 .await
516 }
517
518 async fn stream_to_with_effect_controller(
519 self,
520 events: &dyn TurnActivitySink,
521 controller: &dyn RuntimeEffectController,
522 ) -> Result<Option<TurnResult>> {
523 let drain_id = self.resolved_drain_id();
524 let scope =
525 lash_core::EffectScope::queue_drain(self.runtime.observe().session_id(), drain_id);
526 let scoped_effect_controller = ScopedEffectController::borrowed(controller, scope)?;
527 self.stream_to_with_scope(events, scoped_effect_controller)
528 .await
529 }
530
531 async fn stream_to_with_scope(
532 self,
533 events: &dyn TurnActivitySink,
534 scoped_effect_controller: ScopedEffectController<'_>,
535 ) -> Result<Option<TurnResult>> {
536 let Self {
537 runtime,
538 effect_host: _,
539 cancel,
540 batch_ids,
541 drain_id: _,
542 } = self;
543 stream_next_queued_prepared_turn(
544 &runtime,
545 TurnSinks::turn(events),
546 scoped_effect_controller,
547 cancel,
548 &batch_ids,
549 )
550 .await
551 }
552}
553
554pub struct ScopedQueuedTurnBuilder<'run> {
555 builder: QueuedTurnBuilder,
556 controller: &'run dyn RuntimeEffectController,
557}
558
559impl<'run> ScopedQueuedTurnBuilder<'run> {
560 pub fn cancel(mut self, cancel: CancellationToken) -> Self {
561 self.builder = self.builder.cancel(cancel);
562 self
563 }
564
565 pub fn batch_ids(mut self, batch_ids: impl IntoIterator<Item = impl Into<String>>) -> Self {
566 self.builder = self.builder.batch_ids(batch_ids);
567 self
568 }
569
570 pub fn drain_id(mut self, drain_id: impl Into<String>) -> Self {
571 self.builder = self.builder.drain_id(drain_id);
572 self
573 }
574
575 pub async fn run(self) -> Result<Option<TurnOutput>> {
576 let collector = RunActivityCollector::default();
577 let Some(result) = self.stream_to(&collector).await? else {
578 return Ok(None);
579 };
580 Ok(Some(TurnOutput {
581 result,
582 activities: collector.into_activities(),
583 }))
584 }
585
586 pub async fn stream_to(self, events: &dyn TurnActivitySink) -> Result<Option<TurnResult>> {
587 self.builder
588 .stream_to_with_effect_controller(events, self.controller)
589 .await
590 }
591}
592
593fn fresh_turn_id() -> String {
594 lash_core::TurnActivityId::fresh().0
595}
596
597fn fresh_queue_drain_id() -> String {
598 format!("queue-drain:{}", fresh_turn_id())
599}
600
601fn trace_turn_id_for_scope(
602 builder: &TurnBuilder,
603 scoped_effect_controller: &ScopedEffectController<'_>,
604) -> Option<String> {
605 if scoped_effect_controller
606 .effect_scope()
607 .validates_turn_trace_id()
608 {
609 Some(
610 builder
611 .turn_id
612 .clone()
613 .unwrap_or_else(|| scoped_effect_controller.scope_id().to_string()),
614 )
615 } else {
616 builder
617 .turn_id
618 .clone()
619 .or_else(|| builder.input.trace_turn_id.clone())
620 }
621}
622
623fn reject_configured_durable_effect_host(
624 effect_host: &dyn EffectHost,
625 operation: &'static str,
626) -> Result<()> {
627 if effect_host.durability_tier() == DurabilityTier::Durable {
628 return Err(EmbedError::DurableEffectHostRequiresHandlerContext { operation });
629 }
630 Ok(())
631}
632
633pub(crate) async fn stream_next_queued_prepared_turn(
634 runtime: &RuntimeHandle,
635 sinks: TurnSinks<'_>,
636 scoped_effect_controller: ScopedEffectController<'_>,
637 cancel: CancellationToken,
638 batch_ids: &[String],
639) -> Result<Option<TurnResult>> {
640 let turn = Box::pin(stream_next_queued_prepared_assembled(
641 runtime,
642 sinks,
643 scoped_effect_controller,
644 cancel,
645 batch_ids,
646 ))
647 .await?;
648 Ok(turn.map(TurnResult::from_assembled))
649}
650
651pub(crate) async fn stream_next_queued_prepared_assembled(
652 runtime: &RuntimeHandle,
653 sinks: TurnSinks<'_>,
654 scoped_effect_controller: ScopedEffectController<'_>,
655 cancel: CancellationToken,
656 batch_ids: &[String],
657) -> Result<Option<AssembledTurn>> {
658 let writer_handle = runtime.writer();
659 let mut writer = writer_handle.lock().await;
660 let observation_sink = SessionObservationTurnActivitySink {
661 runtime: runtime.clone(),
662 live: sinks.turn_events(),
663 };
664 let opts = turn_options(
665 sinks.events(),
666 &observation_sink,
667 scoped_effect_controller,
668 cancel,
669 );
670 let turn = if batch_ids.is_empty() {
671 writer.stream_next_queued_work(opts).await?
672 } else {
673 writer.stream_selected_queued_work(opts, batch_ids).await?
674 };
675 runtime.publish_from(&writer);
676 Ok(turn)
677}
678
679fn turn_options<'a>(
680 events: Option<&'a dyn EventSink>,
681 turn_events: &'a dyn TurnActivitySink,
682 scoped_effect_controller: ScopedEffectController<'a>,
683 cancel: CancellationToken,
684) -> lash_core::TurnOptions<'a> {
685 let mut opts = lash_core::TurnOptions::new(cancel, scoped_effect_controller);
686 if let Some(events) = events {
687 opts = opts.with_events(events);
688 }
689 opts.with_turn_events(turn_events)
690}
691
692struct SessionObservationTurnActivitySink<'a> {
693 runtime: RuntimeHandle,
694 live: Option<&'a dyn TurnActivitySink>,
695}
696
697#[async_trait]
698impl TurnActivitySink for SessionObservationTurnActivitySink<'_> {
699 fn is_noop(&self) -> bool {
700 false
701 }
702
703 async fn emit(&self, activity: TurnActivity) {
704 self.runtime.record_turn_activity(activity.clone());
705 if let Some(live) = self.live {
706 live.emit(activity).await;
707 }
708 }
709}
710
711struct ChannelTurnActivitySink {
712 tx: mpsc::Sender<Result<TurnActivity>>,
713}
714
715#[async_trait]
716impl TurnActivitySink for ChannelTurnActivitySink {
717 async fn emit(&self, activity: TurnActivity) {
718 let _ = self.tx.send(Ok(activity)).await;
719 }
720}
721fn validate_required_plugin_inputs(
722 active_plugins: &[ActivePluginBinding],
723 input: &TurnInput,
724) -> Result<()> {
725 for plugin in active_plugins {
726 if plugin.requires_turn_input && !input.turn_context.has_plugin_input(plugin.id) {
727 return Err(EmbedError::MissingPluginTurnInput {
728 plugin_id: plugin.id,
729 });
730 }
731 }
732 Ok(())
733}
734
735pub(crate) async fn stream_prepared_turn(
736 runtime: &RuntimeHandle,
737 input: TurnInput,
738 sinks: TurnSinks<'_>,
739 scoped_effect_controller: ScopedEffectController<'_>,
740 cancel: CancellationToken,
741) -> Result<TurnResult> {
742 let turn = Box::pin(stream_prepared_assembled(
743 runtime,
744 input,
745 sinks,
746 scoped_effect_controller,
747 cancel,
748 ))
749 .await?;
750 Ok(TurnResult::from_assembled(turn))
751}
752
753pub(crate) async fn stream_prepared_assembled(
754 runtime: &RuntimeHandle,
755 input: TurnInput,
756 sinks: TurnSinks<'_>,
757 scoped_effect_controller: ScopedEffectController<'_>,
758 cancel: CancellationToken,
759) -> Result<AssembledTurn> {
760 let turn = Box::pin(stream_prepared_agent_frame_run(
761 runtime,
762 input,
763 sinks,
764 scoped_effect_controller,
765 cancel,
766 ))
767 .await?;
768 turn.into_final_turn().ok_or_else(|| {
769 EmbedError::Runtime(lash_core::RuntimeError::new(
770 RuntimeErrorCode::EmptyAgentFrameRun,
771 "runtime completed without an assembled turn",
772 ))
773 })
774}
775
776pub(crate) async fn stream_prepared_agent_frame_run(
777 runtime: &RuntimeHandle,
778 input: TurnInput,
779 sinks: TurnSinks<'_>,
780 scoped_effect_controller: ScopedEffectController<'_>,
781 cancel: CancellationToken,
782) -> Result<lash_core::AgentFrameRun> {
783 let writer_handle = runtime.writer();
784 let mut writer = writer_handle.lock().await;
785 if let Some(extension) = input.protocol_extension.as_ref() {
786 writer
787 .validate_protocol_turn_extension(extension)
788 .await
789 .map_err(EmbedError::Session)?;
790 }
791 let observation_sink = SessionObservationTurnActivitySink {
792 runtime: runtime.clone(),
793 live: sinks.turn_events(),
794 };
795 let turn = Box::pin(writer.stream_turn_with_agent_frames(
796 input,
797 turn_options(
798 sinks.events(),
799 &observation_sink,
800 scoped_effect_controller,
801 cancel,
802 ),
803 ))
804 .await?;
805 runtime.publish_from(&writer);
806 Ok(turn)
807}
808
809#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
810pub struct TurnResult {
811 pub state: SessionSnapshot,
812 pub outcome: TurnOutcome,
813 pub assistant_output: AssistantOutput,
814 pub usage: TokenUsage,
818 #[serde(default)]
822 pub children_usage: Vec<TokenLedgerEntry>,
823 pub tool_calls: Vec<ToolCallRecord>,
824 pub execution: ExecutionSummary,
825 pub errors: Vec<TurnIssue>,
826}
827
828impl TurnResult {
829 fn from_assembled(turn: lash_core::AssembledTurn) -> Self {
830 Self {
831 state: turn.state,
832 outcome: turn.outcome,
833 assistant_output: turn.assistant_output,
834 usage: turn.token_usage,
835 children_usage: turn.children_usage,
836 tool_calls: turn.tool_calls,
837 execution: turn.execution,
838 errors: turn.errors,
839 }
840 }
841
842 pub fn total_usage(&self) -> TokenUsage {
845 let mut total = self.usage.clone();
846 for entry in &self.children_usage {
847 total.add(&entry.usage);
848 }
849 total
850 }
851
852 pub fn assistant_message(&self) -> Option<&str> {
853 match &self.outcome {
854 TurnOutcome::Finished(lash_core::TurnFinish::AssistantMessage { text }) => Some(text),
855 _ => None,
856 }
857 }
858
859 pub fn submitted_value(&self) -> Option<&serde_json::Value> {
860 match &self.outcome {
861 TurnOutcome::Finished(lash_core::TurnFinish::SubmittedValue { value }) => Some(value),
862 _ => None,
863 }
864 }
865
866 pub fn tool_value(&self) -> Option<(&str, &serde_json::Value)> {
867 match &self.outcome {
868 TurnOutcome::Finished(lash_core::TurnFinish::ToolValue { tool_name, value }) => {
869 Some((tool_name.as_str(), value))
870 }
871 _ => None,
872 }
873 }
874
875 pub fn is_success(&self) -> bool {
876 matches!(
877 self.outcome,
878 TurnOutcome::Finished(_) | TurnOutcome::AgentFrameSwitch { .. }
879 )
880 }
881}
882
883#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
884pub struct TurnOutput {
885 pub result: TurnResult,
886 pub activities: Vec<TurnActivity>,
887}
888
889impl TurnOutput {
890 pub fn assistant_message(&self) -> Option<&str> {
891 self.result.assistant_message()
892 }
893
894 pub fn submitted_value(&self) -> Option<&serde_json::Value> {
895 self.result.submitted_value()
896 }
897
898 pub fn tool_value(&self) -> Option<(&str, &serde_json::Value)> {
899 self.result.tool_value()
900 }
901
902 pub fn is_success(&self) -> bool {
903 self.result.is_success()
904 }
905}
906
907struct BorrowedTurnActivityFanout<'a> {
908 live: &'a dyn TurnActivitySink,
909 collector: &'a RunActivityCollector,
910}
911
912#[async_trait]
913impl TurnActivitySink for BorrowedTurnActivityFanout<'_> {
914 async fn emit(&self, activity: TurnActivity) {
915 self.live.emit(activity.clone()).await;
916 self.collector.emit(activity).await;
917 }
918}
919
920#[derive(Default)]
921pub(crate) struct RunActivityCollector {
922 activities: Arc<StdMutex<Vec<TurnActivity>>>,
923}
924
925impl RunActivityCollector {
926 fn into_activities(self) -> Vec<TurnActivity> {
927 self.activities
928 .lock()
929 .expect("run activity collector lock")
930 .clone()
931 }
932
933 #[cfg(test)]
934 pub(crate) fn snapshot(&self) -> Vec<TurnActivity> {
935 self.activities
936 .lock()
937 .expect("run activity collector lock")
938 .clone()
939 }
940}
941
942#[async_trait]
943impl TurnActivitySink for RunActivityCollector {
944 async fn emit(&self, activity: TurnActivity) {
945 self.activities
946 .lock()
947 .expect("run activity collector lock")
948 .push(activity);
949 }
950}
951
952pub struct TurnActivityFanout {
953 sinks: Vec<Arc<dyn TurnActivitySink>>,
954}
955
956impl TurnActivityFanout {
957 pub fn new(sinks: impl IntoIterator<Item = Arc<dyn TurnActivitySink>>) -> Self {
958 Self {
959 sinks: sinks.into_iter().collect(),
960 }
961 }
962}
963
964#[async_trait]
965impl TurnActivitySink for TurnActivityFanout {
966 async fn emit(&self, activity: TurnActivity) {
967 for sink in &self.sinks {
968 sink.emit(activity.clone()).await;
969 }
970 }
971}
972
973pub fn message_text(message: &Message) -> String {
974 message
975 .parts
976 .iter()
977 .map(|part| part.content.as_str())
978 .collect::<Vec<_>>()
979 .join("\n")
980}
981
982pub fn message_role(message: &Message) -> &'static str {
983 match message.role {
984 MessageRole::User => "user",
985 MessageRole::Assistant => "assistant",
986 MessageRole::System => "system",
987 MessageRole::Event => "event",
988 }
989}