Skip to main content

lash_core/runtime/
turn_loop.rs

1use super::*;
2
3fn trace_fields_from_outcome(
4    outcome: &TurnOutcome,
5) -> (
6    &'static str,
7    &'static str,
8    Option<lash_trace::TraceAgentFrameSwitch>,
9) {
10    match outcome {
11        TurnOutcome::Finished(TurnFinish::AssistantMessage { .. }) => {
12            ("completed", "assistant_message", None)
13        }
14        TurnOutcome::Finished(TurnFinish::SubmittedValue { .. }) => {
15            ("completed", "submitted_value", None)
16        }
17        TurnOutcome::Finished(TurnFinish::ToolValue { .. }) => ("completed", "tool_value", None),
18        TurnOutcome::AgentFrameSwitch { frame_id } => (
19            "completed",
20            "agent_frame_switch",
21            Some(lash_trace::TraceAgentFrameSwitch {
22                frame_id: frame_id.clone(),
23            }),
24        ),
25        TurnOutcome::Stopped(stop) => ("failed", trace_stop_reason(stop), None),
26    }
27}
28
29fn trace_stop_reason(stop: &TurnStop) -> &'static str {
30    match stop {
31        TurnStop::Cancelled => "cancelled",
32        TurnStop::Incomplete => "incomplete",
33        TurnStop::InvalidInput => "invalid_input",
34        TurnStop::MaxTurns => "max_turns",
35        TurnStop::ToolFailure => "tool_failure",
36        TurnStop::ProviderError => "provider_error",
37        TurnStop::PluginAbort => "plugin_abort",
38        TurnStop::RuntimeError => "runtime_error",
39        TurnStop::SubmittedError { .. } => "submitted_error",
40        TurnStop::ToolError { .. } => "tool_error",
41    }
42}
43
44fn session_head_refresh_error(err: SessionError) -> RuntimeError {
45    RuntimeError::new(
46        RuntimeErrorCode::Other("session_head_refresh".to_string()),
47        err.to_string(),
48    )
49}
50
51fn queued_work_payload_type(payload: &crate::QueuedWorkPayload) -> &'static str {
52    match payload {
53        crate::QueuedWorkPayload::TurnInput { .. } => "turn_input",
54        crate::QueuedWorkPayload::ProcessWake { .. } => "process_wake",
55        crate::QueuedWorkPayload::SessionCommand { command } => command.kind(),
56    }
57}
58
59fn queued_work_batch_ids(claim: &crate::QueuedWorkClaim) -> Vec<String> {
60    claim
61        .batches
62        .iter()
63        .map(|batch| batch.batch_id.clone())
64        .collect()
65}
66
67fn turn_phase_id(parent_turn_id: &str, phase: &str) -> String {
68    format!("{parent_turn_id}:{phase}")
69}
70
71fn scoped_child_turn_controller<'run>(
72    scoped_effect_controller: &'run ScopedEffectController<'_>,
73    session_id: &str,
74    turn_id: &str,
75) -> Result<ScopedEffectController<'run>, RuntimeError> {
76    ScopedEffectController::borrowed(
77        scoped_effect_controller.controller(),
78        EffectScope::turn(session_id, turn_id),
79    )
80}
81
82pub(in crate::runtime) fn queued_work_trace_payload(
83    boundary: crate::QueuedWorkClaimBoundary,
84    claim: &crate::QueuedWorkClaim,
85    causes: &[crate::TurnCause],
86) -> serde_json::Value {
87    serde_json::json!({
88        "boundary": boundary,
89        "claim_id": claim.claim_id,
90        "owner_id": claim.owner_id,
91        "batch_ids": queued_work_batch_ids(claim),
92        "payload_types": claim.batches.iter()
93            .flat_map(|batch| batch.items.iter())
94            .map(|item| queued_work_payload_type(&item.payload))
95            .collect::<Vec<_>>(),
96        "causes": causes,
97    })
98}
99
100pub(in crate::runtime) fn queued_work_completion_trace_payload(
101    completions: &[crate::QueuedWorkCompletion],
102) -> serde_json::Value {
103    serde_json::json!({
104        "claims": completions.iter().map(|completion| {
105            serde_json::json!({
106                "session_id": completion.session_id,
107                "claim_id": completion.claim_id,
108                "batch_ids": completion.batch_ids,
109            })
110        }).collect::<Vec<_>>(),
111    })
112}
113
114async fn emit_queued_work_started_to_sink(
115    events: &dyn TurnActivitySink,
116    boundary: crate::QueuedWorkClaimBoundary,
117    claim: &crate::QueuedWorkClaim,
118    causes: Vec<crate::TurnCause>,
119) {
120    emit_turn_activity_to_sink(
121        events,
122        TurnActivity::independent(TurnEvent::QueuedWorkStarted {
123            boundary,
124            batch_ids: queued_work_batch_ids(claim),
125            causes,
126        }),
127    )
128    .await;
129}
130
131pub(in crate::runtime) async fn send_queued_work_started_event(
132    event_tx: &mpsc::Sender<RuntimeStreamEvent>,
133    boundary: crate::QueuedWorkClaimBoundary,
134    claim: &crate::QueuedWorkClaim,
135    causes: Vec<crate::TurnCause>,
136) {
137    send_turn_activity(
138        event_tx,
139        TurnActivityId::fresh(),
140        TurnEvent::QueuedWorkStarted {
141            boundary,
142            batch_ids: queued_work_batch_ids(claim),
143            causes,
144        },
145    )
146    .await;
147}
148
149struct TurnFinishInput {
150    turn_pipeline: TurnBoundary,
151    assembler: TurnAssembler,
152    new_messages: crate::MessageSequence,
153    policy: RuntimeSessionPolicy,
154    turn_index: usize,
155    queued_work_completions: Vec<crate::QueuedWorkCompletion>,
156    trace_turn_id: String,
157}
158
159impl LashRuntime {
160    fn max_context_tokens(&self) -> usize {
161        self.state.effective_policy().context_window_tokens()
162    }
163    #[doc(hidden)]
164    pub fn set_turn_phase_probe(&mut self, probe: Arc<dyn RuntimeTurnPhaseProbe>) {
165        self.turn_phase_probe = Some(probe);
166    }
167
168    fn mark_phase_begin(&self, phase: RuntimeTurnPhase) {
169        if let Some(probe) = self.turn_phase_probe.as_ref() {
170            probe.begin(phase);
171        }
172    }
173
174    fn mark_phase_end(&self, phase: RuntimeTurnPhase) {
175        if let Some(probe) = self.turn_phase_probe.as_ref() {
176            probe.end(phase);
177        }
178    }
179
180    async fn finish_turn(
181        &mut self,
182        finish: TurnFinishInput,
183        events: &dyn EventSink,
184        scoped_effect_controller: &ScopedEffectController<'_>,
185        cancel_state: &CancellationToken,
186    ) -> Result<AssembledTurn, RuntimeError> {
187        let TurnFinishInput {
188            mut turn_pipeline,
189            assembler,
190            new_messages,
191            policy,
192            turn_index,
193            queued_work_completions,
194            trace_turn_id,
195        } = finish;
196        self.policy = self.state.effective_policy().clone();
197        turn_pipeline.state_mut().policy = self.policy.clone();
198        turn_pipeline.state_mut().turn_index = turn_index;
199
200        let mut turn_usage_delta = {
201            let mut ledger = self.shared_token_ledger.lock().expect("token ledger lock");
202            std::mem::take(&mut *ledger)
203        };
204        if assembler.token_usage.total() > 0 || assembler.token_usage.cached_input_tokens > 0 {
205            turn_usage_delta.push(TokenLedgerEntry {
206                source: "turn".to_string(),
207                model: policy.model.id.clone(),
208                usage: assembler.token_usage.clone(),
209            });
210        }
211        let turn_usage_delta = merge_usage_delta_entries(turn_usage_delta);
212
213        turn_pipeline.finalize_turn_read_state(new_messages, cancel_state.is_cancelled());
214        if assembler.token_usage.total() > 0 || assembler.token_usage.cached_input_tokens > 0 {
215            turn_pipeline.state_mut().token_usage = assembler.token_usage.clone();
216        }
217
218        let last_prompt_usage = assembler
219            .last_llm_usage()
220            .and_then(|usage| normalize_prompt_usage(policy.provider(), usage));
221        turn_pipeline.state_mut().last_prompt_usage = last_prompt_usage;
222        let assembled_state = turn_pipeline.export_state_for_assembly();
223        let assembled = assembler.finish(
224            assembled_state,
225            cancel_state.is_cancelled(),
226            None,
227            &self.host.core.control.termination,
228        );
229
230        let Some(session) = self.session.as_ref() else {
231            self.state.apply_snapshot(&assembled.state);
232            self.emit_completed_turn_trace(&assembled.state, &assembled.outcome, &trace_turn_id);
233            return Ok(assembled);
234        };
235
236        let plugins = Arc::clone(session.plugins());
237        let manager = match self.runtime_session_services_for_turn(None) {
238            Ok(manager) => manager,
239            Err(err) => {
240                return Err(RuntimeError::new(
241                    RuntimeErrorCode::PluginSessionManager,
242                    err.to_string(),
243                ));
244            }
245        };
246
247        self.mark_phase_begin(RuntimeTurnPhase::FinalizeTurn);
248        let finalized = match plugins
249            .finalize_turn_with_phase_probe(
250                assembled,
251                manager.state_service(),
252                manager.lifecycle_service(),
253                manager.graph_service(),
254                self.turn_phase_probe.clone(),
255            )
256            .await
257        {
258            Ok(finalized) => finalized,
259            Err(err) => {
260                self.mark_phase_end(RuntimeTurnPhase::FinalizeTurn);
261                return Err(RuntimeError::new(
262                    RuntimeErrorCode::PluginFinalizeTurn,
263                    err.to_string(),
264                ));
265            }
266        };
267        self.mark_phase_end(RuntimeTurnPhase::FinalizeTurn);
268
269        let mut returned_turn = finalized.turn;
270        self.mark_phase_begin(RuntimeTurnPhase::PersistTurn);
271        self.mark_phase_begin(RuntimeTurnPhase::FinalCommit);
272        let queued_work_completion_trace = queued_work_completions.clone();
273        if let Err(err) = turn_pipeline
274            .final_commit(
275                &mut returned_turn,
276                self.session.as_mut(),
277                &turn_usage_delta,
278                Some(&trace_turn_id),
279                queued_work_completions,
280            )
281            .await
282        {
283            self.mark_phase_end(RuntimeTurnPhase::FinalCommit);
284            self.mark_phase_end(RuntimeTurnPhase::PersistTurn);
285            return Err(err);
286        }
287        self.mark_phase_end(RuntimeTurnPhase::FinalCommit);
288
289        emit_session_events_to_sink(events, finalized.events).await;
290        self.state = turn_pipeline.into_final_state();
291        if matches!(returned_turn.outcome, TurnOutcome::AgentFrameSwitch { .. })
292            && let Some(session) = self.session.as_mut()
293        {
294            let protocol_session = Arc::clone(session.plugins().protocol_session());
295            let session_id = self.state.session_id.clone();
296            protocol_session
297                .restore_session(
298                    crate::plugin::ProtocolSessionContext::new(session, &session_id),
299                    &self.state,
300                )
301                .await
302                .map_err(|err| {
303                    RuntimeError::new(
304                        RuntimeErrorCode::Other("protocol_restore_session".to_string()),
305                        err.to_string(),
306                    )
307                })?;
308        }
309        if !queued_work_completion_trace.is_empty() {
310            crate::trace::emit_trace(
311                &self.host.core.tracing.trace_sink,
312                &self.host.core.tracing.trace_context,
313                lash_trace::TraceContext::default()
314                    .for_session(returned_turn.state.session_id.clone())
315                    .for_turn_index(returned_turn.state.turn_index)
316                    .for_turn(trace_turn_id.clone()),
317                lash_trace::TraceEvent::Custom {
318                    name: "queued_work.completed".to_string(),
319                    payload: queued_work_completion_trace_payload(&queued_work_completion_trace),
320                },
321            );
322        }
323        self.mark_phase_begin(RuntimeTurnPhase::PostPersistHooks);
324        self.emit_turn_persisted_event(&returned_turn, scoped_effect_controller, &trace_turn_id)
325            .await?;
326        self.mark_phase_end(RuntimeTurnPhase::PostPersistHooks);
327        self.mark_phase_end(RuntimeTurnPhase::PersistTurn);
328
329        self.emit_completed_turn_trace(
330            &returned_turn.state,
331            &returned_turn.outcome,
332            &trace_turn_id,
333        );
334        Ok(returned_turn)
335    }
336
337    fn emit_completed_turn_trace(
338        &self,
339        state: &SessionSnapshot,
340        outcome: &TurnOutcome,
341        trace_turn_id: &str,
342    ) {
343        if self.host.core.tracing.trace_sink.is_none() {
344            return;
345        }
346
347        let (status, done_reason, agent_frame_switch) = trace_fields_from_outcome(outcome);
348        crate::trace::emit_trace(
349            &self.host.core.tracing.trace_sink,
350            &self.host.core.tracing.trace_context,
351            lash_trace::TraceContext::default()
352                .for_session(state.session_id.clone())
353                .for_turn_index(state.turn_index)
354                .for_turn(trace_turn_id.to_string()),
355            lash_trace::TraceEvent::TurnCompleted {
356                status: status.to_string(),
357                done_reason: done_reason.to_string(),
358                agent_frame_switch,
359            },
360        );
361    }
362
363    async fn emit_turn_persisted_event(
364        &self,
365        returned_turn: &AssembledTurn,
366        scoped_effect_controller: &ScopedEffectController<'_>,
367        trace_turn_id: &str,
368    ) -> Result<(), RuntimeError> {
369        let Some(session) = self.session.as_ref() else {
370            return Ok(());
371        };
372        let Ok(manager) = self.runtime_session_services() else {
373            return Ok(());
374        };
375        let phase_turn_id = turn_phase_id(trace_turn_id, "turn-persisted");
376        let phase_controller = scoped_child_turn_controller(
377            scoped_effect_controller,
378            &self.state.session_id,
379            &phase_turn_id,
380        )?;
381        let direct_completions = manager.direct_completion_client(
382            RuntimeEffectControllerHandle::borrowed(phase_controller),
383            Some(phase_turn_id),
384        );
385
386        session
387            .plugins()
388            .emit_runtime_event_with_phase_probe(
389                crate::PluginLifecycleEvent::TurnPersisted(Box::new(
390                    crate::SessionStateChangedContext {
391                        session_id: self.state.session_id.clone(),
392                        state: crate::SessionReadView::from_snapshot(&returned_turn.state),
393                        sessions: manager.state_service(),
394                        session_graph: manager.graph_service(),
395                        direct_completions,
396                    },
397                )),
398                self.turn_phase_probe.clone(),
399            )
400            .await;
401        Ok(())
402    }
403
404    /// Run a single turn and stream events to the host sink.
405    pub async fn stream_turn(
406        &mut self,
407        input: TurnInput,
408        opts: TurnOptions<'_>,
409    ) -> Result<AssembledTurn, RuntimeError> {
410        let turn_id = input
411            .trace_turn_id
412            .clone()
413            .unwrap_or_else(|| uuid::Uuid::new_v4().to_string());
414        let cancel = opts.cancel.clone();
415        let effect_host = Arc::clone(&self.host.core.control.effect_host);
416        let scoped_effect_controller = match opts.scoped_effect_controller() {
417            Some(scope) => scope,
418            None => effect_host.scoped(EffectScope::turn(&self.state.session_id, &turn_id))?,
419        };
420        self.stream_turn_with_scoped_effect_controller_inner(
421            input,
422            opts.events_or_noop(),
423            opts.turn_events_or_noop(),
424            scoped_effect_controller,
425            cancel,
426            None,
427        )
428        .await
429    }
430
431    pub async fn stream_next_queued_work(
432        &mut self,
433        opts: TurnOptions<'_>,
434    ) -> Result<Option<AssembledTurn>, RuntimeError> {
435        self.stream_queued_work(opts, None).await
436    }
437
438    pub async fn stream_selected_queued_work(
439        &mut self,
440        opts: TurnOptions<'_>,
441        batch_ids: &[String],
442    ) -> Result<Option<AssembledTurn>, RuntimeError> {
443        self.stream_queued_work(opts, Some(batch_ids)).await
444    }
445
446    async fn stream_queued_work(
447        &mut self,
448        opts: TurnOptions<'_>,
449        selected_batch_ids: Option<&[String]>,
450    ) -> Result<Option<AssembledTurn>, RuntimeError> {
451        if self.drain_next_session_command().await?.is_some() {
452            return Ok(None);
453        }
454        let Some(store) = self
455            .session
456            .as_ref()
457            .and_then(|session| session.history_store())
458        else {
459            return Ok(None);
460        };
461        let claim = if let Some(batch_ids) = selected_batch_ids {
462            store
463                .claim_ready_queued_work_by_batch_ids(
464                    &self.state.session_id,
465                    &self.runtime_scope_id,
466                    crate::QueuedWorkClaimBoundary::Idle,
467                    crate::QUEUED_WORK_CLAIM_TTL_MS,
468                    batch_ids,
469                )
470                .await
471        } else {
472            store
473                .claim_ready_queued_work(
474                    &self.state.session_id,
475                    &self.runtime_scope_id,
476                    crate::QueuedWorkClaimBoundary::Idle,
477                    crate::QUEUED_WORK_CLAIM_TTL_MS,
478                    64,
479                )
480                .await
481        }
482        .map_err(|err| RuntimeError::new(RuntimeErrorCode::StoreCommitFailed, err.to_string()))?;
483        let Some(claim) = claim else {
484            return Ok(None);
485        };
486        let mut work = claim.materialize_for_turn();
487        let turn_id = work
488            .input
489            .trace_turn_id
490            .clone()
491            .or_else(|| opts.effect_scope_id().map(ToOwned::to_owned))
492            .unwrap_or_else(|| uuid::Uuid::new_v4().to_string());
493        work.input.trace_turn_id = Some(turn_id.clone());
494        let causes = work.turn_causes.clone();
495        emit_queued_work_started_to_sink(
496            opts.turn_events_or_noop(),
497            crate::QueuedWorkClaimBoundary::Idle,
498            &claim,
499            causes.clone(),
500        )
501        .await;
502        crate::trace::emit_trace(
503            &self.host.core.tracing.trace_sink,
504            &self.host.core.tracing.trace_context,
505            lash_trace::TraceContext::default()
506                .for_session(self.state.session_id.clone())
507                .for_turn_index(self.state.turn_index + 1)
508                .for_turn(turn_id.clone()),
509            lash_trace::TraceEvent::Custom {
510                name: "queued_work.claimed".to_string(),
511                payload: queued_work_trace_payload(
512                    crate::QueuedWorkClaimBoundary::Idle,
513                    &claim,
514                    &causes,
515                ),
516            },
517        );
518        let cancel = opts.cancel.clone();
519        let effect_host = Arc::clone(&self.host.core.control.effect_host);
520        let scoped_effect_controller = match opts.scoped_effect_controller() {
521            Some(scope) => scope,
522            None => effect_host.scoped(EffectScope::queue_drain(
523                &self.state.session_id,
524                &claim.claim_id,
525            ))?,
526        };
527        self.stream_turn_with_scoped_effect_controller_inner(
528            work.input,
529            opts.events_or_noop(),
530            opts.turn_events_or_noop(),
531            scoped_effect_controller,
532            cancel,
533            Some(claim),
534        )
535        .await
536        .map(Some)
537    }
538
539    /// Enforce the durable-first wiring invariant at a turn-scope boundary: when
540    /// the host wired a durable effect host, every store reachable from this
541    /// scope must also be durable. A durable host running against any ephemeral
542    /// store fails loudly here rather than silently degrading.
543    ///
544    /// Inline controllers (the default tier) impose no requirement, so
545    /// inline/in-memory hosts pass unchanged.
546    fn ensure_durable_store_facets_for_scope(
547        &self,
548        scoped_effect_controller: &ScopedEffectController<'_>,
549    ) -> Result<(), RuntimeError> {
550        if scoped_effect_controller.controller().durability_tier() != crate::DurabilityTier::Durable
551        {
552            return Ok(());
553        }
554        if self
555            .host
556            .core
557            .durability
558            .attachment_store
559            .persistence()
560            .durability_tier()
561            != crate::DurabilityTier::Durable
562        {
563            return Err(RuntimeError::durable_store_required(
564                crate::DurableStoreFacet::AttachmentStore,
565            ));
566        }
567        if self
568            .host
569            .core
570            .durability
571            .lashlang_artifact_store
572            .durability_tier()
573            != crate::DurabilityTier::Durable
574        {
575            return Err(RuntimeError::durable_store_required(
576                crate::DurableStoreFacet::ArtifactStore,
577            ));
578        }
579        if let Some(store) = self
580            .session
581            .as_ref()
582            .and_then(|session| session.history_store())
583            && store.durability_tier() != crate::DurabilityTier::Durable
584        {
585            return Err(RuntimeError::durable_store_required(
586                crate::DurableStoreFacet::SessionStore,
587            ));
588        }
589        if let Some(process_registry) = self.host.process_registry.as_ref()
590            && process_registry.durability_tier() != crate::DurabilityTier::Durable
591        {
592            return Err(RuntimeError::durable_store_required(
593                crate::DurableStoreFacet::ProcessRegistry,
594            ));
595        }
596        Ok(())
597    }
598
599    async fn stream_turn_with_scoped_effect_controller_inner(
600        &mut self,
601        mut input: TurnInput,
602        events: &dyn EventSink,
603        turn_events: &dyn TurnActivitySink,
604        scoped_effect_controller: ScopedEffectController<'_>,
605        cancel: CancellationToken,
606        queued_claim: Option<crate::QueuedWorkClaim>,
607    ) -> Result<AssembledTurn, RuntimeError> {
608        if queued_claim.is_none() {
609            while self.drain_next_session_command().await?.is_some() {}
610        }
611        if let Some(input_turn_id) = input.trace_turn_id.as_deref()
612            && scoped_effect_controller
613                .effect_scope()
614                .validates_turn_trace_id()
615            && input_turn_id != scoped_effect_controller.scope_id()
616        {
617            return Err(RuntimeError::new(
618                RuntimeErrorCode::EffectScopeTurnIdMismatch,
619                format!(
620                    "input trace_turn_id `{input_turn_id}` does not match effect scope id `{}`",
621                    scoped_effect_controller.scope_id()
622                ),
623            ));
624        }
625        self.ensure_durable_store_facets_for_scope(&scoped_effect_controller)?;
626        input
627            .trace_turn_id
628            .get_or_insert_with(|| scoped_effect_controller.scope_id().to_string());
629        self.stream_turn_inner(
630            input.clone(),
631            events,
632            turn_events,
633            scoped_effect_controller,
634            cancel.clone(),
635            queued_claim,
636        )
637        .await
638    }
639
640    /// Stream one logical host turn, following foreground AgentFrame switches
641    /// until a terminal outcome is reached.
642    ///
643    /// RLM `continue_as` creates a new frame in the same session. Hosts that
644    /// only care about the benchmark/app answer should not need to special-case
645    /// that intermediate outcome; this helper keeps driving the same session
646    /// through each frame's task with the normal runtime turn guards.
647    pub async fn stream_turn_with_agent_frames(
648        &mut self,
649        input: TurnInput,
650        opts: TurnOptions<'_>,
651    ) -> Result<AgentFrameRun, RuntimeError> {
652        let follow_trace_turn_id = input
653            .trace_turn_id
654            .clone()
655            .unwrap_or_else(|| uuid::Uuid::new_v4().to_string());
656        let cancel = opts.cancel.clone();
657        let effect_host = Arc::clone(&self.host.core.control.effect_host);
658        let scoped_effect_controller = match opts.scoped_effect_controller() {
659            Some(scope) => scope,
660            None => effect_host.scoped(EffectScope::turn(
661                &self.state.session_id,
662                &follow_trace_turn_id,
663            ))?,
664        };
665        self.stream_turn_with_agent_frames_inner(
666            input,
667            opts.events_or_noop(),
668            opts.turn_events_or_noop(),
669            scoped_effect_controller,
670            cancel,
671        )
672        .await
673    }
674
675    async fn stream_turn_with_agent_frames_inner(
676        &mut self,
677        mut input: TurnInput,
678        events: &dyn EventSink,
679        turn_events: &dyn TurnActivitySink,
680        scoped_effect_controller: ScopedEffectController<'_>,
681        cancel: CancellationToken,
682    ) -> Result<AgentFrameRun, RuntimeError> {
683        if let Some(input_turn_id) = input.trace_turn_id.as_deref()
684            && scoped_effect_controller
685                .effect_scope()
686                .validates_turn_trace_id()
687            && input_turn_id != scoped_effect_controller.scope_id()
688        {
689            return Err(RuntimeError::new(
690                RuntimeErrorCode::EffectScopeTurnIdMismatch,
691                format!(
692                    "input trace_turn_id `{input_turn_id}` does not match effect scope id `{}`",
693                    scoped_effect_controller.scope_id()
694                ),
695            ));
696        }
697        let follow_protocol_turn_options = input.protocol_turn_options.clone();
698        let follow_turn_context = input.turn_context.clone();
699        let follow_trace_turn_id = input
700            .trace_turn_id
701            .clone()
702            .unwrap_or_else(|| scoped_effect_controller.scope_id().to_string());
703        input
704            .trace_turn_id
705            .get_or_insert(follow_trace_turn_id.clone());
706        let mut turns = Vec::new();
707        loop {
708            let turn_trace_turn_id = agent_frame_follow_turn_id(&follow_trace_turn_id, turns.len());
709            input.trace_turn_id = Some(turn_trace_turn_id.clone());
710            let turn_effect_controller = if turns.is_empty() {
711                scoped_effect_controller.clone()
712            } else {
713                ScopedEffectController::borrowed(
714                    scoped_effect_controller.controller(),
715                    EffectScope::turn(&self.state.session_id, &turn_trace_turn_id),
716                )?
717            };
718            let turn = self
719                .stream_turn_with_scoped_effect_controller_inner(
720                    input,
721                    events,
722                    turn_events,
723                    turn_effect_controller,
724                    cancel.clone(),
725                    None,
726                )
727                .await?;
728            let switched_frame_id = match &turn.outcome {
729                TurnOutcome::AgentFrameSwitch { frame_id } => Some(frame_id.clone()),
730                _ => None,
731            };
732            let next_task = switched_frame_id
733                .as_ref()
734                .and_then(|frame_id| frame_switch_task(&turn, frame_id));
735            turns.push(turn);
736
737            let Some(_frame_id) = switched_frame_id else {
738                return Ok(AgentFrameRun { turns });
739            };
740
741            let task = next_task.ok_or_else(|| {
742                RuntimeError::new(
743                    RuntimeErrorCode::Other("agent_frame_missing_task".to_string()),
744                    "agent frame switch did not provide a task",
745                )
746            })?;
747            input = turn_input_from_text(task);
748            input.protocol_turn_options = follow_protocol_turn_options.clone();
749            input.turn_context = follow_turn_context.clone();
750        }
751    }
752
753    async fn stream_turn_inner(
754        &mut self,
755        mut input: TurnInput,
756        events: &dyn EventSink,
757        turn_events: &dyn TurnActivitySink,
758        scoped_effect_controller: ScopedEffectController<'_>,
759        cancel: CancellationToken,
760        queued_claim: Option<crate::QueuedWorkClaim>,
761    ) -> Result<AssembledTurn, RuntimeError> {
762        self.refresh_session_graph_from_store()
763            .await
764            .map_err(session_head_refresh_error)?;
765        let input_trace_turn_id = input.trace_turn_id.clone();
766        let queued_turn_work = queued_claim
767            .as_ref()
768            .map(crate::QueuedWorkClaim::materialize_for_turn);
769        if let Some(work) = queued_turn_work.as_ref()
770            && input.items.is_empty()
771            && input.image_blobs.is_empty()
772        {
773            input = work.input.clone();
774            if input.trace_turn_id.is_none() {
775                input.trace_turn_id = input_trace_turn_id;
776            }
777        }
778        if self
779            .session
780            .as_ref()
781            .and_then(|session| session.history_store())
782            .is_some()
783        {
784            ensure_durable_effect_input(&input)?;
785        }
786        if let Some(extension) = &input.protocol_extension
787            && let Some(session) = self.session.as_ref()
788        {
789            let protocol_session = std::sync::Arc::clone(session.plugins().protocol_session());
790            protocol_session
791                .validate_turn_extension(extension)
792                .await
793                .map_err(|err| {
794                    RuntimeError::new(RuntimeErrorCode::ProtocolTurnExtension, err.to_string())
795                })?;
796        }
797        let previous_prompt_usage = self.state.last_prompt_usage.clone();
798        let normalized = match self.normalize_input_items(&input.items, &input.image_blobs) {
799            Ok(items) => items,
800            Err(e) => {
801                self.state.last_prompt_usage = None;
802                let mut assembler = TurnAssembler::default();
803                let error_event = SessionEvent::Error {
804                    message: e.clone(),
805                    envelope: Some(crate::session_model::ErrorEnvelope {
806                        kind: "input_validation".to_string(),
807                        code: Some("invalid_turn_input".to_string()),
808                        terminal_reason: None,
809                        user_message: e.clone(),
810                        raw: None,
811                    }),
812                };
813                assembler.push(&error_event);
814                emit_turn_activity_to_sink(
815                    turn_events,
816                    TurnActivity::independent(TurnEvent::Error { message: e }),
817                )
818                .await;
819                emit_session_event_to_sink(events, error_event).await;
820                let outcome_event = SessionEvent::TurnOutcome {
821                    outcome: TurnOutcome::Stopped(TurnStop::InvalidInput),
822                };
823                assembler.push(&outcome_event);
824                emit_session_event_to_sink(events, outcome_event).await;
825                assembler.push(&SessionEvent::Done);
826                emit_session_event_to_sink(events, SessionEvent::Done).await;
827                return Ok(assembler.finish(
828                    self.state.to_snapshot(),
829                    false,
830                    None,
831                    &self.host.core.control.termination,
832                ));
833            }
834        };
835        let turn_index = self.state.turn_index + 1;
836        let trace_turn_id = input
837            .trace_turn_id
838            .clone()
839            .unwrap_or_else(|| uuid::Uuid::new_v4().to_string());
840        if self.host.core.tracing.trace_sink.is_some() {
841            let mut trace_metadata = std::collections::BTreeMap::new();
842            trace_metadata.insert(
843                "input_item_count".to_string(),
844                serde_json::json!(normalized.len()),
845            );
846            crate::trace::emit_trace(
847                &self.host.core.tracing.trace_sink,
848                &self.host.core.tracing.trace_context,
849                lash_trace::TraceContext::default()
850                    .for_session(self.state.session_id.clone())
851                    .for_turn_index(turn_index)
852                    .for_turn(trace_turn_id.clone()),
853                lash_trace::TraceEvent::TurnStarted {
854                    metadata: trace_metadata,
855                },
856            );
857        }
858
859        let base_read_model = self.state.read_model();
860        let base_messages = base_read_model.messages;
861        let base_render_cache = base_read_model.prompt_render_cache;
862        let mut turn_delta = Vec::new();
863        let initial_turn_causes = queued_turn_work
864            .as_ref()
865            .map(|work| work.turn_causes.clone())
866            .unwrap_or_default();
867        turn_delta.extend(
868            initial_turn_causes
869                .iter()
870                .map(crate::TurnCause::to_event_message),
871        );
872
873        let user_id = fresh_message_id();
874        let mut user_parts: Vec<Part> = Vec::new();
875        for item in normalized {
876            match item {
877                NormalizedItem::Text(text) => {
878                    if text.is_empty() {
879                        continue;
880                    }
881                    user_parts.push(Part {
882                        id: format!("{}.p{}", user_id, user_parts.len()),
883                        kind: PartKind::Text,
884                        content: text,
885                        attachment: None,
886                        tool_call_id: None,
887                        tool_name: None,
888                        tool_replay: None,
889                        prune_state: PruneState::Intact,
890                        reasoning_meta: None,
891                        response_meta: None,
892                    });
893                }
894                NormalizedItem::Image(reference) => {
895                    user_parts.push(Part {
896                        id: format!("{}.p{}", user_id, user_parts.len()),
897                        kind: PartKind::Image,
898                        content: String::new(),
899                        attachment: Some(crate::session_model::message::PartAttachment {
900                            reference,
901                        }),
902                        tool_call_id: None,
903                        tool_name: None,
904                        tool_replay: None,
905                        prune_state: PruneState::Intact,
906                        reasoning_meta: None,
907                        response_meta: None,
908                    });
909                }
910            }
911        }
912        if user_parts.is_empty() && initial_turn_causes.is_empty() {
913            user_parts.push(Part {
914                id: format!("{}.p0", user_id),
915                kind: PartKind::Text,
916                content: String::new(),
917                attachment: None,
918                tool_call_id: None,
919                tool_name: None,
920                tool_replay: None,
921                prune_state: PruneState::Intact,
922                reasoning_meta: None,
923                response_meta: None,
924            });
925        }
926        if !user_parts.is_empty() {
927            reassign_part_ids(&user_id, &mut user_parts);
928            turn_delta.push(Message {
929                id: user_id.clone(),
930                role: MessageRole::User,
931                parts: shared_parts(user_parts),
932                origin: None,
933            });
934        }
935
936        let manager = self
937            .runtime_session_services_for_turn(None)
938            .map_err(|err| {
939                RuntimeError::new(RuntimeErrorCode::PluginSessionManager, err.to_string())
940            })?;
941        let plugin_session = self
942            .session
943            .as_ref()
944            .map(|s| Arc::clone(s.plugins()))
945            .ok_or_else(|| {
946                RuntimeError::new(
947                    RuntimeErrorCode::ContextPrepareTurn,
948                    "runtime session not available",
949                )
950            })?;
951        let prepare_phase_turn_id = turn_phase_id(&trace_turn_id, "prepare-turn");
952        let prepare_phase_controller = scoped_child_turn_controller(
953            &scoped_effect_controller,
954            &self.state.session_id,
955            &prepare_phase_turn_id,
956        )?;
957        let turn_ctx = crate::TurnTransformContext {
958            session_id: self.state.session_id.clone(),
959            state: self.read_view(),
960            prompt_usage: previous_prompt_usage.clone(),
961            max_context_tokens: Some(LashRuntime::max_context_tokens(self)),
962            sessions: manager.state_service(),
963            session_lifecycle: manager.lifecycle_service(),
964            session_graph: manager.graph_service(),
965            scoped_effect_controller: scoped_effect_controller.clone(),
966            direct_completions: manager.direct_completion_client(
967                RuntimeEffectControllerHandle::borrowed(prepare_phase_controller),
968                Some(prepare_phase_turn_id),
969            ),
970        };
971        self.mark_phase_begin(RuntimeTurnPhase::ContextTransform);
972        let prepared_context = plugin_session
973            .prepare_turn_context(
974                &turn_ctx,
975                crate::session_model::context::PreparedContext {
976                    messages: crate::MessageSequence::from_base_and_delta(
977                        base_messages,
978                        turn_delta,
979                    )
980                    .with_base_render_cache(base_render_cache),
981                    ..Default::default()
982                },
983                self.turn_phase_probe.clone(),
984            )
985            .await
986            .map_err(|err| {
987                RuntimeError::new(RuntimeErrorCode::ContextPrepareTurn, err.to_string())
988            })?;
989        self.mark_phase_end(RuntimeTurnPhase::ContextTransform);
990        // Release the read-view's graph clone before the rest of the turn
991        // runs. Keeping it alive into `stream_prepared_turn` forces the
992        // post-turn `append_active_read_delta` to deep-clone the session
993        // graph (Arc::make_mut with refcount > 1).
994        drop(turn_ctx);
995        let messages = prepared_context.messages;
996        if let Some(session) = self.session.as_mut() {
997            session
998                .set_context_surface(
999                    prepared_context.tool_providers,
1000                    prepared_context.prompt_contributions,
1001                    prepared_context.include_base_tools,
1002                )
1003                .map_err(|err| {
1004                    RuntimeError::new(
1005                        RuntimeErrorCode::Other("session_tool_registry".to_string()),
1006                        err.to_string(),
1007                    )
1008                })?;
1009        }
1010
1011        self.state.last_prompt_usage = None;
1012
1013        self.stream_prepared_turn(
1014            messages,
1015            previous_prompt_usage,
1016            input.protocol_turn_options.clone(),
1017            input.protocol_extension.clone(),
1018            input.turn_context.clone(),
1019            initial_turn_causes,
1020            trace_turn_id,
1021            turn_index,
1022            events,
1023            turn_events,
1024            scoped_effect_controller,
1025            cancel,
1026            queued_claim,
1027        )
1028        .await
1029    }
1030
1031    /// Run a single turn and return only the assembled terminal result.
1032    pub async fn run_turn_assembled(
1033        &mut self,
1034        input: TurnInput,
1035        cancel: CancellationToken,
1036    ) -> Result<AssembledTurn, RuntimeError> {
1037        self.stream_turn(input, TurnOptions::new(cancel)).await
1038    }
1039
1040    /// Run a turn using host-prepared message history.
1041    #[allow(clippy::too_many_arguments)]
1042    pub async fn stream_prepared_turn(
1043        &mut self,
1044        messages: crate::MessageSequence,
1045        _previous_prompt_usage: Option<PromptUsage>,
1046        protocol_turn_options: Option<crate::ProtocolTurnOptions>,
1047        protocol_extension: Option<crate::ProtocolTurnExtensionHandle>,
1048        turn_context: crate::TurnContext,
1049        initial_turn_causes: Vec<crate::TurnCause>,
1050        trace_turn_id: String,
1051        turn_index: usize,
1052        events: &dyn EventSink,
1053        turn_events: &dyn TurnActivitySink,
1054        scoped_effect_controller: ScopedEffectController<'_>,
1055        cancel: CancellationToken,
1056        initial_queue_claim: Option<crate::QueuedWorkClaim>,
1057    ) -> Result<AssembledTurn, RuntimeError> {
1058        let (event_tx, mut event_rx) = mpsc::channel::<RuntimeStreamEvent>(100);
1059        let child_usage_event_relay = ChildUsageEventRelay::new(event_tx.clone());
1060        let mut turn_policy = self.state.effective_policy().clone();
1061        let turn_provider_override = turn_context.provider().cloned();
1062        if let Some(provider) = turn_provider_override.as_ref() {
1063            turn_policy.provider_id = provider.kind().to_string();
1064        }
1065        if let Some(model) = turn_context.model_spec() {
1066            turn_policy.model = model.clone();
1067        }
1068        let session_protocol_turn_options = self.state.effective_protocol_turn_options().clone();
1069        let effective_protocol_turn_options = protocol_turn_options
1070            .clone()
1071            .map(|options| session_protocol_turn_options.merged_with_override(&options))
1072            .unwrap_or(session_protocol_turn_options);
1073        let manager = self
1074            .runtime_session_services_for_turn(Some(child_usage_event_relay.clone()))
1075            .map_err(|err| {
1076                RuntimeError::new(RuntimeErrorCode::PluginSessionManager, err.to_string())
1077            })?;
1078        let plugins = {
1079            let session = self
1080                .session
1081                .as_ref()
1082                .expect("lash runtime session must be available");
1083            Arc::clone(session.plugins())
1084        };
1085        let mut assembler = TurnAssembler::new();
1086        self.mark_phase_begin(RuntimeTurnPhase::BeforeTurnHooks);
1087        // Block-scope the pinned future so it (and its captured
1088        // `SessionReadView` clone of the session graph) drops before the
1089        // post-turn `append_active_read_delta` mutation. Keeping it alive
1090        // across the turn forces `Arc::make_mut` to deep-clone
1091        // `SessionGraphData`.
1092        let prepared = {
1093            let prepare_turn = plugins.prepare_turn_with_phase_probe(
1094                PrepareTurnRequest {
1095                    session_id: self.state.session_id.clone(),
1096                    state: crate::SessionReadView::from_runtime_state(
1097                        &self.state,
1098                        turn_policy.clone(),
1099                        effective_protocol_turn_options.clone(),
1100                    ),
1101                    messages,
1102                    sessions: manager.state_service(),
1103                    session_lifecycle: manager.lifecycle_service(),
1104                    session_graph: manager.graph_service(),
1105                    turn_context: turn_context.clone(),
1106                },
1107                self.turn_phase_probe.clone(),
1108            );
1109            tokio::pin!(prepare_turn);
1110
1111            loop {
1112                tokio::select! {
1113                    prepared = &mut prepare_turn => {
1114                        let prepared = prepared.map_err(|err| {
1115                            RuntimeError::new(RuntimeErrorCode::PluginPrepareTurn, err.to_string())
1116                        })?;
1117                        self.mark_phase_end(RuntimeTurnPhase::BeforeTurnHooks);
1118                        break prepared;
1119                    }
1120                    maybe_event = event_rx.recv() => {
1121                        if let Some(event) = maybe_event {
1122                            emit_runtime_stream_event_to_sinks(
1123                                events,
1124                                turn_events,
1125                                event,
1126                                &mut assembler,
1127                            )
1128                            .await;
1129                        }
1130                    }
1131                }
1132            }
1133        };
1134        for event in &prepared.events {
1135            assembler.push(event);
1136        }
1137        emit_session_events_to_sink(events, prepared.events).await;
1138        if let Some(abort) = prepared.abort {
1139            drop(event_tx);
1140
1141            let mut turn_pipeline = TurnBoundary::from_state(self.state.clone());
1142            turn_pipeline.apply_prepared_messages(&prepared.messages);
1143            let state = turn_pipeline.into_final_state();
1144            let issue = TurnIssue {
1145                kind: "plugin".to_string(),
1146                code: Some(abort.code),
1147                terminal_reason: None,
1148                message: abort.message.clone(),
1149                raw: None,
1150            };
1151            let error_event = SessionEvent::Error {
1152                message: abort.message,
1153                envelope: Some(crate::session_model::ErrorEnvelope {
1154                    kind: "plugin".to_string(),
1155                    code: issue.code.clone(),
1156                    terminal_reason: None,
1157                    user_message: issue.message.clone(),
1158                    raw: None,
1159                }),
1160            };
1161            assembler.push(&error_event);
1162            emit_turn_activity_to_sink(
1163                turn_events,
1164                TurnActivity::independent(TurnEvent::Error {
1165                    message: issue.message.clone(),
1166                }),
1167            )
1168            .await;
1169            emit_session_event_to_sink(events, error_event).await;
1170            let outcome_event = SessionEvent::TurnOutcome {
1171                outcome: TurnOutcome::Stopped(TurnStop::PluginAbort),
1172            };
1173            assembler.push(&outcome_event);
1174            emit_session_event_to_sink(events, outcome_event).await;
1175            assembler.push(&SessionEvent::Done);
1176            emit_session_event_to_sink(events, SessionEvent::Done).await;
1177            return Ok(assembler.finish(
1178                state.to_snapshot(),
1179                cancel.is_cancelled(),
1180                Some(issue),
1181                &self.host.core.control.termination,
1182            ));
1183        }
1184        let mut turn_pipeline = TurnBoundary::from_state(self.state.clone());
1185        let store = self
1186            .session
1187            .as_ref()
1188            .and_then(|session| session.history_store());
1189        turn_pipeline
1190            .prepared_checkpoint(
1191                store.as_ref().map(|store| store.as_ref()),
1192                turn_policy.clone(),
1193                turn_index,
1194                &prepared.messages,
1195                self.session.as_mut(),
1196            )
1197            .await
1198            .map_err(|err| {
1199                RuntimeError::new(RuntimeErrorCode::StoreCommitFailed, err.to_string())
1200            })?;
1201        let resolved_turn_policy = if let Some(provider) = turn_provider_override {
1202            RuntimeSessionPolicy::from_provider(turn_policy.clone(), provider)
1203                .map_err(|err| RuntimeError::new("llm_provider", err.to_string()))?
1204        } else {
1205            self.host
1206                .resolve_session_policy(&self.state.session_id, turn_policy.clone())
1207                .map_err(|err| RuntimeError::new("llm_provider", err.to_string()))?
1208        };
1209        let manager = self
1210            .runtime_session_services_for_turn(Some(child_usage_event_relay.clone()))
1211            .map_err(|err| {
1212                RuntimeError::new(RuntimeErrorCode::PluginSessionManager, err.to_string())
1213            })?;
1214        let cancel_state = cancel.clone();
1215        let finish_scoped_effect_controller = scoped_effect_controller.clone();
1216        let session = self
1217            .session
1218            .take()
1219            .expect("lash runtime session must be available");
1220        let mut driver = RuntimeTurnDriver {
1221            session,
1222            policy: resolved_turn_policy,
1223            host: self.host.clone(),
1224            turn_id: scoped_effect_controller.scope_id().to_string(),
1225            scoped_effect_controller,
1226            session_id: self.state.session_id.clone(),
1227            turn_index,
1228            turn_pipeline,
1229            llm_stream_summaries: HashMap::new(),
1230            next_llm_ordinal: 0,
1231            session_services: manager,
1232            protocol_turn_options: effective_protocol_turn_options,
1233            protocol_extension,
1234            turn_context,
1235            turn_causes: initial_turn_causes,
1236            pending_queue_claims: initial_queue_claim.into_iter().collect(),
1237            checkpoint_messages: crate::tool_dispatch::CheckpointMessageBuffer::default(),
1238            turn_phase_probe: self.turn_phase_probe.clone(),
1239        };
1240        let protocol_run_offset = 0;
1241        self.mark_phase_begin(RuntimeTurnPhase::EffectLoop);
1242        let run_result = drive_turn_to_completion(
1243            driver.run(prepared.messages, event_tx, cancel, protocol_run_offset),
1244            &mut event_rx,
1245            &mut assembler,
1246            &child_usage_event_relay,
1247            events,
1248            turn_events,
1249        )
1250        .await;
1251        let (new_messages, _new_protocol_iteration) = match run_result {
1252            Ok(result) => result,
1253            Err(err) => {
1254                self.mark_phase_end(RuntimeTurnPhase::EffectLoop);
1255                let RuntimeTurnDriver { session, .. } = driver;
1256                self.session = Some(session);
1257                return Err(err);
1258            }
1259        };
1260        self.mark_phase_end(RuntimeTurnPhase::EffectLoop);
1261        tracing::debug!(
1262            new_message_count = new_messages.len(),
1263            tool_call_count = assembler.tool_calls.len(),
1264            "runtime post-run_task"
1265        );
1266
1267        let RuntimeTurnDriver {
1268            session,
1269            policy,
1270            turn_pipeline,
1271            pending_queue_claims,
1272            ..
1273        } = driver;
1274        self.session = Some(session);
1275        self.finish_turn(
1276            TurnFinishInput {
1277                turn_pipeline,
1278                assembler,
1279                new_messages,
1280                policy,
1281                turn_index,
1282                queued_work_completions: pending_queue_claims
1283                    .iter()
1284                    .map(crate::QueuedWorkClaim::completion)
1285                    .collect(),
1286                trace_turn_id,
1287            },
1288            events,
1289            &finish_scoped_effect_controller,
1290            &cancel_state,
1291        )
1292        .await
1293    }
1294    fn normalize_input_items(
1295        &self,
1296        items: &[InputItem],
1297        image_blobs: &HashMap<String, Vec<u8>>,
1298    ) -> Result<Vec<NormalizedItem>, String> {
1299        normalize_input_items(
1300            items,
1301            image_blobs,
1302            self.host.core.durability.attachment_store.as_ref(),
1303        )
1304    }
1305}
1306
1307fn frame_switch_task(turn: &AssembledTurn, frame_id: &str) -> Option<String> {
1308    turn.tool_calls
1309        .iter()
1310        .find_map(|record| match &record.output.control {
1311            Some(crate::ToolControl::SwitchAgentFrame {
1312                frame_id: control_frame_id,
1313                task: Some(task),
1314                ..
1315            }) if control_frame_id == frame_id => Some(task.clone()),
1316            _ => None,
1317        })
1318}
1319
1320fn turn_input_from_text(text: String) -> TurnInput {
1321    TurnInput {
1322        items: vec![InputItem::Text { text }],
1323        image_blobs: HashMap::new(),
1324        protocol_turn_options: None,
1325        trace_turn_id: None,
1326        protocol_extension: None,
1327        turn_context: crate::TurnContext::default(),
1328    }
1329}
1330
1331fn agent_frame_follow_turn_id(root_turn_id: &str, completed_turn_count: usize) -> String {
1332    if completed_turn_count == 0 {
1333        root_turn_id.to_string()
1334    } else {
1335        format!("{root_turn_id}:agent-frame:{completed_turn_count}")
1336    }
1337}
1338
1339pub fn ensure_durable_effect_input(input: &TurnInput) -> Result<(), RuntimeError> {
1340    if input.protocol_extension.is_some() {
1341        return Err(RuntimeError::new(
1342            RuntimeErrorCode::DurableEffectLiveProtocolExtension,
1343            "durable effect hosts do not support live protocol_extension inputs; encode replayable data in protocol_turn_options or persisted plugin state",
1344        ));
1345    }
1346    input
1347        .turn_context
1348        .live_plugin_inputs()
1349        .durable_effect_rejection()?;
1350    Ok(())
1351}
1352
1353async fn emit_turn_activity_to_sink(events: &dyn TurnActivitySink, activity: TurnActivity) {
1354    if !events.is_noop() {
1355        events.emit(activity).await;
1356    }
1357}
1358
1359/// Pump the turn driver's event channel into the host sinks while the run
1360/// future executes, then drain any events emitted between completion and the
1361/// sender dropping.
1362///
1363/// Both the fresh and resumed turn entry points construct a
1364/// `RuntimeTurnDriver`, kick off its run future, and need identical
1365/// event-pump/drain behavior before tearing the driver down. Only the driver
1366/// construction and post-run teardown differ, so each caller owns those and
1367/// shares this loop.
1368async fn drive_turn_to_completion<F>(
1369    run_future: F,
1370    event_rx: &mut mpsc::Receiver<RuntimeStreamEvent>,
1371    assembler: &mut TurnAssembler,
1372    child_usage_event_relay: &ChildUsageEventRelay,
1373    events: &dyn EventSink,
1374    turn_events: &dyn TurnActivitySink,
1375) -> Result<(crate::MessageSequence, usize), RuntimeError>
1376where
1377    F: std::future::Future<Output = Result<(crate::MessageSequence, usize), RuntimeError>>,
1378{
1379    let run_result = {
1380        tokio::pin!(run_future);
1381        loop {
1382            tokio::select! {
1383                maybe_event = event_rx.recv() => {
1384                    if let Some(event) = maybe_event {
1385                        emit_runtime_stream_event_to_sinks(
1386                            events,
1387                            turn_events,
1388                            event,
1389                            assembler,
1390                        )
1391                        .await;
1392                    }
1393                }
1394                completed = &mut run_future => {
1395                    child_usage_event_relay.clear();
1396                    break completed;
1397                }
1398            }
1399        }
1400    };
1401    while let Some(event) = event_rx.recv().await {
1402        emit_runtime_stream_event_to_sinks(events, turn_events, event, assembler).await;
1403    }
1404    run_result
1405}
1406
1407async fn emit_runtime_stream_event_to_sinks(
1408    events: &dyn EventSink,
1409    turn_events: &dyn TurnActivitySink,
1410    event: RuntimeStreamEvent,
1411    assembler: &mut TurnAssembler,
1412) {
1413    match event {
1414        RuntimeStreamEvent::Session(event) => {
1415            assembler.push(&event);
1416            emit_session_event_to_sink(events, event).await;
1417        }
1418        RuntimeStreamEvent::Turn(activity) => {
1419            assembler.push_turn_activity(&activity);
1420            emit_turn_activity_to_sink(turn_events, activity).await;
1421        }
1422    }
1423}
1424
1425#[cfg(test)]
1426mod tests {
1427    use super::agent_frame_follow_turn_id;
1428
1429    #[test]
1430    fn agent_frame_follow_turn_ids_are_distinct_and_deterministic() {
1431        assert_eq!(agent_frame_follow_turn_id("root-turn", 0), "root-turn");
1432        assert_eq!(
1433            agent_frame_follow_turn_id("root-turn", 1),
1434            "root-turn:agent-frame:1"
1435        );
1436        assert_eq!(
1437            agent_frame_follow_turn_id("root-turn", 2),
1438            "root-turn:agent-frame:2"
1439        );
1440    }
1441}