Skip to main content

lash_core/runtime/
turn_loop.rs

1use super::*;
2
3fn trace_fields_from_outcome(
4    outcome: &TurnOutcome,
5) -> (
6    &'static str,
7    &'static str,
8    Option<lash_trace::TraceAgentFrameSwitch>,
9) {
10    match outcome {
11        TurnOutcome::Finished(TurnFinish::AssistantMessage { .. }) => {
12            ("completed", "assistant_message", None)
13        }
14        TurnOutcome::Finished(TurnFinish::SubmittedValue { .. }) => {
15            ("completed", "submitted_value", None)
16        }
17        TurnOutcome::Finished(TurnFinish::ToolValue { .. }) => ("completed", "tool_value", None),
18        TurnOutcome::AgentFrameSwitch { frame_id, .. } => (
19            "completed",
20            "agent_frame_switch",
21            Some(lash_trace::TraceAgentFrameSwitch {
22                frame_id: frame_id.clone(),
23            }),
24        ),
25        TurnOutcome::Stopped(stop) => ("failed", trace_stop_reason(stop), None),
26    }
27}
28
29fn trace_stop_reason(stop: &TurnStop) -> &'static str {
30    match stop {
31        TurnStop::Cancelled => "cancelled",
32        TurnStop::Incomplete => "incomplete",
33        TurnStop::InvalidInput => "invalid_input",
34        TurnStop::MaxTurns => "max_turns",
35        TurnStop::ToolFailure => "tool_failure",
36        TurnStop::ProviderError => "provider_error",
37        TurnStop::PluginAbort => "plugin_abort",
38        TurnStop::RuntimeError => "runtime_error",
39        TurnStop::SubmittedError { .. } => "submitted_error",
40        TurnStop::ToolError { .. } => "tool_error",
41    }
42}
43
44fn session_head_refresh_error(err: SessionError) -> RuntimeError {
45    RuntimeError::new(
46        RuntimeErrorCode::Other("session_head_refresh".to_string()),
47        err.to_string(),
48    )
49}
50
/// Decides whether the session execution lease is handed back as part of a
/// turn's final commit.
#[derive(Clone, Copy)]
enum SessionExecutionLeaseReleasePolicy {
    /// Release the lease with the final commit, whatever the turn outcome.
    FinalCommit,
    /// Release the lease with the final commit unless the turn ended in an
    /// agent-frame switch, in which case the lease is kept.
    KeepOnAgentFrameSwitch,
}
56
57impl SessionExecutionLeaseReleasePolicy {
58    fn should_release(self, outcome: &TurnOutcome) -> bool {
59        match self {
60            Self::FinalCommit => true,
61            Self::KeepOnAgentFrameSwitch => {
62                !matches!(outcome, TurnOutcome::AgentFrameSwitch { .. })
63            }
64        }
65    }
66}
67
68fn queued_work_payload_type(payload: &crate::QueuedWorkPayload) -> &'static str {
69    match payload {
70        crate::QueuedWorkPayload::TurnInput { .. } => "turn_input",
71        crate::QueuedWorkPayload::ProcessWake { .. } => "process_wake",
72        crate::QueuedWorkPayload::SessionCommand { command } => command.kind(),
73    }
74}
75
76fn queued_work_batch_ids(claim: &crate::QueuedWorkClaim) -> Vec<String> {
77    claim
78        .batches
79        .iter()
80        .map(|batch| batch.batch_id.clone())
81        .collect()
82}
83
/// Builds the id of a sub-phase of a turn as `<parent_turn_id>:<phase>`.
fn turn_phase_id(parent_turn_id: &str, phase: &str) -> String {
    [parent_turn_id, phase].join(":")
}
87
88fn scoped_child_turn_controller<'run>(
89    scoped_effect_controller: &'run ScopedEffectController<'_>,
90    session_id: &str,
91    turn_id: &str,
92) -> Result<ScopedEffectController<'run>, RuntimeError> {
93    ScopedEffectController::borrowed(
94        scoped_effect_controller.controller(),
95        ExecutionScope::turn(session_id, turn_id),
96    )
97}
98
99pub(in crate::runtime) fn queued_work_trace_payload(
100    boundary: crate::QueuedWorkClaimBoundary,
101    claim: &crate::QueuedWorkClaim,
102    causes: &[crate::TurnCause],
103) -> serde_json::Value {
104    serde_json::json!({
105        "boundary": boundary,
106        "claim_id": claim.claim_id,
107        "owner_id": claim.owner.owner_id,
108        "incarnation_id": claim.owner.incarnation_id,
109        "batch_ids": queued_work_batch_ids(claim),
110        "payload_types": claim.batches.iter()
111            .flat_map(|batch| batch.items.iter())
112            .map(|item| queued_work_payload_type(&item.payload))
113            .collect::<Vec<_>>(),
114        "causes": causes,
115    })
116}
117
118pub(in crate::runtime) fn queued_work_completion_trace_payload(
119    completions: &[crate::QueuedWorkCompletion],
120) -> serde_json::Value {
121    serde_json::json!({
122        "claims": completions.iter().map(|completion| {
123            serde_json::json!({
124                "session_id": completion.session_id,
125                "claim_id": completion.claim_id,
126                "batch_ids": completion.batch_ids,
127            })
128        }).collect::<Vec<_>>(),
129    })
130}
131
132async fn emit_queued_work_started_to_sink(
133    events: &dyn TurnActivitySink,
134    boundary: crate::QueuedWorkClaimBoundary,
135    claim: &crate::QueuedWorkClaim,
136    causes: Vec<crate::TurnCause>,
137) {
138    emit_turn_activity_to_sink(
139        events,
140        TurnActivity::independent(TurnEvent::QueuedWorkStarted {
141            boundary,
142            batch_ids: queued_work_batch_ids(claim),
143            causes,
144        }),
145    )
146    .await;
147}
148
149pub(in crate::runtime) async fn send_queued_work_started_event(
150    event_tx: &mpsc::Sender<RuntimeStreamEvent>,
151    boundary: crate::QueuedWorkClaimBoundary,
152    claim: &crate::QueuedWorkClaim,
153    causes: Vec<crate::TurnCause>,
154) {
155    send_turn_activity(
156        event_tx,
157        TurnActivityId::fresh(),
158        TurnEvent::QueuedWorkStarted {
159            boundary,
160            batch_ids: queued_work_batch_ids(claim),
161            causes,
162        },
163    )
164    .await;
165}
166
167struct TurnFinishInput {
168    turn_pipeline: TurnBoundary,
169    assembler: TurnAssembler,
170    new_messages: crate::MessageSequence,
171    policy: RuntimeSessionPolicy,
172    turn_index: usize,
173    queued_work_completions: Vec<crate::QueuedWorkCompletion>,
174    trace_turn_id: String,
175}
176
177impl LashRuntime {
178    fn max_context_tokens(&self) -> usize {
179        self.state.effective_policy().context_window_tokens()
180    }
181
182    async fn claim_session_execution_lease(
183        &self,
184        cancel: CancellationToken,
185        busy_is_error: bool,
186    ) -> Result<Option<SessionExecutionLeaseGuard>, RuntimeError> {
187        let Some(store) = self
188            .session
189            .as_ref()
190            .and_then(|session| session.history_store())
191        else {
192            return Ok(None);
193        };
194        match SessionExecutionLeaseGuard::try_acquire(
195            store,
196            &self.state.session_id,
197            &self.runtime_lease_owner,
198            Arc::clone(&self.host.core.clock),
199            cancel,
200        )
201        .await
202        .map_err(|err| RuntimeError::new(RuntimeErrorCode::StoreCommitFailed, err.to_string()))?
203        {
204            Some(lease) => Ok(Some(lease)),
205            None if busy_is_error => Err(RuntimeError::new(
206                RuntimeErrorCode::SessionExecutionBusy,
207                format!(
208                    "session `{}` is already executing on another runtime owner",
209                    self.state.session_id
210                ),
211            )),
212            None => Ok(None),
213        }
214    }
215
216    async fn settle_session_execution_lease<T>(
217        &self,
218        guard: Option<&SessionExecutionLeaseGuard>,
219        result: Result<T, RuntimeError>,
220    ) -> Result<T, RuntimeError> {
221        match result {
222            Ok(value) => {
223                if let Some(guard) = guard {
224                    guard.release_if_live().await.map_err(|err| {
225                        RuntimeError::new(RuntimeErrorCode::StoreCommitFailed, err.to_string())
226                    })?;
227                }
228                Ok(value)
229            }
230            Err(err) => {
231                if err.code != RuntimeErrorCode::StoreCommitFailed
232                    && let Some(guard) = guard
233                    && let Err(release_err) = guard.release_if_live().await
234                {
235                    tracing::warn!(
236                        error = %release_err,
237                        "failed to release session execution lease after runtime error"
238                    );
239                }
240                Err(err)
241            }
242        }
243    }
244
245    async fn ensure_session_execution_lease_live(
246        &self,
247        guard: Option<&SessionExecutionLeaseGuard>,
248    ) -> Result<(), RuntimeError> {
249        let Some(guard) = guard else {
250            return Ok(());
251        };
252        guard.refresh_or_mark_lost().await.map_err(|err| {
253            RuntimeError::new(
254                RuntimeErrorCode::SessionExecutionLeaseLost,
255                format!(
256                    "session execution lease for session `{}` was lost before commit: {err}",
257                    self.state.session_id
258                ),
259            )
260        })
261    }
262
263    async fn abandon_queued_work_claims_after_lease_loss(
264        &self,
265        err: &RuntimeError,
266        claims: &[crate::QueuedWorkClaim],
267    ) {
268        if err.code != RuntimeErrorCode::SessionExecutionLeaseLost || claims.is_empty() {
269            return;
270        }
271        let Some(store) = self
272            .session
273            .as_ref()
274            .and_then(|session| session.history_store())
275        else {
276            return;
277        };
278        for claim in claims {
279            if let Err(abandon_err) = store.abandon_queued_work_claim(claim).await {
280                tracing::warn!(
281                    error = %abandon_err,
282                    session_id = %claim.session_id,
283                    claim_id = %claim.claim_id,
284                    "failed to abandon queued work claim after session execution lease loss"
285                );
286            }
287        }
288    }
289
290    #[doc(hidden)]
291    pub fn set_turn_phase_probe(&mut self, probe: Arc<dyn RuntimeTurnPhaseProbe>) {
292        self.turn_phase_probe = Some(probe);
293    }
294
295    fn mark_phase_begin(&self, phase: RuntimeTurnPhase) {
296        if let Some(probe) = self.turn_phase_probe.as_ref() {
297            probe.begin(phase);
298        }
299    }
300
301    fn mark_phase_end(&self, phase: RuntimeTurnPhase) {
302        if let Some(probe) = self.turn_phase_probe.as_ref() {
303            probe.end(phase);
304        }
305    }
306
307    async fn finish_turn(
308        &mut self,
309        finish: TurnFinishInput,
310        events: &dyn EventSink,
311        scoped_effect_controller: &ScopedEffectController<'_>,
312        cancel_state: &CancellationToken,
313        session_execution_lease: Option<&SessionExecutionLeaseGuard>,
314        session_execution_lease_release_policy: SessionExecutionLeaseReleasePolicy,
315    ) -> Result<AssembledTurn, RuntimeError> {
316        let TurnFinishInput {
317            mut turn_pipeline,
318            assembler,
319            new_messages,
320            policy,
321            turn_index,
322            queued_work_completions,
323            trace_turn_id,
324        } = finish;
325        self.policy = self.state.effective_policy().clone();
326        turn_pipeline.state_mut().policy = self.policy.clone();
327        turn_pipeline.state_mut().turn_index = turn_index;
328
329        let mut turn_usage_delta = {
330            let mut ledger = self.shared_token_ledger.lock().expect("token ledger lock");
331            std::mem::take(&mut *ledger)
332        };
333        if assembler.token_usage.total() > 0 || assembler.token_usage.cached_input_tokens > 0 {
334            turn_usage_delta.push(TokenLedgerEntry {
335                source: "turn".to_string(),
336                model: policy.model.id.clone(),
337                usage: assembler.token_usage.clone(),
338            });
339        }
340        let turn_usage_delta = merge_usage_delta_entries(turn_usage_delta);
341
342        turn_pipeline.finalize_turn_read_state(new_messages, cancel_state.is_cancelled());
343        if assembler.token_usage.total() > 0 || assembler.token_usage.cached_input_tokens > 0 {
344            turn_pipeline.state_mut().token_usage = assembler.token_usage.clone();
345        }
346
347        let last_prompt_usage = assembler
348            .last_llm_usage()
349            .and_then(|usage| normalize_prompt_usage(policy.provider(), usage));
350        turn_pipeline.state_mut().last_prompt_usage = last_prompt_usage;
351        let assembled_state = turn_pipeline.export_state_for_assembly();
352        let assembled = assembler.finish(
353            assembled_state,
354            cancel_state.is_cancelled(),
355            None,
356            &self.host.core.control.termination,
357        );
358
359        let Some(session) = self.session.as_ref() else {
360            self.state.apply_snapshot(&assembled.state);
361            self.emit_completed_turn_trace(&assembled.state, &assembled.outcome, &trace_turn_id);
362            return Ok(assembled);
363        };
364        self.ensure_session_execution_lease_live(session_execution_lease)
365            .await?;
366
367        let plugins = Arc::clone(session.plugins());
368        let manager = match self.runtime_session_services_for_turn(None) {
369            Ok(manager) => manager,
370            Err(err) => {
371                return Err(RuntimeError::new(
372                    RuntimeErrorCode::PluginSessionManager,
373                    err.to_string(),
374                ));
375            }
376        };
377
378        self.mark_phase_begin(RuntimeTurnPhase::FinalizeTurn);
379        let finalized = match plugins
380            .finalize_turn_with_phase_probe(
381                assembled,
382                manager.state_service(),
383                manager.lifecycle_service(),
384                manager.graph_service(),
385                self.turn_phase_probe.clone(),
386            )
387            .await
388        {
389            Ok(finalized) => finalized,
390            Err(err) => {
391                self.mark_phase_end(RuntimeTurnPhase::FinalizeTurn);
392                return Err(RuntimeError::new(
393                    RuntimeErrorCode::PluginFinalizeTurn,
394                    err.to_string(),
395                ));
396            }
397        };
398        self.mark_phase_end(RuntimeTurnPhase::FinalizeTurn);
399        self.ensure_session_execution_lease_live(session_execution_lease)
400            .await?;
401
402        let mut returned_turn = finalized.turn;
403        let release_session_execution_lease =
404            session_execution_lease_release_policy.should_release(&returned_turn.outcome);
405        self.mark_phase_begin(RuntimeTurnPhase::PersistTurn);
406        self.mark_phase_begin(RuntimeTurnPhase::FinalCommit);
407        let queued_work_completion_trace = queued_work_completions.clone();
408        let pending_attachment_ids = self
409            .host
410            .core
411            .durability
412            .attachment_store
413            .pending_manifest_commit_ids();
414        if let Err(err) = turn_pipeline
415            .final_commit(
416                &mut returned_turn,
417                self.session.as_mut(),
418                &turn_usage_delta,
419                Some(&trace_turn_id),
420                queued_work_completions,
421                pending_attachment_ids.clone(),
422                release_session_execution_lease
423                    .then(|| session_execution_lease.map(SessionExecutionLeaseGuard::completion))
424                    .flatten(),
425            )
426            .await
427        {
428            self.mark_phase_end(RuntimeTurnPhase::FinalCommit);
429            self.mark_phase_end(RuntimeTurnPhase::PersistTurn);
430            return Err(err);
431        }
432        if release_session_execution_lease && let Some(lease) = session_execution_lease {
433            lease.mark_released();
434        }
435        self.host
436            .core
437            .durability
438            .attachment_store
439            .mark_manifest_committed(&pending_attachment_ids);
440        self.mark_phase_end(RuntimeTurnPhase::FinalCommit);
441
442        emit_session_events_to_sink(events, finalized.events).await;
443        self.state = turn_pipeline.into_final_state();
444        if matches!(returned_turn.outcome, TurnOutcome::AgentFrameSwitch { .. })
445            && let Some(session) = self.session.as_mut()
446        {
447            let protocol_session = Arc::clone(session.plugins().protocol_session());
448            let session_id = self.state.session_id.clone();
449            protocol_session
450                .restore_session(
451                    crate::plugin::ProtocolSessionContext::new(session, &session_id),
452                    &self.state,
453                )
454                .await
455                .map_err(|err| {
456                    RuntimeError::new(
457                        RuntimeErrorCode::Other("protocol_restore_session".to_string()),
458                        err.to_string(),
459                    )
460                })?;
461        }
462        if !queued_work_completion_trace.is_empty() {
463            crate::trace::emit_trace(
464                &self.host.core.tracing.trace_sink,
465                &self.host.core.tracing.trace_context,
466                lash_trace::TraceContext::default()
467                    .for_session(returned_turn.state.session_id.clone())
468                    .for_turn_index(returned_turn.state.turn_index)
469                    .for_turn(trace_turn_id.clone()),
470                lash_trace::TraceEvent::Custom {
471                    name: "queued_work.completed".to_string(),
472                    payload: queued_work_completion_trace_payload(&queued_work_completion_trace),
473                },
474                self.host.core.clock.as_ref(),
475            );
476        }
477        self.mark_phase_begin(RuntimeTurnPhase::PostPersistHooks);
478        self.emit_turn_persisted_event(&returned_turn, scoped_effect_controller, &trace_turn_id)
479            .await?;
480        self.mark_phase_end(RuntimeTurnPhase::PostPersistHooks);
481        self.mark_phase_end(RuntimeTurnPhase::PersistTurn);
482
483        self.emit_completed_turn_trace(
484            &returned_turn.state,
485            &returned_turn.outcome,
486            &trace_turn_id,
487        );
488        Ok(returned_turn)
489    }
490
491    fn emit_completed_turn_trace(
492        &self,
493        state: &SessionSnapshot,
494        outcome: &TurnOutcome,
495        trace_turn_id: &str,
496    ) {
497        if self.host.core.tracing.trace_sink.is_none() {
498            return;
499        }
500
501        let (status, done_reason, agent_frame_switch) = trace_fields_from_outcome(outcome);
502        crate::trace::emit_trace(
503            &self.host.core.tracing.trace_sink,
504            &self.host.core.tracing.trace_context,
505            lash_trace::TraceContext::default()
506                .for_session(state.session_id.clone())
507                .for_turn_index(state.turn_index)
508                .for_turn(trace_turn_id.to_string()),
509            lash_trace::TraceEvent::TurnCompleted {
510                status: status.to_string(),
511                done_reason: done_reason.to_string(),
512                agent_frame_switch,
513            },
514            self.host.core.clock.as_ref(),
515        );
516    }
517
518    async fn emit_turn_persisted_event(
519        &self,
520        returned_turn: &AssembledTurn,
521        scoped_effect_controller: &ScopedEffectController<'_>,
522        trace_turn_id: &str,
523    ) -> Result<(), RuntimeError> {
524        let Some(session) = self.session.as_ref() else {
525            return Ok(());
526        };
527        let Ok(manager) = self.runtime_session_services() else {
528            return Ok(());
529        };
530        let phase_turn_id = turn_phase_id(trace_turn_id, "turn-persisted");
531        let phase_controller = scoped_child_turn_controller(
532            scoped_effect_controller,
533            &self.state.session_id,
534            &phase_turn_id,
535        )?;
536        let direct_completions = manager.direct_completion_client(
537            RuntimeEffectControllerHandle::borrowed(phase_controller),
538            Some(phase_turn_id),
539        );
540
541        session
542            .plugins()
543            .emit_runtime_event_with_phase_probe(
544                crate::PluginLifecycleEvent::TurnPersisted(Box::new(
545                    crate::SessionStateChangedContext {
546                        session_id: self.state.session_id.clone(),
547                        state: crate::SessionReadView::from_snapshot(&returned_turn.state),
548                        sessions: manager.state_service(),
549                        session_graph: manager.graph_service(),
550                        direct_completions,
551                    },
552                )),
553                self.turn_phase_probe.clone(),
554            )
555            .await;
556        Ok(())
557    }
558
559    /// Run a single turn and stream events to the host sink.
560    pub async fn stream_turn(
561        &mut self,
562        input: TurnInput,
563        opts: TurnOptions<'_>,
564    ) -> Result<AssembledTurn, RuntimeError> {
565        let cancel = opts.cancel.clone();
566        let session_execution_lease = self
567            .claim_session_execution_lease(cancel.clone(), true)
568            .await?;
569        let scoped_effect_controller = opts.scoped_effect_controller();
570        let result = self
571            .stream_turn_with_scoped_effect_controller_inner(
572                input,
573                opts.events_or_noop(),
574                opts.turn_events_or_noop(),
575                scoped_effect_controller,
576                cancel,
577                None,
578                session_execution_lease.as_ref(),
579                SessionExecutionLeaseReleasePolicy::FinalCommit,
580            )
581            .await;
582        self.settle_session_execution_lease(session_execution_lease.as_ref(), result)
583            .await
584    }
585
586    pub async fn stream_next_queued_work(
587        &mut self,
588        opts: TurnOptions<'_>,
589    ) -> Result<Option<AssembledTurn>, RuntimeError> {
590        self.stream_queued_work(opts, None).await
591    }
592
593    pub async fn stream_selected_queued_work(
594        &mut self,
595        opts: TurnOptions<'_>,
596        batch_ids: &[String],
597    ) -> Result<Option<AssembledTurn>, RuntimeError> {
598        self.stream_queued_work(opts, Some(batch_ids)).await
599    }
600
601    async fn stream_queued_work(
602        &mut self,
603        opts: TurnOptions<'_>,
604        selected_batch_ids: Option<&[String]>,
605    ) -> Result<Option<AssembledTurn>, RuntimeError> {
606        let cancel = opts.cancel.clone();
607        let Some(session_execution_lease) = self
608            .claim_session_execution_lease(cancel.clone(), false)
609            .await?
610        else {
611            return Ok(None);
612        };
613        let session_execution_fence = session_execution_lease.fence();
614        loop {
615            match self
616                .drain_next_session_command(&session_execution_fence)
617                .await
618            {
619                Ok(Some(_)) => {}
620                Ok(None) => break,
621                Err(err) => {
622                    let _ = session_execution_lease.release_if_live().await;
623                    return Err(err);
624                }
625            }
626        }
627        let Some(store) = self
628            .session
629            .as_ref()
630            .and_then(|session| session.history_store())
631        else {
632            session_execution_lease
633                .release_if_live()
634                .await
635                .map_err(|err| {
636                    RuntimeError::new(RuntimeErrorCode::StoreCommitFailed, err.to_string())
637                })?;
638            return Ok(None);
639        };
640        let claim = if let Some(batch_ids) = selected_batch_ids {
641            store
642                .claim_ready_queued_work_by_batch_ids(
643                    &self.state.session_id,
644                    &session_execution_fence,
645                    &self.runtime_lease_owner,
646                    crate::QueuedWorkClaimBoundary::Idle,
647                    crate::QUEUED_WORK_CLAIM_TTL_MS,
648                    batch_ids,
649                )
650                .await
651        } else {
652            store
653                .claim_ready_queued_work(
654                    &self.state.session_id,
655                    &session_execution_fence,
656                    &self.runtime_lease_owner,
657                    crate::QueuedWorkClaimBoundary::Idle,
658                    crate::QUEUED_WORK_CLAIM_TTL_MS,
659                    64,
660                )
661                .await
662        }
663        .map_err(super::runtime_error_from_store_commit)?;
664        let Some(claim) = claim else {
665            session_execution_lease
666                .release_if_live()
667                .await
668                .map_err(|err| {
669                    RuntimeError::new(RuntimeErrorCode::StoreCommitFailed, err.to_string())
670                })?;
671            return Ok(None);
672        };
673        let mut work = claim.materialize_for_turn();
674        let turn_id = work
675            .input
676            .trace_turn_id
677            .clone()
678            .or_else(|| Some(opts.execution_scope_id().to_owned()))
679            .unwrap_or_else(|| uuid::Uuid::new_v4().to_string());
680        work.input.trace_turn_id = Some(turn_id.clone());
681        let causes = work.turn_causes.clone();
682        emit_queued_work_started_to_sink(
683            opts.turn_events_or_noop(),
684            crate::QueuedWorkClaimBoundary::Idle,
685            &claim,
686            causes.clone(),
687        )
688        .await;
689        crate::trace::emit_trace(
690            &self.host.core.tracing.trace_sink,
691            &self.host.core.tracing.trace_context,
692            lash_trace::TraceContext::default()
693                .for_session(self.state.session_id.clone())
694                .for_turn_index(self.state.turn_index + 1)
695                .for_turn(turn_id.clone()),
696            lash_trace::TraceEvent::Custom {
697                name: "queued_work.claimed".to_string(),
698                payload: queued_work_trace_payload(
699                    crate::QueuedWorkClaimBoundary::Idle,
700                    &claim,
701                    &causes,
702                ),
703            },
704            self.host.core.clock.as_ref(),
705        );
706        let claim_for_abandon = claim.clone();
707        let scoped_effect_controller = opts.scoped_effect_controller();
708        let result = self
709            .stream_turn_with_scoped_effect_controller_inner(
710                work.input,
711                opts.events_or_noop(),
712                opts.turn_events_or_noop(),
713                scoped_effect_controller,
714                cancel,
715                Some(claim),
716                Some(&session_execution_lease),
717                SessionExecutionLeaseReleasePolicy::FinalCommit,
718            )
719            .await
720            .map(Some);
721        if let Err(err) = &result {
722            self.abandon_queued_work_claims_after_lease_loss(
723                err,
724                std::slice::from_ref(&claim_for_abandon),
725            )
726            .await;
727        }
728        self.settle_session_execution_lease(Some(&session_execution_lease), result)
729            .await
730    }
731
732    /// Enforce the durable-first wiring invariant at a turn-scope boundary: when
733    /// the host wired a durable effect host, every store reachable from this
734    /// scope must also be durable. A durable host running against any ephemeral
735    /// store fails loudly here rather than silently degrading.
736    ///
737    /// Inline controllers (the default tier) impose no requirement, so
738    /// inline/in-memory hosts pass unchanged.
739    fn ensure_durable_store_facets_for_scope(
740        &self,
741        scoped_effect_controller: &ScopedEffectController<'_>,
742    ) -> Result<(), RuntimeError> {
743        if scoped_effect_controller.controller().durability_tier() != crate::DurabilityTier::Durable
744        {
745            return Ok(());
746        }
747        if self
748            .host
749            .core
750            .durability
751            .attachment_store
752            .persistence()
753            .durability_tier()
754            != crate::DurabilityTier::Durable
755        {
756            return Err(RuntimeError::durable_store_required(
757                crate::DurableStoreFacet::AttachmentStore,
758            ));
759        }
760        if self
761            .host
762            .core
763            .durability
764            .process_env_store
765            .durability_tier()
766            != crate::DurabilityTier::Durable
767        {
768            return Err(RuntimeError::durable_store_required(
769                crate::DurableStoreFacet::ProcessEnvStore,
770            ));
771        }
772        if let Some(store) = self
773            .session
774            .as_ref()
775            .and_then(|session| session.history_store())
776            && store.durability_tier() != crate::DurabilityTier::Durable
777        {
778            return Err(RuntimeError::durable_store_required(
779                crate::DurableStoreFacet::SessionStore,
780            ));
781        }
782        if let Some(process_registry) = self.host.process_registry.as_ref()
783            && process_registry.durability_tier() != crate::DurabilityTier::Durable
784        {
785            return Err(RuntimeError::durable_store_required(
786                crate::DurableStoreFacet::ProcessRegistry,
787            ));
788        }
789        Ok(())
790    }
791
792    #[allow(clippy::too_many_arguments)]
793    async fn stream_turn_with_scoped_effect_controller_inner(
794        &mut self,
795        mut input: TurnInput,
796        events: &dyn EventSink,
797        turn_events: &dyn TurnActivitySink,
798        scoped_effect_controller: ScopedEffectController<'_>,
799        cancel: CancellationToken,
800        queued_claim: Option<crate::QueuedWorkClaim>,
801        session_execution_lease: Option<&SessionExecutionLeaseGuard>,
802        session_execution_lease_release_policy: SessionExecutionLeaseReleasePolicy,
803    ) -> Result<AssembledTurn, RuntimeError> {
804        if queued_claim.is_none() {
805            if let Some(lease) = session_execution_lease {
806                while self
807                    .drain_next_session_command(&lease.fence())
808                    .await?
809                    .is_some()
810                {}
811            } else if self
812                .session
813                .as_ref()
814                .and_then(|session| session.history_store())
815                .is_some()
816            {
817                return Err(RuntimeError::new(
818                    RuntimeErrorCode::StoreCommitFailed,
819                    "session command drain requires a session execution lease",
820                ));
821            }
822        }
823        if let Some(input_turn_id) = input.trace_turn_id.as_deref()
824            && scoped_effect_controller
825                .execution_scope()
826                .validates_turn_trace_id()
827            && input_turn_id != scoped_effect_controller.scope_id()
828        {
829            return Err(RuntimeError::new(
830                RuntimeErrorCode::ExecutionScopeTurnIdMismatch,
831                format!(
832                    "input trace_turn_id `{input_turn_id}` does not match execution scope id `{}`",
833                    scoped_effect_controller.scope_id()
834                ),
835            ));
836        }
837        self.ensure_durable_store_facets_for_scope(&scoped_effect_controller)?;
838        input
839            .trace_turn_id
840            .get_or_insert_with(|| scoped_effect_controller.scope_id().to_string());
841        self.stream_turn_inner(
842            input.clone(),
843            events,
844            turn_events,
845            scoped_effect_controller,
846            cancel.clone(),
847            queued_claim,
848            session_execution_lease,
849            session_execution_lease_release_policy,
850        )
851        .await
852    }
853
854    /// Stream one logical host turn, following foreground AgentFrame switches
855    /// until a terminal outcome is reached.
856    ///
857    /// A protocol continuation creates a new frame in the same session. Hosts
858    /// that only care about the benchmark/app answer should not need to
859    /// special-case that intermediate outcome; this helper keeps driving the
860    /// same session through each frame's task with the normal runtime turn
861    /// guards.
862    pub async fn stream_turn_with_agent_frames(
863        &mut self,
864        input: TurnInput,
865        opts: TurnOptions<'_>,
866    ) -> Result<AgentFrameRun, RuntimeError> {
867        let cancel = opts.cancel.clone();
868        let session_execution_lease = self
869            .claim_session_execution_lease(cancel.clone(), true)
870            .await?;
871        let scoped_effect_controller = opts.scoped_effect_controller();
872        let result = self
873            .stream_turn_with_agent_frames_inner(
874                input,
875                opts.events_or_noop(),
876                opts.turn_events_or_noop(),
877                scoped_effect_controller,
878                cancel,
879                session_execution_lease.as_ref(),
880            )
881            .await;
882        self.settle_session_execution_lease(session_execution_lease.as_ref(), result)
883            .await
884    }
885
886    async fn stream_turn_with_agent_frames_inner(
887        &mut self,
888        mut input: TurnInput,
889        events: &dyn EventSink,
890        turn_events: &dyn TurnActivitySink,
891        scoped_effect_controller: ScopedEffectController<'_>,
892        cancel: CancellationToken,
893        session_execution_lease: Option<&SessionExecutionLeaseGuard>,
894    ) -> Result<AgentFrameRun, RuntimeError> {
895        if let Some(input_turn_id) = input.trace_turn_id.as_deref()
896            && scoped_effect_controller
897                .execution_scope()
898                .validates_turn_trace_id()
899            && input_turn_id != scoped_effect_controller.scope_id()
900        {
901            return Err(RuntimeError::new(
902                RuntimeErrorCode::ExecutionScopeTurnIdMismatch,
903                format!(
904                    "input trace_turn_id `{input_turn_id}` does not match execution scope id `{}`",
905                    scoped_effect_controller.scope_id()
906                ),
907            ));
908        }
909        let follow_protocol_turn_options = input.protocol_turn_options.clone();
910        let follow_turn_context = input.turn_context.clone();
911        let follow_trace_turn_id = input
912            .trace_turn_id
913            .clone()
914            .unwrap_or_else(|| scoped_effect_controller.scope_id().to_string());
915        input
916            .trace_turn_id
917            .get_or_insert(follow_trace_turn_id.clone());
918        let mut turns = Vec::new();
919        loop {
920            let turn_trace_turn_id = agent_frame_follow_turn_id(&follow_trace_turn_id, turns.len());
921            input.trace_turn_id = Some(turn_trace_turn_id.clone());
922            let turn_effect_controller = if turns.is_empty() {
923                scoped_effect_controller.clone()
924            } else {
925                ScopedEffectController::borrowed(
926                    scoped_effect_controller.controller(),
927                    ExecutionScope::turn(&self.state.session_id, &turn_trace_turn_id),
928                )?
929            };
930            let turn = self
931                .stream_turn_with_scoped_effect_controller_inner(
932                    input,
933                    events,
934                    turn_events,
935                    turn_effect_controller,
936                    cancel.clone(),
937                    None,
938                    session_execution_lease,
939                    SessionExecutionLeaseReleasePolicy::KeepOnAgentFrameSwitch,
940                )
941                .await?;
942            let switched_frame = match &turn.outcome {
943                TurnOutcome::AgentFrameSwitch { frame_id, task } => {
944                    Some((frame_id.clone(), task.clone()))
945                }
946                _ => None,
947            };
948            turns.push(turn);
949
950            let Some((_frame_id, task)) = switched_frame else {
951                return Ok(AgentFrameRun { turns });
952            };
953            input = turn_input_from_text(task);
954            input.protocol_turn_options = follow_protocol_turn_options.clone();
955            input.turn_context = follow_turn_context.clone();
956        }
957    }
958
959    #[allow(clippy::too_many_arguments)]
960    async fn stream_turn_inner(
961        &mut self,
962        mut input: TurnInput,
963        events: &dyn EventSink,
964        turn_events: &dyn TurnActivitySink,
965        scoped_effect_controller: ScopedEffectController<'_>,
966        cancel: CancellationToken,
967        queued_claim: Option<crate::QueuedWorkClaim>,
968        session_execution_lease: Option<&SessionExecutionLeaseGuard>,
969        session_execution_lease_release_policy: SessionExecutionLeaseReleasePolicy,
970    ) -> Result<AssembledTurn, RuntimeError> {
971        self.refresh_session_graph_from_store()
972            .await
973            .map_err(session_head_refresh_error)?;
974        let input_trace_turn_id = input.trace_turn_id.clone();
975        let queued_turn_work = queued_claim
976            .as_ref()
977            .map(crate::QueuedWorkClaim::materialize_for_turn);
978        if let Some(work) = queued_turn_work.as_ref()
979            && input.items.is_empty()
980            && input.image_blobs.is_empty()
981        {
982            input = work.input.clone();
983            if input.trace_turn_id.is_none() {
984                input.trace_turn_id = input_trace_turn_id;
985            }
986        }
987        if self
988            .session
989            .as_ref()
990            .and_then(|session| session.history_store())
991            .is_some()
992        {
993            ensure_durable_effect_input(&input)?;
994        }
995        if let Some(extension) = &input.protocol_extension
996            && let Some(session) = self.session.as_ref()
997        {
998            let protocol_session = std::sync::Arc::clone(session.plugins().protocol_session());
999            protocol_session
1000                .validate_turn_extension(extension)
1001                .await
1002                .map_err(|err| {
1003                    RuntimeError::new(RuntimeErrorCode::ProtocolTurnExtension, err.to_string())
1004                })?;
1005        }
1006        let previous_prompt_usage = self.state.last_prompt_usage.clone();
1007        let normalized = match self
1008            .normalize_input_items(&input.items, &input.image_blobs)
1009            .await
1010        {
1011            Ok(items) => items,
1012            Err(e) => {
1013                self.state.last_prompt_usage = None;
1014                let mut assembler = TurnAssembler::default();
1015                let error_event = SessionEvent::Error {
1016                    message: e.clone(),
1017                    envelope: Some(crate::session_model::ErrorEnvelope {
1018                        kind: "input_validation".to_string(),
1019                        code: Some("invalid_turn_input".to_string()),
1020                        terminal_reason: None,
1021                        user_message: e.clone(),
1022                        raw: None,
1023                    }),
1024                };
1025                assembler.push(&error_event);
1026                emit_turn_activity_to_sink(
1027                    turn_events,
1028                    TurnActivity::independent(TurnEvent::Error { message: e }),
1029                )
1030                .await;
1031                emit_session_event_to_sink(events, error_event).await;
1032                let outcome_event = SessionEvent::TurnOutcome {
1033                    outcome: TurnOutcome::Stopped(TurnStop::InvalidInput),
1034                };
1035                assembler.push(&outcome_event);
1036                emit_session_event_to_sink(events, outcome_event).await;
1037                assembler.push(&SessionEvent::Done);
1038                emit_session_event_to_sink(events, SessionEvent::Done).await;
1039                return Ok(assembler.finish(
1040                    self.state.to_snapshot(),
1041                    false,
1042                    None,
1043                    &self.host.core.control.termination,
1044                ));
1045            }
1046        };
1047        let turn_index = self.state.turn_index + 1;
1048        let trace_turn_id = input
1049            .trace_turn_id
1050            .clone()
1051            .unwrap_or_else(|| uuid::Uuid::new_v4().to_string());
1052        if self.host.core.tracing.trace_sink.is_some() {
1053            let mut trace_metadata = std::collections::BTreeMap::new();
1054            trace_metadata.insert(
1055                "input_item_count".to_string(),
1056                serde_json::json!(normalized.len()),
1057            );
1058            crate::trace::emit_trace(
1059                &self.host.core.tracing.trace_sink,
1060                &self.host.core.tracing.trace_context,
1061                lash_trace::TraceContext::default()
1062                    .for_session(self.state.session_id.clone())
1063                    .for_turn_index(turn_index)
1064                    .for_turn(trace_turn_id.clone()),
1065                lash_trace::TraceEvent::TurnStarted {
1066                    metadata: trace_metadata,
1067                },
1068                self.host.core.clock.as_ref(),
1069            );
1070        }
1071
1072        let base_read_model = self.state.read_model();
1073        let base_messages = base_read_model.messages;
1074        let base_render_cache = base_read_model.prompt_render_cache;
1075        let mut turn_delta = Vec::new();
1076        let initial_turn_causes = queued_turn_work
1077            .as_ref()
1078            .map(|work| work.turn_causes.clone())
1079            .unwrap_or_default();
1080        turn_delta.extend(
1081            initial_turn_causes
1082                .iter()
1083                .map(crate::TurnCause::to_event_message),
1084        );
1085
1086        let user_id = fresh_message_id();
1087        let mut user_parts: Vec<Part> = Vec::new();
1088        for item in normalized {
1089            match item {
1090                NormalizedItem::Text(text) => {
1091                    if text.is_empty() {
1092                        continue;
1093                    }
1094                    user_parts.push(Part {
1095                        id: format!("{}.p{}", user_id, user_parts.len()),
1096                        kind: PartKind::Text,
1097                        content: text,
1098                        attachment: None,
1099                        tool_call_id: None,
1100                        tool_name: None,
1101                        tool_replay: None,
1102                        prune_state: PruneState::Intact,
1103                        reasoning_meta: None,
1104                        response_meta: None,
1105                    });
1106                }
1107                NormalizedItem::Image(reference) => {
1108                    user_parts.push(Part {
1109                        id: format!("{}.p{}", user_id, user_parts.len()),
1110                        kind: PartKind::Image,
1111                        content: String::new(),
1112                        attachment: Some(crate::session_model::message::PartAttachment {
1113                            reference,
1114                        }),
1115                        tool_call_id: None,
1116                        tool_name: None,
1117                        tool_replay: None,
1118                        prune_state: PruneState::Intact,
1119                        reasoning_meta: None,
1120                        response_meta: None,
1121                    });
1122                }
1123            }
1124        }
1125        if user_parts.is_empty() && initial_turn_causes.is_empty() {
1126            user_parts.push(Part {
1127                id: format!("{}.p0", user_id),
1128                kind: PartKind::Text,
1129                content: String::new(),
1130                attachment: None,
1131                tool_call_id: None,
1132                tool_name: None,
1133                tool_replay: None,
1134                prune_state: PruneState::Intact,
1135                reasoning_meta: None,
1136                response_meta: None,
1137            });
1138        }
1139        if !user_parts.is_empty() {
1140            reassign_part_ids(&user_id, &mut user_parts);
1141            turn_delta.push(Message {
1142                id: user_id.clone(),
1143                role: MessageRole::User,
1144                parts: shared_parts(user_parts),
1145                origin: None,
1146            });
1147        }
1148
1149        let manager = self
1150            .runtime_session_services_for_turn(None)
1151            .map_err(|err| {
1152                RuntimeError::new(RuntimeErrorCode::PluginSessionManager, err.to_string())
1153            })?;
1154        let plugin_session = self
1155            .session
1156            .as_ref()
1157            .map(|s| Arc::clone(s.plugins()))
1158            .ok_or_else(|| {
1159                RuntimeError::new(
1160                    RuntimeErrorCode::ContextPrepareTurn,
1161                    "runtime session not available",
1162                )
1163            })?;
1164        let prepare_phase_turn_id = turn_phase_id(&trace_turn_id, "prepare-turn");
1165        let prepare_phase_controller = scoped_child_turn_controller(
1166            &scoped_effect_controller,
1167            &self.state.session_id,
1168            &prepare_phase_turn_id,
1169        )?;
1170        let turn_ctx = crate::TurnTransformContext {
1171            session_id: self.state.session_id.clone(),
1172            state: self.read_view(),
1173            prompt_usage: previous_prompt_usage.clone(),
1174            max_context_tokens: Some(LashRuntime::max_context_tokens(self)),
1175            sessions: manager.state_service(),
1176            session_lifecycle: manager.lifecycle_service(),
1177            session_graph: manager.graph_service(),
1178            scoped_effect_controller: scoped_effect_controller.clone(),
1179            direct_completions: manager.direct_completion_client(
1180                RuntimeEffectControllerHandle::borrowed(prepare_phase_controller),
1181                Some(prepare_phase_turn_id),
1182            ),
1183        };
1184        self.mark_phase_begin(RuntimeTurnPhase::ContextTransform);
1185        let prepared_context = plugin_session
1186            .prepare_turn_context(
1187                &turn_ctx,
1188                crate::session_model::context::PreparedContext {
1189                    messages: crate::MessageSequence::from_base_and_delta(
1190                        base_messages,
1191                        turn_delta,
1192                    )
1193                    .with_base_render_cache(base_render_cache),
1194                    ..Default::default()
1195                },
1196                self.turn_phase_probe.clone(),
1197            )
1198            .await
1199            .map_err(|err| {
1200                RuntimeError::new(RuntimeErrorCode::ContextPrepareTurn, err.to_string())
1201            })?;
1202        self.mark_phase_end(RuntimeTurnPhase::ContextTransform);
1203        // Release the read-view's graph clone before the rest of the turn
1204        // runs. Keeping it alive into `stream_prepared_turn` forces the
1205        // post-turn `append_active_read_delta` to deep-clone the session
1206        // graph (Arc::make_mut with refcount > 1).
1207        drop(turn_ctx);
1208        let messages = prepared_context.messages;
1209        if let Some(session) = self.session.as_mut() {
1210            session
1211                .set_context_overlay(
1212                    prepared_context.tool_providers,
1213                    prepared_context.prompt_contributions,
1214                    prepared_context.include_base_tools,
1215                )
1216                .map_err(|err| {
1217                    RuntimeError::new(
1218                        RuntimeErrorCode::Other("session_tool_registry".to_string()),
1219                        err.to_string(),
1220                    )
1221                })?;
1222        }
1223
1224        self.state.last_prompt_usage = None;
1225        Box::pin(self.stream_prepared_turn_inner(
1226            messages,
1227            previous_prompt_usage,
1228            input.protocol_turn_options.clone(),
1229            input.protocol_extension.clone(),
1230            input.turn_context.clone(),
1231            initial_turn_causes,
1232            trace_turn_id,
1233            turn_index,
1234            events,
1235            turn_events,
1236            scoped_effect_controller,
1237            cancel,
1238            queued_claim,
1239            session_execution_lease,
1240            session_execution_lease_release_policy,
1241        ))
1242        .await
1243    }
1244
1245    /// Run a single turn and return only the assembled terminal result.
1246    pub async fn run_turn_assembled(
1247        &mut self,
1248        input: TurnInput,
1249        cancel: CancellationToken,
1250        scoped_effect_controller: ScopedEffectController<'_>,
1251    ) -> Result<AssembledTurn, RuntimeError> {
1252        self.stream_turn(input, TurnOptions::new(cancel, scoped_effect_controller))
1253            .await
1254    }
1255
1256    /// Run a turn using host-prepared message history.
1257    #[allow(clippy::too_many_arguments)]
1258    pub async fn stream_prepared_turn(
1259        &mut self,
1260        messages: crate::MessageSequence,
1261        previous_prompt_usage: Option<PromptUsage>,
1262        protocol_turn_options: Option<crate::ProtocolTurnOptions>,
1263        protocol_extension: Option<crate::ProtocolTurnExtensionHandle>,
1264        turn_context: crate::TurnContext,
1265        initial_turn_causes: Vec<crate::TurnCause>,
1266        trace_turn_id: String,
1267        turn_index: usize,
1268        events: &dyn EventSink,
1269        turn_events: &dyn TurnActivitySink,
1270        scoped_effect_controller: ScopedEffectController<'_>,
1271        cancel: CancellationToken,
1272        initial_queue_claim: Option<crate::QueuedWorkClaim>,
1273    ) -> Result<AssembledTurn, RuntimeError> {
1274        let session_execution_lease = self
1275            .claim_session_execution_lease(cancel.clone(), true)
1276            .await?;
1277        let result = self
1278            .stream_prepared_turn_inner(
1279                messages,
1280                previous_prompt_usage,
1281                protocol_turn_options,
1282                protocol_extension,
1283                turn_context,
1284                initial_turn_causes,
1285                trace_turn_id,
1286                turn_index,
1287                events,
1288                turn_events,
1289                scoped_effect_controller,
1290                cancel,
1291                initial_queue_claim,
1292                session_execution_lease.as_ref(),
1293                SessionExecutionLeaseReleasePolicy::FinalCommit,
1294            )
1295            .await;
1296        self.settle_session_execution_lease(session_execution_lease.as_ref(), result)
1297            .await
1298    }
1299
1300    #[allow(clippy::too_many_arguments)]
1301    async fn stream_prepared_turn_inner(
1302        &mut self,
1303        messages: crate::MessageSequence,
1304        _previous_prompt_usage: Option<PromptUsage>,
1305        protocol_turn_options: Option<crate::ProtocolTurnOptions>,
1306        protocol_extension: Option<crate::ProtocolTurnExtensionHandle>,
1307        turn_context: crate::TurnContext,
1308        initial_turn_causes: Vec<crate::TurnCause>,
1309        trace_turn_id: String,
1310        turn_index: usize,
1311        events: &dyn EventSink,
1312        turn_events: &dyn TurnActivitySink,
1313        scoped_effect_controller: ScopedEffectController<'_>,
1314        cancel: CancellationToken,
1315        initial_queue_claim: Option<crate::QueuedWorkClaim>,
1316        session_execution_lease: Option<&SessionExecutionLeaseGuard>,
1317        session_execution_lease_release_policy: SessionExecutionLeaseReleasePolicy,
1318    ) -> Result<AssembledTurn, RuntimeError> {
1319        if session_execution_lease.is_none()
1320            && self
1321                .session
1322                .as_ref()
1323                .and_then(|session| session.history_store())
1324                .is_some()
1325        {
1326            return Err(RuntimeError::new(
1327                RuntimeErrorCode::StoreCommitFailed,
1328                "prepared turn requires a session execution lease",
1329            ));
1330        }
1331        let session_execution_fence =
1332            session_execution_lease.map(SessionExecutionLeaseGuard::fence);
1333        let (event_tx, mut event_rx) = mpsc::channel::<RuntimeStreamEvent>(100);
1334        let child_usage_event_relay = ChildUsageEventRelay::new(event_tx.clone());
1335        let mut turn_policy = self.state.effective_policy().clone();
1336        let turn_provider_override = turn_context.provider().cloned();
1337        if let Some(provider) = turn_provider_override.as_ref() {
1338            turn_policy.provider_id = provider.kind().to_string();
1339        }
1340        if let Some(model) = turn_context.model_spec() {
1341            turn_policy.model = model.clone();
1342        }
1343        let session_protocol_turn_options = self.state.effective_protocol_turn_options().clone();
1344        let effective_protocol_turn_options = protocol_turn_options
1345            .clone()
1346            .map(|options| session_protocol_turn_options.merged_with_override(&options))
1347            .unwrap_or(session_protocol_turn_options);
1348        let manager = self
1349            .runtime_session_services_for_turn(Some(child_usage_event_relay.clone()))
1350            .map_err(|err| {
1351                RuntimeError::new(RuntimeErrorCode::PluginSessionManager, err.to_string())
1352            })?;
1353        let plugins = {
1354            let session = self
1355                .session
1356                .as_ref()
1357                .expect("lash runtime session must be available");
1358            Arc::clone(session.plugins())
1359        };
1360        let mut assembler = TurnAssembler::new();
1361        self.mark_phase_begin(RuntimeTurnPhase::BeforeTurnHooks);
1362        // Block-scope the pinned future so it (and its captured
1363        // `SessionReadView` clone of the session graph) drops before the
1364        // post-turn `append_active_read_delta` mutation. Keeping it alive
1365        // across the turn forces `Arc::make_mut` to deep-clone
1366        // `SessionGraphData`.
1367        let prepared = {
1368            let prepare_turn = plugins.prepare_turn_with_phase_probe(
1369                PrepareTurnRequest {
1370                    session_id: self.state.session_id.clone(),
1371                    state: crate::SessionReadView::from_runtime_state(
1372                        &self.state,
1373                        turn_policy.clone(),
1374                        effective_protocol_turn_options.clone(),
1375                    ),
1376                    messages,
1377                    sessions: manager.state_service(),
1378                    session_lifecycle: manager.lifecycle_service(),
1379                    session_graph: manager.graph_service(),
1380                    turn_context: turn_context.clone(),
1381                },
1382                self.turn_phase_probe.clone(),
1383            );
1384            let mut prepare_turn = Box::pin(prepare_turn);
1385
1386            loop {
1387                tokio::select! {
1388                    prepared = prepare_turn.as_mut() => {
1389                        let prepared = prepared.map_err(|err| {
1390                            RuntimeError::new(RuntimeErrorCode::PluginPrepareTurn, err.to_string())
1391                        })?;
1392                        self.mark_phase_end(RuntimeTurnPhase::BeforeTurnHooks);
1393                        break prepared;
1394                    }
1395                    maybe_event = event_rx.recv() => {
1396                        if let Some(event) = maybe_event {
1397                            emit_runtime_stream_event_to_sinks(
1398                                events,
1399                                turn_events,
1400                                event,
1401                                &mut assembler,
1402                            )
1403                            .await;
1404                        }
1405                    }
1406                }
1407            }
1408        };
1409        for event in &prepared.events {
1410            assembler.push(event);
1411        }
1412        emit_session_events_to_sink(events, prepared.events).await;
1413        if let Some(abort) = prepared.abort {
1414            drop(event_tx);
1415
1416            let mut turn_pipeline = TurnBoundary::from_state_with_clock(
1417                self.state.clone(),
1418                Arc::clone(&self.host.core.clock),
1419            );
1420            turn_pipeline.apply_prepared_messages(&prepared.messages);
1421            let state = turn_pipeline.into_final_state();
1422            let issue = TurnIssue {
1423                kind: "plugin".to_string(),
1424                code: Some(abort.code),
1425                terminal_reason: None,
1426                message: abort.message.clone(),
1427                raw: None,
1428            };
1429            let error_event = SessionEvent::Error {
1430                message: abort.message,
1431                envelope: Some(crate::session_model::ErrorEnvelope {
1432                    kind: "plugin".to_string(),
1433                    code: issue.code.clone(),
1434                    terminal_reason: None,
1435                    user_message: issue.message.clone(),
1436                    raw: None,
1437                }),
1438            };
1439            assembler.push(&error_event);
1440            emit_turn_activity_to_sink(
1441                turn_events,
1442                TurnActivity::independent(TurnEvent::Error {
1443                    message: issue.message.clone(),
1444                }),
1445            )
1446            .await;
1447            emit_session_event_to_sink(events, error_event).await;
1448            let outcome_event = SessionEvent::TurnOutcome {
1449                outcome: TurnOutcome::Stopped(TurnStop::PluginAbort),
1450            };
1451            assembler.push(&outcome_event);
1452            emit_session_event_to_sink(events, outcome_event).await;
1453            assembler.push(&SessionEvent::Done);
1454            emit_session_event_to_sink(events, SessionEvent::Done).await;
1455            return Ok(assembler.finish(
1456                state.to_snapshot(),
1457                cancel.is_cancelled(),
1458                Some(issue),
1459                &self.host.core.control.termination,
1460            ));
1461        }
1462        let mut turn_pipeline = TurnBoundary::from_state_with_clock(
1463            self.state.clone(),
1464            Arc::clone(&self.host.core.clock),
1465        )
1466        .with_session_execution_lease(session_execution_fence.clone());
1467        let store = self
1468            .session
1469            .as_ref()
1470            .and_then(|session| session.history_store());
1471        // Durable controllers, like Restate, own in-flight replay. Writing
1472        // progress checkpoints directly to the shared store would make handler
1473        // replay observe a newer partial turn and change effect replay keys.
1474        let progress_store = if scoped_effect_controller.controller().durability_tier()
1475            == crate::DurabilityTier::Durable
1476        {
1477            None
1478        } else {
1479            store.as_ref().map(|store| store.as_ref())
1480        };
1481        turn_pipeline
1482            .prepared_checkpoint(
1483                progress_store,
1484                turn_policy.clone(),
1485                turn_index,
1486                &prepared.messages,
1487                self.session.as_mut(),
1488            )
1489            .await
1490            .map_err(super::runtime_error_from_store_commit)?;
1491        let resolved_turn_policy = if let Some(provider) = turn_provider_override {
1492            RuntimeSessionPolicy::from_provider(
1493                turn_policy.clone(),
1494                provider.with_clock(Arc::clone(&self.host.core.clock)),
1495            )
1496            .map_err(|err| RuntimeError::new("llm_provider", err.to_string()))?
1497        } else {
1498            self.host
1499                .resolve_session_policy(&self.state.session_id, turn_policy.clone())
1500                .map_err(|err| RuntimeError::new("llm_provider", err.to_string()))?
1501        };
1502        let manager = self
1503            .runtime_session_services_for_turn(Some(child_usage_event_relay.clone()))
1504            .map_err(|err| {
1505                RuntimeError::new(RuntimeErrorCode::PluginSessionManager, err.to_string())
1506            })?;
1507        let cancel_state = cancel.clone();
1508        let finish_scoped_effect_controller = scoped_effect_controller.clone();
1509        let session = self
1510            .session
1511            .take()
1512            .expect("lash runtime session must be available");
1513        let mut driver = RuntimeTurnDriver {
1514            session,
1515            policy: resolved_turn_policy,
1516            host: self.host.clone(),
1517            turn_id: scoped_effect_controller.scope_id().to_string(),
1518            scoped_effect_controller,
1519            session_id: self.state.session_id.clone(),
1520            turn_index,
1521            turn_pipeline,
1522            llm_stream_summaries: HashMap::new(),
1523            next_llm_ordinal: 0,
1524            session_services: manager,
1525            protocol_turn_options: effective_protocol_turn_options,
1526            protocol_extension,
1527            turn_context,
1528            turn_causes: initial_turn_causes,
1529            pending_queue_claims: initial_queue_claim.into_iter().collect(),
1530            checkpoint_messages: crate::tool_dispatch::CheckpointMessageBuffer::default(),
1531            session_execution_lease: session_execution_fence,
1532            runtime_lease_owner: self.runtime_lease_owner.clone(),
1533            turn_phase_probe: self.turn_phase_probe.clone(),
1534        };
1535        let protocol_run_offset = 0;
1536        self.mark_phase_begin(RuntimeTurnPhase::EffectLoop);
1537        let run_result = drive_turn_to_completion(
1538            driver.run(prepared.messages, event_tx, cancel, protocol_run_offset),
1539            &mut event_rx,
1540            &mut assembler,
1541            &child_usage_event_relay,
1542            events,
1543            turn_events,
1544        )
1545        .await;
1546        let (new_messages, _new_protocol_iteration) = match run_result {
1547            Ok(result) => result,
1548            Err(err) => {
1549                self.mark_phase_end(RuntimeTurnPhase::EffectLoop);
1550                let RuntimeTurnDriver {
1551                    session,
1552                    pending_queue_claims,
1553                    ..
1554                } = driver;
1555                self.session = Some(session);
1556                self.abandon_queued_work_claims_after_lease_loss(&err, &pending_queue_claims)
1557                    .await;
1558                return Err(err);
1559            }
1560        };
1561        self.mark_phase_end(RuntimeTurnPhase::EffectLoop);
1562        tracing::debug!(
1563            new_message_count = new_messages.len(),
1564            tool_call_count = assembler.tool_calls.len(),
1565            "runtime post-run_task"
1566        );
1567
1568        let RuntimeTurnDriver {
1569            session,
1570            policy,
1571            turn_pipeline,
1572            pending_queue_claims,
1573            ..
1574        } = driver;
1575        self.session = Some(session);
1576        let pending_queue_claims_for_abandon = pending_queue_claims.clone();
1577        let finish_result = self
1578            .finish_turn(
1579                TurnFinishInput {
1580                    turn_pipeline,
1581                    assembler,
1582                    new_messages,
1583                    policy,
1584                    turn_index,
1585                    queued_work_completions: pending_queue_claims
1586                        .iter()
1587                        .map(crate::QueuedWorkClaim::completion)
1588                        .collect(),
1589                    trace_turn_id,
1590                },
1591                events,
1592                &finish_scoped_effect_controller,
1593                &cancel_state,
1594                session_execution_lease,
1595                session_execution_lease_release_policy,
1596            )
1597            .await;
1598        if let Err(err) = &finish_result {
1599            self.abandon_queued_work_claims_after_lease_loss(
1600                err,
1601                &pending_queue_claims_for_abandon,
1602            )
1603            .await;
1604        }
1605        finish_result
1606    }
1607    async fn normalize_input_items(
1608        &self,
1609        items: &[InputItem],
1610        image_blobs: &HashMap<String, Vec<u8>>,
1611    ) -> Result<Vec<NormalizedItem>, String> {
1612        normalize_input_items(
1613            items,
1614            image_blobs,
1615            self.host.core.durability.attachment_store.as_ref(),
1616        )
1617        .await
1618    }
1619}
1620
1621fn turn_input_from_text(text: String) -> TurnInput {
1622    TurnInput {
1623        items: vec![InputItem::Text { text }],
1624        image_blobs: HashMap::new(),
1625        protocol_turn_options: None,
1626        trace_turn_id: None,
1627        protocol_extension: None,
1628        turn_context: crate::TurnContext::default(),
1629    }
1630}
1631
/// Returns the turn id for the given number of completed turns.
///
/// A count of zero yields `root_turn_id` unchanged; any other count yields
/// `"{root_turn_id}:agent-frame:{completed_turn_count}"`.
fn agent_frame_follow_turn_id(root_turn_id: &str, completed_turn_count: usize) -> String {
    match completed_turn_count {
        0 => root_turn_id.to_owned(),
        follow_index => format!("{root_turn_id}:agent-frame:{follow_index}"),
    }
}
1639
1640pub fn ensure_durable_effect_input(input: &TurnInput) -> Result<(), RuntimeError> {
1641    if input.protocol_extension.is_some() {
1642        return Err(RuntimeError::new(
1643            RuntimeErrorCode::DurableEffectLiveProtocolExtension,
1644            "durable effect hosts do not support live protocol_extension inputs; encode replayable data in protocol_turn_options or persisted plugin state",
1645        ));
1646    }
1647    input
1648        .turn_context
1649        .live_plugin_inputs()
1650        .durable_effect_rejection()?;
1651    Ok(())
1652}
1653
1654async fn emit_turn_activity_to_sink(events: &dyn TurnActivitySink, activity: TurnActivity) {
1655    if !events.is_noop() {
1656        events.emit(activity).await;
1657    }
1658}
1659
1660/// Pump the turn driver's event channel into the host sinks while the run
1661/// future executes, then drain any events emitted between completion and the
1662/// sender dropping.
1663///
1664/// Both the fresh and resumed turn entry points construct a
1665/// `RuntimeTurnDriver`, kick off its run future, and need identical
1666/// event-pump/drain behavior before tearing the driver down. Only the driver
1667/// construction and post-run teardown differ, so each caller owns those and
1668/// shares this loop.
1669async fn drive_turn_to_completion<F>(
1670    run_future: F,
1671    event_rx: &mut mpsc::Receiver<RuntimeStreamEvent>,
1672    assembler: &mut TurnAssembler,
1673    child_usage_event_relay: &ChildUsageEventRelay,
1674    events: &dyn EventSink,
1675    turn_events: &dyn TurnActivitySink,
1676) -> Result<(crate::MessageSequence, usize), RuntimeError>
1677where
1678    F: std::future::Future<Output = Result<(crate::MessageSequence, usize), RuntimeError>>,
1679{
1680    let run_result = {
1681        let mut run_future = Box::pin(run_future);
1682        loop {
1683            tokio::select! {
1684                maybe_event = event_rx.recv() => {
1685                    if let Some(event) = maybe_event {
1686                        emit_runtime_stream_event_to_sinks(
1687                            events,
1688                            turn_events,
1689                            event,
1690                            assembler,
1691                        )
1692                        .await;
1693                    }
1694                }
1695                completed = run_future.as_mut() => {
1696                    child_usage_event_relay.clear();
1697                    break completed;
1698                }
1699            }
1700        }
1701    };
1702    while let Some(event) = event_rx.recv().await {
1703        emit_runtime_stream_event_to_sinks(events, turn_events, event, assembler).await;
1704    }
1705    run_result
1706}
1707
1708async fn emit_runtime_stream_event_to_sinks(
1709    events: &dyn EventSink,
1710    turn_events: &dyn TurnActivitySink,
1711    event: RuntimeStreamEvent,
1712    assembler: &mut TurnAssembler,
1713) {
1714    match event {
1715        RuntimeStreamEvent::Session(event) => {
1716            assembler.push(&event);
1717            emit_session_event_to_sink(events, event).await;
1718        }
1719        RuntimeStreamEvent::Turn(activity) => {
1720            assembler.push_turn_activity(&activity);
1721            emit_turn_activity_to_sink(turn_events, activity).await;
1722        }
1723    }
1724}
1725
#[cfg(test)]
mod tests {
    use super::agent_frame_follow_turn_id;

    /// Count zero returns the root id itself; counts one and two each get
    /// their own `:agent-frame:<count>` suffix.
    #[test]
    fn agent_frame_follow_turn_ids_are_distinct_and_deterministic() {
        let expected_ids = [
            (0, "root-turn"),
            (1, "root-turn:agent-frame:1"),
            (2, "root-turn:agent-frame:2"),
        ];

        for (completed_turn_count, expected) in expected_ids {
            assert_eq!(
                agent_frame_follow_turn_id("root-turn", completed_turn_count),
                expected
            );
        }
    }
}