Skip to main content

lash_core/runtime/
turn_loop.rs

1use super::*;
2
3fn trace_fields_from_outcome(
4    outcome: &TurnOutcome,
5) -> (
6    &'static str,
7    &'static str,
8    Option<lash_trace::TraceAgentFrameSwitch>,
9) {
10    match outcome {
11        TurnOutcome::Finished(TurnFinish::AssistantMessage { .. }) => {
12            ("completed", "assistant_message", None)
13        }
14        TurnOutcome::Finished(TurnFinish::SubmittedValue { .. }) => {
15            ("completed", "submitted_value", None)
16        }
17        TurnOutcome::Finished(TurnFinish::ToolValue { .. }) => ("completed", "tool_value", None),
18        TurnOutcome::AgentFrameSwitch { frame_id, .. } => (
19            "completed",
20            "agent_frame_switch",
21            Some(lash_trace::TraceAgentFrameSwitch {
22                frame_id: frame_id.clone(),
23            }),
24        ),
25        TurnOutcome::Stopped(stop) => ("failed", trace_stop_reason(stop), None),
26    }
27}
28
29fn trace_stop_reason(stop: &TurnStop) -> &'static str {
30    match stop {
31        TurnStop::Cancelled => "cancelled",
32        TurnStop::Incomplete => "incomplete",
33        TurnStop::InvalidInput => "invalid_input",
34        TurnStop::MaxTurns => "max_turns",
35        TurnStop::ToolFailure => "tool_failure",
36        TurnStop::ProviderError => "provider_error",
37        TurnStop::PluginAbort => "plugin_abort",
38        TurnStop::RuntimeError => "runtime_error",
39        TurnStop::SubmittedError { .. } => "submitted_error",
40        TurnStop::ToolError { .. } => "tool_error",
41    }
42}
43
44fn session_head_refresh_error(err: SessionError) -> RuntimeError {
45    RuntimeError::new(
46        RuntimeErrorCode::Other("session_head_refresh".to_string()),
47        err.to_string(),
48    )
49}
50
51fn queued_work_payload_type(payload: &crate::QueuedWorkPayload) -> &'static str {
52    match payload {
53        crate::QueuedWorkPayload::TurnInput { .. } => "turn_input",
54        crate::QueuedWorkPayload::ProcessWake { .. } => "process_wake",
55        crate::QueuedWorkPayload::SessionCommand { command } => command.kind(),
56    }
57}
58
59fn queued_work_batch_ids(claim: &crate::QueuedWorkClaim) -> Vec<String> {
60    claim
61        .batches
62        .iter()
63        .map(|batch| batch.batch_id.clone())
64        .collect()
65}
66
67fn turn_phase_id(parent_turn_id: &str, phase: &str) -> String {
68    format!("{parent_turn_id}:{phase}")
69}
70
71fn scoped_child_turn_controller<'run>(
72    scoped_effect_controller: &'run ScopedEffectController<'_>,
73    session_id: &str,
74    turn_id: &str,
75) -> Result<ScopedEffectController<'run>, RuntimeError> {
76    ScopedEffectController::borrowed(
77        scoped_effect_controller.controller(),
78        ExecutionScope::turn(session_id, turn_id),
79    )
80}
81
82pub(in crate::runtime) fn queued_work_trace_payload(
83    boundary: crate::QueuedWorkClaimBoundary,
84    claim: &crate::QueuedWorkClaim,
85    causes: &[crate::TurnCause],
86) -> serde_json::Value {
87    serde_json::json!({
88        "boundary": boundary,
89        "claim_id": claim.claim_id,
90        "owner_id": claim.owner_id,
91        "batch_ids": queued_work_batch_ids(claim),
92        "payload_types": claim.batches.iter()
93            .flat_map(|batch| batch.items.iter())
94            .map(|item| queued_work_payload_type(&item.payload))
95            .collect::<Vec<_>>(),
96        "causes": causes,
97    })
98}
99
100pub(in crate::runtime) fn queued_work_completion_trace_payload(
101    completions: &[crate::QueuedWorkCompletion],
102) -> serde_json::Value {
103    serde_json::json!({
104        "claims": completions.iter().map(|completion| {
105            serde_json::json!({
106                "session_id": completion.session_id,
107                "claim_id": completion.claim_id,
108                "batch_ids": completion.batch_ids,
109            })
110        }).collect::<Vec<_>>(),
111    })
112}
113
114async fn emit_queued_work_started_to_sink(
115    events: &dyn TurnActivitySink,
116    boundary: crate::QueuedWorkClaimBoundary,
117    claim: &crate::QueuedWorkClaim,
118    causes: Vec<crate::TurnCause>,
119) {
120    emit_turn_activity_to_sink(
121        events,
122        TurnActivity::independent(TurnEvent::QueuedWorkStarted {
123            boundary,
124            batch_ids: queued_work_batch_ids(claim),
125            causes,
126        }),
127    )
128    .await;
129}
130
131pub(in crate::runtime) async fn send_queued_work_started_event(
132    event_tx: &mpsc::Sender<RuntimeStreamEvent>,
133    boundary: crate::QueuedWorkClaimBoundary,
134    claim: &crate::QueuedWorkClaim,
135    causes: Vec<crate::TurnCause>,
136) {
137    send_turn_activity(
138        event_tx,
139        TurnActivityId::fresh(),
140        TurnEvent::QueuedWorkStarted {
141            boundary,
142            batch_ids: queued_work_batch_ids(claim),
143            causes,
144        },
145    )
146    .await;
147}
148
149struct TurnFinishInput {
150    turn_pipeline: TurnBoundary,
151    assembler: TurnAssembler,
152    new_messages: crate::MessageSequence,
153    policy: RuntimeSessionPolicy,
154    turn_index: usize,
155    queued_work_completions: Vec<crate::QueuedWorkCompletion>,
156    trace_turn_id: String,
157}
158
159impl LashRuntime {
160    fn max_context_tokens(&self) -> usize {
161        self.state.effective_policy().context_window_tokens()
162    }
163    #[doc(hidden)]
164    pub fn set_turn_phase_probe(&mut self, probe: Arc<dyn RuntimeTurnPhaseProbe>) {
165        self.turn_phase_probe = Some(probe);
166    }
167
168    fn mark_phase_begin(&self, phase: RuntimeTurnPhase) {
169        if let Some(probe) = self.turn_phase_probe.as_ref() {
170            probe.begin(phase);
171        }
172    }
173
174    fn mark_phase_end(&self, phase: RuntimeTurnPhase) {
175        if let Some(probe) = self.turn_phase_probe.as_ref() {
176            probe.end(phase);
177        }
178    }
179
180    async fn finish_turn(
181        &mut self,
182        finish: TurnFinishInput,
183        events: &dyn EventSink,
184        scoped_effect_controller: &ScopedEffectController<'_>,
185        cancel_state: &CancellationToken,
186    ) -> Result<AssembledTurn, RuntimeError> {
187        let TurnFinishInput {
188            mut turn_pipeline,
189            assembler,
190            new_messages,
191            policy,
192            turn_index,
193            queued_work_completions,
194            trace_turn_id,
195        } = finish;
196        self.policy = self.state.effective_policy().clone();
197        turn_pipeline.state_mut().policy = self.policy.clone();
198        turn_pipeline.state_mut().turn_index = turn_index;
199
200        let mut turn_usage_delta = {
201            let mut ledger = self.shared_token_ledger.lock().expect("token ledger lock");
202            std::mem::take(&mut *ledger)
203        };
204        if assembler.token_usage.total() > 0 || assembler.token_usage.cached_input_tokens > 0 {
205            turn_usage_delta.push(TokenLedgerEntry {
206                source: "turn".to_string(),
207                model: policy.model.id.clone(),
208                usage: assembler.token_usage.clone(),
209            });
210        }
211        let turn_usage_delta = merge_usage_delta_entries(turn_usage_delta);
212
213        turn_pipeline.finalize_turn_read_state(new_messages, cancel_state.is_cancelled());
214        if assembler.token_usage.total() > 0 || assembler.token_usage.cached_input_tokens > 0 {
215            turn_pipeline.state_mut().token_usage = assembler.token_usage.clone();
216        }
217
218        let last_prompt_usage = assembler
219            .last_llm_usage()
220            .and_then(|usage| normalize_prompt_usage(policy.provider(), usage));
221        turn_pipeline.state_mut().last_prompt_usage = last_prompt_usage;
222        let assembled_state = turn_pipeline.export_state_for_assembly();
223        let assembled = assembler.finish(
224            assembled_state,
225            cancel_state.is_cancelled(),
226            None,
227            &self.host.core.control.termination,
228        );
229
230        let Some(session) = self.session.as_ref() else {
231            self.state.apply_snapshot(&assembled.state);
232            self.emit_completed_turn_trace(&assembled.state, &assembled.outcome, &trace_turn_id);
233            return Ok(assembled);
234        };
235
236        let plugins = Arc::clone(session.plugins());
237        let manager = match self.runtime_session_services_for_turn(None) {
238            Ok(manager) => manager,
239            Err(err) => {
240                return Err(RuntimeError::new(
241                    RuntimeErrorCode::PluginSessionManager,
242                    err.to_string(),
243                ));
244            }
245        };
246
247        self.mark_phase_begin(RuntimeTurnPhase::FinalizeTurn);
248        let finalized = match plugins
249            .finalize_turn_with_phase_probe(
250                assembled,
251                manager.state_service(),
252                manager.lifecycle_service(),
253                manager.graph_service(),
254                self.turn_phase_probe.clone(),
255            )
256            .await
257        {
258            Ok(finalized) => finalized,
259            Err(err) => {
260                self.mark_phase_end(RuntimeTurnPhase::FinalizeTurn);
261                return Err(RuntimeError::new(
262                    RuntimeErrorCode::PluginFinalizeTurn,
263                    err.to_string(),
264                ));
265            }
266        };
267        self.mark_phase_end(RuntimeTurnPhase::FinalizeTurn);
268
269        let mut returned_turn = finalized.turn;
270        self.mark_phase_begin(RuntimeTurnPhase::PersistTurn);
271        self.mark_phase_begin(RuntimeTurnPhase::FinalCommit);
272        let queued_work_completion_trace = queued_work_completions.clone();
273        let pending_attachment_ids = self
274            .host
275            .core
276            .durability
277            .attachment_store
278            .pending_manifest_commit_ids();
279        if let Err(err) = turn_pipeline
280            .final_commit(
281                &mut returned_turn,
282                self.session.as_mut(),
283                &turn_usage_delta,
284                Some(&trace_turn_id),
285                queued_work_completions,
286                pending_attachment_ids.clone(),
287            )
288            .await
289        {
290            self.mark_phase_end(RuntimeTurnPhase::FinalCommit);
291            self.mark_phase_end(RuntimeTurnPhase::PersistTurn);
292            return Err(err);
293        }
294        self.host
295            .core
296            .durability
297            .attachment_store
298            .mark_manifest_committed(&pending_attachment_ids);
299        self.mark_phase_end(RuntimeTurnPhase::FinalCommit);
300
301        emit_session_events_to_sink(events, finalized.events).await;
302        self.state = turn_pipeline.into_final_state();
303        if matches!(returned_turn.outcome, TurnOutcome::AgentFrameSwitch { .. })
304            && let Some(session) = self.session.as_mut()
305        {
306            let protocol_session = Arc::clone(session.plugins().protocol_session());
307            let session_id = self.state.session_id.clone();
308            protocol_session
309                .restore_session(
310                    crate::plugin::ProtocolSessionContext::new(session, &session_id),
311                    &self.state,
312                )
313                .await
314                .map_err(|err| {
315                    RuntimeError::new(
316                        RuntimeErrorCode::Other("protocol_restore_session".to_string()),
317                        err.to_string(),
318                    )
319                })?;
320        }
321        if !queued_work_completion_trace.is_empty() {
322            crate::trace::emit_trace(
323                &self.host.core.tracing.trace_sink,
324                &self.host.core.tracing.trace_context,
325                lash_trace::TraceContext::default()
326                    .for_session(returned_turn.state.session_id.clone())
327                    .for_turn_index(returned_turn.state.turn_index)
328                    .for_turn(trace_turn_id.clone()),
329                lash_trace::TraceEvent::Custom {
330                    name: "queued_work.completed".to_string(),
331                    payload: queued_work_completion_trace_payload(&queued_work_completion_trace),
332                },
333            );
334        }
335        self.mark_phase_begin(RuntimeTurnPhase::PostPersistHooks);
336        self.emit_turn_persisted_event(&returned_turn, scoped_effect_controller, &trace_turn_id)
337            .await?;
338        self.mark_phase_end(RuntimeTurnPhase::PostPersistHooks);
339        self.mark_phase_end(RuntimeTurnPhase::PersistTurn);
340
341        self.emit_completed_turn_trace(
342            &returned_turn.state,
343            &returned_turn.outcome,
344            &trace_turn_id,
345        );
346        Ok(returned_turn)
347    }
348
349    fn emit_completed_turn_trace(
350        &self,
351        state: &SessionSnapshot,
352        outcome: &TurnOutcome,
353        trace_turn_id: &str,
354    ) {
355        if self.host.core.tracing.trace_sink.is_none() {
356            return;
357        }
358
359        let (status, done_reason, agent_frame_switch) = trace_fields_from_outcome(outcome);
360        crate::trace::emit_trace(
361            &self.host.core.tracing.trace_sink,
362            &self.host.core.tracing.trace_context,
363            lash_trace::TraceContext::default()
364                .for_session(state.session_id.clone())
365                .for_turn_index(state.turn_index)
366                .for_turn(trace_turn_id.to_string()),
367            lash_trace::TraceEvent::TurnCompleted {
368                status: status.to_string(),
369                done_reason: done_reason.to_string(),
370                agent_frame_switch,
371            },
372        );
373    }
374
375    async fn emit_turn_persisted_event(
376        &self,
377        returned_turn: &AssembledTurn,
378        scoped_effect_controller: &ScopedEffectController<'_>,
379        trace_turn_id: &str,
380    ) -> Result<(), RuntimeError> {
381        let Some(session) = self.session.as_ref() else {
382            return Ok(());
383        };
384        let Ok(manager) = self.runtime_session_services() else {
385            return Ok(());
386        };
387        let phase_turn_id = turn_phase_id(trace_turn_id, "turn-persisted");
388        let phase_controller = scoped_child_turn_controller(
389            scoped_effect_controller,
390            &self.state.session_id,
391            &phase_turn_id,
392        )?;
393        let direct_completions = manager.direct_completion_client(
394            RuntimeEffectControllerHandle::borrowed(phase_controller),
395            Some(phase_turn_id),
396        );
397
398        session
399            .plugins()
400            .emit_runtime_event_with_phase_probe(
401                crate::PluginLifecycleEvent::TurnPersisted(Box::new(
402                    crate::SessionStateChangedContext {
403                        session_id: self.state.session_id.clone(),
404                        state: crate::SessionReadView::from_snapshot(&returned_turn.state),
405                        sessions: manager.state_service(),
406                        session_graph: manager.graph_service(),
407                        direct_completions,
408                    },
409                )),
410                self.turn_phase_probe.clone(),
411            )
412            .await;
413        Ok(())
414    }
415
416    /// Run a single turn and stream events to the host sink.
417    pub async fn stream_turn(
418        &mut self,
419        input: TurnInput,
420        opts: TurnOptions<'_>,
421    ) -> Result<AssembledTurn, RuntimeError> {
422        let cancel = opts.cancel.clone();
423        let scoped_effect_controller = opts.scoped_effect_controller();
424        self.stream_turn_with_scoped_effect_controller_inner(
425            input,
426            opts.events_or_noop(),
427            opts.turn_events_or_noop(),
428            scoped_effect_controller,
429            cancel,
430            None,
431        )
432        .await
433    }
434
435    pub async fn stream_next_queued_work(
436        &mut self,
437        opts: TurnOptions<'_>,
438    ) -> Result<Option<AssembledTurn>, RuntimeError> {
439        self.stream_queued_work(opts, None).await
440    }
441
442    pub async fn stream_selected_queued_work(
443        &mut self,
444        opts: TurnOptions<'_>,
445        batch_ids: &[String],
446    ) -> Result<Option<AssembledTurn>, RuntimeError> {
447        self.stream_queued_work(opts, Some(batch_ids)).await
448    }
449
450    async fn stream_queued_work(
451        &mut self,
452        opts: TurnOptions<'_>,
453        selected_batch_ids: Option<&[String]>,
454    ) -> Result<Option<AssembledTurn>, RuntimeError> {
455        if self.drain_next_session_command().await?.is_some() {
456            return Ok(None);
457        }
458        let Some(store) = self
459            .session
460            .as_ref()
461            .and_then(|session| session.history_store())
462        else {
463            return Ok(None);
464        };
465        let claim = if let Some(batch_ids) = selected_batch_ids {
466            store
467                .claim_ready_queued_work_by_batch_ids(
468                    &self.state.session_id,
469                    &self.runtime_scope_id,
470                    crate::QueuedWorkClaimBoundary::Idle,
471                    crate::QUEUED_WORK_CLAIM_TTL_MS,
472                    batch_ids,
473                )
474                .await
475        } else {
476            store
477                .claim_ready_queued_work(
478                    &self.state.session_id,
479                    &self.runtime_scope_id,
480                    crate::QueuedWorkClaimBoundary::Idle,
481                    crate::QUEUED_WORK_CLAIM_TTL_MS,
482                    64,
483                )
484                .await
485        }
486        .map_err(|err| RuntimeError::new(RuntimeErrorCode::StoreCommitFailed, err.to_string()))?;
487        let Some(claim) = claim else {
488            return Ok(None);
489        };
490        let mut work = claim.materialize_for_turn();
491        let turn_id = work
492            .input
493            .trace_turn_id
494            .clone()
495            .or_else(|| Some(opts.execution_scope_id().to_owned()))
496            .unwrap_or_else(|| uuid::Uuid::new_v4().to_string());
497        work.input.trace_turn_id = Some(turn_id.clone());
498        let causes = work.turn_causes.clone();
499        emit_queued_work_started_to_sink(
500            opts.turn_events_or_noop(),
501            crate::QueuedWorkClaimBoundary::Idle,
502            &claim,
503            causes.clone(),
504        )
505        .await;
506        crate::trace::emit_trace(
507            &self.host.core.tracing.trace_sink,
508            &self.host.core.tracing.trace_context,
509            lash_trace::TraceContext::default()
510                .for_session(self.state.session_id.clone())
511                .for_turn_index(self.state.turn_index + 1)
512                .for_turn(turn_id.clone()),
513            lash_trace::TraceEvent::Custom {
514                name: "queued_work.claimed".to_string(),
515                payload: queued_work_trace_payload(
516                    crate::QueuedWorkClaimBoundary::Idle,
517                    &claim,
518                    &causes,
519                ),
520            },
521        );
522        let cancel = opts.cancel.clone();
523        let scoped_effect_controller = opts.scoped_effect_controller();
524        self.stream_turn_with_scoped_effect_controller_inner(
525            work.input,
526            opts.events_or_noop(),
527            opts.turn_events_or_noop(),
528            scoped_effect_controller,
529            cancel,
530            Some(claim),
531        )
532        .await
533        .map(Some)
534    }
535
536    /// Enforce the durable-first wiring invariant at a turn-scope boundary: when
537    /// the host wired a durable effect host, every store reachable from this
538    /// scope must also be durable. A durable host running against any ephemeral
539    /// store fails loudly here rather than silently degrading.
540    ///
541    /// Inline controllers (the default tier) impose no requirement, so
542    /// inline/in-memory hosts pass unchanged.
543    fn ensure_durable_store_facets_for_scope(
544        &self,
545        scoped_effect_controller: &ScopedEffectController<'_>,
546    ) -> Result<(), RuntimeError> {
547        if scoped_effect_controller.controller().durability_tier() != crate::DurabilityTier::Durable
548        {
549            return Ok(());
550        }
551        if self
552            .host
553            .core
554            .durability
555            .attachment_store
556            .persistence()
557            .durability_tier()
558            != crate::DurabilityTier::Durable
559        {
560            return Err(RuntimeError::durable_store_required(
561                crate::DurableStoreFacet::AttachmentStore,
562            ));
563        }
564        if self
565            .host
566            .core
567            .durability
568            .process_env_store
569            .durability_tier()
570            != crate::DurabilityTier::Durable
571        {
572            return Err(RuntimeError::durable_store_required(
573                crate::DurableStoreFacet::ProcessEnvStore,
574            ));
575        }
576        if let Some(store) = self
577            .session
578            .as_ref()
579            .and_then(|session| session.history_store())
580            && store.durability_tier() != crate::DurabilityTier::Durable
581        {
582            return Err(RuntimeError::durable_store_required(
583                crate::DurableStoreFacet::SessionStore,
584            ));
585        }
586        if let Some(process_registry) = self.host.process_registry.as_ref()
587            && process_registry.durability_tier() != crate::DurabilityTier::Durable
588        {
589            return Err(RuntimeError::durable_store_required(
590                crate::DurableStoreFacet::ProcessRegistry,
591            ));
592        }
593        Ok(())
594    }
595
596    async fn stream_turn_with_scoped_effect_controller_inner(
597        &mut self,
598        mut input: TurnInput,
599        events: &dyn EventSink,
600        turn_events: &dyn TurnActivitySink,
601        scoped_effect_controller: ScopedEffectController<'_>,
602        cancel: CancellationToken,
603        queued_claim: Option<crate::QueuedWorkClaim>,
604    ) -> Result<AssembledTurn, RuntimeError> {
605        if queued_claim.is_none() {
606            while self.drain_next_session_command().await?.is_some() {}
607        }
608        if let Some(input_turn_id) = input.trace_turn_id.as_deref()
609            && scoped_effect_controller
610                .execution_scope()
611                .validates_turn_trace_id()
612            && input_turn_id != scoped_effect_controller.scope_id()
613        {
614            return Err(RuntimeError::new(
615                RuntimeErrorCode::ExecutionScopeTurnIdMismatch,
616                format!(
617                    "input trace_turn_id `{input_turn_id}` does not match execution scope id `{}`",
618                    scoped_effect_controller.scope_id()
619                ),
620            ));
621        }
622        self.ensure_durable_store_facets_for_scope(&scoped_effect_controller)?;
623        input
624            .trace_turn_id
625            .get_or_insert_with(|| scoped_effect_controller.scope_id().to_string());
626        self.stream_turn_inner(
627            input.clone(),
628            events,
629            turn_events,
630            scoped_effect_controller,
631            cancel.clone(),
632            queued_claim,
633        )
634        .await
635    }
636
637    /// Stream one logical host turn, following foreground AgentFrame switches
638    /// until a terminal outcome is reached.
639    ///
640    /// A protocol continuation creates a new frame in the same session. Hosts
641    /// that only care about the benchmark/app answer should not need to
642    /// special-case that intermediate outcome; this helper keeps driving the
643    /// same session through each frame's task with the normal runtime turn
644    /// guards.
645    pub async fn stream_turn_with_agent_frames(
646        &mut self,
647        input: TurnInput,
648        opts: TurnOptions<'_>,
649    ) -> Result<AgentFrameRun, RuntimeError> {
650        let cancel = opts.cancel.clone();
651        let scoped_effect_controller = opts.scoped_effect_controller();
652        self.stream_turn_with_agent_frames_inner(
653            input,
654            opts.events_or_noop(),
655            opts.turn_events_or_noop(),
656            scoped_effect_controller,
657            cancel,
658        )
659        .await
660    }
661
662    async fn stream_turn_with_agent_frames_inner(
663        &mut self,
664        mut input: TurnInput,
665        events: &dyn EventSink,
666        turn_events: &dyn TurnActivitySink,
667        scoped_effect_controller: ScopedEffectController<'_>,
668        cancel: CancellationToken,
669    ) -> Result<AgentFrameRun, RuntimeError> {
670        if let Some(input_turn_id) = input.trace_turn_id.as_deref()
671            && scoped_effect_controller
672                .execution_scope()
673                .validates_turn_trace_id()
674            && input_turn_id != scoped_effect_controller.scope_id()
675        {
676            return Err(RuntimeError::new(
677                RuntimeErrorCode::ExecutionScopeTurnIdMismatch,
678                format!(
679                    "input trace_turn_id `{input_turn_id}` does not match execution scope id `{}`",
680                    scoped_effect_controller.scope_id()
681                ),
682            ));
683        }
684        let follow_protocol_turn_options = input.protocol_turn_options.clone();
685        let follow_turn_context = input.turn_context.clone();
686        let follow_trace_turn_id = input
687            .trace_turn_id
688            .clone()
689            .unwrap_or_else(|| scoped_effect_controller.scope_id().to_string());
690        input
691            .trace_turn_id
692            .get_or_insert(follow_trace_turn_id.clone());
693        let mut turns = Vec::new();
694        loop {
695            let turn_trace_turn_id = agent_frame_follow_turn_id(&follow_trace_turn_id, turns.len());
696            input.trace_turn_id = Some(turn_trace_turn_id.clone());
697            let turn_effect_controller = if turns.is_empty() {
698                scoped_effect_controller.clone()
699            } else {
700                ScopedEffectController::borrowed(
701                    scoped_effect_controller.controller(),
702                    ExecutionScope::turn(&self.state.session_id, &turn_trace_turn_id),
703                )?
704            };
705            let turn = self
706                .stream_turn_with_scoped_effect_controller_inner(
707                    input,
708                    events,
709                    turn_events,
710                    turn_effect_controller,
711                    cancel.clone(),
712                    None,
713                )
714                .await?;
715            let switched_frame = match &turn.outcome {
716                TurnOutcome::AgentFrameSwitch { frame_id, task } => {
717                    Some((frame_id.clone(), task.clone()))
718                }
719                _ => None,
720            };
721            turns.push(turn);
722
723            let Some((_frame_id, task)) = switched_frame else {
724                return Ok(AgentFrameRun { turns });
725            };
726            input = turn_input_from_text(task);
727            input.protocol_turn_options = follow_protocol_turn_options.clone();
728            input.turn_context = follow_turn_context.clone();
729        }
730    }
731
732    async fn stream_turn_inner(
733        &mut self,
734        mut input: TurnInput,
735        events: &dyn EventSink,
736        turn_events: &dyn TurnActivitySink,
737        scoped_effect_controller: ScopedEffectController<'_>,
738        cancel: CancellationToken,
739        queued_claim: Option<crate::QueuedWorkClaim>,
740    ) -> Result<AssembledTurn, RuntimeError> {
741        self.refresh_session_graph_from_store()
742            .await
743            .map_err(session_head_refresh_error)?;
744        let input_trace_turn_id = input.trace_turn_id.clone();
745        let queued_turn_work = queued_claim
746            .as_ref()
747            .map(crate::QueuedWorkClaim::materialize_for_turn);
748        if let Some(work) = queued_turn_work.as_ref()
749            && input.items.is_empty()
750            && input.image_blobs.is_empty()
751        {
752            input = work.input.clone();
753            if input.trace_turn_id.is_none() {
754                input.trace_turn_id = input_trace_turn_id;
755            }
756        }
757        if self
758            .session
759            .as_ref()
760            .and_then(|session| session.history_store())
761            .is_some()
762        {
763            ensure_durable_effect_input(&input)?;
764        }
765        if let Some(extension) = &input.protocol_extension
766            && let Some(session) = self.session.as_ref()
767        {
768            let protocol_session = std::sync::Arc::clone(session.plugins().protocol_session());
769            protocol_session
770                .validate_turn_extension(extension)
771                .await
772                .map_err(|err| {
773                    RuntimeError::new(RuntimeErrorCode::ProtocolTurnExtension, err.to_string())
774                })?;
775        }
776        let previous_prompt_usage = self.state.last_prompt_usage.clone();
777        let normalized = match self
778            .normalize_input_items(&input.items, &input.image_blobs)
779            .await
780        {
781            Ok(items) => items,
782            Err(e) => {
783                self.state.last_prompt_usage = None;
784                let mut assembler = TurnAssembler::default();
785                let error_event = SessionEvent::Error {
786                    message: e.clone(),
787                    envelope: Some(crate::session_model::ErrorEnvelope {
788                        kind: "input_validation".to_string(),
789                        code: Some("invalid_turn_input".to_string()),
790                        terminal_reason: None,
791                        user_message: e.clone(),
792                        raw: None,
793                    }),
794                };
795                assembler.push(&error_event);
796                emit_turn_activity_to_sink(
797                    turn_events,
798                    TurnActivity::independent(TurnEvent::Error { message: e }),
799                )
800                .await;
801                emit_session_event_to_sink(events, error_event).await;
802                let outcome_event = SessionEvent::TurnOutcome {
803                    outcome: TurnOutcome::Stopped(TurnStop::InvalidInput),
804                };
805                assembler.push(&outcome_event);
806                emit_session_event_to_sink(events, outcome_event).await;
807                assembler.push(&SessionEvent::Done);
808                emit_session_event_to_sink(events, SessionEvent::Done).await;
809                return Ok(assembler.finish(
810                    self.state.to_snapshot(),
811                    false,
812                    None,
813                    &self.host.core.control.termination,
814                ));
815            }
816        };
817        let turn_index = self.state.turn_index + 1;
818        let trace_turn_id = input
819            .trace_turn_id
820            .clone()
821            .unwrap_or_else(|| uuid::Uuid::new_v4().to_string());
822        if self.host.core.tracing.trace_sink.is_some() {
823            let mut trace_metadata = std::collections::BTreeMap::new();
824            trace_metadata.insert(
825                "input_item_count".to_string(),
826                serde_json::json!(normalized.len()),
827            );
828            crate::trace::emit_trace(
829                &self.host.core.tracing.trace_sink,
830                &self.host.core.tracing.trace_context,
831                lash_trace::TraceContext::default()
832                    .for_session(self.state.session_id.clone())
833                    .for_turn_index(turn_index)
834                    .for_turn(trace_turn_id.clone()),
835                lash_trace::TraceEvent::TurnStarted {
836                    metadata: trace_metadata,
837                },
838            );
839        }
840
841        let base_read_model = self.state.read_model();
842        let base_messages = base_read_model.messages;
843        let base_render_cache = base_read_model.prompt_render_cache;
844        let mut turn_delta = Vec::new();
845        let initial_turn_causes = queued_turn_work
846            .as_ref()
847            .map(|work| work.turn_causes.clone())
848            .unwrap_or_default();
849        turn_delta.extend(
850            initial_turn_causes
851                .iter()
852                .map(crate::TurnCause::to_event_message),
853        );
854
855        let user_id = fresh_message_id();
856        let mut user_parts: Vec<Part> = Vec::new();
857        for item in normalized {
858            match item {
859                NormalizedItem::Text(text) => {
860                    if text.is_empty() {
861                        continue;
862                    }
863                    user_parts.push(Part {
864                        id: format!("{}.p{}", user_id, user_parts.len()),
865                        kind: PartKind::Text,
866                        content: text,
867                        attachment: None,
868                        tool_call_id: None,
869                        tool_name: None,
870                        tool_replay: None,
871                        prune_state: PruneState::Intact,
872                        reasoning_meta: None,
873                        response_meta: None,
874                    });
875                }
876                NormalizedItem::Image(reference) => {
877                    user_parts.push(Part {
878                        id: format!("{}.p{}", user_id, user_parts.len()),
879                        kind: PartKind::Image,
880                        content: String::new(),
881                        attachment: Some(crate::session_model::message::PartAttachment {
882                            reference,
883                        }),
884                        tool_call_id: None,
885                        tool_name: None,
886                        tool_replay: None,
887                        prune_state: PruneState::Intact,
888                        reasoning_meta: None,
889                        response_meta: None,
890                    });
891                }
892            }
893        }
894        if user_parts.is_empty() && initial_turn_causes.is_empty() {
895            user_parts.push(Part {
896                id: format!("{}.p0", user_id),
897                kind: PartKind::Text,
898                content: String::new(),
899                attachment: None,
900                tool_call_id: None,
901                tool_name: None,
902                tool_replay: None,
903                prune_state: PruneState::Intact,
904                reasoning_meta: None,
905                response_meta: None,
906            });
907        }
908        if !user_parts.is_empty() {
909            reassign_part_ids(&user_id, &mut user_parts);
910            turn_delta.push(Message {
911                id: user_id.clone(),
912                role: MessageRole::User,
913                parts: shared_parts(user_parts),
914                origin: None,
915            });
916        }
917
918        let manager = self
919            .runtime_session_services_for_turn(None)
920            .map_err(|err| {
921                RuntimeError::new(RuntimeErrorCode::PluginSessionManager, err.to_string())
922            })?;
923        let plugin_session = self
924            .session
925            .as_ref()
926            .map(|s| Arc::clone(s.plugins()))
927            .ok_or_else(|| {
928                RuntimeError::new(
929                    RuntimeErrorCode::ContextPrepareTurn,
930                    "runtime session not available",
931                )
932            })?;
933        let prepare_phase_turn_id = turn_phase_id(&trace_turn_id, "prepare-turn");
934        let prepare_phase_controller = scoped_child_turn_controller(
935            &scoped_effect_controller,
936            &self.state.session_id,
937            &prepare_phase_turn_id,
938        )?;
939        let turn_ctx = crate::TurnTransformContext {
940            session_id: self.state.session_id.clone(),
941            state: self.read_view(),
942            prompt_usage: previous_prompt_usage.clone(),
943            max_context_tokens: Some(LashRuntime::max_context_tokens(self)),
944            sessions: manager.state_service(),
945            session_lifecycle: manager.lifecycle_service(),
946            session_graph: manager.graph_service(),
947            scoped_effect_controller: scoped_effect_controller.clone(),
948            direct_completions: manager.direct_completion_client(
949                RuntimeEffectControllerHandle::borrowed(prepare_phase_controller),
950                Some(prepare_phase_turn_id),
951            ),
952        };
953        self.mark_phase_begin(RuntimeTurnPhase::ContextTransform);
954        let prepared_context = plugin_session
955            .prepare_turn_context(
956                &turn_ctx,
957                crate::session_model::context::PreparedContext {
958                    messages: crate::MessageSequence::from_base_and_delta(
959                        base_messages,
960                        turn_delta,
961                    )
962                    .with_base_render_cache(base_render_cache),
963                    ..Default::default()
964                },
965                self.turn_phase_probe.clone(),
966            )
967            .await
968            .map_err(|err| {
969                RuntimeError::new(RuntimeErrorCode::ContextPrepareTurn, err.to_string())
970            })?;
971        self.mark_phase_end(RuntimeTurnPhase::ContextTransform);
972        // Release the read-view's graph clone before the rest of the turn
973        // runs. Keeping it alive into `stream_prepared_turn` forces the
974        // post-turn `append_active_read_delta` to deep-clone the session
975        // graph (Arc::make_mut with refcount > 1).
976        drop(turn_ctx);
977        let messages = prepared_context.messages;
978        if let Some(session) = self.session.as_mut() {
979            session
980                .set_context_overlay(
981                    prepared_context.tool_providers,
982                    prepared_context.prompt_contributions,
983                    prepared_context.include_base_tools,
984                )
985                .map_err(|err| {
986                    RuntimeError::new(
987                        RuntimeErrorCode::Other("session_tool_registry".to_string()),
988                        err.to_string(),
989                    )
990                })?;
991        }
992
993        self.state.last_prompt_usage = None;
994        Box::pin(self.stream_prepared_turn(
995            messages,
996            previous_prompt_usage,
997            input.protocol_turn_options.clone(),
998            input.protocol_extension.clone(),
999            input.turn_context.clone(),
1000            initial_turn_causes,
1001            trace_turn_id,
1002            turn_index,
1003            events,
1004            turn_events,
1005            scoped_effect_controller,
1006            cancel,
1007            queued_claim,
1008        ))
1009        .await
1010    }
1011
1012    /// Run a single turn and return only the assembled terminal result.
1013    pub async fn run_turn_assembled(
1014        &mut self,
1015        input: TurnInput,
1016        cancel: CancellationToken,
1017        scoped_effect_controller: ScopedEffectController<'_>,
1018    ) -> Result<AssembledTurn, RuntimeError> {
1019        self.stream_turn(input, TurnOptions::new(cancel, scoped_effect_controller))
1020            .await
1021    }
1022
1023    /// Run a turn using host-prepared message history.
1024    #[allow(clippy::too_many_arguments)]
1025    pub async fn stream_prepared_turn(
1026        &mut self,
1027        messages: crate::MessageSequence,
1028        _previous_prompt_usage: Option<PromptUsage>,
1029        protocol_turn_options: Option<crate::ProtocolTurnOptions>,
1030        protocol_extension: Option<crate::ProtocolTurnExtensionHandle>,
1031        turn_context: crate::TurnContext,
1032        initial_turn_causes: Vec<crate::TurnCause>,
1033        trace_turn_id: String,
1034        turn_index: usize,
1035        events: &dyn EventSink,
1036        turn_events: &dyn TurnActivitySink,
1037        scoped_effect_controller: ScopedEffectController<'_>,
1038        cancel: CancellationToken,
1039        initial_queue_claim: Option<crate::QueuedWorkClaim>,
1040    ) -> Result<AssembledTurn, RuntimeError> {
1041        let (event_tx, mut event_rx) = mpsc::channel::<RuntimeStreamEvent>(100);
1042        let child_usage_event_relay = ChildUsageEventRelay::new(event_tx.clone());
1043        let mut turn_policy = self.state.effective_policy().clone();
1044        let turn_provider_override = turn_context.provider().cloned();
1045        if let Some(provider) = turn_provider_override.as_ref() {
1046            turn_policy.provider_id = provider.kind().to_string();
1047        }
1048        if let Some(model) = turn_context.model_spec() {
1049            turn_policy.model = model.clone();
1050        }
1051        let session_protocol_turn_options = self.state.effective_protocol_turn_options().clone();
1052        let effective_protocol_turn_options = protocol_turn_options
1053            .clone()
1054            .map(|options| session_protocol_turn_options.merged_with_override(&options))
1055            .unwrap_or(session_protocol_turn_options);
1056        let manager = self
1057            .runtime_session_services_for_turn(Some(child_usage_event_relay.clone()))
1058            .map_err(|err| {
1059                RuntimeError::new(RuntimeErrorCode::PluginSessionManager, err.to_string())
1060            })?;
1061        let plugins = {
1062            let session = self
1063                .session
1064                .as_ref()
1065                .expect("lash runtime session must be available");
1066            Arc::clone(session.plugins())
1067        };
1068        let mut assembler = TurnAssembler::new();
1069        self.mark_phase_begin(RuntimeTurnPhase::BeforeTurnHooks);
1070        // Block-scope the pinned future so it (and its captured
1071        // `SessionReadView` clone of the session graph) drops before the
1072        // post-turn `append_active_read_delta` mutation. Keeping it alive
1073        // across the turn forces `Arc::make_mut` to deep-clone
1074        // `SessionGraphData`.
1075        let prepared = {
1076            let prepare_turn = plugins.prepare_turn_with_phase_probe(
1077                PrepareTurnRequest {
1078                    session_id: self.state.session_id.clone(),
1079                    state: crate::SessionReadView::from_runtime_state(
1080                        &self.state,
1081                        turn_policy.clone(),
1082                        effective_protocol_turn_options.clone(),
1083                    ),
1084                    messages,
1085                    sessions: manager.state_service(),
1086                    session_lifecycle: manager.lifecycle_service(),
1087                    session_graph: manager.graph_service(),
1088                    turn_context: turn_context.clone(),
1089                },
1090                self.turn_phase_probe.clone(),
1091            );
1092            let mut prepare_turn = Box::pin(prepare_turn);
1093
1094            loop {
1095                tokio::select! {
1096                    prepared = prepare_turn.as_mut() => {
1097                        let prepared = prepared.map_err(|err| {
1098                            RuntimeError::new(RuntimeErrorCode::PluginPrepareTurn, err.to_string())
1099                        })?;
1100                        self.mark_phase_end(RuntimeTurnPhase::BeforeTurnHooks);
1101                        break prepared;
1102                    }
1103                    maybe_event = event_rx.recv() => {
1104                        if let Some(event) = maybe_event {
1105                            emit_runtime_stream_event_to_sinks(
1106                                events,
1107                                turn_events,
1108                                event,
1109                                &mut assembler,
1110                            )
1111                            .await;
1112                        }
1113                    }
1114                }
1115            }
1116        };
1117        for event in &prepared.events {
1118            assembler.push(event);
1119        }
1120        emit_session_events_to_sink(events, prepared.events).await;
1121        if let Some(abort) = prepared.abort {
1122            drop(event_tx);
1123
1124            let mut turn_pipeline = TurnBoundary::from_state(self.state.clone());
1125            turn_pipeline.apply_prepared_messages(&prepared.messages);
1126            let state = turn_pipeline.into_final_state();
1127            let issue = TurnIssue {
1128                kind: "plugin".to_string(),
1129                code: Some(abort.code),
1130                terminal_reason: None,
1131                message: abort.message.clone(),
1132                raw: None,
1133            };
1134            let error_event = SessionEvent::Error {
1135                message: abort.message,
1136                envelope: Some(crate::session_model::ErrorEnvelope {
1137                    kind: "plugin".to_string(),
1138                    code: issue.code.clone(),
1139                    terminal_reason: None,
1140                    user_message: issue.message.clone(),
1141                    raw: None,
1142                }),
1143            };
1144            assembler.push(&error_event);
1145            emit_turn_activity_to_sink(
1146                turn_events,
1147                TurnActivity::independent(TurnEvent::Error {
1148                    message: issue.message.clone(),
1149                }),
1150            )
1151            .await;
1152            emit_session_event_to_sink(events, error_event).await;
1153            let outcome_event = SessionEvent::TurnOutcome {
1154                outcome: TurnOutcome::Stopped(TurnStop::PluginAbort),
1155            };
1156            assembler.push(&outcome_event);
1157            emit_session_event_to_sink(events, outcome_event).await;
1158            assembler.push(&SessionEvent::Done);
1159            emit_session_event_to_sink(events, SessionEvent::Done).await;
1160            return Ok(assembler.finish(
1161                state.to_snapshot(),
1162                cancel.is_cancelled(),
1163                Some(issue),
1164                &self.host.core.control.termination,
1165            ));
1166        }
1167        let mut turn_pipeline = TurnBoundary::from_state(self.state.clone());
1168        let store = self
1169            .session
1170            .as_ref()
1171            .and_then(|session| session.history_store());
1172        // Durable controllers, like Restate, own in-flight replay. Writing
1173        // progress checkpoints directly to the shared store would make handler
1174        // replay observe a newer partial turn and change effect replay keys.
1175        let progress_store = if scoped_effect_controller.controller().durability_tier()
1176            == crate::DurabilityTier::Durable
1177        {
1178            None
1179        } else {
1180            store.as_ref().map(|store| store.as_ref())
1181        };
1182        turn_pipeline
1183            .prepared_checkpoint(
1184                progress_store,
1185                turn_policy.clone(),
1186                turn_index,
1187                &prepared.messages,
1188                self.session.as_mut(),
1189            )
1190            .await
1191            .map_err(|err| {
1192                RuntimeError::new(RuntimeErrorCode::StoreCommitFailed, err.to_string())
1193            })?;
1194        let resolved_turn_policy = if let Some(provider) = turn_provider_override {
1195            RuntimeSessionPolicy::from_provider(turn_policy.clone(), provider)
1196                .map_err(|err| RuntimeError::new("llm_provider", err.to_string()))?
1197        } else {
1198            self.host
1199                .resolve_session_policy(&self.state.session_id, turn_policy.clone())
1200                .map_err(|err| RuntimeError::new("llm_provider", err.to_string()))?
1201        };
1202        let manager = self
1203            .runtime_session_services_for_turn(Some(child_usage_event_relay.clone()))
1204            .map_err(|err| {
1205                RuntimeError::new(RuntimeErrorCode::PluginSessionManager, err.to_string())
1206            })?;
1207        let cancel_state = cancel.clone();
1208        let finish_scoped_effect_controller = scoped_effect_controller.clone();
1209        let session = self
1210            .session
1211            .take()
1212            .expect("lash runtime session must be available");
1213        let mut driver = RuntimeTurnDriver {
1214            session,
1215            policy: resolved_turn_policy,
1216            host: self.host.clone(),
1217            turn_id: scoped_effect_controller.scope_id().to_string(),
1218            scoped_effect_controller,
1219            session_id: self.state.session_id.clone(),
1220            turn_index,
1221            turn_pipeline,
1222            llm_stream_summaries: HashMap::new(),
1223            next_llm_ordinal: 0,
1224            session_services: manager,
1225            protocol_turn_options: effective_protocol_turn_options,
1226            protocol_extension,
1227            turn_context,
1228            turn_causes: initial_turn_causes,
1229            pending_queue_claims: initial_queue_claim.into_iter().collect(),
1230            checkpoint_messages: crate::tool_dispatch::CheckpointMessageBuffer::default(),
1231            turn_phase_probe: self.turn_phase_probe.clone(),
1232        };
1233        let protocol_run_offset = 0;
1234        self.mark_phase_begin(RuntimeTurnPhase::EffectLoop);
1235        let run_result = drive_turn_to_completion(
1236            driver.run(prepared.messages, event_tx, cancel, protocol_run_offset),
1237            &mut event_rx,
1238            &mut assembler,
1239            &child_usage_event_relay,
1240            events,
1241            turn_events,
1242        )
1243        .await;
1244        let (new_messages, _new_protocol_iteration) = match run_result {
1245            Ok(result) => result,
1246            Err(err) => {
1247                self.mark_phase_end(RuntimeTurnPhase::EffectLoop);
1248                let RuntimeTurnDriver { session, .. } = driver;
1249                self.session = Some(session);
1250                return Err(err);
1251            }
1252        };
1253        self.mark_phase_end(RuntimeTurnPhase::EffectLoop);
1254        tracing::debug!(
1255            new_message_count = new_messages.len(),
1256            tool_call_count = assembler.tool_calls.len(),
1257            "runtime post-run_task"
1258        );
1259
1260        let RuntimeTurnDriver {
1261            session,
1262            policy,
1263            turn_pipeline,
1264            pending_queue_claims,
1265            ..
1266        } = driver;
1267        self.session = Some(session);
1268        self.finish_turn(
1269            TurnFinishInput {
1270                turn_pipeline,
1271                assembler,
1272                new_messages,
1273                policy,
1274                turn_index,
1275                queued_work_completions: pending_queue_claims
1276                    .iter()
1277                    .map(crate::QueuedWorkClaim::completion)
1278                    .collect(),
1279                trace_turn_id,
1280            },
1281            events,
1282            &finish_scoped_effect_controller,
1283            &cancel_state,
1284        )
1285        .await
1286    }
1287    async fn normalize_input_items(
1288        &self,
1289        items: &[InputItem],
1290        image_blobs: &HashMap<String, Vec<u8>>,
1291    ) -> Result<Vec<NormalizedItem>, String> {
1292        normalize_input_items(
1293            items,
1294            image_blobs,
1295            self.host.core.durability.attachment_store.as_ref(),
1296        )
1297        .await
1298    }
1299}
1300
1301fn turn_input_from_text(text: String) -> TurnInput {
1302    TurnInput {
1303        items: vec![InputItem::Text { text }],
1304        image_blobs: HashMap::new(),
1305        protocol_turn_options: None,
1306        trace_turn_id: None,
1307        protocol_extension: None,
1308        turn_context: crate::TurnContext::default(),
1309    }
1310}
1311
1312fn agent_frame_follow_turn_id(root_turn_id: &str, completed_turn_count: usize) -> String {
1313    if completed_turn_count == 0 {
1314        root_turn_id.to_string()
1315    } else {
1316        format!("{root_turn_id}:agent-frame:{completed_turn_count}")
1317    }
1318}
1319
1320pub fn ensure_durable_effect_input(input: &TurnInput) -> Result<(), RuntimeError> {
1321    if input.protocol_extension.is_some() {
1322        return Err(RuntimeError::new(
1323            RuntimeErrorCode::DurableEffectLiveProtocolExtension,
1324            "durable effect hosts do not support live protocol_extension inputs; encode replayable data in protocol_turn_options or persisted plugin state",
1325        ));
1326    }
1327    input
1328        .turn_context
1329        .live_plugin_inputs()
1330        .durable_effect_rejection()?;
1331    Ok(())
1332}
1333
1334async fn emit_turn_activity_to_sink(events: &dyn TurnActivitySink, activity: TurnActivity) {
1335    if !events.is_noop() {
1336        events.emit(activity).await;
1337    }
1338}
1339
1340/// Pump the turn driver's event channel into the host sinks while the run
1341/// future executes, then drain any events emitted between completion and the
1342/// sender dropping.
1343///
1344/// Both the fresh and resumed turn entry points construct a
1345/// `RuntimeTurnDriver`, kick off its run future, and need identical
1346/// event-pump/drain behavior before tearing the driver down. Only the driver
1347/// construction and post-run teardown differ, so each caller owns those and
1348/// shares this loop.
1349async fn drive_turn_to_completion<F>(
1350    run_future: F,
1351    event_rx: &mut mpsc::Receiver<RuntimeStreamEvent>,
1352    assembler: &mut TurnAssembler,
1353    child_usage_event_relay: &ChildUsageEventRelay,
1354    events: &dyn EventSink,
1355    turn_events: &dyn TurnActivitySink,
1356) -> Result<(crate::MessageSequence, usize), RuntimeError>
1357where
1358    F: std::future::Future<Output = Result<(crate::MessageSequence, usize), RuntimeError>>,
1359{
1360    let run_result = {
1361        let mut run_future = Box::pin(run_future);
1362        loop {
1363            tokio::select! {
1364                maybe_event = event_rx.recv() => {
1365                    if let Some(event) = maybe_event {
1366                        emit_runtime_stream_event_to_sinks(
1367                            events,
1368                            turn_events,
1369                            event,
1370                            assembler,
1371                        )
1372                        .await;
1373                    }
1374                }
1375                completed = run_future.as_mut() => {
1376                    child_usage_event_relay.clear();
1377                    break completed;
1378                }
1379            }
1380        }
1381    };
1382    while let Some(event) = event_rx.recv().await {
1383        emit_runtime_stream_event_to_sinks(events, turn_events, event, assembler).await;
1384    }
1385    run_result
1386}
1387
1388async fn emit_runtime_stream_event_to_sinks(
1389    events: &dyn EventSink,
1390    turn_events: &dyn TurnActivitySink,
1391    event: RuntimeStreamEvent,
1392    assembler: &mut TurnAssembler,
1393) {
1394    match event {
1395        RuntimeStreamEvent::Session(event) => {
1396            assembler.push(&event);
1397            emit_session_event_to_sink(events, event).await;
1398        }
1399        RuntimeStreamEvent::Turn(activity) => {
1400            assembler.push_turn_activity(&activity);
1401            emit_turn_activity_to_sink(turn_events, activity).await;
1402        }
1403    }
1404}
1405
1406#[cfg(test)]
1407mod tests {
1408    use super::agent_frame_follow_turn_id;
1409
1410    #[test]
1411    fn agent_frame_follow_turn_ids_are_distinct_and_deterministic() {
1412        assert_eq!(agent_frame_follow_turn_id("root-turn", 0), "root-turn");
1413        assert_eq!(
1414            agent_frame_follow_turn_id("root-turn", 1),
1415            "root-turn:agent-frame:1"
1416        );
1417        assert_eq!(
1418            agent_frame_follow_turn_id("root-turn", 2),
1419            "root-turn:agent-frame:2"
1420        );
1421    }
1422}