Skip to main content

lash_core/runtime/
turn_loop.rs

1use super::*;
2
3fn trace_fields_from_outcome(
4    outcome: &TurnOutcome,
5) -> (
6    &'static str,
7    &'static str,
8    Option<lash_trace::TraceAgentFrameSwitch>,
9) {
10    match outcome {
11        TurnOutcome::Finished(TurnFinish::AssistantMessage { .. }) => {
12            ("completed", "assistant_message", None)
13        }
14        TurnOutcome::Finished(TurnFinish::SubmittedValue { .. }) => {
15            ("completed", "submitted_value", None)
16        }
17        TurnOutcome::Finished(TurnFinish::ToolValue { .. }) => ("completed", "tool_value", None),
18        TurnOutcome::AgentFrameSwitch { frame_id, .. } => (
19            "completed",
20            "agent_frame_switch",
21            Some(lash_trace::TraceAgentFrameSwitch {
22                frame_id: frame_id.clone(),
23            }),
24        ),
25        TurnOutcome::Stopped(stop) => ("failed", trace_stop_reason(stop), None),
26    }
27}
28
29fn trace_stop_reason(stop: &TurnStop) -> &'static str {
30    match stop {
31        TurnStop::Cancelled => "cancelled",
32        TurnStop::Incomplete => "incomplete",
33        TurnStop::InvalidInput => "invalid_input",
34        TurnStop::MaxTurns => "max_turns",
35        TurnStop::ToolFailure => "tool_failure",
36        TurnStop::ProviderError => "provider_error",
37        TurnStop::PluginAbort => "plugin_abort",
38        TurnStop::RuntimeError => "runtime_error",
39        TurnStop::SubmittedError { .. } => "submitted_error",
40        TurnStop::ToolError { .. } => "tool_error",
41    }
42}
43
44fn session_head_refresh_error(err: SessionError) -> RuntimeError {
45    RuntimeError::new(
46        RuntimeErrorCode::Other("session_head_refresh".to_string()),
47        err.to_string(),
48    )
49}
50
51fn queued_work_payload_type(payload: &crate::QueuedWorkPayload) -> &'static str {
52    match payload {
53        crate::QueuedWorkPayload::TurnInput { .. } => "turn_input",
54        crate::QueuedWorkPayload::ProcessWake { .. } => "process_wake",
55        crate::QueuedWorkPayload::SessionCommand { command } => command.kind(),
56    }
57}
58
59fn queued_work_batch_ids(claim: &crate::QueuedWorkClaim) -> Vec<String> {
60    claim
61        .batches
62        .iter()
63        .map(|batch| batch.batch_id.clone())
64        .collect()
65}
66
67fn turn_phase_id(parent_turn_id: &str, phase: &str) -> String {
68    format!("{parent_turn_id}:{phase}")
69}
70
71fn scoped_child_turn_controller<'run>(
72    scoped_effect_controller: &'run ScopedEffectController<'_>,
73    session_id: &str,
74    turn_id: &str,
75) -> Result<ScopedEffectController<'run>, RuntimeError> {
76    ScopedEffectController::borrowed(
77        scoped_effect_controller.controller(),
78        ExecutionScope::turn(session_id, turn_id),
79    )
80}
81
82pub(in crate::runtime) fn queued_work_trace_payload(
83    boundary: crate::QueuedWorkClaimBoundary,
84    claim: &crate::QueuedWorkClaim,
85    causes: &[crate::TurnCause],
86) -> serde_json::Value {
87    serde_json::json!({
88        "boundary": boundary,
89        "claim_id": claim.claim_id,
90        "owner_id": claim.owner_id,
91        "batch_ids": queued_work_batch_ids(claim),
92        "payload_types": claim.batches.iter()
93            .flat_map(|batch| batch.items.iter())
94            .map(|item| queued_work_payload_type(&item.payload))
95            .collect::<Vec<_>>(),
96        "causes": causes,
97    })
98}
99
100pub(in crate::runtime) fn queued_work_completion_trace_payload(
101    completions: &[crate::QueuedWorkCompletion],
102) -> serde_json::Value {
103    serde_json::json!({
104        "claims": completions.iter().map(|completion| {
105            serde_json::json!({
106                "session_id": completion.session_id,
107                "claim_id": completion.claim_id,
108                "batch_ids": completion.batch_ids,
109            })
110        }).collect::<Vec<_>>(),
111    })
112}
113
114async fn emit_queued_work_started_to_sink(
115    events: &dyn TurnActivitySink,
116    boundary: crate::QueuedWorkClaimBoundary,
117    claim: &crate::QueuedWorkClaim,
118    causes: Vec<crate::TurnCause>,
119) {
120    emit_turn_activity_to_sink(
121        events,
122        TurnActivity::independent(TurnEvent::QueuedWorkStarted {
123            boundary,
124            batch_ids: queued_work_batch_ids(claim),
125            causes,
126        }),
127    )
128    .await;
129}
130
131pub(in crate::runtime) async fn send_queued_work_started_event(
132    event_tx: &mpsc::Sender<RuntimeStreamEvent>,
133    boundary: crate::QueuedWorkClaimBoundary,
134    claim: &crate::QueuedWorkClaim,
135    causes: Vec<crate::TurnCause>,
136) {
137    send_turn_activity(
138        event_tx,
139        TurnActivityId::fresh(),
140        TurnEvent::QueuedWorkStarted {
141            boundary,
142            batch_ids: queued_work_batch_ids(claim),
143            causes,
144        },
145    )
146    .await;
147}
148
149struct TurnFinishInput {
150    turn_pipeline: TurnBoundary,
151    assembler: TurnAssembler,
152    new_messages: crate::MessageSequence,
153    policy: RuntimeSessionPolicy,
154    turn_index: usize,
155    queued_work_completions: Vec<crate::QueuedWorkCompletion>,
156    trace_turn_id: String,
157}
158
159impl LashRuntime {
160    fn max_context_tokens(&self) -> usize {
161        self.state.effective_policy().context_window_tokens()
162    }
163    #[doc(hidden)]
164    pub fn set_turn_phase_probe(&mut self, probe: Arc<dyn RuntimeTurnPhaseProbe>) {
165        self.turn_phase_probe = Some(probe);
166    }
167
168    fn mark_phase_begin(&self, phase: RuntimeTurnPhase) {
169        if let Some(probe) = self.turn_phase_probe.as_ref() {
170            probe.begin(phase);
171        }
172    }
173
174    fn mark_phase_end(&self, phase: RuntimeTurnPhase) {
175        if let Some(probe) = self.turn_phase_probe.as_ref() {
176            probe.end(phase);
177        }
178    }
179
180    async fn finish_turn(
181        &mut self,
182        finish: TurnFinishInput,
183        events: &dyn EventSink,
184        scoped_effect_controller: &ScopedEffectController<'_>,
185        cancel_state: &CancellationToken,
186    ) -> Result<AssembledTurn, RuntimeError> {
187        let TurnFinishInput {
188            mut turn_pipeline,
189            assembler,
190            new_messages,
191            policy,
192            turn_index,
193            queued_work_completions,
194            trace_turn_id,
195        } = finish;
196        self.policy = self.state.effective_policy().clone();
197        turn_pipeline.state_mut().policy = self.policy.clone();
198        turn_pipeline.state_mut().turn_index = turn_index;
199
200        let mut turn_usage_delta = {
201            let mut ledger = self.shared_token_ledger.lock().expect("token ledger lock");
202            std::mem::take(&mut *ledger)
203        };
204        if assembler.token_usage.total() > 0 || assembler.token_usage.cached_input_tokens > 0 {
205            turn_usage_delta.push(TokenLedgerEntry {
206                source: "turn".to_string(),
207                model: policy.model.id.clone(),
208                usage: assembler.token_usage.clone(),
209            });
210        }
211        let turn_usage_delta = merge_usage_delta_entries(turn_usage_delta);
212
213        turn_pipeline.finalize_turn_read_state(new_messages, cancel_state.is_cancelled());
214        if assembler.token_usage.total() > 0 || assembler.token_usage.cached_input_tokens > 0 {
215            turn_pipeline.state_mut().token_usage = assembler.token_usage.clone();
216        }
217
218        let last_prompt_usage = assembler
219            .last_llm_usage()
220            .and_then(|usage| normalize_prompt_usage(policy.provider(), usage));
221        turn_pipeline.state_mut().last_prompt_usage = last_prompt_usage;
222        let assembled_state = turn_pipeline.export_state_for_assembly();
223        let assembled = assembler.finish(
224            assembled_state,
225            cancel_state.is_cancelled(),
226            None,
227            &self.host.core.control.termination,
228        );
229
230        let Some(session) = self.session.as_ref() else {
231            self.state.apply_snapshot(&assembled.state);
232            self.emit_completed_turn_trace(&assembled.state, &assembled.outcome, &trace_turn_id);
233            return Ok(assembled);
234        };
235
236        let plugins = Arc::clone(session.plugins());
237        let manager = match self.runtime_session_services_for_turn(None) {
238            Ok(manager) => manager,
239            Err(err) => {
240                return Err(RuntimeError::new(
241                    RuntimeErrorCode::PluginSessionManager,
242                    err.to_string(),
243                ));
244            }
245        };
246
247        self.mark_phase_begin(RuntimeTurnPhase::FinalizeTurn);
248        let finalized = match plugins
249            .finalize_turn_with_phase_probe(
250                assembled,
251                manager.state_service(),
252                manager.lifecycle_service(),
253                manager.graph_service(),
254                self.turn_phase_probe.clone(),
255            )
256            .await
257        {
258            Ok(finalized) => finalized,
259            Err(err) => {
260                self.mark_phase_end(RuntimeTurnPhase::FinalizeTurn);
261                return Err(RuntimeError::new(
262                    RuntimeErrorCode::PluginFinalizeTurn,
263                    err.to_string(),
264                ));
265            }
266        };
267        self.mark_phase_end(RuntimeTurnPhase::FinalizeTurn);
268
269        let mut returned_turn = finalized.turn;
270        self.mark_phase_begin(RuntimeTurnPhase::PersistTurn);
271        self.mark_phase_begin(RuntimeTurnPhase::FinalCommit);
272        let queued_work_completion_trace = queued_work_completions.clone();
273        let pending_attachment_ids = self
274            .host
275            .core
276            .durability
277            .attachment_store
278            .pending_manifest_commit_ids();
279        if let Err(err) = turn_pipeline
280            .final_commit(
281                &mut returned_turn,
282                self.session.as_mut(),
283                &turn_usage_delta,
284                Some(&trace_turn_id),
285                queued_work_completions,
286                pending_attachment_ids.clone(),
287            )
288            .await
289        {
290            self.mark_phase_end(RuntimeTurnPhase::FinalCommit);
291            self.mark_phase_end(RuntimeTurnPhase::PersistTurn);
292            return Err(err);
293        }
294        self.host
295            .core
296            .durability
297            .attachment_store
298            .mark_manifest_committed(&pending_attachment_ids);
299        self.mark_phase_end(RuntimeTurnPhase::FinalCommit);
300
301        emit_session_events_to_sink(events, finalized.events).await;
302        self.state = turn_pipeline.into_final_state();
303        if matches!(returned_turn.outcome, TurnOutcome::AgentFrameSwitch { .. })
304            && let Some(session) = self.session.as_mut()
305        {
306            let protocol_session = Arc::clone(session.plugins().protocol_session());
307            let session_id = self.state.session_id.clone();
308            protocol_session
309                .restore_session(
310                    crate::plugin::ProtocolSessionContext::new(session, &session_id),
311                    &self.state,
312                )
313                .await
314                .map_err(|err| {
315                    RuntimeError::new(
316                        RuntimeErrorCode::Other("protocol_restore_session".to_string()),
317                        err.to_string(),
318                    )
319                })?;
320        }
321        if !queued_work_completion_trace.is_empty() {
322            crate::trace::emit_trace(
323                &self.host.core.tracing.trace_sink,
324                &self.host.core.tracing.trace_context,
325                lash_trace::TraceContext::default()
326                    .for_session(returned_turn.state.session_id.clone())
327                    .for_turn_index(returned_turn.state.turn_index)
328                    .for_turn(trace_turn_id.clone()),
329                lash_trace::TraceEvent::Custom {
330                    name: "queued_work.completed".to_string(),
331                    payload: queued_work_completion_trace_payload(&queued_work_completion_trace),
332                },
333                self.host.core.clock.as_ref(),
334            );
335        }
336        self.mark_phase_begin(RuntimeTurnPhase::PostPersistHooks);
337        self.emit_turn_persisted_event(&returned_turn, scoped_effect_controller, &trace_turn_id)
338            .await?;
339        self.mark_phase_end(RuntimeTurnPhase::PostPersistHooks);
340        self.mark_phase_end(RuntimeTurnPhase::PersistTurn);
341
342        self.emit_completed_turn_trace(
343            &returned_turn.state,
344            &returned_turn.outcome,
345            &trace_turn_id,
346        );
347        Ok(returned_turn)
348    }
349
350    fn emit_completed_turn_trace(
351        &self,
352        state: &SessionSnapshot,
353        outcome: &TurnOutcome,
354        trace_turn_id: &str,
355    ) {
356        if self.host.core.tracing.trace_sink.is_none() {
357            return;
358        }
359
360        let (status, done_reason, agent_frame_switch) = trace_fields_from_outcome(outcome);
361        crate::trace::emit_trace(
362            &self.host.core.tracing.trace_sink,
363            &self.host.core.tracing.trace_context,
364            lash_trace::TraceContext::default()
365                .for_session(state.session_id.clone())
366                .for_turn_index(state.turn_index)
367                .for_turn(trace_turn_id.to_string()),
368            lash_trace::TraceEvent::TurnCompleted {
369                status: status.to_string(),
370                done_reason: done_reason.to_string(),
371                agent_frame_switch,
372            },
373            self.host.core.clock.as_ref(),
374        );
375    }
376
377    async fn emit_turn_persisted_event(
378        &self,
379        returned_turn: &AssembledTurn,
380        scoped_effect_controller: &ScopedEffectController<'_>,
381        trace_turn_id: &str,
382    ) -> Result<(), RuntimeError> {
383        let Some(session) = self.session.as_ref() else {
384            return Ok(());
385        };
386        let Ok(manager) = self.runtime_session_services() else {
387            return Ok(());
388        };
389        let phase_turn_id = turn_phase_id(trace_turn_id, "turn-persisted");
390        let phase_controller = scoped_child_turn_controller(
391            scoped_effect_controller,
392            &self.state.session_id,
393            &phase_turn_id,
394        )?;
395        let direct_completions = manager.direct_completion_client(
396            RuntimeEffectControllerHandle::borrowed(phase_controller),
397            Some(phase_turn_id),
398        );
399
400        session
401            .plugins()
402            .emit_runtime_event_with_phase_probe(
403                crate::PluginLifecycleEvent::TurnPersisted(Box::new(
404                    crate::SessionStateChangedContext {
405                        session_id: self.state.session_id.clone(),
406                        state: crate::SessionReadView::from_snapshot(&returned_turn.state),
407                        sessions: manager.state_service(),
408                        session_graph: manager.graph_service(),
409                        direct_completions,
410                    },
411                )),
412                self.turn_phase_probe.clone(),
413            )
414            .await;
415        Ok(())
416    }
417
418    /// Run a single turn and stream events to the host sink.
419    pub async fn stream_turn(
420        &mut self,
421        input: TurnInput,
422        opts: TurnOptions<'_>,
423    ) -> Result<AssembledTurn, RuntimeError> {
424        let cancel = opts.cancel.clone();
425        let scoped_effect_controller = opts.scoped_effect_controller();
426        self.stream_turn_with_scoped_effect_controller_inner(
427            input,
428            opts.events_or_noop(),
429            opts.turn_events_or_noop(),
430            scoped_effect_controller,
431            cancel,
432            None,
433        )
434        .await
435    }
436
437    pub async fn stream_next_queued_work(
438        &mut self,
439        opts: TurnOptions<'_>,
440    ) -> Result<Option<AssembledTurn>, RuntimeError> {
441        self.stream_queued_work(opts, None).await
442    }
443
444    pub async fn stream_selected_queued_work(
445        &mut self,
446        opts: TurnOptions<'_>,
447        batch_ids: &[String],
448    ) -> Result<Option<AssembledTurn>, RuntimeError> {
449        self.stream_queued_work(opts, Some(batch_ids)).await
450    }
451
452    async fn stream_queued_work(
453        &mut self,
454        opts: TurnOptions<'_>,
455        selected_batch_ids: Option<&[String]>,
456    ) -> Result<Option<AssembledTurn>, RuntimeError> {
457        if self.drain_next_session_command().await?.is_some() {
458            return Ok(None);
459        }
460        let Some(store) = self
461            .session
462            .as_ref()
463            .and_then(|session| session.history_store())
464        else {
465            return Ok(None);
466        };
467        let claim = if let Some(batch_ids) = selected_batch_ids {
468            store
469                .claim_ready_queued_work_by_batch_ids(
470                    &self.state.session_id,
471                    &self.runtime_scope_id,
472                    crate::QueuedWorkClaimBoundary::Idle,
473                    crate::QUEUED_WORK_CLAIM_TTL_MS,
474                    batch_ids,
475                )
476                .await
477        } else {
478            store
479                .claim_ready_queued_work(
480                    &self.state.session_id,
481                    &self.runtime_scope_id,
482                    crate::QueuedWorkClaimBoundary::Idle,
483                    crate::QUEUED_WORK_CLAIM_TTL_MS,
484                    64,
485                )
486                .await
487        }
488        .map_err(|err| RuntimeError::new(RuntimeErrorCode::StoreCommitFailed, err.to_string()))?;
489        let Some(claim) = claim else {
490            return Ok(None);
491        };
492        let mut work = claim.materialize_for_turn();
493        let turn_id = work
494            .input
495            .trace_turn_id
496            .clone()
497            .or_else(|| Some(opts.execution_scope_id().to_owned()))
498            .unwrap_or_else(|| uuid::Uuid::new_v4().to_string());
499        work.input.trace_turn_id = Some(turn_id.clone());
500        let causes = work.turn_causes.clone();
501        emit_queued_work_started_to_sink(
502            opts.turn_events_or_noop(),
503            crate::QueuedWorkClaimBoundary::Idle,
504            &claim,
505            causes.clone(),
506        )
507        .await;
508        crate::trace::emit_trace(
509            &self.host.core.tracing.trace_sink,
510            &self.host.core.tracing.trace_context,
511            lash_trace::TraceContext::default()
512                .for_session(self.state.session_id.clone())
513                .for_turn_index(self.state.turn_index + 1)
514                .for_turn(turn_id.clone()),
515            lash_trace::TraceEvent::Custom {
516                name: "queued_work.claimed".to_string(),
517                payload: queued_work_trace_payload(
518                    crate::QueuedWorkClaimBoundary::Idle,
519                    &claim,
520                    &causes,
521                ),
522            },
523            self.host.core.clock.as_ref(),
524        );
525        let cancel = opts.cancel.clone();
526        let scoped_effect_controller = opts.scoped_effect_controller();
527        self.stream_turn_with_scoped_effect_controller_inner(
528            work.input,
529            opts.events_or_noop(),
530            opts.turn_events_or_noop(),
531            scoped_effect_controller,
532            cancel,
533            Some(claim),
534        )
535        .await
536        .map(Some)
537    }
538
539    /// Enforce the durable-first wiring invariant at a turn-scope boundary: when
540    /// the host wired a durable effect host, every store reachable from this
541    /// scope must also be durable. A durable host running against any ephemeral
542    /// store fails loudly here rather than silently degrading.
543    ///
544    /// Inline controllers (the default tier) impose no requirement, so
545    /// inline/in-memory hosts pass unchanged.
546    fn ensure_durable_store_facets_for_scope(
547        &self,
548        scoped_effect_controller: &ScopedEffectController<'_>,
549    ) -> Result<(), RuntimeError> {
550        if scoped_effect_controller.controller().durability_tier() != crate::DurabilityTier::Durable
551        {
552            return Ok(());
553        }
554        if self
555            .host
556            .core
557            .durability
558            .attachment_store
559            .persistence()
560            .durability_tier()
561            != crate::DurabilityTier::Durable
562        {
563            return Err(RuntimeError::durable_store_required(
564                crate::DurableStoreFacet::AttachmentStore,
565            ));
566        }
567        if self
568            .host
569            .core
570            .durability
571            .process_env_store
572            .durability_tier()
573            != crate::DurabilityTier::Durable
574        {
575            return Err(RuntimeError::durable_store_required(
576                crate::DurableStoreFacet::ProcessEnvStore,
577            ));
578        }
579        if let Some(store) = self
580            .session
581            .as_ref()
582            .and_then(|session| session.history_store())
583            && store.durability_tier() != crate::DurabilityTier::Durable
584        {
585            return Err(RuntimeError::durable_store_required(
586                crate::DurableStoreFacet::SessionStore,
587            ));
588        }
589        if let Some(process_registry) = self.host.process_registry.as_ref()
590            && process_registry.durability_tier() != crate::DurabilityTier::Durable
591        {
592            return Err(RuntimeError::durable_store_required(
593                crate::DurableStoreFacet::ProcessRegistry,
594            ));
595        }
596        Ok(())
597    }
598
599    async fn stream_turn_with_scoped_effect_controller_inner(
600        &mut self,
601        mut input: TurnInput,
602        events: &dyn EventSink,
603        turn_events: &dyn TurnActivitySink,
604        scoped_effect_controller: ScopedEffectController<'_>,
605        cancel: CancellationToken,
606        queued_claim: Option<crate::QueuedWorkClaim>,
607    ) -> Result<AssembledTurn, RuntimeError> {
608        if queued_claim.is_none() {
609            while self.drain_next_session_command().await?.is_some() {}
610        }
611        if let Some(input_turn_id) = input.trace_turn_id.as_deref()
612            && scoped_effect_controller
613                .execution_scope()
614                .validates_turn_trace_id()
615            && input_turn_id != scoped_effect_controller.scope_id()
616        {
617            return Err(RuntimeError::new(
618                RuntimeErrorCode::ExecutionScopeTurnIdMismatch,
619                format!(
620                    "input trace_turn_id `{input_turn_id}` does not match execution scope id `{}`",
621                    scoped_effect_controller.scope_id()
622                ),
623            ));
624        }
625        self.ensure_durable_store_facets_for_scope(&scoped_effect_controller)?;
626        input
627            .trace_turn_id
628            .get_or_insert_with(|| scoped_effect_controller.scope_id().to_string());
629        self.stream_turn_inner(
630            input.clone(),
631            events,
632            turn_events,
633            scoped_effect_controller,
634            cancel.clone(),
635            queued_claim,
636        )
637        .await
638    }
639
640    /// Stream one logical host turn, following foreground AgentFrame switches
641    /// until a terminal outcome is reached.
642    ///
643    /// A protocol continuation creates a new frame in the same session. Hosts
644    /// that only care about the benchmark/app answer should not need to
645    /// special-case that intermediate outcome; this helper keeps driving the
646    /// same session through each frame's task with the normal runtime turn
647    /// guards.
648    pub async fn stream_turn_with_agent_frames(
649        &mut self,
650        input: TurnInput,
651        opts: TurnOptions<'_>,
652    ) -> Result<AgentFrameRun, RuntimeError> {
653        let cancel = opts.cancel.clone();
654        let scoped_effect_controller = opts.scoped_effect_controller();
655        self.stream_turn_with_agent_frames_inner(
656            input,
657            opts.events_or_noop(),
658            opts.turn_events_or_noop(),
659            scoped_effect_controller,
660            cancel,
661        )
662        .await
663    }
664
665    async fn stream_turn_with_agent_frames_inner(
666        &mut self,
667        mut input: TurnInput,
668        events: &dyn EventSink,
669        turn_events: &dyn TurnActivitySink,
670        scoped_effect_controller: ScopedEffectController<'_>,
671        cancel: CancellationToken,
672    ) -> Result<AgentFrameRun, RuntimeError> {
673        if let Some(input_turn_id) = input.trace_turn_id.as_deref()
674            && scoped_effect_controller
675                .execution_scope()
676                .validates_turn_trace_id()
677            && input_turn_id != scoped_effect_controller.scope_id()
678        {
679            return Err(RuntimeError::new(
680                RuntimeErrorCode::ExecutionScopeTurnIdMismatch,
681                format!(
682                    "input trace_turn_id `{input_turn_id}` does not match execution scope id `{}`",
683                    scoped_effect_controller.scope_id()
684                ),
685            ));
686        }
687        let follow_protocol_turn_options = input.protocol_turn_options.clone();
688        let follow_turn_context = input.turn_context.clone();
689        let follow_trace_turn_id = input
690            .trace_turn_id
691            .clone()
692            .unwrap_or_else(|| scoped_effect_controller.scope_id().to_string());
693        input
694            .trace_turn_id
695            .get_or_insert(follow_trace_turn_id.clone());
696        let mut turns = Vec::new();
697        loop {
698            let turn_trace_turn_id = agent_frame_follow_turn_id(&follow_trace_turn_id, turns.len());
699            input.trace_turn_id = Some(turn_trace_turn_id.clone());
700            let turn_effect_controller = if turns.is_empty() {
701                scoped_effect_controller.clone()
702            } else {
703                ScopedEffectController::borrowed(
704                    scoped_effect_controller.controller(),
705                    ExecutionScope::turn(&self.state.session_id, &turn_trace_turn_id),
706                )?
707            };
708            let turn = self
709                .stream_turn_with_scoped_effect_controller_inner(
710                    input,
711                    events,
712                    turn_events,
713                    turn_effect_controller,
714                    cancel.clone(),
715                    None,
716                )
717                .await?;
718            let switched_frame = match &turn.outcome {
719                TurnOutcome::AgentFrameSwitch { frame_id, task } => {
720                    Some((frame_id.clone(), task.clone()))
721                }
722                _ => None,
723            };
724            turns.push(turn);
725
726            let Some((_frame_id, task)) = switched_frame else {
727                return Ok(AgentFrameRun { turns });
728            };
729            input = turn_input_from_text(task);
730            input.protocol_turn_options = follow_protocol_turn_options.clone();
731            input.turn_context = follow_turn_context.clone();
732        }
733    }
734
735    async fn stream_turn_inner(
736        &mut self,
737        mut input: TurnInput,
738        events: &dyn EventSink,
739        turn_events: &dyn TurnActivitySink,
740        scoped_effect_controller: ScopedEffectController<'_>,
741        cancel: CancellationToken,
742        queued_claim: Option<crate::QueuedWorkClaim>,
743    ) -> Result<AssembledTurn, RuntimeError> {
744        self.refresh_session_graph_from_store()
745            .await
746            .map_err(session_head_refresh_error)?;
747        let input_trace_turn_id = input.trace_turn_id.clone();
748        let queued_turn_work = queued_claim
749            .as_ref()
750            .map(crate::QueuedWorkClaim::materialize_for_turn);
751        if let Some(work) = queued_turn_work.as_ref()
752            && input.items.is_empty()
753            && input.image_blobs.is_empty()
754        {
755            input = work.input.clone();
756            if input.trace_turn_id.is_none() {
757                input.trace_turn_id = input_trace_turn_id;
758            }
759        }
760        if self
761            .session
762            .as_ref()
763            .and_then(|session| session.history_store())
764            .is_some()
765        {
766            ensure_durable_effect_input(&input)?;
767        }
768        if let Some(extension) = &input.protocol_extension
769            && let Some(session) = self.session.as_ref()
770        {
771            let protocol_session = std::sync::Arc::clone(session.plugins().protocol_session());
772            protocol_session
773                .validate_turn_extension(extension)
774                .await
775                .map_err(|err| {
776                    RuntimeError::new(RuntimeErrorCode::ProtocolTurnExtension, err.to_string())
777                })?;
778        }
779        let previous_prompt_usage = self.state.last_prompt_usage.clone();
780        let normalized = match self
781            .normalize_input_items(&input.items, &input.image_blobs)
782            .await
783        {
784            Ok(items) => items,
785            Err(e) => {
786                self.state.last_prompt_usage = None;
787                let mut assembler = TurnAssembler::default();
788                let error_event = SessionEvent::Error {
789                    message: e.clone(),
790                    envelope: Some(crate::session_model::ErrorEnvelope {
791                        kind: "input_validation".to_string(),
792                        code: Some("invalid_turn_input".to_string()),
793                        terminal_reason: None,
794                        user_message: e.clone(),
795                        raw: None,
796                    }),
797                };
798                assembler.push(&error_event);
799                emit_turn_activity_to_sink(
800                    turn_events,
801                    TurnActivity::independent(TurnEvent::Error { message: e }),
802                )
803                .await;
804                emit_session_event_to_sink(events, error_event).await;
805                let outcome_event = SessionEvent::TurnOutcome {
806                    outcome: TurnOutcome::Stopped(TurnStop::InvalidInput),
807                };
808                assembler.push(&outcome_event);
809                emit_session_event_to_sink(events, outcome_event).await;
810                assembler.push(&SessionEvent::Done);
811                emit_session_event_to_sink(events, SessionEvent::Done).await;
812                return Ok(assembler.finish(
813                    self.state.to_snapshot(),
814                    false,
815                    None,
816                    &self.host.core.control.termination,
817                ));
818            }
819        };
820        let turn_index = self.state.turn_index + 1;
821        let trace_turn_id = input
822            .trace_turn_id
823            .clone()
824            .unwrap_or_else(|| uuid::Uuid::new_v4().to_string());
825        if self.host.core.tracing.trace_sink.is_some() {
826            let mut trace_metadata = std::collections::BTreeMap::new();
827            trace_metadata.insert(
828                "input_item_count".to_string(),
829                serde_json::json!(normalized.len()),
830            );
831            crate::trace::emit_trace(
832                &self.host.core.tracing.trace_sink,
833                &self.host.core.tracing.trace_context,
834                lash_trace::TraceContext::default()
835                    .for_session(self.state.session_id.clone())
836                    .for_turn_index(turn_index)
837                    .for_turn(trace_turn_id.clone()),
838                lash_trace::TraceEvent::TurnStarted {
839                    metadata: trace_metadata,
840                },
841                self.host.core.clock.as_ref(),
842            );
843        }
844
845        let base_read_model = self.state.read_model();
846        let base_messages = base_read_model.messages;
847        let base_render_cache = base_read_model.prompt_render_cache;
848        let mut turn_delta = Vec::new();
849        let initial_turn_causes = queued_turn_work
850            .as_ref()
851            .map(|work| work.turn_causes.clone())
852            .unwrap_or_default();
853        turn_delta.extend(
854            initial_turn_causes
855                .iter()
856                .map(crate::TurnCause::to_event_message),
857        );
858
859        let user_id = fresh_message_id();
860        let mut user_parts: Vec<Part> = Vec::new();
861        for item in normalized {
862            match item {
863                NormalizedItem::Text(text) => {
864                    if text.is_empty() {
865                        continue;
866                    }
867                    user_parts.push(Part {
868                        id: format!("{}.p{}", user_id, user_parts.len()),
869                        kind: PartKind::Text,
870                        content: text,
871                        attachment: None,
872                        tool_call_id: None,
873                        tool_name: None,
874                        tool_replay: None,
875                        prune_state: PruneState::Intact,
876                        reasoning_meta: None,
877                        response_meta: None,
878                    });
879                }
880                NormalizedItem::Image(reference) => {
881                    user_parts.push(Part {
882                        id: format!("{}.p{}", user_id, user_parts.len()),
883                        kind: PartKind::Image,
884                        content: String::new(),
885                        attachment: Some(crate::session_model::message::PartAttachment {
886                            reference,
887                        }),
888                        tool_call_id: None,
889                        tool_name: None,
890                        tool_replay: None,
891                        prune_state: PruneState::Intact,
892                        reasoning_meta: None,
893                        response_meta: None,
894                    });
895                }
896            }
897        }
898        if user_parts.is_empty() && initial_turn_causes.is_empty() {
899            user_parts.push(Part {
900                id: format!("{}.p0", user_id),
901                kind: PartKind::Text,
902                content: String::new(),
903                attachment: None,
904                tool_call_id: None,
905                tool_name: None,
906                tool_replay: None,
907                prune_state: PruneState::Intact,
908                reasoning_meta: None,
909                response_meta: None,
910            });
911        }
912        if !user_parts.is_empty() {
913            reassign_part_ids(&user_id, &mut user_parts);
914            turn_delta.push(Message {
915                id: user_id.clone(),
916                role: MessageRole::User,
917                parts: shared_parts(user_parts),
918                origin: None,
919            });
920        }
921
922        let manager = self
923            .runtime_session_services_for_turn(None)
924            .map_err(|err| {
925                RuntimeError::new(RuntimeErrorCode::PluginSessionManager, err.to_string())
926            })?;
927        let plugin_session = self
928            .session
929            .as_ref()
930            .map(|s| Arc::clone(s.plugins()))
931            .ok_or_else(|| {
932                RuntimeError::new(
933                    RuntimeErrorCode::ContextPrepareTurn,
934                    "runtime session not available",
935                )
936            })?;
937        let prepare_phase_turn_id = turn_phase_id(&trace_turn_id, "prepare-turn");
938        let prepare_phase_controller = scoped_child_turn_controller(
939            &scoped_effect_controller,
940            &self.state.session_id,
941            &prepare_phase_turn_id,
942        )?;
943        let turn_ctx = crate::TurnTransformContext {
944            session_id: self.state.session_id.clone(),
945            state: self.read_view(),
946            prompt_usage: previous_prompt_usage.clone(),
947            max_context_tokens: Some(LashRuntime::max_context_tokens(self)),
948            sessions: manager.state_service(),
949            session_lifecycle: manager.lifecycle_service(),
950            session_graph: manager.graph_service(),
951            scoped_effect_controller: scoped_effect_controller.clone(),
952            direct_completions: manager.direct_completion_client(
953                RuntimeEffectControllerHandle::borrowed(prepare_phase_controller),
954                Some(prepare_phase_turn_id),
955            ),
956        };
957        self.mark_phase_begin(RuntimeTurnPhase::ContextTransform);
958        let prepared_context = plugin_session
959            .prepare_turn_context(
960                &turn_ctx,
961                crate::session_model::context::PreparedContext {
962                    messages: crate::MessageSequence::from_base_and_delta(
963                        base_messages,
964                        turn_delta,
965                    )
966                    .with_base_render_cache(base_render_cache),
967                    ..Default::default()
968                },
969                self.turn_phase_probe.clone(),
970            )
971            .await
972            .map_err(|err| {
973                RuntimeError::new(RuntimeErrorCode::ContextPrepareTurn, err.to_string())
974            })?;
975        self.mark_phase_end(RuntimeTurnPhase::ContextTransform);
976        // Release the read-view's graph clone before the rest of the turn
977        // runs. Keeping it alive into `stream_prepared_turn` forces the
978        // post-turn `append_active_read_delta` to deep-clone the session
979        // graph (Arc::make_mut with refcount > 1).
980        drop(turn_ctx);
981        let messages = prepared_context.messages;
982        if let Some(session) = self.session.as_mut() {
983            session
984                .set_context_overlay(
985                    prepared_context.tool_providers,
986                    prepared_context.prompt_contributions,
987                    prepared_context.include_base_tools,
988                )
989                .map_err(|err| {
990                    RuntimeError::new(
991                        RuntimeErrorCode::Other("session_tool_registry".to_string()),
992                        err.to_string(),
993                    )
994                })?;
995        }
996
997        self.state.last_prompt_usage = None;
998        Box::pin(self.stream_prepared_turn(
999            messages,
1000            previous_prompt_usage,
1001            input.protocol_turn_options.clone(),
1002            input.protocol_extension.clone(),
1003            input.turn_context.clone(),
1004            initial_turn_causes,
1005            trace_turn_id,
1006            turn_index,
1007            events,
1008            turn_events,
1009            scoped_effect_controller,
1010            cancel,
1011            queued_claim,
1012        ))
1013        .await
1014    }
1015
1016    /// Run a single turn and return only the assembled terminal result.
1017    pub async fn run_turn_assembled(
1018        &mut self,
1019        input: TurnInput,
1020        cancel: CancellationToken,
1021        scoped_effect_controller: ScopedEffectController<'_>,
1022    ) -> Result<AssembledTurn, RuntimeError> {
1023        self.stream_turn(input, TurnOptions::new(cancel, scoped_effect_controller))
1024            .await
1025    }
1026
1027    /// Run a turn using host-prepared message history.
1028    #[allow(clippy::too_many_arguments)]
1029    pub async fn stream_prepared_turn(
1030        &mut self,
1031        messages: crate::MessageSequence,
1032        _previous_prompt_usage: Option<PromptUsage>,
1033        protocol_turn_options: Option<crate::ProtocolTurnOptions>,
1034        protocol_extension: Option<crate::ProtocolTurnExtensionHandle>,
1035        turn_context: crate::TurnContext,
1036        initial_turn_causes: Vec<crate::TurnCause>,
1037        trace_turn_id: String,
1038        turn_index: usize,
1039        events: &dyn EventSink,
1040        turn_events: &dyn TurnActivitySink,
1041        scoped_effect_controller: ScopedEffectController<'_>,
1042        cancel: CancellationToken,
1043        initial_queue_claim: Option<crate::QueuedWorkClaim>,
1044    ) -> Result<AssembledTurn, RuntimeError> {
1045        let (event_tx, mut event_rx) = mpsc::channel::<RuntimeStreamEvent>(100);
1046        let child_usage_event_relay = ChildUsageEventRelay::new(event_tx.clone());
1047        let mut turn_policy = self.state.effective_policy().clone();
1048        let turn_provider_override = turn_context.provider().cloned();
1049        if let Some(provider) = turn_provider_override.as_ref() {
1050            turn_policy.provider_id = provider.kind().to_string();
1051        }
1052        if let Some(model) = turn_context.model_spec() {
1053            turn_policy.model = model.clone();
1054        }
1055        let session_protocol_turn_options = self.state.effective_protocol_turn_options().clone();
1056        let effective_protocol_turn_options = protocol_turn_options
1057            .clone()
1058            .map(|options| session_protocol_turn_options.merged_with_override(&options))
1059            .unwrap_or(session_protocol_turn_options);
1060        let manager = self
1061            .runtime_session_services_for_turn(Some(child_usage_event_relay.clone()))
1062            .map_err(|err| {
1063                RuntimeError::new(RuntimeErrorCode::PluginSessionManager, err.to_string())
1064            })?;
1065        let plugins = {
1066            let session = self
1067                .session
1068                .as_ref()
1069                .expect("lash runtime session must be available");
1070            Arc::clone(session.plugins())
1071        };
1072        let mut assembler = TurnAssembler::new();
1073        self.mark_phase_begin(RuntimeTurnPhase::BeforeTurnHooks);
1074        // Block-scope the pinned future so it (and its captured
1075        // `SessionReadView` clone of the session graph) drops before the
1076        // post-turn `append_active_read_delta` mutation. Keeping it alive
1077        // across the turn forces `Arc::make_mut` to deep-clone
1078        // `SessionGraphData`.
1079        let prepared = {
1080            let prepare_turn = plugins.prepare_turn_with_phase_probe(
1081                PrepareTurnRequest {
1082                    session_id: self.state.session_id.clone(),
1083                    state: crate::SessionReadView::from_runtime_state(
1084                        &self.state,
1085                        turn_policy.clone(),
1086                        effective_protocol_turn_options.clone(),
1087                    ),
1088                    messages,
1089                    sessions: manager.state_service(),
1090                    session_lifecycle: manager.lifecycle_service(),
1091                    session_graph: manager.graph_service(),
1092                    turn_context: turn_context.clone(),
1093                },
1094                self.turn_phase_probe.clone(),
1095            );
1096            let mut prepare_turn = Box::pin(prepare_turn);
1097
1098            loop {
1099                tokio::select! {
1100                    prepared = prepare_turn.as_mut() => {
1101                        let prepared = prepared.map_err(|err| {
1102                            RuntimeError::new(RuntimeErrorCode::PluginPrepareTurn, err.to_string())
1103                        })?;
1104                        self.mark_phase_end(RuntimeTurnPhase::BeforeTurnHooks);
1105                        break prepared;
1106                    }
1107                    maybe_event = event_rx.recv() => {
1108                        if let Some(event) = maybe_event {
1109                            emit_runtime_stream_event_to_sinks(
1110                                events,
1111                                turn_events,
1112                                event,
1113                                &mut assembler,
1114                            )
1115                            .await;
1116                        }
1117                    }
1118                }
1119            }
1120        };
1121        for event in &prepared.events {
1122            assembler.push(event);
1123        }
1124        emit_session_events_to_sink(events, prepared.events).await;
1125        if let Some(abort) = prepared.abort {
1126            drop(event_tx);
1127
1128            let mut turn_pipeline = TurnBoundary::from_state_with_clock(
1129                self.state.clone(),
1130                Arc::clone(&self.host.core.clock),
1131            );
1132            turn_pipeline.apply_prepared_messages(&prepared.messages);
1133            let state = turn_pipeline.into_final_state();
1134            let issue = TurnIssue {
1135                kind: "plugin".to_string(),
1136                code: Some(abort.code),
1137                terminal_reason: None,
1138                message: abort.message.clone(),
1139                raw: None,
1140            };
1141            let error_event = SessionEvent::Error {
1142                message: abort.message,
1143                envelope: Some(crate::session_model::ErrorEnvelope {
1144                    kind: "plugin".to_string(),
1145                    code: issue.code.clone(),
1146                    terminal_reason: None,
1147                    user_message: issue.message.clone(),
1148                    raw: None,
1149                }),
1150            };
1151            assembler.push(&error_event);
1152            emit_turn_activity_to_sink(
1153                turn_events,
1154                TurnActivity::independent(TurnEvent::Error {
1155                    message: issue.message.clone(),
1156                }),
1157            )
1158            .await;
1159            emit_session_event_to_sink(events, error_event).await;
1160            let outcome_event = SessionEvent::TurnOutcome {
1161                outcome: TurnOutcome::Stopped(TurnStop::PluginAbort),
1162            };
1163            assembler.push(&outcome_event);
1164            emit_session_event_to_sink(events, outcome_event).await;
1165            assembler.push(&SessionEvent::Done);
1166            emit_session_event_to_sink(events, SessionEvent::Done).await;
1167            return Ok(assembler.finish(
1168                state.to_snapshot(),
1169                cancel.is_cancelled(),
1170                Some(issue),
1171                &self.host.core.control.termination,
1172            ));
1173        }
1174        let mut turn_pipeline = TurnBoundary::from_state_with_clock(
1175            self.state.clone(),
1176            Arc::clone(&self.host.core.clock),
1177        );
1178        let store = self
1179            .session
1180            .as_ref()
1181            .and_then(|session| session.history_store());
1182        // Durable controllers, like Restate, own in-flight replay. Writing
1183        // progress checkpoints directly to the shared store would make handler
1184        // replay observe a newer partial turn and change effect replay keys.
1185        let progress_store = if scoped_effect_controller.controller().durability_tier()
1186            == crate::DurabilityTier::Durable
1187        {
1188            None
1189        } else {
1190            store.as_ref().map(|store| store.as_ref())
1191        };
1192        turn_pipeline
1193            .prepared_checkpoint(
1194                progress_store,
1195                turn_policy.clone(),
1196                turn_index,
1197                &prepared.messages,
1198                self.session.as_mut(),
1199            )
1200            .await
1201            .map_err(|err| {
1202                RuntimeError::new(RuntimeErrorCode::StoreCommitFailed, err.to_string())
1203            })?;
1204        let resolved_turn_policy = if let Some(provider) = turn_provider_override {
1205            RuntimeSessionPolicy::from_provider(
1206                turn_policy.clone(),
1207                provider.with_clock(Arc::clone(&self.host.core.clock)),
1208            )
1209            .map_err(|err| RuntimeError::new("llm_provider", err.to_string()))?
1210        } else {
1211            self.host
1212                .resolve_session_policy(&self.state.session_id, turn_policy.clone())
1213                .map_err(|err| RuntimeError::new("llm_provider", err.to_string()))?
1214        };
1215        let manager = self
1216            .runtime_session_services_for_turn(Some(child_usage_event_relay.clone()))
1217            .map_err(|err| {
1218                RuntimeError::new(RuntimeErrorCode::PluginSessionManager, err.to_string())
1219            })?;
1220        let cancel_state = cancel.clone();
1221        let finish_scoped_effect_controller = scoped_effect_controller.clone();
1222        let session = self
1223            .session
1224            .take()
1225            .expect("lash runtime session must be available");
1226        let mut driver = RuntimeTurnDriver {
1227            session,
1228            policy: resolved_turn_policy,
1229            host: self.host.clone(),
1230            turn_id: scoped_effect_controller.scope_id().to_string(),
1231            scoped_effect_controller,
1232            session_id: self.state.session_id.clone(),
1233            turn_index,
1234            turn_pipeline,
1235            llm_stream_summaries: HashMap::new(),
1236            next_llm_ordinal: 0,
1237            session_services: manager,
1238            protocol_turn_options: effective_protocol_turn_options,
1239            protocol_extension,
1240            turn_context,
1241            turn_causes: initial_turn_causes,
1242            pending_queue_claims: initial_queue_claim.into_iter().collect(),
1243            checkpoint_messages: crate::tool_dispatch::CheckpointMessageBuffer::default(),
1244            turn_phase_probe: self.turn_phase_probe.clone(),
1245        };
1246        let protocol_run_offset = 0;
1247        self.mark_phase_begin(RuntimeTurnPhase::EffectLoop);
1248        let run_result = drive_turn_to_completion(
1249            driver.run(prepared.messages, event_tx, cancel, protocol_run_offset),
1250            &mut event_rx,
1251            &mut assembler,
1252            &child_usage_event_relay,
1253            events,
1254            turn_events,
1255        )
1256        .await;
1257        let (new_messages, _new_protocol_iteration) = match run_result {
1258            Ok(result) => result,
1259            Err(err) => {
1260                self.mark_phase_end(RuntimeTurnPhase::EffectLoop);
1261                let RuntimeTurnDriver { session, .. } = driver;
1262                self.session = Some(session);
1263                return Err(err);
1264            }
1265        };
1266        self.mark_phase_end(RuntimeTurnPhase::EffectLoop);
1267        tracing::debug!(
1268            new_message_count = new_messages.len(),
1269            tool_call_count = assembler.tool_calls.len(),
1270            "runtime post-run_task"
1271        );
1272
1273        let RuntimeTurnDriver {
1274            session,
1275            policy,
1276            turn_pipeline,
1277            pending_queue_claims,
1278            ..
1279        } = driver;
1280        self.session = Some(session);
1281        self.finish_turn(
1282            TurnFinishInput {
1283                turn_pipeline,
1284                assembler,
1285                new_messages,
1286                policy,
1287                turn_index,
1288                queued_work_completions: pending_queue_claims
1289                    .iter()
1290                    .map(crate::QueuedWorkClaim::completion)
1291                    .collect(),
1292                trace_turn_id,
1293            },
1294            events,
1295            &finish_scoped_effect_controller,
1296            &cancel_state,
1297        )
1298        .await
1299    }
1300    async fn normalize_input_items(
1301        &self,
1302        items: &[InputItem],
1303        image_blobs: &HashMap<String, Vec<u8>>,
1304    ) -> Result<Vec<NormalizedItem>, String> {
1305        normalize_input_items(
1306            items,
1307            image_blobs,
1308            self.host.core.durability.attachment_store.as_ref(),
1309        )
1310        .await
1311    }
1312}
1313
1314fn turn_input_from_text(text: String) -> TurnInput {
1315    TurnInput {
1316        items: vec![InputItem::Text { text }],
1317        image_blobs: HashMap::new(),
1318        protocol_turn_options: None,
1319        trace_turn_id: None,
1320        protocol_extension: None,
1321        turn_context: crate::TurnContext::default(),
1322    }
1323}
1324
1325fn agent_frame_follow_turn_id(root_turn_id: &str, completed_turn_count: usize) -> String {
1326    if completed_turn_count == 0 {
1327        root_turn_id.to_string()
1328    } else {
1329        format!("{root_turn_id}:agent-frame:{completed_turn_count}")
1330    }
1331}
1332
1333pub fn ensure_durable_effect_input(input: &TurnInput) -> Result<(), RuntimeError> {
1334    if input.protocol_extension.is_some() {
1335        return Err(RuntimeError::new(
1336            RuntimeErrorCode::DurableEffectLiveProtocolExtension,
1337            "durable effect hosts do not support live protocol_extension inputs; encode replayable data in protocol_turn_options or persisted plugin state",
1338        ));
1339    }
1340    input
1341        .turn_context
1342        .live_plugin_inputs()
1343        .durable_effect_rejection()?;
1344    Ok(())
1345}
1346
1347async fn emit_turn_activity_to_sink(events: &dyn TurnActivitySink, activity: TurnActivity) {
1348    if !events.is_noop() {
1349        events.emit(activity).await;
1350    }
1351}
1352
1353/// Pump the turn driver's event channel into the host sinks while the run
1354/// future executes, then drain any events emitted between completion and the
1355/// sender dropping.
1356///
1357/// Both the fresh and resumed turn entry points construct a
1358/// `RuntimeTurnDriver`, kick off its run future, and need identical
1359/// event-pump/drain behavior before tearing the driver down. Only the driver
1360/// construction and post-run teardown differ, so each caller owns those and
1361/// shares this loop.
1362async fn drive_turn_to_completion<F>(
1363    run_future: F,
1364    event_rx: &mut mpsc::Receiver<RuntimeStreamEvent>,
1365    assembler: &mut TurnAssembler,
1366    child_usage_event_relay: &ChildUsageEventRelay,
1367    events: &dyn EventSink,
1368    turn_events: &dyn TurnActivitySink,
1369) -> Result<(crate::MessageSequence, usize), RuntimeError>
1370where
1371    F: std::future::Future<Output = Result<(crate::MessageSequence, usize), RuntimeError>>,
1372{
1373    let run_result = {
1374        let mut run_future = Box::pin(run_future);
1375        loop {
1376            tokio::select! {
1377                maybe_event = event_rx.recv() => {
1378                    if let Some(event) = maybe_event {
1379                        emit_runtime_stream_event_to_sinks(
1380                            events,
1381                            turn_events,
1382                            event,
1383                            assembler,
1384                        )
1385                        .await;
1386                    }
1387                }
1388                completed = run_future.as_mut() => {
1389                    child_usage_event_relay.clear();
1390                    break completed;
1391                }
1392            }
1393        }
1394    };
1395    while let Some(event) = event_rx.recv().await {
1396        emit_runtime_stream_event_to_sinks(events, turn_events, event, assembler).await;
1397    }
1398    run_result
1399}
1400
1401async fn emit_runtime_stream_event_to_sinks(
1402    events: &dyn EventSink,
1403    turn_events: &dyn TurnActivitySink,
1404    event: RuntimeStreamEvent,
1405    assembler: &mut TurnAssembler,
1406) {
1407    match event {
1408        RuntimeStreamEvent::Session(event) => {
1409            assembler.push(&event);
1410            emit_session_event_to_sink(events, event).await;
1411        }
1412        RuntimeStreamEvent::Turn(activity) => {
1413            assembler.push_turn_activity(&activity);
1414            emit_turn_activity_to_sink(turn_events, activity).await;
1415        }
1416    }
1417}
1418
1419#[cfg(test)]
1420mod tests {
1421    use super::agent_frame_follow_turn_id;
1422
1423    #[test]
1424    fn agent_frame_follow_turn_ids_are_distinct_and_deterministic() {
1425        assert_eq!(agent_frame_follow_turn_id("root-turn", 0), "root-turn");
1426        assert_eq!(
1427            agent_frame_follow_turn_id("root-turn", 1),
1428            "root-turn:agent-frame:1"
1429        );
1430        assert_eq!(
1431            agent_frame_follow_turn_id("root-turn", 2),
1432            "root-turn:agent-frame:2"
1433        );
1434    }
1435}