Skip to main content

lash_core/runtime/
turn_loop.rs

1use super::*;
2
3fn trace_fields_from_outcome(
4    outcome: &TurnOutcome,
5) -> (
6    &'static str,
7    &'static str,
8    Option<lash_trace::TraceAgentFrameSwitch>,
9) {
10    match outcome {
11        TurnOutcome::Finished(TurnFinish::AssistantMessage { .. }) => {
12            ("completed", "assistant_message", None)
13        }
14        TurnOutcome::Finished(TurnFinish::SubmittedValue { .. }) => {
15            ("completed", "submitted_value", None)
16        }
17        TurnOutcome::Finished(TurnFinish::ToolValue { .. }) => ("completed", "tool_value", None),
18        TurnOutcome::AgentFrameSwitch { frame_id, .. } => (
19            "completed",
20            "agent_frame_switch",
21            Some(lash_trace::TraceAgentFrameSwitch {
22                frame_id: frame_id.clone(),
23            }),
24        ),
25        TurnOutcome::Stopped(stop) => ("failed", trace_stop_reason(stop), None),
26    }
27}
28
29fn trace_stop_reason(stop: &TurnStop) -> &'static str {
30    match stop {
31        TurnStop::Cancelled => "cancelled",
32        TurnStop::Incomplete => "incomplete",
33        TurnStop::InvalidInput => "invalid_input",
34        TurnStop::MaxTurns => "max_turns",
35        TurnStop::ToolFailure => "tool_failure",
36        TurnStop::ProviderError => "provider_error",
37        TurnStop::PluginAbort => "plugin_abort",
38        TurnStop::RuntimeError => "runtime_error",
39        TurnStop::SubmittedError { .. } => "submitted_error",
40        TurnStop::ToolError { .. } => "tool_error",
41    }
42}
43
44fn session_head_refresh_error(err: SessionError) -> RuntimeError {
45    RuntimeError::new(
46        RuntimeErrorCode::Other("session_head_refresh".to_string()),
47        err.to_string(),
48    )
49}
50
51#[derive(Clone, Copy)]
52enum SessionExecutionLeaseReleasePolicy {
53    FinalCommit,
54    KeepOnAgentFrameSwitch,
55}
56
57impl SessionExecutionLeaseReleasePolicy {
58    fn should_release(self, outcome: &TurnOutcome) -> bool {
59        match self {
60            Self::FinalCommit => true,
61            Self::KeepOnAgentFrameSwitch => {
62                !matches!(outcome, TurnOutcome::AgentFrameSwitch { .. })
63            }
64        }
65    }
66}
67
68fn queued_work_payload_type(payload: &crate::QueuedWorkPayload) -> &'static str {
69    match payload {
70        crate::QueuedWorkPayload::TurnInput { .. } => "turn_input",
71        crate::QueuedWorkPayload::ProcessWake { .. } => "process_wake",
72        crate::QueuedWorkPayload::SessionCommand { command } => command.kind(),
73    }
74}
75
76fn queued_work_batch_ids(claim: &crate::QueuedWorkClaim) -> Vec<String> {
77    claim
78        .batches
79        .iter()
80        .map(|batch| batch.batch_id.clone())
81        .collect()
82}
83
/// Build the id of a named sub-phase of a turn: `"{parent_turn_id}:{phase}"`.
fn turn_phase_id(parent_turn_id: &str, phase: &str) -> String {
    let mut id = String::with_capacity(parent_turn_id.len() + 1 + phase.len());
    id.push_str(parent_turn_id);
    id.push(':');
    id.push_str(phase);
    id
}
87
88fn scoped_child_turn_controller<'run>(
89    scoped_effect_controller: &'run ScopedEffectController<'_>,
90    session_id: &str,
91    turn_id: &str,
92) -> Result<ScopedEffectController<'run>, RuntimeError> {
93    ScopedEffectController::borrowed(
94        scoped_effect_controller.controller(),
95        ExecutionScope::turn(session_id, turn_id),
96    )
97}
98
99pub(in crate::runtime) fn queued_work_trace_payload(
100    boundary: crate::QueuedWorkClaimBoundary,
101    claim: &crate::QueuedWorkClaim,
102    causes: &[crate::TurnCause],
103) -> serde_json::Value {
104    serde_json::json!({
105        "boundary": boundary,
106        "claim_id": claim.claim_id,
107        "owner_id": claim.owner.owner_id,
108        "incarnation_id": claim.owner.incarnation_id,
109        "batch_ids": queued_work_batch_ids(claim),
110        "payload_types": claim.batches.iter()
111            .flat_map(|batch| batch.items.iter())
112            .map(|item| queued_work_payload_type(&item.payload))
113            .collect::<Vec<_>>(),
114        "causes": causes,
115    })
116}
117
118pub(in crate::runtime) fn queued_work_completion_trace_payload(
119    completions: &[crate::QueuedWorkCompletion],
120) -> serde_json::Value {
121    serde_json::json!({
122        "claims": completions.iter().map(|completion| {
123            serde_json::json!({
124                "session_id": completion.session_id,
125                "claim_id": completion.claim_id,
126                "batch_ids": completion.batch_ids,
127            })
128        }).collect::<Vec<_>>(),
129    })
130}
131
132async fn emit_queued_work_started_to_sink(
133    events: &dyn TurnActivitySink,
134    boundary: crate::QueuedWorkClaimBoundary,
135    claim: &crate::QueuedWorkClaim,
136    causes: Vec<crate::TurnCause>,
137) {
138    emit_turn_activity_to_sink(
139        events,
140        TurnActivity::independent(TurnEvent::QueuedWorkStarted {
141            boundary,
142            batch_ids: queued_work_batch_ids(claim),
143            causes,
144        }),
145    )
146    .await;
147}
148
149pub(in crate::runtime) async fn send_queued_work_started_event(
150    event_tx: &mpsc::Sender<RuntimeStreamEvent>,
151    boundary: crate::QueuedWorkClaimBoundary,
152    claim: &crate::QueuedWorkClaim,
153    causes: Vec<crate::TurnCause>,
154) {
155    send_turn_activity(
156        event_tx,
157        TurnActivityId::fresh(),
158        TurnEvent::QueuedWorkStarted {
159            boundary,
160            batch_ids: queued_work_batch_ids(claim),
161            causes,
162        },
163    )
164    .await;
165}
166
/// Inputs that `finish_turn` consumes to finalize, persist, and trace one turn.
struct TurnFinishInput {
    /// Per-turn pipeline whose state is finalized and handed to the final commit.
    turn_pipeline: TurnBoundary,
    /// Assembler holding the turn's token usage; consumed to build the `AssembledTurn`.
    assembler: TurnAssembler,
    /// Messages produced during the turn, passed to `finalize_turn_read_state`.
    new_messages: crate::MessageSequence,
    /// Policy the turn ran with: supplies the model id for the usage ledger entry
    /// and the provider used to normalize prompt usage.
    policy: RuntimeSessionPolicy,
    /// Turn index written into the pipeline state.
    turn_index: usize,
    /// Queued-work completions passed to the final commit and traced afterwards.
    queued_work_completions: Vec<crate::QueuedWorkCompletion>,
    /// Turn id attached to traces and passed to the final commit.
    trace_turn_id: String,
}
176
177impl LashRuntime {
178    fn max_context_tokens(&self) -> usize {
179        self.state.effective_policy().context_window_tokens()
180    }
181
182    async fn claim_session_execution_lease(
183        &self,
184        cancel: CancellationToken,
185        busy_is_error: bool,
186    ) -> Result<Option<SessionExecutionLeaseGuard>, RuntimeError> {
187        let Some(store) = self
188            .session
189            .as_ref()
190            .and_then(|session| session.history_store())
191        else {
192            return Ok(None);
193        };
194        match SessionExecutionLeaseGuard::try_acquire(
195            store,
196            &self.state.session_id,
197            &self.runtime_lease_owner,
198            Arc::clone(&self.host.core.clock),
199            cancel,
200        )
201        .await
202        .map_err(|err| RuntimeError::new(RuntimeErrorCode::StoreCommitFailed, err.to_string()))?
203        {
204            Some(lease) => Ok(Some(lease)),
205            None if busy_is_error => Err(RuntimeError::new(
206                RuntimeErrorCode::SessionExecutionBusy,
207                format!(
208                    "session `{}` is already executing on another runtime owner",
209                    self.state.session_id
210                ),
211            )),
212            None => Ok(None),
213        }
214    }
215
216    async fn settle_session_execution_lease<T>(
217        &self,
218        guard: Option<&SessionExecutionLeaseGuard>,
219        result: Result<T, RuntimeError>,
220    ) -> Result<T, RuntimeError> {
221        match result {
222            Ok(value) => {
223                if let Some(guard) = guard {
224                    guard.release_if_live().await.map_err(|err| {
225                        RuntimeError::new(RuntimeErrorCode::StoreCommitFailed, err.to_string())
226                    })?;
227                }
228                Ok(value)
229            }
230            Err(err) => {
231                if err.code != RuntimeErrorCode::StoreCommitFailed
232                    && let Some(guard) = guard
233                    && let Err(release_err) = guard.release_if_live().await
234                {
235                    tracing::warn!(
236                        error = %release_err,
237                        "failed to release session execution lease after runtime error"
238                    );
239                }
240                Err(err)
241            }
242        }
243    }
244
245    async fn ensure_session_execution_lease_live(
246        &self,
247        guard: Option<&SessionExecutionLeaseGuard>,
248    ) -> Result<(), RuntimeError> {
249        let Some(guard) = guard else {
250            return Ok(());
251        };
252        guard.refresh_or_mark_lost().await.map_err(|err| {
253            RuntimeError::new(
254                RuntimeErrorCode::SessionExecutionLeaseLost,
255                format!(
256                    "session execution lease for session `{}` was lost before commit: {err}",
257                    self.state.session_id
258                ),
259            )
260        })
261    }
262
263    async fn abandon_queued_work_claims_after_lease_loss(
264        &self,
265        err: &RuntimeError,
266        claims: &[crate::QueuedWorkClaim],
267    ) {
268        if err.code != RuntimeErrorCode::SessionExecutionLeaseLost || claims.is_empty() {
269            return;
270        }
271        let Some(store) = self
272            .session
273            .as_ref()
274            .and_then(|session| session.history_store())
275        else {
276            return;
277        };
278        for claim in claims {
279            if let Err(abandon_err) = store.abandon_queued_work_claim(claim).await {
280                tracing::warn!(
281                    error = %abandon_err,
282                    session_id = %claim.session_id,
283                    claim_id = %claim.claim_id,
284                    "failed to abandon queued work claim after session execution lease loss"
285                );
286            }
287        }
288    }
289
290    #[doc(hidden)]
291    pub fn set_turn_phase_probe(&mut self, probe: Arc<dyn RuntimeTurnPhaseProbe>) {
292        self.turn_phase_probe = Some(probe);
293    }
294
295    fn mark_phase_begin(&self, phase: RuntimeTurnPhase) {
296        if let Some(probe) = self.turn_phase_probe.as_ref() {
297            probe.begin(phase);
298        }
299    }
300
301    fn mark_phase_end(&self, phase: RuntimeTurnPhase) {
302        if let Some(probe) = self.turn_phase_probe.as_ref() {
303            probe.end(phase);
304        }
305    }
306
307    async fn finish_turn(
308        &mut self,
309        finish: TurnFinishInput,
310        events: &dyn EventSink,
311        scoped_effect_controller: &ScopedEffectController<'_>,
312        cancel_state: &CancellationToken,
313        session_execution_lease: Option<&SessionExecutionLeaseGuard>,
314        session_execution_lease_release_policy: SessionExecutionLeaseReleasePolicy,
315    ) -> Result<AssembledTurn, RuntimeError> {
316        let TurnFinishInput {
317            mut turn_pipeline,
318            assembler,
319            new_messages,
320            policy,
321            turn_index,
322            queued_work_completions,
323            trace_turn_id,
324        } = finish;
325        self.policy = self.state.effective_policy().clone();
326        turn_pipeline.state_mut().policy = self.policy.clone();
327        turn_pipeline.state_mut().turn_index = turn_index;
328
329        let mut turn_usage_delta = {
330            let mut ledger = self.shared_token_ledger.lock().expect("token ledger lock");
331            std::mem::take(&mut *ledger)
332        };
333        if assembler.token_usage.total() > 0 || assembler.token_usage.cached_input_tokens > 0 {
334            turn_usage_delta.push(TokenLedgerEntry {
335                source: "turn".to_string(),
336                model: policy.model.id.clone(),
337                usage: assembler.token_usage.clone(),
338            });
339        }
340        let turn_usage_delta = merge_usage_delta_entries(turn_usage_delta);
341
342        turn_pipeline.finalize_turn_read_state(new_messages, cancel_state.is_cancelled());
343        if assembler.token_usage.total() > 0 || assembler.token_usage.cached_input_tokens > 0 {
344            turn_pipeline.state_mut().token_usage = assembler.token_usage.clone();
345        }
346
347        let last_prompt_usage = assembler
348            .last_llm_usage()
349            .and_then(|usage| normalize_prompt_usage(policy.provider(), usage));
350        turn_pipeline.state_mut().last_prompt_usage = last_prompt_usage;
351        let assembled_state = turn_pipeline.export_state_for_assembly();
352        let assembled = assembler.finish(
353            assembled_state,
354            cancel_state.is_cancelled(),
355            None,
356            &self.host.core.control.termination,
357        );
358
359        let Some(session) = self.session.as_ref() else {
360            self.state.apply_snapshot(&assembled.state);
361            self.emit_completed_turn_trace(&assembled.state, &assembled.outcome, &trace_turn_id);
362            return Ok(assembled);
363        };
364        self.ensure_session_execution_lease_live(session_execution_lease)
365            .await?;
366
367        let plugins = Arc::clone(session.plugins());
368        let manager = match self.runtime_session_services_for_turn(None) {
369            Ok(manager) => manager,
370            Err(err) => {
371                return Err(RuntimeError::new(
372                    RuntimeErrorCode::PluginSessionManager,
373                    err.to_string(),
374                ));
375            }
376        };
377
378        self.mark_phase_begin(RuntimeTurnPhase::FinalizeTurn);
379        let finalized = match plugins
380            .finalize_turn_with_phase_probe(
381                assembled,
382                manager.state_service(),
383                manager.lifecycle_service(),
384                manager.graph_service(),
385                self.turn_phase_probe.clone(),
386            )
387            .await
388        {
389            Ok(finalized) => finalized,
390            Err(err) => {
391                self.mark_phase_end(RuntimeTurnPhase::FinalizeTurn);
392                return Err(RuntimeError::new(
393                    RuntimeErrorCode::PluginFinalizeTurn,
394                    err.to_string(),
395                ));
396            }
397        };
398        self.mark_phase_end(RuntimeTurnPhase::FinalizeTurn);
399        self.ensure_session_execution_lease_live(session_execution_lease)
400            .await?;
401
402        let mut returned_turn = finalized.turn;
403        let release_session_execution_lease =
404            session_execution_lease_release_policy.should_release(&returned_turn.outcome);
405        self.mark_phase_begin(RuntimeTurnPhase::PersistTurn);
406        self.mark_phase_begin(RuntimeTurnPhase::FinalCommit);
407        let queued_work_completion_trace = queued_work_completions.clone();
408        let pending_attachment_ids = self
409            .host
410            .core
411            .durability
412            .attachment_store
413            .pending_manifest_commit_ids();
414        if let Err(err) = turn_pipeline
415            .final_commit(
416                &mut returned_turn,
417                self.session.as_mut(),
418                &turn_usage_delta,
419                Some(&trace_turn_id),
420                queued_work_completions,
421                pending_attachment_ids.clone(),
422                release_session_execution_lease
423                    .then(|| session_execution_lease.map(SessionExecutionLeaseGuard::completion))
424                    .flatten(),
425            )
426            .await
427        {
428            self.mark_phase_end(RuntimeTurnPhase::FinalCommit);
429            self.mark_phase_end(RuntimeTurnPhase::PersistTurn);
430            return Err(err);
431        }
432        if release_session_execution_lease && let Some(lease) = session_execution_lease {
433            lease.mark_released();
434        }
435        self.host
436            .core
437            .durability
438            .attachment_store
439            .mark_manifest_committed(&pending_attachment_ids);
440        self.mark_phase_end(RuntimeTurnPhase::FinalCommit);
441
442        emit_session_events_to_sink(events, finalized.events).await;
443        self.state = turn_pipeline.into_final_state();
444        if matches!(returned_turn.outcome, TurnOutcome::AgentFrameSwitch { .. })
445            && let Some(session) = self.session.as_mut()
446        {
447            let protocol_session = Arc::clone(session.plugins().protocol_session());
448            let session_id = self.state.session_id.clone();
449            protocol_session
450                .restore_session(
451                    crate::plugin::ProtocolSessionContext::new(session, &session_id),
452                    &self.state,
453                )
454                .await
455                .map_err(|err| {
456                    RuntimeError::new(
457                        RuntimeErrorCode::Other("protocol_restore_session".to_string()),
458                        err.to_string(),
459                    )
460                })?;
461        }
462        if !queued_work_completion_trace.is_empty() {
463            crate::trace::emit_trace(
464                &self.host.core.tracing.trace_sink,
465                &self.host.core.tracing.trace_context,
466                lash_trace::TraceContext::default()
467                    .for_session(returned_turn.state.session_id.clone())
468                    .for_turn_index(returned_turn.state.turn_index)
469                    .for_turn(trace_turn_id.clone()),
470                lash_trace::TraceEvent::Custom {
471                    name: "queued_work.completed".to_string(),
472                    payload: queued_work_completion_trace_payload(&queued_work_completion_trace),
473                },
474                self.host.core.clock.as_ref(),
475            );
476        }
477        self.mark_phase_begin(RuntimeTurnPhase::PostPersistHooks);
478        self.emit_turn_persisted_event(&returned_turn, scoped_effect_controller, &trace_turn_id)
479            .await?;
480        self.mark_phase_end(RuntimeTurnPhase::PostPersistHooks);
481        self.mark_phase_end(RuntimeTurnPhase::PersistTurn);
482
483        self.emit_completed_turn_trace(
484            &returned_turn.state,
485            &returned_turn.outcome,
486            &trace_turn_id,
487        );
488        Ok(returned_turn)
489    }
490
491    fn emit_completed_turn_trace(
492        &self,
493        state: &SessionSnapshot,
494        outcome: &TurnOutcome,
495        trace_turn_id: &str,
496    ) {
497        if self.host.core.tracing.trace_sink.is_none() {
498            return;
499        }
500
501        let (status, done_reason, agent_frame_switch) = trace_fields_from_outcome(outcome);
502        crate::trace::emit_trace(
503            &self.host.core.tracing.trace_sink,
504            &self.host.core.tracing.trace_context,
505            lash_trace::TraceContext::default()
506                .for_session(state.session_id.clone())
507                .for_turn_index(state.turn_index)
508                .for_turn(trace_turn_id.to_string()),
509            lash_trace::TraceEvent::TurnCompleted {
510                status: status.to_string(),
511                done_reason: done_reason.to_string(),
512                agent_frame_switch,
513            },
514            self.host.core.clock.as_ref(),
515        );
516    }
517
518    async fn emit_turn_persisted_event(
519        &self,
520        returned_turn: &AssembledTurn,
521        scoped_effect_controller: &ScopedEffectController<'_>,
522        trace_turn_id: &str,
523    ) -> Result<(), RuntimeError> {
524        let Some(session) = self.session.as_ref() else {
525            return Ok(());
526        };
527        let Ok(manager) = self.runtime_session_services() else {
528            return Ok(());
529        };
530        let phase_turn_id = turn_phase_id(trace_turn_id, "turn-persisted");
531        let phase_controller = scoped_child_turn_controller(
532            scoped_effect_controller,
533            &self.state.session_id,
534            &phase_turn_id,
535        )?;
536        let direct_completions = manager.direct_completion_client(
537            RuntimeEffectControllerHandle::borrowed(phase_controller),
538            Some(phase_turn_id),
539        );
540
541        session
542            .plugins()
543            .emit_runtime_event_with_phase_probe(
544                crate::PluginLifecycleEvent::TurnPersisted(Box::new(
545                    crate::SessionStateChangedContext {
546                        session_id: self.state.session_id.clone(),
547                        state: crate::SessionReadView::from_snapshot(&returned_turn.state),
548                        sessions: manager.state_service(),
549                        session_graph: manager.graph_service(),
550                        direct_completions,
551                    },
552                )),
553                self.turn_phase_probe.clone(),
554            )
555            .await;
556        Ok(())
557    }
558
559    /// Run a single turn and stream events to the host sink.
560    pub async fn stream_turn(
561        &mut self,
562        input: TurnInput,
563        opts: TurnOptions<'_>,
564    ) -> Result<AssembledTurn, RuntimeError> {
565        let cancel = opts.cancel.clone();
566        let session_execution_lease = self
567            .claim_session_execution_lease(cancel.clone(), true)
568            .await?;
569        let scoped_effect_controller = opts.scoped_effect_controller();
570        let result = self
571            .stream_turn_with_scoped_effect_controller_inner(
572                input,
573                opts.events_or_noop(),
574                opts.turn_events_or_noop(),
575                scoped_effect_controller,
576                cancel,
577                None,
578                session_execution_lease.as_ref(),
579                SessionExecutionLeaseReleasePolicy::FinalCommit,
580            )
581            .await;
582        self.settle_session_execution_lease(session_execution_lease.as_ref(), result)
583            .await
584    }
585
586    pub async fn stream_next_queued_work(
587        &mut self,
588        opts: TurnOptions<'_>,
589    ) -> Result<Option<AssembledTurn>, RuntimeError> {
590        self.stream_queued_work(opts, None).await
591    }
592
593    pub async fn stream_selected_queued_work(
594        &mut self,
595        opts: TurnOptions<'_>,
596        batch_ids: &[String],
597    ) -> Result<Option<AssembledTurn>, RuntimeError> {
598        self.stream_queued_work(opts, Some(batch_ids)).await
599    }
600
601    async fn stream_queued_work(
602        &mut self,
603        opts: TurnOptions<'_>,
604        selected_batch_ids: Option<&[String]>,
605    ) -> Result<Option<AssembledTurn>, RuntimeError> {
606        let cancel = opts.cancel.clone();
607        let Some(session_execution_lease) = self
608            .claim_session_execution_lease(cancel.clone(), false)
609            .await?
610        else {
611            return Ok(None);
612        };
613        let session_execution_fence = session_execution_lease.fence();
614        if self
615            .drain_next_session_command(&session_execution_fence)
616            .await?
617            .is_some()
618        {
619            session_execution_lease
620                .release_if_live()
621                .await
622                .map_err(|err| {
623                    RuntimeError::new(RuntimeErrorCode::StoreCommitFailed, err.to_string())
624                })?;
625            return Ok(None);
626        }
627        let Some(store) = self
628            .session
629            .as_ref()
630            .and_then(|session| session.history_store())
631        else {
632            return Ok(None);
633        };
634        let claim = if let Some(batch_ids) = selected_batch_ids {
635            store
636                .claim_ready_queued_work_by_batch_ids(
637                    &self.state.session_id,
638                    &session_execution_fence,
639                    &self.runtime_lease_owner,
640                    crate::QueuedWorkClaimBoundary::Idle,
641                    crate::QUEUED_WORK_CLAIM_TTL_MS,
642                    batch_ids,
643                )
644                .await
645        } else {
646            store
647                .claim_ready_queued_work(
648                    &self.state.session_id,
649                    &session_execution_fence,
650                    &self.runtime_lease_owner,
651                    crate::QueuedWorkClaimBoundary::Idle,
652                    crate::QUEUED_WORK_CLAIM_TTL_MS,
653                    64,
654                )
655                .await
656        }
657        .map_err(super::runtime_error_from_store_commit)?;
658        let Some(claim) = claim else {
659            session_execution_lease
660                .release_if_live()
661                .await
662                .map_err(|err| {
663                    RuntimeError::new(RuntimeErrorCode::StoreCommitFailed, err.to_string())
664                })?;
665            return Ok(None);
666        };
667        let mut work = claim.materialize_for_turn();
668        let turn_id = work
669            .input
670            .trace_turn_id
671            .clone()
672            .or_else(|| Some(opts.execution_scope_id().to_owned()))
673            .unwrap_or_else(|| uuid::Uuid::new_v4().to_string());
674        work.input.trace_turn_id = Some(turn_id.clone());
675        let causes = work.turn_causes.clone();
676        emit_queued_work_started_to_sink(
677            opts.turn_events_or_noop(),
678            crate::QueuedWorkClaimBoundary::Idle,
679            &claim,
680            causes.clone(),
681        )
682        .await;
683        crate::trace::emit_trace(
684            &self.host.core.tracing.trace_sink,
685            &self.host.core.tracing.trace_context,
686            lash_trace::TraceContext::default()
687                .for_session(self.state.session_id.clone())
688                .for_turn_index(self.state.turn_index + 1)
689                .for_turn(turn_id.clone()),
690            lash_trace::TraceEvent::Custom {
691                name: "queued_work.claimed".to_string(),
692                payload: queued_work_trace_payload(
693                    crate::QueuedWorkClaimBoundary::Idle,
694                    &claim,
695                    &causes,
696                ),
697            },
698            self.host.core.clock.as_ref(),
699        );
700        let claim_for_abandon = claim.clone();
701        let scoped_effect_controller = opts.scoped_effect_controller();
702        let result = self
703            .stream_turn_with_scoped_effect_controller_inner(
704                work.input,
705                opts.events_or_noop(),
706                opts.turn_events_or_noop(),
707                scoped_effect_controller,
708                cancel,
709                Some(claim),
710                Some(&session_execution_lease),
711                SessionExecutionLeaseReleasePolicy::FinalCommit,
712            )
713            .await
714            .map(Some);
715        if let Err(err) = &result {
716            self.abandon_queued_work_claims_after_lease_loss(
717                err,
718                std::slice::from_ref(&claim_for_abandon),
719            )
720            .await;
721        }
722        self.settle_session_execution_lease(Some(&session_execution_lease), result)
723            .await
724    }
725
726    /// Enforce the durable-first wiring invariant at a turn-scope boundary: when
727    /// the host wired a durable effect host, every store reachable from this
728    /// scope must also be durable. A durable host running against any ephemeral
729    /// store fails loudly here rather than silently degrading.
730    ///
731    /// Inline controllers (the default tier) impose no requirement, so
732    /// inline/in-memory hosts pass unchanged.
733    fn ensure_durable_store_facets_for_scope(
734        &self,
735        scoped_effect_controller: &ScopedEffectController<'_>,
736    ) -> Result<(), RuntimeError> {
737        if scoped_effect_controller.controller().durability_tier() != crate::DurabilityTier::Durable
738        {
739            return Ok(());
740        }
741        if self
742            .host
743            .core
744            .durability
745            .attachment_store
746            .persistence()
747            .durability_tier()
748            != crate::DurabilityTier::Durable
749        {
750            return Err(RuntimeError::durable_store_required(
751                crate::DurableStoreFacet::AttachmentStore,
752            ));
753        }
754        if self
755            .host
756            .core
757            .durability
758            .process_env_store
759            .durability_tier()
760            != crate::DurabilityTier::Durable
761        {
762            return Err(RuntimeError::durable_store_required(
763                crate::DurableStoreFacet::ProcessEnvStore,
764            ));
765        }
766        if let Some(store) = self
767            .session
768            .as_ref()
769            .and_then(|session| session.history_store())
770            && store.durability_tier() != crate::DurabilityTier::Durable
771        {
772            return Err(RuntimeError::durable_store_required(
773                crate::DurableStoreFacet::SessionStore,
774            ));
775        }
776        if let Some(process_registry) = self.host.process_registry.as_ref()
777            && process_registry.durability_tier() != crate::DurabilityTier::Durable
778        {
779            return Err(RuntimeError::durable_store_required(
780                crate::DurableStoreFacet::ProcessRegistry,
781            ));
782        }
783        Ok(())
784    }
785
786    #[allow(clippy::too_many_arguments)]
787    async fn stream_turn_with_scoped_effect_controller_inner(
788        &mut self,
789        mut input: TurnInput,
790        events: &dyn EventSink,
791        turn_events: &dyn TurnActivitySink,
792        scoped_effect_controller: ScopedEffectController<'_>,
793        cancel: CancellationToken,
794        queued_claim: Option<crate::QueuedWorkClaim>,
795        session_execution_lease: Option<&SessionExecutionLeaseGuard>,
796        session_execution_lease_release_policy: SessionExecutionLeaseReleasePolicy,
797    ) -> Result<AssembledTurn, RuntimeError> {
798        if queued_claim.is_none() {
799            if let Some(lease) = session_execution_lease {
800                while self
801                    .drain_next_session_command(&lease.fence())
802                    .await?
803                    .is_some()
804                {}
805            } else if self
806                .session
807                .as_ref()
808                .and_then(|session| session.history_store())
809                .is_some()
810            {
811                return Err(RuntimeError::new(
812                    RuntimeErrorCode::StoreCommitFailed,
813                    "session command drain requires a session execution lease",
814                ));
815            }
816        }
817        if let Some(input_turn_id) = input.trace_turn_id.as_deref()
818            && scoped_effect_controller
819                .execution_scope()
820                .validates_turn_trace_id()
821            && input_turn_id != scoped_effect_controller.scope_id()
822        {
823            return Err(RuntimeError::new(
824                RuntimeErrorCode::ExecutionScopeTurnIdMismatch,
825                format!(
826                    "input trace_turn_id `{input_turn_id}` does not match execution scope id `{}`",
827                    scoped_effect_controller.scope_id()
828                ),
829            ));
830        }
831        self.ensure_durable_store_facets_for_scope(&scoped_effect_controller)?;
832        input
833            .trace_turn_id
834            .get_or_insert_with(|| scoped_effect_controller.scope_id().to_string());
835        self.stream_turn_inner(
836            input.clone(),
837            events,
838            turn_events,
839            scoped_effect_controller,
840            cancel.clone(),
841            queued_claim,
842            session_execution_lease,
843            session_execution_lease_release_policy,
844        )
845        .await
846    }
847
848    /// Stream one logical host turn, following foreground AgentFrame switches
849    /// until a terminal outcome is reached.
850    ///
851    /// A protocol continuation creates a new frame in the same session. Hosts
852    /// that only care about the benchmark/app answer should not need to
853    /// special-case that intermediate outcome; this helper keeps driving the
854    /// same session through each frame's task with the normal runtime turn
855    /// guards.
856    pub async fn stream_turn_with_agent_frames(
857        &mut self,
858        input: TurnInput,
859        opts: TurnOptions<'_>,
860    ) -> Result<AgentFrameRun, RuntimeError> {
861        let cancel = opts.cancel.clone();
862        let session_execution_lease = self
863            .claim_session_execution_lease(cancel.clone(), true)
864            .await?;
865        let scoped_effect_controller = opts.scoped_effect_controller();
866        let result = self
867            .stream_turn_with_agent_frames_inner(
868                input,
869                opts.events_or_noop(),
870                opts.turn_events_or_noop(),
871                scoped_effect_controller,
872                cancel,
873                session_execution_lease.as_ref(),
874            )
875            .await;
876        self.settle_session_execution_lease(session_execution_lease.as_ref(), result)
877            .await
878    }
879
880    async fn stream_turn_with_agent_frames_inner(
881        &mut self,
882        mut input: TurnInput,
883        events: &dyn EventSink,
884        turn_events: &dyn TurnActivitySink,
885        scoped_effect_controller: ScopedEffectController<'_>,
886        cancel: CancellationToken,
887        session_execution_lease: Option<&SessionExecutionLeaseGuard>,
888    ) -> Result<AgentFrameRun, RuntimeError> {
889        if let Some(input_turn_id) = input.trace_turn_id.as_deref()
890            && scoped_effect_controller
891                .execution_scope()
892                .validates_turn_trace_id()
893            && input_turn_id != scoped_effect_controller.scope_id()
894        {
895            return Err(RuntimeError::new(
896                RuntimeErrorCode::ExecutionScopeTurnIdMismatch,
897                format!(
898                    "input trace_turn_id `{input_turn_id}` does not match execution scope id `{}`",
899                    scoped_effect_controller.scope_id()
900                ),
901            ));
902        }
903        let follow_protocol_turn_options = input.protocol_turn_options.clone();
904        let follow_turn_context = input.turn_context.clone();
905        let follow_trace_turn_id = input
906            .trace_turn_id
907            .clone()
908            .unwrap_or_else(|| scoped_effect_controller.scope_id().to_string());
909        input
910            .trace_turn_id
911            .get_or_insert(follow_trace_turn_id.clone());
912        let mut turns = Vec::new();
913        loop {
914            let turn_trace_turn_id = agent_frame_follow_turn_id(&follow_trace_turn_id, turns.len());
915            input.trace_turn_id = Some(turn_trace_turn_id.clone());
916            let turn_effect_controller = if turns.is_empty() {
917                scoped_effect_controller.clone()
918            } else {
919                ScopedEffectController::borrowed(
920                    scoped_effect_controller.controller(),
921                    ExecutionScope::turn(&self.state.session_id, &turn_trace_turn_id),
922                )?
923            };
924            let turn = self
925                .stream_turn_with_scoped_effect_controller_inner(
926                    input,
927                    events,
928                    turn_events,
929                    turn_effect_controller,
930                    cancel.clone(),
931                    None,
932                    session_execution_lease,
933                    SessionExecutionLeaseReleasePolicy::KeepOnAgentFrameSwitch,
934                )
935                .await?;
936            let switched_frame = match &turn.outcome {
937                TurnOutcome::AgentFrameSwitch { frame_id, task } => {
938                    Some((frame_id.clone(), task.clone()))
939                }
940                _ => None,
941            };
942            turns.push(turn);
943
944            let Some((_frame_id, task)) = switched_frame else {
945                return Ok(AgentFrameRun { turns });
946            };
947            input = turn_input_from_text(task);
948            input.protocol_turn_options = follow_protocol_turn_options.clone();
949            input.turn_context = follow_turn_context.clone();
950        }
951    }
952
953    #[allow(clippy::too_many_arguments)]
954    async fn stream_turn_inner(
955        &mut self,
956        mut input: TurnInput,
957        events: &dyn EventSink,
958        turn_events: &dyn TurnActivitySink,
959        scoped_effect_controller: ScopedEffectController<'_>,
960        cancel: CancellationToken,
961        queued_claim: Option<crate::QueuedWorkClaim>,
962        session_execution_lease: Option<&SessionExecutionLeaseGuard>,
963        session_execution_lease_release_policy: SessionExecutionLeaseReleasePolicy,
964    ) -> Result<AssembledTurn, RuntimeError> {
965        self.refresh_session_graph_from_store()
966            .await
967            .map_err(session_head_refresh_error)?;
968        let input_trace_turn_id = input.trace_turn_id.clone();
969        let queued_turn_work = queued_claim
970            .as_ref()
971            .map(crate::QueuedWorkClaim::materialize_for_turn);
972        if let Some(work) = queued_turn_work.as_ref()
973            && input.items.is_empty()
974            && input.image_blobs.is_empty()
975        {
976            input = work.input.clone();
977            if input.trace_turn_id.is_none() {
978                input.trace_turn_id = input_trace_turn_id;
979            }
980        }
981        if self
982            .session
983            .as_ref()
984            .and_then(|session| session.history_store())
985            .is_some()
986        {
987            ensure_durable_effect_input(&input)?;
988        }
989        if let Some(extension) = &input.protocol_extension
990            && let Some(session) = self.session.as_ref()
991        {
992            let protocol_session = std::sync::Arc::clone(session.plugins().protocol_session());
993            protocol_session
994                .validate_turn_extension(extension)
995                .await
996                .map_err(|err| {
997                    RuntimeError::new(RuntimeErrorCode::ProtocolTurnExtension, err.to_string())
998                })?;
999        }
1000        let previous_prompt_usage = self.state.last_prompt_usage.clone();
1001        let normalized = match self
1002            .normalize_input_items(&input.items, &input.image_blobs)
1003            .await
1004        {
1005            Ok(items) => items,
1006            Err(e) => {
1007                self.state.last_prompt_usage = None;
1008                let mut assembler = TurnAssembler::default();
1009                let error_event = SessionEvent::Error {
1010                    message: e.clone(),
1011                    envelope: Some(crate::session_model::ErrorEnvelope {
1012                        kind: "input_validation".to_string(),
1013                        code: Some("invalid_turn_input".to_string()),
1014                        terminal_reason: None,
1015                        user_message: e.clone(),
1016                        raw: None,
1017                    }),
1018                };
1019                assembler.push(&error_event);
1020                emit_turn_activity_to_sink(
1021                    turn_events,
1022                    TurnActivity::independent(TurnEvent::Error { message: e }),
1023                )
1024                .await;
1025                emit_session_event_to_sink(events, error_event).await;
1026                let outcome_event = SessionEvent::TurnOutcome {
1027                    outcome: TurnOutcome::Stopped(TurnStop::InvalidInput),
1028                };
1029                assembler.push(&outcome_event);
1030                emit_session_event_to_sink(events, outcome_event).await;
1031                assembler.push(&SessionEvent::Done);
1032                emit_session_event_to_sink(events, SessionEvent::Done).await;
1033                return Ok(assembler.finish(
1034                    self.state.to_snapshot(),
1035                    false,
1036                    None,
1037                    &self.host.core.control.termination,
1038                ));
1039            }
1040        };
1041        let turn_index = self.state.turn_index + 1;
1042        let trace_turn_id = input
1043            .trace_turn_id
1044            .clone()
1045            .unwrap_or_else(|| uuid::Uuid::new_v4().to_string());
1046        if self.host.core.tracing.trace_sink.is_some() {
1047            let mut trace_metadata = std::collections::BTreeMap::new();
1048            trace_metadata.insert(
1049                "input_item_count".to_string(),
1050                serde_json::json!(normalized.len()),
1051            );
1052            crate::trace::emit_trace(
1053                &self.host.core.tracing.trace_sink,
1054                &self.host.core.tracing.trace_context,
1055                lash_trace::TraceContext::default()
1056                    .for_session(self.state.session_id.clone())
1057                    .for_turn_index(turn_index)
1058                    .for_turn(trace_turn_id.clone()),
1059                lash_trace::TraceEvent::TurnStarted {
1060                    metadata: trace_metadata,
1061                },
1062                self.host.core.clock.as_ref(),
1063            );
1064        }
1065
1066        let base_read_model = self.state.read_model();
1067        let base_messages = base_read_model.messages;
1068        let base_render_cache = base_read_model.prompt_render_cache;
1069        let mut turn_delta = Vec::new();
1070        let initial_turn_causes = queued_turn_work
1071            .as_ref()
1072            .map(|work| work.turn_causes.clone())
1073            .unwrap_or_default();
1074        turn_delta.extend(
1075            initial_turn_causes
1076                .iter()
1077                .map(crate::TurnCause::to_event_message),
1078        );
1079
1080        let user_id = fresh_message_id();
1081        let mut user_parts: Vec<Part> = Vec::new();
1082        for item in normalized {
1083            match item {
1084                NormalizedItem::Text(text) => {
1085                    if text.is_empty() {
1086                        continue;
1087                    }
1088                    user_parts.push(Part {
1089                        id: format!("{}.p{}", user_id, user_parts.len()),
1090                        kind: PartKind::Text,
1091                        content: text,
1092                        attachment: None,
1093                        tool_call_id: None,
1094                        tool_name: None,
1095                        tool_replay: None,
1096                        prune_state: PruneState::Intact,
1097                        reasoning_meta: None,
1098                        response_meta: None,
1099                    });
1100                }
1101                NormalizedItem::Image(reference) => {
1102                    user_parts.push(Part {
1103                        id: format!("{}.p{}", user_id, user_parts.len()),
1104                        kind: PartKind::Image,
1105                        content: String::new(),
1106                        attachment: Some(crate::session_model::message::PartAttachment {
1107                            reference,
1108                        }),
1109                        tool_call_id: None,
1110                        tool_name: None,
1111                        tool_replay: None,
1112                        prune_state: PruneState::Intact,
1113                        reasoning_meta: None,
1114                        response_meta: None,
1115                    });
1116                }
1117            }
1118        }
1119        if user_parts.is_empty() && initial_turn_causes.is_empty() {
1120            user_parts.push(Part {
1121                id: format!("{}.p0", user_id),
1122                kind: PartKind::Text,
1123                content: String::new(),
1124                attachment: None,
1125                tool_call_id: None,
1126                tool_name: None,
1127                tool_replay: None,
1128                prune_state: PruneState::Intact,
1129                reasoning_meta: None,
1130                response_meta: None,
1131            });
1132        }
1133        if !user_parts.is_empty() {
1134            reassign_part_ids(&user_id, &mut user_parts);
1135            turn_delta.push(Message {
1136                id: user_id.clone(),
1137                role: MessageRole::User,
1138                parts: shared_parts(user_parts),
1139                origin: None,
1140            });
1141        }
1142
1143        let manager = self
1144            .runtime_session_services_for_turn(None)
1145            .map_err(|err| {
1146                RuntimeError::new(RuntimeErrorCode::PluginSessionManager, err.to_string())
1147            })?;
1148        let plugin_session = self
1149            .session
1150            .as_ref()
1151            .map(|s| Arc::clone(s.plugins()))
1152            .ok_or_else(|| {
1153                RuntimeError::new(
1154                    RuntimeErrorCode::ContextPrepareTurn,
1155                    "runtime session not available",
1156                )
1157            })?;
1158        let prepare_phase_turn_id = turn_phase_id(&trace_turn_id, "prepare-turn");
1159        let prepare_phase_controller = scoped_child_turn_controller(
1160            &scoped_effect_controller,
1161            &self.state.session_id,
1162            &prepare_phase_turn_id,
1163        )?;
1164        let turn_ctx = crate::TurnTransformContext {
1165            session_id: self.state.session_id.clone(),
1166            state: self.read_view(),
1167            prompt_usage: previous_prompt_usage.clone(),
1168            max_context_tokens: Some(LashRuntime::max_context_tokens(self)),
1169            sessions: manager.state_service(),
1170            session_lifecycle: manager.lifecycle_service(),
1171            session_graph: manager.graph_service(),
1172            scoped_effect_controller: scoped_effect_controller.clone(),
1173            direct_completions: manager.direct_completion_client(
1174                RuntimeEffectControllerHandle::borrowed(prepare_phase_controller),
1175                Some(prepare_phase_turn_id),
1176            ),
1177        };
1178        self.mark_phase_begin(RuntimeTurnPhase::ContextTransform);
1179        let prepared_context = plugin_session
1180            .prepare_turn_context(
1181                &turn_ctx,
1182                crate::session_model::context::PreparedContext {
1183                    messages: crate::MessageSequence::from_base_and_delta(
1184                        base_messages,
1185                        turn_delta,
1186                    )
1187                    .with_base_render_cache(base_render_cache),
1188                    ..Default::default()
1189                },
1190                self.turn_phase_probe.clone(),
1191            )
1192            .await
1193            .map_err(|err| {
1194                RuntimeError::new(RuntimeErrorCode::ContextPrepareTurn, err.to_string())
1195            })?;
1196        self.mark_phase_end(RuntimeTurnPhase::ContextTransform);
1197        // Release the read-view's graph clone before the rest of the turn
1198        // runs. Keeping it alive into `stream_prepared_turn` forces the
1199        // post-turn `append_active_read_delta` to deep-clone the session
1200        // graph (Arc::make_mut with refcount > 1).
1201        drop(turn_ctx);
1202        let messages = prepared_context.messages;
1203        if let Some(session) = self.session.as_mut() {
1204            session
1205                .set_context_overlay(
1206                    prepared_context.tool_providers,
1207                    prepared_context.prompt_contributions,
1208                    prepared_context.include_base_tools,
1209                )
1210                .map_err(|err| {
1211                    RuntimeError::new(
1212                        RuntimeErrorCode::Other("session_tool_registry".to_string()),
1213                        err.to_string(),
1214                    )
1215                })?;
1216        }
1217
1218        self.state.last_prompt_usage = None;
1219        Box::pin(self.stream_prepared_turn_inner(
1220            messages,
1221            previous_prompt_usage,
1222            input.protocol_turn_options.clone(),
1223            input.protocol_extension.clone(),
1224            input.turn_context.clone(),
1225            initial_turn_causes,
1226            trace_turn_id,
1227            turn_index,
1228            events,
1229            turn_events,
1230            scoped_effect_controller,
1231            cancel,
1232            queued_claim,
1233            session_execution_lease,
1234            session_execution_lease_release_policy,
1235        ))
1236        .await
1237    }
1238
1239    /// Run a single turn and return only the assembled terminal result.
1240    pub async fn run_turn_assembled(
1241        &mut self,
1242        input: TurnInput,
1243        cancel: CancellationToken,
1244        scoped_effect_controller: ScopedEffectController<'_>,
1245    ) -> Result<AssembledTurn, RuntimeError> {
1246        self.stream_turn(input, TurnOptions::new(cancel, scoped_effect_controller))
1247            .await
1248    }
1249
1250    /// Run a turn using host-prepared message history.
1251    #[allow(clippy::too_many_arguments)]
1252    pub async fn stream_prepared_turn(
1253        &mut self,
1254        messages: crate::MessageSequence,
1255        previous_prompt_usage: Option<PromptUsage>,
1256        protocol_turn_options: Option<crate::ProtocolTurnOptions>,
1257        protocol_extension: Option<crate::ProtocolTurnExtensionHandle>,
1258        turn_context: crate::TurnContext,
1259        initial_turn_causes: Vec<crate::TurnCause>,
1260        trace_turn_id: String,
1261        turn_index: usize,
1262        events: &dyn EventSink,
1263        turn_events: &dyn TurnActivitySink,
1264        scoped_effect_controller: ScopedEffectController<'_>,
1265        cancel: CancellationToken,
1266        initial_queue_claim: Option<crate::QueuedWorkClaim>,
1267    ) -> Result<AssembledTurn, RuntimeError> {
1268        let session_execution_lease = self
1269            .claim_session_execution_lease(cancel.clone(), true)
1270            .await?;
1271        let result = self
1272            .stream_prepared_turn_inner(
1273                messages,
1274                previous_prompt_usage,
1275                protocol_turn_options,
1276                protocol_extension,
1277                turn_context,
1278                initial_turn_causes,
1279                trace_turn_id,
1280                turn_index,
1281                events,
1282                turn_events,
1283                scoped_effect_controller,
1284                cancel,
1285                initial_queue_claim,
1286                session_execution_lease.as_ref(),
1287                SessionExecutionLeaseReleasePolicy::FinalCommit,
1288            )
1289            .await;
1290        self.settle_session_execution_lease(session_execution_lease.as_ref(), result)
1291            .await
1292    }
1293
1294    #[allow(clippy::too_many_arguments)]
1295    async fn stream_prepared_turn_inner(
1296        &mut self,
1297        messages: crate::MessageSequence,
1298        _previous_prompt_usage: Option<PromptUsage>,
1299        protocol_turn_options: Option<crate::ProtocolTurnOptions>,
1300        protocol_extension: Option<crate::ProtocolTurnExtensionHandle>,
1301        turn_context: crate::TurnContext,
1302        initial_turn_causes: Vec<crate::TurnCause>,
1303        trace_turn_id: String,
1304        turn_index: usize,
1305        events: &dyn EventSink,
1306        turn_events: &dyn TurnActivitySink,
1307        scoped_effect_controller: ScopedEffectController<'_>,
1308        cancel: CancellationToken,
1309        initial_queue_claim: Option<crate::QueuedWorkClaim>,
1310        session_execution_lease: Option<&SessionExecutionLeaseGuard>,
1311        session_execution_lease_release_policy: SessionExecutionLeaseReleasePolicy,
1312    ) -> Result<AssembledTurn, RuntimeError> {
1313        if session_execution_lease.is_none()
1314            && self
1315                .session
1316                .as_ref()
1317                .and_then(|session| session.history_store())
1318                .is_some()
1319        {
1320            return Err(RuntimeError::new(
1321                RuntimeErrorCode::StoreCommitFailed,
1322                "prepared turn requires a session execution lease",
1323            ));
1324        }
1325        let session_execution_fence =
1326            session_execution_lease.map(SessionExecutionLeaseGuard::fence);
1327        let (event_tx, mut event_rx) = mpsc::channel::<RuntimeStreamEvent>(100);
1328        let child_usage_event_relay = ChildUsageEventRelay::new(event_tx.clone());
1329        let mut turn_policy = self.state.effective_policy().clone();
1330        let turn_provider_override = turn_context.provider().cloned();
1331        if let Some(provider) = turn_provider_override.as_ref() {
1332            turn_policy.provider_id = provider.kind().to_string();
1333        }
1334        if let Some(model) = turn_context.model_spec() {
1335            turn_policy.model = model.clone();
1336        }
1337        let session_protocol_turn_options = self.state.effective_protocol_turn_options().clone();
1338        let effective_protocol_turn_options = protocol_turn_options
1339            .clone()
1340            .map(|options| session_protocol_turn_options.merged_with_override(&options))
1341            .unwrap_or(session_protocol_turn_options);
1342        let manager = self
1343            .runtime_session_services_for_turn(Some(child_usage_event_relay.clone()))
1344            .map_err(|err| {
1345                RuntimeError::new(RuntimeErrorCode::PluginSessionManager, err.to_string())
1346            })?;
1347        let plugins = {
1348            let session = self
1349                .session
1350                .as_ref()
1351                .expect("lash runtime session must be available");
1352            Arc::clone(session.plugins())
1353        };
1354        let mut assembler = TurnAssembler::new();
1355        self.mark_phase_begin(RuntimeTurnPhase::BeforeTurnHooks);
1356        // Block-scope the pinned future so it (and its captured
1357        // `SessionReadView` clone of the session graph) drops before the
1358        // post-turn `append_active_read_delta` mutation. Keeping it alive
1359        // across the turn forces `Arc::make_mut` to deep-clone
1360        // `SessionGraphData`.
1361        let prepared = {
1362            let prepare_turn = plugins.prepare_turn_with_phase_probe(
1363                PrepareTurnRequest {
1364                    session_id: self.state.session_id.clone(),
1365                    state: crate::SessionReadView::from_runtime_state(
1366                        &self.state,
1367                        turn_policy.clone(),
1368                        effective_protocol_turn_options.clone(),
1369                    ),
1370                    messages,
1371                    sessions: manager.state_service(),
1372                    session_lifecycle: manager.lifecycle_service(),
1373                    session_graph: manager.graph_service(),
1374                    turn_context: turn_context.clone(),
1375                },
1376                self.turn_phase_probe.clone(),
1377            );
1378            let mut prepare_turn = Box::pin(prepare_turn);
1379
1380            loop {
1381                tokio::select! {
1382                    prepared = prepare_turn.as_mut() => {
1383                        let prepared = prepared.map_err(|err| {
1384                            RuntimeError::new(RuntimeErrorCode::PluginPrepareTurn, err.to_string())
1385                        })?;
1386                        self.mark_phase_end(RuntimeTurnPhase::BeforeTurnHooks);
1387                        break prepared;
1388                    }
1389                    maybe_event = event_rx.recv() => {
1390                        if let Some(event) = maybe_event {
1391                            emit_runtime_stream_event_to_sinks(
1392                                events,
1393                                turn_events,
1394                                event,
1395                                &mut assembler,
1396                            )
1397                            .await;
1398                        }
1399                    }
1400                }
1401            }
1402        };
1403        for event in &prepared.events {
1404            assembler.push(event);
1405        }
1406        emit_session_events_to_sink(events, prepared.events).await;
1407        if let Some(abort) = prepared.abort {
1408            drop(event_tx);
1409
1410            let mut turn_pipeline = TurnBoundary::from_state_with_clock(
1411                self.state.clone(),
1412                Arc::clone(&self.host.core.clock),
1413            );
1414            turn_pipeline.apply_prepared_messages(&prepared.messages);
1415            let state = turn_pipeline.into_final_state();
1416            let issue = TurnIssue {
1417                kind: "plugin".to_string(),
1418                code: Some(abort.code),
1419                terminal_reason: None,
1420                message: abort.message.clone(),
1421                raw: None,
1422            };
1423            let error_event = SessionEvent::Error {
1424                message: abort.message,
1425                envelope: Some(crate::session_model::ErrorEnvelope {
1426                    kind: "plugin".to_string(),
1427                    code: issue.code.clone(),
1428                    terminal_reason: None,
1429                    user_message: issue.message.clone(),
1430                    raw: None,
1431                }),
1432            };
1433            assembler.push(&error_event);
1434            emit_turn_activity_to_sink(
1435                turn_events,
1436                TurnActivity::independent(TurnEvent::Error {
1437                    message: issue.message.clone(),
1438                }),
1439            )
1440            .await;
1441            emit_session_event_to_sink(events, error_event).await;
1442            let outcome_event = SessionEvent::TurnOutcome {
1443                outcome: TurnOutcome::Stopped(TurnStop::PluginAbort),
1444            };
1445            assembler.push(&outcome_event);
1446            emit_session_event_to_sink(events, outcome_event).await;
1447            assembler.push(&SessionEvent::Done);
1448            emit_session_event_to_sink(events, SessionEvent::Done).await;
1449            return Ok(assembler.finish(
1450                state.to_snapshot(),
1451                cancel.is_cancelled(),
1452                Some(issue),
1453                &self.host.core.control.termination,
1454            ));
1455        }
1456        let mut turn_pipeline = TurnBoundary::from_state_with_clock(
1457            self.state.clone(),
1458            Arc::clone(&self.host.core.clock),
1459        )
1460        .with_session_execution_lease(session_execution_fence.clone());
1461        let store = self
1462            .session
1463            .as_ref()
1464            .and_then(|session| session.history_store());
1465        // Durable controllers, like Restate, own in-flight replay. Writing
1466        // progress checkpoints directly to the shared store would make handler
1467        // replay observe a newer partial turn and change effect replay keys.
1468        let progress_store = if scoped_effect_controller.controller().durability_tier()
1469            == crate::DurabilityTier::Durable
1470        {
1471            None
1472        } else {
1473            store.as_ref().map(|store| store.as_ref())
1474        };
1475        turn_pipeline
1476            .prepared_checkpoint(
1477                progress_store,
1478                turn_policy.clone(),
1479                turn_index,
1480                &prepared.messages,
1481                self.session.as_mut(),
1482            )
1483            .await
1484            .map_err(super::runtime_error_from_store_commit)?;
1485        let resolved_turn_policy = if let Some(provider) = turn_provider_override {
1486            RuntimeSessionPolicy::from_provider(
1487                turn_policy.clone(),
1488                provider.with_clock(Arc::clone(&self.host.core.clock)),
1489            )
1490            .map_err(|err| RuntimeError::new("llm_provider", err.to_string()))?
1491        } else {
1492            self.host
1493                .resolve_session_policy(&self.state.session_id, turn_policy.clone())
1494                .map_err(|err| RuntimeError::new("llm_provider", err.to_string()))?
1495        };
1496        let manager = self
1497            .runtime_session_services_for_turn(Some(child_usage_event_relay.clone()))
1498            .map_err(|err| {
1499                RuntimeError::new(RuntimeErrorCode::PluginSessionManager, err.to_string())
1500            })?;
1501        let cancel_state = cancel.clone();
1502        let finish_scoped_effect_controller = scoped_effect_controller.clone();
1503        let session = self
1504            .session
1505            .take()
1506            .expect("lash runtime session must be available");
1507        let mut driver = RuntimeTurnDriver {
1508            session,
1509            policy: resolved_turn_policy,
1510            host: self.host.clone(),
1511            turn_id: scoped_effect_controller.scope_id().to_string(),
1512            scoped_effect_controller,
1513            session_id: self.state.session_id.clone(),
1514            turn_index,
1515            turn_pipeline,
1516            llm_stream_summaries: HashMap::new(),
1517            next_llm_ordinal: 0,
1518            session_services: manager,
1519            protocol_turn_options: effective_protocol_turn_options,
1520            protocol_extension,
1521            turn_context,
1522            turn_causes: initial_turn_causes,
1523            pending_queue_claims: initial_queue_claim.into_iter().collect(),
1524            checkpoint_messages: crate::tool_dispatch::CheckpointMessageBuffer::default(),
1525            session_execution_lease: session_execution_fence,
1526            runtime_lease_owner: self.runtime_lease_owner.clone(),
1527            turn_phase_probe: self.turn_phase_probe.clone(),
1528        };
1529        let protocol_run_offset = 0;
1530        self.mark_phase_begin(RuntimeTurnPhase::EffectLoop);
1531        let run_result = drive_turn_to_completion(
1532            driver.run(prepared.messages, event_tx, cancel, protocol_run_offset),
1533            &mut event_rx,
1534            &mut assembler,
1535            &child_usage_event_relay,
1536            events,
1537            turn_events,
1538        )
1539        .await;
1540        let (new_messages, _new_protocol_iteration) = match run_result {
1541            Ok(result) => result,
1542            Err(err) => {
1543                self.mark_phase_end(RuntimeTurnPhase::EffectLoop);
1544                let RuntimeTurnDriver {
1545                    session,
1546                    pending_queue_claims,
1547                    ..
1548                } = driver;
1549                self.session = Some(session);
1550                self.abandon_queued_work_claims_after_lease_loss(&err, &pending_queue_claims)
1551                    .await;
1552                return Err(err);
1553            }
1554        };
1555        self.mark_phase_end(RuntimeTurnPhase::EffectLoop);
1556        tracing::debug!(
1557            new_message_count = new_messages.len(),
1558            tool_call_count = assembler.tool_calls.len(),
1559            "runtime post-run_task"
1560        );
1561
1562        let RuntimeTurnDriver {
1563            session,
1564            policy,
1565            turn_pipeline,
1566            pending_queue_claims,
1567            ..
1568        } = driver;
1569        self.session = Some(session);
1570        let pending_queue_claims_for_abandon = pending_queue_claims.clone();
1571        let finish_result = self
1572            .finish_turn(
1573                TurnFinishInput {
1574                    turn_pipeline,
1575                    assembler,
1576                    new_messages,
1577                    policy,
1578                    turn_index,
1579                    queued_work_completions: pending_queue_claims
1580                        .iter()
1581                        .map(crate::QueuedWorkClaim::completion)
1582                        .collect(),
1583                    trace_turn_id,
1584                },
1585                events,
1586                &finish_scoped_effect_controller,
1587                &cancel_state,
1588                session_execution_lease,
1589                session_execution_lease_release_policy,
1590            )
1591            .await;
1592        if let Err(err) = &finish_result {
1593            self.abandon_queued_work_claims_after_lease_loss(
1594                err,
1595                &pending_queue_claims_for_abandon,
1596            )
1597            .await;
1598        }
1599        finish_result
1600    }
1601    async fn normalize_input_items(
1602        &self,
1603        items: &[InputItem],
1604        image_blobs: &HashMap<String, Vec<u8>>,
1605    ) -> Result<Vec<NormalizedItem>, String> {
1606        normalize_input_items(
1607            items,
1608            image_blobs,
1609            self.host.core.durability.attachment_store.as_ref(),
1610        )
1611        .await
1612    }
1613}
1614
1615fn turn_input_from_text(text: String) -> TurnInput {
1616    TurnInput {
1617        items: vec![InputItem::Text { text }],
1618        image_blobs: HashMap::new(),
1619        protocol_turn_options: None,
1620        trace_turn_id: None,
1621        protocol_extension: None,
1622        turn_context: crate::TurnContext::default(),
1623    }
1624}
1625
/// Build a deterministic turn id from `root_turn_id` and the number of turns
/// already completed.
///
/// With zero completed turns the root id is reused verbatim. Otherwise the
/// count is appended as `:agent-frame:<count>`, so each follow-up gets a
/// distinct id that is reproducible from the same inputs.
fn agent_frame_follow_turn_id(root_turn_id: &str, completed_turn_count: usize) -> String {
    match completed_turn_count {
        0 => String::from(root_turn_id),
        follow_index => format!("{root_turn_id}:agent-frame:{follow_index}"),
    }
}
1633
1634pub fn ensure_durable_effect_input(input: &TurnInput) -> Result<(), RuntimeError> {
1635    if input.protocol_extension.is_some() {
1636        return Err(RuntimeError::new(
1637            RuntimeErrorCode::DurableEffectLiveProtocolExtension,
1638            "durable effect hosts do not support live protocol_extension inputs; encode replayable data in protocol_turn_options or persisted plugin state",
1639        ));
1640    }
1641    input
1642        .turn_context
1643        .live_plugin_inputs()
1644        .durable_effect_rejection()?;
1645    Ok(())
1646}
1647
1648async fn emit_turn_activity_to_sink(events: &dyn TurnActivitySink, activity: TurnActivity) {
1649    if !events.is_noop() {
1650        events.emit(activity).await;
1651    }
1652}
1653
1654/// Pump the turn driver's event channel into the host sinks while the run
1655/// future executes, then drain any events emitted between completion and the
1656/// sender dropping.
1657///
1658/// Both the fresh and resumed turn entry points construct a
1659/// `RuntimeTurnDriver`, kick off its run future, and need identical
1660/// event-pump/drain behavior before tearing the driver down. Only the driver
1661/// construction and post-run teardown differ, so each caller owns those and
1662/// shares this loop.
1663async fn drive_turn_to_completion<F>(
1664    run_future: F,
1665    event_rx: &mut mpsc::Receiver<RuntimeStreamEvent>,
1666    assembler: &mut TurnAssembler,
1667    child_usage_event_relay: &ChildUsageEventRelay,
1668    events: &dyn EventSink,
1669    turn_events: &dyn TurnActivitySink,
1670) -> Result<(crate::MessageSequence, usize), RuntimeError>
1671where
1672    F: std::future::Future<Output = Result<(crate::MessageSequence, usize), RuntimeError>>,
1673{
1674    let run_result = {
1675        let mut run_future = Box::pin(run_future);
1676        loop {
1677            tokio::select! {
1678                maybe_event = event_rx.recv() => {
1679                    if let Some(event) = maybe_event {
1680                        emit_runtime_stream_event_to_sinks(
1681                            events,
1682                            turn_events,
1683                            event,
1684                            assembler,
1685                        )
1686                        .await;
1687                    }
1688                }
1689                completed = run_future.as_mut() => {
1690                    child_usage_event_relay.clear();
1691                    break completed;
1692                }
1693            }
1694        }
1695    };
1696    while let Some(event) = event_rx.recv().await {
1697        emit_runtime_stream_event_to_sinks(events, turn_events, event, assembler).await;
1698    }
1699    run_result
1700}
1701
1702async fn emit_runtime_stream_event_to_sinks(
1703    events: &dyn EventSink,
1704    turn_events: &dyn TurnActivitySink,
1705    event: RuntimeStreamEvent,
1706    assembler: &mut TurnAssembler,
1707) {
1708    match event {
1709        RuntimeStreamEvent::Session(event) => {
1710            assembler.push(&event);
1711            emit_session_event_to_sink(events, event).await;
1712        }
1713        RuntimeStreamEvent::Turn(activity) => {
1714            assembler.push_turn_activity(&activity);
1715            emit_turn_activity_to_sink(turn_events, activity).await;
1716        }
1717    }
1718}
1719
#[cfg(test)]
mod tests {
    use super::agent_frame_follow_turn_id;

    /// The root turn keeps its id; each later turn appends its completed-turn
    /// count, so the ids are distinct and depend only on the inputs.
    #[test]
    fn agent_frame_follow_turn_ids_are_distinct_and_deterministic() {
        let cases = [
            (0, "root-turn"),
            (1, "root-turn:agent-frame:1"),
            (2, "root-turn:agent-frame:2"),
        ];
        for (completed_turns, expected_id) in cases {
            assert_eq!(
                agent_frame_follow_turn_id("root-turn", completed_turns),
                expected_id
            );
        }
    }
}