1use super::*;
2
3fn trace_fields_from_outcome(
4 outcome: &TurnOutcome,
5) -> (
6 &'static str,
7 &'static str,
8 Option<lash_trace::TraceAgentFrameSwitch>,
9) {
10 match outcome {
11 TurnOutcome::Finished(TurnFinish::AssistantMessage { .. }) => {
12 ("completed", "assistant_message", None)
13 }
14 TurnOutcome::Finished(TurnFinish::SubmittedValue { .. }) => {
15 ("completed", "submitted_value", None)
16 }
17 TurnOutcome::Finished(TurnFinish::ToolValue { .. }) => ("completed", "tool_value", None),
18 TurnOutcome::AgentFrameSwitch { frame_id } => (
19 "completed",
20 "agent_frame_switch",
21 Some(lash_trace::TraceAgentFrameSwitch {
22 frame_id: frame_id.clone(),
23 }),
24 ),
25 TurnOutcome::Stopped(stop) => ("failed", trace_stop_reason(stop), None),
26 }
27}
28
29fn trace_stop_reason(stop: &TurnStop) -> &'static str {
30 match stop {
31 TurnStop::Cancelled => "cancelled",
32 TurnStop::Incomplete => "incomplete",
33 TurnStop::InvalidInput => "invalid_input",
34 TurnStop::MaxTurns => "max_turns",
35 TurnStop::ToolFailure => "tool_failure",
36 TurnStop::ProviderError => "provider_error",
37 TurnStop::PluginAbort => "plugin_abort",
38 TurnStop::RuntimeError => "runtime_error",
39 TurnStop::SubmittedError { .. } => "submitted_error",
40 TurnStop::ToolError { .. } => "tool_error",
41 }
42}
43
44fn session_head_refresh_error(err: SessionError) -> RuntimeError {
45 RuntimeError::new(
46 RuntimeErrorCode::Other("session_head_refresh".to_string()),
47 err.to_string(),
48 )
49}
50
51fn queued_work_payload_type(payload: &crate::QueuedWorkPayload) -> &'static str {
52 match payload {
53 crate::QueuedWorkPayload::TurnInput { .. } => "turn_input",
54 crate::QueuedWorkPayload::ProcessWake { .. } => "process_wake",
55 crate::QueuedWorkPayload::HostEvent { .. } => "host_event",
56 crate::QueuedWorkPayload::Timer { .. } => "timer",
57 crate::QueuedWorkPayload::SessionCommand { command } => command.kind(),
58 }
59}
60
61fn queued_work_batch_ids(claim: &crate::QueuedWorkClaim) -> Vec<String> {
62 claim
63 .batches
64 .iter()
65 .map(|batch| batch.batch_id.clone())
66 .collect()
67}
68
69fn turn_phase_id(parent_turn_id: &str, phase: &str) -> String {
70 format!("{parent_turn_id}:{phase}")
71}
72
73fn scoped_child_turn_controller<'run>(
74 scoped_effect_controller: &'run ScopedEffectController<'_>,
75 session_id: &str,
76 turn_id: &str,
77) -> Result<ScopedEffectController<'run>, RuntimeError> {
78 ScopedEffectController::borrowed(
79 scoped_effect_controller.controller(),
80 EffectScope::turn(session_id, turn_id),
81 )
82}
83
84pub(in crate::runtime) fn queued_work_trace_payload(
85 boundary: crate::QueuedWorkClaimBoundary,
86 claim: &crate::QueuedWorkClaim,
87 causes: &[crate::TurnCause],
88) -> serde_json::Value {
89 serde_json::json!({
90 "boundary": boundary,
91 "claim_id": claim.claim_id,
92 "owner_id": claim.owner_id,
93 "batch_ids": queued_work_batch_ids(claim),
94 "payload_types": claim.batches.iter()
95 .flat_map(|batch| batch.items.iter())
96 .map(|item| queued_work_payload_type(&item.payload))
97 .collect::<Vec<_>>(),
98 "causes": causes,
99 })
100}
101
102pub(in crate::runtime) fn queued_work_completion_trace_payload(
103 completions: &[crate::QueuedWorkCompletion],
104) -> serde_json::Value {
105 serde_json::json!({
106 "claims": completions.iter().map(|completion| {
107 serde_json::json!({
108 "session_id": completion.session_id,
109 "claim_id": completion.claim_id,
110 "batch_ids": completion.batch_ids,
111 })
112 }).collect::<Vec<_>>(),
113 })
114}
115
116async fn emit_queued_work_started_to_sink(
117 events: &dyn TurnActivitySink,
118 boundary: crate::QueuedWorkClaimBoundary,
119 claim: &crate::QueuedWorkClaim,
120 causes: Vec<crate::TurnCause>,
121) {
122 emit_turn_activity_to_sink(
123 events,
124 TurnActivity::independent(TurnEvent::QueuedWorkStarted {
125 boundary,
126 batch_ids: queued_work_batch_ids(claim),
127 causes,
128 }),
129 )
130 .await;
131}
132
133pub(in crate::runtime) async fn send_queued_work_started_event(
134 event_tx: &mpsc::Sender<RuntimeStreamEvent>,
135 boundary: crate::QueuedWorkClaimBoundary,
136 claim: &crate::QueuedWorkClaim,
137 causes: Vec<crate::TurnCause>,
138) {
139 send_turn_activity(
140 event_tx,
141 TurnActivityId::fresh(),
142 TurnEvent::QueuedWorkStarted {
143 boundary,
144 batch_ids: queued_work_batch_ids(claim),
145 causes,
146 },
147 )
148 .await;
149}
150
151struct TurnFinishInput {
152 turn_pipeline: TurnBoundary,
153 assembler: TurnAssembler,
154 new_messages: crate::MessageSequence,
155 policy: RuntimeSessionPolicy,
156 turn_index: usize,
157 queued_work_completions: Vec<crate::QueuedWorkCompletion>,
158 tool_host_events: Vec<crate::tool_dispatch::ToolHostEventEffectOutcome>,
159 trace_turn_id: String,
160}
161
162impl LashRuntime {
163 fn max_context_tokens(&self) -> usize {
164 self.state.effective_policy().context_window_tokens()
165 }
166 #[doc(hidden)]
167 pub fn set_turn_phase_probe(&mut self, probe: Arc<dyn RuntimeTurnPhaseProbe>) {
168 self.turn_phase_probe = Some(probe);
169 }
170
171 fn mark_phase_begin(&self, phase: RuntimeTurnPhase) {
172 if let Some(probe) = self.turn_phase_probe.as_ref() {
173 probe.begin(phase);
174 }
175 }
176
177 fn mark_phase_end(&self, phase: RuntimeTurnPhase) {
178 if let Some(probe) = self.turn_phase_probe.as_ref() {
179 probe.end(phase);
180 }
181 }
182
183 async fn finish_turn(
184 &mut self,
185 finish: TurnFinishInput,
186 events: &dyn EventSink,
187 scoped_effect_controller: &ScopedEffectController<'_>,
188 cancel_state: &CancellationToken,
189 ) -> Result<AssembledTurn, RuntimeError> {
190 let TurnFinishInput {
191 mut turn_pipeline,
192 assembler,
193 new_messages,
194 policy,
195 turn_index,
196 queued_work_completions,
197 tool_host_events,
198 trace_turn_id,
199 } = finish;
200 self.policy = self.state.effective_policy().clone();
201 turn_pipeline.state_mut().policy = self.policy.clone();
202 turn_pipeline.state_mut().turn_index = turn_index;
203
204 let mut turn_usage_delta = {
205 let mut ledger = self.shared_token_ledger.lock().expect("token ledger lock");
206 std::mem::take(&mut *ledger)
207 };
208 if assembler.token_usage.total() > 0 || assembler.token_usage.cached_input_tokens > 0 {
209 turn_usage_delta.push(TokenLedgerEntry {
210 source: "turn".to_string(),
211 model: policy.model.id.clone(),
212 usage: assembler.token_usage.clone(),
213 });
214 }
215 let turn_usage_delta = merge_usage_delta_entries(turn_usage_delta);
216
217 turn_pipeline.finalize_turn_read_state(
218 new_messages,
219 &assembler.tool_calls,
220 cancel_state.is_cancelled(),
221 );
222 turn_pipeline.append_tool_host_events(&tool_host_events);
223 if assembler.token_usage.total() > 0 || assembler.token_usage.cached_input_tokens > 0 {
224 turn_pipeline.state_mut().token_usage = assembler.token_usage.clone();
225 }
226
227 let last_prompt_usage = assembler
228 .last_llm_usage()
229 .and_then(|usage| normalize_prompt_usage(policy.provider(), usage));
230 turn_pipeline.state_mut().last_prompt_usage = last_prompt_usage;
231 let assembled_state = turn_pipeline.export_state_for_assembly();
232 let assembled = assembler.finish(
233 assembled_state,
234 cancel_state.is_cancelled(),
235 None,
236 &self.host.core.control.termination,
237 );
238
239 let Some(session) = self.session.as_ref() else {
240 self.state.apply_snapshot(&assembled.state);
241 self.emit_completed_turn_trace(&assembled.state, &assembled.outcome, &trace_turn_id);
242 return Ok(assembled);
243 };
244
245 let plugins = Arc::clone(session.plugins());
246 let manager = match self.runtime_session_services_for_turn(None) {
247 Ok(manager) => manager,
248 Err(err) => {
249 return Err(RuntimeError::new(
250 RuntimeErrorCode::PluginSessionManager,
251 err.to_string(),
252 ));
253 }
254 };
255
256 self.mark_phase_begin(RuntimeTurnPhase::FinalizeTurn);
257 let finalized = match plugins
258 .finalize_turn_with_phase_probe(
259 assembled,
260 manager.state_service(),
261 manager.lifecycle_service(),
262 manager.graph_service(),
263 self.turn_phase_probe.clone(),
264 )
265 .await
266 {
267 Ok(finalized) => finalized,
268 Err(err) => {
269 self.mark_phase_end(RuntimeTurnPhase::FinalizeTurn);
270 return Err(RuntimeError::new(
271 RuntimeErrorCode::PluginFinalizeTurn,
272 err.to_string(),
273 ));
274 }
275 };
276 self.mark_phase_end(RuntimeTurnPhase::FinalizeTurn);
277
278 let mut returned_turn = finalized.turn;
279 self.mark_phase_begin(RuntimeTurnPhase::PersistTurn);
280 self.mark_phase_begin(RuntimeTurnPhase::FinalCommit);
281 let queued_work_completion_trace = queued_work_completions.clone();
282 if let Err(err) = turn_pipeline
283 .final_commit(
284 &mut returned_turn,
285 self.session.as_mut(),
286 &turn_usage_delta,
287 Some(&trace_turn_id),
288 queued_work_completions,
289 )
290 .await
291 {
292 self.mark_phase_end(RuntimeTurnPhase::FinalCommit);
293 self.mark_phase_end(RuntimeTurnPhase::PersistTurn);
294 return Err(err);
295 }
296 self.mark_phase_end(RuntimeTurnPhase::FinalCommit);
297
298 emit_session_events_to_sink(events, finalized.events).await;
299 self.state = turn_pipeline.into_final_state();
300 if matches!(returned_turn.outcome, TurnOutcome::AgentFrameSwitch { .. })
301 && let Some(session) = self.session.as_mut()
302 {
303 let protocol_session = Arc::clone(session.plugins().protocol_session());
304 let session_id = self.state.session_id.clone();
305 protocol_session
306 .restore_session(
307 crate::plugin::ProtocolSessionContext::new(session, &session_id),
308 &self.state,
309 )
310 .await
311 .map_err(|err| {
312 RuntimeError::new(
313 RuntimeErrorCode::Other("protocol_restore_session".to_string()),
314 err.to_string(),
315 )
316 })?;
317 }
318 if !queued_work_completion_trace.is_empty() {
319 crate::trace::emit_trace(
320 &self.host.core.tracing.trace_sink,
321 &self.host.core.tracing.trace_context,
322 lash_trace::TraceContext::default()
323 .for_session(returned_turn.state.session_id.clone())
324 .for_turn_index(returned_turn.state.turn_index)
325 .for_turn(trace_turn_id.clone()),
326 lash_trace::TraceEvent::Custom {
327 name: "queued_work.completed".to_string(),
328 payload: queued_work_completion_trace_payload(&queued_work_completion_trace),
329 },
330 );
331 }
332 self.mark_phase_begin(RuntimeTurnPhase::PostPersistHooks);
333 self.emit_turn_persisted_event(&returned_turn, scoped_effect_controller, &trace_turn_id)
334 .await?;
335 self.mark_phase_end(RuntimeTurnPhase::PostPersistHooks);
336 self.mark_phase_end(RuntimeTurnPhase::PersistTurn);
337
338 self.emit_completed_turn_trace(
339 &returned_turn.state,
340 &returned_turn.outcome,
341 &trace_turn_id,
342 );
343 Ok(returned_turn)
344 }
345
346 fn emit_completed_turn_trace(
347 &self,
348 state: &SessionSnapshot,
349 outcome: &TurnOutcome,
350 trace_turn_id: &str,
351 ) {
352 if self.host.core.tracing.trace_sink.is_none() {
353 return;
354 }
355
356 let (status, done_reason, agent_frame_switch) = trace_fields_from_outcome(outcome);
357 crate::trace::emit_trace(
358 &self.host.core.tracing.trace_sink,
359 &self.host.core.tracing.trace_context,
360 lash_trace::TraceContext::default()
361 .for_session(state.session_id.clone())
362 .for_turn_index(state.turn_index)
363 .for_turn(trace_turn_id.to_string()),
364 lash_trace::TraceEvent::TurnCompleted {
365 status: status.to_string(),
366 done_reason: done_reason.to_string(),
367 agent_frame_switch,
368 },
369 );
370 }
371
372 async fn emit_turn_persisted_event(
373 &self,
374 returned_turn: &AssembledTurn,
375 scoped_effect_controller: &ScopedEffectController<'_>,
376 trace_turn_id: &str,
377 ) -> Result<(), RuntimeError> {
378 let Some(session) = self.session.as_ref() else {
379 return Ok(());
380 };
381 let Ok(manager) = self.runtime_session_services() else {
382 return Ok(());
383 };
384 let phase_turn_id = turn_phase_id(trace_turn_id, "turn-persisted");
385 let phase_controller = scoped_child_turn_controller(
386 scoped_effect_controller,
387 &self.state.session_id,
388 &phase_turn_id,
389 )?;
390 let direct_completions = manager.direct_completion_client(
391 RuntimeEffectControllerHandle::borrowed(phase_controller),
392 Some(phase_turn_id),
393 );
394
395 session
396 .plugins()
397 .emit_runtime_event_with_phase_probe(
398 crate::PluginLifecycleEvent::TurnPersisted(Box::new(
399 crate::SessionStateChangedContext {
400 session_id: self.state.session_id.clone(),
401 state: crate::SessionReadView::from_snapshot(&returned_turn.state),
402 sessions: manager.state_service(),
403 session_graph: manager.graph_service(),
404 direct_completions,
405 },
406 )),
407 self.turn_phase_probe.clone(),
408 )
409 .await;
410 Ok(())
411 }
412
413 pub async fn stream_turn(
415 &mut self,
416 input: TurnInput,
417 opts: TurnOptions<'_>,
418 ) -> Result<AssembledTurn, RuntimeError> {
419 let turn_id = input
420 .trace_turn_id
421 .clone()
422 .unwrap_or_else(|| uuid::Uuid::new_v4().to_string());
423 let cancel = opts.cancel.clone();
424 let effect_host = Arc::clone(&self.host.core.control.effect_host);
425 let scoped_effect_controller = match opts.scoped_effect_controller() {
426 Some(scope) => scope,
427 None => effect_host.scoped(EffectScope::turn(&self.state.session_id, &turn_id))?,
428 };
429 self.stream_turn_with_scoped_effect_controller_inner(
430 input,
431 opts.events_or_noop(),
432 opts.turn_events_or_noop(),
433 scoped_effect_controller,
434 cancel,
435 None,
436 )
437 .await
438 }
439
440 pub async fn stream_next_queued_work(
441 &mut self,
442 opts: TurnOptions<'_>,
443 ) -> Result<Option<AssembledTurn>, RuntimeError> {
444 self.stream_queued_work(opts, None).await
445 }
446
447 pub async fn stream_selected_queued_work(
448 &mut self,
449 opts: TurnOptions<'_>,
450 batch_ids: &[String],
451 ) -> Result<Option<AssembledTurn>, RuntimeError> {
452 self.stream_queued_work(opts, Some(batch_ids)).await
453 }
454
455 async fn stream_queued_work(
456 &mut self,
457 opts: TurnOptions<'_>,
458 selected_batch_ids: Option<&[String]>,
459 ) -> Result<Option<AssembledTurn>, RuntimeError> {
460 if self.drain_next_session_command().await?.is_some() {
461 return Ok(None);
462 }
463 let Some(store) = self
464 .session
465 .as_ref()
466 .and_then(|session| session.history_store())
467 else {
468 return Ok(None);
469 };
470 let claim = if let Some(batch_ids) = selected_batch_ids {
471 store
472 .claim_ready_queued_work_by_batch_ids(
473 &self.state.session_id,
474 &self.runtime_scope_id,
475 crate::QueuedWorkClaimBoundary::Idle,
476 crate::QUEUED_WORK_CLAIM_TTL_MS,
477 batch_ids,
478 )
479 .await
480 } else {
481 store
482 .claim_ready_queued_work(
483 &self.state.session_id,
484 &self.runtime_scope_id,
485 crate::QueuedWorkClaimBoundary::Idle,
486 crate::QUEUED_WORK_CLAIM_TTL_MS,
487 64,
488 )
489 .await
490 }
491 .map_err(|err| RuntimeError::new(RuntimeErrorCode::StoreCommitFailed, err.to_string()))?;
492 let Some(claim) = claim else {
493 return Ok(None);
494 };
495 let mut work = claim.materialize_for_turn();
496 let turn_id = work
497 .input
498 .trace_turn_id
499 .clone()
500 .or_else(|| opts.effect_scope_id().map(ToOwned::to_owned))
501 .unwrap_or_else(|| uuid::Uuid::new_v4().to_string());
502 work.input.trace_turn_id = Some(turn_id.clone());
503 let causes = work.turn_causes.clone();
504 emit_queued_work_started_to_sink(
505 opts.turn_events_or_noop(),
506 crate::QueuedWorkClaimBoundary::Idle,
507 &claim,
508 causes.clone(),
509 )
510 .await;
511 crate::trace::emit_trace(
512 &self.host.core.tracing.trace_sink,
513 &self.host.core.tracing.trace_context,
514 lash_trace::TraceContext::default()
515 .for_session(self.state.session_id.clone())
516 .for_turn_index(self.state.turn_index + 1)
517 .for_turn(turn_id.clone()),
518 lash_trace::TraceEvent::Custom {
519 name: "queued_work.claimed".to_string(),
520 payload: queued_work_trace_payload(
521 crate::QueuedWorkClaimBoundary::Idle,
522 &claim,
523 &causes,
524 ),
525 },
526 );
527 let cancel = opts.cancel.clone();
528 let effect_host = Arc::clone(&self.host.core.control.effect_host);
529 let scoped_effect_controller = match opts.scoped_effect_controller() {
530 Some(scope) => scope,
531 None => effect_host.scoped(EffectScope::queue_drain(
532 &self.state.session_id,
533 &claim.claim_id,
534 ))?,
535 };
536 self.stream_turn_with_scoped_effect_controller_inner(
537 work.input,
538 opts.events_or_noop(),
539 opts.turn_events_or_noop(),
540 scoped_effect_controller,
541 cancel,
542 Some(claim),
543 )
544 .await
545 .map(Some)
546 }
547
548 fn ensure_durable_store_facets_for_scope(
556 &self,
557 scoped_effect_controller: &ScopedEffectController<'_>,
558 ) -> Result<(), RuntimeError> {
559 if scoped_effect_controller.controller().durability_tier() != crate::DurabilityTier::Durable
560 {
561 return Ok(());
562 }
563 if self
564 .host
565 .core
566 .durability
567 .attachment_store
568 .persistence()
569 .durability_tier()
570 != crate::DurabilityTier::Durable
571 {
572 return Err(RuntimeError::durable_store_required(
573 crate::DurableStoreFacet::AttachmentStore,
574 ));
575 }
576 if self
577 .host
578 .core
579 .durability
580 .lashlang_artifact_store
581 .durability_tier()
582 != crate::DurabilityTier::Durable
583 {
584 return Err(RuntimeError::durable_store_required(
585 crate::DurableStoreFacet::ArtifactStore,
586 ));
587 }
588 if let Some(store) = self
589 .session
590 .as_ref()
591 .and_then(|session| session.history_store())
592 && store.durability_tier() != crate::DurabilityTier::Durable
593 {
594 return Err(RuntimeError::durable_store_required(
595 crate::DurableStoreFacet::SessionStore,
596 ));
597 }
598 if let Some(process_registry) = self.host.process_registry.as_ref()
599 && process_registry.durability_tier() != crate::DurabilityTier::Durable
600 {
601 return Err(RuntimeError::durable_store_required(
602 crate::DurableStoreFacet::ProcessRegistry,
603 ));
604 }
605 Ok(())
606 }
607
608 async fn stream_turn_with_scoped_effect_controller_inner(
609 &mut self,
610 mut input: TurnInput,
611 events: &dyn EventSink,
612 turn_events: &dyn TurnActivitySink,
613 scoped_effect_controller: ScopedEffectController<'_>,
614 cancel: CancellationToken,
615 queued_claim: Option<crate::QueuedWorkClaim>,
616 ) -> Result<AssembledTurn, RuntimeError> {
617 if queued_claim.is_none() {
618 while self.drain_next_session_command().await?.is_some() {}
619 }
620 if let Some(input_turn_id) = input.trace_turn_id.as_deref()
621 && scoped_effect_controller
622 .effect_scope()
623 .validates_turn_trace_id()
624 && input_turn_id != scoped_effect_controller.scope_id()
625 {
626 return Err(RuntimeError::new(
627 RuntimeErrorCode::EffectScopeTurnIdMismatch,
628 format!(
629 "input trace_turn_id `{input_turn_id}` does not match effect scope id `{}`",
630 scoped_effect_controller.scope_id()
631 ),
632 ));
633 }
634 self.ensure_durable_store_facets_for_scope(&scoped_effect_controller)?;
635 input
636 .trace_turn_id
637 .get_or_insert_with(|| scoped_effect_controller.scope_id().to_string());
638 self.stream_turn_inner(
639 input.clone(),
640 events,
641 turn_events,
642 scoped_effect_controller,
643 cancel.clone(),
644 queued_claim,
645 )
646 .await
647 }
648
649 pub async fn stream_turn_with_agent_frames(
657 &mut self,
658 input: TurnInput,
659 opts: TurnOptions<'_>,
660 ) -> Result<AgentFrameRun, RuntimeError> {
661 let follow_trace_turn_id = input
662 .trace_turn_id
663 .clone()
664 .unwrap_or_else(|| uuid::Uuid::new_v4().to_string());
665 let cancel = opts.cancel.clone();
666 let effect_host = Arc::clone(&self.host.core.control.effect_host);
667 let scoped_effect_controller = match opts.scoped_effect_controller() {
668 Some(scope) => scope,
669 None => effect_host.scoped(EffectScope::turn(
670 &self.state.session_id,
671 &follow_trace_turn_id,
672 ))?,
673 };
674 self.stream_turn_with_agent_frames_inner(
675 input,
676 opts.events_or_noop(),
677 opts.turn_events_or_noop(),
678 scoped_effect_controller,
679 cancel,
680 )
681 .await
682 }
683
684 async fn stream_turn_with_agent_frames_inner(
685 &mut self,
686 mut input: TurnInput,
687 events: &dyn EventSink,
688 turn_events: &dyn TurnActivitySink,
689 scoped_effect_controller: ScopedEffectController<'_>,
690 cancel: CancellationToken,
691 ) -> Result<AgentFrameRun, RuntimeError> {
692 if let Some(input_turn_id) = input.trace_turn_id.as_deref()
693 && scoped_effect_controller
694 .effect_scope()
695 .validates_turn_trace_id()
696 && input_turn_id != scoped_effect_controller.scope_id()
697 {
698 return Err(RuntimeError::new(
699 RuntimeErrorCode::EffectScopeTurnIdMismatch,
700 format!(
701 "input trace_turn_id `{input_turn_id}` does not match effect scope id `{}`",
702 scoped_effect_controller.scope_id()
703 ),
704 ));
705 }
706 let follow_protocol_turn_options = input.protocol_turn_options.clone();
707 let follow_turn_context = input.turn_context.clone();
708 let follow_trace_turn_id = input
709 .trace_turn_id
710 .clone()
711 .unwrap_or_else(|| scoped_effect_controller.scope_id().to_string());
712 input
713 .trace_turn_id
714 .get_or_insert(follow_trace_turn_id.clone());
715 let mut turns = Vec::new();
716 loop {
717 let turn_trace_turn_id = agent_frame_follow_turn_id(&follow_trace_turn_id, turns.len());
718 input.trace_turn_id = Some(turn_trace_turn_id.clone());
719 let turn_effect_controller = if turns.is_empty() {
720 scoped_effect_controller.clone()
721 } else {
722 ScopedEffectController::borrowed(
723 scoped_effect_controller.controller(),
724 EffectScope::turn(&self.state.session_id, &turn_trace_turn_id),
725 )?
726 };
727 let turn = self
728 .stream_turn_with_scoped_effect_controller_inner(
729 input,
730 events,
731 turn_events,
732 turn_effect_controller,
733 cancel.clone(),
734 None,
735 )
736 .await?;
737 let switched_frame_id = match &turn.outcome {
738 TurnOutcome::AgentFrameSwitch { frame_id } => Some(frame_id.clone()),
739 _ => None,
740 };
741 let next_task = switched_frame_id
742 .as_ref()
743 .and_then(|frame_id| frame_switch_task(&turn, frame_id));
744 turns.push(turn);
745
746 let Some(_frame_id) = switched_frame_id else {
747 return Ok(AgentFrameRun { turns });
748 };
749
750 let task = next_task.ok_or_else(|| {
751 RuntimeError::new(
752 RuntimeErrorCode::Other("agent_frame_missing_task".to_string()),
753 "agent frame switch did not provide a task",
754 )
755 })?;
756 input = turn_input_from_text(task);
757 input.protocol_turn_options = follow_protocol_turn_options.clone();
758 input.turn_context = follow_turn_context.clone();
759 }
760 }
761
762 async fn stream_turn_inner(
763 &mut self,
764 mut input: TurnInput,
765 events: &dyn EventSink,
766 turn_events: &dyn TurnActivitySink,
767 scoped_effect_controller: ScopedEffectController<'_>,
768 cancel: CancellationToken,
769 queued_claim: Option<crate::QueuedWorkClaim>,
770 ) -> Result<AssembledTurn, RuntimeError> {
771 self.refresh_session_graph_from_store()
772 .await
773 .map_err(session_head_refresh_error)?;
774 let input_trace_turn_id = input.trace_turn_id.clone();
775 let queued_turn_work = queued_claim
776 .as_ref()
777 .map(crate::QueuedWorkClaim::materialize_for_turn);
778 if let Some(work) = queued_turn_work.as_ref()
779 && input.items.is_empty()
780 && input.image_blobs.is_empty()
781 {
782 input = work.input.clone();
783 if input.trace_turn_id.is_none() {
784 input.trace_turn_id = input_trace_turn_id;
785 }
786 }
787 if self
788 .session
789 .as_ref()
790 .and_then(|session| session.history_store())
791 .is_some()
792 {
793 ensure_durable_effect_input(&input)?;
794 }
795 if let Some(extension) = &input.protocol_extension
796 && let Some(session) = self.session.as_ref()
797 {
798 let protocol_session = std::sync::Arc::clone(session.plugins().protocol_session());
799 protocol_session
800 .validate_turn_extension(extension)
801 .await
802 .map_err(|err| {
803 RuntimeError::new(RuntimeErrorCode::ProtocolTurnExtension, err.to_string())
804 })?;
805 }
806 let previous_prompt_usage = self.state.last_prompt_usage.clone();
807 let normalized = match self.normalize_input_items(&input.items, &input.image_blobs) {
808 Ok(items) => items,
809 Err(e) => {
810 self.state.last_prompt_usage = None;
811 let mut assembler = TurnAssembler::default();
812 let error_event = SessionEvent::Error {
813 message: e.clone(),
814 envelope: Some(crate::session_model::ErrorEnvelope {
815 kind: "input_validation".to_string(),
816 code: Some("invalid_turn_input".to_string()),
817 terminal_reason: None,
818 user_message: e.clone(),
819 raw: None,
820 }),
821 };
822 assembler.push(&error_event);
823 emit_turn_activity_to_sink(
824 turn_events,
825 TurnActivity::independent(TurnEvent::Error { message: e }),
826 )
827 .await;
828 emit_session_event_to_sink(events, error_event).await;
829 let outcome_event = SessionEvent::TurnOutcome {
830 outcome: TurnOutcome::Stopped(TurnStop::InvalidInput),
831 };
832 assembler.push(&outcome_event);
833 emit_session_event_to_sink(events, outcome_event).await;
834 assembler.push(&SessionEvent::Done);
835 emit_session_event_to_sink(events, SessionEvent::Done).await;
836 return Ok(assembler.finish(
837 self.state.to_snapshot(),
838 false,
839 None,
840 &self.host.core.control.termination,
841 ));
842 }
843 };
844 let turn_index = self.state.turn_index + 1;
845 let trace_turn_id = input
846 .trace_turn_id
847 .clone()
848 .unwrap_or_else(|| uuid::Uuid::new_v4().to_string());
849 if self.host.core.tracing.trace_sink.is_some() {
850 let mut trace_metadata = std::collections::BTreeMap::new();
851 trace_metadata.insert(
852 "input_item_count".to_string(),
853 serde_json::json!(normalized.len()),
854 );
855 crate::trace::emit_trace(
856 &self.host.core.tracing.trace_sink,
857 &self.host.core.tracing.trace_context,
858 lash_trace::TraceContext::default()
859 .for_session(self.state.session_id.clone())
860 .for_turn_index(turn_index)
861 .for_turn(trace_turn_id.clone()),
862 lash_trace::TraceEvent::TurnStarted {
863 metadata: trace_metadata,
864 },
865 );
866 }
867
868 let base_read_model = self.state.read_model();
869 let base_messages = base_read_model.messages;
870 let base_render_cache = base_read_model.prompt_render_cache;
871 let mut turn_delta = Vec::new();
872 let initial_turn_causes = queued_turn_work
873 .as_ref()
874 .map(|work| work.turn_causes.clone())
875 .unwrap_or_default();
876 turn_delta.extend(
877 initial_turn_causes
878 .iter()
879 .map(crate::TurnCause::to_event_message),
880 );
881
882 let user_id = fresh_message_id();
883 let mut user_parts: Vec<Part> = Vec::new();
884 for item in normalized {
885 match item {
886 NormalizedItem::Text(text) => {
887 if text.is_empty() {
888 continue;
889 }
890 user_parts.push(Part {
891 id: format!("{}.p{}", user_id, user_parts.len()),
892 kind: PartKind::Text,
893 content: text,
894 attachment: None,
895 tool_call_id: None,
896 tool_name: None,
897 tool_replay: None,
898 prune_state: PruneState::Intact,
899 reasoning_meta: None,
900 response_meta: None,
901 });
902 }
903 NormalizedItem::Image(reference) => {
904 user_parts.push(Part {
905 id: format!("{}.p{}", user_id, user_parts.len()),
906 kind: PartKind::Image,
907 content: String::new(),
908 attachment: Some(crate::session_model::message::PartAttachment {
909 reference,
910 }),
911 tool_call_id: None,
912 tool_name: None,
913 tool_replay: None,
914 prune_state: PruneState::Intact,
915 reasoning_meta: None,
916 response_meta: None,
917 });
918 }
919 }
920 }
921 if user_parts.is_empty() && initial_turn_causes.is_empty() {
922 user_parts.push(Part {
923 id: format!("{}.p0", user_id),
924 kind: PartKind::Text,
925 content: String::new(),
926 attachment: None,
927 tool_call_id: None,
928 tool_name: None,
929 tool_replay: None,
930 prune_state: PruneState::Intact,
931 reasoning_meta: None,
932 response_meta: None,
933 });
934 }
935 if !user_parts.is_empty() {
936 reassign_part_ids(&user_id, &mut user_parts);
937 turn_delta.push(Message {
938 id: user_id.clone(),
939 role: MessageRole::User,
940 parts: shared_parts(user_parts),
941 origin: None,
942 });
943 }
944
945 let manager = self
946 .runtime_session_services_for_turn(None)
947 .map_err(|err| {
948 RuntimeError::new(RuntimeErrorCode::PluginSessionManager, err.to_string())
949 })?;
950 let plugin_session = self
951 .session
952 .as_ref()
953 .map(|s| Arc::clone(s.plugins()))
954 .ok_or_else(|| {
955 RuntimeError::new(
956 RuntimeErrorCode::ContextPrepareTurn,
957 "runtime session not available",
958 )
959 })?;
960 let prepare_phase_turn_id = turn_phase_id(&trace_turn_id, "prepare-turn");
961 let prepare_phase_controller = scoped_child_turn_controller(
962 &scoped_effect_controller,
963 &self.state.session_id,
964 &prepare_phase_turn_id,
965 )?;
966 let turn_ctx = crate::TurnTransformContext {
967 session_id: self.state.session_id.clone(),
968 state: self.read_view(),
969 prompt_usage: previous_prompt_usage.clone(),
970 max_context_tokens: Some(LashRuntime::max_context_tokens(self)),
971 sessions: manager.state_service(),
972 session_lifecycle: manager.lifecycle_service(),
973 session_graph: manager.graph_service(),
974 scoped_effect_controller: scoped_effect_controller.clone(),
975 direct_completions: manager.direct_completion_client(
976 RuntimeEffectControllerHandle::borrowed(prepare_phase_controller),
977 Some(prepare_phase_turn_id),
978 ),
979 };
980 self.mark_phase_begin(RuntimeTurnPhase::ContextTransform);
981 let prepared_context = plugin_session
982 .prepare_turn_context(
983 &turn_ctx,
984 crate::session_model::context::PreparedContext {
985 messages: crate::MessageSequence::from_base_and_delta(
986 base_messages,
987 turn_delta,
988 )
989 .with_base_render_cache(base_render_cache),
990 ..Default::default()
991 },
992 self.turn_phase_probe.clone(),
993 )
994 .await
995 .map_err(|err| {
996 RuntimeError::new(RuntimeErrorCode::ContextPrepareTurn, err.to_string())
997 })?;
998 self.mark_phase_end(RuntimeTurnPhase::ContextTransform);
999 drop(turn_ctx);
1004 let messages = prepared_context.messages;
1005 if let Some(session) = self.session.as_mut() {
1006 session
1007 .set_context_surface(
1008 prepared_context.tool_providers,
1009 prepared_context.prompt_contributions,
1010 prepared_context.include_base_tools,
1011 )
1012 .map_err(|err| {
1013 RuntimeError::new(
1014 RuntimeErrorCode::Other("session_tool_registry".to_string()),
1015 err.to_string(),
1016 )
1017 })?;
1018 }
1019
1020 self.state.last_prompt_usage = None;
1021
1022 self.stream_prepared_turn(
1023 messages,
1024 previous_prompt_usage,
1025 input.protocol_turn_options.clone(),
1026 input.protocol_extension.clone(),
1027 input.turn_context.clone(),
1028 initial_turn_causes,
1029 trace_turn_id,
1030 turn_index,
1031 events,
1032 turn_events,
1033 scoped_effect_controller,
1034 cancel,
1035 queued_claim,
1036 )
1037 .await
1038 }
1039
1040 pub async fn run_turn_assembled(
1042 &mut self,
1043 input: TurnInput,
1044 cancel: CancellationToken,
1045 ) -> Result<AssembledTurn, RuntimeError> {
1046 self.stream_turn(input, TurnOptions::new(cancel)).await
1047 }
1048
1049 #[allow(clippy::too_many_arguments)]
1051 pub async fn stream_prepared_turn(
1052 &mut self,
1053 messages: crate::MessageSequence,
1054 _previous_prompt_usage: Option<PromptUsage>,
1055 protocol_turn_options: Option<crate::ProtocolTurnOptions>,
1056 protocol_extension: Option<crate::ProtocolTurnExtensionHandle>,
1057 turn_context: crate::TurnContext,
1058 initial_turn_causes: Vec<crate::TurnCause>,
1059 trace_turn_id: String,
1060 turn_index: usize,
1061 events: &dyn EventSink,
1062 turn_events: &dyn TurnActivitySink,
1063 scoped_effect_controller: ScopedEffectController<'_>,
1064 cancel: CancellationToken,
1065 initial_queue_claim: Option<crate::QueuedWorkClaim>,
1066 ) -> Result<AssembledTurn, RuntimeError> {
1067 let (event_tx, mut event_rx) = mpsc::channel::<RuntimeStreamEvent>(100);
1068 let child_usage_event_relay = ChildUsageEventRelay::new(event_tx.clone());
1069 let mut turn_policy = self.state.effective_policy().clone();
1070 let turn_provider_override = turn_context.provider().cloned();
1071 if let Some(provider) = turn_provider_override.as_ref() {
1072 turn_policy.provider_id = provider.kind().to_string();
1073 }
1074 if let Some(model) = turn_context.model_spec() {
1075 turn_policy.model = model.clone();
1076 }
1077 let session_protocol_turn_options = self.state.effective_protocol_turn_options().clone();
1078 let effective_protocol_turn_options = protocol_turn_options
1079 .clone()
1080 .map(|options| session_protocol_turn_options.merged_with_override(&options))
1081 .unwrap_or(session_protocol_turn_options);
1082 let manager = self
1083 .runtime_session_services_for_turn(Some(child_usage_event_relay.clone()))
1084 .map_err(|err| {
1085 RuntimeError::new(RuntimeErrorCode::PluginSessionManager, err.to_string())
1086 })?;
1087 let plugins = {
1088 let session = self
1089 .session
1090 .as_ref()
1091 .expect("lash runtime session must be available");
1092 Arc::clone(session.plugins())
1093 };
1094 let mut assembler = TurnAssembler::new();
1095 self.mark_phase_begin(RuntimeTurnPhase::BeforeTurnHooks);
1096 let prepared = {
1102 let prepare_turn = plugins.prepare_turn_with_phase_probe(
1103 PrepareTurnRequest {
1104 session_id: self.state.session_id.clone(),
1105 state: crate::SessionReadView::from_runtime_state(
1106 &self.state,
1107 turn_policy.clone(),
1108 effective_protocol_turn_options.clone(),
1109 ),
1110 messages,
1111 sessions: manager.state_service(),
1112 session_lifecycle: manager.lifecycle_service(),
1113 session_graph: manager.graph_service(),
1114 turn_context: turn_context.clone(),
1115 },
1116 self.turn_phase_probe.clone(),
1117 );
1118 tokio::pin!(prepare_turn);
1119
1120 loop {
1121 tokio::select! {
1122 prepared = &mut prepare_turn => {
1123 let prepared = prepared.map_err(|err| {
1124 RuntimeError::new(RuntimeErrorCode::PluginPrepareTurn, err.to_string())
1125 })?;
1126 self.mark_phase_end(RuntimeTurnPhase::BeforeTurnHooks);
1127 break prepared;
1128 }
1129 maybe_event = event_rx.recv() => {
1130 if let Some(event) = maybe_event {
1131 emit_runtime_stream_event_to_sinks(
1132 events,
1133 turn_events,
1134 event,
1135 &mut assembler,
1136 )
1137 .await;
1138 }
1139 }
1140 }
1141 }
1142 };
1143 for event in &prepared.events {
1144 assembler.push(event);
1145 }
1146 emit_session_events_to_sink(events, prepared.events).await;
1147 if let Some(abort) = prepared.abort {
1148 drop(event_tx);
1149
1150 let mut turn_pipeline = TurnBoundary::from_state(self.state.clone());
1151 turn_pipeline.apply_prepared_messages(&prepared.messages);
1152 let state = turn_pipeline.into_final_state();
1153 let issue = TurnIssue {
1154 kind: "plugin".to_string(),
1155 code: Some(abort.code),
1156 terminal_reason: None,
1157 message: abort.message.clone(),
1158 raw: None,
1159 };
1160 let error_event = SessionEvent::Error {
1161 message: abort.message,
1162 envelope: Some(crate::session_model::ErrorEnvelope {
1163 kind: "plugin".to_string(),
1164 code: issue.code.clone(),
1165 terminal_reason: None,
1166 user_message: issue.message.clone(),
1167 raw: None,
1168 }),
1169 };
1170 assembler.push(&error_event);
1171 emit_turn_activity_to_sink(
1172 turn_events,
1173 TurnActivity::independent(TurnEvent::Error {
1174 message: issue.message.clone(),
1175 }),
1176 )
1177 .await;
1178 emit_session_event_to_sink(events, error_event).await;
1179 let outcome_event = SessionEvent::TurnOutcome {
1180 outcome: TurnOutcome::Stopped(TurnStop::PluginAbort),
1181 };
1182 assembler.push(&outcome_event);
1183 emit_session_event_to_sink(events, outcome_event).await;
1184 assembler.push(&SessionEvent::Done);
1185 emit_session_event_to_sink(events, SessionEvent::Done).await;
1186 return Ok(assembler.finish(
1187 state.to_snapshot(),
1188 cancel.is_cancelled(),
1189 Some(issue),
1190 &self.host.core.control.termination,
1191 ));
1192 }
1193 let mut turn_pipeline = TurnBoundary::from_state(self.state.clone());
1194 let store = self
1195 .session
1196 .as_ref()
1197 .and_then(|session| session.history_store());
1198 turn_pipeline
1199 .prepared_checkpoint(
1200 store.as_ref().map(|store| store.as_ref()),
1201 turn_policy.clone(),
1202 turn_index,
1203 &prepared.messages,
1204 self.session.as_mut(),
1205 )
1206 .await
1207 .map_err(|err| {
1208 RuntimeError::new(RuntimeErrorCode::StoreCommitFailed, err.to_string())
1209 })?;
1210 let resolved_turn_policy = if let Some(provider) = turn_provider_override {
1211 RuntimeSessionPolicy::from_provider(turn_policy.clone(), provider)
1212 .map_err(|err| RuntimeError::new("llm_provider", err.to_string()))?
1213 } else {
1214 self.host
1215 .resolve_session_policy(&self.state.session_id, turn_policy.clone())
1216 .map_err(|err| RuntimeError::new("llm_provider", err.to_string()))?
1217 };
1218 let manager = self
1219 .runtime_session_services_for_turn(Some(child_usage_event_relay.clone()))
1220 .map_err(|err| {
1221 RuntimeError::new(RuntimeErrorCode::PluginSessionManager, err.to_string())
1222 })?;
1223 let cancel_state = cancel.clone();
1224 let finish_scoped_effect_controller = scoped_effect_controller.clone();
1225 let session = self
1226 .session
1227 .take()
1228 .expect("lash runtime session must be available");
1229 let mut driver = RuntimeTurnDriver {
1230 session,
1231 policy: resolved_turn_policy,
1232 host: self.host.clone(),
1233 turn_id: scoped_effect_controller.scope_id().to_string(),
1234 scoped_effect_controller,
1235 session_id: self.state.session_id.clone(),
1236 turn_index,
1237 turn_pipeline,
1238 llm_stream_summaries: HashMap::new(),
1239 next_llm_ordinal: 0,
1240 session_services: manager,
1241 protocol_turn_options: effective_protocol_turn_options,
1242 protocol_extension,
1243 turn_context,
1244 turn_causes: initial_turn_causes,
1245 pending_queue_claims: initial_queue_claim.into_iter().collect(),
1246 checkpoint_messages: crate::tool_dispatch::CheckpointMessageBuffer::default(),
1247 pending_tool_host_events: Vec::new(),
1248 turn_phase_probe: self.turn_phase_probe.clone(),
1249 };
1250 let protocol_run_offset = 0;
1251 self.mark_phase_begin(RuntimeTurnPhase::EffectLoop);
1252 let run_result = drive_turn_to_completion(
1253 driver.run(prepared.messages, event_tx, cancel, protocol_run_offset),
1254 &mut event_rx,
1255 &mut assembler,
1256 &child_usage_event_relay,
1257 events,
1258 turn_events,
1259 )
1260 .await;
1261 let (new_messages, _new_protocol_iteration) = match run_result {
1262 Ok(result) => result,
1263 Err(err) => {
1264 self.mark_phase_end(RuntimeTurnPhase::EffectLoop);
1265 let RuntimeTurnDriver { session, .. } = driver;
1266 self.session = Some(session);
1267 return Err(err);
1268 }
1269 };
1270 self.mark_phase_end(RuntimeTurnPhase::EffectLoop);
1271 tracing::debug!(
1272 new_message_count = new_messages.len(),
1273 tool_call_count = assembler.tool_calls.len(),
1274 "runtime post-run_task"
1275 );
1276
1277 let RuntimeTurnDriver {
1278 session,
1279 policy,
1280 turn_pipeline,
1281 pending_queue_claims,
1282 pending_tool_host_events,
1283 ..
1284 } = driver;
1285 self.session = Some(session);
1286 self.finish_turn(
1287 TurnFinishInput {
1288 turn_pipeline,
1289 assembler,
1290 new_messages,
1291 policy,
1292 turn_index,
1293 queued_work_completions: pending_queue_claims
1294 .iter()
1295 .map(crate::QueuedWorkClaim::completion)
1296 .collect(),
1297 tool_host_events: pending_tool_host_events,
1298 trace_turn_id,
1299 },
1300 events,
1301 &finish_scoped_effect_controller,
1302 &cancel_state,
1303 )
1304 .await
1305 }
1306 fn normalize_input_items(
1307 &self,
1308 items: &[InputItem],
1309 image_blobs: &HashMap<String, Vec<u8>>,
1310 ) -> Result<Vec<NormalizedItem>, String> {
1311 normalize_input_items(
1312 items,
1313 image_blobs,
1314 self.host.core.durability.attachment_store.as_ref(),
1315 )
1316 }
1317}
1318
1319fn frame_switch_task(turn: &AssembledTurn, frame_id: &str) -> Option<String> {
1320 turn.tool_calls
1321 .iter()
1322 .find_map(|record| match &record.output.control {
1323 Some(crate::ToolControl::SwitchAgentFrame {
1324 frame_id: control_frame_id,
1325 task: Some(task),
1326 ..
1327 }) if control_frame_id == frame_id => Some(task.clone()),
1328 _ => None,
1329 })
1330}
1331
1332fn turn_input_from_text(text: String) -> TurnInput {
1333 TurnInput {
1334 items: vec![InputItem::Text { text }],
1335 image_blobs: HashMap::new(),
1336 protocol_turn_options: None,
1337 trace_turn_id: None,
1338 protocol_extension: None,
1339 turn_context: crate::TurnContext::default(),
1340 }
1341}
1342
1343fn agent_frame_follow_turn_id(root_turn_id: &str, completed_turn_count: usize) -> String {
1344 if completed_turn_count == 0 {
1345 root_turn_id.to_string()
1346 } else {
1347 format!("{root_turn_id}:agent-frame:{completed_turn_count}")
1348 }
1349}
1350
1351pub fn ensure_durable_effect_input(input: &TurnInput) -> Result<(), RuntimeError> {
1352 if input.protocol_extension.is_some() {
1353 return Err(RuntimeError::new(
1354 RuntimeErrorCode::DurableEffectLiveProtocolExtension,
1355 "durable effect hosts do not support live protocol_extension inputs; encode replayable data in protocol_turn_options or persisted plugin state",
1356 ));
1357 }
1358 input
1359 .turn_context
1360 .live_plugin_inputs()
1361 .durable_effect_rejection()?;
1362 Ok(())
1363}
1364
1365async fn emit_turn_activity_to_sink(events: &dyn TurnActivitySink, activity: TurnActivity) {
1366 if !events.is_noop() {
1367 events.emit(activity).await;
1368 }
1369}
1370
1371async fn drive_turn_to_completion<F>(
1381 run_future: F,
1382 event_rx: &mut mpsc::Receiver<RuntimeStreamEvent>,
1383 assembler: &mut TurnAssembler,
1384 child_usage_event_relay: &ChildUsageEventRelay,
1385 events: &dyn EventSink,
1386 turn_events: &dyn TurnActivitySink,
1387) -> Result<(crate::MessageSequence, usize), RuntimeError>
1388where
1389 F: std::future::Future<Output = Result<(crate::MessageSequence, usize), RuntimeError>>,
1390{
1391 let run_result = {
1392 tokio::pin!(run_future);
1393 loop {
1394 tokio::select! {
1395 maybe_event = event_rx.recv() => {
1396 if let Some(event) = maybe_event {
1397 emit_runtime_stream_event_to_sinks(
1398 events,
1399 turn_events,
1400 event,
1401 assembler,
1402 )
1403 .await;
1404 }
1405 }
1406 completed = &mut run_future => {
1407 child_usage_event_relay.clear();
1408 break completed;
1409 }
1410 }
1411 }
1412 };
1413 while let Some(event) = event_rx.recv().await {
1414 emit_runtime_stream_event_to_sinks(events, turn_events, event, assembler).await;
1415 }
1416 run_result
1417}
1418
1419async fn emit_runtime_stream_event_to_sinks(
1420 events: &dyn EventSink,
1421 turn_events: &dyn TurnActivitySink,
1422 event: RuntimeStreamEvent,
1423 assembler: &mut TurnAssembler,
1424) {
1425 match event {
1426 RuntimeStreamEvent::Session(event) => {
1427 assembler.push(&event);
1428 emit_session_event_to_sink(events, event).await;
1429 }
1430 RuntimeStreamEvent::Turn(activity) => {
1431 assembler.push_turn_activity(&activity);
1432 emit_turn_activity_to_sink(turn_events, activity).await;
1433 }
1434 }
1435}
1436
1437#[cfg(test)]
1438mod tests {
1439 use super::agent_frame_follow_turn_id;
1440
1441 #[test]
1442 fn agent_frame_follow_turn_ids_are_distinct_and_deterministic() {
1443 assert_eq!(agent_frame_follow_turn_id("root-turn", 0), "root-turn");
1444 assert_eq!(
1445 agent_frame_follow_turn_id("root-turn", 1),
1446 "root-turn:agent-frame:1"
1447 );
1448 assert_eq!(
1449 agent_frame_follow_turn_id("root-turn", 2),
1450 "root-turn:agent-frame:2"
1451 );
1452 }
1453}