1use super::*;
2
3fn trace_fields_from_outcome(
4 outcome: &TurnOutcome,
5) -> (
6 &'static str,
7 &'static str,
8 Option<lash_trace::TraceAgentFrameSwitch>,
9) {
10 match outcome {
11 TurnOutcome::Finished(TurnFinish::AssistantMessage { .. }) => {
12 ("completed", "assistant_message", None)
13 }
14 TurnOutcome::Finished(TurnFinish::SubmittedValue { .. }) => {
15 ("completed", "submitted_value", None)
16 }
17 TurnOutcome::Finished(TurnFinish::ToolValue { .. }) => ("completed", "tool_value", None),
18 TurnOutcome::AgentFrameSwitch { frame_id } => (
19 "completed",
20 "agent_frame_switch",
21 Some(lash_trace::TraceAgentFrameSwitch {
22 frame_id: frame_id.clone(),
23 }),
24 ),
25 TurnOutcome::Stopped(stop) => ("failed", trace_stop_reason(stop), None),
26 }
27}
28
29fn trace_stop_reason(stop: &TurnStop) -> &'static str {
30 match stop {
31 TurnStop::Cancelled => "cancelled",
32 TurnStop::Incomplete => "incomplete",
33 TurnStop::InvalidInput => "invalid_input",
34 TurnStop::MaxTurns => "max_turns",
35 TurnStop::ToolFailure => "tool_failure",
36 TurnStop::ProviderError => "provider_error",
37 TurnStop::PluginAbort => "plugin_abort",
38 TurnStop::RuntimeError => "runtime_error",
39 TurnStop::SubmittedError { .. } => "submitted_error",
40 TurnStop::ToolError { .. } => "tool_error",
41 }
42}
43
44fn session_head_refresh_error(err: SessionError) -> RuntimeError {
45 RuntimeError::new(
46 RuntimeErrorCode::Other("session_head_refresh".to_string()),
47 err.to_string(),
48 )
49}
50
51fn queued_work_payload_type(payload: &crate::QueuedWorkPayload) -> &'static str {
52 match payload {
53 crate::QueuedWorkPayload::TurnInput { .. } => "turn_input",
54 crate::QueuedWorkPayload::ProcessWake { .. } => "process_wake",
55 crate::QueuedWorkPayload::SessionCommand { command } => command.kind(),
56 }
57}
58
59fn queued_work_batch_ids(claim: &crate::QueuedWorkClaim) -> Vec<String> {
60 claim
61 .batches
62 .iter()
63 .map(|batch| batch.batch_id.clone())
64 .collect()
65}
66
67fn turn_phase_id(parent_turn_id: &str, phase: &str) -> String {
68 format!("{parent_turn_id}:{phase}")
69}
70
71fn scoped_child_turn_controller<'run>(
72 scoped_effect_controller: &'run ScopedEffectController<'_>,
73 session_id: &str,
74 turn_id: &str,
75) -> Result<ScopedEffectController<'run>, RuntimeError> {
76 ScopedEffectController::borrowed(
77 scoped_effect_controller.controller(),
78 EffectScope::turn(session_id, turn_id),
79 )
80}
81
82pub(in crate::runtime) fn queued_work_trace_payload(
83 boundary: crate::QueuedWorkClaimBoundary,
84 claim: &crate::QueuedWorkClaim,
85 causes: &[crate::TurnCause],
86) -> serde_json::Value {
87 serde_json::json!({
88 "boundary": boundary,
89 "claim_id": claim.claim_id,
90 "owner_id": claim.owner_id,
91 "batch_ids": queued_work_batch_ids(claim),
92 "payload_types": claim.batches.iter()
93 .flat_map(|batch| batch.items.iter())
94 .map(|item| queued_work_payload_type(&item.payload))
95 .collect::<Vec<_>>(),
96 "causes": causes,
97 })
98}
99
100pub(in crate::runtime) fn queued_work_completion_trace_payload(
101 completions: &[crate::QueuedWorkCompletion],
102) -> serde_json::Value {
103 serde_json::json!({
104 "claims": completions.iter().map(|completion| {
105 serde_json::json!({
106 "session_id": completion.session_id,
107 "claim_id": completion.claim_id,
108 "batch_ids": completion.batch_ids,
109 })
110 }).collect::<Vec<_>>(),
111 })
112}
113
114async fn emit_queued_work_started_to_sink(
115 events: &dyn TurnActivitySink,
116 boundary: crate::QueuedWorkClaimBoundary,
117 claim: &crate::QueuedWorkClaim,
118 causes: Vec<crate::TurnCause>,
119) {
120 emit_turn_activity_to_sink(
121 events,
122 TurnActivity::independent(TurnEvent::QueuedWorkStarted {
123 boundary,
124 batch_ids: queued_work_batch_ids(claim),
125 causes,
126 }),
127 )
128 .await;
129}
130
131pub(in crate::runtime) async fn send_queued_work_started_event(
132 event_tx: &mpsc::Sender<RuntimeStreamEvent>,
133 boundary: crate::QueuedWorkClaimBoundary,
134 claim: &crate::QueuedWorkClaim,
135 causes: Vec<crate::TurnCause>,
136) {
137 send_turn_activity(
138 event_tx,
139 TurnActivityId::fresh(),
140 TurnEvent::QueuedWorkStarted {
141 boundary,
142 batch_ids: queued_work_batch_ids(claim),
143 causes,
144 },
145 )
146 .await;
147}
148
149struct TurnFinishInput {
150 turn_pipeline: TurnBoundary,
151 assembler: TurnAssembler,
152 new_messages: crate::MessageSequence,
153 policy: RuntimeSessionPolicy,
154 turn_index: usize,
155 queued_work_completions: Vec<crate::QueuedWorkCompletion>,
156 trace_turn_id: String,
157}
158
159impl LashRuntime {
160 fn max_context_tokens(&self) -> usize {
161 self.state.effective_policy().context_window_tokens()
162 }
163 #[doc(hidden)]
164 pub fn set_turn_phase_probe(&mut self, probe: Arc<dyn RuntimeTurnPhaseProbe>) {
165 self.turn_phase_probe = Some(probe);
166 }
167
168 fn mark_phase_begin(&self, phase: RuntimeTurnPhase) {
169 if let Some(probe) = self.turn_phase_probe.as_ref() {
170 probe.begin(phase);
171 }
172 }
173
174 fn mark_phase_end(&self, phase: RuntimeTurnPhase) {
175 if let Some(probe) = self.turn_phase_probe.as_ref() {
176 probe.end(phase);
177 }
178 }
179
180 async fn finish_turn(
181 &mut self,
182 finish: TurnFinishInput,
183 events: &dyn EventSink,
184 scoped_effect_controller: &ScopedEffectController<'_>,
185 cancel_state: &CancellationToken,
186 ) -> Result<AssembledTurn, RuntimeError> {
187 let TurnFinishInput {
188 mut turn_pipeline,
189 assembler,
190 new_messages,
191 policy,
192 turn_index,
193 queued_work_completions,
194 trace_turn_id,
195 } = finish;
196 self.policy = self.state.effective_policy().clone();
197 turn_pipeline.state_mut().policy = self.policy.clone();
198 turn_pipeline.state_mut().turn_index = turn_index;
199
200 let mut turn_usage_delta = {
201 let mut ledger = self.shared_token_ledger.lock().expect("token ledger lock");
202 std::mem::take(&mut *ledger)
203 };
204 if assembler.token_usage.total() > 0 || assembler.token_usage.cached_input_tokens > 0 {
205 turn_usage_delta.push(TokenLedgerEntry {
206 source: "turn".to_string(),
207 model: policy.model.id.clone(),
208 usage: assembler.token_usage.clone(),
209 });
210 }
211 let turn_usage_delta = merge_usage_delta_entries(turn_usage_delta);
212
213 turn_pipeline.finalize_turn_read_state(new_messages, cancel_state.is_cancelled());
214 if assembler.token_usage.total() > 0 || assembler.token_usage.cached_input_tokens > 0 {
215 turn_pipeline.state_mut().token_usage = assembler.token_usage.clone();
216 }
217
218 let last_prompt_usage = assembler
219 .last_llm_usage()
220 .and_then(|usage| normalize_prompt_usage(policy.provider(), usage));
221 turn_pipeline.state_mut().last_prompt_usage = last_prompt_usage;
222 let assembled_state = turn_pipeline.export_state_for_assembly();
223 let assembled = assembler.finish(
224 assembled_state,
225 cancel_state.is_cancelled(),
226 None,
227 &self.host.core.control.termination,
228 );
229
230 let Some(session) = self.session.as_ref() else {
231 self.state.apply_snapshot(&assembled.state);
232 self.emit_completed_turn_trace(&assembled.state, &assembled.outcome, &trace_turn_id);
233 return Ok(assembled);
234 };
235
236 let plugins = Arc::clone(session.plugins());
237 let manager = match self.runtime_session_services_for_turn(None) {
238 Ok(manager) => manager,
239 Err(err) => {
240 return Err(RuntimeError::new(
241 RuntimeErrorCode::PluginSessionManager,
242 err.to_string(),
243 ));
244 }
245 };
246
247 self.mark_phase_begin(RuntimeTurnPhase::FinalizeTurn);
248 let finalized = match plugins
249 .finalize_turn_with_phase_probe(
250 assembled,
251 manager.state_service(),
252 manager.lifecycle_service(),
253 manager.graph_service(),
254 self.turn_phase_probe.clone(),
255 )
256 .await
257 {
258 Ok(finalized) => finalized,
259 Err(err) => {
260 self.mark_phase_end(RuntimeTurnPhase::FinalizeTurn);
261 return Err(RuntimeError::new(
262 RuntimeErrorCode::PluginFinalizeTurn,
263 err.to_string(),
264 ));
265 }
266 };
267 self.mark_phase_end(RuntimeTurnPhase::FinalizeTurn);
268
269 let mut returned_turn = finalized.turn;
270 self.mark_phase_begin(RuntimeTurnPhase::PersistTurn);
271 self.mark_phase_begin(RuntimeTurnPhase::FinalCommit);
272 let queued_work_completion_trace = queued_work_completions.clone();
273 if let Err(err) = turn_pipeline
274 .final_commit(
275 &mut returned_turn,
276 self.session.as_mut(),
277 &turn_usage_delta,
278 Some(&trace_turn_id),
279 queued_work_completions,
280 )
281 .await
282 {
283 self.mark_phase_end(RuntimeTurnPhase::FinalCommit);
284 self.mark_phase_end(RuntimeTurnPhase::PersistTurn);
285 return Err(err);
286 }
287 self.mark_phase_end(RuntimeTurnPhase::FinalCommit);
288
289 emit_session_events_to_sink(events, finalized.events).await;
290 self.state = turn_pipeline.into_final_state();
291 if matches!(returned_turn.outcome, TurnOutcome::AgentFrameSwitch { .. })
292 && let Some(session) = self.session.as_mut()
293 {
294 let protocol_session = Arc::clone(session.plugins().protocol_session());
295 let session_id = self.state.session_id.clone();
296 protocol_session
297 .restore_session(
298 crate::plugin::ProtocolSessionContext::new(session, &session_id),
299 &self.state,
300 )
301 .await
302 .map_err(|err| {
303 RuntimeError::new(
304 RuntimeErrorCode::Other("protocol_restore_session".to_string()),
305 err.to_string(),
306 )
307 })?;
308 }
309 if !queued_work_completion_trace.is_empty() {
310 crate::trace::emit_trace(
311 &self.host.core.tracing.trace_sink,
312 &self.host.core.tracing.trace_context,
313 lash_trace::TraceContext::default()
314 .for_session(returned_turn.state.session_id.clone())
315 .for_turn_index(returned_turn.state.turn_index)
316 .for_turn(trace_turn_id.clone()),
317 lash_trace::TraceEvent::Custom {
318 name: "queued_work.completed".to_string(),
319 payload: queued_work_completion_trace_payload(&queued_work_completion_trace),
320 },
321 );
322 }
323 self.mark_phase_begin(RuntimeTurnPhase::PostPersistHooks);
324 self.emit_turn_persisted_event(&returned_turn, scoped_effect_controller, &trace_turn_id)
325 .await?;
326 self.mark_phase_end(RuntimeTurnPhase::PostPersistHooks);
327 self.mark_phase_end(RuntimeTurnPhase::PersistTurn);
328
329 self.emit_completed_turn_trace(
330 &returned_turn.state,
331 &returned_turn.outcome,
332 &trace_turn_id,
333 );
334 Ok(returned_turn)
335 }
336
337 fn emit_completed_turn_trace(
338 &self,
339 state: &SessionSnapshot,
340 outcome: &TurnOutcome,
341 trace_turn_id: &str,
342 ) {
343 if self.host.core.tracing.trace_sink.is_none() {
344 return;
345 }
346
347 let (status, done_reason, agent_frame_switch) = trace_fields_from_outcome(outcome);
348 crate::trace::emit_trace(
349 &self.host.core.tracing.trace_sink,
350 &self.host.core.tracing.trace_context,
351 lash_trace::TraceContext::default()
352 .for_session(state.session_id.clone())
353 .for_turn_index(state.turn_index)
354 .for_turn(trace_turn_id.to_string()),
355 lash_trace::TraceEvent::TurnCompleted {
356 status: status.to_string(),
357 done_reason: done_reason.to_string(),
358 agent_frame_switch,
359 },
360 );
361 }
362
363 async fn emit_turn_persisted_event(
364 &self,
365 returned_turn: &AssembledTurn,
366 scoped_effect_controller: &ScopedEffectController<'_>,
367 trace_turn_id: &str,
368 ) -> Result<(), RuntimeError> {
369 let Some(session) = self.session.as_ref() else {
370 return Ok(());
371 };
372 let Ok(manager) = self.runtime_session_services() else {
373 return Ok(());
374 };
375 let phase_turn_id = turn_phase_id(trace_turn_id, "turn-persisted");
376 let phase_controller = scoped_child_turn_controller(
377 scoped_effect_controller,
378 &self.state.session_id,
379 &phase_turn_id,
380 )?;
381 let direct_completions = manager.direct_completion_client(
382 RuntimeEffectControllerHandle::borrowed(phase_controller),
383 Some(phase_turn_id),
384 );
385
386 session
387 .plugins()
388 .emit_runtime_event_with_phase_probe(
389 crate::PluginLifecycleEvent::TurnPersisted(Box::new(
390 crate::SessionStateChangedContext {
391 session_id: self.state.session_id.clone(),
392 state: crate::SessionReadView::from_snapshot(&returned_turn.state),
393 sessions: manager.state_service(),
394 session_graph: manager.graph_service(),
395 direct_completions,
396 },
397 )),
398 self.turn_phase_probe.clone(),
399 )
400 .await;
401 Ok(())
402 }
403
404 pub async fn stream_turn(
406 &mut self,
407 input: TurnInput,
408 opts: TurnOptions<'_>,
409 ) -> Result<AssembledTurn, RuntimeError> {
410 let turn_id = input
411 .trace_turn_id
412 .clone()
413 .unwrap_or_else(|| uuid::Uuid::new_v4().to_string());
414 let cancel = opts.cancel.clone();
415 let effect_host = Arc::clone(&self.host.core.control.effect_host);
416 let scoped_effect_controller = match opts.scoped_effect_controller() {
417 Some(scope) => scope,
418 None => effect_host.scoped(EffectScope::turn(&self.state.session_id, &turn_id))?,
419 };
420 self.stream_turn_with_scoped_effect_controller_inner(
421 input,
422 opts.events_or_noop(),
423 opts.turn_events_or_noop(),
424 scoped_effect_controller,
425 cancel,
426 None,
427 )
428 .await
429 }
430
431 pub async fn stream_next_queued_work(
432 &mut self,
433 opts: TurnOptions<'_>,
434 ) -> Result<Option<AssembledTurn>, RuntimeError> {
435 self.stream_queued_work(opts, None).await
436 }
437
438 pub async fn stream_selected_queued_work(
439 &mut self,
440 opts: TurnOptions<'_>,
441 batch_ids: &[String],
442 ) -> Result<Option<AssembledTurn>, RuntimeError> {
443 self.stream_queued_work(opts, Some(batch_ids)).await
444 }
445
446 async fn stream_queued_work(
447 &mut self,
448 opts: TurnOptions<'_>,
449 selected_batch_ids: Option<&[String]>,
450 ) -> Result<Option<AssembledTurn>, RuntimeError> {
451 if self.drain_next_session_command().await?.is_some() {
452 return Ok(None);
453 }
454 let Some(store) = self
455 .session
456 .as_ref()
457 .and_then(|session| session.history_store())
458 else {
459 return Ok(None);
460 };
461 let claim = if let Some(batch_ids) = selected_batch_ids {
462 store
463 .claim_ready_queued_work_by_batch_ids(
464 &self.state.session_id,
465 &self.runtime_scope_id,
466 crate::QueuedWorkClaimBoundary::Idle,
467 crate::QUEUED_WORK_CLAIM_TTL_MS,
468 batch_ids,
469 )
470 .await
471 } else {
472 store
473 .claim_ready_queued_work(
474 &self.state.session_id,
475 &self.runtime_scope_id,
476 crate::QueuedWorkClaimBoundary::Idle,
477 crate::QUEUED_WORK_CLAIM_TTL_MS,
478 64,
479 )
480 .await
481 }
482 .map_err(|err| RuntimeError::new(RuntimeErrorCode::StoreCommitFailed, err.to_string()))?;
483 let Some(claim) = claim else {
484 return Ok(None);
485 };
486 let mut work = claim.materialize_for_turn();
487 let turn_id = work
488 .input
489 .trace_turn_id
490 .clone()
491 .or_else(|| opts.effect_scope_id().map(ToOwned::to_owned))
492 .unwrap_or_else(|| uuid::Uuid::new_v4().to_string());
493 work.input.trace_turn_id = Some(turn_id.clone());
494 let causes = work.turn_causes.clone();
495 emit_queued_work_started_to_sink(
496 opts.turn_events_or_noop(),
497 crate::QueuedWorkClaimBoundary::Idle,
498 &claim,
499 causes.clone(),
500 )
501 .await;
502 crate::trace::emit_trace(
503 &self.host.core.tracing.trace_sink,
504 &self.host.core.tracing.trace_context,
505 lash_trace::TraceContext::default()
506 .for_session(self.state.session_id.clone())
507 .for_turn_index(self.state.turn_index + 1)
508 .for_turn(turn_id.clone()),
509 lash_trace::TraceEvent::Custom {
510 name: "queued_work.claimed".to_string(),
511 payload: queued_work_trace_payload(
512 crate::QueuedWorkClaimBoundary::Idle,
513 &claim,
514 &causes,
515 ),
516 },
517 );
518 let cancel = opts.cancel.clone();
519 let effect_host = Arc::clone(&self.host.core.control.effect_host);
520 let scoped_effect_controller = match opts.scoped_effect_controller() {
521 Some(scope) => scope,
522 None => effect_host.scoped(EffectScope::queue_drain(
523 &self.state.session_id,
524 &claim.claim_id,
525 ))?,
526 };
527 self.stream_turn_with_scoped_effect_controller_inner(
528 work.input,
529 opts.events_or_noop(),
530 opts.turn_events_or_noop(),
531 scoped_effect_controller,
532 cancel,
533 Some(claim),
534 )
535 .await
536 .map(Some)
537 }
538
539 fn ensure_durable_store_facets_for_scope(
547 &self,
548 scoped_effect_controller: &ScopedEffectController<'_>,
549 ) -> Result<(), RuntimeError> {
550 if scoped_effect_controller.controller().durability_tier() != crate::DurabilityTier::Durable
551 {
552 return Ok(());
553 }
554 if self
555 .host
556 .core
557 .durability
558 .attachment_store
559 .persistence()
560 .durability_tier()
561 != crate::DurabilityTier::Durable
562 {
563 return Err(RuntimeError::durable_store_required(
564 crate::DurableStoreFacet::AttachmentStore,
565 ));
566 }
567 if self
568 .host
569 .core
570 .durability
571 .lashlang_artifact_store
572 .durability_tier()
573 != crate::DurabilityTier::Durable
574 {
575 return Err(RuntimeError::durable_store_required(
576 crate::DurableStoreFacet::ArtifactStore,
577 ));
578 }
579 if let Some(store) = self
580 .session
581 .as_ref()
582 .and_then(|session| session.history_store())
583 && store.durability_tier() != crate::DurabilityTier::Durable
584 {
585 return Err(RuntimeError::durable_store_required(
586 crate::DurableStoreFacet::SessionStore,
587 ));
588 }
589 if let Some(process_registry) = self.host.process_registry.as_ref()
590 && process_registry.durability_tier() != crate::DurabilityTier::Durable
591 {
592 return Err(RuntimeError::durable_store_required(
593 crate::DurableStoreFacet::ProcessRegistry,
594 ));
595 }
596 Ok(())
597 }
598
599 async fn stream_turn_with_scoped_effect_controller_inner(
600 &mut self,
601 mut input: TurnInput,
602 events: &dyn EventSink,
603 turn_events: &dyn TurnActivitySink,
604 scoped_effect_controller: ScopedEffectController<'_>,
605 cancel: CancellationToken,
606 queued_claim: Option<crate::QueuedWorkClaim>,
607 ) -> Result<AssembledTurn, RuntimeError> {
608 if queued_claim.is_none() {
609 while self.drain_next_session_command().await?.is_some() {}
610 }
611 if let Some(input_turn_id) = input.trace_turn_id.as_deref()
612 && scoped_effect_controller
613 .effect_scope()
614 .validates_turn_trace_id()
615 && input_turn_id != scoped_effect_controller.scope_id()
616 {
617 return Err(RuntimeError::new(
618 RuntimeErrorCode::EffectScopeTurnIdMismatch,
619 format!(
620 "input trace_turn_id `{input_turn_id}` does not match effect scope id `{}`",
621 scoped_effect_controller.scope_id()
622 ),
623 ));
624 }
625 self.ensure_durable_store_facets_for_scope(&scoped_effect_controller)?;
626 input
627 .trace_turn_id
628 .get_or_insert_with(|| scoped_effect_controller.scope_id().to_string());
629 self.stream_turn_inner(
630 input.clone(),
631 events,
632 turn_events,
633 scoped_effect_controller,
634 cancel.clone(),
635 queued_claim,
636 )
637 .await
638 }
639
640 pub async fn stream_turn_with_agent_frames(
648 &mut self,
649 input: TurnInput,
650 opts: TurnOptions<'_>,
651 ) -> Result<AgentFrameRun, RuntimeError> {
652 let follow_trace_turn_id = input
653 .trace_turn_id
654 .clone()
655 .unwrap_or_else(|| uuid::Uuid::new_v4().to_string());
656 let cancel = opts.cancel.clone();
657 let effect_host = Arc::clone(&self.host.core.control.effect_host);
658 let scoped_effect_controller = match opts.scoped_effect_controller() {
659 Some(scope) => scope,
660 None => effect_host.scoped(EffectScope::turn(
661 &self.state.session_id,
662 &follow_trace_turn_id,
663 ))?,
664 };
665 self.stream_turn_with_agent_frames_inner(
666 input,
667 opts.events_or_noop(),
668 opts.turn_events_or_noop(),
669 scoped_effect_controller,
670 cancel,
671 )
672 .await
673 }
674
675 async fn stream_turn_with_agent_frames_inner(
676 &mut self,
677 mut input: TurnInput,
678 events: &dyn EventSink,
679 turn_events: &dyn TurnActivitySink,
680 scoped_effect_controller: ScopedEffectController<'_>,
681 cancel: CancellationToken,
682 ) -> Result<AgentFrameRun, RuntimeError> {
683 if let Some(input_turn_id) = input.trace_turn_id.as_deref()
684 && scoped_effect_controller
685 .effect_scope()
686 .validates_turn_trace_id()
687 && input_turn_id != scoped_effect_controller.scope_id()
688 {
689 return Err(RuntimeError::new(
690 RuntimeErrorCode::EffectScopeTurnIdMismatch,
691 format!(
692 "input trace_turn_id `{input_turn_id}` does not match effect scope id `{}`",
693 scoped_effect_controller.scope_id()
694 ),
695 ));
696 }
697 let follow_protocol_turn_options = input.protocol_turn_options.clone();
698 let follow_turn_context = input.turn_context.clone();
699 let follow_trace_turn_id = input
700 .trace_turn_id
701 .clone()
702 .unwrap_or_else(|| scoped_effect_controller.scope_id().to_string());
703 input
704 .trace_turn_id
705 .get_or_insert(follow_trace_turn_id.clone());
706 let mut turns = Vec::new();
707 loop {
708 let turn_trace_turn_id = agent_frame_follow_turn_id(&follow_trace_turn_id, turns.len());
709 input.trace_turn_id = Some(turn_trace_turn_id.clone());
710 let turn_effect_controller = if turns.is_empty() {
711 scoped_effect_controller.clone()
712 } else {
713 ScopedEffectController::borrowed(
714 scoped_effect_controller.controller(),
715 EffectScope::turn(&self.state.session_id, &turn_trace_turn_id),
716 )?
717 };
718 let turn = self
719 .stream_turn_with_scoped_effect_controller_inner(
720 input,
721 events,
722 turn_events,
723 turn_effect_controller,
724 cancel.clone(),
725 None,
726 )
727 .await?;
728 let switched_frame_id = match &turn.outcome {
729 TurnOutcome::AgentFrameSwitch { frame_id } => Some(frame_id.clone()),
730 _ => None,
731 };
732 let next_task = switched_frame_id
733 .as_ref()
734 .and_then(|frame_id| frame_switch_task(&turn, frame_id));
735 turns.push(turn);
736
737 let Some(_frame_id) = switched_frame_id else {
738 return Ok(AgentFrameRun { turns });
739 };
740
741 let task = next_task.ok_or_else(|| {
742 RuntimeError::new(
743 RuntimeErrorCode::Other("agent_frame_missing_task".to_string()),
744 "agent frame switch did not provide a task",
745 )
746 })?;
747 input = turn_input_from_text(task);
748 input.protocol_turn_options = follow_protocol_turn_options.clone();
749 input.turn_context = follow_turn_context.clone();
750 }
751 }
752
753 async fn stream_turn_inner(
754 &mut self,
755 mut input: TurnInput,
756 events: &dyn EventSink,
757 turn_events: &dyn TurnActivitySink,
758 scoped_effect_controller: ScopedEffectController<'_>,
759 cancel: CancellationToken,
760 queued_claim: Option<crate::QueuedWorkClaim>,
761 ) -> Result<AssembledTurn, RuntimeError> {
762 self.refresh_session_graph_from_store()
763 .await
764 .map_err(session_head_refresh_error)?;
765 let input_trace_turn_id = input.trace_turn_id.clone();
766 let queued_turn_work = queued_claim
767 .as_ref()
768 .map(crate::QueuedWorkClaim::materialize_for_turn);
769 if let Some(work) = queued_turn_work.as_ref()
770 && input.items.is_empty()
771 && input.image_blobs.is_empty()
772 {
773 input = work.input.clone();
774 if input.trace_turn_id.is_none() {
775 input.trace_turn_id = input_trace_turn_id;
776 }
777 }
778 if self
779 .session
780 .as_ref()
781 .and_then(|session| session.history_store())
782 .is_some()
783 {
784 ensure_durable_effect_input(&input)?;
785 }
786 if let Some(extension) = &input.protocol_extension
787 && let Some(session) = self.session.as_ref()
788 {
789 let protocol_session = std::sync::Arc::clone(session.plugins().protocol_session());
790 protocol_session
791 .validate_turn_extension(extension)
792 .await
793 .map_err(|err| {
794 RuntimeError::new(RuntimeErrorCode::ProtocolTurnExtension, err.to_string())
795 })?;
796 }
797 let previous_prompt_usage = self.state.last_prompt_usage.clone();
798 let normalized = match self.normalize_input_items(&input.items, &input.image_blobs) {
799 Ok(items) => items,
800 Err(e) => {
801 self.state.last_prompt_usage = None;
802 let mut assembler = TurnAssembler::default();
803 let error_event = SessionEvent::Error {
804 message: e.clone(),
805 envelope: Some(crate::session_model::ErrorEnvelope {
806 kind: "input_validation".to_string(),
807 code: Some("invalid_turn_input".to_string()),
808 terminal_reason: None,
809 user_message: e.clone(),
810 raw: None,
811 }),
812 };
813 assembler.push(&error_event);
814 emit_turn_activity_to_sink(
815 turn_events,
816 TurnActivity::independent(TurnEvent::Error { message: e }),
817 )
818 .await;
819 emit_session_event_to_sink(events, error_event).await;
820 let outcome_event = SessionEvent::TurnOutcome {
821 outcome: TurnOutcome::Stopped(TurnStop::InvalidInput),
822 };
823 assembler.push(&outcome_event);
824 emit_session_event_to_sink(events, outcome_event).await;
825 assembler.push(&SessionEvent::Done);
826 emit_session_event_to_sink(events, SessionEvent::Done).await;
827 return Ok(assembler.finish(
828 self.state.to_snapshot(),
829 false,
830 None,
831 &self.host.core.control.termination,
832 ));
833 }
834 };
835 let turn_index = self.state.turn_index + 1;
836 let trace_turn_id = input
837 .trace_turn_id
838 .clone()
839 .unwrap_or_else(|| uuid::Uuid::new_v4().to_string());
840 if self.host.core.tracing.trace_sink.is_some() {
841 let mut trace_metadata = std::collections::BTreeMap::new();
842 trace_metadata.insert(
843 "input_item_count".to_string(),
844 serde_json::json!(normalized.len()),
845 );
846 crate::trace::emit_trace(
847 &self.host.core.tracing.trace_sink,
848 &self.host.core.tracing.trace_context,
849 lash_trace::TraceContext::default()
850 .for_session(self.state.session_id.clone())
851 .for_turn_index(turn_index)
852 .for_turn(trace_turn_id.clone()),
853 lash_trace::TraceEvent::TurnStarted {
854 metadata: trace_metadata,
855 },
856 );
857 }
858
859 let base_read_model = self.state.read_model();
860 let base_messages = base_read_model.messages;
861 let base_render_cache = base_read_model.prompt_render_cache;
862 let mut turn_delta = Vec::new();
863 let initial_turn_causes = queued_turn_work
864 .as_ref()
865 .map(|work| work.turn_causes.clone())
866 .unwrap_or_default();
867 turn_delta.extend(
868 initial_turn_causes
869 .iter()
870 .map(crate::TurnCause::to_event_message),
871 );
872
873 let user_id = fresh_message_id();
874 let mut user_parts: Vec<Part> = Vec::new();
875 for item in normalized {
876 match item {
877 NormalizedItem::Text(text) => {
878 if text.is_empty() {
879 continue;
880 }
881 user_parts.push(Part {
882 id: format!("{}.p{}", user_id, user_parts.len()),
883 kind: PartKind::Text,
884 content: text,
885 attachment: None,
886 tool_call_id: None,
887 tool_name: None,
888 tool_replay: None,
889 prune_state: PruneState::Intact,
890 reasoning_meta: None,
891 response_meta: None,
892 });
893 }
894 NormalizedItem::Image(reference) => {
895 user_parts.push(Part {
896 id: format!("{}.p{}", user_id, user_parts.len()),
897 kind: PartKind::Image,
898 content: String::new(),
899 attachment: Some(crate::session_model::message::PartAttachment {
900 reference,
901 }),
902 tool_call_id: None,
903 tool_name: None,
904 tool_replay: None,
905 prune_state: PruneState::Intact,
906 reasoning_meta: None,
907 response_meta: None,
908 });
909 }
910 }
911 }
912 if user_parts.is_empty() && initial_turn_causes.is_empty() {
913 user_parts.push(Part {
914 id: format!("{}.p0", user_id),
915 kind: PartKind::Text,
916 content: String::new(),
917 attachment: None,
918 tool_call_id: None,
919 tool_name: None,
920 tool_replay: None,
921 prune_state: PruneState::Intact,
922 reasoning_meta: None,
923 response_meta: None,
924 });
925 }
926 if !user_parts.is_empty() {
927 reassign_part_ids(&user_id, &mut user_parts);
928 turn_delta.push(Message {
929 id: user_id.clone(),
930 role: MessageRole::User,
931 parts: shared_parts(user_parts),
932 origin: None,
933 });
934 }
935
936 let manager = self
937 .runtime_session_services_for_turn(None)
938 .map_err(|err| {
939 RuntimeError::new(RuntimeErrorCode::PluginSessionManager, err.to_string())
940 })?;
941 let plugin_session = self
942 .session
943 .as_ref()
944 .map(|s| Arc::clone(s.plugins()))
945 .ok_or_else(|| {
946 RuntimeError::new(
947 RuntimeErrorCode::ContextPrepareTurn,
948 "runtime session not available",
949 )
950 })?;
951 let prepare_phase_turn_id = turn_phase_id(&trace_turn_id, "prepare-turn");
952 let prepare_phase_controller = scoped_child_turn_controller(
953 &scoped_effect_controller,
954 &self.state.session_id,
955 &prepare_phase_turn_id,
956 )?;
957 let turn_ctx = crate::TurnTransformContext {
958 session_id: self.state.session_id.clone(),
959 state: self.read_view(),
960 prompt_usage: previous_prompt_usage.clone(),
961 max_context_tokens: Some(LashRuntime::max_context_tokens(self)),
962 sessions: manager.state_service(),
963 session_lifecycle: manager.lifecycle_service(),
964 session_graph: manager.graph_service(),
965 scoped_effect_controller: scoped_effect_controller.clone(),
966 direct_completions: manager.direct_completion_client(
967 RuntimeEffectControllerHandle::borrowed(prepare_phase_controller),
968 Some(prepare_phase_turn_id),
969 ),
970 };
971 self.mark_phase_begin(RuntimeTurnPhase::ContextTransform);
972 let prepared_context = plugin_session
973 .prepare_turn_context(
974 &turn_ctx,
975 crate::session_model::context::PreparedContext {
976 messages: crate::MessageSequence::from_base_and_delta(
977 base_messages,
978 turn_delta,
979 )
980 .with_base_render_cache(base_render_cache),
981 ..Default::default()
982 },
983 self.turn_phase_probe.clone(),
984 )
985 .await
986 .map_err(|err| {
987 RuntimeError::new(RuntimeErrorCode::ContextPrepareTurn, err.to_string())
988 })?;
989 self.mark_phase_end(RuntimeTurnPhase::ContextTransform);
990 drop(turn_ctx);
995 let messages = prepared_context.messages;
996 if let Some(session) = self.session.as_mut() {
997 session
998 .set_context_surface(
999 prepared_context.tool_providers,
1000 prepared_context.prompt_contributions,
1001 prepared_context.include_base_tools,
1002 )
1003 .map_err(|err| {
1004 RuntimeError::new(
1005 RuntimeErrorCode::Other("session_tool_registry".to_string()),
1006 err.to_string(),
1007 )
1008 })?;
1009 }
1010
1011 self.state.last_prompt_usage = None;
1012
1013 self.stream_prepared_turn(
1014 messages,
1015 previous_prompt_usage,
1016 input.protocol_turn_options.clone(),
1017 input.protocol_extension.clone(),
1018 input.turn_context.clone(),
1019 initial_turn_causes,
1020 trace_turn_id,
1021 turn_index,
1022 events,
1023 turn_events,
1024 scoped_effect_controller,
1025 cancel,
1026 queued_claim,
1027 )
1028 .await
1029 }
1030
1031 pub async fn run_turn_assembled(
1033 &mut self,
1034 input: TurnInput,
1035 cancel: CancellationToken,
1036 ) -> Result<AssembledTurn, RuntimeError> {
1037 self.stream_turn(input, TurnOptions::new(cancel)).await
1038 }
1039
1040 #[allow(clippy::too_many_arguments)]
1042 pub async fn stream_prepared_turn(
1043 &mut self,
1044 messages: crate::MessageSequence,
1045 _previous_prompt_usage: Option<PromptUsage>,
1046 protocol_turn_options: Option<crate::ProtocolTurnOptions>,
1047 protocol_extension: Option<crate::ProtocolTurnExtensionHandle>,
1048 turn_context: crate::TurnContext,
1049 initial_turn_causes: Vec<crate::TurnCause>,
1050 trace_turn_id: String,
1051 turn_index: usize,
1052 events: &dyn EventSink,
1053 turn_events: &dyn TurnActivitySink,
1054 scoped_effect_controller: ScopedEffectController<'_>,
1055 cancel: CancellationToken,
1056 initial_queue_claim: Option<crate::QueuedWorkClaim>,
1057 ) -> Result<AssembledTurn, RuntimeError> {
1058 let (event_tx, mut event_rx) = mpsc::channel::<RuntimeStreamEvent>(100);
1059 let child_usage_event_relay = ChildUsageEventRelay::new(event_tx.clone());
1060 let mut turn_policy = self.state.effective_policy().clone();
1061 let turn_provider_override = turn_context.provider().cloned();
1062 if let Some(provider) = turn_provider_override.as_ref() {
1063 turn_policy.provider_id = provider.kind().to_string();
1064 }
1065 if let Some(model) = turn_context.model_spec() {
1066 turn_policy.model = model.clone();
1067 }
1068 let session_protocol_turn_options = self.state.effective_protocol_turn_options().clone();
1069 let effective_protocol_turn_options = protocol_turn_options
1070 .clone()
1071 .map(|options| session_protocol_turn_options.merged_with_override(&options))
1072 .unwrap_or(session_protocol_turn_options);
1073 let manager = self
1074 .runtime_session_services_for_turn(Some(child_usage_event_relay.clone()))
1075 .map_err(|err| {
1076 RuntimeError::new(RuntimeErrorCode::PluginSessionManager, err.to_string())
1077 })?;
1078 let plugins = {
1079 let session = self
1080 .session
1081 .as_ref()
1082 .expect("lash runtime session must be available");
1083 Arc::clone(session.plugins())
1084 };
1085 let mut assembler = TurnAssembler::new();
1086 self.mark_phase_begin(RuntimeTurnPhase::BeforeTurnHooks);
1087 let prepared = {
1093 let prepare_turn = plugins.prepare_turn_with_phase_probe(
1094 PrepareTurnRequest {
1095 session_id: self.state.session_id.clone(),
1096 state: crate::SessionReadView::from_runtime_state(
1097 &self.state,
1098 turn_policy.clone(),
1099 effective_protocol_turn_options.clone(),
1100 ),
1101 messages,
1102 sessions: manager.state_service(),
1103 session_lifecycle: manager.lifecycle_service(),
1104 session_graph: manager.graph_service(),
1105 turn_context: turn_context.clone(),
1106 },
1107 self.turn_phase_probe.clone(),
1108 );
1109 tokio::pin!(prepare_turn);
1110
1111 loop {
1112 tokio::select! {
1113 prepared = &mut prepare_turn => {
1114 let prepared = prepared.map_err(|err| {
1115 RuntimeError::new(RuntimeErrorCode::PluginPrepareTurn, err.to_string())
1116 })?;
1117 self.mark_phase_end(RuntimeTurnPhase::BeforeTurnHooks);
1118 break prepared;
1119 }
1120 maybe_event = event_rx.recv() => {
1121 if let Some(event) = maybe_event {
1122 emit_runtime_stream_event_to_sinks(
1123 events,
1124 turn_events,
1125 event,
1126 &mut assembler,
1127 )
1128 .await;
1129 }
1130 }
1131 }
1132 }
1133 };
1134 for event in &prepared.events {
1135 assembler.push(event);
1136 }
1137 emit_session_events_to_sink(events, prepared.events).await;
1138 if let Some(abort) = prepared.abort {
1139 drop(event_tx);
1140
1141 let mut turn_pipeline = TurnBoundary::from_state(self.state.clone());
1142 turn_pipeline.apply_prepared_messages(&prepared.messages);
1143 let state = turn_pipeline.into_final_state();
1144 let issue = TurnIssue {
1145 kind: "plugin".to_string(),
1146 code: Some(abort.code),
1147 terminal_reason: None,
1148 message: abort.message.clone(),
1149 raw: None,
1150 };
1151 let error_event = SessionEvent::Error {
1152 message: abort.message,
1153 envelope: Some(crate::session_model::ErrorEnvelope {
1154 kind: "plugin".to_string(),
1155 code: issue.code.clone(),
1156 terminal_reason: None,
1157 user_message: issue.message.clone(),
1158 raw: None,
1159 }),
1160 };
1161 assembler.push(&error_event);
1162 emit_turn_activity_to_sink(
1163 turn_events,
1164 TurnActivity::independent(TurnEvent::Error {
1165 message: issue.message.clone(),
1166 }),
1167 )
1168 .await;
1169 emit_session_event_to_sink(events, error_event).await;
1170 let outcome_event = SessionEvent::TurnOutcome {
1171 outcome: TurnOutcome::Stopped(TurnStop::PluginAbort),
1172 };
1173 assembler.push(&outcome_event);
1174 emit_session_event_to_sink(events, outcome_event).await;
1175 assembler.push(&SessionEvent::Done);
1176 emit_session_event_to_sink(events, SessionEvent::Done).await;
1177 return Ok(assembler.finish(
1178 state.to_snapshot(),
1179 cancel.is_cancelled(),
1180 Some(issue),
1181 &self.host.core.control.termination,
1182 ));
1183 }
1184 let mut turn_pipeline = TurnBoundary::from_state(self.state.clone());
1185 let store = self
1186 .session
1187 .as_ref()
1188 .and_then(|session| session.history_store());
1189 turn_pipeline
1190 .prepared_checkpoint(
1191 store.as_ref().map(|store| store.as_ref()),
1192 turn_policy.clone(),
1193 turn_index,
1194 &prepared.messages,
1195 self.session.as_mut(),
1196 )
1197 .await
1198 .map_err(|err| {
1199 RuntimeError::new(RuntimeErrorCode::StoreCommitFailed, err.to_string())
1200 })?;
1201 let resolved_turn_policy = if let Some(provider) = turn_provider_override {
1202 RuntimeSessionPolicy::from_provider(turn_policy.clone(), provider)
1203 .map_err(|err| RuntimeError::new("llm_provider", err.to_string()))?
1204 } else {
1205 self.host
1206 .resolve_session_policy(&self.state.session_id, turn_policy.clone())
1207 .map_err(|err| RuntimeError::new("llm_provider", err.to_string()))?
1208 };
1209 let manager = self
1210 .runtime_session_services_for_turn(Some(child_usage_event_relay.clone()))
1211 .map_err(|err| {
1212 RuntimeError::new(RuntimeErrorCode::PluginSessionManager, err.to_string())
1213 })?;
1214 let cancel_state = cancel.clone();
1215 let finish_scoped_effect_controller = scoped_effect_controller.clone();
1216 let session = self
1217 .session
1218 .take()
1219 .expect("lash runtime session must be available");
1220 let mut driver = RuntimeTurnDriver {
1221 session,
1222 policy: resolved_turn_policy,
1223 host: self.host.clone(),
1224 turn_id: scoped_effect_controller.scope_id().to_string(),
1225 scoped_effect_controller,
1226 session_id: self.state.session_id.clone(),
1227 turn_index,
1228 turn_pipeline,
1229 llm_stream_summaries: HashMap::new(),
1230 next_llm_ordinal: 0,
1231 session_services: manager,
1232 protocol_turn_options: effective_protocol_turn_options,
1233 protocol_extension,
1234 turn_context,
1235 turn_causes: initial_turn_causes,
1236 pending_queue_claims: initial_queue_claim.into_iter().collect(),
1237 checkpoint_messages: crate::tool_dispatch::CheckpointMessageBuffer::default(),
1238 turn_phase_probe: self.turn_phase_probe.clone(),
1239 };
1240 let protocol_run_offset = 0;
1241 self.mark_phase_begin(RuntimeTurnPhase::EffectLoop);
1242 let run_result = drive_turn_to_completion(
1243 driver.run(prepared.messages, event_tx, cancel, protocol_run_offset),
1244 &mut event_rx,
1245 &mut assembler,
1246 &child_usage_event_relay,
1247 events,
1248 turn_events,
1249 )
1250 .await;
1251 let (new_messages, _new_protocol_iteration) = match run_result {
1252 Ok(result) => result,
1253 Err(err) => {
1254 self.mark_phase_end(RuntimeTurnPhase::EffectLoop);
1255 let RuntimeTurnDriver { session, .. } = driver;
1256 self.session = Some(session);
1257 return Err(err);
1258 }
1259 };
1260 self.mark_phase_end(RuntimeTurnPhase::EffectLoop);
1261 tracing::debug!(
1262 new_message_count = new_messages.len(),
1263 tool_call_count = assembler.tool_calls.len(),
1264 "runtime post-run_task"
1265 );
1266
1267 let RuntimeTurnDriver {
1268 session,
1269 policy,
1270 turn_pipeline,
1271 pending_queue_claims,
1272 ..
1273 } = driver;
1274 self.session = Some(session);
1275 self.finish_turn(
1276 TurnFinishInput {
1277 turn_pipeline,
1278 assembler,
1279 new_messages,
1280 policy,
1281 turn_index,
1282 queued_work_completions: pending_queue_claims
1283 .iter()
1284 .map(crate::QueuedWorkClaim::completion)
1285 .collect(),
1286 trace_turn_id,
1287 },
1288 events,
1289 &finish_scoped_effect_controller,
1290 &cancel_state,
1291 )
1292 .await
1293 }
1294 fn normalize_input_items(
1295 &self,
1296 items: &[InputItem],
1297 image_blobs: &HashMap<String, Vec<u8>>,
1298 ) -> Result<Vec<NormalizedItem>, String> {
1299 normalize_input_items(
1300 items,
1301 image_blobs,
1302 self.host.core.durability.attachment_store.as_ref(),
1303 )
1304 }
1305}
1306
1307fn frame_switch_task(turn: &AssembledTurn, frame_id: &str) -> Option<String> {
1308 turn.tool_calls
1309 .iter()
1310 .find_map(|record| match &record.output.control {
1311 Some(crate::ToolControl::SwitchAgentFrame {
1312 frame_id: control_frame_id,
1313 task: Some(task),
1314 ..
1315 }) if control_frame_id == frame_id => Some(task.clone()),
1316 _ => None,
1317 })
1318}
1319
1320fn turn_input_from_text(text: String) -> TurnInput {
1321 TurnInput {
1322 items: vec![InputItem::Text { text }],
1323 image_blobs: HashMap::new(),
1324 protocol_turn_options: None,
1325 trace_turn_id: None,
1326 protocol_extension: None,
1327 turn_context: crate::TurnContext::default(),
1328 }
1329}
1330
1331fn agent_frame_follow_turn_id(root_turn_id: &str, completed_turn_count: usize) -> String {
1332 if completed_turn_count == 0 {
1333 root_turn_id.to_string()
1334 } else {
1335 format!("{root_turn_id}:agent-frame:{completed_turn_count}")
1336 }
1337}
1338
1339pub fn ensure_durable_effect_input(input: &TurnInput) -> Result<(), RuntimeError> {
1340 if input.protocol_extension.is_some() {
1341 return Err(RuntimeError::new(
1342 RuntimeErrorCode::DurableEffectLiveProtocolExtension,
1343 "durable effect hosts do not support live protocol_extension inputs; encode replayable data in protocol_turn_options or persisted plugin state",
1344 ));
1345 }
1346 input
1347 .turn_context
1348 .live_plugin_inputs()
1349 .durable_effect_rejection()?;
1350 Ok(())
1351}
1352
1353async fn emit_turn_activity_to_sink(events: &dyn TurnActivitySink, activity: TurnActivity) {
1354 if !events.is_noop() {
1355 events.emit(activity).await;
1356 }
1357}
1358
1359async fn drive_turn_to_completion<F>(
1369 run_future: F,
1370 event_rx: &mut mpsc::Receiver<RuntimeStreamEvent>,
1371 assembler: &mut TurnAssembler,
1372 child_usage_event_relay: &ChildUsageEventRelay,
1373 events: &dyn EventSink,
1374 turn_events: &dyn TurnActivitySink,
1375) -> Result<(crate::MessageSequence, usize), RuntimeError>
1376where
1377 F: std::future::Future<Output = Result<(crate::MessageSequence, usize), RuntimeError>>,
1378{
1379 let run_result = {
1380 tokio::pin!(run_future);
1381 loop {
1382 tokio::select! {
1383 maybe_event = event_rx.recv() => {
1384 if let Some(event) = maybe_event {
1385 emit_runtime_stream_event_to_sinks(
1386 events,
1387 turn_events,
1388 event,
1389 assembler,
1390 )
1391 .await;
1392 }
1393 }
1394 completed = &mut run_future => {
1395 child_usage_event_relay.clear();
1396 break completed;
1397 }
1398 }
1399 }
1400 };
1401 while let Some(event) = event_rx.recv().await {
1402 emit_runtime_stream_event_to_sinks(events, turn_events, event, assembler).await;
1403 }
1404 run_result
1405}
1406
1407async fn emit_runtime_stream_event_to_sinks(
1408 events: &dyn EventSink,
1409 turn_events: &dyn TurnActivitySink,
1410 event: RuntimeStreamEvent,
1411 assembler: &mut TurnAssembler,
1412) {
1413 match event {
1414 RuntimeStreamEvent::Session(event) => {
1415 assembler.push(&event);
1416 emit_session_event_to_sink(events, event).await;
1417 }
1418 RuntimeStreamEvent::Turn(activity) => {
1419 assembler.push_turn_activity(&activity);
1420 emit_turn_activity_to_sink(turn_events, activity).await;
1421 }
1422 }
1423}
1424
1425#[cfg(test)]
1426mod tests {
1427 use super::agent_frame_follow_turn_id;
1428
1429 #[test]
1430 fn agent_frame_follow_turn_ids_are_distinct_and_deterministic() {
1431 assert_eq!(agent_frame_follow_turn_id("root-turn", 0), "root-turn");
1432 assert_eq!(
1433 agent_frame_follow_turn_id("root-turn", 1),
1434 "root-turn:agent-frame:1"
1435 );
1436 assert_eq!(
1437 agent_frame_follow_turn_id("root-turn", 2),
1438 "root-turn:agent-frame:2"
1439 );
1440 }
1441}