Skip to main content

lash_core/runtime/
turn_loop.rs

1use super::*;
2
3fn trace_fields_from_outcome(
4    outcome: &TurnOutcome,
5) -> (
6    &'static str,
7    &'static str,
8    Option<lash_trace::TraceAgentFrameSwitch>,
9) {
10    match outcome {
11        TurnOutcome::Finished(TurnFinish::AssistantMessage { .. }) => {
12            ("completed", "assistant_message", None)
13        }
14        TurnOutcome::Finished(TurnFinish::SubmittedValue { .. }) => {
15            ("completed", "submitted_value", None)
16        }
17        TurnOutcome::Finished(TurnFinish::ToolValue { .. }) => ("completed", "tool_value", None),
18        TurnOutcome::AgentFrameSwitch { frame_id } => (
19            "completed",
20            "agent_frame_switch",
21            Some(lash_trace::TraceAgentFrameSwitch {
22                frame_id: frame_id.clone(),
23            }),
24        ),
25        TurnOutcome::Stopped(stop) => ("failed", trace_stop_reason(stop), None),
26    }
27}
28
29fn trace_stop_reason(stop: &TurnStop) -> &'static str {
30    match stop {
31        TurnStop::Cancelled => "cancelled",
32        TurnStop::Incomplete => "incomplete",
33        TurnStop::InvalidInput => "invalid_input",
34        TurnStop::MaxTurns => "max_turns",
35        TurnStop::ToolFailure => "tool_failure",
36        TurnStop::ProviderError => "provider_error",
37        TurnStop::PluginAbort => "plugin_abort",
38        TurnStop::RuntimeError => "runtime_error",
39        TurnStop::SubmittedError { .. } => "submitted_error",
40        TurnStop::ToolError { .. } => "tool_error",
41    }
42}
43
44fn session_head_refresh_error(err: SessionError) -> RuntimeError {
45    RuntimeError::new(
46        RuntimeErrorCode::Other("session_head_refresh".to_string()),
47        err.to_string(),
48    )
49}
50
51fn queued_work_payload_type(payload: &crate::QueuedWorkPayload) -> &'static str {
52    match payload {
53        crate::QueuedWorkPayload::TurnInput { .. } => "turn_input",
54        crate::QueuedWorkPayload::ProcessWake { .. } => "process_wake",
55        crate::QueuedWorkPayload::HostEvent { .. } => "host_event",
56        crate::QueuedWorkPayload::Timer { .. } => "timer",
57        crate::QueuedWorkPayload::SessionCommand { command } => command.kind(),
58    }
59}
60
61fn queued_work_batch_ids(claim: &crate::QueuedWorkClaim) -> Vec<String> {
62    claim
63        .batches
64        .iter()
65        .map(|batch| batch.batch_id.clone())
66        .collect()
67}
68
69fn turn_phase_id(parent_turn_id: &str, phase: &str) -> String {
70    format!("{parent_turn_id}:{phase}")
71}
72
73fn scoped_child_turn_controller<'run>(
74    scoped_effect_controller: &'run ScopedEffectController<'_>,
75    session_id: &str,
76    turn_id: &str,
77) -> Result<ScopedEffectController<'run>, RuntimeError> {
78    ScopedEffectController::borrowed(
79        scoped_effect_controller.controller(),
80        EffectScope::turn(session_id, turn_id),
81    )
82}
83
84pub(in crate::runtime) fn queued_work_trace_payload(
85    boundary: crate::QueuedWorkClaimBoundary,
86    claim: &crate::QueuedWorkClaim,
87    causes: &[crate::TurnCause],
88) -> serde_json::Value {
89    serde_json::json!({
90        "boundary": boundary,
91        "claim_id": claim.claim_id,
92        "owner_id": claim.owner_id,
93        "batch_ids": queued_work_batch_ids(claim),
94        "payload_types": claim.batches.iter()
95            .flat_map(|batch| batch.items.iter())
96            .map(|item| queued_work_payload_type(&item.payload))
97            .collect::<Vec<_>>(),
98        "causes": causes,
99    })
100}
101
102pub(in crate::runtime) fn queued_work_completion_trace_payload(
103    completions: &[crate::QueuedWorkCompletion],
104) -> serde_json::Value {
105    serde_json::json!({
106        "claims": completions.iter().map(|completion| {
107            serde_json::json!({
108                "session_id": completion.session_id,
109                "claim_id": completion.claim_id,
110                "batch_ids": completion.batch_ids,
111            })
112        }).collect::<Vec<_>>(),
113    })
114}
115
116async fn emit_queued_work_started_to_sink(
117    events: &dyn TurnActivitySink,
118    boundary: crate::QueuedWorkClaimBoundary,
119    claim: &crate::QueuedWorkClaim,
120    causes: Vec<crate::TurnCause>,
121) {
122    emit_turn_activity_to_sink(
123        events,
124        TurnActivity::independent(TurnEvent::QueuedWorkStarted {
125            boundary,
126            batch_ids: queued_work_batch_ids(claim),
127            causes,
128        }),
129    )
130    .await;
131}
132
133pub(in crate::runtime) async fn send_queued_work_started_event(
134    event_tx: &mpsc::Sender<RuntimeStreamEvent>,
135    boundary: crate::QueuedWorkClaimBoundary,
136    claim: &crate::QueuedWorkClaim,
137    causes: Vec<crate::TurnCause>,
138) {
139    send_turn_activity(
140        event_tx,
141        TurnActivityId::fresh(),
142        TurnEvent::QueuedWorkStarted {
143            boundary,
144            batch_ids: queued_work_batch_ids(claim),
145            causes,
146        },
147    )
148    .await;
149}
150
151struct TurnFinishInput {
152    turn_pipeline: TurnBoundary,
153    assembler: TurnAssembler,
154    new_messages: crate::MessageSequence,
155    policy: RuntimeSessionPolicy,
156    turn_index: usize,
157    queued_work_completions: Vec<crate::QueuedWorkCompletion>,
158    tool_host_events: Vec<crate::tool_dispatch::ToolHostEventEffectOutcome>,
159    trace_turn_id: String,
160}
161
162impl LashRuntime {
163    fn max_context_tokens(&self) -> usize {
164        self.state.effective_policy().context_window_tokens()
165    }
166    #[doc(hidden)]
167    pub fn set_turn_phase_probe(&mut self, probe: Arc<dyn RuntimeTurnPhaseProbe>) {
168        self.turn_phase_probe = Some(probe);
169    }
170
171    fn mark_phase_begin(&self, phase: RuntimeTurnPhase) {
172        if let Some(probe) = self.turn_phase_probe.as_ref() {
173            probe.begin(phase);
174        }
175    }
176
177    fn mark_phase_end(&self, phase: RuntimeTurnPhase) {
178        if let Some(probe) = self.turn_phase_probe.as_ref() {
179            probe.end(phase);
180        }
181    }
182
183    async fn finish_turn(
184        &mut self,
185        finish: TurnFinishInput,
186        events: &dyn EventSink,
187        scoped_effect_controller: &ScopedEffectController<'_>,
188        cancel_state: &CancellationToken,
189    ) -> Result<AssembledTurn, RuntimeError> {
190        let TurnFinishInput {
191            mut turn_pipeline,
192            assembler,
193            new_messages,
194            policy,
195            turn_index,
196            queued_work_completions,
197            tool_host_events,
198            trace_turn_id,
199        } = finish;
200        self.policy = self.state.effective_policy().clone();
201        turn_pipeline.state_mut().policy = self.policy.clone();
202        turn_pipeline.state_mut().turn_index = turn_index;
203
204        let mut turn_usage_delta = {
205            let mut ledger = self.shared_token_ledger.lock().expect("token ledger lock");
206            std::mem::take(&mut *ledger)
207        };
208        if assembler.token_usage.total() > 0 || assembler.token_usage.cached_input_tokens > 0 {
209            turn_usage_delta.push(TokenLedgerEntry {
210                source: "turn".to_string(),
211                model: policy.model.id.clone(),
212                usage: assembler.token_usage.clone(),
213            });
214        }
215        let turn_usage_delta = merge_usage_delta_entries(turn_usage_delta);
216
217        turn_pipeline.finalize_turn_read_state(
218            new_messages,
219            &assembler.tool_calls,
220            cancel_state.is_cancelled(),
221        );
222        turn_pipeline.append_tool_host_events(&tool_host_events);
223        if assembler.token_usage.total() > 0 || assembler.token_usage.cached_input_tokens > 0 {
224            turn_pipeline.state_mut().token_usage = assembler.token_usage.clone();
225        }
226
227        let last_prompt_usage = assembler
228            .last_llm_usage()
229            .and_then(|usage| normalize_prompt_usage(policy.provider(), usage));
230        turn_pipeline.state_mut().last_prompt_usage = last_prompt_usage;
231        let assembled_state = turn_pipeline.export_state_for_assembly();
232        let assembled = assembler.finish(
233            assembled_state,
234            cancel_state.is_cancelled(),
235            None,
236            &self.host.core.control.termination,
237        );
238
239        let Some(session) = self.session.as_ref() else {
240            self.state.apply_snapshot(&assembled.state);
241            self.emit_completed_turn_trace(&assembled.state, &assembled.outcome, &trace_turn_id);
242            return Ok(assembled);
243        };
244
245        let plugins = Arc::clone(session.plugins());
246        let manager = match self.runtime_session_services_for_turn(None) {
247            Ok(manager) => manager,
248            Err(err) => {
249                return Err(RuntimeError::new(
250                    RuntimeErrorCode::PluginSessionManager,
251                    err.to_string(),
252                ));
253            }
254        };
255
256        self.mark_phase_begin(RuntimeTurnPhase::FinalizeTurn);
257        let finalized = match plugins
258            .finalize_turn_with_phase_probe(
259                assembled,
260                manager.state_service(),
261                manager.lifecycle_service(),
262                manager.graph_service(),
263                self.turn_phase_probe.clone(),
264            )
265            .await
266        {
267            Ok(finalized) => finalized,
268            Err(err) => {
269                self.mark_phase_end(RuntimeTurnPhase::FinalizeTurn);
270                return Err(RuntimeError::new(
271                    RuntimeErrorCode::PluginFinalizeTurn,
272                    err.to_string(),
273                ));
274            }
275        };
276        self.mark_phase_end(RuntimeTurnPhase::FinalizeTurn);
277
278        let mut returned_turn = finalized.turn;
279        self.mark_phase_begin(RuntimeTurnPhase::PersistTurn);
280        self.mark_phase_begin(RuntimeTurnPhase::FinalCommit);
281        let queued_work_completion_trace = queued_work_completions.clone();
282        if let Err(err) = turn_pipeline
283            .final_commit(
284                &mut returned_turn,
285                self.session.as_mut(),
286                &turn_usage_delta,
287                Some(&trace_turn_id),
288                queued_work_completions,
289            )
290            .await
291        {
292            self.mark_phase_end(RuntimeTurnPhase::FinalCommit);
293            self.mark_phase_end(RuntimeTurnPhase::PersistTurn);
294            return Err(err);
295        }
296        self.mark_phase_end(RuntimeTurnPhase::FinalCommit);
297
298        emit_session_events_to_sink(events, finalized.events).await;
299        self.state = turn_pipeline.into_final_state();
300        if matches!(returned_turn.outcome, TurnOutcome::AgentFrameSwitch { .. })
301            && let Some(session) = self.session.as_mut()
302        {
303            let protocol_session = Arc::clone(session.plugins().protocol_session());
304            let session_id = self.state.session_id.clone();
305            protocol_session
306                .restore_session(
307                    crate::plugin::ProtocolSessionContext::new(session, &session_id),
308                    &self.state,
309                )
310                .await
311                .map_err(|err| {
312                    RuntimeError::new(
313                        RuntimeErrorCode::Other("protocol_restore_session".to_string()),
314                        err.to_string(),
315                    )
316                })?;
317        }
318        if !queued_work_completion_trace.is_empty() {
319            crate::trace::emit_trace(
320                &self.host.core.tracing.trace_sink,
321                &self.host.core.tracing.trace_context,
322                lash_trace::TraceContext::default()
323                    .for_session(returned_turn.state.session_id.clone())
324                    .for_turn_index(returned_turn.state.turn_index)
325                    .for_turn(trace_turn_id.clone()),
326                lash_trace::TraceEvent::Custom {
327                    name: "queued_work.completed".to_string(),
328                    payload: queued_work_completion_trace_payload(&queued_work_completion_trace),
329                },
330            );
331        }
332        self.mark_phase_begin(RuntimeTurnPhase::PostPersistHooks);
333        self.emit_turn_persisted_event(&returned_turn, scoped_effect_controller, &trace_turn_id)
334            .await?;
335        self.mark_phase_end(RuntimeTurnPhase::PostPersistHooks);
336        self.mark_phase_end(RuntimeTurnPhase::PersistTurn);
337
338        self.emit_completed_turn_trace(
339            &returned_turn.state,
340            &returned_turn.outcome,
341            &trace_turn_id,
342        );
343        Ok(returned_turn)
344    }
345
346    fn emit_completed_turn_trace(
347        &self,
348        state: &SessionSnapshot,
349        outcome: &TurnOutcome,
350        trace_turn_id: &str,
351    ) {
352        if self.host.core.tracing.trace_sink.is_none() {
353            return;
354        }
355
356        let (status, done_reason, agent_frame_switch) = trace_fields_from_outcome(outcome);
357        crate::trace::emit_trace(
358            &self.host.core.tracing.trace_sink,
359            &self.host.core.tracing.trace_context,
360            lash_trace::TraceContext::default()
361                .for_session(state.session_id.clone())
362                .for_turn_index(state.turn_index)
363                .for_turn(trace_turn_id.to_string()),
364            lash_trace::TraceEvent::TurnCompleted {
365                status: status.to_string(),
366                done_reason: done_reason.to_string(),
367                agent_frame_switch,
368            },
369        );
370    }
371
372    async fn emit_turn_persisted_event(
373        &self,
374        returned_turn: &AssembledTurn,
375        scoped_effect_controller: &ScopedEffectController<'_>,
376        trace_turn_id: &str,
377    ) -> Result<(), RuntimeError> {
378        let Some(session) = self.session.as_ref() else {
379            return Ok(());
380        };
381        let Ok(manager) = self.runtime_session_services() else {
382            return Ok(());
383        };
384        let phase_turn_id = turn_phase_id(trace_turn_id, "turn-persisted");
385        let phase_controller = scoped_child_turn_controller(
386            scoped_effect_controller,
387            &self.state.session_id,
388            &phase_turn_id,
389        )?;
390        let direct_completions = manager.direct_completion_client(
391            RuntimeEffectControllerHandle::borrowed(phase_controller),
392            Some(phase_turn_id),
393        );
394
395        session
396            .plugins()
397            .emit_runtime_event_with_phase_probe(
398                crate::PluginLifecycleEvent::TurnPersisted(Box::new(
399                    crate::SessionStateChangedContext {
400                        session_id: self.state.session_id.clone(),
401                        state: crate::SessionReadView::from_snapshot(&returned_turn.state),
402                        sessions: manager.state_service(),
403                        session_graph: manager.graph_service(),
404                        direct_completions,
405                    },
406                )),
407                self.turn_phase_probe.clone(),
408            )
409            .await;
410        Ok(())
411    }
412
413    /// Run a single turn and stream events to the host sink.
414    pub async fn stream_turn(
415        &mut self,
416        input: TurnInput,
417        opts: TurnOptions<'_>,
418    ) -> Result<AssembledTurn, RuntimeError> {
419        let turn_id = input
420            .trace_turn_id
421            .clone()
422            .unwrap_or_else(|| uuid::Uuid::new_v4().to_string());
423        let cancel = opts.cancel.clone();
424        let effect_host = Arc::clone(&self.host.core.control.effect_host);
425        let scoped_effect_controller = match opts.scoped_effect_controller() {
426            Some(scope) => scope,
427            None => effect_host.scoped(EffectScope::turn(&self.state.session_id, &turn_id))?,
428        };
429        self.stream_turn_with_scoped_effect_controller_inner(
430            input,
431            opts.events_or_noop(),
432            opts.turn_events_or_noop(),
433            scoped_effect_controller,
434            cancel,
435            None,
436        )
437        .await
438    }
439
440    pub async fn stream_next_queued_work(
441        &mut self,
442        opts: TurnOptions<'_>,
443    ) -> Result<Option<AssembledTurn>, RuntimeError> {
444        self.stream_queued_work(opts, None).await
445    }
446
447    pub async fn stream_selected_queued_work(
448        &mut self,
449        opts: TurnOptions<'_>,
450        batch_ids: &[String],
451    ) -> Result<Option<AssembledTurn>, RuntimeError> {
452        self.stream_queued_work(opts, Some(batch_ids)).await
453    }
454
455    async fn stream_queued_work(
456        &mut self,
457        opts: TurnOptions<'_>,
458        selected_batch_ids: Option<&[String]>,
459    ) -> Result<Option<AssembledTurn>, RuntimeError> {
460        if self.drain_next_session_command().await?.is_some() {
461            return Ok(None);
462        }
463        let Some(store) = self
464            .session
465            .as_ref()
466            .and_then(|session| session.history_store())
467        else {
468            return Ok(None);
469        };
470        let claim = if let Some(batch_ids) = selected_batch_ids {
471            store
472                .claim_ready_queued_work_by_batch_ids(
473                    &self.state.session_id,
474                    &self.runtime_scope_id,
475                    crate::QueuedWorkClaimBoundary::Idle,
476                    crate::QUEUED_WORK_CLAIM_TTL_MS,
477                    batch_ids,
478                )
479                .await
480        } else {
481            store
482                .claim_ready_queued_work(
483                    &self.state.session_id,
484                    &self.runtime_scope_id,
485                    crate::QueuedWorkClaimBoundary::Idle,
486                    crate::QUEUED_WORK_CLAIM_TTL_MS,
487                    64,
488                )
489                .await
490        }
491        .map_err(|err| RuntimeError::new(RuntimeErrorCode::StoreCommitFailed, err.to_string()))?;
492        let Some(claim) = claim else {
493            return Ok(None);
494        };
495        let mut work = claim.materialize_for_turn();
496        let turn_id = work
497            .input
498            .trace_turn_id
499            .clone()
500            .or_else(|| opts.effect_scope_id().map(ToOwned::to_owned))
501            .unwrap_or_else(|| uuid::Uuid::new_v4().to_string());
502        work.input.trace_turn_id = Some(turn_id.clone());
503        let causes = work.turn_causes.clone();
504        emit_queued_work_started_to_sink(
505            opts.turn_events_or_noop(),
506            crate::QueuedWorkClaimBoundary::Idle,
507            &claim,
508            causes.clone(),
509        )
510        .await;
511        crate::trace::emit_trace(
512            &self.host.core.tracing.trace_sink,
513            &self.host.core.tracing.trace_context,
514            lash_trace::TraceContext::default()
515                .for_session(self.state.session_id.clone())
516                .for_turn_index(self.state.turn_index + 1)
517                .for_turn(turn_id.clone()),
518            lash_trace::TraceEvent::Custom {
519                name: "queued_work.claimed".to_string(),
520                payload: queued_work_trace_payload(
521                    crate::QueuedWorkClaimBoundary::Idle,
522                    &claim,
523                    &causes,
524                ),
525            },
526        );
527        let cancel = opts.cancel.clone();
528        let effect_host = Arc::clone(&self.host.core.control.effect_host);
529        let scoped_effect_controller = match opts.scoped_effect_controller() {
530            Some(scope) => scope,
531            None => effect_host.scoped(EffectScope::queue_drain(
532                &self.state.session_id,
533                &claim.claim_id,
534            ))?,
535        };
536        self.stream_turn_with_scoped_effect_controller_inner(
537            work.input,
538            opts.events_or_noop(),
539            opts.turn_events_or_noop(),
540            scoped_effect_controller,
541            cancel,
542            Some(claim),
543        )
544        .await
545        .map(Some)
546    }
547
548    /// Enforce the durable-first wiring invariant at a turn-scope boundary: when
549    /// the host wired a durable effect host, every store reachable from this
550    /// scope must also be durable. A durable host running against any ephemeral
551    /// store fails loudly here rather than silently degrading.
552    ///
553    /// Inline controllers (the default tier) impose no requirement, so
554    /// inline/in-memory hosts pass unchanged.
555    fn ensure_durable_store_facets_for_scope(
556        &self,
557        scoped_effect_controller: &ScopedEffectController<'_>,
558    ) -> Result<(), RuntimeError> {
559        if scoped_effect_controller.controller().durability_tier() != crate::DurabilityTier::Durable
560        {
561            return Ok(());
562        }
563        if self
564            .host
565            .core
566            .durability
567            .attachment_store
568            .persistence()
569            .durability_tier()
570            != crate::DurabilityTier::Durable
571        {
572            return Err(RuntimeError::durable_store_required(
573                crate::DurableStoreFacet::AttachmentStore,
574            ));
575        }
576        if self
577            .host
578            .core
579            .durability
580            .lashlang_artifact_store
581            .durability_tier()
582            != crate::DurabilityTier::Durable
583        {
584            return Err(RuntimeError::durable_store_required(
585                crate::DurableStoreFacet::ArtifactStore,
586            ));
587        }
588        if let Some(store) = self
589            .session
590            .as_ref()
591            .and_then(|session| session.history_store())
592            && store.durability_tier() != crate::DurabilityTier::Durable
593        {
594            return Err(RuntimeError::durable_store_required(
595                crate::DurableStoreFacet::SessionStore,
596            ));
597        }
598        if let Some(process_registry) = self.host.process_registry.as_ref()
599            && process_registry.durability_tier() != crate::DurabilityTier::Durable
600        {
601            return Err(RuntimeError::durable_store_required(
602                crate::DurableStoreFacet::ProcessRegistry,
603            ));
604        }
605        Ok(())
606    }
607
608    async fn stream_turn_with_scoped_effect_controller_inner(
609        &mut self,
610        mut input: TurnInput,
611        events: &dyn EventSink,
612        turn_events: &dyn TurnActivitySink,
613        scoped_effect_controller: ScopedEffectController<'_>,
614        cancel: CancellationToken,
615        queued_claim: Option<crate::QueuedWorkClaim>,
616    ) -> Result<AssembledTurn, RuntimeError> {
617        if queued_claim.is_none() {
618            while self.drain_next_session_command().await?.is_some() {}
619        }
620        if let Some(input_turn_id) = input.trace_turn_id.as_deref()
621            && scoped_effect_controller
622                .effect_scope()
623                .validates_turn_trace_id()
624            && input_turn_id != scoped_effect_controller.scope_id()
625        {
626            return Err(RuntimeError::new(
627                RuntimeErrorCode::EffectScopeTurnIdMismatch,
628                format!(
629                    "input trace_turn_id `{input_turn_id}` does not match effect scope id `{}`",
630                    scoped_effect_controller.scope_id()
631                ),
632            ));
633        }
634        self.ensure_durable_store_facets_for_scope(&scoped_effect_controller)?;
635        input
636            .trace_turn_id
637            .get_or_insert_with(|| scoped_effect_controller.scope_id().to_string());
638        self.stream_turn_inner(
639            input.clone(),
640            events,
641            turn_events,
642            scoped_effect_controller,
643            cancel.clone(),
644            queued_claim,
645        )
646        .await
647    }
648
649    /// Stream one logical host turn, following foreground AgentFrame switches
650    /// until a terminal outcome is reached.
651    ///
652    /// RLM `continue_as` creates a new frame in the same session. Hosts that
653    /// only care about the benchmark/app answer should not need to special-case
654    /// that intermediate outcome; this helper keeps driving the same session
655    /// through each frame's task with the normal runtime turn guards.
656    pub async fn stream_turn_with_agent_frames(
657        &mut self,
658        input: TurnInput,
659        opts: TurnOptions<'_>,
660    ) -> Result<AgentFrameRun, RuntimeError> {
661        let follow_trace_turn_id = input
662            .trace_turn_id
663            .clone()
664            .unwrap_or_else(|| uuid::Uuid::new_v4().to_string());
665        let cancel = opts.cancel.clone();
666        let effect_host = Arc::clone(&self.host.core.control.effect_host);
667        let scoped_effect_controller = match opts.scoped_effect_controller() {
668            Some(scope) => scope,
669            None => effect_host.scoped(EffectScope::turn(
670                &self.state.session_id,
671                &follow_trace_turn_id,
672            ))?,
673        };
674        self.stream_turn_with_agent_frames_inner(
675            input,
676            opts.events_or_noop(),
677            opts.turn_events_or_noop(),
678            scoped_effect_controller,
679            cancel,
680        )
681        .await
682    }
683
684    async fn stream_turn_with_agent_frames_inner(
685        &mut self,
686        mut input: TurnInput,
687        events: &dyn EventSink,
688        turn_events: &dyn TurnActivitySink,
689        scoped_effect_controller: ScopedEffectController<'_>,
690        cancel: CancellationToken,
691    ) -> Result<AgentFrameRun, RuntimeError> {
692        if let Some(input_turn_id) = input.trace_turn_id.as_deref()
693            && scoped_effect_controller
694                .effect_scope()
695                .validates_turn_trace_id()
696            && input_turn_id != scoped_effect_controller.scope_id()
697        {
698            return Err(RuntimeError::new(
699                RuntimeErrorCode::EffectScopeTurnIdMismatch,
700                format!(
701                    "input trace_turn_id `{input_turn_id}` does not match effect scope id `{}`",
702                    scoped_effect_controller.scope_id()
703                ),
704            ));
705        }
706        let follow_protocol_turn_options = input.protocol_turn_options.clone();
707        let follow_turn_context = input.turn_context.clone();
708        let follow_trace_turn_id = input
709            .trace_turn_id
710            .clone()
711            .unwrap_or_else(|| scoped_effect_controller.scope_id().to_string());
712        input
713            .trace_turn_id
714            .get_or_insert(follow_trace_turn_id.clone());
715        let mut turns = Vec::new();
716        loop {
717            let turn_trace_turn_id = agent_frame_follow_turn_id(&follow_trace_turn_id, turns.len());
718            input.trace_turn_id = Some(turn_trace_turn_id.clone());
719            let turn_effect_controller = if turns.is_empty() {
720                scoped_effect_controller.clone()
721            } else {
722                ScopedEffectController::borrowed(
723                    scoped_effect_controller.controller(),
724                    EffectScope::turn(&self.state.session_id, &turn_trace_turn_id),
725                )?
726            };
727            let turn = self
728                .stream_turn_with_scoped_effect_controller_inner(
729                    input,
730                    events,
731                    turn_events,
732                    turn_effect_controller,
733                    cancel.clone(),
734                    None,
735                )
736                .await?;
737            let switched_frame_id = match &turn.outcome {
738                TurnOutcome::AgentFrameSwitch { frame_id } => Some(frame_id.clone()),
739                _ => None,
740            };
741            let next_task = switched_frame_id
742                .as_ref()
743                .and_then(|frame_id| frame_switch_task(&turn, frame_id));
744            turns.push(turn);
745
746            let Some(_frame_id) = switched_frame_id else {
747                return Ok(AgentFrameRun { turns });
748            };
749
750            let task = next_task.ok_or_else(|| {
751                RuntimeError::new(
752                    RuntimeErrorCode::Other("agent_frame_missing_task".to_string()),
753                    "agent frame switch did not provide a task",
754                )
755            })?;
756            input = turn_input_from_text(task);
757            input.protocol_turn_options = follow_protocol_turn_options.clone();
758            input.turn_context = follow_turn_context.clone();
759        }
760    }
761
762    async fn stream_turn_inner(
763        &mut self,
764        mut input: TurnInput,
765        events: &dyn EventSink,
766        turn_events: &dyn TurnActivitySink,
767        scoped_effect_controller: ScopedEffectController<'_>,
768        cancel: CancellationToken,
769        queued_claim: Option<crate::QueuedWorkClaim>,
770    ) -> Result<AssembledTurn, RuntimeError> {
771        self.refresh_session_graph_from_store()
772            .await
773            .map_err(session_head_refresh_error)?;
774        let input_trace_turn_id = input.trace_turn_id.clone();
775        let queued_turn_work = queued_claim
776            .as_ref()
777            .map(crate::QueuedWorkClaim::materialize_for_turn);
778        if let Some(work) = queued_turn_work.as_ref()
779            && input.items.is_empty()
780            && input.image_blobs.is_empty()
781        {
782            input = work.input.clone();
783            if input.trace_turn_id.is_none() {
784                input.trace_turn_id = input_trace_turn_id;
785            }
786        }
787        if self
788            .session
789            .as_ref()
790            .and_then(|session| session.history_store())
791            .is_some()
792        {
793            ensure_durable_effect_input(&input)?;
794        }
795        if let Some(extension) = &input.protocol_extension
796            && let Some(session) = self.session.as_ref()
797        {
798            let protocol_session = std::sync::Arc::clone(session.plugins().protocol_session());
799            protocol_session
800                .validate_turn_extension(extension)
801                .await
802                .map_err(|err| {
803                    RuntimeError::new(RuntimeErrorCode::ProtocolTurnExtension, err.to_string())
804                })?;
805        }
806        let previous_prompt_usage = self.state.last_prompt_usage.clone();
807        let normalized = match self.normalize_input_items(&input.items, &input.image_blobs) {
808            Ok(items) => items,
809            Err(e) => {
810                self.state.last_prompt_usage = None;
811                let mut assembler = TurnAssembler::default();
812                let error_event = SessionEvent::Error {
813                    message: e.clone(),
814                    envelope: Some(crate::session_model::ErrorEnvelope {
815                        kind: "input_validation".to_string(),
816                        code: Some("invalid_turn_input".to_string()),
817                        terminal_reason: None,
818                        user_message: e.clone(),
819                        raw: None,
820                    }),
821                };
822                assembler.push(&error_event);
823                emit_turn_activity_to_sink(
824                    turn_events,
825                    TurnActivity::independent(TurnEvent::Error { message: e }),
826                )
827                .await;
828                emit_session_event_to_sink(events, error_event).await;
829                let outcome_event = SessionEvent::TurnOutcome {
830                    outcome: TurnOutcome::Stopped(TurnStop::InvalidInput),
831                };
832                assembler.push(&outcome_event);
833                emit_session_event_to_sink(events, outcome_event).await;
834                assembler.push(&SessionEvent::Done);
835                emit_session_event_to_sink(events, SessionEvent::Done).await;
836                return Ok(assembler.finish(
837                    self.state.to_snapshot(),
838                    false,
839                    None,
840                    &self.host.core.control.termination,
841                ));
842            }
843        };
844        let turn_index = self.state.turn_index + 1;
845        let trace_turn_id = input
846            .trace_turn_id
847            .clone()
848            .unwrap_or_else(|| uuid::Uuid::new_v4().to_string());
849        if self.host.core.tracing.trace_sink.is_some() {
850            let mut trace_metadata = std::collections::BTreeMap::new();
851            trace_metadata.insert(
852                "input_item_count".to_string(),
853                serde_json::json!(normalized.len()),
854            );
855            crate::trace::emit_trace(
856                &self.host.core.tracing.trace_sink,
857                &self.host.core.tracing.trace_context,
858                lash_trace::TraceContext::default()
859                    .for_session(self.state.session_id.clone())
860                    .for_turn_index(turn_index)
861                    .for_turn(trace_turn_id.clone()),
862                lash_trace::TraceEvent::TurnStarted {
863                    metadata: trace_metadata,
864                },
865            );
866        }
867
868        let base_read_model = self.state.read_model();
869        let base_messages = base_read_model.messages;
870        let base_render_cache = base_read_model.prompt_render_cache;
871        let mut turn_delta = Vec::new();
872        let initial_turn_causes = queued_turn_work
873            .as_ref()
874            .map(|work| work.turn_causes.clone())
875            .unwrap_or_default();
876        turn_delta.extend(
877            initial_turn_causes
878                .iter()
879                .map(crate::TurnCause::to_event_message),
880        );
881
882        let user_id = fresh_message_id();
883        let mut user_parts: Vec<Part> = Vec::new();
884        for item in normalized {
885            match item {
886                NormalizedItem::Text(text) => {
887                    if text.is_empty() {
888                        continue;
889                    }
890                    user_parts.push(Part {
891                        id: format!("{}.p{}", user_id, user_parts.len()),
892                        kind: PartKind::Text,
893                        content: text,
894                        attachment: None,
895                        tool_call_id: None,
896                        tool_name: None,
897                        tool_replay: None,
898                        prune_state: PruneState::Intact,
899                        reasoning_meta: None,
900                        response_meta: None,
901                    });
902                }
903                NormalizedItem::Image(reference) => {
904                    user_parts.push(Part {
905                        id: format!("{}.p{}", user_id, user_parts.len()),
906                        kind: PartKind::Image,
907                        content: String::new(),
908                        attachment: Some(crate::session_model::message::PartAttachment {
909                            reference,
910                        }),
911                        tool_call_id: None,
912                        tool_name: None,
913                        tool_replay: None,
914                        prune_state: PruneState::Intact,
915                        reasoning_meta: None,
916                        response_meta: None,
917                    });
918                }
919            }
920        }
921        if user_parts.is_empty() && initial_turn_causes.is_empty() {
922            user_parts.push(Part {
923                id: format!("{}.p0", user_id),
924                kind: PartKind::Text,
925                content: String::new(),
926                attachment: None,
927                tool_call_id: None,
928                tool_name: None,
929                tool_replay: None,
930                prune_state: PruneState::Intact,
931                reasoning_meta: None,
932                response_meta: None,
933            });
934        }
935        if !user_parts.is_empty() {
936            reassign_part_ids(&user_id, &mut user_parts);
937            turn_delta.push(Message {
938                id: user_id.clone(),
939                role: MessageRole::User,
940                parts: shared_parts(user_parts),
941                origin: None,
942            });
943        }
944
945        let manager = self
946            .runtime_session_services_for_turn(None)
947            .map_err(|err| {
948                RuntimeError::new(RuntimeErrorCode::PluginSessionManager, err.to_string())
949            })?;
950        let plugin_session = self
951            .session
952            .as_ref()
953            .map(|s| Arc::clone(s.plugins()))
954            .ok_or_else(|| {
955                RuntimeError::new(
956                    RuntimeErrorCode::ContextPrepareTurn,
957                    "runtime session not available",
958                )
959            })?;
960        let prepare_phase_turn_id = turn_phase_id(&trace_turn_id, "prepare-turn");
961        let prepare_phase_controller = scoped_child_turn_controller(
962            &scoped_effect_controller,
963            &self.state.session_id,
964            &prepare_phase_turn_id,
965        )?;
966        let turn_ctx = crate::TurnTransformContext {
967            session_id: self.state.session_id.clone(),
968            state: self.read_view(),
969            prompt_usage: previous_prompt_usage.clone(),
970            max_context_tokens: Some(LashRuntime::max_context_tokens(self)),
971            sessions: manager.state_service(),
972            session_lifecycle: manager.lifecycle_service(),
973            session_graph: manager.graph_service(),
974            scoped_effect_controller: scoped_effect_controller.clone(),
975            direct_completions: manager.direct_completion_client(
976                RuntimeEffectControllerHandle::borrowed(prepare_phase_controller),
977                Some(prepare_phase_turn_id),
978            ),
979        };
980        self.mark_phase_begin(RuntimeTurnPhase::ContextTransform);
981        let prepared_context = plugin_session
982            .prepare_turn_context(
983                &turn_ctx,
984                crate::session_model::context::PreparedContext {
985                    messages: crate::MessageSequence::from_base_and_delta(
986                        base_messages,
987                        turn_delta,
988                    )
989                    .with_base_render_cache(base_render_cache),
990                    ..Default::default()
991                },
992                self.turn_phase_probe.clone(),
993            )
994            .await
995            .map_err(|err| {
996                RuntimeError::new(RuntimeErrorCode::ContextPrepareTurn, err.to_string())
997            })?;
998        self.mark_phase_end(RuntimeTurnPhase::ContextTransform);
999        // Release the read-view's graph clone before the rest of the turn
1000        // runs. Keeping it alive into `stream_prepared_turn` forces the
1001        // post-turn `append_active_read_delta` to deep-clone the session
1002        // graph (Arc::make_mut with refcount > 1).
1003        drop(turn_ctx);
1004        let messages = prepared_context.messages;
1005        if let Some(session) = self.session.as_mut() {
1006            session
1007                .set_context_surface(
1008                    prepared_context.tool_providers,
1009                    prepared_context.prompt_contributions,
1010                    prepared_context.include_base_tools,
1011                )
1012                .map_err(|err| {
1013                    RuntimeError::new(
1014                        RuntimeErrorCode::Other("session_tool_registry".to_string()),
1015                        err.to_string(),
1016                    )
1017                })?;
1018        }
1019
1020        self.state.last_prompt_usage = None;
1021
1022        self.stream_prepared_turn(
1023            messages,
1024            previous_prompt_usage,
1025            input.protocol_turn_options.clone(),
1026            input.protocol_extension.clone(),
1027            input.turn_context.clone(),
1028            initial_turn_causes,
1029            trace_turn_id,
1030            turn_index,
1031            events,
1032            turn_events,
1033            scoped_effect_controller,
1034            cancel,
1035            queued_claim,
1036        )
1037        .await
1038    }
1039
1040    /// Run a single turn and return only the assembled terminal result.
1041    pub async fn run_turn_assembled(
1042        &mut self,
1043        input: TurnInput,
1044        cancel: CancellationToken,
1045    ) -> Result<AssembledTurn, RuntimeError> {
1046        self.stream_turn(input, TurnOptions::new(cancel)).await
1047    }
1048
1049    /// Run a turn using host-prepared message history.
1050    #[allow(clippy::too_many_arguments)]
1051    pub async fn stream_prepared_turn(
1052        &mut self,
1053        messages: crate::MessageSequence,
1054        _previous_prompt_usage: Option<PromptUsage>,
1055        protocol_turn_options: Option<crate::ProtocolTurnOptions>,
1056        protocol_extension: Option<crate::ProtocolTurnExtensionHandle>,
1057        turn_context: crate::TurnContext,
1058        initial_turn_causes: Vec<crate::TurnCause>,
1059        trace_turn_id: String,
1060        turn_index: usize,
1061        events: &dyn EventSink,
1062        turn_events: &dyn TurnActivitySink,
1063        scoped_effect_controller: ScopedEffectController<'_>,
1064        cancel: CancellationToken,
1065        initial_queue_claim: Option<crate::QueuedWorkClaim>,
1066    ) -> Result<AssembledTurn, RuntimeError> {
1067        let (event_tx, mut event_rx) = mpsc::channel::<RuntimeStreamEvent>(100);
1068        let child_usage_event_relay = ChildUsageEventRelay::new(event_tx.clone());
1069        let mut turn_policy = self.state.effective_policy().clone();
1070        let turn_provider_override = turn_context.provider().cloned();
1071        if let Some(provider) = turn_provider_override.as_ref() {
1072            turn_policy.provider_id = provider.kind().to_string();
1073        }
1074        if let Some(model) = turn_context.model_spec() {
1075            turn_policy.model = model.clone();
1076        }
1077        let session_protocol_turn_options = self.state.effective_protocol_turn_options().clone();
1078        let effective_protocol_turn_options = protocol_turn_options
1079            .clone()
1080            .map(|options| session_protocol_turn_options.merged_with_override(&options))
1081            .unwrap_or(session_protocol_turn_options);
1082        let manager = self
1083            .runtime_session_services_for_turn(Some(child_usage_event_relay.clone()))
1084            .map_err(|err| {
1085                RuntimeError::new(RuntimeErrorCode::PluginSessionManager, err.to_string())
1086            })?;
1087        let plugins = {
1088            let session = self
1089                .session
1090                .as_ref()
1091                .expect("lash runtime session must be available");
1092            Arc::clone(session.plugins())
1093        };
1094        let mut assembler = TurnAssembler::new();
1095        self.mark_phase_begin(RuntimeTurnPhase::BeforeTurnHooks);
1096        // Block-scope the pinned future so it (and its captured
1097        // `SessionReadView` clone of the session graph) drops before the
1098        // post-turn `append_active_read_delta` mutation. Keeping it alive
1099        // across the turn forces `Arc::make_mut` to deep-clone
1100        // `SessionGraphData`.
1101        let prepared = {
1102            let prepare_turn = plugins.prepare_turn_with_phase_probe(
1103                PrepareTurnRequest {
1104                    session_id: self.state.session_id.clone(),
1105                    state: crate::SessionReadView::from_runtime_state(
1106                        &self.state,
1107                        turn_policy.clone(),
1108                        effective_protocol_turn_options.clone(),
1109                    ),
1110                    messages,
1111                    sessions: manager.state_service(),
1112                    session_lifecycle: manager.lifecycle_service(),
1113                    session_graph: manager.graph_service(),
1114                    turn_context: turn_context.clone(),
1115                },
1116                self.turn_phase_probe.clone(),
1117            );
1118            tokio::pin!(prepare_turn);
1119
1120            loop {
1121                tokio::select! {
1122                    prepared = &mut prepare_turn => {
1123                        let prepared = prepared.map_err(|err| {
1124                            RuntimeError::new(RuntimeErrorCode::PluginPrepareTurn, err.to_string())
1125                        })?;
1126                        self.mark_phase_end(RuntimeTurnPhase::BeforeTurnHooks);
1127                        break prepared;
1128                    }
1129                    maybe_event = event_rx.recv() => {
1130                        if let Some(event) = maybe_event {
1131                            emit_runtime_stream_event_to_sinks(
1132                                events,
1133                                turn_events,
1134                                event,
1135                                &mut assembler,
1136                            )
1137                            .await;
1138                        }
1139                    }
1140                }
1141            }
1142        };
1143        for event in &prepared.events {
1144            assembler.push(event);
1145        }
1146        emit_session_events_to_sink(events, prepared.events).await;
1147        if let Some(abort) = prepared.abort {
1148            drop(event_tx);
1149
1150            let mut turn_pipeline = TurnBoundary::from_state(self.state.clone());
1151            turn_pipeline.apply_prepared_messages(&prepared.messages);
1152            let state = turn_pipeline.into_final_state();
1153            let issue = TurnIssue {
1154                kind: "plugin".to_string(),
1155                code: Some(abort.code),
1156                terminal_reason: None,
1157                message: abort.message.clone(),
1158                raw: None,
1159            };
1160            let error_event = SessionEvent::Error {
1161                message: abort.message,
1162                envelope: Some(crate::session_model::ErrorEnvelope {
1163                    kind: "plugin".to_string(),
1164                    code: issue.code.clone(),
1165                    terminal_reason: None,
1166                    user_message: issue.message.clone(),
1167                    raw: None,
1168                }),
1169            };
1170            assembler.push(&error_event);
1171            emit_turn_activity_to_sink(
1172                turn_events,
1173                TurnActivity::independent(TurnEvent::Error {
1174                    message: issue.message.clone(),
1175                }),
1176            )
1177            .await;
1178            emit_session_event_to_sink(events, error_event).await;
1179            let outcome_event = SessionEvent::TurnOutcome {
1180                outcome: TurnOutcome::Stopped(TurnStop::PluginAbort),
1181            };
1182            assembler.push(&outcome_event);
1183            emit_session_event_to_sink(events, outcome_event).await;
1184            assembler.push(&SessionEvent::Done);
1185            emit_session_event_to_sink(events, SessionEvent::Done).await;
1186            return Ok(assembler.finish(
1187                state.to_snapshot(),
1188                cancel.is_cancelled(),
1189                Some(issue),
1190                &self.host.core.control.termination,
1191            ));
1192        }
1193        let mut turn_pipeline = TurnBoundary::from_state(self.state.clone());
1194        let store = self
1195            .session
1196            .as_ref()
1197            .and_then(|session| session.history_store());
1198        turn_pipeline
1199            .prepared_checkpoint(
1200                store.as_ref().map(|store| store.as_ref()),
1201                turn_policy.clone(),
1202                turn_index,
1203                &prepared.messages,
1204                self.session.as_mut(),
1205            )
1206            .await
1207            .map_err(|err| {
1208                RuntimeError::new(RuntimeErrorCode::StoreCommitFailed, err.to_string())
1209            })?;
1210        let resolved_turn_policy = if let Some(provider) = turn_provider_override {
1211            RuntimeSessionPolicy::from_provider(turn_policy.clone(), provider)
1212                .map_err(|err| RuntimeError::new("llm_provider", err.to_string()))?
1213        } else {
1214            self.host
1215                .resolve_session_policy(&self.state.session_id, turn_policy.clone())
1216                .map_err(|err| RuntimeError::new("llm_provider", err.to_string()))?
1217        };
1218        let manager = self
1219            .runtime_session_services_for_turn(Some(child_usage_event_relay.clone()))
1220            .map_err(|err| {
1221                RuntimeError::new(RuntimeErrorCode::PluginSessionManager, err.to_string())
1222            })?;
1223        let cancel_state = cancel.clone();
1224        let finish_scoped_effect_controller = scoped_effect_controller.clone();
1225        let session = self
1226            .session
1227            .take()
1228            .expect("lash runtime session must be available");
1229        let mut driver = RuntimeTurnDriver {
1230            session,
1231            policy: resolved_turn_policy,
1232            host: self.host.clone(),
1233            turn_id: scoped_effect_controller.scope_id().to_string(),
1234            scoped_effect_controller,
1235            session_id: self.state.session_id.clone(),
1236            turn_index,
1237            turn_pipeline,
1238            llm_stream_summaries: HashMap::new(),
1239            next_llm_ordinal: 0,
1240            session_services: manager,
1241            protocol_turn_options: effective_protocol_turn_options,
1242            protocol_extension,
1243            turn_context,
1244            turn_causes: initial_turn_causes,
1245            pending_queue_claims: initial_queue_claim.into_iter().collect(),
1246            checkpoint_messages: crate::tool_dispatch::CheckpointMessageBuffer::default(),
1247            pending_tool_host_events: Vec::new(),
1248            turn_phase_probe: self.turn_phase_probe.clone(),
1249        };
1250        let protocol_run_offset = 0;
1251        self.mark_phase_begin(RuntimeTurnPhase::EffectLoop);
1252        let run_result = drive_turn_to_completion(
1253            driver.run(prepared.messages, event_tx, cancel, protocol_run_offset),
1254            &mut event_rx,
1255            &mut assembler,
1256            &child_usage_event_relay,
1257            events,
1258            turn_events,
1259        )
1260        .await;
1261        let (new_messages, _new_protocol_iteration) = match run_result {
1262            Ok(result) => result,
1263            Err(err) => {
1264                self.mark_phase_end(RuntimeTurnPhase::EffectLoop);
1265                let RuntimeTurnDriver { session, .. } = driver;
1266                self.session = Some(session);
1267                return Err(err);
1268            }
1269        };
1270        self.mark_phase_end(RuntimeTurnPhase::EffectLoop);
1271        tracing::debug!(
1272            new_message_count = new_messages.len(),
1273            tool_call_count = assembler.tool_calls.len(),
1274            "runtime post-run_task"
1275        );
1276
1277        let RuntimeTurnDriver {
1278            session,
1279            policy,
1280            turn_pipeline,
1281            pending_queue_claims,
1282            pending_tool_host_events,
1283            ..
1284        } = driver;
1285        self.session = Some(session);
1286        self.finish_turn(
1287            TurnFinishInput {
1288                turn_pipeline,
1289                assembler,
1290                new_messages,
1291                policy,
1292                turn_index,
1293                queued_work_completions: pending_queue_claims
1294                    .iter()
1295                    .map(crate::QueuedWorkClaim::completion)
1296                    .collect(),
1297                tool_host_events: pending_tool_host_events,
1298                trace_turn_id,
1299            },
1300            events,
1301            &finish_scoped_effect_controller,
1302            &cancel_state,
1303        )
1304        .await
1305    }
1306    fn normalize_input_items(
1307        &self,
1308        items: &[InputItem],
1309        image_blobs: &HashMap<String, Vec<u8>>,
1310    ) -> Result<Vec<NormalizedItem>, String> {
1311        normalize_input_items(
1312            items,
1313            image_blobs,
1314            self.host.core.durability.attachment_store.as_ref(),
1315        )
1316    }
1317}
1318
1319fn frame_switch_task(turn: &AssembledTurn, frame_id: &str) -> Option<String> {
1320    turn.tool_calls
1321        .iter()
1322        .find_map(|record| match &record.output.control {
1323            Some(crate::ToolControl::SwitchAgentFrame {
1324                frame_id: control_frame_id,
1325                task: Some(task),
1326                ..
1327            }) if control_frame_id == frame_id => Some(task.clone()),
1328            _ => None,
1329        })
1330}
1331
1332fn turn_input_from_text(text: String) -> TurnInput {
1333    TurnInput {
1334        items: vec![InputItem::Text { text }],
1335        image_blobs: HashMap::new(),
1336        protocol_turn_options: None,
1337        trace_turn_id: None,
1338        protocol_extension: None,
1339        turn_context: crate::TurnContext::default(),
1340    }
1341}
1342
1343fn agent_frame_follow_turn_id(root_turn_id: &str, completed_turn_count: usize) -> String {
1344    if completed_turn_count == 0 {
1345        root_turn_id.to_string()
1346    } else {
1347        format!("{root_turn_id}:agent-frame:{completed_turn_count}")
1348    }
1349}
1350
1351pub fn ensure_durable_effect_input(input: &TurnInput) -> Result<(), RuntimeError> {
1352    if input.protocol_extension.is_some() {
1353        return Err(RuntimeError::new(
1354            RuntimeErrorCode::DurableEffectLiveProtocolExtension,
1355            "durable effect hosts do not support live protocol_extension inputs; encode replayable data in protocol_turn_options or persisted plugin state",
1356        ));
1357    }
1358    input
1359        .turn_context
1360        .live_plugin_inputs()
1361        .durable_effect_rejection()?;
1362    Ok(())
1363}
1364
1365async fn emit_turn_activity_to_sink(events: &dyn TurnActivitySink, activity: TurnActivity) {
1366    if !events.is_noop() {
1367        events.emit(activity).await;
1368    }
1369}
1370
1371/// Pump the turn driver's event channel into the host sinks while the run
1372/// future executes, then drain any events emitted between completion and the
1373/// sender dropping.
1374///
1375/// Both the fresh and resumed turn entry points construct a
1376/// `RuntimeTurnDriver`, kick off its run future, and need identical
1377/// event-pump/drain behavior before tearing the driver down. Only the driver
1378/// construction and post-run teardown differ, so each caller owns those and
1379/// shares this loop.
1380async fn drive_turn_to_completion<F>(
1381    run_future: F,
1382    event_rx: &mut mpsc::Receiver<RuntimeStreamEvent>,
1383    assembler: &mut TurnAssembler,
1384    child_usage_event_relay: &ChildUsageEventRelay,
1385    events: &dyn EventSink,
1386    turn_events: &dyn TurnActivitySink,
1387) -> Result<(crate::MessageSequence, usize), RuntimeError>
1388where
1389    F: std::future::Future<Output = Result<(crate::MessageSequence, usize), RuntimeError>>,
1390{
1391    let run_result = {
1392        tokio::pin!(run_future);
1393        loop {
1394            tokio::select! {
1395                maybe_event = event_rx.recv() => {
1396                    if let Some(event) = maybe_event {
1397                        emit_runtime_stream_event_to_sinks(
1398                            events,
1399                            turn_events,
1400                            event,
1401                            assembler,
1402                        )
1403                        .await;
1404                    }
1405                }
1406                completed = &mut run_future => {
1407                    child_usage_event_relay.clear();
1408                    break completed;
1409                }
1410            }
1411        }
1412    };
1413    while let Some(event) = event_rx.recv().await {
1414        emit_runtime_stream_event_to_sinks(events, turn_events, event, assembler).await;
1415    }
1416    run_result
1417}
1418
1419async fn emit_runtime_stream_event_to_sinks(
1420    events: &dyn EventSink,
1421    turn_events: &dyn TurnActivitySink,
1422    event: RuntimeStreamEvent,
1423    assembler: &mut TurnAssembler,
1424) {
1425    match event {
1426        RuntimeStreamEvent::Session(event) => {
1427            assembler.push(&event);
1428            emit_session_event_to_sink(events, event).await;
1429        }
1430        RuntimeStreamEvent::Turn(activity) => {
1431            assembler.push_turn_activity(&activity);
1432            emit_turn_activity_to_sink(turn_events, activity).await;
1433        }
1434    }
1435}
1436
1437#[cfg(test)]
1438mod tests {
1439    use super::agent_frame_follow_turn_id;
1440
1441    #[test]
1442    fn agent_frame_follow_turn_ids_are_distinct_and_deterministic() {
1443        assert_eq!(agent_frame_follow_turn_id("root-turn", 0), "root-turn");
1444        assert_eq!(
1445            agent_frame_follow_turn_id("root-turn", 1),
1446            "root-turn:agent-frame:1"
1447        );
1448        assert_eq!(
1449            agent_frame_follow_turn_id("root-turn", 2),
1450            "root-turn:agent-frame:2"
1451        );
1452    }
1453}