1use std::pin::Pin;
2use std::task::{Context, Poll};
3
4use crate::support::*;
5use futures_util::Stream;
6
7pub use lash_core::{AssistantOutput, TurnIssue};
8
9#[derive(Clone, Copy, Default)]
16pub(crate) struct TurnSinks<'a> {
17 events: Option<&'a dyn EventSink>,
18 turn_events: Option<&'a dyn TurnActivitySink>,
19}
20
21impl<'a> TurnSinks<'a> {
22 pub(crate) fn turn(events: &'a dyn TurnActivitySink) -> Self {
23 Self {
24 events: None,
25 turn_events: Some(events),
26 }
27 }
28
29 pub(crate) fn session(events: &'a dyn EventSink) -> Self {
30 Self {
31 events: Some(events),
32 turn_events: None,
33 }
34 }
35
36 fn events(&self) -> Option<&'a dyn EventSink> {
37 self.events
38 }
39
40 fn turn_events(&self) -> Option<&'a dyn TurnActivitySink> {
41 self.turn_events
42 }
43}
44
45#[derive(Clone, Default)]
50pub(crate) struct TurnCancelRegistry {
51 inner: Arc<StdMutex<TurnCancelRegistryInner>>,
52}
53
54#[derive(Default)]
55struct TurnCancelRegistryInner {
56 next_id: u64,
57 active: BTreeMap<u64, CancellationToken>,
58}
59
60impl TurnCancelRegistry {
61 fn register(&self, token: CancellationToken) -> TurnCancelGuard {
64 let mut inner = self.inner.lock().expect("turn cancel registry");
65 let id = inner.next_id;
66 inner.next_id += 1;
67 inner.active.insert(id, token);
68 TurnCancelGuard {
69 registry: Arc::clone(&self.inner),
70 id,
71 }
72 }
73
74 pub(crate) fn cancel_all(&self) -> usize {
75 let inner = self.inner.lock().expect("turn cancel registry");
76 for token in inner.active.values() {
77 token.cancel();
78 }
79 inner.active.len()
80 }
81}
82
83pub(crate) struct TurnCancelGuard {
84 registry: Arc<StdMutex<TurnCancelRegistryInner>>,
85 id: u64,
86}
87
88impl Drop for TurnCancelGuard {
89 fn drop(&mut self) {
90 self.registry
91 .lock()
92 .expect("turn cancel registry")
93 .active
94 .remove(&self.id);
95 }
96}
97
98pub struct TurnBuilder {
99 pub(crate) runtime: RuntimeHandle,
100 pub(crate) effect_host: Arc<dyn EffectHost>,
101 pub(crate) active_plugins: Vec<ActivePluginBinding>,
102 pub(crate) input: TurnInput,
103 pub(crate) cancel: CancellationToken,
104 pub(crate) cancels: TurnCancelRegistry,
105 pub(crate) protocol_turn_options: Option<ProtocolTurnOptions>,
106 pub(crate) provider: Option<ProviderHandle>,
107 pub(crate) model: Option<ModelSpec>,
108 pub(crate) turn_id: Option<String>,
109}
110
111impl TurnBuilder {
112 pub fn cancel(mut self, cancel: CancellationToken) -> Self {
113 self.cancel = cancel;
114 self
115 }
116
117 pub fn protocol_turn_options(mut self, options: ProtocolTurnOptions) -> Self {
118 self.protocol_turn_options = Some(options);
119 self
120 }
121
122 pub fn provider(mut self, provider: ProviderHandle) -> Self {
123 self.provider = Some(provider);
124 self
125 }
126
127 pub fn model(mut self, model: ModelSpec) -> Self {
128 self.model = Some(model);
129 self
130 }
131
132 pub fn turn_id(mut self, id: impl Into<String>) -> Self {
133 self.turn_id = Some(id.into());
134 self
135 }
136
137 pub fn prompt_template(mut self, template: PromptTemplate) -> Self {
138 self.input.turn_context.set_prompt_template(template);
139 self
140 }
141
142 pub fn prompt_contribution(mut self, contribution: PromptContribution) -> Self {
143 self.input
144 .turn_context
145 .add_prompt_contribution(contribution);
146 self
147 }
148
149 pub fn replace_prompt_slot(
150 mut self,
151 slot: PromptSlot,
152 contributions: impl IntoIterator<Item = PromptContribution>,
153 ) -> Self {
154 self.input
155 .turn_context
156 .replace_prompt_slot(slot, contributions);
157 self
158 }
159
160 pub fn clear_prompt_slot(mut self, slot: PromptSlot) -> Self {
161 self.input.turn_context.clear_prompt_slot(slot);
162 self
163 }
164
165 pub fn prompt_layer(mut self, layer: PromptLayer) -> Self {
166 self.input.turn_context.set_prompt_layer(layer);
167 self
168 }
169
170 pub fn with_plugin_input<P: PluginBinding>(mut self, input: P::Input) -> Self {
176 self.input.turn_context.insert_plugin_input(P::ID, input);
177 self
178 }
179
180 pub fn effects(self, controller: &dyn RuntimeEffectController) -> ScopedTurnBuilder<'_> {
181 ScopedTurnBuilder {
182 builder: self,
183 controller,
184 }
185 }
186
187 pub async fn run(self) -> Result<TurnOutput> {
188 let collector = RunActivityCollector::default();
189 let result = self.stream_to(&collector).await?;
190 Ok(TurnOutput {
191 result,
192 activities: collector.into_activities(),
193 })
194 }
195
196 pub async fn stream_to(self, events: &dyn TurnActivitySink) -> Result<TurnResult> {
197 let effect_host = Arc::clone(&self.effect_host);
198 reject_configured_durable_effect_host(effect_host.as_ref(), "turn")?;
199 self.stream_to_with_effect_host(events, effect_host.as_ref())
200 .await
201 }
202
203 pub fn stream(self) -> Result<TurnStream> {
204 let effect_host = Arc::clone(&self.effect_host);
205 reject_configured_durable_effect_host(effect_host.as_ref(), "turn stream")?;
206 self.stream_with_effect_host(effect_host.as_ref())
207 }
208
209 pub fn advanced(self) -> AdvancedTurn {
212 AdvancedTurn { builder: self }
213 }
214
215 fn resolved_turn_id(&self) -> String {
216 self.turn_id
217 .clone()
218 .or_else(|| self.input.trace_turn_id.clone())
219 .unwrap_or_else(fresh_turn_id)
220 }
221
222 fn turn_scope(&self, turn_id: &str) -> lash_core::ExecutionScope {
223 lash_core::ExecutionScope::turn(self.runtime.observe().session_id(), turn_id)
224 }
225
226 pub(crate) fn prepare(
227 mut self,
228 trace_turn_id: Option<String>,
229 ) -> Result<(RuntimeHandle, TurnInput, CancellationToken, TurnCancelGuard)> {
230 if let Some(options) = self.protocol_turn_options {
231 self.input.protocol_turn_options = Some(options);
232 }
233 if let Some(provider) = self.provider {
234 self.input.turn_context.set_provider(provider);
235 }
236 if let Some(model) = self.model {
237 self.input.turn_context.set_model(model);
238 }
239 if let Some(trace_turn_id) = trace_turn_id {
240 self.input.trace_turn_id = Some(trace_turn_id);
241 }
242 validate_required_plugin_inputs(&self.active_plugins, &self.input)?;
243 let cancel_guard = self.cancels.register(self.cancel.clone());
244 Ok((self.runtime, self.input, self.cancel, cancel_guard))
245 }
246
247 async fn stream_to_with_effect_host(
248 self,
249 events: &dyn TurnActivitySink,
250 effect_host: &dyn EffectHost,
251 ) -> Result<TurnResult> {
252 let turn_id = self.resolved_turn_id();
253 let scoped_effect_controller = effect_host.scoped(self.turn_scope(&turn_id))?;
254 self.stream_to_with_scope(events, scoped_effect_controller, Some(turn_id))
255 .await
256 }
257
258 async fn stream_to_with_effect_controller(
259 self,
260 events: &dyn TurnActivitySink,
261 controller: &dyn RuntimeEffectController,
262 ) -> Result<TurnResult> {
263 let turn_id = self.resolved_turn_id();
264 let scoped_effect_controller =
265 ScopedEffectController::borrowed(controller, self.turn_scope(&turn_id))?;
266 self.stream_to_with_scope(events, scoped_effect_controller, Some(turn_id))
267 .await
268 }
269
270 async fn stream_to_with_scope(
271 self,
272 events: &dyn TurnActivitySink,
273 scoped_effect_controller: ScopedEffectController<'_>,
274 trace_turn_id: Option<String>,
275 ) -> Result<TurnResult> {
276 let (runtime, input, cancel, _cancel_guard) = self.prepare(trace_turn_id)?;
277 stream_prepared_turn(
278 &runtime,
279 input,
280 TurnSinks::turn(events),
281 scoped_effect_controller,
282 cancel,
283 )
284 .await
285 }
286
287 fn stream_with_effect_host(self, effect_host: &dyn EffectHost) -> Result<TurnStream> {
288 let turn_id = self.resolved_turn_id();
289 let scoped_effect_controller = effect_host
290 .scoped_static(self.turn_scope(&turn_id))?
291 .ok_or(EmbedError::StaticTurnStreamRequiresStaticEffectHost)?;
292 self.stream_with_scope(scoped_effect_controller, Some(turn_id))
293 }
294
295 fn stream_with_scope(
296 self,
297 scoped_effect_controller: ScopedEffectController<'static>,
298 trace_turn_id: Option<String>,
299 ) -> Result<TurnStream> {
300 let (runtime, input, cancel, cancel_guard) = self.prepare(trace_turn_id)?;
301 let (tx, rx) = mpsc::channel(64);
302 let sink = ChannelTurnActivitySink { tx };
303 let completion = tokio::spawn(async move {
304 let _cancel_guard = cancel_guard;
305 stream_prepared_turn(
306 &runtime,
307 input,
308 TurnSinks::turn(&sink),
309 scoped_effect_controller,
310 cancel,
311 )
312 .await
313 });
314 Ok(TurnStream {
315 activities: rx,
316 completion,
317 })
318 }
319}
320
321pub struct ScopedTurnBuilder<'run> {
322 builder: TurnBuilder,
323 controller: &'run dyn RuntimeEffectController,
324}
325
326impl<'run> ScopedTurnBuilder<'run> {
327 pub fn cancel(mut self, cancel: CancellationToken) -> Self {
328 self.builder = self.builder.cancel(cancel);
329 self
330 }
331
332 pub fn protocol_turn_options(mut self, options: ProtocolTurnOptions) -> Self {
333 self.builder = self.builder.protocol_turn_options(options);
334 self
335 }
336
337 pub fn provider(mut self, provider: ProviderHandle) -> Self {
338 self.builder = self.builder.provider(provider);
339 self
340 }
341
342 pub fn model(mut self, model: ModelSpec) -> Self {
343 self.builder = self.builder.model(model);
344 self
345 }
346
347 pub fn turn_id(mut self, id: impl Into<String>) -> Self {
348 self.builder = self.builder.turn_id(id);
349 self
350 }
351
352 pub fn prompt_template(mut self, template: PromptTemplate) -> Self {
353 self.builder = self.builder.prompt_template(template);
354 self
355 }
356
357 pub fn prompt_contribution(mut self, contribution: PromptContribution) -> Self {
358 self.builder = self.builder.prompt_contribution(contribution);
359 self
360 }
361
362 pub fn replace_prompt_slot(
363 mut self,
364 slot: PromptSlot,
365 contributions: impl IntoIterator<Item = PromptContribution>,
366 ) -> Self {
367 self.builder = self.builder.replace_prompt_slot(slot, contributions);
368 self
369 }
370
371 pub fn clear_prompt_slot(mut self, slot: PromptSlot) -> Self {
372 self.builder = self.builder.clear_prompt_slot(slot);
373 self
374 }
375
376 pub fn prompt_layer(mut self, layer: PromptLayer) -> Self {
377 self.builder = self.builder.prompt_layer(layer);
378 self
379 }
380
381 pub fn with_plugin_input<P: PluginBinding>(mut self, input: P::Input) -> Self {
382 self.builder = self.builder.with_plugin_input::<P>(input);
383 self
384 }
385
386 pub async fn run(self) -> Result<TurnOutput> {
387 let collector = RunActivityCollector::default();
388 let result = self.stream_to(&collector).await?;
389 Ok(TurnOutput {
390 result,
391 activities: collector.into_activities(),
392 })
393 }
394
395 pub async fn stream_to(self, events: &dyn TurnActivitySink) -> Result<TurnResult> {
396 self.builder
397 .stream_to_with_effect_controller(events, self.controller)
398 .await
399 }
400}
401
402pub struct AdvancedTurn {
409 builder: TurnBuilder,
410}
411
412impl AdvancedTurn {
413 pub async fn run_with_scope(
414 self,
415 scoped_effect_controller: ScopedEffectController<'_>,
416 ) -> Result<TurnOutput> {
417 let collector = RunActivityCollector::default();
418 let result = self
419 .stream_to_with_scope(&collector, scoped_effect_controller)
420 .await?;
421 Ok(TurnOutput {
422 result,
423 activities: collector.into_activities(),
424 })
425 }
426
427 pub async fn collect_with_scope(
428 self,
429 events: &dyn TurnActivitySink,
430 scoped_effect_controller: ScopedEffectController<'_>,
431 ) -> Result<TurnOutput> {
432 let collector = RunActivityCollector::default();
433 let fanout = BorrowedTurnActivityFanout {
434 live: events,
435 collector: &collector,
436 };
437 let result = self
438 .stream_to_with_scope(&fanout, scoped_effect_controller)
439 .await?;
440 Ok(TurnOutput {
441 result,
442 activities: collector.into_activities(),
443 })
444 }
445
446 pub async fn stream_to_with_scope(
447 self,
448 events: &dyn TurnActivitySink,
449 scoped_effect_controller: ScopedEffectController<'_>,
450 ) -> Result<TurnResult> {
451 let trace_turn_id = trace_turn_id_for_scope(&self.builder, &scoped_effect_controller);
452 self.builder
453 .stream_to_with_scope(events, scoped_effect_controller, trace_turn_id)
454 .await
455 }
456
457 pub fn stream_with_scope(
458 self,
459 scoped_effect_controller: ScopedEffectController<'static>,
460 ) -> Result<TurnStream> {
461 let trace_turn_id = trace_turn_id_for_scope(&self.builder, &scoped_effect_controller);
462 self.builder
463 .stream_with_scope(scoped_effect_controller, trace_turn_id)
464 }
465
466 pub async fn collect_session_events_with_scope(
468 self,
469 events: &dyn EventSink,
470 scoped_effect_controller: ScopedEffectController<'_>,
471 ) -> Result<TurnResult> {
472 let trace_turn_id = trace_turn_id_for_scope(&self.builder, &scoped_effect_controller);
473 let (runtime, input, cancel, _cancel_guard) = self.builder.prepare(trace_turn_id)?;
474 stream_prepared_turn(
475 &runtime,
476 input,
477 TurnSinks::session(events),
478 scoped_effect_controller,
479 cancel,
480 )
481 .await
482 }
483}
484
485pub struct TurnStream {
486 activities: mpsc::Receiver<Result<TurnActivity>>,
487 completion: JoinHandle<Result<TurnResult>>,
488}
489
490impl TurnStream {
491 pub async fn next_activity(&mut self) -> Option<Result<TurnActivity>> {
492 self.activities.recv().await
493 }
494
495 pub async fn finish(self) -> Result<TurnResult> {
496 self.completion.await.map_err(|err| {
497 EmbedError::Runtime(lash_core::RuntimeError::new(
498 RuntimeErrorCode::TurnStreamJoin,
499 format!("turn stream task failed: {err}"),
500 ))
501 })?
502 }
503}
504
505impl Stream for TurnStream {
506 type Item = Result<TurnActivity>;
507
508 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
509 self.activities.poll_recv(cx)
510 }
511}
512
513pub struct QueuedTurnBuilder {
514 pub(crate) runtime: RuntimeHandle,
515 pub(crate) effect_host: Arc<dyn EffectHost>,
516 pub(crate) cancel: CancellationToken,
517 pub(crate) cancels: TurnCancelRegistry,
518 pub(crate) batch_ids: Vec<String>,
519 pub(crate) drain_id: Option<String>,
520}
521
522impl QueuedTurnBuilder {
523 pub fn cancel(mut self, cancel: CancellationToken) -> Self {
524 self.cancel = cancel;
525 self
526 }
527
528 pub fn batch_ids(mut self, batch_ids: impl IntoIterator<Item = impl Into<String>>) -> Self {
529 self.batch_ids = batch_ids.into_iter().map(Into::into).collect();
530 self
531 }
532
533 pub fn drain_id(mut self, drain_id: impl Into<String>) -> Self {
534 self.drain_id = Some(drain_id.into());
535 self
536 }
537
538 pub fn effects(self, controller: &dyn RuntimeEffectController) -> ScopedQueuedTurnBuilder<'_> {
539 ScopedQueuedTurnBuilder {
540 builder: self,
541 controller,
542 }
543 }
544
545 pub async fn run(self) -> Result<Option<TurnOutput>> {
546 let collector = RunActivityCollector::default();
547 let Some(result) = self.stream_to(&collector).await? else {
548 return Ok(None);
549 };
550 Ok(Some(TurnOutput {
551 result,
552 activities: collector.into_activities(),
553 }))
554 }
555
556 pub async fn stream_to(self, events: &dyn TurnActivitySink) -> Result<Option<TurnResult>> {
557 let effect_host = Arc::clone(&self.effect_host);
558 reject_configured_durable_effect_host(effect_host.as_ref(), "queued turn")?;
559 self.stream_to_with_effect_host(events, effect_host.as_ref())
560 .await
561 }
562
563 pub fn advanced(self) -> AdvancedQueuedTurn {
564 AdvancedQueuedTurn { builder: self }
565 }
566
567 fn resolved_drain_id(&self) -> String {
568 self.drain_id
569 .clone()
570 .or_else(|| self.batch_ids.first().cloned())
571 .unwrap_or_else(fresh_queue_drain_id)
572 }
573
574 async fn stream_to_with_effect_host(
575 self,
576 events: &dyn TurnActivitySink,
577 effect_host: &dyn EffectHost,
578 ) -> Result<Option<TurnResult>> {
579 let drain_id = self.resolved_drain_id();
580 let scope =
581 lash_core::ExecutionScope::queue_drain(self.runtime.observe().session_id(), drain_id);
582 let scoped_effect_controller = effect_host.scoped(scope)?;
583 self.stream_to_with_scope(events, scoped_effect_controller)
584 .await
585 }
586
587 async fn stream_to_with_effect_controller(
588 self,
589 events: &dyn TurnActivitySink,
590 controller: &dyn RuntimeEffectController,
591 ) -> Result<Option<TurnResult>> {
592 let drain_id = self.resolved_drain_id();
593 let scope =
594 lash_core::ExecutionScope::queue_drain(self.runtime.observe().session_id(), drain_id);
595 let scoped_effect_controller = ScopedEffectController::borrowed(controller, scope)?;
596 self.stream_to_with_scope(events, scoped_effect_controller)
597 .await
598 }
599
600 async fn stream_to_with_scope(
601 self,
602 events: &dyn TurnActivitySink,
603 scoped_effect_controller: ScopedEffectController<'_>,
604 ) -> Result<Option<TurnResult>> {
605 let Self {
606 runtime,
607 effect_host: _,
608 cancel,
609 cancels,
610 batch_ids,
611 drain_id: _,
612 } = self;
613 let _cancel_guard = cancels.register(cancel.clone());
614 stream_next_queued_prepared_turn(
615 &runtime,
616 TurnSinks::turn(events),
617 scoped_effect_controller,
618 cancel,
619 &batch_ids,
620 )
621 .await
622 }
623}
624
625pub struct ScopedQueuedTurnBuilder<'run> {
626 builder: QueuedTurnBuilder,
627 controller: &'run dyn RuntimeEffectController,
628}
629
630impl<'run> ScopedQueuedTurnBuilder<'run> {
631 pub fn cancel(mut self, cancel: CancellationToken) -> Self {
632 self.builder = self.builder.cancel(cancel);
633 self
634 }
635
636 pub fn batch_ids(mut self, batch_ids: impl IntoIterator<Item = impl Into<String>>) -> Self {
637 self.builder = self.builder.batch_ids(batch_ids);
638 self
639 }
640
641 pub fn drain_id(mut self, drain_id: impl Into<String>) -> Self {
642 self.builder = self.builder.drain_id(drain_id);
643 self
644 }
645
646 pub async fn run(self) -> Result<Option<TurnOutput>> {
647 let collector = RunActivityCollector::default();
648 let Some(result) = self.stream_to(&collector).await? else {
649 return Ok(None);
650 };
651 Ok(Some(TurnOutput {
652 result,
653 activities: collector.into_activities(),
654 }))
655 }
656
657 pub async fn stream_to(self, events: &dyn TurnActivitySink) -> Result<Option<TurnResult>> {
658 self.builder
659 .stream_to_with_effect_controller(events, self.controller)
660 .await
661 }
662}
663
664pub struct AdvancedQueuedTurn {
665 builder: QueuedTurnBuilder,
666}
667
668impl AdvancedQueuedTurn {
669 pub async fn stream_to_with_scope(
670 self,
671 events: &dyn TurnActivitySink,
672 scoped_effect_controller: ScopedEffectController<'_>,
673 ) -> Result<Option<TurnResult>> {
674 self.builder
675 .stream_to_with_scope(events, scoped_effect_controller)
676 .await
677 }
678}
679
680fn fresh_turn_id() -> String {
681 lash_core::TurnActivityId::fresh().0
682}
683
684fn fresh_queue_drain_id() -> String {
685 format!("queue-drain:{}", fresh_turn_id())
686}
687
688fn trace_turn_id_for_scope(
689 builder: &TurnBuilder,
690 scoped_effect_controller: &ScopedEffectController<'_>,
691) -> Option<String> {
692 if scoped_effect_controller
693 .execution_scope()
694 .validates_turn_trace_id()
695 {
696 Some(
697 builder
698 .turn_id
699 .clone()
700 .unwrap_or_else(|| scoped_effect_controller.scope_id().to_string()),
701 )
702 } else {
703 builder
704 .turn_id
705 .clone()
706 .or_else(|| builder.input.trace_turn_id.clone())
707 }
708}
709
710fn reject_configured_durable_effect_host(
711 effect_host: &dyn EffectHost,
712 operation: &'static str,
713) -> Result<()> {
714 if effect_host.durability_tier() == DurabilityTier::Durable {
715 return Err(EmbedError::DurableEffectHostRequiresHandlerContext { operation });
716 }
717 Ok(())
718}
719
720pub(crate) async fn stream_next_queued_prepared_turn(
721 runtime: &RuntimeHandle,
722 sinks: TurnSinks<'_>,
723 scoped_effect_controller: ScopedEffectController<'_>,
724 cancel: CancellationToken,
725 batch_ids: &[String],
726) -> Result<Option<TurnResult>> {
727 let turn = Box::pin(stream_next_queued_prepared_assembled(
728 runtime,
729 sinks,
730 scoped_effect_controller,
731 cancel,
732 batch_ids,
733 ))
734 .await?;
735 Ok(turn.map(TurnResult::from_assembled))
736}
737
738pub(crate) async fn stream_next_queued_prepared_assembled(
739 runtime: &RuntimeHandle,
740 sinks: TurnSinks<'_>,
741 scoped_effect_controller: ScopedEffectController<'_>,
742 cancel: CancellationToken,
743 batch_ids: &[String],
744) -> Result<Option<AssembledTurn>> {
745 let writer_handle = runtime.writer();
746 let mut writer = writer_handle.lock().await;
747 let observation_sink = SessionObservationTurnActivitySink {
748 runtime: runtime.clone(),
749 live: sinks.turn_events(),
750 };
751 let opts = turn_options(
752 sinks.events(),
753 &observation_sink,
754 scoped_effect_controller,
755 cancel,
756 );
757 let turn = if batch_ids.is_empty() {
758 writer.stream_next_queued_work(opts).await?
759 } else {
760 writer.stream_selected_queued_work(opts, batch_ids).await?
761 };
762 runtime.publish_from(&writer);
763 Ok(turn)
764}
765
766fn turn_options<'a>(
767 events: Option<&'a dyn EventSink>,
768 turn_events: &'a dyn TurnActivitySink,
769 scoped_effect_controller: ScopedEffectController<'a>,
770 cancel: CancellationToken,
771) -> lash_core::TurnOptions<'a> {
772 let mut opts = lash_core::TurnOptions::new(cancel, scoped_effect_controller);
773 if let Some(events) = events {
774 opts = opts.with_events(events);
775 }
776 opts.with_turn_events(turn_events)
777}
778
779struct SessionObservationTurnActivitySink<'a> {
780 runtime: RuntimeHandle,
781 live: Option<&'a dyn TurnActivitySink>,
782}
783
784#[async_trait]
785impl TurnActivitySink for SessionObservationTurnActivitySink<'_> {
786 fn is_noop(&self) -> bool {
787 false
788 }
789
790 async fn emit(&self, activity: TurnActivity) {
791 self.runtime.record_turn_activity(activity.clone());
792 if let Some(live) = self.live {
793 live.emit(activity).await;
794 }
795 }
796}
797
798struct ChannelTurnActivitySink {
799 tx: mpsc::Sender<Result<TurnActivity>>,
800}
801
802#[async_trait]
803impl TurnActivitySink for ChannelTurnActivitySink {
804 async fn emit(&self, activity: TurnActivity) {
805 let _ = self.tx.send(Ok(activity)).await;
806 }
807}
808fn validate_required_plugin_inputs(
809 active_plugins: &[ActivePluginBinding],
810 input: &TurnInput,
811) -> Result<()> {
812 for plugin in active_plugins {
813 if plugin.requires_turn_input && !input.turn_context.has_plugin_input(plugin.id) {
814 return Err(EmbedError::MissingPluginTurnInput {
815 plugin_id: plugin.id,
816 });
817 }
818 }
819 Ok(())
820}
821
822pub(crate) async fn stream_prepared_turn(
823 runtime: &RuntimeHandle,
824 input: TurnInput,
825 sinks: TurnSinks<'_>,
826 scoped_effect_controller: ScopedEffectController<'_>,
827 cancel: CancellationToken,
828) -> Result<TurnResult> {
829 let turn = Box::pin(stream_prepared_assembled(
830 runtime,
831 input,
832 sinks,
833 scoped_effect_controller,
834 cancel,
835 ))
836 .await?;
837 Ok(TurnResult::from_assembled(turn))
838}
839
840pub(crate) async fn stream_prepared_assembled(
841 runtime: &RuntimeHandle,
842 input: TurnInput,
843 sinks: TurnSinks<'_>,
844 scoped_effect_controller: ScopedEffectController<'_>,
845 cancel: CancellationToken,
846) -> Result<AssembledTurn> {
847 let turn = Box::pin(stream_prepared_agent_frame_run(
848 runtime,
849 input,
850 sinks,
851 scoped_effect_controller,
852 cancel,
853 ))
854 .await?;
855 turn.into_final_turn().ok_or_else(|| {
856 EmbedError::Runtime(lash_core::RuntimeError::new(
857 RuntimeErrorCode::EmptyAgentFrameRun,
858 "runtime completed without an assembled turn",
859 ))
860 })
861}
862
863pub(crate) async fn stream_prepared_agent_frame_run(
864 runtime: &RuntimeHandle,
865 input: TurnInput,
866 sinks: TurnSinks<'_>,
867 scoped_effect_controller: ScopedEffectController<'_>,
868 cancel: CancellationToken,
869) -> Result<lash_core::AgentFrameRun> {
870 let writer_handle = runtime.writer();
871 let mut writer = writer_handle.lock().await;
872 if let Some(extension) = input.protocol_extension.as_ref() {
873 writer
874 .validate_protocol_turn_extension(extension)
875 .await
876 .map_err(EmbedError::Session)?;
877 }
878 let observation_sink = SessionObservationTurnActivitySink {
879 runtime: runtime.clone(),
880 live: sinks.turn_events(),
881 };
882 let turn = Box::pin(writer.stream_turn_with_agent_frames(
883 input,
884 turn_options(
885 sinks.events(),
886 &observation_sink,
887 scoped_effect_controller,
888 cancel,
889 ),
890 ))
891 .await?;
892 runtime.publish_from(&writer);
893 Ok(turn)
894}
895
896#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
897pub struct TurnResult {
898 pub state: SessionSnapshot,
899 pub outcome: TurnOutcome,
900 pub assistant_output: AssistantOutput,
901 pub usage: TokenUsage,
905 #[serde(default)]
909 pub children_usage: Vec<TokenLedgerEntry>,
910 pub tool_calls: Vec<ToolCallRecord>,
911 pub execution: ExecutionSummary,
912 pub errors: Vec<TurnIssue>,
913}
914
915impl TurnResult {
916 fn from_assembled(turn: lash_core::AssembledTurn) -> Self {
917 Self {
918 state: turn.state,
919 outcome: turn.outcome,
920 assistant_output: turn.assistant_output,
921 usage: turn.token_usage,
922 children_usage: turn.children_usage,
923 tool_calls: turn.tool_calls,
924 execution: turn.execution,
925 errors: turn.errors,
926 }
927 }
928
929 pub fn total_usage(&self) -> TokenUsage {
932 let mut total = self.usage.clone();
933 for entry in &self.children_usage {
934 total.add(&entry.usage);
935 }
936 total
937 }
938
939 pub fn assistant_message(&self) -> Option<&str> {
940 match &self.outcome {
941 TurnOutcome::Finished(lash_core::TurnFinish::AssistantMessage { text }) => Some(text),
942 _ => None,
943 }
944 }
945
946 pub fn submitted_value(&self) -> Option<&serde_json::Value> {
947 match &self.outcome {
948 TurnOutcome::Finished(lash_core::TurnFinish::SubmittedValue { value }) => Some(value),
949 _ => None,
950 }
951 }
952
953 pub fn tool_value(&self) -> Option<(&str, &serde_json::Value)> {
954 match &self.outcome {
955 TurnOutcome::Finished(lash_core::TurnFinish::ToolValue { tool_name, value }) => {
956 Some((tool_name.as_str(), value))
957 }
958 _ => None,
959 }
960 }
961
962 pub fn is_success(&self) -> bool {
963 matches!(
964 self.outcome,
965 TurnOutcome::Finished(_) | TurnOutcome::AgentFrameSwitch { .. }
966 )
967 }
968}
969
970#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
971pub struct TurnOutput {
972 pub result: TurnResult,
973 pub activities: Vec<TurnActivity>,
974}
975
976impl TurnOutput {
977 pub fn assistant_message(&self) -> Option<&str> {
978 self.result.assistant_message()
979 }
980
981 pub fn submitted_value(&self) -> Option<&serde_json::Value> {
982 self.result.submitted_value()
983 }
984
985 pub fn tool_value(&self) -> Option<(&str, &serde_json::Value)> {
986 self.result.tool_value()
987 }
988
989 pub fn is_success(&self) -> bool {
990 self.result.is_success()
991 }
992}
993
994struct BorrowedTurnActivityFanout<'a> {
995 live: &'a dyn TurnActivitySink,
996 collector: &'a RunActivityCollector,
997}
998
999#[async_trait]
1000impl TurnActivitySink for BorrowedTurnActivityFanout<'_> {
1001 async fn emit(&self, activity: TurnActivity) {
1002 self.live.emit(activity.clone()).await;
1003 self.collector.emit(activity).await;
1004 }
1005}
1006
1007#[derive(Default)]
1008pub(crate) struct RunActivityCollector {
1009 activities: Arc<StdMutex<Vec<TurnActivity>>>,
1010}
1011
1012impl RunActivityCollector {
1013 fn into_activities(self) -> Vec<TurnActivity> {
1014 self.activities
1015 .lock()
1016 .expect("run activity collector lock")
1017 .clone()
1018 }
1019
1020 #[cfg(test)]
1021 pub(crate) fn snapshot(&self) -> Vec<TurnActivity> {
1022 self.activities
1023 .lock()
1024 .expect("run activity collector lock")
1025 .clone()
1026 }
1027}
1028
1029#[async_trait]
1030impl TurnActivitySink for RunActivityCollector {
1031 async fn emit(&self, activity: TurnActivity) {
1032 self.activities
1033 .lock()
1034 .expect("run activity collector lock")
1035 .push(activity);
1036 }
1037}
1038
1039pub struct TurnActivityFanout {
1040 sinks: Vec<Arc<dyn TurnActivitySink>>,
1041}
1042
1043impl TurnActivityFanout {
1044 pub fn new(sinks: impl IntoIterator<Item = Arc<dyn TurnActivitySink>>) -> Self {
1045 Self {
1046 sinks: sinks.into_iter().collect(),
1047 }
1048 }
1049}
1050
1051#[async_trait]
1052impl TurnActivitySink for TurnActivityFanout {
1053 async fn emit(&self, activity: TurnActivity) {
1054 for sink in &self.sinks {
1055 sink.emit(activity.clone()).await;
1056 }
1057 }
1058}
1059
1060pub fn message_text(message: &Message) -> String {
1061 message
1062 .parts
1063 .iter()
1064 .map(|part| part.content.as_str())
1065 .collect::<Vec<_>>()
1066 .join("\n")
1067}
1068
1069pub fn message_role(message: &Message) -> &'static str {
1070 match message.role {
1071 MessageRole::User => "user",
1072 MessageRole::Assistant => "assistant",
1073 MessageRole::System => "system",
1074 MessageRole::Event => "event",
1075 }
1076}